diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java
index 56a4664654..d149d02be0 100644
--- a/rxjava-core/src/main/java/rx/Observable.java
+++ b/rxjava-core/src/main/java/rx/Observable.java
@@ -20,6 +20,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
+import java.util.Date;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
@@ -49,6 +50,7 @@
import rx.operators.OperationFilter;
import rx.operators.OperationFinally;
import rx.operators.OperationFirstOrDefault;
+import rx.operators.OperationGenerate;
import rx.operators.OperationGroupBy;
import rx.operators.OperationInterval;
import rx.operators.OperationLast;
@@ -105,6 +107,7 @@
import rx.util.Opening;
import rx.util.Range;
import rx.util.TimeInterval;
+import rx.util.TimeSpan;
import rx.util.Timestamped;
import rx.util.functions.Action0;
import rx.util.functions.Action1;
@@ -5689,5 +5692,110 @@ private boolean isInternalImplementation(Object o) {
return isInternal;
}
}
-
+ /**
+ * Generates an observable sequence by iterating a state from an initial state until the condition fails.
+ * @param initialState The initial state.
+ * @param condition The condition to terminate generation.
+ * @param iterate The iteration step function.
+ * @param resultSelector The selector function for results produced in the sequence.
+ * @return The generated sequence.
+ * @see MSDN: Observable.Generate
+ */
+ public static Observable generate(TState initialState, Func1 condition,
+ Func1 iterate, Func1 resultSelector) {
+ return generate(initialState, condition, iterate, resultSelector,
+ Schedulers.threadPoolForComputation());
+ }
+ /**
+ * Generates an observable sequence by iterating a state from an initial state until the condition fails.
+ * @param initialState The initial state.
+ * @param condition The condition to terminate generation.
+ * @param iterate The iteration step function.
+ * @param resultSelector The selector function for results produced in the sequence.
+ * @param scheduler he scheduler on which to run the generator loop.
+ * @return The generated sequence.
+ * @see MSDN: Observable.Generate
+ */
+ public static Observable generate(TState initialState, Func1 condition,
+ Func1 iterate, Func1 resultSelector, Scheduler scheduler) {
+ return create(OperationGenerate.generate(initialState, condition, iterate,
+ resultSelector, scheduler));
+ }
+ /**
+ * Generates an observable sequence by iterating a state from an initial state until the condition fails.
+ * @param initialState The initial state.
+ * @param condition The condition to terminate generation.
+ * @param iterate The iteration step function.
+ * @param resultSelector The selector function for results produced in the sequence.
+ * @param timeSelector The time selector function to control the speed of values being produced each iteration.
+ * @return The generated sequence.
+ * @see MSDN: Observable.Generate
+ */
+ public static Observable generate(TState initialState,
+ Func1 condition,
+ Func1 iterate,
+ Func1 resultSelector,
+ Func1 timeSelector) {
+ return generate(initialState, condition, iterate, resultSelector,
+ timeSelector, Schedulers.threadPoolForComputation());
+ }
+ /**
+ * Generates an observable sequence by iterating a state from an initial state until the condition fails.
+ * @param initialState The initial state.
+ * @param condition The condition to terminate generation.
+ * @param iterate The iteration step function.
+ * @param resultSelector The selector function for results produced in the sequence.
+ * @param timeSelector The time selector function to control the speed of values being produced each iteration.
+ * @param scheduler he scheduler on which to run the generator loop.
+ * @return The generated sequence.
+ * @see MSDN: Observable.Generate
+ */
+ public static Observable generate(TState initialState,
+ Func1 condition,
+ Func1 iterate,
+ Func1 resultSelector,
+ Func1 timeSelector,
+ Scheduler scheduler) {
+ return create(OperationGenerate.generate(initialState, condition, iterate, resultSelector,
+ timeSelector, scheduler));
+ }
+ /**
+ * Generates an observable sequence by iterating a state from an initial state until the condition fails.
+ * @param initialState The initial state.
+ * @param condition The condition to terminate generation.
+ * @param iterate The iteration step function.
+ * @param resultSelector The selector function for results produced in the sequence.
+ * @param timeSelector The time selector function to control the speed of values being produced each iteration.
+ * @return The generated sequence.
+ * @see MSDN: Observable.Generate
+ */
+ public static Observable generateAbsoluteTime(TState initialState,
+ Func1 condition,
+ Func1 iterate,
+ Func1 resultSelector,
+ Func1 timeSelector
+ ) {
+ return generateAbsoluteTime(initialState, condition, iterate, resultSelector,
+ timeSelector, Schedulers.threadPoolForComputation());
+ }
+ /**
+ * Generates an observable sequence by iterating a state from an initial state until the condition fails.
+ * @param initialState The initial state.
+ * @param condition The condition to terminate generation.
+ * @param iterate The iteration step function.
+ * @param resultSelector The selector function for results produced in the sequence.
+ * @param timeSelector The time selector function to control the speed of values being produced each iteration.
+ * @param scheduler he scheduler on which to run the generator loop.
+ * @return The generated sequence.
+ * @see MSDN: Observable.Generate
+ */
+ public static Observable generateAbsoluteTime(TState initialState,
+ Func1 condition,
+ Func1 iterate,
+ Func1 resultSelector,
+ Func1 timeSelector,
+ Scheduler scheduler) {
+ return create(OperationGenerate.generateAbsoluteTime(initialState, condition,
+ iterate, resultSelector, timeSelector, scheduler));
+ }
}
diff --git a/rxjava-core/src/main/java/rx/operators/OperationGenerate.java b/rxjava-core/src/main/java/rx/operators/OperationGenerate.java
new file mode 100644
index 0000000000..b8191ef60a
--- /dev/null
+++ b/rxjava-core/src/main/java/rx/operators/OperationGenerate.java
@@ -0,0 +1,225 @@
+/**
+ * Copyright 2013 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package rx.operators;
+
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
+import rx.Observable.OnSubscribeFunc;
+import rx.Observer;
+import rx.Scheduler;
+import rx.Subscription;
+import rx.subscriptions.Subscriptions;
+import rx.util.TimeSpan;
+import rx.util.functions.Func1;
+import rx.util.functions.Func2;
+
+/**
+ * Generates an observable sequence by iterating a state from an initial state
+ * until the condition returns false.
+ *
+ * Behaves like a generalized for loop.
+ */
+public final class OperationGenerate {
+ /**
+ * Generates an observable sequence by iterating a state from an initial
+ * state until the condition returns false.
+ */
+ public static OnSubscribeFunc generate(
+ final TState initialState,
+ final Func1 condition,
+ final Func1 iterate,
+ final Func1 resultSelector,
+ final Scheduler scheduler) {
+ return new OnSubscribeFunc() {
+ @Override
+ public Subscription onSubscribe(final Observer super R> observer) {
+ return scheduler.schedule(initialState, new Func2() {
+ @Override
+ public Subscription call(Scheduler s, TState state) {
+ boolean hasNext;
+ try {
+ hasNext = condition.call(state);
+ } catch (Throwable t) {
+ observer.onError(t);
+ return Subscriptions.empty();
+ }
+ if (hasNext) {
+ R result;
+ try {
+ result = resultSelector.call(state);
+ } catch (Throwable t) {
+ observer.onError(t);
+ return Subscriptions.empty();
+ }
+ observer.onNext(result);
+
+ TState nextState;
+ try {
+ nextState = iterate.call(state);
+ } catch (Throwable t) {
+ observer.onError(t);
+ return Subscriptions.empty();
+ }
+
+ return s.schedule(nextState, this);
+ }
+ observer.onCompleted();
+ return Subscriptions.empty();
+ }
+ });
+ }
+ };
+ }
+ /**
+ * Generates an observable sequence by iterating a state, in relative timed fashion,
+ * from an initial state until the condition fails.
+ */
+ public static OnSubscribeFunc generate(
+ final TState initialState,
+ final Func1 condition,
+ final Func1 iterate,
+ final Func1 resultSelector,
+ final Func1 timeSelector,
+ final Scheduler scheduler) {
+ return new OnSubscribeFunc() {
+ @Override
+ public Subscription onSubscribe(final Observer super R> observer) {
+ TimeSpan first;
+ try {
+ first = timeSelector.call(initialState);
+ } catch (Throwable t) {
+ observer.onError(t);
+ return Subscriptions.empty();
+ }
+
+ return scheduler.schedule(initialState,
+ new Func2() {
+ @Override
+ public Subscription call(Scheduler s, TState state) {
+ boolean hasNext;
+ try {
+ hasNext = condition.call(state);
+ } catch (Throwable t) {
+ observer.onError(t);
+ return Subscriptions.empty();
+ }
+ if (hasNext) {
+ R result;
+ try {
+ result = resultSelector.call(state);
+ } catch (Throwable t) {
+ observer.onError(t);
+ return Subscriptions.empty();
+ }
+ observer.onNext(result);
+
+ TState nextState;
+ try {
+ nextState = iterate.call(state);
+ } catch (Throwable t) {
+ observer.onError(t);
+ return Subscriptions.empty();
+ }
+
+ TimeSpan nextDate;
+ try {
+ nextDate = timeSelector.call(initialState);
+ } catch (Throwable t) {
+ observer.onError(t);
+ return Subscriptions.empty();
+ }
+
+ return s.schedule(nextState, this, nextDate.value(), nextDate.unit());
+ }
+ observer.onCompleted();
+ return Subscriptions.empty();
+ }
+ }, first.value(), first.unit());
+ }
+ };
+ }
+ /**
+ * Generates an observable sequence by iterating a state, in absolute timed fashion,
+ * from an initial state until the condition fails.
+ */
+ public static OnSubscribeFunc generateAbsoluteTime(
+ final TState initialState,
+ final Func1 condition,
+ final Func1 iterate,
+ final Func1 resultSelector,
+ final Func1 timeSelector,
+ final Scheduler scheduler) {
+ return new OnSubscribeFunc() {
+ @Override
+ public Subscription onSubscribe(final Observer super R> observer) {
+ Date first;
+ try {
+ first = timeSelector.call(initialState);
+ } catch (Throwable t) {
+ observer.onError(t);
+ return Subscriptions.empty();
+ }
+
+ long delta = Math.max(0, first.getTime() - scheduler.now());
+
+ return scheduler.schedule(initialState,
+ new Func2() {
+ @Override
+ public Subscription call(Scheduler s, TState state) {
+ boolean hasNext;
+ try {
+ hasNext = condition.call(state);
+ } catch (Throwable t) {
+ observer.onError(t);
+ return Subscriptions.empty();
+ }
+ if (hasNext) {
+ R result;
+ try {
+ result = resultSelector.call(state);
+ } catch (Throwable t) {
+ observer.onError(t);
+ return Subscriptions.empty();
+ }
+ observer.onNext(result);
+
+ TState nextState;
+ try {
+ nextState = iterate.call(state);
+ } catch (Throwable t) {
+ observer.onError(t);
+ return Subscriptions.empty();
+ }
+
+ Date nextDate;
+ try {
+ nextDate = timeSelector.call(initialState);
+ } catch (Throwable t) {
+ observer.onError(t);
+ return Subscriptions.empty();
+ }
+
+ long deltaNext = Math.max(0, nextDate.getTime() - s.now());
+ return s.schedule(nextState, this, deltaNext, TimeUnit.MILLISECONDS);
+ }
+ observer.onCompleted();
+ return Subscriptions.empty();
+ }
+ }, delta, TimeUnit.MILLISECONDS);
+ }
+ };
+ }
+}
diff --git a/rxjava-core/src/main/java/rx/util/TimeSpan.java b/rxjava-core/src/main/java/rx/util/TimeSpan.java
new file mode 100644
index 0000000000..9e171d1d62
--- /dev/null
+++ b/rxjava-core/src/main/java/rx/util/TimeSpan.java
@@ -0,0 +1,134 @@
+/**
+ * Copyright 2013 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rx.util;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Represents a time value and time unit.
+ *
+ * Rx.NET note: System.TimeSpan has a fixed unit of measure of 100 nanoseconds
+ * per value; the Java way is to specify the TimeUnit along with the time value.
+ *
+ */
+public final class TimeSpan implements Comparable {
+ private final long value;
+ private final TimeUnit unit;
+ /** Lazy hash. */
+ private int hash;
+ private TimeSpan(long value, TimeUnit unit) {
+ this.value = value;
+ this.unit = unit;
+ }
+ /**
+ * Create a TimeSpan instance with the given time value and
+ * time unit.
+ * @param value the time value, must be nonnegative
+ * @param unit the time unit
+ * @return the TimeSpan instance with the given time value and
+ * time unit
+ * @throws IllegalArgumentException if the value < 0
+ * @throws NullPointerException if unit is null
+ */
+ public static TimeSpan of(long value, TimeUnit unit) {
+ if (value < 0) {
+ throw new IllegalArgumentException("value negative");
+ }
+ if (unit == null) {
+ throw new NullPointerException("unit");
+ }
+ return new TimeSpan(value, unit);
+ }
+ /**
+ * Returns the time value.
+ * @return the time value
+ */
+ public long value() {
+ return value;
+ }
+ /**
+ * Returns the time unit.
+ * @return the time unit
+ */
+ public TimeUnit unit() {
+ return unit;
+ }
+ /**
+ * Create a new TimeSpan with the same time unit as this
+ * instance and the given newValue as the time value.
+ * @param newValue the new time value
+ * @return the new TimeSpan instance with the new time value
+ * @throws IllegalArgumentException if the value < 0
+ */
+ public TimeSpan withValue(long newValue) {
+ if (newValue < 0) {
+ throw new IllegalArgumentException("newValue negative");
+ }
+ return new TimeSpan(newValue, unit);
+ }
+ /**
+ * Create a new TimeSpan with the same time value as this
+ * instance and the given newUnit as the time unit.
+ * @param newUnit the new time unit
+ * @return the new TimeSpan instance with the new time unit
+ * @throws NullPointerException if unit is null
+ */
+ public TimeSpan withUnit(TimeUnit newUnit) {
+ if (unit == null) {
+ throw new NullPointerException("unit");
+ }
+ return new TimeSpan(value, unit);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (obj.getClass() == getClass()) {
+ TimeSpan other = (TimeSpan)obj;
+ return value == other.value && unit == other.unit;
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ int h = hash;
+ if (h == 0) {
+ h = (int)(value ^ (value >>> 32));
+ h = h * 31 + unit.hashCode();
+ hash = h;
+ }
+ return h;
+ }
+
+ @Override
+ public int compareTo(TimeSpan o) {
+ long t1 = unit.toNanos(value);
+ long t2 = o.unit.toNanos(value);
+
+ return t1 < t2 ? -1 : (t1 > t2 ? 1 : 0);
+ }
+
+}
diff --git a/rxjava-core/src/test/java/rx/operators/OperationGenerateTest.java b/rxjava-core/src/test/java/rx/operators/OperationGenerateTest.java
new file mode 100644
index 0000000000..52dbcf9864
--- /dev/null
+++ b/rxjava-core/src/test/java/rx/operators/OperationGenerateTest.java
@@ -0,0 +1,373 @@
+/**
+ * Copyright 2013 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rx.operators;
+
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
+import org.junit.Before;
+import org.junit.Test;
+import static org.mockito.Matchers.any;
+import org.mockito.Mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import org.mockito.MockitoAnnotations;
+import rx.Observable;
+import rx.Observer;
+import rx.concurrency.Schedulers;
+import rx.util.TimeSpan;
+import rx.util.functions.Func1;
+
+/**
+ *
+ */
+public class OperationGenerateTest {
+ @Mock
+ Observer