From eeb341017c433808d8c77646f90541da8b186f6f Mon Sep 17 00:00:00 2001 From: akarnokd Date: Fri, 22 Nov 2013 14:20:50 +0100 Subject: [PATCH] Operator: Generate --- rxjava-core/src/main/java/rx/Observable.java | 110 +++++- .../java/rx/operators/OperationGenerate.java | 225 +++++++++++ .../src/main/java/rx/util/TimeSpan.java | 134 +++++++ .../rx/operators/OperationGenerateTest.java | 373 ++++++++++++++++++ 4 files changed, 841 insertions(+), 1 deletion(-) create mode 100644 rxjava-core/src/main/java/rx/operators/OperationGenerate.java create mode 100644 rxjava-core/src/main/java/rx/util/TimeSpan.java create mode 100644 rxjava-core/src/test/java/rx/operators/OperationGenerateTest.java diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 56a4664654..d149d02be0 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; +import java.util.Date; import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; @@ -49,6 +50,7 @@ import rx.operators.OperationFilter; import rx.operators.OperationFinally; import rx.operators.OperationFirstOrDefault; +import rx.operators.OperationGenerate; import rx.operators.OperationGroupBy; import rx.operators.OperationInterval; import rx.operators.OperationLast; @@ -105,6 +107,7 @@ import rx.util.Opening; import rx.util.Range; import rx.util.TimeInterval; +import rx.util.TimeSpan; import rx.util.Timestamped; import rx.util.functions.Action0; import rx.util.functions.Action1; @@ -5689,5 +5692,110 @@ private boolean isInternalImplementation(Object o) { return isInternal; } } - + /** + * Generates an observable sequence by iterating a state from an initial state until the condition fails. + * @param initialState The initial state. + * @param condition The condition to terminate generation. + * @param iterate The iteration step function. + * @param resultSelector The selector function for results produced in the sequence. + * @return The generated sequence. + * @see MSDN: Observable.Generate + */ + public static Observable generate(TState initialState, Func1 condition, + Func1 iterate, Func1 resultSelector) { + return generate(initialState, condition, iterate, resultSelector, + Schedulers.threadPoolForComputation()); + } + /** + * Generates an observable sequence by iterating a state from an initial state until the condition fails. + * @param initialState The initial state. + * @param condition The condition to terminate generation. + * @param iterate The iteration step function. + * @param resultSelector The selector function for results produced in the sequence. + * @param scheduler he scheduler on which to run the generator loop. + * @return The generated sequence. + * @see MSDN: Observable.Generate + */ + public static Observable generate(TState initialState, Func1 condition, + Func1 iterate, Func1 resultSelector, Scheduler scheduler) { + return create(OperationGenerate.generate(initialState, condition, iterate, + resultSelector, scheduler)); + } + /** + * Generates an observable sequence by iterating a state from an initial state until the condition fails. + * @param initialState The initial state. + * @param condition The condition to terminate generation. + * @param iterate The iteration step function. + * @param resultSelector The selector function for results produced in the sequence. + * @param timeSelector The time selector function to control the speed of values being produced each iteration. + * @return The generated sequence. + * @see MSDN: Observable.Generate + */ + public static Observable generate(TState initialState, + Func1 condition, + Func1 iterate, + Func1 resultSelector, + Func1 timeSelector) { + return generate(initialState, condition, iterate, resultSelector, + timeSelector, Schedulers.threadPoolForComputation()); + } + /** + * Generates an observable sequence by iterating a state from an initial state until the condition fails. + * @param initialState The initial state. + * @param condition The condition to terminate generation. + * @param iterate The iteration step function. + * @param resultSelector The selector function for results produced in the sequence. + * @param timeSelector The time selector function to control the speed of values being produced each iteration. + * @param scheduler he scheduler on which to run the generator loop. + * @return The generated sequence. + * @see MSDN: Observable.Generate + */ + public static Observable generate(TState initialState, + Func1 condition, + Func1 iterate, + Func1 resultSelector, + Func1 timeSelector, + Scheduler scheduler) { + return create(OperationGenerate.generate(initialState, condition, iterate, resultSelector, + timeSelector, scheduler)); + } + /** + * Generates an observable sequence by iterating a state from an initial state until the condition fails. + * @param initialState The initial state. + * @param condition The condition to terminate generation. + * @param iterate The iteration step function. + * @param resultSelector The selector function for results produced in the sequence. + * @param timeSelector The time selector function to control the speed of values being produced each iteration. + * @return The generated sequence. + * @see MSDN: Observable.Generate + */ + public static Observable generateAbsoluteTime(TState initialState, + Func1 condition, + Func1 iterate, + Func1 resultSelector, + Func1 timeSelector + ) { + return generateAbsoluteTime(initialState, condition, iterate, resultSelector, + timeSelector, Schedulers.threadPoolForComputation()); + } + /** + * Generates an observable sequence by iterating a state from an initial state until the condition fails. + * @param initialState The initial state. + * @param condition The condition to terminate generation. + * @param iterate The iteration step function. + * @param resultSelector The selector function for results produced in the sequence. + * @param timeSelector The time selector function to control the speed of values being produced each iteration. + * @param scheduler he scheduler on which to run the generator loop. + * @return The generated sequence. + * @see MSDN: Observable.Generate + */ + public static Observable generateAbsoluteTime(TState initialState, + Func1 condition, + Func1 iterate, + Func1 resultSelector, + Func1 timeSelector, + Scheduler scheduler) { + return create(OperationGenerate.generateAbsoluteTime(initialState, condition, + iterate, resultSelector, timeSelector, scheduler)); + } } diff --git a/rxjava-core/src/main/java/rx/operators/OperationGenerate.java b/rxjava-core/src/main/java/rx/operators/OperationGenerate.java new file mode 100644 index 0000000000..b8191ef60a --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationGenerate.java @@ -0,0 +1,225 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.operators; + +import java.util.Date; +import java.util.concurrent.TimeUnit; +import rx.Observable.OnSubscribeFunc; +import rx.Observer; +import rx.Scheduler; +import rx.Subscription; +import rx.subscriptions.Subscriptions; +import rx.util.TimeSpan; +import rx.util.functions.Func1; +import rx.util.functions.Func2; + +/** + * Generates an observable sequence by iterating a state from an initial state + * until the condition returns false. + *

+ * Behaves like a generalized for loop. + */ +public final class OperationGenerate { + /** + * Generates an observable sequence by iterating a state from an initial + * state until the condition returns false. + */ + public static OnSubscribeFunc generate( + final TState initialState, + final Func1 condition, + final Func1 iterate, + final Func1 resultSelector, + final Scheduler scheduler) { + return new OnSubscribeFunc() { + @Override + public Subscription onSubscribe(final Observer observer) { + return scheduler.schedule(initialState, new Func2() { + @Override + public Subscription call(Scheduler s, TState state) { + boolean hasNext; + try { + hasNext = condition.call(state); + } catch (Throwable t) { + observer.onError(t); + return Subscriptions.empty(); + } + if (hasNext) { + R result; + try { + result = resultSelector.call(state); + } catch (Throwable t) { + observer.onError(t); + return Subscriptions.empty(); + } + observer.onNext(result); + + TState nextState; + try { + nextState = iterate.call(state); + } catch (Throwable t) { + observer.onError(t); + return Subscriptions.empty(); + } + + return s.schedule(nextState, this); + } + observer.onCompleted(); + return Subscriptions.empty(); + } + }); + } + }; + } + /** + * Generates an observable sequence by iterating a state, in relative timed fashion, + * from an initial state until the condition fails. + */ + public static OnSubscribeFunc generate( + final TState initialState, + final Func1 condition, + final Func1 iterate, + final Func1 resultSelector, + final Func1 timeSelector, + final Scheduler scheduler) { + return new OnSubscribeFunc() { + @Override + public Subscription onSubscribe(final Observer observer) { + TimeSpan first; + try { + first = timeSelector.call(initialState); + } catch (Throwable t) { + observer.onError(t); + return Subscriptions.empty(); + } + + return scheduler.schedule(initialState, + new Func2() { + @Override + public Subscription call(Scheduler s, TState state) { + boolean hasNext; + try { + hasNext = condition.call(state); + } catch (Throwable t) { + observer.onError(t); + return Subscriptions.empty(); + } + if (hasNext) { + R result; + try { + result = resultSelector.call(state); + } catch (Throwable t) { + observer.onError(t); + return Subscriptions.empty(); + } + observer.onNext(result); + + TState nextState; + try { + nextState = iterate.call(state); + } catch (Throwable t) { + observer.onError(t); + return Subscriptions.empty(); + } + + TimeSpan nextDate; + try { + nextDate = timeSelector.call(initialState); + } catch (Throwable t) { + observer.onError(t); + return Subscriptions.empty(); + } + + return s.schedule(nextState, this, nextDate.value(), nextDate.unit()); + } + observer.onCompleted(); + return Subscriptions.empty(); + } + }, first.value(), first.unit()); + } + }; + } + /** + * Generates an observable sequence by iterating a state, in absolute timed fashion, + * from an initial state until the condition fails. + */ + public static OnSubscribeFunc generateAbsoluteTime( + final TState initialState, + final Func1 condition, + final Func1 iterate, + final Func1 resultSelector, + final Func1 timeSelector, + final Scheduler scheduler) { + return new OnSubscribeFunc() { + @Override + public Subscription onSubscribe(final Observer observer) { + Date first; + try { + first = timeSelector.call(initialState); + } catch (Throwable t) { + observer.onError(t); + return Subscriptions.empty(); + } + + long delta = Math.max(0, first.getTime() - scheduler.now()); + + return scheduler.schedule(initialState, + new Func2() { + @Override + public Subscription call(Scheduler s, TState state) { + boolean hasNext; + try { + hasNext = condition.call(state); + } catch (Throwable t) { + observer.onError(t); + return Subscriptions.empty(); + } + if (hasNext) { + R result; + try { + result = resultSelector.call(state); + } catch (Throwable t) { + observer.onError(t); + return Subscriptions.empty(); + } + observer.onNext(result); + + TState nextState; + try { + nextState = iterate.call(state); + } catch (Throwable t) { + observer.onError(t); + return Subscriptions.empty(); + } + + Date nextDate; + try { + nextDate = timeSelector.call(initialState); + } catch (Throwable t) { + observer.onError(t); + return Subscriptions.empty(); + } + + long deltaNext = Math.max(0, nextDate.getTime() - s.now()); + return s.schedule(nextState, this, deltaNext, TimeUnit.MILLISECONDS); + } + observer.onCompleted(); + return Subscriptions.empty(); + } + }, delta, TimeUnit.MILLISECONDS); + } + }; + } +} diff --git a/rxjava-core/src/main/java/rx/util/TimeSpan.java b/rxjava-core/src/main/java/rx/util/TimeSpan.java new file mode 100644 index 0000000000..9e171d1d62 --- /dev/null +++ b/rxjava-core/src/main/java/rx/util/TimeSpan.java @@ -0,0 +1,134 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.util; + +import java.util.concurrent.TimeUnit; + +/** + * Represents a time value and time unit. + *

+ * Rx.NET note: System.TimeSpan has a fixed unit of measure of 100 nanoseconds + * per value; the Java way is to specify the TimeUnit along with the time value. + *

+ * Usage: + *

+ * TimeSpan oneSecond = TimeSpan.of(1, TimeUnit.SECONDS);
+ * 
+ */ +public final class TimeSpan implements Comparable { + private final long value; + private final TimeUnit unit; + /** Lazy hash. */ + private int hash; + private TimeSpan(long value, TimeUnit unit) { + this.value = value; + this.unit = unit; + } + /** + * Create a TimeSpan instance with the given time value and + * time unit. + * @param value the time value, must be nonnegative + * @param unit the time unit + * @return the TimeSpan instance with the given time value and + * time unit + * @throws IllegalArgumentException if the value < 0 + * @throws NullPointerException if unit is null + */ + public static TimeSpan of(long value, TimeUnit unit) { + if (value < 0) { + throw new IllegalArgumentException("value negative"); + } + if (unit == null) { + throw new NullPointerException("unit"); + } + return new TimeSpan(value, unit); + } + /** + * Returns the time value. + * @return the time value + */ + public long value() { + return value; + } + /** + * Returns the time unit. + * @return the time unit + */ + public TimeUnit unit() { + return unit; + } + /** + * Create a new TimeSpan with the same time unit as this + * instance and the given newValue as the time value. + * @param newValue the new time value + * @return the new TimeSpan instance with the new time value + * @throws IllegalArgumentException if the value < 0 + */ + public TimeSpan withValue(long newValue) { + if (newValue < 0) { + throw new IllegalArgumentException("newValue negative"); + } + return new TimeSpan(newValue, unit); + } + /** + * Create a new TimeSpan with the same time value as this + * instance and the given newUnit as the time unit. + * @param newUnit the new time unit + * @return the new TimeSpan instance with the new time unit + * @throws NullPointerException if unit is null + */ + public TimeSpan withUnit(TimeUnit newUnit) { + if (unit == null) { + throw new NullPointerException("unit"); + } + return new TimeSpan(value, unit); + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + if (obj == null) { + return false; + } + if (obj.getClass() == getClass()) { + TimeSpan other = (TimeSpan)obj; + return value == other.value && unit == other.unit; + } + return false; + } + + @Override + public int hashCode() { + int h = hash; + if (h == 0) { + h = (int)(value ^ (value >>> 32)); + h = h * 31 + unit.hashCode(); + hash = h; + } + return h; + } + + @Override + public int compareTo(TimeSpan o) { + long t1 = unit.toNanos(value); + long t2 = o.unit.toNanos(value); + + return t1 < t2 ? -1 : (t1 > t2 ? 1 : 0); + } + +} diff --git a/rxjava-core/src/test/java/rx/operators/OperationGenerateTest.java b/rxjava-core/src/test/java/rx/operators/OperationGenerateTest.java new file mode 100644 index 0000000000..52dbcf9864 --- /dev/null +++ b/rxjava-core/src/test/java/rx/operators/OperationGenerateTest.java @@ -0,0 +1,373 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import java.util.Date; +import java.util.concurrent.TimeUnit; +import org.junit.Before; +import org.junit.Test; +import static org.mockito.Matchers.any; +import org.mockito.Mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import org.mockito.MockitoAnnotations; +import rx.Observable; +import rx.Observer; +import rx.concurrency.Schedulers; +import rx.util.TimeSpan; +import rx.util.functions.Func1; + +/** + * + */ +public class OperationGenerateTest { + @Mock + Observer observer; + + Func1 increment = new Func1() { + @Override + public Integer call(Integer t1) { + return t1 + 1; + } + }; + Func1 lessThan(final int value) { + return new Func1() { + @Override + public Boolean call(Integer t1) { + return t1 < value; + } + }; + } + Func1 dbl = new Func1() { + @Override + public Integer call(Integer t1) { + return t1 * 2; + } + }; + Func1 just(final R value) { + return new Func1() { + @Override + public R call(Integer t1) { + return value; + } + }; + } + Func1 fail(R dummy) { + return new Func1() { + @Override + public R call(Integer t1) { + throw new RuntimeException("Forced failure"); + } + }; + } + Func1 fail(final R value, final int call) { + return new Func1() { + int i; + @Override + public R call(Integer t1) { + if (++i >= call) { + throw new RuntimeException("Forced failure"); + } + return value; + } + }; + } + Func1 fail(final Func1 valueFactory, final int call) { + return new Func1() { + int i; + @Override + public R call(Integer t1) { + if (++i >= call) { + throw new RuntimeException("Forced failure"); + } + return valueFactory.call(t1); + } + }; + } + Func1 delay(final int byMillis) { + return new Func1() { + @Override + public Date call(Integer t1) { + return new Date(System.currentTimeMillis() + byMillis); + } + }; + }; + @Before + public void before() { + MockitoAnnotations.initMocks(this); + } + @Test + public void basicForLoop() { + Observable m = Observable.generate(0, lessThan(5), increment, dbl, Schedulers.immediate()); + + m.subscribe(observer); + + verify(observer, never()).onError(any(Throwable.class)); + verify(observer, times(1)).onNext(0); + verify(observer, times(1)).onNext(2); + verify(observer, times(1)).onNext(4); + verify(observer, times(1)).onNext(6); + verify(observer, times(1)).onNext(8); + verify(observer, times(1)).onCompleted(); + + } + @Test + public void emptyForLoop() { + Observable m = Observable.generate(0, lessThan(0), increment, dbl, Schedulers.immediate()); + + m.subscribe(observer); + + verify(observer, never()).onError(any(Throwable.class)); + verify(observer, never()).onNext(any()); + verify(observer, times(1)).onCompleted(); + + } + @Test + public void timedForLoop() throws InterruptedException { + Observable m = Observable.generate(0, lessThan(5), + increment, dbl, just(TimeSpan.of(50, TimeUnit.MILLISECONDS))); + + m.subscribe(observer); + + Thread.sleep(600); // FIXME Shaky + + verify(observer, never()).onError(any(Throwable.class)); + verify(observer, times(1)).onNext(0); + verify(observer, times(1)).onNext(2); + verify(observer, times(1)).onNext(4); + verify(observer, times(1)).onNext(6); + verify(observer, times(1)).onNext(8); + verify(observer, times(1)).onCompleted(); + + } + @Test + public void timedEmptyForLoop() throws InterruptedException { + Observable m = Observable.generate(0, lessThan(0), + increment, dbl, just(TimeSpan.of(50, TimeUnit.MILLISECONDS))); + + m.subscribe(observer); + + Thread.sleep(200); // FIXME Shaky + + verify(observer, never()).onError(any(Throwable.class)); + verify(observer, never()).onNext(any()); + verify(observer, times(1)).onCompleted(); + + } + @Test + public void absoluteTimedForLoop() throws InterruptedException { + Observable m = Observable.generateAbsoluteTime(0, lessThan(5), + increment, dbl, delay(50)); + + m.subscribe(observer); + + Thread.sleep(500); // FIXME Shaky + + verify(observer, never()).onError(any(Throwable.class)); + verify(observer, times(1)).onNext(0); + verify(observer, times(1)).onNext(2); + verify(observer, times(1)).onNext(4); + verify(observer, times(1)).onNext(6); + verify(observer, times(1)).onNext(8); + verify(observer, times(1)).onCompleted(); + + } + @Test + public void absoluteTimedEmptyForLoop() throws InterruptedException { + Observable m = Observable.generateAbsoluteTime(0, lessThan(0), + increment, dbl, delay(50)); + + m.subscribe(observer); + + Thread.sleep(100); // FIXME Shaky + + verify(observer, never()).onError(any(Throwable.class)); + verify(observer, never()).onNext(any()); + verify(observer, times(1)).onCompleted(); + } + @Test + public void conditionFails() { + Observable m = Observable.generate(0, fail(false), increment, dbl, Schedulers.immediate()); + + m.subscribe(observer); + + verify(observer, times(1)).onError(any(Throwable.class)); + verify(observer, never()).onNext(any()); + verify(observer, never()).onCompleted(); + + } + @Test + public void iterateFails() { + Observable m = Observable.generate(0, lessThan(5), fail(0), dbl, Schedulers.immediate()); + + m.subscribe(observer); + + verify(observer, times(1)).onNext(0); + verify(observer, times(1)).onError(any(Throwable.class)); + verify(observer, never()).onCompleted(); + + } + @Test + public void resultSelectorFails() { + Observable m = Observable.generate(0, lessThan(5), increment, fail(0), Schedulers.immediate()); + + m.subscribe(observer); + + verify(observer, times(1)).onError(any(Throwable.class)); + verify(observer, never()).onNext(any()); + verify(observer, never()).onCompleted(); + + } + @Test + public void timedConditionFails() throws InterruptedException { + Observable m = Observable.generate(0, fail(false), increment, dbl, + just(TimeSpan.of(50, TimeUnit.MILLISECONDS))); + + m.subscribe(observer); + + Thread.sleep(200); // FIXME Shaky + + verify(observer, times(1)).onError(any(Throwable.class)); + verify(observer, never()).onNext(any()); + verify(observer, never()).onCompleted(); + + } + @Test + public void timedIterateFails() throws InterruptedException { + Observable m = Observable.generate(0, lessThan(5), fail(0), dbl, just(TimeSpan.of(50, TimeUnit.MILLISECONDS))); + + m.subscribe(observer); + + Thread.sleep(200); // FIXME Shaky + + verify(observer, times(1)).onNext(0); + verify(observer, times(1)).onError(any(Throwable.class)); + verify(observer, never()).onCompleted(); + + } + @Test + public void timedResultSelectorFails() throws InterruptedException { + Observable m = Observable.generate(0, lessThan(5), increment, fail(0), just(TimeSpan.of(50, TimeUnit.MILLISECONDS))); + + m.subscribe(observer); + + Thread.sleep(200); // FIXME Shaky + + verify(observer, times(1)).onError(any(Throwable.class)); + verify(observer, never()).onNext(any()); + verify(observer, never()).onCompleted(); + + } + @Test + public void timedTimeSelectorFails() throws InterruptedException { + Observable m = Observable.generate(0, lessThan(5), increment, dbl, + fail((TimeSpan)null)); + + m.subscribe(observer); + + Thread.sleep(200); // FIXME Shaky + + verify(observer, times(1)).onError(any(Throwable.class)); + verify(observer, never()).onNext(any()); + verify(observer, never()).onCompleted(); + + } + @Test + public void timedTimeSelectorFails2ndCall() throws InterruptedException { + Observable m = Observable.generate(0, lessThan(5), increment, dbl, + fail(TimeSpan.of(50, TimeUnit.MILLISECONDS), 2)); + + m.subscribe(observer); + + Thread.sleep(200); // FIXME Shaky + + verify(observer, times(1)).onNext(0); + verify(observer, times(1)).onError(any(Throwable.class)); + verify(observer, never()).onCompleted(); + + } + @Test + public void absoluteTimedConditionFails() throws InterruptedException { + Observable m = Observable.generateAbsoluteTime(0, fail(false), increment, dbl, + delay(50)); + + m.subscribe(observer); + + Thread.sleep(200); // FIXME Shaky + + verify(observer, times(1)).onError(any(Throwable.class)); + verify(observer, never()).onNext(any()); + verify(observer, never()).onCompleted(); + + } + @Test + public void absoluteTimedIterateFails() throws InterruptedException { + Observable m = Observable.generateAbsoluteTime(0, lessThan(5), fail(0), dbl, + delay(50)); + + m.subscribe(observer); + + Thread.sleep(200); // FIXME Shaky + + verify(observer, times(1)).onNext(0); + verify(observer, times(1)).onError(any(Throwable.class)); + verify(observer, never()).onCompleted(); + + } + @Test + public void absoluteTimedResultSelectorFails() throws InterruptedException { + Observable m = Observable.generateAbsoluteTime(0, lessThan(5), increment, fail(0), + delay(50)); + + m.subscribe(observer); + + Thread.sleep(200); // FIXME Shaky + + verify(observer, times(1)).onError(any(Throwable.class)); + verify(observer, never()).onNext(any()); + verify(observer, never()).onCompleted(); + + } + @Test + public void absoluteTimedTimeSelectorFails() throws InterruptedException { + Observable m = Observable.generateAbsoluteTime(0, lessThan(5), increment, dbl, + fail((Date)null)); + + m.subscribe(observer); + + Thread.sleep(200); // FIXME Shaky + + verify(observer, times(1)).onError(any(Throwable.class)); + verify(observer, never()).onNext(any()); + verify(observer, never()).onCompleted(); + + } + @Test + public void absoluteTimedTimeSelectorFails2ndCall() throws InterruptedException { + Observable m = Observable.generateAbsoluteTime(0, lessThan(5), increment, dbl, + fail(delay(50), 2)); + + m.subscribe(observer); + + Thread.sleep(200); // FIXME Shaky + + verify(observer, times(1)).onNext(0); + verify(observer, times(1)).onError(any(Throwable.class)); + verify(observer, never()).onCompleted(); + + } +}