diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 355ed7a370..ce259cdc8a 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -2207,6 +2207,19 @@ public Observable> timestamp() { return create(OperationTimestamp.timestamp(this)); } + /** + * Wraps each item emitted by a source Observable in a {@link Timestamped} + * object with timestamps provided by the given Scheduler. + * + * @param scheduler the {@link Scheduler} to use as a time source. + * @return an Observable that emits timestamped items from the source + * Observable with timestamps provided by the given Scheduler + * @see MSDN: Observable.Timestamp + */ + public Observable> timestamp(Scheduler scheduler) { + return create(OperationTimestamp.timestamp(this, scheduler)); + } + /** * Converts a {@link Future} into an Observable. *

diff --git a/rxjava-core/src/main/java/rx/operators/OperationTimestamp.java b/rxjava-core/src/main/java/rx/operators/OperationTimestamp.java index bcd680216e..a1044ba748 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTimestamp.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTimestamp.java @@ -17,6 +17,7 @@ import rx.Observable; import rx.Observable.OnSubscribeFunc; +import rx.Scheduler; import rx.util.Timestamped; import rx.util.functions.Func1; @@ -44,4 +45,15 @@ public Timestamped call(T value) { } }); } + /** + * Timestamp the source elements based on the timing provided by the scheduler. + */ + public static OnSubscribeFunc> timestamp(Observable source, final Scheduler scheduler) { + return OperationMap.map(source, new Func1>() { + @Override + public Timestamped call(T value) { + return new Timestamped(scheduler.now(), value); + } + }); + } } diff --git a/rxjava-core/src/test/java/rx/operators/OperationTimestampTest.java b/rxjava-core/src/test/java/rx/operators/OperationTimestampTest.java new file mode 100644 index 0000000000..9794b6c7f5 --- /dev/null +++ b/rxjava-core/src/test/java/rx/operators/OperationTimestampTest.java @@ -0,0 +1,86 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import java.util.concurrent.TimeUnit; +import org.junit.Before; +import org.junit.Test; +import org.mockito.InOrder; +import org.mockito.Mock; +import static org.mockito.Mockito.*; +import org.mockito.MockitoAnnotations; +import rx.Observable; +import rx.Observer; +import rx.concurrency.TestScheduler; +import rx.subjects.PublishSubject; +import rx.util.Timestamped; + +public class OperationTimestampTest { + @Mock + Observer observer; + @Before + public void before() { + MockitoAnnotations.initMocks(this); + } + @Test + public void timestampWithScheduler() { + TestScheduler scheduler = new TestScheduler(); + + PublishSubject source = PublishSubject.create(); + Observable> m = source.timestamp(scheduler); + m.subscribe(observer); + + source.onNext(1); + scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); + source.onNext(2); + scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); + source.onNext(3); + + + InOrder inOrder = inOrder(observer); + + inOrder.verify(observer, times(1)).onNext(new Timestamped(0, 1)); + inOrder.verify(observer, times(1)).onNext(new Timestamped(100, 2)); + inOrder.verify(observer, times(1)).onNext(new Timestamped(200, 3)); + + verify(observer, never()).onError(any(Throwable.class)); + verify(observer, never()).onCompleted(); + } + @Test + public void timestampWithScheduler2() { + TestScheduler scheduler = new TestScheduler(); + + PublishSubject source = PublishSubject.create(); + Observable> m = source.timestamp(scheduler); + m.subscribe(observer); + + source.onNext(1); + source.onNext(2); + scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); + scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); + source.onNext(3); + + + InOrder inOrder = inOrder(observer); + + inOrder.verify(observer, times(1)).onNext(new Timestamped(0, 1)); + inOrder.verify(observer, times(1)).onNext(new Timestamped(0, 2)); + inOrder.verify(observer, times(1)).onNext(new Timestamped(200, 3)); + + verify(observer, never()).onError(any(Throwable.class)); + verify(observer, never()).onCompleted(); + } +}