Skip to content

Operator Generate again #519

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 110 additions & 0 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -51,6 +52,7 @@
import rx.operators.OperationFilter;
import rx.operators.OperationFinally;
import rx.operators.OperationFirstOrDefault;
import rx.operators.OperationGenerate;
import rx.operators.OperationGroupBy;
import rx.operators.OperationInterval;
import rx.operators.OperationJoinPatterns;
Expand Down Expand Up @@ -5942,5 +5944,113 @@ public static <R> Observable<R> when(Plan0<R> p1, Plan0<R> p2, Plan0<R> p3, Plan
public static <R> Observable<R> when(Plan0<R> p1, Plan0<R> p2, Plan0<R> p3, Plan0<R> p4, Plan0<R> p5, Plan0<R> p6, Plan0<R> p7, Plan0<R> p8, Plan0<R> p9) {
return create(OperationJoinPatterns.when(p1, p2, p3, p4, p5, p6, p7, p8, p9));
}

/**
* Generates an observable sequence by iterating a state from an initial state until the condition fails.
* @param initialState The initial state.
* @param condition The condition to terminate generation.
* @param iterate The iteration step function.
* @param resultSelector The selector function for results produced in the sequence.
* @return The generated sequence.
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229642.aspx'>MSDN: Observable.Generate</a>
*/
public static <TState, R> Observable<R> generate(TState initialState, Func1<TState, Boolean> condition,
Func1<TState, TState> iterate, Func1<TState, R> resultSelector) {
return generate(initialState, condition, iterate, resultSelector,
Schedulers.threadPoolForComputation());
}
/**
* Generates an observable sequence by iterating a state from an initial state until the condition fails.
* @param initialState The initial state.
* @param condition The condition to terminate generation.
* @param iterate The iteration step function.
* @param resultSelector The selector function for results produced in the sequence.
* @param scheduler he scheduler on which to run the generator loop.
* @return The generated sequence.
* @see <a href='http://msdn.microsoft.com/en-us/library/hh212014.aspx'>MSDN: Observable.Generate</a>
*/
public static <TState, R> Observable<R> generate(TState initialState, Func1<TState, Boolean> condition,
Func1<TState, TState> iterate, Func1<TState, R> resultSelector, Scheduler scheduler) {
return create(OperationGenerate.generate(initialState, condition, iterate,
resultSelector, scheduler));
}
/**
* Generates an observable sequence by iterating a state from an initial state until the condition fails.
* @param initialState The initial state.
* @param condition The condition to terminate generation.
* @param iterate The iteration step function.
* @param resultSelector The selector function for results produced in the sequence.
* @param timeSelector The time selector function to control the speed of values being produced each iteration. Returns a nanosecond resolution time delay value.
* @return The generated sequence.
* @see <a href='http://msdn.microsoft.com/en-us/library/hh212066.aspx'>MSDN: Observable.Generate</a>
*/
public static <TState, R> Observable<R> generate(TState initialState,
Func1<TState, Boolean> condition,
Func1<TState, TState> iterate,
Func1<TState, R> resultSelector,
Func1<TState, Long> timeSelector) {
return generate(initialState, condition, iterate, resultSelector,
timeSelector, Schedulers.threadPoolForComputation());
}
/**
* Generates an observable sequence by iterating a state from an initial state until the condition fails.
* @param initialState The initial state.
* @param condition The condition to terminate generation.
* @param iterate The iteration step function.
* @param resultSelector The selector function for results produced in the sequence.
* @param timeSelector The time selector function to control the speed of values being produced each iteration. Returns a nanosecond resolution time delay value.
* @param scheduler he scheduler on which to run the generator loop.
* @return The generated sequence.
* @see <a href='http://msdn.microsoft.com/en-us/library/hh211794.aspx'>MSDN: Observable.Generate</a>
*/
public static <TState, R> Observable<R> generate(TState initialState,
Func1<TState, Boolean> condition,
Func1<TState, TState> iterate,
Func1<TState, R> resultSelector,
Func1<TState, Long> timeSelector,
Scheduler scheduler) {
return create(OperationGenerate.generate(initialState, condition, iterate, resultSelector,
timeSelector, scheduler));
}
/**
* Generates an observable sequence by iterating a state from an initial state until the condition fails.
* @param initialState The initial state.
* @param condition The condition to terminate generation.
* @param iterate The iteration step function.
* @param resultSelector The selector function for results produced in the sequence.
* @param timeSelector The time selector function to control the speed of values being produced each iteration.
* @return The generated sequence.
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229270.aspx'>MSDN: Observable.Generate</a>
*/
public static <TState, R> Observable<R> generateAbsoluteTime(TState initialState,
Func1<TState, Boolean> condition,
Func1<TState, TState> iterate,
Func1<TState, R> resultSelector,
Func1<TState, Date> timeSelector
) {
return generateAbsoluteTime(initialState, condition, iterate, resultSelector,
timeSelector, Schedulers.threadPoolForComputation());
}
/**
* Generates an observable sequence by iterating a state from an initial state until the condition fails.
* @param initialState The initial state.
* @param condition The condition to terminate generation.
* @param iterate The iteration step function.
* @param resultSelector The selector function for results produced in the sequence.
* @param timeSelector The time selector function to control the speed of values being produced each iteration.
* @param scheduler he scheduler on which to run the generator loop.
* @return The generated sequence.
* @see <a href='http://msdn.microsoft.com/en-us/library/hh244290.aspx'>MSDN: Observable.Generate</a>
*/
public static <TState, R> Observable<R> generateAbsoluteTime(TState initialState,
Func1<TState, Boolean> condition,
Func1<TState, TState> iterate,
Func1<TState, R> resultSelector,
Func1<TState, Date> timeSelector,
Scheduler scheduler) {
return create(OperationGenerate.generateAbsoluteTime(initialState, condition,
iterate, resultSelector, timeSelector, scheduler));
}

}

