Description
This may be a problem with MapObserver or OnErrorResumeXXX and is probably related to #216
If the function returned by OnErrorResumeXXX returns an async Observable then there is some unpredictable behavior with operators (eg Map) that rely on the AtomicObserver to generate a valid event stream
I've created a UnitTest below to demonstrate the problem.
@Test
public void testMapError() {
Observable<String> observable = Observable.from("one","error","two","three");
Observable<String> m = Observable.create(map(observable, new Func1<String,String>() {
@Override
public String call(String in) {
if ("error".equals(in))
throw new RuntimeException("Simulated error");
return in;
}
}));
// Add error handler
m=m.onErrorResumeNext(new Func1<Exception,Observable<String>>() {
public Observable<String> call(Exception e) {
return createSlowSequence(new String[] {"II","III"});
}
});
m.subscribe(stringObserver);
verify(stringObserver, times(1)).onNext("one");
// Should be caught be OnErrorResumeNext
verify(stringObserver, never()).onError(any(Exception.class));
verify(stringObserver, never()).onNext("error");
verify(stringObserver, never()).onNext("two");
verify(stringObserver, never()).onNext("three");
// Resume output
verify(stringObserver, times(1)).onNext("II");
verify(stringObserver, times(1)).onNext("III");
verify(stringObserver, times(1)).onCompleted();
}
private Observable<String> createSlowSequence(final String[] seq) {
return Observable.create(new Func1<Observer<String>,Subscription>() {
public Subscription call(final Observer<String> ob) {
new Thread() {
/** Delay for a given time */
public void waitFor(int ms) {
try {
Thread.sleep(ms);
}
catch (InterruptedException e) {
}
}
/** Slowly emit a sequence */
public void run() {
for (int i=0;i<seq.length;i++) {
waitFor(100);
ob.onNext(seq[i]);
}
ob.onCompleted();
}
}.start();
return Subscriptions.empty();
}
});
}
There are two issues I see causing the problem, so two potential fixes.
- MapObserver continues to emit onNext/onCompleted even after it has emitted onError. This pushes the problem downstream.
- OnErrorResumeNext continues to propogate onNext/onCompleted even after it has subscribed the Observer(s) to the resume Observable.
This means that if the resume Observable is async OnErrorResumeNext will emit one/more onNext and potentially onCompleted from the source Observable unless the resume Observable completes.
It could also trigger multiple subscriptions if the MapObserver produced another onError.
A side note here is that the AtomicObserver fixes this issue if the resume Observable is sync (because it ignores the source Observable events after the resume Observable has completed). This effectively masks the problem in the OnErrorResumeNext unit-test. Perhaps the unit-tests should disable this behavior, or detect it via a plugin?