Skip to content

Commit de6e1a2

Browse files
Merge pull request ReactiveX#615 from abersnaze/issue614
Copied the code from OperationMerge to fix synchronization
2 parents 792e08a + 311ca29 commit de6e1a2

File tree

1 file changed

+16
-2
lines changed

1 file changed

+16
-2
lines changed

rxjava-core/src/main/java/rx/operators/OperationMergeDelayError.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import rx.Observable.OnSubscribeFunc;
2525
import rx.Observer;
2626
import rx.Subscription;
27+
import rx.subscriptions.CompositeSubscription;
2728
import rx.util.CompositeException;
2829

2930
/**
@@ -151,13 +152,26 @@ private MergeDelayErrorObservable(Observable<? extends Observable<? extends T>>
151152
}
152153

153154
public Subscription onSubscribe(Observer<? super T> actualObserver) {
155+
CompositeSubscription completeSubscription = new CompositeSubscription();
156+
157+
/**
158+
* We must synchronize a merge because we subscribe to multiple sequences in parallel that will each be emitting.
159+
* <p>
160+
* The calls from each sequence must be serialized.
161+
* <p>
162+
* Bug report: https://github.com/Netflix/RxJava/issues/614
163+
*/
164+
SafeObservableSubscription subscription = new SafeObservableSubscription(ourSubscription);
165+
completeSubscription.add(subscription);
166+
SynchronizedObserver<T> synchronizedObserver = new SynchronizedObserver<T>(actualObserver, subscription);
167+
154168
/**
155169
* Subscribe to the parent Observable to get to the children Observables
156170
*/
157-
sequences.subscribe(new ParentObserver(actualObserver));
171+
completeSubscription.add(sequences.subscribe(new ParentObserver(actualObserver)));
158172

159173
/* return our subscription to allow unsubscribing */
160-
return ourSubscription;
174+
return completeSubscription;
161175
}
162176

163177
/**

0 commit comments

Comments
 (0)