From 46ed350120e94a00a660caba93dde407a836c3dc Mon Sep 17 00:00:00 2001 From: akarnokd Date: Wed, 11 Dec 2013 22:16:53 +0100 Subject: [PATCH] Subjects reimplemented. --- .../src/main/java/rx/Notification.java | 27 ++ .../java/rx/subjects/AbstractSubject.java | 150 +++++++++- .../main/java/rx/subjects/AsyncSubject.java | 135 +++++---- .../java/rx/subjects/BehaviorSubject.java | 91 ++++--- .../main/java/rx/subjects/PublishSubject.java | 78 ++++-- .../main/java/rx/subjects/ReplaySubject.java | 256 +++++++++++------- .../java/rx/subjects/AsyncSubjectTest.java | 29 +- .../java/rx/subjects/BehaviorSubjectTest.java | 28 +- .../java/rx/subjects/PublishSubjectTest.java | 28 ++ .../java/rx/subjects/ReplaySubjectTest.java | 31 +++ 10 files changed, 639 insertions(+), 214 deletions(-) diff --git a/rxjava-core/src/main/java/rx/Notification.java b/rxjava-core/src/main/java/rx/Notification.java index 996e217d50..a45f252df1 100644 --- a/rxjava-core/src/main/java/rx/Notification.java +++ b/rxjava-core/src/main/java/rx/Notification.java @@ -125,6 +125,33 @@ public void accept(Observer observer) { observer.onError(getThrowable()); } } + /** + * Emits the notification value to the observer + * and captures the exception thrown by the onNext method. + * @param observer the observer to send the notification to + * @return false if a terminal condition was reached, i.e., + * this is an isOnCompleted, isOnError or the observer.onNext threw + */ + public boolean acceptSafe(Observer observer) { + if (isOnNext()) { + try { + observer.onNext(getValue()); + return true; + } catch (Throwable t) { + observer.onError(t); + return false; + } + } else + if (isOnCompleted()) { + observer.onCompleted(); + return false; + } else + if (isOnError()) { + observer.onError(getThrowable()); + return false; + } + return false; + } public static enum Kind { OnNext, OnError, OnCompleted diff --git a/rxjava-core/src/main/java/rx/subjects/AbstractSubject.java b/rxjava-core/src/main/java/rx/subjects/AbstractSubject.java index 30db331ac2..c60e834a8e 100644 --- a/rxjava-core/src/main/java/rx/subjects/AbstractSubject.java +++ b/rxjava-core/src/main/java/rx/subjects/AbstractSubject.java @@ -17,9 +17,13 @@ import java.util.ArrayList; import java.util.Collection; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import rx.Notification; @@ -30,7 +34,151 @@ import rx.util.functions.Action2; public abstract class AbstractSubject extends Subject { - + /** Base state with lock. */ + static class BaseState { + /** The lock to protect the other fields. */ + private final Lock lock = new ReentrantLock(); + /** Lock. */ + public void lock() { + lock.lock(); + } + /** Unlock. */ + public void unlock() { + lock.unlock(); + } + + } + /** The default state of Subjects.*/ + static class DefaultState extends BaseState { + /** The currently subscribed observers. */ + private final Map> observers = new LinkedHashMap>(); + /** Indicator that the subject has completed. */ + public boolean done; + /** If not null, the subject completed with an error. */ + public Throwable error; + /** + * Add an observer to the observers and create a Subscription for it. + * Caller should hold the lock. + * @param obs + * @return + */ + public Subscription addObserver(Observer obs) { + Subscription s = new Subscription() { + final AtomicBoolean once = new AtomicBoolean(); + @Override + public void unsubscribe() { + if (once.compareAndSet(false, true)) { + remove(this); + } + } + + }; + observers.put(s, obs); + return s; + } + /** + * Returns a live collection of the observers. + *

+ * Caller should hold the lock. + * @return + */ + public Collection> observers() { + return new ArrayList>(observers.values()); + } + /** + * Removes and returns all observers from the mapping. + *

+ * Caller should hold the lock. + * @return + */ + public Collection> removeAll() { + List> list = new ArrayList>(observers.values()); + observers.clear(); + return list; + } + /** + * Remove the subscription. + * @param s + */ + protected void remove(Subscription s) { + lock(); + try { + observers.remove(s); + } finally { + unlock(); + } + } + /** + * Set the error state and dispatch it to the observers. + * @param e + */ + public void defaultOnError(Throwable e) { + lock(); + try { + if (done) { + return; + } + defaultDispatchError(e); + } finally { + unlock(); + } + } + /** + * Set the completion state and dispatch it to the observers. + */ + public void defaultOnCompleted() { + lock(); + try { + if (done) { + return; + } + done = true; + for (Observer o : removeAll()) { + o.onCompleted(); + } + } finally { + unlock(); + } + } + /** + * Dispatch the value to all subscribed observers. + * @param value + */ + public void defaultOnNext(T value) { + lock(); + try { + if (done) { + return; + } + defaultDispatch(value); + } finally { + unlock(); + } + } + /** + * Dispatch the value to all subscribed observers. + *

+ * Caller should hold the lock. + * @param value + */ + public void defaultDispatch(T value) { + for (Observer o : observers()) { + o.onNext(value); + } + } + /** + * Dispatch the exception to all subscribed observers and + * remove them. + * @param e + */ + public void defaultDispatchError(Throwable e) { + done = true; + error = e; + for (Observer o : removeAll()) { + o.onError(e); + } + } + } protected AbstractSubject(rx.Observable.OnSubscribeFunc onSubscribe) { super(onSubscribe); } diff --git a/rxjava-core/src/main/java/rx/subjects/AsyncSubject.java b/rxjava-core/src/main/java/rx/subjects/AsyncSubject.java index a9c18be805..028436c57a 100644 --- a/rxjava-core/src/main/java/rx/subjects/AsyncSubject.java +++ b/rxjava-core/src/main/java/rx/subjects/AsyncSubject.java @@ -15,9 +15,11 @@ */ package rx.subjects; -import rx.Notification; +import rx.Observable; import rx.Observer; -import rx.util.functions.Action2; +import rx.Subscription; +import rx.subjects.AbstractSubject.DefaultState; +import rx.subscriptions.Subscriptions; /** * Subject that publishes only the last event to each {@link Observer} that has subscribed when the @@ -48,68 +50,109 @@ * * @param */ -public class AsyncSubject extends AbstractSubject { - +public class AsyncSubject extends Subject { + /** The inner state. */ + protected static final class State extends DefaultState { + protected boolean hasValue; + protected T value; + } /** * Create a new AsyncSubject * * @return a new AsyncSubject */ public static AsyncSubject create() { - final SubjectState state = new SubjectState(); - OnSubscribeFunc onSubscribe = getOnSubscribeFunc(state, new Action2, Observer>() { - - @Override - public void call(SubjectState state, Observer o) { - // we want the last value + completed so add this extra logic - // to send onCompleted if the last value is an onNext - if (state.completed.get()) { - Notification value = state.currentValue.get(); - if (value != null && value.isOnNext()) { - o.onCompleted(); - } - } - } - }); - return new AsyncSubject(onSubscribe, state); + State state = new State(); + return new AsyncSubject(new AsyncSubjectSubscribeFunc(state), state); } - - private final SubjectState state; - - protected AsyncSubject(OnSubscribeFunc onSubscribe, SubjectState state) { - super(onSubscribe); + /** The state. */ + protected final State state; + + protected AsyncSubject(Observable.OnSubscribeFunc osf, State state) { + super(osf); this.state = state; } @Override - public void onCompleted() { - /** - * Mark this subject as completed and emit latest value + 'onCompleted' to all Observers - */ - emitNotificationAndTerminate(state, new Action2, Observer>() { - - @Override - public void call(SubjectState state, Observer o) { - o.onCompleted(); + public void onNext(T args) { + state.lock(); + try { + if (state.done) { + return; } - }); + state.hasValue = true; + state.value = args; + } finally { + state.unlock(); + } } @Override public void onError(Throwable e) { - /** - * Mark this subject as completed with an error as the last value and emit 'onError' to all Observers - */ - state.currentValue.set(new Notification(e)); - emitNotificationAndTerminate(state, null); + state.lock(); + try { + if (state.done) { + return; + } + state.value = null; + state.hasValue = false; + + state.defaultDispatchError(e); + } finally { + state.unlock(); + } } - + @Override - public void onNext(T v) { - /** - * Store the latest value but do not send it. It only gets sent when 'onCompleted' occurs. - */ - state.currentValue.set(new Notification(v)); + public void onCompleted() { + state.lock(); + try { + if (state.done) { + return; + } + state.done = true; + for (Observer o : state.removeAll()) { + if (state.hasValue) { + o.onNext(state.value); + } + o.onCompleted(); + } + } finally { + state.unlock(); + } } + /** The subscription function. */ + protected static final class AsyncSubjectSubscribeFunc implements OnSubscribeFunc { + protected final State state; + protected AsyncSubjectSubscribeFunc(State state) { + this.state = state; + } + @Override + public Subscription onSubscribe(Observer t1) { + Throwable error; + boolean hasValue; + T value; + state.lock(); + try { + if (!state.done) { + return state.addObserver(t1); + } + error = state.error; + hasValue = state.hasValue; + value = state.value; + } finally { + state.unlock(); + } + if (error != null) { + t1.onError(error); + } else { + if (hasValue) { + t1.onNext(value); + } + t1.onCompleted(); + } + return Subscriptions.empty(); + } + } } diff --git a/rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java b/rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java index 053a51f0cf..a8016fd298 100644 --- a/rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java +++ b/rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java @@ -15,9 +15,10 @@ */ package rx.subjects; -import rx.Notification; import rx.Observer; -import rx.util.functions.Action2; +import rx.Subscription; +import rx.subjects.AbstractSubject.DefaultState; +import rx.subscriptions.Subscriptions; /** * Subject that publishes the most recent and all subsequent events to each subscribed {@link Observer}. @@ -47,8 +48,11 @@ * * @param */ -public class BehaviorSubject extends AbstractSubject { - +public class BehaviorSubject extends Subject { + /** The inner state. */ + protected static final class State extends DefaultState { + protected T value; + } /** * Creates a {@link BehaviorSubject} which publishes the last and all subsequent events to each {@link Observer} that subscribes to it. * @@ -69,54 +73,69 @@ public static BehaviorSubject createWithDefaultValue(T defaultValue) { * @return the constructed {@link BehaviorSubject}. */ public static BehaviorSubject create(T defaultValue) { - final SubjectState state = new SubjectState(); - // set a default value so subscriptions will immediately receive this until a new notification is received - state.currentValue.set(new Notification(defaultValue)); - OnSubscribeFunc onSubscribe = getOnSubscribeFunc(state, new Action2, Observer>() { - - @Override - public void call(SubjectState state, Observer o) { - /** - * When we subscribe we always emit the latest value to the observer, including - * terminal states which are recorded as the last value. - */ - emitNotification(state.currentValue.get(), o); - } - }); - return new BehaviorSubject(onSubscribe, state); + final State state = new State(); + state.value = defaultValue; + return new BehaviorSubject(new BehaviorSubjectSubscribeFunc(state), state); } - private final SubjectState state; + private final State state; - protected BehaviorSubject(OnSubscribeFunc onSubscribe, SubjectState state) { + protected BehaviorSubject(OnSubscribeFunc onSubscribe, State state) { super(onSubscribe); this.state = state; } @Override public void onCompleted() { - /** - * Mark this subject as completed and emit latest value + 'onCompleted' to all Observers - */ - state.currentValue.set(new Notification()); - emitNotificationAndTerminate(state, null); + state.defaultOnCompleted(); } @Override public void onError(Throwable e) { - /** - * Mark this subject as completed with an error as the last value and emit 'onError' to all Observers - */ - state.currentValue.set(new Notification(e)); - emitNotificationAndTerminate(state, null); + state.defaultOnError(e); } @Override public void onNext(T v) { - /** - * Store the latest value and send it to all observers; - */ - state.currentValue.set(new Notification(v)); - emitNotification(state, null); + state.lock(); + try { + if (state.done) { + return; + } + state.value = v; + state.defaultDispatch(v); + } finally { + state.unlock(); + } + } + /** The subscription function. */ + protected static final class BehaviorSubjectSubscribeFunc implements OnSubscribeFunc { + protected final State state; + protected BehaviorSubjectSubscribeFunc(State state) { + this.state = state; + } + + @Override + public Subscription onSubscribe(Observer t1) { + Throwable error; + T value; + state.lock(); + try { + value = state.value; + if (!state.done) { + t1.onNext(value); + return state.addObserver(t1); + } + error = state.error; + } finally { + state.unlock(); + } + if (error != null) { + t1.onError(error); + } else { + t1.onCompleted(); + } + return Subscriptions.empty(); + } } } diff --git a/rxjava-core/src/main/java/rx/subjects/PublishSubject.java b/rxjava-core/src/main/java/rx/subjects/PublishSubject.java index 8cf2d75747..844ed5dd63 100644 --- a/rxjava-core/src/main/java/rx/subjects/PublishSubject.java +++ b/rxjava-core/src/main/java/rx/subjects/PublishSubject.java @@ -15,8 +15,12 @@ */ package rx.subjects; -import rx.Notification; +import rx.Observable; +import rx.Observable.OnSubscribeFunc; import rx.Observer; +import rx.Subscription; +import rx.subjects.AbstractSubject.DefaultState; +import rx.subscriptions.Subscriptions; /** * Subject that, once and {@link Observer} has subscribed, publishes all subsequent events to the subscriber. @@ -41,44 +45,62 @@ * * @param */ -public class PublishSubject extends AbstractSubject { +public class PublishSubject extends Subject { + /** The inner state. */ + protected static final class State extends DefaultState { + } + public static PublishSubject create() { - final SubjectState state = new SubjectState(); - OnSubscribeFunc onSubscribe = getOnSubscribeFunc(state, null); - return new PublishSubject(onSubscribe, state); + State state = new State(); + return new PublishSubject(new PublishSubjectSubscribeFunc(state), state); } - - private final SubjectState state; - - protected PublishSubject(OnSubscribeFunc onSubscribe, SubjectState state) { - super(onSubscribe); +/** The state. */ + protected final State state; + + protected PublishSubject(Observable.OnSubscribeFunc osf, State state) { + super(osf); this.state = state; } @Override - public void onCompleted() { - /** - * Mark this subject as completed and emit latest value + 'onCompleted' to all Observers - */ - state.currentValue.set(new Notification()); - emitNotificationAndTerminate(state, null); + public void onNext(T args) { + state.defaultOnNext(args); } @Override public void onError(Throwable e) { - /** - * Mark this subject as completed with an error as the last value and emit 'onError' to all Observers - */ - state.currentValue.set(new Notification(e)); - emitNotificationAndTerminate(state, null); + state.defaultOnError(e); } - + @Override - public void onNext(T v) { - /** - * Store the latest value and send it to all observers; - */ - state.currentValue.set(new Notification(v)); - emitNotification(state, null); + public void onCompleted() { + state.defaultOnCompleted(); + } + /** The subscription function. */ + protected static final class PublishSubjectSubscribeFunc implements OnSubscribeFunc { + protected final State state; + protected PublishSubjectSubscribeFunc(State state) { + this.state = state; + } + + @Override + public Subscription onSubscribe(Observer t1) { + Throwable error; + state.lock(); + try { + if (!state.done) { + return state.addObserver(t1); + } + error = state.error; + } finally { + state.unlock(); + } + if (error != null) { + t1.onError(error); + } else { + t1.onCompleted(); + } + return Subscriptions.empty(); + } } } diff --git a/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java b/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java index c4985ba84a..6f3dcb15cf 100644 --- a/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java +++ b/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java @@ -15,16 +15,20 @@ */ package rx.subjects; + import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; -import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; - +import java.util.concurrent.atomic.AtomicBoolean; +import rx.Notification; import rx.Observer; import rx.Subscription; +import rx.subjects.AbstractSubject.BaseState; +import rx.subjects.ReplaySubject.State.Replayer; import rx.subscriptions.Subscriptions; -import rx.util.functions.Func1; /** * Subject that retains all events and will replay them to an {@link Observer} that subscribes. @@ -49,128 +53,180 @@ * * @param */ -public final class ReplaySubject extends Subject -{ - - private boolean isDone = false; - private Throwable exception = null; - private final Map> subscriptions = new HashMap>(); - private final List history = Collections.synchronizedList(new ArrayList()); - - public static ReplaySubject create() { - return new ReplaySubject(new DelegateSubscriptionFunc()); - } - - private ReplaySubject(DelegateSubscriptionFunc onSubscribe) { - super(onSubscribe); - onSubscribe.wrap(new SubscriptionFunc()); - } - - private static final class DelegateSubscriptionFunc implements OnSubscribeFunc - { - private Func1, ? extends Subscription> delegate = null; - - public void wrap(Func1, ? extends Subscription> delegate) - { - if (this.delegate != null) { - throw new UnsupportedOperationException("delegate already set"); - } - this.delegate = delegate; +public final class ReplaySubject extends Subject { + /** The state class. */ + protected static final class State extends BaseState { + /** The values observed so far. */ + protected final List> values = Collections.synchronizedList(new ArrayList>()); + /** General completion indicator. */ + protected boolean done; + /** The map of replayers. */ + protected final Map replayers = new LinkedHashMap(); + /** + * Returns a live collection of the observers. + *

+ * Caller should hold the lock. + * @return + */ + protected Collection replayers() { + return new ArrayList(replayers.values()); } - - @Override - public Subscription onSubscribe(Observer observer) - { - return delegate.call(observer); - } - } - - private class SubscriptionFunc implements Func1, Subscription> - { - @Override - public Subscription call(Observer observer) { - int item = 0; - Subscription subscription; - - for (;;) { - while (item < history.size()) { - observer.onNext(history.get(item++)); - } - - synchronized (subscriptions) { - if (item < history.size()) { - continue; - } - - if (exception != null) { - observer.onError(exception); - return Subscriptions.empty(); + /** + * Add a replayer to the replayers and create a Subscription for it. + *

+ * Caller should hold the lock. + * + * @param obs + * @return + */ + protected Subscription addReplayer(Observer obs) { + Subscription s = new Subscription() { + final AtomicBoolean once = new AtomicBoolean(); + @Override + public void unsubscribe() { + if (once.compareAndSet(false, true)) { + remove(this); } - if (isDone) { - observer.onCompleted(); - return Subscriptions.empty(); + } + + }; + Replayer rp = new Replayer(obs, s); + replayers.put(s, rp); + rp.replayTill(values.size()); + return s; + } + /** The replayer that holds a value where the given observer is currently at. */ + protected final class Replayer { + protected final Observer wrapped; + protected int index; + protected final Subscription cancel; + protected Replayer(Observer wrapped, Subscription cancel) { + this.wrapped = wrapped; + this.cancel = cancel; + } + /** + * Replay up to the given index + * @param limit + */ + protected void replayTill(int limit) { + while (index < limit) { + Notification not = values.get(index); + index++; + if (!not.acceptSafe(wrapped)) { + replayers.remove(cancel); + return; } - - subscription = new RepeatSubjectSubscription(); - subscriptions.put(subscription, observer); - break; } } - - return subscription; } - } - - private class RepeatSubjectSubscription implements Subscription - { - @Override - public void unsubscribe() - { - synchronized (subscriptions) { - subscriptions.remove(this); + /** + * Remove the subscription. + * @param s + */ + protected void remove(Subscription s) { + lock(); + try { + replayers.remove(s); + } finally { + unlock(); } } } + /** + * Return a subject that retains all events and will replay them to an {@link Observer} that subscribes. + * @return a subject that retains all events and will replay them to an {@link Observer} that subscribes. + */ + public static ReplaySubject create() { + State state = new State(); + return new ReplaySubject(new ReplaySubjectSubscribeFunc(state), state); + } + private final State state; + private ReplaySubject(OnSubscribeFunc onSubscribe, State state) { + super(onSubscribe); + this.state = state; + } + @Override - public void onCompleted() - { - synchronized (subscriptions) { - isDone = true; - for (Observer observer : new ArrayList>(subscriptions.values())) { - observer.onCompleted(); + public void onCompleted() { + state.lock(); + try { + if (state.done) { + return; } - subscriptions.clear(); + state.done = true; + state.values.add(new Notification()); + replayValues(); + } finally { + state.unlock(); } } @Override - public void onError(Throwable e) - { - synchronized (subscriptions) { - if (isDone) { + public void onError(Throwable e) { + state.lock(); + try { + if (state.done) { return; } - isDone = true; - exception = e; - for (Observer observer : new ArrayList>(subscriptions.values())) { - observer.onError(e); - } - subscriptions.clear(); + state.done = true; + state.values.add(new Notification(e)); + replayValues(); + } finally { + state.unlock(); } } @Override - public void onNext(T args) - { - synchronized (subscriptions) { - if (isDone) { + public void onNext(T args) { + state.lock(); + try { + if (state.done) { return; } - history.add(args); - for (Observer observer : new ArrayList>(subscriptions.values())) { - observer.onNext(args); + state.values.add(new Notification(args)); + replayValues(); + } finally { + state.unlock(); + } + } + /** + * Replay values up to the current index. + */ + protected void replayValues() { + int s = state.values.size(); + for (Replayer rp : state.replayers()) { + rp.replayTill(s); + } + } + /** The subscription function. */ + protected static final class ReplaySubjectSubscribeFunc implements OnSubscribeFunc { + + private final State state; + protected ReplaySubjectSubscribeFunc(State state) { + this.state = state; + } + + @Override + public Subscription onSubscribe(Observer t1) { + List> values; + + state.lock(); + try { + values = state.values; + if (!state.done) { + return state.addReplayer(t1); + } + } finally { + state.unlock(); + } + + for (Notification v : values) { + if (!v.acceptSafe(t1)) { + break; + } } + return Subscriptions.empty(); } } } diff --git a/rxjava-core/src/test/java/rx/subjects/AsyncSubjectTest.java b/rxjava-core/src/test/java/rx/subjects/AsyncSubjectTest.java index b483b9c99f..2aff483222 100644 --- a/rxjava-core/src/test/java/rx/subjects/AsyncSubjectTest.java +++ b/rxjava-core/src/test/java/rx/subjects/AsyncSubjectTest.java @@ -16,7 +16,6 @@ package rx.subjects; import static org.junit.Assert.*; -import static org.mockito.Matchers.*; import static org.mockito.Mockito.*; import java.util.concurrent.TimeUnit; @@ -300,5 +299,31 @@ public void run() { } } } - + @Test + public void testFirstErrorOnly() { + AsyncSubject subject = AsyncSubject.create(); + Observer observer = mock(Observer.class); + + subject.subscribe(observer); + + RuntimeException ex = new RuntimeException("Forced failure"); + + subject.onError(ex); + + verify(observer, times(1)).onError(ex); + verify(observer, never()).onCompleted(); + verify(observer, never()).onNext(any()); + + RuntimeException ex2 = new RuntimeException("Forced failure 2"); + + subject.onError(ex2); + + Observer observer2 = mock(Observer.class); + + subject.subscribe(observer2); + verify(observer2, times(1)).onError(ex); + verify(observer2, never()).onError(ex2); + verify(observer2, never()).onCompleted(); + verify(observer2, never()).onNext(any()); + } } diff --git a/rxjava-core/src/test/java/rx/subjects/BehaviorSubjectTest.java b/rxjava-core/src/test/java/rx/subjects/BehaviorSubjectTest.java index 57958c108f..51a45b2b2f 100644 --- a/rxjava-core/src/test/java/rx/subjects/BehaviorSubjectTest.java +++ b/rxjava-core/src/test/java/rx/subjects/BehaviorSubjectTest.java @@ -15,7 +15,6 @@ */ package rx.subjects; -import static org.mockito.Matchers.*; import static org.mockito.Mockito.*; import org.junit.Test; @@ -179,4 +178,31 @@ public void call(BehaviorSubject DefaultSubject) { } ); } + @Test + public void testFirstErrorOnly() { + BehaviorSubject subject = BehaviorSubject.create(0); + Observer observer = mock(Observer.class); + + subject.subscribe(observer); + + RuntimeException ex = new RuntimeException("Forced failure"); + + subject.onError(ex); + + verify(observer, times(1)).onNext(0); + verify(observer, times(1)).onError(ex); + verify(observer, never()).onCompleted(); + + RuntimeException ex2 = new RuntimeException("Forced failure 2"); + + subject.onError(ex2); + + Observer observer2 = mock(Observer.class); + + subject.subscribe(observer2); + verify(observer2, never()).onNext(any()); + verify(observer2, times(1)).onError(ex); + verify(observer2, never()).onError(ex2); + verify(observer2, never()).onCompleted(); + } } diff --git a/rxjava-core/src/test/java/rx/subjects/PublishSubjectTest.java b/rxjava-core/src/test/java/rx/subjects/PublishSubjectTest.java index 5e9cc2790f..bb59202aaf 100644 --- a/rxjava-core/src/test/java/rx/subjects/PublishSubjectTest.java +++ b/rxjava-core/src/test/java/rx/subjects/PublishSubjectTest.java @@ -328,4 +328,32 @@ public void testReSubscribe() { private final Throwable testException = new Throwable(); + + @Test + public void testFirstErrorOnly() { + PublishSubject subject = PublishSubject.create(); + Observer observer = mock(Observer.class); + + subject.subscribe(observer); + + RuntimeException ex = new RuntimeException("Forced failure"); + + subject.onError(ex); + + verify(observer, times(1)).onError(ex); + verify(observer, never()).onCompleted(); + verify(observer, never()).onNext(any()); + + RuntimeException ex2 = new RuntimeException("Forced failure 2"); + + subject.onError(ex2); + + Observer observer2 = mock(Observer.class); + + subject.subscribe(observer2); + verify(observer2, times(1)).onError(ex); + verify(observer2, never()).onError(ex2); + verify(observer2, never()).onCompleted(); + verify(observer2, never()).onNext(any()); + } } diff --git a/rxjava-core/src/test/java/rx/subjects/ReplaySubjectTest.java b/rxjava-core/src/test/java/rx/subjects/ReplaySubjectTest.java index 005b56ba5a..be1aaa28bc 100644 --- a/rxjava-core/src/test/java/rx/subjects/ReplaySubjectTest.java +++ b/rxjava-core/src/test/java/rx/subjects/ReplaySubjectTest.java @@ -269,4 +269,35 @@ public void call(ReplaySubject repeatSubject) { } ); } + @Test + public void testFirstErrorOnly() { + ReplaySubject subject = ReplaySubject.create(); + Observer observer = mock(Observer.class); + + subject.subscribe(observer); + + RuntimeException ex = new RuntimeException("Forced failure"); + + subject.onNext(0); + subject.onError(ex); + + verify(observer, times(1)).onNext(0); + verify(observer, times(1)).onError(ex); + verify(observer, never()).onCompleted(); + + RuntimeException ex2 = new RuntimeException("Forced failure 2"); + + subject.onNext(1); + subject.onError(ex2); + + Observer observer2 = mock(Observer.class); + + subject.subscribe(observer2); + verify(observer2, times(1)).onNext(0); + verify(observer2, times(1)).onError(ex); + + verify(observer2, never()).onNext(1); + verify(observer2, never()).onError(ex2); + verify(observer2, never()).onCompleted(); + } }