diff --git a/rxjava-contrib/rxjava-computation-expressions/build.gradle b/rxjava-contrib/rxjava-computation-expressions/build.gradle
new file mode 100644
index 0000000000..21bc395344
--- /dev/null
+++ b/rxjava-contrib/rxjava-computation-expressions/build.gradle
@@ -0,0 +1,20 @@
+apply plugin: 'osgi'
+
+sourceCompatibility = JavaVersion.VERSION_1_6
+targetCompatibility = JavaVersion.VERSION_1_6
+
+dependencies {
+ compile project(':rxjava-core')
+ testCompile project(":rxjava-core").sourceSets.test.output
+ provided 'junit:junit-dep:4.10'
+ provided 'org.mockito:mockito-core:1.8.5'
+}
+
+jar {
+ manifest {
+ name = 'rxjava-computation-expressions'
+ instruction 'Bundle-Vendor', 'Netflix'
+ instruction 'Bundle-DocURL', 'https://github.com/Netflix/RxJava'
+ instruction 'Import-Package', '!org.junit,!junit.framework,!org.mockito.*,*'
+ }
+}
diff --git a/rxjava-contrib/rxjava-computation-expressions/src/main/java/rx/Statement.java b/rxjava-contrib/rxjava-computation-expressions/src/main/java/rx/Statement.java
new file mode 100644
index 0000000000..7ed5201327
--- /dev/null
+++ b/rxjava-contrib/rxjava-computation-expressions/src/main/java/rx/Statement.java
@@ -0,0 +1,191 @@
+package rx;
+
+import java.util.Map;
+
+import rx.operators.OperationConditionals;
+import rx.util.functions.Func0;
+
+/**
+ * Imperative statements expressed as Observable operators.
+ */
+public class Statement {
+
+ /**
+ * Return a particular one of several possible Observables based on a case
+ * selector.
+ *
+ *
+ *
+ * @param
+ * the case key type
+ * @param
+ * the result value type
+ * @param caseSelector
+ * the function that produces a case key when an
+ * Observer subscribes
+ * @param mapOfCases
+ * a map that maps a case key to an Observable
+ * @return a particular Observable chosen by key from the map of
+ * Observables, or an empty Observable if no Observable matches the
+ * key
+ */
+ public static Observable switchCase(Func0 extends K> caseSelector,
+ Map super K, ? extends Observable extends R>> mapOfCases) {
+ return switchCase(caseSelector, mapOfCases, Observable. empty());
+ }
+
+ /**
+ * Return a particular one of several possible Observables based on a case
+ * selector and run it on the designated scheduler.
+ *
+ *
+ *
+ * @param
+ * the case key type
+ * @param
+ * the result value type
+ * @param caseSelector
+ * the function that produces a case key when an
+ * Observer subscribes
+ * @param mapOfCases
+ * a map that maps a case key to an Observable
+ * @param scheduler
+ * the scheduler where the empty observable is observed
+ * @return a particular Observable chosen by key from the map of
+ * Observables, or an empty Observable if no Observable matches the
+ * key, but one that runs on the designated scheduler in either case
+ */
+ public static Observable switchCase(Func0 extends K> caseSelector,
+ Map super K, ? extends Observable extends R>> mapOfCases, Scheduler scheduler) {
+ return switchCase(caseSelector, mapOfCases, Observable. empty(scheduler));
+ }
+
+ /**
+ * Return a particular one of several possible Observables based on a case
+ * selector, or a default Observable if the case selector does not map to
+ * a particular one.
+ *
+ *
+ *
+ * @param
+ * the case key type
+ * @param
+ * the result value type
+ * @param caseSelector
+ * the function that produces a case key when an
+ * Observer subscribes
+ * @param mapOfCases
+ * a map that maps a case key to an Observable
+ * @param defaultCase
+ * the default Observable if the {@code mapOfCases} doesn't contain a value for the key returned by the {@case caseSelector}
+ * @return a particular Observable chosen by key from the map of
+ * Observables, or the default case if no Observable matches the key
+ */
+ public static Observable switchCase(Func0 extends K> caseSelector,
+ Map super K, ? extends Observable extends R>> mapOfCases,
+ Observable extends R> defaultCase) {
+ return Observable.create(OperationConditionals.switchCase(caseSelector, mapOfCases, defaultCase));
+ }
+
+ /**
+ * Return an Observable that replays the emissions from the source
+ * Observable, and then continues to replay them so long as a condtion is
+ * true.
+ *
+ *
+ *
+ * @param postCondition
+ * the post condition to test after the source
+ * Observable completes
+ * @return an Observable that replays the emissions from the source
+ * Observable, and then continues to replay them so long as the post
+ * condition is true
+ */
+ public static Observable doWhile(Observable source, Func0 postCondition) {
+ return Observable.create(OperationConditionals.doWhile(source, postCondition));
+ }
+
+ /**
+ * Return an Observable that replays the emissions from the source
+ * Observable so long as a condtion is true.
+ *
+ *
+ *
+ * @param preCondition
+ * the condition to evaluate before subscribing to or
+ * replaying the source Observable
+ * @return an Observable that replays the emissions from the source
+ * Observable so long as preCondition
is true
+ */
+ public static Observable whileDo(Observable source, Func0 preCondition) {
+ return Observable.create(OperationConditionals.whileDo(source, preCondition));
+ }
+
+ /**
+ * Return an Observable that emits the emissions from a specified Observable
+ * if a condition evaluates to true, otherwise return an empty Observable.
+ *
+ *
+ *
+ * @param
+ * the result value type
+ * @param condition
+ * the condition that decides whether to emit the emissions
+ * from the then
Observable
+ * @param then
+ * the Observable sequence to emit to if {@code condition} is {@code true}
+ * @return an Observable that mimics the {@code then} Observable if the {@code condition} function evaluates to true, or an empty
+ * Observable otherwise
+ */
+ public static Observable ifThen(Func0 condition, Observable extends R> then) {
+ return ifThen(condition, then, Observable. empty());
+ }
+
+ /**
+ * Return an Observable that emits the emissions from a specified Observable
+ * if a condition evaluates to true, otherwise return an empty Observable
+ * that runs on a specified Scheduler.
+ *
+ *
+ *
+ * @param
+ * the result value type
+ * @param condition
+ * the condition that decides whether to emit the emissions
+ * from the then
Observable
+ * @param then
+ * the Observable sequence to emit to if {@code condition} is {@code true}
+ * @param scheduler
+ * the Scheduler on which the empty Observable runs if the
+ * in case the condition returns false
+ * @return an Observable that mimics the {@code then} Observable if the {@code condition} function evaluates to true, or an empty
+ * Observable running on the specified Scheduler otherwise
+ */
+ public static Observable ifThen(Func0 condition, Observable extends R> then, Scheduler scheduler) {
+ return ifThen(condition, then, Observable. empty(scheduler));
+ }
+
+ /**
+ * Return an Observable that emits the emissions from one specified
+ * Observable if a condition evaluates to true, or from another specified
+ * Observable otherwise.
+ *
+ *
+ *
+ * @param
+ * the result value type
+ * @param condition
+ * the condition that decides which Observable to emit the
+ * emissions from
+ * @param then
+ * the Observable sequence to emit to if {@code condition} is {@code true}
+ * @param orElse
+ * the Observable sequence to emit to if {@code condition} is {@code false}
+ * @return an Observable that mimics either the {@code then} or {@code orElse} Observables depending on a condition function
+ */
+ public static Observable ifThen(Func0 condition, Observable extends R> then,
+ Observable extends R> orElse) {
+ return Observable.create(OperationConditionals.ifThen(condition, then, orElse));
+ }
+
+}
diff --git a/rxjava-core/src/main/java/rx/operators/OperationConditionals.java b/rxjava-contrib/rxjava-computation-expressions/src/main/java/rx/operators/OperationConditionals.java
similarity index 78%
rename from rxjava-core/src/main/java/rx/operators/OperationConditionals.java
rename to rxjava-contrib/rxjava-computation-expressions/src/main/java/rx/operators/OperationConditionals.java
index 06e14e4328..8424b90e02 100644
--- a/rxjava-core/src/main/java/rx/operators/OperationConditionals.java
+++ b/rxjava-contrib/rxjava-computation-expressions/src/main/java/rx/operators/OperationConditionals.java
@@ -16,6 +16,7 @@
package rx.operators;
import java.util.Map;
+
import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.Observer;
@@ -29,32 +30,47 @@
*/
public final class OperationConditionals {
/** Utility class. */
- private OperationConditionals() { throw new IllegalStateException("No instances!"); }
+ private OperationConditionals() {
+ throw new IllegalStateException("No instances!");
+ }
+
/**
* Return a subscription function that subscribes to an observable sequence
* chosen from a map of observables via a selector function or to the
* default observable.
- * @param the case key type
- * @param the result value type
- * @param caseSelector the function that produces a case key when an Observer subscribes
- * @param mapOfCases a map that maps a case key to an observable sequence
- * @param defaultCase the default observable if the {@code mapOfCases} doesn't contain a value for
- * the key returned by the {@case caseSelector}
+ *
+ * @param
+ * the case key type
+ * @param
+ * the result value type
+ * @param caseSelector
+ * the function that produces a case key when an Observer subscribes
+ * @param mapOfCases
+ * a map that maps a case key to an observable sequence
+ * @param defaultCase
+ * the default observable if the {@code mapOfCases} doesn't contain a value for
+ * the key returned by the {@case caseSelector}
* @return a subscription function
*/
public static OnSubscribeFunc switchCase(
- Func0 extends K> caseSelector,
- Map super K, ? extends Observable extends R>> mapOfCases,
+ Func0 extends K> caseSelector,
+ Map super K, ? extends Observable extends R>> mapOfCases,
Observable extends R> defaultCase) {
return new SwitchCase(caseSelector, mapOfCases, defaultCase);
}
+
/**
* Return a subscription function that subscribes to either the
* then or orElse Observables depending on a condition function.
- * @param the result value type
- * @param condition the condition to decide which Observables to subscribe to
- * @param then the Observable sequence to subscribe to if {@code condition} is {@code true}
- * @param orElse the Observable sequence to subscribe to if {@code condition} is {@code false}
+ *
+ * @param
+ * the result value type
+ * @param condition
+ * the condition to decide which Observables to subscribe to
+ * @param then
+ * the Observable sequence to subscribe to if {@code condition} is {@code true}
+ * @param orElse
+ * the Observable sequence to subscribe to if {@code condition} is {@code false}
* @return a subscription function
*/
public static OnSubscribeFunc ifThen(
@@ -63,40 +79,55 @@ public static OnSubscribeFunc ifThen(
Observable extends R> orElse) {
return new IfThen(condition, then, orElse);
}
+
/**
* Return a subscription function that subscribes to the source Observable,
* then resubscribes only if the postCondition evaluates to true.
- * @param the result value type
- * @param source the source Observable
- * @param postCondition the post condition after the source completes
+ *
+ * @param
+ * the result value type
+ * @param source
+ * the source Observable
+ * @param postCondition
+ * the post condition after the source completes
* @return a subscription function.
*/
public static OnSubscribeFunc doWhile(Observable extends T> source, Func0 postCondition) {
return new WhileDoWhile(source, TRUE, postCondition);
}
+
/**
* Return a subscription function that subscribes and resubscribes to the source
* Observable if the preCondition evaluates to true.
- * @param the result value type
- * @param source the source Observable
- * @param preCondition the condition to evaluate before subscribing to source,
- * and subscribe to source if it returns {@code true}
+ *
+ * @param
+ * the result value type
+ * @param source
+ * the source Observable
+ * @param preCondition
+ * the condition to evaluate before subscribing to source,
+ * and subscribe to source if it returns {@code true}
* @return a subscription function.
*/
public static OnSubscribeFunc whileDo(Observable extends T> source, Func0 preCondition) {
return new WhileDoWhile(source, preCondition, preCondition);
}
+
/**
* Select an observable from a map based on a case key returned by a selector
* function when an observer subscribes.
- * @param the case key type
- * @param the result value type
+ *
+ * @param
+ * the case key type
+ * @param
+ * the result value type
*/
private static final class SwitchCase implements OnSubscribeFunc {
final Func0 extends K> caseSelector;
final Map super K, ? extends Observable extends R>> mapOfCases;
final Observable extends R> defaultCase;
- public SwitchCase(Func0 extends K> caseSelector,
+
+ public SwitchCase(Func0 extends K> caseSelector,
Map super K, ? extends Observable extends R>> mapOfCases,
Observable extends R> defaultCase) {
this.caseSelector = caseSelector;
@@ -121,6 +152,7 @@ public Subscription onSubscribe(Observer super R> t1) {
return target.subscribe(t1);
}
}
+
/** Returns always true. */
private static final class Func0True implements Func0 {
@Override
@@ -128,22 +160,28 @@ public Boolean call() {
return true;
}
}
+
/** Returns always true function. */
private static final Func0True TRUE = new Func0True();
+
/**
* Given a condition, subscribe to one of the observables when an Observer
* subscribes.
- * @param the result value type
+ *
+ * @param
+ * the result value type
*/
private static final class IfThen implements OnSubscribeFunc {
final Func0 condition;
final Observable extends R> then;
final Observable extends R> orElse;
+
public IfThen(Func0 condition, Observable extends R> then, Observable extends R> orElse) {
this.condition = condition;
this.then = then;
this.orElse = orElse;
}
+
@Override
public Subscription onSubscribe(Observer super R> t1) {
Observable extends R> target;
@@ -160,21 +198,24 @@ public Subscription onSubscribe(Observer super R> t1) {
return target.subscribe(t1);
}
}
+
/**
* Repeatedly subscribes to the source observable if the pre- or
* postcondition is true.
*
* This combines the While and DoWhile into a single operation through
* the conditions.
- * @param the result value type
+ *
+ * @param
+ * the result value type
*/
private static final class WhileDoWhile implements OnSubscribeFunc {
final Func0 preCondition;
final Func0 postCondition;
final Observable extends T> source;
- public WhileDoWhile(Observable extends T> source,
- Func0 preCondition, Func0 postCondition
- ) {
+
+ public WhileDoWhile(Observable extends T> source,
+ Func0 preCondition, Func0 postCondition) {
this.source = source;
this.preCondition = preCondition;
this.postCondition = postCondition;
@@ -191,19 +232,21 @@ public Subscription onSubscribe(Observer super T> t1) {
}
if (first) {
SerialSubscription ssub = new SerialSubscription();
-
- ssub.setSubscription(source.subscribe(new SourceObserver(t1, ssub)));
-
+
+ ssub.set(source.subscribe(new SourceObserver(t1, ssub)));
+
return ssub;
} else {
t1.onCompleted();
}
return Subscriptions.empty();
}
+
/** Observe the source. */
final class SourceObserver implements Observer {
final SerialSubscription cancel;
final Observer super T> observer;
+
public SourceObserver(Observer super T> observer, SerialSubscription cancel) {
this.observer = observer;
this.cancel = cancel;
@@ -236,7 +279,7 @@ public void onCompleted() {
cancel.unsubscribe();
}
}
-
+
}
}
}
diff --git a/rxjava-core/src/test/java/rx/operators/OperationConditionalsTest.java b/rxjava-contrib/rxjava-computation-expressions/src/test/java/rx/operators/OperationConditionalsTest.java
similarity index 82%
rename from rxjava-core/src/test/java/rx/operators/OperationConditionalsTest.java
rename to rxjava-contrib/rxjava-computation-expressions/src/test/java/rx/operators/OperationConditionalsTest.java
index 3daec59297..f6c4f09ad0 100644
--- a/rxjava-core/src/test/java/rx/operators/OperationConditionalsTest.java
+++ b/rxjava-contrib/rxjava-computation-expressions/src/test/java/rx/operators/OperationConditionalsTest.java
@@ -15,18 +15,23 @@
*/
package rx.operators;
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.*;
+
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+
import org.junit.Before;
import org.junit.Test;
import org.mockito.InOrder;
import org.mockito.Mock;
-import static org.mockito.Mockito.*;
import org.mockito.MockitoAnnotations;
+
import rx.Observable;
import rx.Observer;
+import rx.Statement;
import rx.Subscription;
import rx.schedulers.Schedulers;
import rx.schedulers.TestScheduler;
@@ -41,13 +46,14 @@ public class OperationConditionalsTest {
Func0 condition;
Func0 conditionError;
int numRecursion = 250;
-
+
@Before
public void before() {
MockitoAnnotations.initMocks(this);
scheduler = new TestScheduler();
func = new Func0() {
int count = 1;
+
@Override
public Integer call() {
return count++;
@@ -55,6 +61,7 @@ public Integer call() {
};
funcError = new Func0() {
int count = 1;
+
@Override
public Integer call() {
if (count == 2) {
@@ -65,15 +72,17 @@ public Integer call() {
};
condition = new Func0() {
boolean r;
+
@Override
public Boolean call() {
r = !r;
return r;
}
-
+
};
conditionError = new Func0() {
boolean r;
+
@Override
public Boolean call() {
r = !r;
@@ -82,9 +91,10 @@ public Boolean call() {
}
return r;
}
-
+
};
}
+
Func0 just(final T value) {
return new Func0() {
@Override
@@ -93,129 +103,132 @@ public T call() {
}
};
}
-
+
@SuppressWarnings("unchecked")
void observe(Observable extends T> source, T... values) {
Observer o = mock(Observer.class);
-
+
Subscription s = source.subscribe(o);
-
+
InOrder inOrder = inOrder(o);
-
+
for (T v : values) {
inOrder.verify(o, times(1)).onNext(v);
}
inOrder.verify(o, times(1)).onCompleted();
verify(o, never()).onError(any(Throwable.class));
-
+
s.unsubscribe();
-
+
inOrder.verifyNoMoreInteractions();
}
-
+
@SuppressWarnings("unchecked")
void observeSequence(Observable extends T> source, Iterable extends T> values) {
Observer o = mock(Observer.class);
-
+
Subscription s = source.subscribe(o);
-
+
InOrder inOrder = inOrder(o);
-
+
for (T v : values) {
inOrder.verify(o, times(1)).onNext(v);
}
inOrder.verify(o, times(1)).onCompleted();
verify(o, never()).onError(any(Throwable.class));
-
+
s.unsubscribe();
-
+
inOrder.verifyNoMoreInteractions();
}
-
+
@SuppressWarnings("unchecked")
void observeError(Observable extends T> source, Class extends Throwable> error, T... valuesBeforeError) {
Observer o = mock(Observer.class);
-
+
Subscription s = source.subscribe(o);
-
+
InOrder inOrder = inOrder(o);
-
+
for (T v : valuesBeforeError) {
inOrder.verify(o, times(1)).onNext(v);
}
inOrder.verify(o, times(1)).onError(any(error));
verify(o, never()).onCompleted();
-
+
s.unsubscribe();
-
+
inOrder.verifyNoMoreInteractions();
}
-
+
@SuppressWarnings("unchecked")
void observeSequenceError(Observable extends T> source, Class extends Throwable> error, Iterable extends T> valuesBeforeError) {
Observer o = mock(Observer.class);
-
+
Subscription s = source.subscribe(o);
-
+
InOrder inOrder = inOrder(o);
-
+
for (T v : valuesBeforeError) {
inOrder.verify(o, times(1)).onNext(v);
}
inOrder.verify(o, times(1)).onError(any(error));
verify(o, never()).onCompleted();
-
+
s.unsubscribe();
-
+
inOrder.verifyNoMoreInteractions();
}
-
+
@Test
public void testSimple() {
Observable source1 = Observable.from(1, 2, 3);
Observable source2 = Observable.from(4, 5, 6);
-
+
Map> map = new HashMap>();
map.put(1, source1);
map.put(2, source2);
-
- Observable result = Observable.switchCase(func, map);
+
+ Observable result = Statement.switchCase(func, map);
observe(result, 1, 2, 3);
observe(result, 4, 5, 6);
observe(result);
}
+
@Test
public void testDefaultCase() {
Observable source1 = Observable.from(1, 2, 3);
Observable source2 = Observable.from(4, 5, 6);
-
+
Map> map = new HashMap>();
map.put(1, source1);
-
- Observable result = Observable.switchCase(func, map, source2);
+
+ Observable result = Statement.switchCase(func, map, source2);
observe(result, 1, 2, 3);
observe(result, 4, 5, 6);
observe(result, 4, 5, 6);
}
+
@Test
public void testCaseSelectorThrows() {
Observable source1 = Observable.from(1, 2, 3);
-
+
Map> map = new HashMap>();
map.put(1, source1);
-
- Observable result = Observable.switchCase(funcError, map);
-
+
+ Observable result = Statement.switchCase(funcError, map);
+
observe(result, 1, 2, 3);
observeError(result, RuntimeException.class);
}
+
@Test
public void testMapGetThrows() {
Observable source1 = Observable.from(1, 2, 3);
Observable source2 = Observable.from(4, 5, 6);
-
+
Map> map = new HashMap>() {
@Override
@@ -225,20 +238,21 @@ public Observable get(Object key) {
}
return super.get(key);
}
-
+
};
map.put(1, source1);
map.put(2, source2);
-
- Observable result = Observable.switchCase(func, map);
-
+
+ Observable result = Statement.switchCase(func, map);
+
observe(result, 1, 2, 3);
observeError(result, RuntimeException.class);
}
+
@Test
public void testMapContainsKeyThrows() {
Observable source1 = Observable.from(1, 2, 3);
-
+
Map> map = new HashMap>() {
@Override
@@ -248,133 +262,142 @@ public boolean containsKey(Object key) {
}
return super.containsKey(key);
}
-
+
};
map.put(1, source1);
-
- Observable result = Observable.switchCase(func, map);
-
+
+ Observable result = Statement.switchCase(func, map);
+
observe(result, 1, 2, 3);
observeError(result, RuntimeException.class);
}
+
@Test
public void testChosenObservableThrows() {
Observable source1 = Observable.from(1, 2, 3);
Observable source2 = Observable.error(new RuntimeException("Forced failure"));
-
+
Map> map = new HashMap>();
map.put(1, source1);
map.put(2, source2);
-
- Observable result = Observable.switchCase(func, map);
-
+
+ Observable result = Statement.switchCase(func, map);
+
observe(result, 1, 2, 3);
observeError(result, RuntimeException.class);
}
+
@Test
public void testIfThen() {
Observable source1 = Observable.from(1, 2, 3);
-
- Observable result = Observable.ifThen(condition, source1);
+
+ Observable result = Statement.ifThen(condition, source1);
observe(result, 1, 2, 3);
observe(result);
observe(result, 1, 2, 3);
observe(result);
}
-
+
@Test
public void testIfThenElse() {
Observable source1 = Observable.from(1, 2, 3);
Observable source2 = Observable.from(4, 5, 6);
-
- Observable result = Observable.ifThen(condition, source1, source2);
+
+ Observable result = Statement.ifThen(condition, source1, source2);
observe(result, 1, 2, 3);
observe(result, 4, 5, 6);
observe(result, 1, 2, 3);
observe(result, 4, 5, 6);
}
-
+
@Test
public void testIfThenConditonThrows() {
Observable source1 = Observable.from(1, 2, 3);
-
- Observable result = Observable.ifThen(conditionError, source1);
+
+ Observable result = Statement.ifThen(conditionError, source1);
observe(result, 1, 2, 3);
observeError(result, RuntimeException.class);
observe(result, 1, 2, 3);
observeError(result, RuntimeException.class);
}
-
+
@Test
public void testIfThenObservableThrows() {
Observable source1 = Observable.error(new RuntimeException("Forced failure!"));
- Observable result = Observable.ifThen(condition, source1);
-
+ Observable result = Statement.ifThen(condition, source1);
+
observeError(result, RuntimeException.class);
observe(result);
observeError(result, RuntimeException.class);
observe(result);
}
+
@Test
public void testIfThenElseObservableThrows() {
Observable source1 = Observable.from(1, 2, 3);
Observable source2 = Observable.error(new RuntimeException("Forced failure!"));
- Observable result = Observable.ifThen(condition, source1, source2);
-
+ Observable result = Statement.ifThen(condition, source1, source2);
+
observe(result, 1, 2, 3);
observeError(result, RuntimeException.class);
observe(result, 1, 2, 3);
observeError(result, RuntimeException.class);
}
-
+
@Test
public void testDoWhile() {
Observable source1 = Observable.from(1, 2, 3);
-
- Observable result = source1.doWhile(condition);
-
+
+ Observable result = Statement.doWhile(source1, condition);
+
observe(result, 1, 2, 3, 1, 2, 3);
}
+
@Test
public void testDoWhileOnce() {
Observable source1 = Observable.from(1, 2, 3);
-
+
condition.call(); // toggle to false
- Observable result = source1.doWhile(condition);
-
+ Observable result = Statement.doWhile(source1, condition);
+
observe(result, 1, 2, 3);
}
+
@Test
public void testDoWhileConditionThrows() {
Observable source1 = Observable.from(1, 2, 3);
- Observable result = source1.doWhile(conditionError);
-
+ Observable result = Statement.doWhile(source1, conditionError);
+
observeError(result, RuntimeException.class, 1, 2, 3);
}
+
@Test
public void testDoWhileSourceThrows() {
- Observable source1 = Observable.concat(Observable.from(1, 2, 3),
- Observable.error(new RuntimeException("Forced failure!")));
-
- Observable result = source1.doWhile(condition);
+ Observable source1 = Observable.concat(Observable.from(1, 2, 3),
+ Observable. error(new RuntimeException("Forced failure!")));
+
+ Observable result = Statement.doWhile(source1, condition);
observeError(result, RuntimeException.class, 1, 2, 3);
}
- Func0 countdown(final int n) {
+
+ Func0 countdown(final int n) {
return new Func0() {
int count = n;
+
@Override
public Boolean call() {
return count-- > 0;
}
};
}
+
@Test
public void testDoWhileManyTimes() {
Observable source1 = Observable.from(1, 2, 3).subscribeOn(Schedulers.currentThread());
@@ -385,32 +408,36 @@ public void testDoWhileManyTimes() {
expected.add(2);
expected.add(3);
}
-
- Observable result = source1.doWhile(countdown(numRecursion));
-
+
+ Observable result = Statement.doWhile(source1, countdown(numRecursion));
+
observeSequence(result, expected);
}
+
@Test
public void testWhileDo() {
Observable source1 = Observable.from(1, 2, 3);
- Observable result = source1.whileDo(countdown(2));
-
+ Observable result = Statement.whileDo(source1, countdown(2));
+
observe(result, 1, 2, 3, 1, 2, 3);
}
+
@Test
public void testWhileDoOnce() {
Observable source1 = Observable.from(1, 2, 3);
- Observable result = source1.whileDo(countdown(1));
-
+ Observable result = Statement.whileDo(source1, countdown(1));
+
observe(result, 1, 2, 3);
}
+
@Test
public void testWhileDoZeroTimes() {
Observable source1 = Observable.from(1, 2, 3);
- Observable result = source1.whileDo(countdown(0));
-
+ Observable result = Statement.whileDo(source1, countdown(0));
+
observe(result);
}
+
@Test
public void testWhileDoManyTimes() {
Observable source1 = Observable.from(1, 2, 3).subscribeOn(Schedulers.currentThread());
@@ -421,32 +448,35 @@ public void testWhileDoManyTimes() {
expected.add(2);
expected.add(3);
}
-
- Observable result = source1.whileDo(countdown(numRecursion));
-
+
+ Observable result = Statement.whileDo(source1, countdown(numRecursion));
+
observeSequence(result, expected);
}
+
@Test
public void testWhileDoConditionThrows() {
Observable source1 = Observable.from(1, 2, 3);
- Observable result = source1.whileDo(conditionError);
-
+ Observable result = Statement.whileDo(source1, conditionError);
+
observeError(result, RuntimeException.class, 1, 2, 3);
}
+
@Test
public void testWhileDoConditionThrowsImmediately() {
Observable source1 = Observable.from(1, 2, 3);
conditionError.call();
- Observable result = source1.whileDo(conditionError);
-
+ Observable result = Statement.whileDo(source1, conditionError);
+
observeError(result, RuntimeException.class);
}
+
@Test
public void testWhileDoSourceThrows() {
- Observable source1 = Observable.concat(Observable.from(1, 2, 3),
- Observable.error(new RuntimeException("Forced failure!")));
-
- Observable result = source1.whileDo(condition);
+ Observable source1 = Observable.concat(Observable.from(1, 2, 3),
+ Observable. error(new RuntimeException("Forced failure!")));
+
+ Observable result = Statement.whileDo(source1, condition);
observeError(result, RuntimeException.class, 1, 2, 3);
}
diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java
index e311414af8..80bc473bc5 100644
--- a/rxjava-core/src/main/java/rx/Observable.java
+++ b/rxjava-core/src/main/java/rx/Observable.java
@@ -15,7 +15,6 @@
*/
package rx;
-import static org.junit.Assert.*;
import static rx.util.functions.Functions.*;
import java.util.ArrayList;
@@ -43,7 +42,6 @@
import rx.operators.OperationCast;
import rx.operators.OperationCombineLatest;
import rx.operators.OperationConcat;
-import rx.operators.OperationConditionals;
import rx.operators.OperationDebounce;
import rx.operators.OperationDefaultIfEmpty;
import rx.operators.OperationDefer;
@@ -1995,164 +1993,6 @@ public static Observable switchLatest(Observable extends Observable e
return create(OperationSwitch.switchDo(sequenceOfSequences));
}
- /**
- * Return a particular one of several possible Observables based on a case
- * selector.
- *
- *
- *
- * @param the case key type
- * @param the result value type
- * @param caseSelector the function that produces a case key when an
- * Observer subscribes
- * @param mapOfCases a map that maps a case key to an Observable
- * @return a particular Observable chosen by key from the map of
- * Observables, or an empty Observable if no Observable matches the
- * key
- */
- public static Observable switchCase(Func0 extends K> caseSelector,
- Map super K, ? extends Observable extends R>> mapOfCases) {
- return switchCase(caseSelector, mapOfCases, Observable.empty());
- }
-
- /**
- * Return a particular one of several possible Observables based on a case
- * selector and run it on the designated scheduler.
- *
- *
- *
- * @param the case key type
- * @param the result value type
- * @param caseSelector the function that produces a case key when an
- * Observer subscribes
- * @param mapOfCases a map that maps a case key to an Observable
- * @param scheduler the scheduler where the empty observable is observed
- * @return a particular Observable chosen by key from the map of
- * Observables, or an empty Observable if no Observable matches the
- * key, but one that runs on the designated scheduler in either case
- */
- public static Observable switchCase(Func0 extends K> caseSelector,
- Map super K, ? extends Observable extends R>> mapOfCases, Scheduler scheduler) {
- return switchCase(caseSelector, mapOfCases, Observable.empty(scheduler));
- }
- /**
- * Return a particular one of several possible Observables based on a case
- * selector, or a default Observable if the case selector does not map to
- * a particular one.
- *
- *
- *
- * @param the case key type
- * @param the result value type
- * @param caseSelector the function that produces a case key when an
- * Observer subscribes
- * @param mapOfCases a map that maps a case key to an Observable
- * @param defaultCase the default Observable if the {@code mapOfCases}
- * doesn't contain a value for the key returned by the
- * {@case caseSelector}
- * @return a particular Observable chosen by key from the map of
- * Observables, or the default case if no Observable matches the key
- */
- public static Observable switchCase(Func0 extends K> caseSelector,
- Map super K, ? extends Observable extends R>> mapOfCases,
- Observable extends R> defaultCase) {
- return create(OperationConditionals.switchCase(caseSelector, mapOfCases, defaultCase));
- }
-
- /**
- * Return an Observable that replays the emissions from the source
- * Observable, and then continues to replay them so long as a condtion is
- * true.
- *
- *
- *
- * @param postCondition the post condition to test after the source
- * Observable completes
- * @return an Observable that replays the emissions from the source
- * Observable, and then continues to replay them so long as the post
- * condition is true
- */
- public Observable doWhile(Func0 postCondition) {
- return create(OperationConditionals.doWhile(this, postCondition));
- }
-
- /**
- * Return an Observable that replays the emissions from the source
- * Observable so long as a condtion is true.
- *
- *
- *
- * @param preCondition the condition to evaluate before subscribing to or
- * replaying the source Observable
- * @return an Observable that replays the emissions from the source
- * Observable so long as preCondition
is true
- */
- public Observable whileDo(Func0 preCondition) {
- return create(OperationConditionals.whileDo(this, preCondition));
- }
-
- /**
- * Return an Observable that emits the emissions from a specified Observable
- * if a condition evaluates to true, otherwise return an empty Observable.
- *
- *
- *
- * @param the result value type
- * @param condition the condition that decides whether to emit the emissions
- * from the then
Observable
- * @param then the Observable sequence to emit to if {@code condition} is
- * {@code true}
- * @return an Observable that mimics the {@code then} Observable if the
- * {@code condition} function evaluates to true, or an empty
- * Observable otherwise
- */
- public static Observable ifThen(Func0 condition, Observable extends R> then) {
- return ifThen(condition, then, Observable.empty());
- }
-
- /**
- * Return an Observable that emits the emissions from a specified Observable
- * if a condition evaluates to true, otherwise return an empty Observable
- * that runs on a specified Scheduler.
- *
- *
- *
- * @param the result value type
- * @param condition the condition that decides whether to emit the emissions
- * from the then
Observable
- * @param then the Observable sequence to emit to if {@code condition} is
- * {@code true}
- * @param scheduler the Scheduler on which the empty Observable runs if the
- * in case the condition returns false
- * @return an Observable that mimics the {@code then} Observable if the
- * {@code condition} function evaluates to true, or an empty
- * Observable running on the specified Scheduler otherwise
- */
- public static Observable ifThen(Func0 condition, Observable extends R> then, Scheduler scheduler) {
- return ifThen(condition, then, Observable.empty(scheduler));
- }
-
- /**
- * Return an Observable that emits the emissions from one specified
- * Observable if a condition evaluates to true, or from another specified
- * Observable otherwise.
- *
- *
- *
- * @param the result value type
- * @param condition the condition that decides which Observable to emit the
- * emissions from
- * @param then the Observable sequence to emit to if {@code condition} is
- * {@code true}
- * @param orElse the Observable sequence to emit to if {@code condition} is
- * {@code false}
- * @return an Observable that mimics either the {@code then} or
- * {@code orElse} Observables depending on a condition function
- */
- public static Observable ifThen(Func0 condition, Observable extends R> then,
- Observable extends R> orElse) {
- return create(OperationConditionals.ifThen(condition, then, orElse));
- }
/**
* Accepts an Observable and wraps it in another Observable that ensures
diff --git a/settings.gradle b/settings.gradle
index c40f29adb6..22dd94ec82 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -9,4 +9,5 @@ include 'rxjava-core', \
'rxjava-contrib:rxjava-android', \
'rxjava-contrib:rxjava-apache-http', \
'rxjava-contrib:rxjava-string', \
-'rxjava-contrib:rxjava-async-util'
+'rxjava-contrib:rxjava-async-util', \
+'rxjava-contrib:rxjava-computation-expressions'