diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java
index d24e48106c..eef6dfe5ff 100644
--- a/rxjava-core/src/main/java/rx/Observable.java
+++ b/rxjava-core/src/main/java/rx/Observable.java
@@ -41,6 +41,8 @@
import rx.operators.OperationDefer;
import rx.operators.OperationDematerialize;
import rx.operators.OperationFilter;
+import rx.operators.OperationTake;
+import rx.operators.OperationTakeWhile;
import rx.operators.OperationWhere;
import rx.operators.OperationMap;
import rx.operators.OperationMaterialize;
@@ -48,18 +50,20 @@
import rx.operators.OperationMergeDelayError;
import rx.operators.OperationMostRecent;
import rx.operators.OperationNext;
+import rx.operators.OperationObserveOn;
import rx.operators.OperationOnErrorResumeNextViaFunction;
import rx.operators.OperationOnErrorResumeNextViaObservable;
import rx.operators.OperationOnErrorReturn;
import rx.operators.OperationScan;
import rx.operators.OperationSkip;
+import rx.operators.OperationSubscribeOn;
import rx.operators.OperationSynchronize;
-import rx.operators.OperationTake;
import rx.operators.OperationTakeLast;
import rx.operators.OperationToObservableFuture;
import rx.operators.OperationToObservableIterable;
import rx.operators.OperationToObservableList;
import rx.operators.OperationToObservableSortedList;
+import rx.operators.OperationWhere;
import rx.operators.OperationZip;
import rx.operators.OperatorGroupBy;
import rx.operators.OperatorTakeUntil;
@@ -728,7 +732,7 @@ public Boolean call(T t1) {
* Filters an Observable by discarding any of its emissions that do not meet some test.
*
*
- *
+ *
* @param that
* the Observable to filter
* @param predicate
@@ -783,6 +787,36 @@ public static Observable range(int start, int count) {
return from(Range.createWithCount(start, count));
}
+ /**
+ * Asynchronously subscribes and unsubscribes observers on the specified scheduler.
+ *
+ * @param source
+ * the source observable.
+ * @param scheduler
+ * the scheduler to perform subscription and unsubscription actions on.
+ * @param
+ * the type of observable.
+ * @return the source sequence whose subscriptions and unsubscriptions happen on the specified scheduler.
+ */
+ public static Observable subscribeOn(Observable source, Scheduler scheduler) {
+ return _create(OperationSubscribeOn.subscribeOn(source, scheduler));
+ }
+
+ /**
+ * Asynchronously notify observers on the specified scheduler.
+ *
+ * @param source
+ * the source observable.
+ * @param scheduler
+ * the scheduler to notify observers on.
+ * @param
+ * the type of observable.
+ * @return the source sequence whose observations happen on the specified scheduler.
+ */
+ public static Observable observeOn(Observable source, Scheduler scheduler) {
+ return _create(OperationObserveOn.observeOn(source, scheduler));
+ }
+
/**
* Returns an observable sequence that invokes the observable factory whenever a new observer subscribes.
* The Defer operator allows you to defer or delay the creation of the sequence until the time when an observer
@@ -1779,7 +1813,7 @@ public static Observable takeLast(final Observable items, final int co
* @return
*/
public static Observable takeWhile(final Observable items, Func1 predicate) {
- return create(OperationTake.takeWhile(items, predicate));
+ return create(OperationTakeWhile.takeWhile(items, predicate));
}
/**
@@ -1811,16 +1845,18 @@ public Boolean call(T t) {
* @return
*/
public static Observable takeWhileWithIndex(final Observable items, Func2 predicate) {
- return create(OperationTake.takeWhileWithIndex(items, predicate));
+ return create(OperationTakeWhile.takeWhileWithIndex(items, predicate));
}
public static Observable takeWhileWithIndex(final Observable items, Object predicate) {
@SuppressWarnings("rawtypes")
final FuncN _f = Functions.from(predicate);
- return create(OperationTake.takeWhileWithIndex(items, new Func2() {
+ return create(OperationTakeWhile.takeWhileWithIndex(items, new Func2()
+ {
@Override
- public Boolean call(T t, Integer integer) {
+ public Boolean call(T t, Integer integer)
+ {
return (Boolean) _f.call(t, integer);
}
}));
@@ -2469,7 +2505,7 @@ public Boolean call(T t1) {
* Filters an Observable by discarding any of its emissions that do not meet some test.
*
*
- *
+ *
* @param predicate
* a function that evaluates the items emitted by the source Observable, returning
* true
if they pass the filter
@@ -2650,6 +2686,28 @@ public Observable> materialize() {
return materialize(this);
}
+ /**
+ * Asynchronously subscribes and unsubscribes observers on the specified scheduler.
+ *
+ * @param scheduler
+ * the scheduler to perform subscription and unsubscription actions on.
+ * @return the source sequence whose subscriptions and unsubscriptions happen on the specified scheduler.
+ */
+ public Observable subscribeOn(Scheduler scheduler) {
+ return subscribeOn(this, scheduler);
+ }
+
+ /**
+ * Asynchronously notify observers on the specified scheduler.
+ *
+ * @param scheduler
+ * the scheduler to notify observers on.
+ * @return the source sequence whose observations happen on the specified scheduler.
+ */
+ public Observable observeOn(Scheduler scheduler) {
+ return observeOn(this, scheduler);
+ }
+
/**
* Dematerializes the explicit notification values of an observable sequence as implicit notifications.
*
@@ -2660,7 +2718,7 @@ public Observable> materialize() {
*/
@SuppressWarnings("unchecked")
public Observable dematerialize() {
- return dematerialize((Observable>)this);
+ return dematerialize((Observable>) this);
}
/**
@@ -3005,7 +3063,9 @@ public Observable scan(final T initialValue, final Object accumulator) {
/**
* Determines whether all elements of an observable sequence satisfies a condition.
- * @param predicate a function to test each element for a condition.
+ *
+ * @param predicate
+ * a function to test each element for a condition.
* @return true if all elements of an observable sequence satisfies a condition; otherwise, false.
*/
public Observable all(Func1 predicate) {
@@ -3014,7 +3074,9 @@ public Observable all(Func1 predicate) {
/**
* Determines whether all elements of an observable sequence satisfies a condition.
- * @param predicate a function to test each element for a condition.
+ *
+ * @param predicate
+ * a function to test each element for a condition.
* @return true if all elements of an observable sequence satisfies a condition; otherwise, false.
*/
public Observable all(Object predicate) {
diff --git a/rxjava-core/src/main/java/rx/Scheduler.java b/rxjava-core/src/main/java/rx/Scheduler.java
new file mode 100644
index 0000000000..b4d2c5d471
--- /dev/null
+++ b/rxjava-core/src/main/java/rx/Scheduler.java
@@ -0,0 +1,65 @@
+/**
+ * Copyright 2013 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rx;
+
+import rx.util.functions.Action0;
+import rx.util.functions.Func0;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Represents an object that schedules units of work.
+ */
+public interface Scheduler {
+
+ /**
+ * Schedules a cancelable action to be executed.
+ *
+ * @param action action
+ * @return a subscription to be able to unsubscribe from action.
+ */
+ Subscription schedule(Func0 action);
+
+ /**
+ * Schedules an action to be executed.
+ *
+ * @param action action
+ * @return a subscription to be able to unsubscribe from action.
+ */
+ Subscription schedule(Action0 action);
+
+ /**
+ * Schedules an action to be executed in dueTime.
+ *
+ * @param action action
+ * @return a subscription to be able to unsubscribe from action.
+ */
+ Subscription schedule(Action0 action, long dueTime, TimeUnit unit);
+
+ /**
+ * Schedules a cancelable action to be executed in dueTime.
+ *
+ * @param action action
+ * @return a subscription to be able to unsubscribe from action.
+ */
+ Subscription schedule(Func0 action, long dueTime, TimeUnit unit);
+
+ /**
+ * Returns the scheduler's notion of current time.
+ */
+ long now();
+
+}
diff --git a/rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java b/rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java
new file mode 100644
index 0000000000..8ad8b436a0
--- /dev/null
+++ b/rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java
@@ -0,0 +1,58 @@
+/**
+ * Copyright 2013 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rx.concurrency;
+
+import rx.Scheduler;
+import rx.Subscription;
+import rx.subscriptions.Subscriptions;
+import rx.util.functions.Action0;
+import rx.util.functions.Func0;
+
+import java.util.concurrent.TimeUnit;
+
+public abstract class AbstractScheduler implements Scheduler {
+
+ @Override
+ public Subscription schedule(Action0 action) {
+ return schedule(asFunc0(action));
+ }
+
+ @Override
+ public Subscription schedule(Action0 action, long dueTime, TimeUnit unit) {
+ return schedule(asFunc0(action), dueTime, unit);
+ }
+
+ @Override
+ public Subscription schedule(Func0 action, long dueTime, TimeUnit unit) {
+ return schedule(new SleepingAction(action, this, dueTime, unit));
+ }
+
+ @Override
+ public long now() {
+ return System.nanoTime();
+ }
+
+ private static Func0 asFunc0(final Action0 action) {
+ return new Func0() {
+ @Override
+ public Subscription call() {
+ action.call();
+ return Subscriptions.empty();
+ }
+ };
+ }
+
+}
diff --git a/rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java b/rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java
new file mode 100644
index 0000000000..c1a8eed455
--- /dev/null
+++ b/rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java
@@ -0,0 +1,151 @@
+/**
+ * Copyright 2013 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rx.concurrency;
+
+import org.junit.Test;
+import org.mockito.InOrder;
+import rx.Subscription;
+import rx.util.functions.Action0;
+import rx.util.functions.Func0;
+
+import java.util.LinkedList;
+import java.util.Queue;
+
+import static org.mockito.Mockito.*;
+
+public class CurrentThreadScheduler extends AbstractScheduler {
+ private static final CurrentThreadScheduler INSTANCE = new CurrentThreadScheduler();
+
+ public static CurrentThreadScheduler getInstance() {
+ return INSTANCE;
+ }
+
+ private static final ThreadLocal> QUEUE = new ThreadLocal>();
+
+// private static final ThreadLocal> QUEUE = new ThreadLocal>() {
+// @Override
+// protected Queue initialValue() {
+// return new LinkedList();
+// }
+//};
+
+ private CurrentThreadScheduler() {
+ }
+
+ @Override
+ public Subscription schedule(Func0 action) {
+ DiscardableAction discardableAction = new DiscardableAction(action);
+ enqueue(discardableAction);
+ return discardableAction;
+ }
+
+ private void enqueue(DiscardableAction action) {
+ Queue queue = QUEUE.get();
+ boolean exec = queue == null;
+
+ if (exec) {
+ queue = new LinkedList();
+ QUEUE.set(queue);
+ }
+
+ // this means enqueue will always synchronously execute everything on the current thread ...
+ // so how would there ever be more than one item in the queue?
+ // SleepingAction is also blocking so it wouldn't skip those, it would block until it finishes sleeping
+ // and if it did skip something what would ever trigger it eventually being executed unless something else
+ // is enqueued?
+
+ queue.add(action);
+
+ if (exec) {
+ System.out.println("exec");
+ while (!queue.isEmpty()) {
+ System.out.println("call in queue");
+ queue.poll().call();
+ }
+
+ QUEUE.set(null);
+ }
+ }
+
+ public static class UnitTest {
+
+ @Test
+ public void testNestedActions() {
+ final CurrentThreadScheduler scheduler = new CurrentThreadScheduler();
+
+ final Action0 firstStepStart = mock(Action0.class);
+ final Action0 firstStepEnd = mock(Action0.class);
+
+ final Action0 secondStepStart = mock(Action0.class);
+ final Action0 secondStepEnd = mock(Action0.class);
+
+ final Action0 thirdStepStart = mock(Action0.class);
+ final Action0 thirdStepEnd = mock(Action0.class);
+
+ final Action0 firstAction = new Action0() {
+ @Override
+ public void call() {
+ firstStepStart.call();
+ firstStepEnd.call();
+ }
+ };
+ final Action0 secondAction = new Action0() {
+ @Override
+ public void call() {
+ secondStepStart.call();
+ scheduler.schedule(firstAction);
+ secondStepEnd.call();
+
+ }
+ };
+ final Action0 thirdAction = new Action0() {
+ @Override
+ public void call() {
+ thirdStepStart.call();
+ scheduler.schedule(secondAction);
+ thirdStepEnd.call();
+ }
+ };
+
+ InOrder inOrder = inOrder(firstStepStart, firstStepEnd, secondStepStart, secondStepEnd, thirdStepStart, thirdStepEnd);
+
+ scheduler.schedule(thirdAction);
+
+ inOrder.verify(thirdStepStart, times(1)).call();
+ inOrder.verify(thirdStepEnd, times(1)).call();
+ inOrder.verify(secondStepStart, times(1)).call();
+ inOrder.verify(secondStepEnd, times(1)).call();
+ inOrder.verify(firstStepStart, times(1)).call();
+ inOrder.verify(firstStepEnd, times(1)).call();
+ }
+
+ @Test
+ public void testSequenceOfActions() {
+ final CurrentThreadScheduler scheduler = new CurrentThreadScheduler();
+
+ final Action0 first = mock(Action0.class);
+ final Action0 second = mock(Action0.class);
+
+ scheduler.schedule(first);
+ scheduler.schedule(second);
+
+ verify(first, times(1)).call();
+ verify(second, times(1)).call();
+
+ }
+
+ }
+}
diff --git a/rxjava-core/src/main/java/rx/concurrency/DiscardableAction.java b/rxjava-core/src/main/java/rx/concurrency/DiscardableAction.java
new file mode 100644
index 0000000000..632ec69b1a
--- /dev/null
+++ b/rxjava-core/src/main/java/rx/concurrency/DiscardableAction.java
@@ -0,0 +1,50 @@
+/**
+ * Copyright 2013 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rx.concurrency;
+
+import rx.Subscription;
+import rx.util.AtomicObservableSubscription;
+import rx.util.functions.Func0;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class DiscardableAction implements Func0, Subscription {
+ private final Func0 underlying;
+
+ private final AtomicObservableSubscription wrapper = new AtomicObservableSubscription();
+ private final AtomicBoolean ready = new AtomicBoolean(true);
+
+ public DiscardableAction(Func0 underlying) {
+ this.underlying = underlying;
+ }
+
+ @Override
+ public Subscription call() {
+ if (ready.compareAndSet(true, false)) {
+ Subscription subscription = underlying.call();
+ wrapper.wrap(subscription);
+ return subscription;
+ }
+ return wrapper;
+ }
+
+ @Override
+ public void unsubscribe() {
+ ready.set(false);
+ wrapper.unsubscribe();
+ }
+}
+
diff --git a/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java b/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java
new file mode 100644
index 0000000000..55427c874e
--- /dev/null
+++ b/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java
@@ -0,0 +1,59 @@
+/**
+ * Copyright 2013 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rx.concurrency;
+
+import rx.Subscription;
+import rx.util.functions.Action0;
+import rx.util.functions.Func0;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+
+public class ExecutorScheduler extends AbstractScheduler {
+ private final Executor executor;
+
+ public ExecutorScheduler(Executor executor) {
+ this.executor = executor;
+ }
+
+ @Override
+ public Subscription schedule(Action0 action, long dueTime, TimeUnit unit) {
+
+ // this should delegate to ScheduledExecutorServiceScheduler
+ // and that should be an implemenation detail I think ... not be a choice someone needs to make
+
+ return super.schedule(action, dueTime, unit);
+ }
+
+ @Override
+ public Subscription schedule(Func0 action) {
+ final DiscardableAction discardableAction = new DiscardableAction(action);
+
+ // if it's a delayed Action (has a TimeUnit) then we should use a timer
+ // otherwise it will tie up a thread and sleep
+ // ... see the method above ...
+
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ discardableAction.call();
+ }
+ });
+
+ return discardableAction;
+
+ }
+}
diff --git a/rxjava-core/src/main/java/rx/concurrency/ForwardingScheduler.java b/rxjava-core/src/main/java/rx/concurrency/ForwardingScheduler.java
new file mode 100644
index 0000000000..a32f72aa9f
--- /dev/null
+++ b/rxjava-core/src/main/java/rx/concurrency/ForwardingScheduler.java
@@ -0,0 +1,56 @@
+/**
+ * Copyright 2013 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rx.concurrency;
+
+import rx.Scheduler;
+import rx.Subscription;
+import rx.util.functions.Action0;
+import rx.util.functions.Func0;
+
+import java.util.concurrent.TimeUnit;
+
+public class ForwardingScheduler implements Scheduler {
+ private final Scheduler underlying;
+
+ public ForwardingScheduler(Scheduler underlying) {
+ this.underlying = underlying;
+ }
+
+ @Override
+ public Subscription schedule(Action0 action) {
+ return underlying.schedule(action);
+ }
+
+ @Override
+ public Subscription schedule(Func0 action) {
+ return underlying.schedule(action);
+ }
+
+ @Override
+ public Subscription schedule(Action0 action, long dueTime, TimeUnit unit) {
+ return underlying.schedule(action, dueTime, unit);
+ }
+
+ @Override
+ public Subscription schedule(Func0 action, long dueTime, TimeUnit unit) {
+ return underlying.schedule(action, dueTime, unit);
+ }
+
+ @Override
+ public long now() {
+ return underlying.now();
+ }
+}
\ No newline at end of file
diff --git a/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java b/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java
new file mode 100644
index 0000000000..22c49b6437
--- /dev/null
+++ b/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java
@@ -0,0 +1,105 @@
+/**
+ * Copyright 2013 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rx.concurrency;
+
+import org.junit.Test;
+import org.mockito.InOrder;
+import rx.Subscription;
+import rx.subscriptions.Subscriptions;
+import rx.util.functions.Action0;
+import rx.util.functions.Func0;
+
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+
+public final class ImmediateScheduler extends AbstractScheduler {
+ private static final ImmediateScheduler INSTANCE = new ImmediateScheduler();
+
+ public static ImmediateScheduler getInstance() {
+ return INSTANCE;
+ }
+
+ private ImmediateScheduler() {
+ }
+
+ @Override
+ public Subscription schedule(Func0 action) {
+ // do we need to wrap this when we're executing it directly?
+ // the Func0 already returns a subscription so does this buy us anything?
+ // it will only ever return if the Func0 is actually async anyways and if it's async its
+ // subscription would do what's needed
+ DiscardableAction discardableAction = new DiscardableAction(action);
+ discardableAction.call();
+ return discardableAction;
+ }
+
+ public static class UnitTest {
+
+ @Test
+ public void testNestedActions() {
+ final ImmediateScheduler scheduler = new ImmediateScheduler();
+
+ final Action0 firstStepStart = mock(Action0.class);
+ final Action0 firstStepEnd = mock(Action0.class);
+
+ final Action0 secondStepStart = mock(Action0.class);
+ final Action0 secondStepEnd = mock(Action0.class);
+
+ final Action0 thirdStepStart = mock(Action0.class);
+ final Action0 thirdStepEnd = mock(Action0.class);
+
+ final Action0 firstAction = new Action0() {
+ @Override
+ public void call() {
+ firstStepStart.call();
+ firstStepEnd.call();
+ }
+ };
+ final Action0 secondAction = new Action0() {
+ @Override
+ public void call() {
+ secondStepStart.call();
+ scheduler.schedule(firstAction);
+ secondStepEnd.call();
+
+ }
+ };
+ final Action0 thirdAction = new Action0() {
+ @Override
+ public void call() {
+ thirdStepStart.call();
+ scheduler.schedule(secondAction);
+ thirdStepEnd.call();
+ }
+ };
+
+ InOrder inOrder = inOrder(firstStepStart, firstStepEnd, secondStepStart, secondStepEnd, thirdStepStart, thirdStepEnd);
+
+ scheduler.schedule(thirdAction);
+
+ inOrder.verify(thirdStepStart, times(1)).call();
+ inOrder.verify(secondStepStart, times(1)).call();
+ inOrder.verify(firstStepStart, times(1)).call();
+ inOrder.verify(firstStepEnd, times(1)).call();
+ inOrder.verify(secondStepEnd, times(1)).call();
+ inOrder.verify(thirdStepEnd, times(1)).call();
+ }
+
+ }
+
+
+}
diff --git a/rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java b/rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java
new file mode 100644
index 0000000000..a78c9633f1
--- /dev/null
+++ b/rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java
@@ -0,0 +1,45 @@
+/**
+ * Copyright 2013 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rx.concurrency;
+
+import rx.Subscription;
+import rx.util.functions.Func0;
+
+public class NewThreadScheduler extends AbstractScheduler {
+ private static final NewThreadScheduler INSTANCE = new NewThreadScheduler();
+
+ public static NewThreadScheduler getInstance() {
+ return INSTANCE;
+ }
+
+
+ @Override
+ public Subscription schedule(Func0 action) {
+ final DiscardableAction discardableAction = new DiscardableAction(action);
+
+ Thread t = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ discardableAction.call();
+ }
+ });
+
+ t.start();
+
+ return discardableAction;
+ }
+
+}
diff --git a/rxjava-core/src/main/java/rx/concurrency/ScheduledExecutorServiceScheduler.java b/rxjava-core/src/main/java/rx/concurrency/ScheduledExecutorServiceScheduler.java
new file mode 100644
index 0000000000..a219dd5f61
--- /dev/null
+++ b/rxjava-core/src/main/java/rx/concurrency/ScheduledExecutorServiceScheduler.java
@@ -0,0 +1,52 @@
+/**
+ * Copyright 2013 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rx.concurrency;
+
+import rx.Subscription;
+import rx.util.functions.Func0;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class ScheduledExecutorServiceScheduler extends AbstractScheduler {
+ private final ScheduledExecutorService executorService;
+
+
+ // this should probably just become an implementation detail of ExecutorScheduler
+
+
+ public ScheduledExecutorServiceScheduler(ScheduledExecutorService executorService) {
+ this.executorService = executorService;
+ }
+
+ @Override
+ public Subscription schedule(Func0 action) {
+ return schedule(action, 0, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public Subscription schedule(Func0 action, long dueTime, TimeUnit unit) {
+ final DiscardableAction discardableAction = new DiscardableAction(action);
+ executorService.schedule(new Runnable() {
+ @Override
+ public void run() {
+ discardableAction.call();
+ }
+ }, dueTime, unit);
+ return discardableAction;
+ }
+
+}
diff --git a/rxjava-core/src/main/java/rx/concurrency/Schedulers.java b/rxjava-core/src/main/java/rx/concurrency/Schedulers.java
new file mode 100644
index 0000000000..9748c660d1
--- /dev/null
+++ b/rxjava-core/src/main/java/rx/concurrency/Schedulers.java
@@ -0,0 +1,54 @@
+/**
+ * Copyright 2013 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rx.concurrency;
+
+import rx.Scheduler;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+
+public class Schedulers {
+ private Schedulers() {
+
+ }
+
+ public static Scheduler immediate() {
+ return ImmediateScheduler.getInstance();
+ }
+
+ public static Scheduler currentThread() {
+ return CurrentThreadScheduler.getInstance();
+ }
+
+ public static Scheduler newThread() {
+ return NewThreadScheduler.getInstance();
+ }
+
+ public static Scheduler executor(Executor executor) {
+ return new ExecutorScheduler(executor);
+ }
+
+ // do we need this one?
+ // since the Scheduler interface allows both scheduled and non-scheduled it seems awkward to make someone choose what scheduler to use
+ // because the wrong choice will make the Scheduler not work correctly if a TimeUnit is given
+ public static Scheduler scheduledExecutor(ScheduledExecutorService executor) {
+ return new ScheduledExecutorServiceScheduler(executor);
+ }
+
+ public static Scheduler forwardingScheduler(Scheduler underlying) {
+ return new ForwardingScheduler(underlying);
+ }
+}
diff --git a/rxjava-core/src/main/java/rx/concurrency/SleepingAction.java b/rxjava-core/src/main/java/rx/concurrency/SleepingAction.java
new file mode 100644
index 0000000000..6f845aee8e
--- /dev/null
+++ b/rxjava-core/src/main/java/rx/concurrency/SleepingAction.java
@@ -0,0 +1,52 @@
+/**
+ * Copyright 2013 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rx.concurrency;
+
+import java.util.concurrent.TimeUnit;
+
+import rx.Scheduler;
+import rx.Subscription;
+import rx.util.functions.Func0;
+
+public class SleepingAction implements Func0 {
+ private final Func0 underlying;
+ private final Scheduler scheduler;
+ private final long execTime;
+
+ public SleepingAction(Func0 underlying, Scheduler scheduler, long timespan, TimeUnit timeUnit) {
+ this.underlying = underlying;
+ this.scheduler = scheduler;
+ this.execTime = scheduler.now() + timeUnit.toMillis(timespan);
+ }
+
+ @Override
+ public Subscription call() {
+ if (execTime < scheduler.now()) {
+ try {
+ // this will block the current thread ... which doesn't seem to work well with CurrentThreadScheduler
+ // shouldn't CurrentThreadScheduler be capable of doing other things while this is sleeping?
+ // In fact, this will block any of the concurrent systems -- it will take up a thread in a threadpool and make it sleep
+ // whereas I would think it should schedule itself on a timer
+ Thread.sleep(scheduler.now() - execTime);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ }
+
+ return underlying.call();
+ }
+}
diff --git a/rxjava-core/src/main/java/rx/observables/ScheduledObserver.java b/rxjava-core/src/main/java/rx/observables/ScheduledObserver.java
new file mode 100644
index 0000000000..ec38b90d0e
--- /dev/null
+++ b/rxjava-core/src/main/java/rx/observables/ScheduledObserver.java
@@ -0,0 +1,45 @@
+package rx.observables;
+
+import rx.Observer;
+import rx.Scheduler;
+import rx.util.functions.Action0;
+
+public class ScheduledObserver implements Observer {
+ private final Observer underlying;
+ private final Scheduler scheduler;
+
+ public ScheduledObserver(Observer underlying, Scheduler scheduler) {
+ this.underlying = underlying;
+ this.scheduler = scheduler;
+ }
+
+ @Override
+ public void onCompleted() {
+ scheduler.schedule(new Action0() {
+ @Override
+ public void call() {
+ underlying.onCompleted();
+ }
+ });
+ }
+
+ @Override
+ public void onError(final Exception e) {
+ scheduler.schedule(new Action0() {
+ @Override
+ public void call() {
+ underlying.onError(e);
+ }
+ });
+ }
+
+ @Override
+ public void onNext(final T args) {
+ scheduler.schedule(new Action0() {
+ @Override
+ public void call() {
+ underlying.onNext(args);
+ }
+ });
+ }
+}
diff --git a/rxjava-core/src/main/java/rx/operators/AbstractOperation.java b/rxjava-core/src/main/java/rx/operators/AbstractOperation.java
new file mode 100644
index 0000000000..c76deec109
--- /dev/null
+++ b/rxjava-core/src/main/java/rx/operators/AbstractOperation.java
@@ -0,0 +1,256 @@
+package rx.operators;
+
+import static org.junit.Assert.*;
+
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.junit.Test;
+
+import rx.Observable;
+import rx.Observer;
+import rx.Subscription;
+import rx.subscriptions.Subscriptions;
+import rx.util.functions.Func1;
+
+/**
+ * Common utility functions for operator implementations and tests.
+ */
+/* package */class AbstractOperation
+{
+ private AbstractOperation() {
+ }
+
+ public static class UnitTest {
+
+ public static Func1, Subscription> assertTrustedObservable(final Func1, Subscription> source)
+ {
+ return new Func1, Subscription>()
+ {
+ @Override
+ public Subscription call(Observer observer)
+ {
+ return source.call(new TestingObserver(observer));
+ }
+ };
+ }
+
+ public static class TestingObserver implements Observer {
+
+ private final Observer actual;
+ private final AtomicBoolean isFinished = new AtomicBoolean(false);
+ private final AtomicBoolean isInCallback = new AtomicBoolean(false);
+
+ public TestingObserver(Observer actual) {
+ this.actual = actual;
+ }
+
+ @Override
+ public void onCompleted() {
+ assertFalse("previous call to onCompleted() or onError()", !isFinished.compareAndSet(false, true));
+ assertFalse("concurrent callback pending", !isInCallback.compareAndSet(false, true));
+ actual.onCompleted();
+ isInCallback.set(false);
+ }
+
+ @Override
+ public void onError(Exception e) {
+ assertFalse("previous call to onCompleted() or onError()", !isFinished.compareAndSet(false, true));
+ assertFalse("concurrent callback pending", !isInCallback.compareAndSet(false, true));
+ actual.onError(e);
+ isInCallback.set(false);
+ }
+
+ @Override
+ public void onNext(T args) {
+ assertFalse("previous call to onCompleted() or onError()", isFinished.get());
+ assertFalse("concurrent callback pending", !isInCallback.compareAndSet(false, true));
+ actual.onNext(args);
+ isInCallback.set(false);
+ }
+
+ }
+
+ @Test(expected = AssertionError.class)
+ public void testDoubleCompleted() {
+ Observable.create(assertTrustedObservable(new Func1, Subscription>()
+ {
+ @Override
+ public Subscription call(Observer observer)
+ {
+ observer.onCompleted();
+ observer.onCompleted();
+ return Subscriptions.empty();
+ }
+ })).lastOrDefault("end");
+
+ }
+
+ @Test(expected = AssertionError.class)
+ public void testCompletedError() {
+ Observable.create(assertTrustedObservable(new Func1, Subscription>()
+ {
+ @Override
+ public Subscription call(Observer observer)
+ {
+ observer.onCompleted();
+ observer.onError(new Exception());
+ return Subscriptions.empty();
+ }
+ })).lastOrDefault("end");
+ }
+
+ @Test(expected = AssertionError.class)
+ public void testCompletedNext() {
+ Observable.create(assertTrustedObservable(new Func1, Subscription>()
+ {
+ @Override
+ public Subscription call(Observer observer)
+ {
+ observer.onCompleted();
+ observer.onNext("one");
+ return Subscriptions.empty();
+ }
+ })).lastOrDefault("end");
+ }
+
+ @Test(expected = AssertionError.class)
+ public void testErrorCompleted() {
+ Observable.create(assertTrustedObservable(new Func1, Subscription>()
+ {
+ @Override
+ public Subscription call(Observer observer)
+ {
+ observer.onError(new Exception());
+ observer.onCompleted();
+ return Subscriptions.empty();
+ }
+ })).lastOrDefault("end");
+ }
+
+ @Test(expected = AssertionError.class)
+ public void testDoubleError() {
+ Observable.create(assertTrustedObservable(new Func1, Subscription>()
+ {
+ @Override
+ public Subscription call(Observer observer)
+ {
+ observer.onError(new Exception());
+ observer.onError(new Exception());
+ return Subscriptions.empty();
+ }
+ })).lastOrDefault("end");
+ }
+
+ @Test(expected = AssertionError.class)
+ public void testErrorNext() {
+ Observable.create(assertTrustedObservable(new Func1, Subscription>()
+ {
+ @Override
+ public Subscription call(Observer observer)
+ {
+ observer.onError(new Exception());
+ observer.onNext("one");
+ return Subscriptions.empty();
+ }
+ })).lastOrDefault("end");
+ }
+
+ @Test
+ public void testNextCompleted() {
+ Observable.create(assertTrustedObservable(new Func1, Subscription>()
+ {
+ @Override
+ public Subscription call(Observer observer)
+ {
+ observer.onNext("one");
+ observer.onCompleted();
+ return Subscriptions.empty();
+ }
+ })).lastOrDefault("end");
+ }
+
+ @Test
+ public void testConcurrentNextNext() {
+ final List threads = new ArrayList();
+ final AtomicReference threadFailure = new AtomicReference();
+ Observable.create(assertTrustedObservable(new Func1, Subscription>()
+ {
+ @Override
+ public Subscription call(final Observer observer)
+ {
+ threads.add(new Thread(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ observer.onNext("one");
+ }
+ }));
+ threads.add(new Thread(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ observer.onNext("two");
+ }
+ }));
+ return Subscriptions.empty();
+ }
+ })).subscribe(new SlowObserver());
+ for (Thread thread : threads) {
+ thread.setUncaughtExceptionHandler(new UncaughtExceptionHandler()
+ {
+ @Override
+ public void uncaughtException(Thread thread, Throwable throwable)
+ {
+ threadFailure.set(throwable);
+ }
+ });
+ thread.start();
+ }
+ for (Thread thread : threads) {
+ try {
+ thread.join();
+ } catch (InterruptedException ignored) {
+ }
+ }
+ // Junit seems pretty bad about exposing test failures inside of created threads.
+ assertNotNull("exception thrown by thread", threadFailure.get());
+ assertEquals("class of exception thrown by thread", AssertionError.class, threadFailure.get().getClass());
+ }
+
+ private static class SlowObserver implements Observer
+ {
+ @Override
+ public void onCompleted()
+ {
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException ignored) {
+ }
+ }
+
+ @Override
+ public void onError(Exception e)
+ {
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException ignored) {
+ }
+ }
+
+ @Override
+ public void onNext(String args)
+ {
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException ignored) {
+ }
+ }
+ }
+ }
+}
diff --git a/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java b/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java
new file mode 100644
index 0000000000..16eed89270
--- /dev/null
+++ b/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java
@@ -0,0 +1,76 @@
+/**
+ * Copyright 2013 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rx.operators;
+
+import org.junit.Test;
+import rx.Observable;
+import rx.Observer;
+import rx.Scheduler;
+import rx.Subscription;
+import rx.concurrency.Schedulers;
+import rx.observables.ScheduledObserver;
+import rx.util.functions.Action0;
+import rx.util.functions.Func1;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+public class OperationObserveOn {
+
+ public static Func1, Subscription> observeOn(Observable source, Scheduler scheduler) {
+ return new ObserveOn(source, scheduler);
+ }
+
+ private static class ObserveOn implements Func1, Subscription> {
+ private final Observable source;
+ private final Scheduler scheduler;
+
+ public ObserveOn(Observable source, Scheduler scheduler) {
+ this.source = source;
+ this.scheduler = scheduler;
+ }
+
+ @Override
+ public Subscription call(final Observer observer) {
+ return source.subscribe(new ScheduledObserver(observer, scheduler));
+ }
+ }
+
+ public static class UnitTest {
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testObserveOn() {
+
+ Scheduler scheduler = spy(Schedulers.forwardingScheduler(Schedulers.immediate()));
+
+ Observer observer = mock(Observer.class);
+ Observable.create(observeOn(Observable.toObservable(1, 2, 3), scheduler)).subscribe(observer);
+
+ verify(scheduler, times(4)).schedule(any(Action0.class));
+ verifyNoMoreInteractions(scheduler);
+
+ verify(observer, times(1)).onNext(1);
+ verify(observer, times(1)).onNext(2);
+ verify(observer, times(1)).onNext(3);
+ verify(observer, times(1)).onCompleted();
+ }
+
+ }
+
+}
diff --git a/rxjava-core/src/main/java/rx/operators/OperationSubscribeOn.java b/rxjava-core/src/main/java/rx/operators/OperationSubscribeOn.java
new file mode 100644
index 0000000000..104a134657
--- /dev/null
+++ b/rxjava-core/src/main/java/rx/operators/OperationSubscribeOn.java
@@ -0,0 +1,102 @@
+/**
+ * Copyright 2013 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rx.operators;
+
+import org.junit.Test;
+import rx.Observable;
+import rx.Observer;
+import rx.Scheduler;
+import rx.Subscription;
+import rx.concurrency.Schedulers;
+import rx.util.functions.Action0;
+import rx.util.functions.Func0;
+import rx.util.functions.Func1;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+
+public class OperationSubscribeOn {
+
+ public static Func1, Subscription> subscribeOn(Observable source, Scheduler scheduler) {
+ return new SubscribeOn(source, scheduler);
+ }
+
+ private static class SubscribeOn implements Func1, Subscription> {
+ private final Observable source;
+ private final Scheduler scheduler;
+
+ public SubscribeOn(Observable source, Scheduler scheduler) {
+ this.source = source;
+ this.scheduler = scheduler;
+ }
+
+ @Override
+ public Subscription call(final Observer observer) {
+ return scheduler.schedule(new Func0() {
+ @Override
+ public Subscription call() {
+ return new ScheduledSubscription(source.subscribe(observer), scheduler);
+ }
+ });
+ }
+ }
+
+ private static class ScheduledSubscription implements Subscription {
+ private final Subscription underlying;
+ private final Scheduler scheduler;
+
+ private ScheduledSubscription(Subscription underlying, Scheduler scheduler) {
+ this.underlying = underlying;
+ this.scheduler = scheduler;
+ }
+
+ @Override
+ public void unsubscribe() {
+ scheduler.schedule(new Action0() {
+ @Override
+ public void call() {
+ underlying.unsubscribe();
+ }
+ });
+ }
+ }
+
+ public static class UnitTest {
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testSubscribeOn() {
+ Observable w = Observable.toObservable(1, 2, 3);
+
+ Scheduler scheduler = spy(Schedulers.forwardingScheduler(Schedulers.immediate()));
+
+ Observer observer = mock(Observer.class);
+ Subscription subscription = Observable.create(subscribeOn(w, scheduler)).subscribe(observer);
+
+ verify(scheduler, times(1)).schedule(any(Func0.class));
+ subscription.unsubscribe();
+ verify(scheduler, times(1)).schedule(any(Action0.class));
+ verifyNoMoreInteractions(scheduler);
+
+ verify(observer, times(1)).onNext(1);
+ verify(observer, times(1)).onNext(2);
+ verify(observer, times(1)).onNext(3);
+ verify(observer, times(1)).onCompleted();
+ }
+
+ }
+
+}
\ No newline at end of file
diff --git a/rxjava-core/src/main/java/rx/operators/OperationTake.java b/rxjava-core/src/main/java/rx/operators/OperationTake.java
index e86263e562..5ea6b627e4 100644
--- a/rxjava-core/src/main/java/rx/operators/OperationTake.java
+++ b/rxjava-core/src/main/java/rx/operators/OperationTake.java
@@ -18,7 +18,9 @@
import static org.junit.Assert.*;
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;
+import static rx.operators.AbstractOperation.UnitTest.*;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
@@ -26,10 +28,10 @@
import rx.Observable;
import rx.Observer;
import rx.Subscription;
+import rx.subscriptions.Subscriptions;
import rx.util.AtomicObservableSubscription;
import rx.util.functions.Func1;
-import rx.util.functions.Func2;
-import rx.subjects.Subject;
+
/**
* Returns a specified number of contiguous values from the start of an observable sequence.
*/
@@ -43,61 +45,17 @@ public final class OperationTake {
* @return
*/
public static Func1, Subscription> take(final Observable items, final int num) {
- return takeWhileWithIndex(items, OperationTake. numPredicate(num));
- }
-
- /**
- * Returns a specified number of contiguous values from the start of an observable sequence.
- *
- * @param items
- * @param predicate
- * a function to test each source element for a condition
- * @return
- */
- public static Func1, Subscription> takeWhile(final Observable items, final Func1 predicate) {
- return takeWhileWithIndex(items, OperationTake. skipIndex(predicate));
- }
-
- /**
- * Returns values from an observable sequence as long as a specified condition is true, and then skips the remaining values.
- *
- * @param items
- * @param predicate
- * a function to test each element for a condition; the second parameter of the function represents the index of the source element; otherwise, false.
- * @return
- */
- public static Func1, Subscription> takeWhileWithIndex(final Observable items, final Func2 predicate) {
// wrap in a Func so that if a chain is built up, then asynchronously subscribed to twice we will have 2 instances of Take rather than 1 handing both, which is not thread-safe.
return new Func1, Subscription>() {
@Override
public Subscription call(Observer observer) {
- return new TakeWhile(items, predicate).call(observer);
- }
-
- };
- }
-
- private static Func2 numPredicate(final int num) {
- return new Func2() {
-
- @Override
- public Boolean call(T input, Integer index) {
- return index < num;
+ return new Take(items, num).call(observer);
}
};
}
- private static Func2 skipIndex(final Func1 underlying) {
- return new Func2() {
- @Override
- public Boolean call(T input, Integer index) {
- return underlying.call(input);
- }
- };
- }
-
/**
* This class is NOT thread-safe if invoked and referenced multiple times. In other words, don't subscribe to it multiple times from different threads.
*
@@ -109,19 +67,41 @@ public Boolean call(T input, Integer index) {
*
* @param
*/
- private static class TakeWhile implements Func1, Subscription> {
+ private static class Take implements Func1, Subscription> {
private final AtomicInteger counter = new AtomicInteger();
private final Observable items;
- private final Func2 predicate;
+ private final int num;
private final AtomicObservableSubscription subscription = new AtomicObservableSubscription();
- private TakeWhile(Observable items, Func2 predicate) {
+ private Take(Observable items, int num) {
this.items = items;
- this.predicate = predicate;
+ this.num = num;
}
@Override
public Subscription call(Observer observer) {
+ if (num < 1) {
+ items.subscribe(new Observer()
+ {
+ @Override
+ public void onCompleted()
+ {
+ }
+
+ @Override
+ public void onError(Exception e)
+ {
+ }
+
+ @Override
+ public void onNext(T args)
+ {
+ }
+ }).unsubscribe();
+ observer.onCompleted();
+ return Subscriptions.empty();
+ }
+
return subscription.wrap(items.subscribe(new ItemObserver(observer)));
}
@@ -134,20 +114,28 @@ public ItemObserver(Observer observer) {
@Override
public void onCompleted() {
- observer.onCompleted();
+ if (counter.getAndSet(num) < num) {
+ observer.onCompleted();
+ }
}
@Override
public void onError(Exception e) {
- observer.onError(e);
+ if (counter.getAndSet(num) < num) {
+ observer.onError(e);
+ }
}
@Override
public void onNext(T args) {
- if (predicate.call(args, counter.getAndIncrement())) {
+ final int count = counter.incrementAndGet();
+ if (count <= num) {
observer.onNext(args);
- } else {
- observer.onCompleted();
+ if (count == num) {
+ observer.onCompleted();
+ }
+ }
+ if (count >= num) {
// this will work if the sequence is asynchronous, it will have no effect on a synchronous observable
subscription.unsubscribe();
}
@@ -160,65 +148,9 @@ public void onNext(T args) {
public static class UnitTest {
@Test
- public void testTakeWhile1() {
- Observable w = Observable.toObservable(1, 2, 3);
- Observable take = Observable.create(takeWhile(w, new Func1() {
- @Override
- public Boolean call(Integer input) {
- return input < 3;
- }
- }));
-
- @SuppressWarnings("unchecked")
- Observer aObserver = mock(Observer.class);
- take.subscribe(aObserver);
- verify(aObserver, times(1)).onNext(1);
- verify(aObserver, times(1)).onNext(2);
- verify(aObserver, never()).onNext(3);
- verify(aObserver, never()).onError(any(Exception.class));
- verify(aObserver, times(1)).onCompleted();
- }
-
- @Test
- public void testTakeWhileOnSubject1() {
- Subject s = Subject.create();
- Observable w = (Observable)s;
- Observable take = Observable.create(takeWhile(w, new Func1() {
- @Override
- public Boolean call(Integer input) {
- return input < 3;
- }
- }));
-
- @SuppressWarnings("unchecked")
- Observer aObserver = mock(Observer.class);
- take.subscribe(aObserver);
-
- s.onNext(1);
- s.onNext(2);
- s.onNext(3);
- s.onNext(4);
- s.onNext(5);
- s.onCompleted();
-
- verify(aObserver, times(1)).onNext(1);
- verify(aObserver, times(1)).onNext(2);
- verify(aObserver, never()).onNext(3);
- verify(aObserver, never()).onNext(4);
- verify(aObserver, never()).onNext(5);
- verify(aObserver, never()).onError(any(Exception.class));
- verify(aObserver, times(1)).onCompleted();
- }
-
- @Test
- public void testTakeWhile2() {
+ public void testTake1() {
Observable w = Observable.toObservable("one", "two", "three");
- Observable take = Observable.create(takeWhileWithIndex(w, new Func2() {
- @Override
- public Boolean call(String input, Integer index) {
- return index < 2;
- }
- }));
+ Observable take = Observable.create(assertTrustedObservable(take(w, 2)));
@SuppressWarnings("unchecked")
Observer aObserver = mock(Observer.class);
@@ -231,33 +163,59 @@ public Boolean call(String input, Integer index) {
}
@Test
- public void testTake1() {
+ public void testTake2() {
Observable w = Observable.toObservable("one", "two", "three");
- Observable take = Observable.create(take(w, 2));
+ Observable take = Observable.create(assertTrustedObservable(take(w, 1)));
@SuppressWarnings("unchecked")
Observer aObserver = mock(Observer.class);
take.subscribe(aObserver);
verify(aObserver, times(1)).onNext("one");
- verify(aObserver, times(1)).onNext("two");
+ verify(aObserver, never()).onNext("two");
verify(aObserver, never()).onNext("three");
verify(aObserver, never()).onError(any(Exception.class));
verify(aObserver, times(1)).onCompleted();
}
@Test
- public void testTake2() {
- Observable w = Observable.toObservable("one", "two", "three");
- Observable take = Observable.create(take(w, 1));
+ public void testTakeDoesntLeakErrors() {
+ Observable source = Observable.create(new Func1, Subscription>()
+ {
+ @Override
+ public Subscription call(Observer observer)
+ {
+ observer.onNext("one");
+ observer.onError(new Exception("test failed"));
+ return Subscriptions.empty();
+ }
+ });
+ Observable.create(assertTrustedObservable(take(source, 1))).last();
+ }
- @SuppressWarnings("unchecked")
- Observer aObserver = mock(Observer.class);
- take.subscribe(aObserver);
- verify(aObserver, times(1)).onNext("one");
- verify(aObserver, never()).onNext("two");
- verify(aObserver, never()).onNext("three");
- verify(aObserver, never()).onError(any(Exception.class));
- verify(aObserver, times(1)).onCompleted();
+ @Test
+ public void testTakeZeroDoesntLeakError() {
+ final AtomicBoolean subscribed = new AtomicBoolean(false);
+ final AtomicBoolean unSubscribed = new AtomicBoolean(false);
+ Observable source = Observable.create(new Func1, Subscription>()
+ {
+ @Override
+ public Subscription call(Observer observer)
+ {
+ subscribed.set(true);
+ observer.onError(new Exception("test failed"));
+ return new Subscription()
+ {
+ @Override
+ public void unsubscribe()
+ {
+ unSubscribed.set(true);
+ }
+ };
+ }
+ });
+ Observable.create(assertTrustedObservable(take(source, 0))).lastOrDefault("ok");
+ assertTrue("source subscribed", subscribed.get());
+ assertTrue("source unsubscribed", unSubscribed.get());
}
@Test
@@ -267,7 +225,7 @@ public void testUnsubscribeAfterTake() {
@SuppressWarnings("unchecked")
Observer aObserver = mock(Observer.class);
- Observable take = Observable.create(take(w, 1));
+ Observable take = Observable.create(assertTrustedObservable(take(w, 1)));
take.subscribe(aObserver);
// wait for the Observable to complete
diff --git a/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java b/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java
new file mode 100644
index 0000000000..f45efabc92
--- /dev/null
+++ b/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java
@@ -0,0 +1,313 @@
+/**
+ * Copyright 2013 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rx.operators;
+
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.*;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Test;
+
+import rx.Observable;
+import rx.Observer;
+import rx.Subscription;
+import rx.subscriptions.Subscriptions;
+import rx.util.AtomicObservableSubscription;
+import rx.util.functions.Func1;
+import rx.util.functions.Func2;
+import rx.subjects.Subject;
+/**
+ * Returns values from an observable sequence as long as a specified condition is true, and then skips the remaining values.
+ */
+public final class OperationTakeWhile {
+
+ /**
+ * Returns a specified number of contiguous values from the start of an observable sequence.
+ *
+ * @param items
+ * @param predicate
+ * a function to test each source element for a condition
+ * @return
+ */
+ public static Func1, Subscription> takeWhile(final Observable items, final Func1 predicate) {
+ return takeWhileWithIndex(items, OperationTakeWhile.skipIndex(predicate));
+ }
+
+ /**
+ * Returns values from an observable sequence as long as a specified condition is true, and then skips the remaining values.
+ *
+ * @param items
+ * @param predicate
+ * a function to test each element for a condition; the second parameter of the function represents the index of the source element; otherwise, false.
+ * @return
+ */
+ public static Func1, Subscription> takeWhileWithIndex(final Observable items, final Func2 predicate) {
+ // wrap in a Func so that if a chain is built up, then asynchronously subscribed to twice we will have 2 instances of Take rather than 1 handing both, which is not thread-safe.
+ return new Func1, Subscription>() {
+
+ @Override
+ public Subscription call(Observer observer) {
+ return new TakeWhile(items, predicate).call(observer);
+ }
+
+ };
+ }
+
+ private static Func2 skipIndex(final Func1 underlying) {
+ return new Func2() {
+ @Override
+ public Boolean call(T input, Integer index) {
+ return underlying.call(input);
+ }
+ };
+ }
+
+ /**
+ * This class is NOT thread-safe if invoked and referenced multiple times. In other words, don't subscribe to it multiple times from different threads.
+ *
+ * It IS thread-safe from within it while receiving onNext events from multiple threads.
+ *
+ * This should all be fine as long as it's kept as a private class and a new instance created from static factory method above.
+ *
+ * Note how the takeWhileWithIndex() factory method above protects us from a single instance being exposed with the Observable wrapper handling the subscribe flow.
+ *
+ * @param
+ */
+ private static class TakeWhile implements Func1, Subscription> {
+ private final AtomicInteger counter = new AtomicInteger();
+ private final Observable items;
+ private final Func2 predicate;
+ private final AtomicObservableSubscription subscription = new AtomicObservableSubscription();
+
+ private TakeWhile(Observable items, Func2 predicate) {
+ this.items = items;
+ this.predicate = predicate;
+ }
+
+ @Override
+ public Subscription call(Observer observer) {
+ return subscription.wrap(items.subscribe(new ItemObserver(observer)));
+ }
+
+ private class ItemObserver implements Observer {
+ private final Observer observer;
+
+ public ItemObserver(Observer observer) {
+ this.observer = observer;
+ }
+
+ @Override
+ public void onCompleted() {
+ observer.onCompleted();
+ }
+
+ @Override
+ public void onError(Exception e) {
+ observer.onError(e);
+ }
+
+ @Override
+ public void onNext(T args) {
+ if (predicate.call(args, counter.getAndIncrement())) {
+ observer.onNext(args);
+ } else {
+ observer.onCompleted();
+ // this will work if the sequence is asynchronous, it will have no effect on a synchronous observable
+ subscription.unsubscribe();
+ }
+ }
+
+ }
+
+ }
+
+ public static class UnitTest {
+
+ @Test
+ public void testTakeWhile1() {
+ Observable w = Observable.toObservable(1, 2, 3);
+ Observable take = Observable.create(takeWhile(w, new Func1()
+ {
+ @Override
+ public Boolean call(Integer input)
+ {
+ return input < 3;
+ }
+ }));
+
+ @SuppressWarnings("unchecked")
+ Observer aObserver = mock(Observer.class);
+ take.subscribe(aObserver);
+ verify(aObserver, times(1)).onNext(1);
+ verify(aObserver, times(1)).onNext(2);
+ verify(aObserver, never()).onNext(3);
+ verify(aObserver, never()).onError(any(Exception.class));
+ verify(aObserver, times(1)).onCompleted();
+ }
+
+ @Test
+ public void testTakeWhileOnSubject1() {
+ Subject s = Subject.create();
+ Observable w = (Observable)s;
+ Observable take = Observable.create(takeWhile(w, new Func1()
+ {
+ @Override
+ public Boolean call(Integer input)
+ {
+ return input < 3;
+ }
+ }));
+
+ @SuppressWarnings("unchecked")
+ Observer aObserver = mock(Observer.class);
+ take.subscribe(aObserver);
+
+ s.onNext(1);
+ s.onNext(2);
+ s.onNext(3);
+ s.onNext(4);
+ s.onNext(5);
+ s.onCompleted();
+
+ verify(aObserver, times(1)).onNext(1);
+ verify(aObserver, times(1)).onNext(2);
+ verify(aObserver, never()).onNext(3);
+ verify(aObserver, never()).onNext(4);
+ verify(aObserver, never()).onNext(5);
+ verify(aObserver, never()).onError(any(Exception.class));
+ verify(aObserver, times(1)).onCompleted();
+ }
+
+ @Test
+ public void testTakeWhile2() {
+ Observable w = Observable.toObservable("one", "two", "three");
+ Observable take = Observable.create(takeWhileWithIndex(w, new Func2()
+ {
+ @Override
+ public Boolean call(String input, Integer index)
+ {
+ return index < 2;
+ }
+ }));
+
+ @SuppressWarnings("unchecked")
+ Observer aObserver = mock(Observer.class);
+ take.subscribe(aObserver);
+ verify(aObserver, times(1)).onNext("one");
+ verify(aObserver, times(1)).onNext("two");
+ verify(aObserver, never()).onNext("three");
+ verify(aObserver, never()).onError(any(Exception.class));
+ verify(aObserver, times(1)).onCompleted();
+ }
+
+ @Test
+ public void testTakeWhileDoesntLeakErrors() {
+ Observable source = Observable.create(new Func1, Subscription>()
+ {
+ @Override
+ public Subscription call(Observer observer)
+ {
+ observer.onNext("one");
+ observer.onError(new Exception("test failed"));
+ return Subscriptions.empty();
+ }
+ });
+
+ Observable.create(takeWhile(source, new Func1()
+ {
+ @Override
+ public Boolean call(String s)
+ {
+ return false;
+ }
+ })).last();
+ }
+
+ @Test
+ public void testUnsubscribeAfterTake() {
+ Subscription s = mock(Subscription.class);
+ TestObservable w = new TestObservable(s, "one", "two", "three");
+
+ @SuppressWarnings("unchecked")
+ Observer aObserver = mock(Observer.class);
+ Observable take = Observable.create(takeWhileWithIndex(w, new Func2()
+ {
+ @Override
+ public Boolean call(String s, Integer index)
+ {
+ return index < 1;
+ }
+ }));
+ take.subscribe(aObserver);
+
+ // wait for the Observable to complete
+ try {
+ w.t.join();
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ System.out.println("TestObservable thread finished");
+ verify(aObserver, times(1)).onNext("one");
+ verify(aObserver, never()).onNext("two");
+ verify(aObserver, never()).onNext("three");
+ verify(s, times(1)).unsubscribe();
+ }
+
+ private static class TestObservable extends Observable {
+
+ final Subscription s;
+ final String[] values;
+ Thread t = null;
+
+ public TestObservable(Subscription s, String... values) {
+ this.s = s;
+ this.values = values;
+ }
+
+ @Override
+ public Subscription subscribe(final Observer observer) {
+ System.out.println("TestObservable subscribed to ...");
+ t = new Thread(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ System.out.println("running TestObservable thread");
+ for (String s : values) {
+ System.out.println("TestObservable onNext: " + s);
+ observer.onNext(s);
+ }
+ observer.onCompleted();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ });
+ System.out.println("starting TestObservable thread");
+ t.start();
+ System.out.println("done starting TestObservable thread");
+ return s;
+ }
+
+ }
+ }
+
+}