224 changes: 224 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperationGenerate.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package rx.operators;

import java.util.Date;
import java.util.concurrent.TimeUnit;
import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Scheduler;
import rx.Subscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Func1;
import rx.util.functions.Func2;

/**
* Generates an observable sequence by iterating a state from an initial state
* until the condition returns false.
* <p>
* Behaves like a generalized for loop.
*/
public final class OperationGenerate {
/**
* Generates an observable sequence by iterating a state from an initial
* state until the condition returns false.
*/
public static <TState, R> OnSubscribeFunc<R> generate(
final TState initialState,
final Func1<TState, Boolean> condition,
final Func1<TState, TState> iterate,
final Func1<TState, R> resultSelector,
final Scheduler scheduler) {
return new OnSubscribeFunc<R>() {
@Override
public Subscription onSubscribe(final Observer<? super R> observer) {
return scheduler.schedule(initialState, new Func2<Scheduler, TState, Subscription>() {
@Override
public Subscription call(Scheduler s, TState state) {
boolean hasNext;
try {
hasNext = condition.call(state);
} catch (Throwable t) {
observer.onError(t);
return Subscriptions.empty();
}
if (hasNext) {
R result;
try {
result = resultSelector.call(state);
} catch (Throwable t) {
observer.onError(t);
return Subscriptions.empty();
}
observer.onNext(result);

TState nextState;
try {
nextState = iterate.call(state);
} catch (Throwable t) {
observer.onError(t);
return Subscriptions.empty();
}

return s.schedule(nextState, this);
}
observer.onCompleted();
return Subscriptions.empty();
}
});
}
};
}
/**
* Generates an observable sequence by iterating a state, in relative timed fashion,
* from an initial state until the condition fails.
*/
public static <TState, R> OnSubscribeFunc<R> generate(
final TState initialState,
final Func1<TState, Boolean> condition,
final Func1<TState, TState> iterate,
final Func1<TState, R> resultSelector,
final Func1<TState, Long> timeSelector,
final Scheduler scheduler) {
return new OnSubscribeFunc<R>() {
@Override
public Subscription onSubscribe(final Observer<? super R> observer) {
Long first;
try {
first = timeSelector.call(initialState);
} catch (Throwable t) {
observer.onError(t);
return Subscriptions.empty();
}

return scheduler.schedule(initialState,
new Func2<Scheduler, TState, Subscription>() {
@Override
public Subscription call(Scheduler s, TState state) {
boolean hasNext;
try {
hasNext = condition.call(state);
} catch (Throwable t) {
observer.onError(t);
return Subscriptions.empty();
}
if (hasNext) {
R result;
try {
result = resultSelector.call(state);
} catch (Throwable t) {
observer.onError(t);
return Subscriptions.empty();
}
observer.onNext(result);

TState nextState;
try {
nextState = iterate.call(state);
} catch (Throwable t) {
observer.onError(t);
return Subscriptions.empty();
}

Long nextDate;
try {
nextDate = timeSelector.call(initialState);
} catch (Throwable t) {
observer.onError(t);
return Subscriptions.empty();
}

return s.schedule(nextState, this, nextDate, TimeUnit.NANOSECONDS);
}
observer.onCompleted();
return Subscriptions.empty();
}
}, first, TimeUnit.NANOSECONDS);
}
};
}
/**
* Generates an observable sequence by iterating a state, in absolute timed fashion,
* from an initial state until the condition fails.
*/
public static <TState, R> OnSubscribeFunc<R> generateAbsoluteTime(
final TState initialState,
final Func1<TState, Boolean> condition,
final Func1<TState, TState> iterate,
final Func1<TState, R> resultSelector,
final Func1<TState, Date> timeSelector,
final Scheduler scheduler) {
return new OnSubscribeFunc<R>() {
@Override
public Subscription onSubscribe(final Observer<? super R> observer) {
Date first;
try {
first = timeSelector.call(initialState);
} catch (Throwable t) {
observer.onError(t);
return Subscriptions.empty();
}

long delta = Math.max(0, first.getTime() - scheduler.now());

return scheduler.schedule(initialState,
new Func2<Scheduler, TState, Subscription>() {
@Override
public Subscription call(Scheduler s, TState state) {
boolean hasNext;
try {
hasNext = condition.call(state);
} catch (Throwable t) {
observer.onError(t);
return Subscriptions.empty();
}
if (hasNext) {
R result;
try {
result = resultSelector.call(state);
} catch (Throwable t) {
observer.onError(t);
return Subscriptions.empty();
}
observer.onNext(result);

TState nextState;
try {
nextState = iterate.call(state);
} catch (Throwable t) {
observer.onError(t);
return Subscriptions.empty();
}

Date nextDate;
try {
nextDate = timeSelector.call(initialState);
} catch (Throwable t) {
observer.onError(t);
return Subscriptions.empty();
}

long deltaNext = Math.max(0, nextDate.getTime() - s.now());
return s.schedule(nextState, this, deltaNext, TimeUnit.MILLISECONDS);
}
observer.onCompleted();
return Subscriptions.empty();
}
}, delta, TimeUnit.MILLISECONDS);
}
};
}
}
Loading