diff --git a/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java b/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java
index a94fd047ae..dd415d887c 100644
--- a/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java
+++ b/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java
@@ -15,103 +15,123 @@
*/
package rx.subscriptions;
+import static java.util.Arrays.asList;
+import static java.util.Collections.unmodifiableSet;
+
import java.util.ArrayList;
import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
import rx.Subscription;
import rx.util.CompositeException;
/**
- * Subscription that represents a group of Subscriptions that are unsubscribed together.
+ * Subscription that represents a group of Subscriptions that are unsubscribed
+ * together.
*
- * @see Rx.Net equivalent CompositeDisposable
+ * @see Rx.Net
+ * equivalent CompositeDisposable
*/
public class CompositeSubscription implements Subscription {
+ private static final Set MUTATE_STATE = unmodifiableSet(new HashSet());
+ private static final Set UNSUBSCRIBED_STATE = unmodifiableSet(new HashSet());
+
+ private final AtomicReference> reference = new AtomicReference>();
- /*
- * The reason 'synchronized' is used on 'add' and 'unsubscribe' is because AtomicBoolean/ConcurrentLinkedQueue are both being modified so it needs to be done atomically.
- *
- * TODO evaluate whether use of synchronized is a performance issue here and if it's worth using an atomic state machine or other non-locking approach
- */
- private AtomicBoolean unsubscribed = new AtomicBoolean(false);
- private final ConcurrentHashMap subscriptions = new ConcurrentHashMap();
-
- public CompositeSubscription(List subscriptions) {
- for (Subscription s : subscriptions) {
- this.subscriptions.put(s, Boolean.TRUE);
- }
+ public CompositeSubscription(final Subscription... subscriptions) {
+ reference.set(new HashSet(asList(subscriptions)));
}
- public CompositeSubscription(Subscription... subscriptions) {
- for (Subscription s : subscriptions) {
- this.subscriptions.put(s, Boolean.TRUE);
- }
+ public boolean isUnsubscribed() {
+ return reference.get() == UNSUBSCRIBED_STATE;
}
- /**
- * Remove and unsubscribe all subscriptions but do not unsubscribe the outer CompositeSubscription.
- */
- public void clear() {
- Collection es = null;
- for (Subscription s : subscriptions.keySet()) {
- try {
+ public void add(final Subscription s) {
+ do {
+ final Set existing = reference.get();
+ if (existing == UNSUBSCRIBED_STATE) {
s.unsubscribe();
- this.subscriptions.remove(s);
- } catch (Throwable e) {
- if (es == null) {
- es = new ArrayList();
- }
- es.add(e);
+ break;
}
- }
- if (es != null) {
- throw new CompositeException("Failed to unsubscribe to 1 or more subscriptions.", es);
- }
- }
- /**
- * Remove the {@link Subscription} and unsubscribe it.
- *
- * @param s
- */
- public void remove(Subscription s) {
- this.subscriptions.remove(s);
- // also unsubscribe from it: http://msdn.microsoft.com/en-us/library/system.reactive.disposables.compositedisposable.remove(v=vs.103).aspx
- s.unsubscribe();
+ if (reference.compareAndSet(existing, MUTATE_STATE)) {
+ existing.add(s);
+ reference.set(existing);
+ break;
+ }
+ } while (true);
}
- public boolean isUnsubscribed() {
- return unsubscribed.get();
+ public void remove(final Subscription s) {
+ do {
+ final Set subscriptions = reference.get();
+ if (subscriptions == UNSUBSCRIBED_STATE) {
+ s.unsubscribe();
+ break;
+ }
+
+ if (reference.compareAndSet(subscriptions, MUTATE_STATE)) {
+ // also unsubscribe from it:
+ // http://msdn.microsoft.com/en-us/library/system.reactive.disposables.compositedisposable.remove(v=vs.103).aspx
+ subscriptions.remove(s);
+ reference.set(subscriptions);
+ s.unsubscribe();
+ break;
+ }
+ } while (true);
}
- public synchronized void add(Subscription s) {
- if (unsubscribed.get()) {
- s.unsubscribe();
- } else {
- subscriptions.put(s, Boolean.TRUE);
- }
+ public void clear() {
+ do {
+ final Set subscriptions = reference.get();
+ if (subscriptions == UNSUBSCRIBED_STATE) {
+ break;
+ }
+
+ if (reference.compareAndSet(subscriptions, MUTATE_STATE)) {
+ final Set copy = new HashSet(
+ subscriptions);
+ subscriptions.clear();
+ reference.set(subscriptions);
+
+ for (final Subscription subscription : copy) {
+ subscription.unsubscribe();
+ }
+ break;
+ }
+ } while (true);
}
@Override
- public synchronized void unsubscribe() {
- if (unsubscribed.compareAndSet(false, true)) {
- Collection es = null;
- for (Subscription s : subscriptions.keySet()) {
- try {
- s.unsubscribe();
- } catch (Throwable e) {
- if (es == null) {
- es = new ArrayList();
+ public void unsubscribe() {
+ do {
+ final Set subscriptions = reference.get();
+ if (subscriptions == UNSUBSCRIBED_STATE) {
+ break;
+ }
+
+ if (subscriptions == MUTATE_STATE) {
+ continue;
+ }
+
+ if (reference.compareAndSet(subscriptions, UNSUBSCRIBED_STATE)) {
+ final Collection es = new ArrayList();
+ for (final Subscription s : subscriptions) {
+ try {
+ s.unsubscribe();
+ } catch (final Throwable e) {
+ es.add(e);
}
- es.add(e);
}
+ if (es.isEmpty()) {
+ break;
+ }
+ throw new CompositeException(
+ "Failed to unsubscribe to 1 or more subscriptions.", es);
}
- if (es != null) {
- throw new CompositeException("Failed to unsubscribe to 1 or more subscriptions.", es);
- }
- }
+ } while (true);
}
}
diff --git a/rxjava-core/src/main/java/rx/subscriptions/SerialSubscription.java b/rxjava-core/src/main/java/rx/subscriptions/SerialSubscription.java
index ebe084f11c..b206d37a7b 100644
--- a/rxjava-core/src/main/java/rx/subscriptions/SerialSubscription.java
+++ b/rxjava-core/src/main/java/rx/subscriptions/SerialSubscription.java
@@ -1,70 +1,62 @@
-/**
- * Copyright 2013 Netflix, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package rx.subscriptions;
-
-import rx.Subscription;
-
-/**
- * Represents a subscription whose underlying subscription can be swapped for another subscription
- * which causes the previous underlying subscription to be unsubscribed.
- *
- * @see Rx.Net equivalent SerialDisposable
- */
-public class SerialSubscription implements Subscription {
- private boolean unsubscribed;
- private Subscription subscription;
- private final Object gate = new Object();
-
- @Override
- public void unsubscribe() {
- Subscription toUnsubscribe = null;
- synchronized (gate) {
- if (!unsubscribed) {
- if (subscription != null) {
- toUnsubscribe = subscription;
- subscription = null;
- }
- unsubscribed = true;
- }
- }
- if (toUnsubscribe != null) {
- toUnsubscribe.unsubscribe();
- }
- }
-
- public Subscription getSubscription() {
- synchronized (gate) {
- return subscription;
- }
- }
-
- public void setSubscription(Subscription subscription) {
- Subscription toUnsubscribe = null;
- synchronized (gate) {
- if (!unsubscribed) {
- if (this.subscription != null) {
- toUnsubscribe = this.subscription;
- }
- this.subscription = subscription;
- } else {
- toUnsubscribe = subscription;
- }
- }
- if (toUnsubscribe != null) {
- toUnsubscribe.unsubscribe();
- }
- }
-}
+/**
+ * Copyright 2013 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rx.subscriptions;
+
+import static rx.subscriptions.Subscriptions.empty;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import rx.Subscription;
+
+/**
+ * Represents a subscription whose underlying subscription can be swapped for another subscription
+ * which causes the previous underlying subscription to be unsubscribed.
+ *
+ * @see Rx.Net equivalent SerialDisposable
+ */
+public class SerialSubscription implements Subscription {
+ private final AtomicReference reference = new AtomicReference(empty());
+
+ private static final Subscription UNSUBSCRIBED = new Subscription() {
+ @Override
+ public void unsubscribe() {
+ }
+ };
+
+ @Override
+ public void unsubscribe() {
+ setSubscription(UNSUBSCRIBED);
+ }
+
+ public void setSubscription(final Subscription subscription) {
+ do {
+ final Subscription current = reference.get();
+ if (current == UNSUBSCRIBED) {
+ subscription.unsubscribe();
+ break;
+ }
+ if (reference.compareAndSet(current, subscription)) {
+ current.unsubscribe();
+ break;
+ }
+ } while (true);
+ }
+
+ public Subscription getSubscription() {
+ final Subscription subscription = reference.get();
+ return subscription == UNSUBSCRIBED ? null : subscription;
+ }
+}
diff --git a/rxjava-core/src/test/java/rx/subscriptions/CompositeSubscriptionTest.java b/rxjava-core/src/test/java/rx/subscriptions/CompositeSubscriptionTest.java
index 551a6b2f66..e916ac9795 100644
--- a/rxjava-core/src/test/java/rx/subscriptions/CompositeSubscriptionTest.java
+++ b/rxjava-core/src/test/java/rx/subscriptions/CompositeSubscriptionTest.java
@@ -15,8 +15,14 @@
*/
package rx.subscriptions;
-import static org.junit.Assert.*;
-
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
@@ -51,6 +57,48 @@ public void unsubscribe() {
assertEquals(2, counter.get());
}
+ @Test(timeout = 1000)
+ public void shouldUnsubscribeAll() throws InterruptedException {
+ final AtomicInteger counter = new AtomicInteger();
+ final CompositeSubscription s = new CompositeSubscription();
+
+ final int count = 10;
+ final CountDownLatch start = new CountDownLatch(1);
+ for (int i = 0; i < count; i++) {
+ s.add(new Subscription() {
+
+ @Override
+ public void unsubscribe() {
+ counter.incrementAndGet();
+ }
+ });
+ }
+
+ final List threads = new ArrayList();
+ for (int i = 0; i < count; i++) {
+ final Thread t = new Thread() {
+ @Override
+ public void run() {
+ try {
+ start.await();
+ s.unsubscribe();
+ } catch (final InterruptedException e) {
+ fail(e.getMessage());
+ }
+ }
+ };
+ t.start();
+ threads.add(t);
+ }
+
+ start.countDown();
+ for (final Thread t : threads) {
+ t.join();
+ }
+
+ assertEquals(count, counter.get());
+ }
+
@Test
public void testException() {
final AtomicInteger counter = new AtomicInteger();
@@ -144,4 +192,46 @@ public void unsubscribe() {
// we should have only unsubscribed once
assertEquals(1, counter.get());
}
+
+ @Test(timeout = 1000)
+ public void testUnsubscribeIdempotenceConcurrently()
+ throws InterruptedException {
+ final AtomicInteger counter = new AtomicInteger();
+ final CompositeSubscription s = new CompositeSubscription();
+
+ final int count = 10;
+ final CountDownLatch start = new CountDownLatch(1);
+ s.add(new Subscription() {
+
+ @Override
+ public void unsubscribe() {
+ counter.incrementAndGet();
+ }
+ });
+
+ final List threads = new ArrayList();
+ for (int i = 0; i < count; i++) {
+ final Thread t = new Thread() {
+ @Override
+ public void run() {
+ try {
+ start.await();
+ s.unsubscribe();
+ } catch (final InterruptedException e) {
+ fail(e.getMessage());
+ }
+ }
+ };
+ t.start();
+ threads.add(t);
+ }
+
+ start.countDown();
+ for (final Thread t : threads) {
+ t.join();
+ }
+
+ // we should have only unsubscribed once
+ assertEquals(1, counter.get());
+ }
}
diff --git a/rxjava-core/src/test/java/rx/subscriptions/SerialSubscriptionTests.java b/rxjava-core/src/test/java/rx/subscriptions/SerialSubscriptionTests.java
index cd63539c87..6c8648e986 100644
--- a/rxjava-core/src/test/java/rx/subscriptions/SerialSubscriptionTests.java
+++ b/rxjava-core/src/test/java/rx/subscriptions/SerialSubscriptionTests.java
@@ -1,76 +1,211 @@
-/**
- * Copyright 2013 Netflix, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package rx.subscriptions;
-
-import static org.mockito.Mockito.*;
-
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.MockitoAnnotations;
-
-import rx.Subscription;
-
-public class SerialSubscriptionTests {
- private SerialSubscription serialSubscription;
-
- @Before
- public void setUp() {
- MockitoAnnotations.initMocks(this);
-
- serialSubscription = new SerialSubscription();
- }
-
- @Test
- public void unsubscribingWithoutUnderlyingDoesNothing() {
- serialSubscription.unsubscribe();
- }
-
- @Test
- public void unsubscribingWithSingleUnderlyingUnsubscribes() {
- Subscription underlying = mock(Subscription.class);
- serialSubscription.setSubscription(underlying);
- underlying.unsubscribe();
- verify(underlying).unsubscribe();
- }
-
- @Test
- public void replacingFirstUnderlyingCausesUnsubscription() {
- Subscription first = mock(Subscription.class);
- serialSubscription.setSubscription(first);
- Subscription second = mock(Subscription.class);
- serialSubscription.setSubscription(second);
- verify(first).unsubscribe();
- }
-
- @Test
- public void whenUnsubscribingSecondUnderlyingUnsubscribed() {
- Subscription first = mock(Subscription.class);
- serialSubscription.setSubscription(first);
- Subscription second = mock(Subscription.class);
- serialSubscription.setSubscription(second);
- serialSubscription.unsubscribe();
- verify(second).unsubscribe();
- }
-
- @Test
- public void settingUnderlyingWhenUnsubscribedCausesImmediateUnsubscription()
- {
- serialSubscription.unsubscribe();
- Subscription underlying = mock(Subscription.class);
- serialSubscription.setSubscription(underlying);
- verify(underlying).unsubscribe();
- }
-}
+/**
+ * Copyright 2013 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rx.subscriptions;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.verifyZeroInteractions;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import rx.Subscription;
+
+@RunWith(MockitoJUnitRunner.class)
+public class SerialSubscriptionTests {
+ private SerialSubscription serialSubscription;
+
+ @Before
+ public void setUp() {
+ serialSubscription = new SerialSubscription();
+ }
+
+ @Test
+ public void unsubscribingWithoutUnderlyingDoesNothing() {
+ serialSubscription.unsubscribe();
+ }
+
+ @Test
+ public void getSubscriptionShouldReturnSubscriptionAfterUnsubscribe() {
+ final Subscription underlying = mock(Subscription.class);
+ serialSubscription.setSubscription(underlying);
+ serialSubscription.unsubscribe();
+ assertEquals(null, serialSubscription.getSubscription());
+ }
+
+ @Test
+ public void getSubscriptionShouldReturnSetSubscription() {
+ final Subscription underlying = mock(Subscription.class);
+ serialSubscription.setSubscription(underlying);
+ assertSame(underlying, serialSubscription.getSubscription());
+
+ final Subscription another = mock(Subscription.class);
+ serialSubscription.setSubscription(another);
+ assertSame(another, serialSubscription.getSubscription());
+ }
+
+ @Test
+ public void unsubscribingTwiceDoesUnsubscribeOnce() {
+ Subscription underlying = mock(Subscription.class);
+ serialSubscription.setSubscription(underlying);
+
+ serialSubscription.unsubscribe();
+ verify(underlying).unsubscribe();
+
+ serialSubscription.unsubscribe();
+ verifyNoMoreInteractions(underlying);
+ }
+
+ @Test
+ public void settingSameSubscriptionTwiceDoesUnsubscribeIt() {
+ Subscription underlying = mock(Subscription.class);
+ serialSubscription.setSubscription(underlying);
+ verifyZeroInteractions(underlying);
+ serialSubscription.setSubscription(underlying);
+ verify(underlying).unsubscribe();
+ }
+
+ @Test
+ public void unsubscribingWithSingleUnderlyingUnsubscribes() {
+ Subscription underlying = mock(Subscription.class);
+ serialSubscription.setSubscription(underlying);
+ underlying.unsubscribe();
+ verify(underlying).unsubscribe();
+ }
+
+ @Test
+ public void replacingFirstUnderlyingCausesUnsubscription() {
+ Subscription first = mock(Subscription.class);
+ serialSubscription.setSubscription(first);
+ Subscription second = mock(Subscription.class);
+ serialSubscription.setSubscription(second);
+ verify(first).unsubscribe();
+ }
+
+ @Test
+ public void whenUnsubscribingSecondUnderlyingUnsubscribed() {
+ Subscription first = mock(Subscription.class);
+ serialSubscription.setSubscription(first);
+ Subscription second = mock(Subscription.class);
+ serialSubscription.setSubscription(second);
+ serialSubscription.unsubscribe();
+ verify(second).unsubscribe();
+ }
+
+ @Test
+ public void settingUnderlyingWhenUnsubscribedCausesImmediateUnsubscription() {
+ serialSubscription.unsubscribe();
+ Subscription underlying = mock(Subscription.class);
+ serialSubscription.setSubscription(underlying);
+ verify(underlying).unsubscribe();
+ }
+
+ @Test(timeout = 1000)
+ public void settingUnderlyingWhenUnsubscribedCausesImmediateUnsubscriptionConcurrently()
+ throws InterruptedException {
+ final Subscription firstSet = mock(Subscription.class);
+ serialSubscription.setSubscription(firstSet);
+
+ final CountDownLatch start = new CountDownLatch(1);
+
+ final int count = 10;
+ final CountDownLatch end = new CountDownLatch(count);
+
+ final List threads = new ArrayList();
+ for (int i = 0 ; i < count ; i++) {
+ final Thread t = new Thread() {
+ @Override
+ public void run() {
+ try {
+ start.await();
+ serialSubscription.unsubscribe();
+ } catch (InterruptedException e) {
+ fail(e.getMessage());
+ } finally {
+ end.countDown();
+ }
+ }
+ };
+ t.start();
+ threads.add(t);
+ }
+
+ final Subscription underlying = mock(Subscription.class);
+ start.countDown();
+ serialSubscription.setSubscription(underlying);
+ end.await();
+ verify(firstSet).unsubscribe();
+ verify(underlying).unsubscribe();
+
+ for (final Thread t : threads) {
+ t.join();
+ }
+ }
+
+ @Test
+ public void concurrentSetSubscriptionShouldNotInterleave()
+ throws InterruptedException {
+ final int count = 10;
+ final List subscriptions = new ArrayList();
+
+ final CountDownLatch start = new CountDownLatch(1);
+ final CountDownLatch end = new CountDownLatch(count);
+
+ final List threads = new ArrayList();
+ for (int i = 0 ; i < count ; i++) {
+ final Subscription subscription = mock(Subscription.class);
+ subscriptions.add(subscription);
+
+ final Thread t = new Thread() {
+ @Override
+ public void run() {
+ try {
+ start.await();
+ serialSubscription.setSubscription(subscription);
+ } catch (InterruptedException e) {
+ fail(e.getMessage());
+ } finally {
+ end.countDown();
+ }
+ }
+ };
+ t.start();
+ threads.add(t);
+ }
+
+ start.countDown();
+ end.await();
+ serialSubscription.unsubscribe();
+
+ for(final Subscription subscription : subscriptions) {
+ verify(subscription).unsubscribe();
+ }
+
+ for (final Thread t : threads) {
+ t.join();
+ }
+ }
+}