2323
2424import rx .Notification ;
2525import rx .Observable ;
26+ import rx .Observer ;
2627import rx .operators .SafeObservableSubscription ;
28+ import rx .operators .SafeObserver ;
2729import rx .util .functions .Action1 ;
2830
2931/**
3032 * Default implementation of a join observer.
3133 */
32- public final class JoinObserver1 <T > extends ObserverBase <Notification <T >> implements JoinObserver {
34+ public final class JoinObserver1 <T > implements Observer <Notification <T >>, JoinObserver {
3335 private Object gate ;
3436 private final Observable <T > source ;
3537 private final Action1 <Throwable > onError ;
3638 private final List <ActivePlan0 > activePlans ;
3739 private final Queue <Notification <T >> queue ;
38- private final SafeObservableSubscription subscription ;
40+ private final SafeObservableSubscription subscription = new SafeObservableSubscription () ;
3941 private volatile boolean done ;
4042 private final AtomicBoolean subscribed = new AtomicBoolean (false );
43+ private final SafeObserver <Notification <T >> safeObserver ;
4144
4245 public JoinObserver1 (Observable <T > source , Action1 <Throwable > onError ) {
4346 this .source = source ;
4447 this .onError = onError ;
4548 queue = new LinkedList <Notification <T >>();
46- subscription = new SafeObservableSubscription ();
4749 activePlans = new ArrayList <ActivePlan0 >();
50+ safeObserver = new SafeObserver <Notification <T >>(subscription , new InnerObserver ());
4851 }
4952 public Queue <Notification <T >> queue () {
5053 return queue ;
@@ -67,35 +70,52 @@ public void dequeue() {
6770 queue .remove ();
6871 }
6972
70- @ Override
71- protected void onNextCore (Notification <T > args ) {
72- synchronized (gate ) {
73- if (!done ) {
74- if (args .isOnError ()) {
75- onError .call (args .getThrowable ());
76- return ;
77- }
78- queue .add (args );
79-
80- // remark: activePlans might change while iterating
81- for (ActivePlan0 a : new ArrayList <ActivePlan0 >(activePlans )) {
82- a .match ();
73+ private final class InnerObserver implements Observer <Notification <T >> {
74+
75+ @ Override
76+ public void onNext (Notification <T > args ) {
77+ synchronized (gate ) {
78+ if (!done ) {
79+ if (args .isOnError ()) {
80+ onError .call (args .getThrowable ());
81+ return ;
82+ }
83+ queue .add (args );
84+
85+ // remark: activePlans might change while iterating
86+ for (ActivePlan0 a : new ArrayList <ActivePlan0 >(activePlans )) {
87+ a .match ();
88+ }
8389 }
8490 }
8591 }
92+
93+ @ Override
94+ public void onError (Throwable e ) {
95+ // not expected
96+ }
97+
98+ @ Override
99+ public void onCompleted () {
100+ // not expected or ignored
101+ }
102+ }
103+
104+ @ Override
105+ public void onNext (Notification <T > args ) {
106+ safeObserver .onNext (args );
86107 }
87108
88109 @ Override
89- protected void onErrorCore (Throwable e ) {
90- // not expected
110+ public void onError (Throwable e ) {
111+ safeObserver . onError ( e );
91112 }
92113
93114 @ Override
94- protected void onCompletedCore () {
95- // not expected or ignored
115+ public void onCompleted () {
116+ safeObserver . onCompleted ();
96117 }
97118
98-
99119 void removeActivePlan (ActivePlan0 activePlan ) {
100120 activePlans .remove (activePlan );
101121 if (activePlans .isEmpty ()) {
0 commit comments