Skip to content

Commit c3bbd89

Browse files
Merge pull request #661 from benjchristensen/subscriptions
Subscriptions Rewrite
2 parents 44c3739 + 813988b commit c3bbd89

11 files changed

+391
-384
lines changed

rxjava-core/src/main/java/rx/joins/JoinObserver1.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,11 @@
1919
import java.util.LinkedList;
2020
import java.util.List;
2121
import java.util.Queue;
22+
import java.util.concurrent.atomic.AtomicBoolean;
23+
2224
import rx.Notification;
2325
import rx.Observable;
24-
import rx.subscriptions.SingleAssignmentSubscription;
26+
import rx.operators.SafeObservableSubscription;
2527
import rx.util.functions.Action1;
2628

2729
/**
@@ -33,14 +35,15 @@ public final class JoinObserver1<T> extends ObserverBase<Notification<T>> implem
3335
private final Action1<Throwable> onError;
3436
private final List<ActivePlan0> activePlans;
3537
private final Queue<Notification<T>> queue;
36-
private final SingleAssignmentSubscription subscription;
38+
private final SafeObservableSubscription subscription;
3739
private volatile boolean done;
40+
private final AtomicBoolean subscribed = new AtomicBoolean(false);
3841

3942
public JoinObserver1(Observable<T> source, Action1<Throwable> onError) {
4043
this.source = source;
4144
this.onError = onError;
4245
queue = new LinkedList<Notification<T>>();
43-
subscription = new SingleAssignmentSubscription();
46+
subscription = new SafeObservableSubscription();
4447
activePlans = new ArrayList<ActivePlan0>();
4548
}
4649
public Queue<Notification<T>> queue() {
@@ -51,8 +54,12 @@ public void addActivePlan(ActivePlan0 activePlan) {
5154
}
5255
@Override
5356
public void subscribe(Object gate) {
54-
this.gate = gate;
55-
subscription.set(source.materialize().subscribe(this));
57+
if (subscribed.compareAndSet(false, true)) {
58+
this.gate = gate;
59+
subscription.wrap(source.materialize().subscribe(this));
60+
} else {
61+
throw new IllegalStateException("Can only be subscribed to once.");
62+
}
5663
}
5764

5865
@Override

rxjava-core/src/main/java/rx/subscriptions/BooleanSubscription.java

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,23 +19,45 @@
1919

2020
import rx.Observable;
2121
import rx.Subscription;
22+
import rx.util.functions.Action0;
2223

2324
/**
2425
* Subscription that can be checked for status such as in a loop inside an {@link Observable} to exit the loop if unsubscribed.
2526
*
2627
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.disposables.booleandisposable(v=vs.103).aspx">Rx.Net equivalent BooleanDisposable</a>
2728
*/
28-
public class BooleanSubscription implements Subscription {
29+
public final class BooleanSubscription implements Subscription {
2930

3031
private final AtomicBoolean unsubscribed = new AtomicBoolean(false);
32+
private final Action0 action;
33+
34+
public BooleanSubscription() {
35+
action = null;
36+
}
37+
38+
private BooleanSubscription(Action0 action) {
39+
this.action = action;
40+
}
41+
42+
public static BooleanSubscription create() {
43+
return new BooleanSubscription();
44+
}
45+
46+
public static BooleanSubscription create(Action0 onUnsubscribe) {
47+
return new BooleanSubscription(onUnsubscribe);
48+
}
3149

3250
public boolean isUnsubscribed() {
3351
return unsubscribed.get();
3452
}
3553

3654
@Override
37-
public void unsubscribe() {
38-
unsubscribed.set(true);
55+
public final void unsubscribe() {
56+
if (unsubscribed.compareAndSet(false, true)) {
57+
if (action != null) {
58+
action.call();
59+
}
60+
}
3961
}
4062

4163
}
Lines changed: 107 additions & 117 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,24 @@
1-
/**
2-
* Copyright 2013 Netflix, Inc.
3-
*
4-
* Licensed under the Apache License, Version 2.0 (the "License");
5-
* you may not use this file except in compliance with the License.
6-
* You may obtain a copy of the License at
7-
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
10-
* Unless required by applicable law or agreed to in writing, software
11-
* distributed under the License is distributed on an "AS IS" BASIS,
12-
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13-
* See the License for the specific language governing permissions and
14-
* limitations under the License.
15-
*/
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
1616
package rx.subscriptions;
1717

18-
import static java.util.Arrays.asList;
19-
import static java.util.Collections.unmodifiableSet;
20-
2118
import java.util.ArrayList;
19+
import java.util.Arrays;
2220
import java.util.Collection;
23-
import java.util.Collections;
24-
import java.util.HashSet;
25-
import java.util.Set;
21+
import java.util.List;
2622
import java.util.concurrent.atomic.AtomicReference;
2723

2824
import rx.Subscription;
@@ -31,106 +27,118 @@
3127
/**
3228
* Subscription that represents a group of Subscriptions that are unsubscribed
3329
* together.
34-
*
35-
* @see <a
36-
* href="http://msdn.microsoft.com/en-us/library/system.reactive.disposables.compositedisposable(v=vs.103).aspx">Rx.Net
37-
* equivalent CompositeDisposable</a>
30+
*
31+
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.disposables.compositedisposable(v=vs.103).aspx">Rx.Net equivalent CompositeDisposable</a>
3832
*/
39-
public class CompositeSubscription implements Subscription {
40-
/** Sentinel to indicate a thread is modifying the subscription set. */
41-
private static final Set<Subscription> MUTATE_SENTINEL = unmodifiableSet(Collections.<Subscription>emptySet());
42-
/** Sentinel to indicate the entire CompositeSubscription has been unsubscribed.*/
43-
private static final Set<Subscription> UNSUBSCRIBED_SENTINEL = unmodifiableSet(Collections.<Subscription>emptySet());
44-
/** The reference to the set of subscriptions. */
45-
private final AtomicReference<Set<Subscription>> reference = new AtomicReference<Set<Subscription>>();
46-
33+
public final class CompositeSubscription implements Subscription {
34+
35+
private final AtomicReference<State> state = new AtomicReference<State>();
36+
37+
private static final class State {
38+
final boolean isUnsubscribed;
39+
final List<Subscription> subscriptions;
40+
41+
State(boolean u, List<Subscription> s) {
42+
this.isUnsubscribed = u;
43+
this.subscriptions = s;
44+
}
45+
46+
State unsubscribe() {
47+
return new State(true, subscriptions);
48+
}
49+
50+
State add(Subscription s) {
51+
List<Subscription> newSubscriptions = new ArrayList<Subscription>();
52+
newSubscriptions.addAll(subscriptions);
53+
newSubscriptions.add(s);
54+
return new State(isUnsubscribed, newSubscriptions);
55+
}
56+
57+
State remove(Subscription s) {
58+
List<Subscription> newSubscriptions = new ArrayList<Subscription>();
59+
newSubscriptions.addAll(subscriptions);
60+
newSubscriptions.remove(s); // only first occurrence
61+
return new State(isUnsubscribed, newSubscriptions);
62+
}
63+
64+
State clear() {
65+
return new State(isUnsubscribed, new ArrayList<Subscription>());
66+
}
67+
}
68+
4769
public CompositeSubscription(final Subscription... subscriptions) {
48-
reference.set(new HashSet<Subscription>(asList(subscriptions)));
70+
state.set(new State(false, Arrays.asList(subscriptions)));
4971
}
50-
72+
5173
public boolean isUnsubscribed() {
52-
return reference.get() == UNSUBSCRIBED_SENTINEL;
74+
return state.get().isUnsubscribed;
5375
}
54-
76+
5577
public void add(final Subscription s) {
78+
State oldState;
79+
State newState;
5680
do {
57-
final Set<Subscription> existing = reference.get();
58-
if (existing == UNSUBSCRIBED_SENTINEL) {
81+
oldState = state.get();
82+
if (oldState.isUnsubscribed) {
5983
s.unsubscribe();
60-
break;
61-
}
62-
63-
if (existing == MUTATE_SENTINEL) {
64-
continue;
65-
}
66-
67-
if (reference.compareAndSet(existing, MUTATE_SENTINEL)) {
68-
existing.add(s);
69-
reference.set(existing);
70-
break;
84+
return;
85+
} else {
86+
newState = oldState.add(s);
7187
}
72-
} while (true);
88+
} while (!state.compareAndSet(oldState, newState));
7389
}
74-
90+
7591
public void remove(final Subscription s) {
92+
State oldState;
93+
State newState;
7694
do {
77-
final Set<Subscription> subscriptions = reference.get();
78-
if (subscriptions == UNSUBSCRIBED_SENTINEL) {
79-
s.unsubscribe();
80-
break;
81-
}
82-
83-
if (subscriptions == MUTATE_SENTINEL) {
84-
continue;
85-
}
86-
87-
if (reference.compareAndSet(subscriptions, MUTATE_SENTINEL)) {
88-
// also unsubscribe from it:
89-
// http://msdn.microsoft.com/en-us/library/system.reactive.disposables.compositedisposable.remove(v=vs.103).aspx
90-
subscriptions.remove(s);
91-
reference.set(subscriptions);
92-
s.unsubscribe();
93-
break;
95+
oldState = state.get();
96+
if (oldState.isUnsubscribed) {
97+
return;
98+
} else {
99+
newState = oldState.remove(s);
94100
}
95-
} while (true);
101+
} while (!state.compareAndSet(oldState, newState));
102+
// if we removed successfully we then need to call unsubscribe on it
103+
s.unsubscribe();
96104
}
97-
105+
98106
public void clear() {
107+
State oldState;
108+
State newState;
99109
do {
100-
final Set<Subscription> subscriptions = reference.get();
101-
if (subscriptions == UNSUBSCRIBED_SENTINEL) {
102-
break;
103-
}
104-
105-
if (subscriptions == MUTATE_SENTINEL) {
106-
continue;
110+
oldState = state.get();
111+
if (oldState.isUnsubscribed) {
112+
return;
113+
} else {
114+
newState = oldState.clear();
107115
}
108-
109-
if (reference.compareAndSet(subscriptions, MUTATE_SENTINEL)) {
110-
final Set<Subscription> copy = new HashSet<Subscription>(
111-
subscriptions);
112-
subscriptions.clear();
113-
reference.set(subscriptions);
114-
115-
unsubscribeAll(copy);
116-
break;
116+
} while (!state.compareAndSet(oldState, newState));
117+
// if we cleared successfully we then need to call unsubscribe on all previous
118+
unsubscribeFromAll(oldState.subscriptions);
119+
}
120+
121+
@Override
122+
public void unsubscribe() {
123+
State oldState;
124+
State newState;
125+
do {
126+
oldState = state.get();
127+
if (oldState.isUnsubscribed) {
128+
return;
129+
} else {
130+
newState = oldState.unsubscribe();
117131
}
118-
} while (true);
132+
} while (!state.compareAndSet(oldState, newState));
133+
unsubscribeFromAll(oldState.subscriptions);
119134
}
120-
/**
121-
* Unsubscribe from the collection of subscriptions.
122-
* <p>
123-
* Exceptions thrown by any of the {@code unsubscribe()} methods are
124-
* collected into a {@link CompositeException} and thrown once
125-
* all unsubscriptions have been attempted.
126-
* @param subs the collection of subscriptions
127-
*/
128-
private void unsubscribeAll(Collection<Subscription> subs) {
135+
136+
private static void unsubscribeFromAll(Collection<Subscription> subscriptions) {
129137
final Collection<Throwable> es = new ArrayList<Throwable>();
130-
for (final Subscription s : subs) {
138+
for (Subscription s : subscriptions) {
131139
try {
132140
s.unsubscribe();
133-
} catch (final Throwable e) {
141+
} catch (Throwable e) {
134142
es.add(e);
135143
}
136144
}
@@ -139,22 +147,4 @@ private void unsubscribeAll(Collection<Subscription> subs) {
139147
"Failed to unsubscribe to 1 or more subscriptions.", es);
140148
}
141149
}
142-
@Override
143-
public void unsubscribe() {
144-
do {
145-
final Set<Subscription> subscriptions = reference.get();
146-
if (subscriptions == UNSUBSCRIBED_SENTINEL) {
147-
break;
148-
}
149-
150-
if (subscriptions == MUTATE_SENTINEL) {
151-
continue;
152-
}
153-
154-
if (reference.compareAndSet(subscriptions, UNSUBSCRIBED_SENTINEL)) {
155-
unsubscribeAll(subscriptions);
156-
break;
157-
}
158-
} while (true);
159-
}
160150
}

0 commit comments

Comments
 (0)