From 36e6c31b5cd77bba987f5499e39b8d8309a2f972 Mon Sep 17 00:00:00 2001 From: Joachim Hofer Date: Thu, 12 Sep 2013 15:11:50 +0200 Subject: [PATCH 1/2] implemented distinct operator --- rxjava-core/src/main/java/rx/Observable.java | 9 + .../java/rx/operators/OperationDistinct.java | 212 ++++++++++++++++++ 2 files changed, 221 insertions(+) create mode 100644 rxjava-core/src/main/java/rx/operators/OperationDistinct.java diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 8bfe25ce63..b286b11d51 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -36,6 +36,7 @@ import rx.operators.OperationConcat; import rx.operators.OperationDefer; import rx.operators.OperationDematerialize; +import rx.operators.OperationDistinct; import rx.operators.OperationFilter; import rx.operators.OperationFinally; import rx.operators.OperationFirstOrDefault; @@ -2925,6 +2926,14 @@ public Observable filter(Func1 predicate) { return create(OperationFilter.filter(this, predicate)); } + public Observable distinct() { + return create(OperationDistinct.distinct(this)); + } + + public Observable distinct(Func1 keySelector) { + return create(OperationDistinct.distinct(this, keySelector)); + } + /** * Registers an {@link Action0} to be called when this Observable invokes {@link Observer#onCompleted onCompleted} or {@link Observer#onError onError}. *

diff --git a/rxjava-core/src/main/java/rx/operators/OperationDistinct.java b/rxjava-core/src/main/java/rx/operators/OperationDistinct.java new file mode 100644 index 0000000000..b9cb0c6ebe --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationDistinct.java @@ -0,0 +1,212 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; +import static org.mockito.MockitoAnnotations.initMocks; +import static rx.Observable.create; +import static rx.Observable.empty; +import static rx.Observable.from; + +import java.util.HashSet; +import java.util.Set; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.InOrder; +import org.mockito.Mock; + +import rx.Observable; +import rx.Observable.OnSubscribeFunc; +import rx.Observer; +import rx.Subscription; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Action0; +import rx.util.functions.Func1; +import rx.util.functions.Functions; + +/** + * Returns an Observable that emits all distinct items emitted by the source. + * + * Be careful with this operation when using infinite or very large observables + * as it has to store all distinct values it has received. + */ +public final class OperationDistinct { + + /** + * Returns an Observable that emits all distinct items emitted by the source + * @param source + * The source Observable to emit the distinct items for. + * @return A subscription function for creating the target Observable. + */ + public static OnSubscribeFunc distinct(Observable source, Func1 keySelector) { + return new Distinct(source, keySelector); + } + + /** + * Returns an Observable that emits all distinct items emitted by the source + * @param source + * The source Observable to emit the distinct items for. + * @return A subscription function for creating the target Observable. + */ + public static OnSubscribeFunc distinct(Observable source) { + return new Distinct(source, Functions.identity()); + } + + private static class Distinct implements OnSubscribeFunc { + private final Observable source; + private final Func1 keySelector; + + private Distinct(Observable source, Func1 keySelector) { + this.source = source; + this.keySelector = keySelector; + } + + @Override + public Subscription onSubscribe(final Observer observer) { + final Subscription sourceSub = source.subscribe(new Observer() { + private final Set emittedKeys = new HashSet(); + + @Override + public void onCompleted() { + observer.onCompleted(); + } + + @Override + public void onError(Throwable e) { + observer.onError(e); + } + + @Override + public void onNext(T next) { + try { + U nextKey = keySelector.call(next); + if (!emittedKeys.contains(nextKey)) { + emittedKeys.add(nextKey); + observer.onNext(next); + } + } catch (Throwable t) { + // keySelector is a user function, may throw something + observer.onError(t); + } + } + }); + + return Subscriptions.create(new Action0() { + @Override + public void call() { + sourceSub.unsubscribe(); + } + }); + } + } + + public static class UnitTest { + @Mock + Observer w; + + // nulls lead to exceptions + final Func1 TO_UPPER_WITH_EXCEPTION = new Func1() { + @Override + public String call(String s) { + return s.toUpperCase(); + } + }; + + @Before + public void before() { + initMocks(this); + } + + @Test + public void testDistinctOfNone() { + Observable src = empty(); + create(distinct(src)).subscribe(w); + + verify(w, never()).onNext(anyString()); + verify(w, never()).onError(any(Throwable.class)); + verify(w, times(1)).onCompleted(); + } + + @Test + public void testDistinctOfNoneWithKeySelector() { + Observable src = empty(); + create(distinct(src, TO_UPPER_WITH_EXCEPTION)).subscribe(w); + + verify(w, never()).onNext(anyString()); + verify(w, never()).onError(any(Throwable.class)); + verify(w, times(1)).onCompleted(); + } + + @Test + public void testDistinctOfNormalSource() { + Observable src = from("a", "b", "c", "c", "c", "b", "b", "a", "e"); + create(distinct(src)).subscribe(w); + + InOrder inOrder = inOrder(w); + inOrder.verify(w, times(1)).onNext("a"); + inOrder.verify(w, times(1)).onNext("b"); + inOrder.verify(w, times(1)).onNext("c"); + inOrder.verify(w, times(1)).onNext("e"); + inOrder.verify(w, times(1)).onCompleted(); + inOrder.verify(w, never()).onNext(anyString()); + verify(w, never()).onError(any(Throwable.class)); + } + + @Test + public void testDistinctOfNormalSourceWithKeySelector() { + Observable src = from("a", "B", "c", "C", "c", "B", "b", "a", "E"); + create(distinct(src, TO_UPPER_WITH_EXCEPTION)).subscribe(w); + + InOrder inOrder = inOrder(w); + inOrder.verify(w, times(1)).onNext("a"); + inOrder.verify(w, times(1)).onNext("B"); + inOrder.verify(w, times(1)).onNext("c"); + inOrder.verify(w, times(1)).onNext("E"); + inOrder.verify(w, times(1)).onCompleted(); + inOrder.verify(w, never()).onNext(anyString()); + verify(w, never()).onError(any(Throwable.class)); + } + + @Test + public void testDistinctOfSourceWithNulls() { + Observable src = from(null, "a", "a", null, null, "b", null); + create(distinct(src)).subscribe(w); + + InOrder inOrder = inOrder(w); + inOrder.verify(w, times(1)).onNext(null); + inOrder.verify(w, times(1)).onNext("a"); + inOrder.verify(w, times(1)).onNext("b"); + inOrder.verify(w, times(1)).onCompleted(); + inOrder.verify(w, never()).onNext(anyString()); + verify(w, never()).onError(any(Throwable.class)); + } + + @Test + public void testDistinctOfSourceWithExceptionsFromKeySelector() { + Observable src = from("a", "b", null, "c"); + create(distinct(src, TO_UPPER_WITH_EXCEPTION)).subscribe(w); + + InOrder inOrder = inOrder(w); + inOrder.verify(w, times(1)).onNext("a"); + inOrder.verify(w, times(1)).onNext("b"); + inOrder.verify(w, times(1)).onError(any(NullPointerException.class)); + inOrder.verify(w, never()).onNext(anyString()); + inOrder.verify(w, never()).onCompleted(); + } + } +} From 243e624c8e932f756c226dcf0797d95951fa091b Mon Sep 17 00:00:00 2001 From: Joachim Hofer Date: Thu, 12 Sep 2013 15:14:34 +0200 Subject: [PATCH 2/2] added javadoc comments to Observable.distinct --- rxjava-core/src/main/java/rx/Observable.java | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index b286b11d51..47ddc5c927 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -2926,10 +2926,26 @@ public Observable filter(Func1 predicate) { return create(OperationFilter.filter(this, predicate)); } + /** + * Returns an Observable that forwards all distinct items emitted from the source Observable. + * + * @return an Observable of distinct items + * @see MSDN: Observable.distinct + */ public Observable distinct() { return create(OperationDistinct.distinct(this)); } + /** + * Returns an Observable that forwards all items emitted from the source Observable that are distinct according to + * a key selector function. + * + * @param keySelector + * a function that projects an emitted item to a key value which is used for deciding whether an item is + * distinct from another one or not + * @return an Observable of distinct items + * @see MSDN: Observable.distinct + */ public Observable distinct(Func1 keySelector) { return create(OperationDistinct.distinct(this, keySelector)); }