diff --git a/rxjava-core/src/main/java/rx/Scheduler.java b/rxjava-core/src/main/java/rx/Scheduler.java index b01872f226..5d9bba7b7f 100644 --- a/rxjava-core/src/main/java/rx/Scheduler.java +++ b/rxjava-core/src/main/java/rx/Scheduler.java @@ -255,4 +255,52 @@ public long now() { public int degreeOfParallelism() { return Runtime.getRuntime().availableProcessors(); } + /** + * Schedule a task to be run immediately. + * @param r the task to run immediately + * @return the subscription to cancel the schedule + */ + public Subscription scheduleRunnable(final Runnable r) { + return schedule(new Action0() { + @Override + public void call() { + r.run(); + } + }); + } + + /** + * Schedule a task to be run after the delay time. + * @param r the task to schedule + * @param delayTime the time to delay the execution + * @param unit the time unit + * @return the subscription to cancel the schedule + */ + public Subscription scheduleRunnable(final Runnable r, long delayTime, TimeUnit unit) { + return schedule(new Action0() { + @Override + public void call() { + r.run(); + } + }, delayTime, unit); + } + + /** + * Schedule a task to be run after the delay time and after + * each period. + * @param r the task to schedule + * @param initialDelay the initial delay of the schedule + * @param period the between period of the schedule + * @param unit the time unit + * @return the subscription to cancel the schedule + */ + public Subscription scheduleRunnable(final Runnable r, long initialDelay, long period, TimeUnit unit) { + return schedulePeriodically(new Action0() { + @Override + public void call() { + r.run(); + } + }, initialDelay, period, unit); + + } } diff --git a/rxjava-core/src/main/java/rx/schedulers/ExecutorScheduler.java b/rxjava-core/src/main/java/rx/schedulers/ExecutorScheduler.java index 563e609612..a56e90eff5 100644 --- a/rxjava-core/src/main/java/rx/schedulers/ExecutorScheduler.java +++ b/rxjava-core/src/main/java/rx/schedulers/ExecutorScheduler.java @@ -25,6 +25,7 @@ import rx.Scheduler; import rx.Subscription; import rx.subscriptions.CompositeSubscription; +import rx.subscriptions.IncrementalSubscription; import rx.subscriptions.Subscriptions; import rx.util.functions.Func2; @@ -47,18 +48,17 @@ public ExecutorScheduler(ScheduledExecutorService executor) { @Override public Subscription schedulePeriodically(final T state, final Func2 action, long initialDelay, long period, TimeUnit unit) { if (executor instanceof ScheduledExecutorService) { - final CompositeSubscription subscriptions = new CompositeSubscription(); + CompositeSubscription subscription = new CompositeSubscription(); + final IncrementalSubscription scheduleSub = new IncrementalSubscription(); + final IncrementalSubscription actionSub = new IncrementalSubscription(); + subscription.add(scheduleSub); + subscription.add(actionSub); - ScheduledFuture f = ((ScheduledExecutorService) executor).scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - Subscription s = action.call(ExecutorScheduler.this, state); - subscriptions.add(s); - } - }, initialDelay, period, unit); + final Scheduler _scheduler = new ReentrantScheduler(this, scheduleSub, actionSub, subscription); - subscriptions.add(Subscriptions.from(f)); - return subscriptions; + _scheduler.schedulePeriodically(state, action, initialDelay, period, unit); + + return subscription; } else { return super.schedulePeriodically(state, action, initialDelay, period, unit); @@ -67,81 +67,102 @@ public void run() { @Override public Subscription schedule(final T state, final Func2 action, long delayTime, TimeUnit unit) { - final DiscardableAction discardableAction = new DiscardableAction(state, action); - final Scheduler _scheduler = this; + CompositeSubscription subscription = new CompositeSubscription(); + final IncrementalSubscription scheduleSub = new IncrementalSubscription(); + final IncrementalSubscription actionSub = new IncrementalSubscription(); + subscription.add(scheduleSub); + subscription.add(actionSub); + + final Scheduler _scheduler = new ReentrantScheduler(this, scheduleSub, actionSub, subscription); + + _scheduler.schedule(state, action, delayTime, unit); + + return subscription; + } + + @Override + public Subscription schedule(T state, Func2 action) { // all subscriptions that may need to be unsubscribed - final CompositeSubscription subscription = new CompositeSubscription(discardableAction); + CompositeSubscription subscription = new CompositeSubscription(); + final IncrementalSubscription scheduleSub = new IncrementalSubscription(); + final IncrementalSubscription actionSub = new IncrementalSubscription(); + subscription.add(scheduleSub); + subscription.add(actionSub); + + final Scheduler _scheduler = new ReentrantScheduler(this, scheduleSub, actionSub, subscription); + + _scheduler.schedule(state, action); + return subscription; + } + + @Override + public Subscription scheduleRunnable(Runnable r, long delayTime, TimeUnit unit) { if (executor instanceof ScheduledExecutorService) { // we are a ScheduledExecutorService so can do proper scheduling - ScheduledFuture f = ((ScheduledExecutorService) executor).schedule(new Runnable() { - @Override - public void run() { - // when the delay has passed we now do the work on the actual scheduler - Subscription s = discardableAction.call(_scheduler); - // add the subscription to the CompositeSubscription so it is unsubscribed - subscription.add(s); - } - }, delayTime, unit); + ScheduledFuture f = ((ScheduledExecutorService) executor).schedule(r, delayTime, unit); // add the ScheduledFuture as a subscription so we can cancel the scheduled action if an unsubscribe happens - subscription.add(Subscriptions.from(f)); + return Subscriptions.from(f); } else { // we are not a ScheduledExecutorService so can't directly schedule if (delayTime == 0) { // no delay so put on the thread-pool right now - Subscription s = schedule(state, action); - // add the subscription to the CompositeSubscription so it is unsubscribed - subscription.add(s); + return scheduleRunnable(r); } else { // there is a delay and this isn't a ScheduledExecutorService so we'll use a system-wide ScheduledExecutorService // to handle the scheduling and once it's ready then execute on this Executor - ScheduledFuture f = GenericScheduledExecutorService.getInstance().schedule(new Runnable() { - - @Override - public void run() { - // now execute on the real Executor (by using the other overload that schedules for immediate execution) - Subscription s = _scheduler.schedule(state, action); - // add the subscription to the CompositeSubscription so it is unsubscribed - subscription.add(s); - } - }, delayTime, unit); + ScheduledFuture f = GenericScheduledExecutorService.getInstance().schedule(r, delayTime, unit); // add the ScheduledFuture as a subscription so we can cancel the scheduled action if an unsubscribe happens - subscription.add(Subscriptions.from(f)); + return Subscriptions.from(f); } } - return subscription; } - + @Override - public Subscription schedule(T state, Func2 action) { - final DiscardableAction discardableAction = new DiscardableAction(state, action); - final Scheduler _scheduler = this; - // all subscriptions that may need to be unsubscribed - final CompositeSubscription subscription = new CompositeSubscription(discardableAction); - - // work to be done on a thread - Runnable r = new Runnable() { - @Override - public void run() { - Subscription s = discardableAction.call(_scheduler); - // add the subscription to the CompositeSubscription so it is unsubscribed - subscription.add(s); - } - }; - + public Subscription scheduleRunnable(Runnable r) { // submit for immediate execution if (executor instanceof ExecutorService) { // we are an ExecutorService so get a Future back that supports unsubscribe Future f = ((ExecutorService) executor).submit(r); // add the Future as a subscription so we can cancel the scheduled action if an unsubscribe happens - subscription.add(Subscriptions.from(f)); + return Subscriptions.from(f); } else { // we are the lowest common denominator so can't unsubscribe once we execute executor.execute(r); + return Subscriptions.empty(); } + } - return subscription; + @Override + public Subscription scheduleRunnable(final Runnable r, long initialDelay, + final long period, final TimeUnit unit) { + if (executor instanceof ScheduledExecutorService) { + ScheduledFuture f = ((ScheduledExecutorService) executor).scheduleAtFixedRate(r, initialDelay, period, unit); + return Subscriptions.from(f); + } else { + final IncrementalSubscription fs = new IncrementalSubscription(); + Runnable rerun = new Runnable() { + @Override + public void run() { + if (!fs.isUnsubscribed()) { + long time = System.nanoTime(); + r.run(); + long delta = Math.max(0L, System.nanoTime() - time); + long periodNanos = Math.max(0L, unit.toNanos(period) - delta); + + long index = fs.nextIndex(); + Subscription s = scheduleRunnable(this, periodNanos, TimeUnit.NANOSECONDS); + fs.compareExchange(index, s, false); + } + } + }; + + long index = fs.nextIndex(); + + Subscription s = scheduleRunnable(rerun, initialDelay, unit); + fs.compareExchange(index, s, false); + return fs; + } } - } diff --git a/rxjava-core/src/main/java/rx/schedulers/ReentrantScheduler.java b/rxjava-core/src/main/java/rx/schedulers/ReentrantScheduler.java new file mode 100644 index 0000000000..87680fedb1 --- /dev/null +++ b/rxjava-core/src/main/java/rx/schedulers/ReentrantScheduler.java @@ -0,0 +1,154 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.schedulers; + +import java.util.concurrent.TimeUnit; +import rx.Scheduler; +import rx.Subscription; +import rx.subscriptions.CompositeSubscription; +import rx.subscriptions.IncrementalSubscription; +import rx.subscriptions.SerialSubscription; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Func1; +import rx.util.functions.Func2; + +/** + * Do not re-enter the main scheduler's schedule() method as it will + * unnecessarily chain the subscriptions of every invocation. + */ +public final class ReentrantScheduler extends Scheduler { + final Scheduler parent; + final IncrementalSubscription scheduleSub; + final IncrementalSubscription actionSub; + final CompositeSubscription composite; + + public ReentrantScheduler( + Scheduler parent, + IncrementalSubscription scheduleSub, + IncrementalSubscription actionSub, + CompositeSubscription composite) { + this.parent = parent; + this.scheduleSub = scheduleSub; + this.actionSub = actionSub; + this.composite = composite; + } + + @Override + public Subscription schedule(T state, Func2 action) { + if (composite.isUnsubscribed()) { + // don't bother scheduling a task which wouldn't run anyway + return Subscriptions.empty(); + } + long index = actionSub.nextIndex(); + final DiscardableAction discardableAction = new DiscardableAction(state, action); + + actionSub.compareExchange(index, discardableAction, false); + + Runnable r = new RunTask(discardableAction); + + long sindex = scheduleSub.nextIndex(); + Subscription s = parent.scheduleRunnable(r); + scheduleSub.compareExchange(sindex, s, false); + + return s; + } + + @Override + public Subscription schedule(T state, Func2 action, long delayTime, TimeUnit unit) { + if (composite.isUnsubscribed()) { + // don't bother scheduling a task which wouldn't run anyway + return Subscriptions.empty(); + } + + long index = actionSub.nextIndex(); + final DiscardableAction discardableAction = new DiscardableAction(state, action); + actionSub.compareExchange(index, discardableAction, false); + + Runnable r = new RunTask(discardableAction); + + long sindex = scheduleSub.nextIndex(); + Subscription s = parent.scheduleRunnable(r, delayTime, unit); + scheduleSub.compareExchange(sindex, s, false); + + return s; + } + + @Override + public Subscription schedulePeriodically(T state, Func2 action, long initialDelay, long period, TimeUnit unit) { + if (composite.isUnsubscribed()) { + // don't bother scheduling a task which wouldn't run anyway + return Subscriptions.empty(); + } + + long index = actionSub.nextIndex(); + final PeriodicAction periodicAction = new PeriodicAction(state, action); + actionSub.compareExchange(index, periodicAction, false); + + Runnable r = new RunTask(periodicAction); + + long sindex = scheduleSub.nextIndex(); + Subscription s = parent.scheduleRunnable(r, initialDelay, period, unit); + scheduleSub.compareExchange(sindex, s, false); + + return s; + } + /** The task runner. */ + private final class RunTask implements Runnable { + final Func1 action; + + public RunTask(Func1 action) { + this.action = action; + } + + @Override + public void run() { + long index = actionSub.nextIndex(); + Subscription s = action.call(ReentrantScheduler.this); + actionSub.compareExchange(index, s, false); + } + + } + /** + * An action that calls the underlying function in a periodic environment. + * @param the state value type + */ + private static final class PeriodicAction implements Subscription, Func1 { + final T state; + final Func2 underlying; + final SerialSubscription ssub; + + public PeriodicAction(T state, Func2 underlying) { + this.state = state; + this.underlying = underlying; + this.ssub = new SerialSubscription(); + } + + @Override + public Subscription call(Scheduler scheduler) { + if (!ssub.isUnsubscribed()) { + Subscription s = underlying.call(scheduler, state); + ssub.setSubscription(s); + return ssub; + } + return Subscriptions.empty(); + } + + @Override + public void unsubscribe() { + ssub.unsubscribe(); + } + } +} diff --git a/rxjava-core/src/main/java/rx/subscriptions/IncrementalSubscription.java b/rxjava-core/src/main/java/rx/subscriptions/IncrementalSubscription.java new file mode 100644 index 0000000000..5ebdf8c1ce --- /dev/null +++ b/rxjava-core/src/main/java/rx/subscriptions/IncrementalSubscription.java @@ -0,0 +1,171 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.subscriptions; + +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import rx.Subscription; + +/** + * Subscription that swaps the underlying subscription only if + * the associated index is greater than the current. + * + * Usage example: + *
+ * IncrementalSubscription is = ...
+ * 
+ * long index = is.nextIndex();
+ * 
+ * // do some recursive work
+ * Subscription s = ...
+ * 
+ * is.compareExchange(index, s, false);
+ * 
+ * + * This will replace the current subscription only if its index + * is less than the new index. This way, if a recursive call + * has already set a more recent subscription, this won't + * replace it back to an older one. + */ +public class IncrementalSubscription implements Subscription { + /** The current index. */ + protected final AtomicLong index = new AtomicLong(); + /** The current reference. */ + protected final AtomicReference reference = new AtomicReference(); + /** The unsubscription sentinel. */ + private static final IndexedRef UNSUBSCRIBE_SENTINEL + = new IndexedRef(Long.MAX_VALUE, Subscriptions.empty()); + + public boolean isUnsubscribed() { + return reference.get() == UNSUBSCRIBE_SENTINEL; + } + + public IncrementalSubscription() { + this(Subscriptions.empty()); + } + public IncrementalSubscription(Subscription initial) { + reference.set(new IndexedRef(0L, initial)); + } + + public Subscription getSubscription() { + return reference.get().value(); // the sentinel holds empty anyway + } + + /** + * Generate the next index. + * @return the next index + */ + public long nextIndex() { + return index.incrementAndGet(); + } + + /** + * Return the current index. For testing purposes only. + * @return the current index + */ + /* public */protected long index() { + return index.get(); + } + /** + * Sets the given subscription as the latest value. + * @param newValue the new subscription to set + * @param unsubscribeOld unsubscribe the old subscription? + */ + public void setSubscription(Subscription newValue, boolean unsubscribeOld) { + do { + IndexedRef r = reference.get(); + if (r == UNSUBSCRIBE_SENTINEL) { + if (newValue != null) { + newValue.unsubscribe(); + } + return; + } + long newIndex = nextIndex(); + if (r.index() < newIndex) { + IndexedRef newRef = new IndexedRef(newIndex, newValue); + if (reference.compareAndSet(r, newRef)) { + if (unsubscribeOld) { + r.unsubscribe(); + } + return; + } + } + } while (true); + } + /** + * Compare the current index with the new index and if newIndex + * is greater, exchange the subscription with the new value + * and optionally unsubscribe the old one. + * @param newIndex + * @param newValue + * @param unsubscribeOld + * @return true if the exchange succeeded, false if not. + */ + public boolean compareExchange(long newIndex, Subscription newValue, boolean unsubscribeOld) { + do { + IndexedRef r = reference.get(); + if (r == UNSUBSCRIBE_SENTINEL) { + if (newValue != null) { + newValue.unsubscribe(); + } + return false; + } + if (r.index >= newIndex) { + return false; + } + IndexedRef newRef = new IndexedRef(newIndex, newValue); + if (reference.compareAndSet(r, newRef)) { + if (unsubscribeOld) { + r.unsubscribe(); + } + return true; + } + } while (true); + } + + + /** The indexed reference object. */ + protected static final class IndexedRef implements Subscription { + private final long index; + private final Subscription value; + + public IndexedRef(long index, Subscription value) { + this.index = index; + this.value = value; + } + + public long index() { + return index; + } + + public Subscription value() { + return value; + } + + @Override + public void unsubscribe() { + if (value != null) { + value.unsubscribe(); + } + } + } + + @Override + public void unsubscribe() { + reference.getAndSet(UNSUBSCRIBE_SENTINEL).unsubscribe(); + } + +} diff --git a/rxjava-core/src/test/java/rx/SchedulersTest.java b/rxjava-core/src/test/java/rx/SchedulersTest.java index 62e74f5798..f91dbbc908 100644 --- a/rxjava-core/src/test/java/rx/SchedulersTest.java +++ b/rxjava-core/src/test/java/rx/SchedulersTest.java @@ -327,12 +327,13 @@ public void testRecursiveScheduler2() throws InterruptedException { // use latches instead of Thread.sleep final CountDownLatch latch = new CountDownLatch(10); final CountDownLatch completionLatch = new CountDownLatch(1); + final BooleanSubscription cancel = new BooleanSubscription(); - Observable obs = Observable.create(new OnSubscribeFunc() { + Observable obs = Observable.create(new Observable.OnSubscribeFunc() { @Override public Subscription onSubscribe(final Observer observer) { - return Schedulers.threadPoolForComputation().schedule(new BooleanSubscription(), new Func2() { + return Schedulers.threadPoolForComputation().schedule(cancel, new Func2() { @Override public Subscription call(Scheduler scheduler, BooleanSubscription cancel) { if (cancel.isUnsubscribed()) { @@ -378,6 +379,15 @@ public void onNext(Integer args) { fail("Timed out waiting on onNext latch"); } + + // wait some turn to let the action run + Thread.sleep(100); + + cancel.unsubscribe(); + + // allow seeing the cancellation + Thread.sleep(100); + // now unsubscribe and ensure it stops the recursive loop subscribe.unsubscribe(); System.out.println("unsubscribe"); diff --git a/rxjava-core/src/test/java/rx/schedulers/ReentrantSchedulerTest.java b/rxjava-core/src/test/java/rx/schedulers/ReentrantSchedulerTest.java new file mode 100644 index 0000000000..9bb03d07a7 --- /dev/null +++ b/rxjava-core/src/test/java/rx/schedulers/ReentrantSchedulerTest.java @@ -0,0 +1,104 @@ + /** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.schedulers; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import org.junit.Test; +import rx.Observable; +import rx.Scheduler; +import rx.Subscription; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Action1; +import rx.util.functions.Func1; +import rx.util.functions.Func2; + +public class ReentrantSchedulerTest { + @Test + public void testReentrantSchedulerIsProvided() throws InterruptedException { + final AtomicReference ref = new AtomicReference(); + final CountDownLatch cdl = new CountDownLatch(1); + Scheduler scheduler = Schedulers.threadPoolForComputation(); + scheduler.schedule(1, new Func2() { + + @Override + public Subscription call(Scheduler t1, Integer t2) { + ref.set(t1); + cdl.countDown(); + return Subscriptions.empty(); + } + }); + + if (!cdl.await(1000, TimeUnit.MILLISECONDS)) { + fail("Should have countdown the latch!"); + } + + assertTrue(ref.get() instanceof ReentrantScheduler); + } + + @Test + public void testReentrantSchedulerIsProvided2() throws InterruptedException { + final AtomicReference ref = new AtomicReference(); + final CountDownLatch cdl = new CountDownLatch(1); + Scheduler scheduler = Schedulers.threadPoolForComputation(); + scheduler.schedule(1, new Func2() { + + @Override + public Subscription call(Scheduler t1, Integer t2) { + ref.set(t1); + cdl.countDown(); + return Subscriptions.empty(); + } + }, 100, TimeUnit.MILLISECONDS); + + if (!cdl.await(1000, TimeUnit.MILLISECONDS)) { + fail("Should have countdown the latch!"); + } + + assertTrue(ref.get() instanceof ReentrantScheduler); + } + + @Test + public void testReentrantSchedulerIsProvided3() throws InterruptedException { + final AtomicReference ref = new AtomicReference(); + final CountDownLatch cdl = new CountDownLatch(1); + Scheduler scheduler = Schedulers.threadPoolForComputation(); + Subscription s = scheduler.schedulePeriodically(1, new Func2() { + int count; + @Override + public Subscription call(Scheduler t1, Integer t2) { + if (count++ == 3) { + cdl.countDown(); + ref.set(t1); + } + return Subscriptions.empty(); + } + }, 100, 100, TimeUnit.MILLISECONDS); + + if (!cdl.await(5000, TimeUnit.MILLISECONDS)) { + fail("Should have countdown the latch!"); + } + + s.unsubscribe(); + + assertTrue(ref.get() instanceof ReentrantScheduler); + } +} diff --git a/rxjava-core/src/test/java/rx/subscriptions/IncrementalSubscriptionTest.java b/rxjava-core/src/test/java/rx/subscriptions/IncrementalSubscriptionTest.java new file mode 100644 index 0000000000..30b778c231 --- /dev/null +++ b/rxjava-core/src/test/java/rx/subscriptions/IncrementalSubscriptionTest.java @@ -0,0 +1,73 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.subscriptions; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import org.junit.Before; +import org.junit.Test; + +public class IncrementalSubscriptionTest { + IncrementalSubscription is; + @Before + public void before() { + is = new IncrementalSubscription(); + } + @Test + public void testSimple() { + assertFalse(is.isUnsubscribed()); + + BooleanSubscription b1 = new BooleanSubscription(); + + is.setSubscription(b1, false); + + assertEquals(1L, is.index()); + + long index = is.nextIndex(); + + assertTrue(is.compareExchange(index, b1, false)); + + assertFalse(is.compareExchange(index, b1, false)); + + assertEquals(2L, is.index()); + + index = is.nextIndex(); + assertTrue(is.compareExchange(index, b1, false)); + + is.unsubscribe(); + + BooleanSubscription b2 = new BooleanSubscription(); + + assertFalse(is.compareExchange(index, b2, false)); + + assertTrue(is.isUnsubscribed()); + assertTrue(b2.isUnsubscribed()); + + assertEquals(3L, is.index()); + } + @Test + public void testSwapOrder() { + BooleanSubscription b1 = new BooleanSubscription(); + + long idx1 = is.nextIndex(); + + long idx2 = is.nextIndex(); + assertTrue(is.compareExchange(idx2, b1, false)); + + assertFalse(is.compareExchange(idx1, b1, false)); + } +}