diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 5a703c5019..6643d787be 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; +import java.util.Date; import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; @@ -51,6 +52,7 @@ import rx.operators.OperationFilter; import rx.operators.OperationFinally; import rx.operators.OperationFirstOrDefault; +import rx.operators.OperationGenerate; import rx.operators.OperationGroupBy; import rx.operators.OperationInterval; import rx.operators.OperationJoinPatterns; @@ -5942,5 +5944,113 @@ public static Observable when(Plan0 p1, Plan0 p2, Plan0 p3, Plan public static Observable when(Plan0 p1, Plan0 p2, Plan0 p3, Plan0 p4, Plan0 p5, Plan0 p6, Plan0 p7, Plan0 p8, Plan0 p9) { return create(OperationJoinPatterns.when(p1, p2, p3, p4, p5, p6, p7, p8, p9)); } + + /** + * Generates an observable sequence by iterating a state from an initial state until the condition fails. + * @param initialState The initial state. + * @param condition The condition to terminate generation. + * @param iterate The iteration step function. + * @param resultSelector The selector function for results produced in the sequence. + * @return The generated sequence. + * @see MSDN: Observable.Generate + */ + public static Observable generate(TState initialState, Func1 condition, + Func1 iterate, Func1 resultSelector) { + return generate(initialState, condition, iterate, resultSelector, + Schedulers.threadPoolForComputation()); + } + /** + * Generates an observable sequence by iterating a state from an initial state until the condition fails. + * @param initialState The initial state. + * @param condition The condition to terminate generation. + * @param iterate The iteration step function. + * @param resultSelector The selector function for results produced in the sequence. + * @param scheduler he scheduler on which to run the generator loop. + * @return The generated sequence. + * @see MSDN: Observable.Generate + */ + public static Observable generate(TState initialState, Func1 condition, + Func1 iterate, Func1 resultSelector, Scheduler scheduler) { + return create(OperationGenerate.generate(initialState, condition, iterate, + resultSelector, scheduler)); + } + /** + * Generates an observable sequence by iterating a state from an initial state until the condition fails. + * @param initialState The initial state. + * @param condition The condition to terminate generation. + * @param iterate The iteration step function. + * @param resultSelector The selector function for results produced in the sequence. + * @param timeSelector The time selector function to control the speed of values being produced each iteration. Returns a nanosecond resolution time delay value. + * @return The generated sequence. + * @see MSDN: Observable.Generate + */ + public static Observable generate(TState initialState, + Func1 condition, + Func1 iterate, + Func1 resultSelector, + Func1 timeSelector) { + return generate(initialState, condition, iterate, resultSelector, + timeSelector, Schedulers.threadPoolForComputation()); + } + /** + * Generates an observable sequence by iterating a state from an initial state until the condition fails. + * @param initialState The initial state. + * @param condition The condition to terminate generation. + * @param iterate The iteration step function. + * @param resultSelector The selector function for results produced in the sequence. + * @param timeSelector The time selector function to control the speed of values being produced each iteration. Returns a nanosecond resolution time delay value. + * @param scheduler he scheduler on which to run the generator loop. + * @return The generated sequence. + * @see MSDN: Observable.Generate + */ + public static Observable generate(TState initialState, + Func1 condition, + Func1 iterate, + Func1 resultSelector, + Func1 timeSelector, + Scheduler scheduler) { + return create(OperationGenerate.generate(initialState, condition, iterate, resultSelector, + timeSelector, scheduler)); + } + /** + * Generates an observable sequence by iterating a state from an initial state until the condition fails. + * @param initialState The initial state. + * @param condition The condition to terminate generation. + * @param iterate The iteration step function. + * @param resultSelector The selector function for results produced in the sequence. + * @param timeSelector The time selector function to control the speed of values being produced each iteration. + * @return The generated sequence. + * @see MSDN: Observable.Generate + */ + public static Observable generateAbsoluteTime(TState initialState, + Func1 condition, + Func1 iterate, + Func1 resultSelector, + Func1 timeSelector + ) { + return generateAbsoluteTime(initialState, condition, iterate, resultSelector, + timeSelector, Schedulers.threadPoolForComputation()); + } + /** + * Generates an observable sequence by iterating a state from an initial state until the condition fails. + * @param initialState The initial state. + * @param condition The condition to terminate generation. + * @param iterate The iteration step function. + * @param resultSelector The selector function for results produced in the sequence. + * @param timeSelector The time selector function to control the speed of values being produced each iteration. + * @param scheduler he scheduler on which to run the generator loop. + * @return The generated sequence. + * @see MSDN: Observable.Generate + */ + public static Observable generateAbsoluteTime(TState initialState, + Func1 condition, + Func1 iterate, + Func1 resultSelector, + Func1 timeSelector, + Scheduler scheduler) { + return create(OperationGenerate.generateAbsoluteTime(initialState, condition, + iterate, resultSelector, timeSelector, scheduler)); + } + } diff --git a/rxjava-core/src/main/java/rx/operators/OperationGenerate.java b/rxjava-core/src/main/java/rx/operators/OperationGenerate.java new file mode 100644 index 0000000000..9243c2f214 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationGenerate.java @@ -0,0 +1,224 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.operators; + +import java.util.Date; +import java.util.concurrent.TimeUnit; +import rx.Observable.OnSubscribeFunc; +import rx.Observer; +import rx.Scheduler; +import rx.Subscription; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Func1; +import rx.util.functions.Func2; + +/** + * Generates an observable sequence by iterating a state from an initial state + * until the condition returns false. + *

+ * Behaves like a generalized for loop. + */ +public final class OperationGenerate { + /** + * Generates an observable sequence by iterating a state from an initial + * state until the condition returns false. + */ + public static OnSubscribeFunc generate( + final TState initialState, + final Func1 condition, + final Func1 iterate, + final Func1 resultSelector, + final Scheduler scheduler) { + return new OnSubscribeFunc() { + @Override + public Subscription onSubscribe(final Observer observer) { + return scheduler.schedule(initialState, new Func2() { + @Override + public Subscription call(Scheduler s, TState state) { + boolean hasNext; + try { + hasNext = condition.call(state); + } catch (Throwable t) { + observer.onError(t); + return Subscriptions.empty(); + } + if (hasNext) { + R result; + try { + result = resultSelector.call(state); + } catch (Throwable t) { + observer.onError(t); + return Subscriptions.empty(); + } + observer.onNext(result); + + TState nextState; + try { + nextState = iterate.call(state); + } catch (Throwable t) { + observer.onError(t); + return Subscriptions.empty(); + } + + return s.schedule(nextState, this); + } + observer.onCompleted(); + return Subscriptions.empty(); + } + }); + } + }; + } + /** + * Generates an observable sequence by iterating a state, in relative timed fashion, + * from an initial state until the condition fails. + */ + public static OnSubscribeFunc generate( + final TState initialState, + final Func1 condition, + final Func1 iterate, + final Func1 resultSelector, + final Func1 timeSelector, + final Scheduler scheduler) { + return new OnSubscribeFunc() { + @Override + public Subscription onSubscribe(final Observer observer) { + Long first; + try { + first = timeSelector.call(initialState); + } catch (Throwable t) { + observer.onError(t); + return Subscriptions.empty(); + } + + return scheduler.schedule(initialState, + new Func2() { + @Override + public Subscription call(Scheduler s, TState state) { + boolean hasNext; + try { + hasNext = condition.call(state); + } catch (Throwable t) { + observer.onError(t); + return Subscriptions.empty(); + } + if (hasNext) { + R result; + try { + result = resultSelector.call(state); + } catch (Throwable t) { + observer.onError(t); + return Subscriptions.empty(); + } + observer.onNext(result); + + TState nextState; + try { + nextState = iterate.call(state); + } catch (Throwable t) { + observer.onError(t); + return Subscriptions.empty(); + } + + Long nextDate; + try { + nextDate = timeSelector.call(initialState); + } catch (Throwable t) { + observer.onError(t); + return Subscriptions.empty(); + } + + return s.schedule(nextState, this, nextDate, TimeUnit.NANOSECONDS); + } + observer.onCompleted(); + return Subscriptions.empty(); + } + }, first, TimeUnit.NANOSECONDS); + } + }; + } + /** + * Generates an observable sequence by iterating a state, in absolute timed fashion, + * from an initial state until the condition fails. + */ + public static OnSubscribeFunc generateAbsoluteTime( + final TState initialState, + final Func1 condition, + final Func1 iterate, + final Func1 resultSelector, + final Func1 timeSelector, + final Scheduler scheduler) { + return new OnSubscribeFunc() { + @Override + public Subscription onSubscribe(final Observer observer) { + Date first; + try { + first = timeSelector.call(initialState); + } catch (Throwable t) { + observer.onError(t); + return Subscriptions.empty(); + } + + long delta = Math.max(0, first.getTime() - scheduler.now()); + + return scheduler.schedule(initialState, + new Func2() { + @Override + public Subscription call(Scheduler s, TState state) { + boolean hasNext; + try { + hasNext = condition.call(state); + } catch (Throwable t) { + observer.onError(t); + return Subscriptions.empty(); + } + if (hasNext) { + R result; + try { + result = resultSelector.call(state); + } catch (Throwable t) { + observer.onError(t); + return Subscriptions.empty(); + } + observer.onNext(result); + + TState nextState; + try { + nextState = iterate.call(state); + } catch (Throwable t) { + observer.onError(t); + return Subscriptions.empty(); + } + + Date nextDate; + try { + nextDate = timeSelector.call(initialState); + } catch (Throwable t) { + observer.onError(t); + return Subscriptions.empty(); + } + + long deltaNext = Math.max(0, nextDate.getTime() - s.now()); + return s.schedule(nextState, this, deltaNext, TimeUnit.MILLISECONDS); + } + observer.onCompleted(); + return Subscriptions.empty(); + } + }, delta, TimeUnit.MILLISECONDS); + } + }; + } +} diff --git a/rxjava-core/src/test/java/rx/operators/OperationGenerateTest.java b/rxjava-core/src/test/java/rx/operators/OperationGenerateTest.java new file mode 100644 index 0000000000..e6a151919e --- /dev/null +++ b/rxjava-core/src/test/java/rx/operators/OperationGenerateTest.java @@ -0,0 +1,417 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import java.util.Date; +import java.util.concurrent.TimeUnit; +import org.junit.Before; +import org.junit.Test; +import static org.mockito.Matchers.any; +import org.mockito.Mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import org.mockito.MockitoAnnotations; +import rx.Observable; +import rx.Observer; +import rx.concurrency.Schedulers; +import rx.concurrency.TestScheduler; +import rx.util.functions.Func1; + +/** + * + */ +public class OperationGenerateTest { + @Mock + Observer observer; + + Func1 increment = new Func1() { + @Override + public Integer call(Integer t1) { + return t1 + 1; + } + }; + Func1 lessThan(final int value) { + return new Func1() { + @Override + public Boolean call(Integer t1) { + return t1 < value; + } + }; + } + Func1 dbl = new Func1() { + @Override + public Integer call(Integer t1) { + return t1 * 2; + } + }; + Func1 just(final R value) { + return new Func1() { + @Override + public R call(Integer t1) { + return value; + } + }; + } + Func1 fail(R dummy) { + return new Func1() { + @Override + public R call(Integer t1) { + throw new RuntimeException("Forced failure"); + } + }; + } + Func1 fail(final R value, final int call) { + return new Func1() { + int i; + @Override + public R call(Integer t1) { + if (++i >= call) { + throw new RuntimeException("Forced failure"); + } + return value; + } + }; + } + Func1 fail(final Func1 valueFactory, final int call) { + return new Func1() { + int i; + @Override + public R call(Integer t1) { + if (++i >= call) { + throw new RuntimeException("Forced failure"); + } + return valueFactory.call(t1); + } + }; + } + Func1 delay(final int byMillis) { + return new Func1() { + @Override + public Date call(Integer t1) { + return new Date(System.currentTimeMillis() + byMillis); + } + }; + }; + @Before + public void before() { + MockitoAnnotations.initMocks(this); + } + @Test + public void basicForLoop() { + Observable m = Observable.generate(0, lessThan(5), increment, dbl, Schedulers.immediate()); + + m.subscribe(observer); + + verify(observer, never()).onError(any(Throwable.class)); + verify(observer, times(1)).onNext(0); + verify(observer, times(1)).onNext(2); + verify(observer, times(1)).onNext(4); + verify(observer, times(1)).onNext(6); + verify(observer, times(1)).onNext(8); + verify(observer, times(1)).onCompleted(); + + } + @Test + public void emptyForLoop() { + Observable m = Observable.generate(0, lessThan(0), increment, dbl, Schedulers.immediate()); + + m.subscribe(observer); + + verify(observer, never()).onError(any(Throwable.class)); + verify(observer, never()).onNext(any()); + verify(observer, times(1)).onCompleted(); + + } + @Test + public void timedForLoop() throws InterruptedException { + + TestScheduler scheduler = new TestScheduler(); + + Observable m = Observable.generate(0, lessThan(5), + increment, dbl, just(TimeUnit.MILLISECONDS.toNanos(50)), scheduler); + + m.subscribe(observer); + + scheduler.advanceTimeBy(300, TimeUnit.MILLISECONDS); + + + verify(observer, never()).onError(any(Throwable.class)); + verify(observer, times(1)).onNext(0); + verify(observer, times(1)).onNext(2); + verify(observer, times(1)).onNext(4); + verify(observer, times(1)).onNext(6); + verify(observer, times(1)).onNext(8); + verify(observer, times(1)).onCompleted(); + + } + @Test + public void timedEmptyForLoop() throws InterruptedException { + TestScheduler scheduler = new TestScheduler(); + + Observable m = Observable.generate(0, lessThan(0), + increment, dbl, just(TimeUnit.MILLISECONDS.toNanos(50)), scheduler); + + m.subscribe(observer); + + scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); + + verify(observer, never()).onError(any(Throwable.class)); + verify(observer, never()).onNext(any()); + verify(observer, times(1)).onCompleted(); + + } + @Test + public void absoluteTimedForLoop() throws InterruptedException { + TestScheduler scheduler = new TestScheduler(); + + scheduler.advanceTimeBy(System.currentTimeMillis(), TimeUnit.MILLISECONDS); + + Observable m = Observable.generateAbsoluteTime(0, lessThan(5), + increment, dbl, delay(50), scheduler); + + m.subscribe(observer); + + scheduler.advanceTimeBy(500, TimeUnit.MILLISECONDS); + + verify(observer, never()).onError(any(Throwable.class)); + verify(observer, times(1)).onNext(0); + verify(observer, times(1)).onNext(2); + verify(observer, times(1)).onNext(4); + verify(observer, times(1)).onNext(6); + verify(observer, times(1)).onNext(8); + verify(observer, times(1)).onCompleted(); + + } + @Test + public void absoluteTimedEmptyForLoop() throws InterruptedException { + TestScheduler scheduler = new TestScheduler(); + + scheduler.advanceTimeBy(System.currentTimeMillis(), TimeUnit.MILLISECONDS); + + Observable m = Observable.generateAbsoluteTime(0, lessThan(0), + increment, dbl, delay(50), scheduler); + + m.subscribe(observer); + + scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); + + verify(observer, never()).onError(any(Throwable.class)); + verify(observer, never()).onNext(any()); + verify(observer, times(1)).onCompleted(); + } + @Test + public void conditionFails() { + Observable m = Observable.generate(0, fail(false), increment, dbl, Schedulers.immediate()); + + m.subscribe(observer); + + verify(observer, times(1)).onError(any(Throwable.class)); + verify(observer, never()).onNext(any()); + verify(observer, never()).onCompleted(); + + } + @Test + public void iterateFails() { + Observable m = Observable.generate(0, lessThan(5), fail(0), dbl, Schedulers.immediate()); + + m.subscribe(observer); + + verify(observer, times(1)).onNext(0); + verify(observer, times(1)).onError(any(Throwable.class)); + verify(observer, never()).onCompleted(); + + } + @Test + public void resultSelectorFails() { + Observable m = Observable.generate(0, lessThan(5), increment, fail(0), Schedulers.immediate()); + + m.subscribe(observer); + + verify(observer, times(1)).onError(any(Throwable.class)); + verify(observer, never()).onNext(any()); + verify(observer, never()).onCompleted(); + + } + @Test + public void timedConditionFails() throws InterruptedException { + TestScheduler scheduler = new TestScheduler(); + + Observable m = Observable.generate(0, fail(false), increment, dbl, + just(TimeUnit.MILLISECONDS.toNanos(50)), scheduler); + + m.subscribe(observer); + + scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); + + verify(observer, times(1)).onError(any(Throwable.class)); + verify(observer, never()).onNext(any()); + verify(observer, never()).onCompleted(); + + } + @Test + public void timedIterateFails() throws InterruptedException { + TestScheduler scheduler = new TestScheduler(); + + Observable m = Observable.generate(0, lessThan(5), fail(0), dbl, just(TimeUnit.MILLISECONDS.toNanos(50)), scheduler); + + m.subscribe(observer); + + scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); + + verify(observer, times(1)).onNext(0); + verify(observer, times(1)).onError(any(Throwable.class)); + verify(observer, never()).onCompleted(); + + } + @Test + public void timedResultSelectorFails() throws InterruptedException { + TestScheduler scheduler = new TestScheduler(); + + Observable m = Observable.generate(0, lessThan(5), increment, fail(0), just(TimeUnit.MILLISECONDS.toNanos(50)), scheduler); + + m.subscribe(observer); + + scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); + + verify(observer, times(1)).onError(any(Throwable.class)); + verify(observer, never()).onNext(any()); + verify(observer, never()).onCompleted(); + + } + @Test + public void timedTimeSelectorFails() throws InterruptedException { + TestScheduler scheduler = new TestScheduler(); + + Observable m = Observable.generate(0, lessThan(5), increment, dbl, + fail((Long)null), scheduler); + + m.subscribe(observer); + + scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); + + verify(observer, times(1)).onError(any(Throwable.class)); + verify(observer, never()).onNext(any()); + verify(observer, never()).onCompleted(); + + } + @Test + public void timedTimeSelectorFails2ndCall() throws InterruptedException { + TestScheduler scheduler = new TestScheduler(); + + Observable m = Observable.generate(0, lessThan(5), increment, dbl, + fail(TimeUnit.MILLISECONDS.toNanos(50), 2), scheduler); + + m.subscribe(observer); + + scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); + + verify(observer, times(1)).onNext(0); + verify(observer, times(1)).onError(any(Throwable.class)); + verify(observer, never()).onCompleted(); + + } + @Test + public void absoluteTimedConditionFails() throws InterruptedException { + TestScheduler scheduler = new TestScheduler(); + + scheduler.advanceTimeBy(System.currentTimeMillis(), TimeUnit.MILLISECONDS); + + Observable m = Observable.generateAbsoluteTime(0, fail(false), increment, dbl, + delay(50), scheduler); + + m.subscribe(observer); + + scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); + + verify(observer, times(1)).onError(any(Throwable.class)); + verify(observer, never()).onNext(any()); + verify(observer, never()).onCompleted(); + + } + @Test + public void absoluteTimedIterateFails() throws InterruptedException { + TestScheduler scheduler = new TestScheduler(); + + scheduler.advanceTimeBy(System.currentTimeMillis(), TimeUnit.MILLISECONDS); + + Observable m = Observable.generateAbsoluteTime(0, lessThan(5), fail(0), dbl, + delay(50), scheduler); + + m.subscribe(observer); + + scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); + + verify(observer, times(1)).onNext(0); + verify(observer, times(1)).onError(any(Throwable.class)); + verify(observer, never()).onCompleted(); + + } + @Test + public void absoluteTimedResultSelectorFails() throws InterruptedException { + TestScheduler scheduler = new TestScheduler(); + + scheduler.advanceTimeBy(System.currentTimeMillis(), TimeUnit.MILLISECONDS); + + Observable m = Observable.generateAbsoluteTime(0, lessThan(5), increment, fail(0), + delay(50), scheduler); + + m.subscribe(observer); + + scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); + + verify(observer, times(1)).onError(any(Throwable.class)); + verify(observer, never()).onNext(any()); + verify(observer, never()).onCompleted(); + + } + @Test + public void absoluteTimedTimeSelectorFails() throws InterruptedException { + TestScheduler scheduler = new TestScheduler(); + + scheduler.advanceTimeBy(System.currentTimeMillis(), TimeUnit.MILLISECONDS); + + Observable m = Observable.generateAbsoluteTime(0, lessThan(5), increment, dbl, + fail((Date)null), scheduler); + + m.subscribe(observer); + + scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); + + verify(observer, times(1)).onError(any(Throwable.class)); + verify(observer, never()).onNext(any()); + verify(observer, never()).onCompleted(); + + } + @Test + public void absoluteTimedTimeSelectorFails2ndCall() throws InterruptedException { + TestScheduler scheduler = new TestScheduler(); + + scheduler.advanceTimeBy(System.currentTimeMillis(), TimeUnit.MILLISECONDS); + + Observable m = Observable.generateAbsoluteTime(0, lessThan(5), increment, dbl, + fail(delay(50), 2), scheduler); + + m.subscribe(observer); + + scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); + + verify(observer, times(1)).onNext(0); + verify(observer, times(1)).onError(any(Throwable.class)); + verify(observer, never()).onCompleted(); + + } +}