From d32ed0c2c4a4c5942644ad276b76c57643ab9571 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Mon, 23 Dec 2013 22:45:24 +0100 Subject: [PATCH] BO.collect and BO.chunkify --- .../rx/observables/BlockingObservable.java | 28 ++ .../java/rx/operators/OperationCollect.java | 252 ++++++++++++++++++ .../rx/operators/OperationCollectTest.java | 220 +++++++++++++++ 3 files changed, 500 insertions(+) create mode 100644 rxjava-core/src/main/java/rx/operators/OperationCollect.java create mode 100644 rxjava-core/src/test/java/rx/operators/OperationCollectTest.java diff --git a/rxjava-core/src/main/java/rx/observables/BlockingObservable.java b/rxjava-core/src/main/java/rx/observables/BlockingObservable.java index 89820b79a4..86290a39ee 100644 --- a/rxjava-core/src/main/java/rx/observables/BlockingObservable.java +++ b/rxjava-core/src/main/java/rx/observables/BlockingObservable.java @@ -16,6 +16,7 @@ package rx.observables; import java.util.Iterator; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicReference; @@ -23,6 +24,7 @@ import rx.Observable; import rx.Observer; import rx.Subscription; +import rx.operators.OperationCollect; import rx.operators.OperationLatest; import rx.operators.OperationMostRecent; import rx.operators.OperationNext; @@ -31,7 +33,9 @@ import rx.operators.SafeObservableSubscription; import rx.operators.SafeObserver; import rx.util.functions.Action1; +import rx.util.functions.Func0; import rx.util.functions.Func1; +import rx.util.functions.Func2; /** * An extension of {@link Observable} that provides blocking operators. @@ -411,4 +415,28 @@ public Iterator iterator() { } }; } + + /** + * Return an Iterable sequence that returns elements + * collected/aggregated from the source sequence between consecutive next calls. + * @param the type of the collector + * @param initialCollector factory function to create the initial collector + * @param merger function that merges the current collector with the observed value and returns a (new) collector + * @param replaceCollector function that replaces the current collector with a new collector when the current collector is consumed by an Iterator.next() + * @return an Iterable sequence that returns elements + * collected/aggregated from the source sequence between + * consecutive next calls. + */ + public Iterable collect(Func0 initialCollector, + Func2 merger, + Func1 replaceCollector) { + return OperationCollect.collect(o, initialCollector, merger, replaceCollector); + } + /** + * Return an Iterable that produces a sequence of consecutive (possibly empty) chunks of this Observable sequence. + * @return an Iterable that produces a sequence of consecutive (possibly empty) chunks of this Observable sequence. + */ + public Iterable> chunkify() { + return OperationCollect.chunkify(o); + } } diff --git a/rxjava-core/src/main/java/rx/operators/OperationCollect.java b/rxjava-core/src/main/java/rx/operators/OperationCollect.java new file mode 100644 index 0000000000..5fe33d077e --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationCollect.java @@ -0,0 +1,252 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import rx.Observable; +import rx.Observer; +import rx.Subscription; +import rx.subscriptions.SerialSubscription; +import rx.util.Exceptions; +import rx.util.functions.Func0; +import rx.util.functions.Func1; +import rx.util.functions.Func2; + +/** + * Observable to iterable mapping by using custom collector, merger and + * collector-replacer functions. + */ +public final class OperationCollect { + /** Utility class. */ + private OperationCollect() { throw new IllegalStateException("No instances!"); } + + /** + * Produces an Iterable sequence that returns elements + * collected/aggregated from the source sequence between consecutive next calls. + * @param the source value type + * @param the aggregation type + * @param source the source Observable + * @param initialCollector the factory to create the initial collector + * @param merger the merger that combines the current collector with the observed value and returns a (new) collector + * @param replaceCollector the function that replaces the current collector with a new collector. + * @return the iterable sequence + */ + public static Iterable collect( + final Observable source, + final Func0 initialCollector, + final Func2 merger, + final Func1 replaceCollector) { + return new Iterable() { + @Override + public Iterator iterator() { + SerialSubscription sas = new SerialSubscription(); + Collect collect = new Collect(initialCollector, merger, replaceCollector, sas); + if (!sas.isUnsubscribed()) { + sas.set(source.subscribe(collect)); + } + return collect; + } + }; + } + + /** + * Produces an Iterable sequence of consecutive (possibly empty) chunks of the source sequence. + * @param the source value type + * @param source the source Observable + * @return an iterable sequence of the chunks + */ + public static Iterable> chunkify(final Observable source) { + ListManager na = new ListManager(); + return collect(source, na, na, na); + } + /** Creates a new ArrayList and manages its content. */ + private static final class ListManager implements Func1, List>, Func0>, Func2, T, List> { + @Override + public List call() { + return new ArrayList(); + } + + @Override + public List call(List t1) { + return call(); + } + @Override + public List call(List t1, T t2) { + t1.add(t2); + return t1; + } + } + + /** The observer and iterator. */ + private static final class Collect implements Observer, Iterator { + final Func2 merger; + final Func1 replaceCollector; + final Subscription cancel; + final Lock lock = new ReentrantLock(); + U current; + boolean hasDone; + boolean hasError; + Throwable error; + /** Iterator's current collector. */ + U iCurrent; + /** Iterator has unclaimed collector. */ + boolean iHasValue; + /** Iterator completed. */ + boolean iDone; + /** Iterator error. */ + Throwable iError; + + public Collect(final Func0 initialCollector, + final Func2 merger, + final Func1 replaceCollector, + final Subscription cancel) { + this.merger = merger; + this.replaceCollector = replaceCollector; + this.cancel = cancel; + try { + current = initialCollector.call(); + } catch (Throwable t) { + hasError = true; + error = t; + cancel.unsubscribe(); + } + } + + @Override + public void onNext(T args) { + boolean unsubscribe = false; + lock.lock(); + try { + if (hasDone || hasError) { + return; + } + try { + current = merger.call(current, args); + } catch (Throwable t) { + error = t; + hasError = true; + unsubscribe = true; + } + } finally { + lock.unlock(); + } + if (unsubscribe) { + cancel.unsubscribe(); + } + } + + @Override + public void onCompleted() { + boolean unsubscribe = false; + lock.lock(); + try { + if (hasDone || hasError) { + return; + } + hasDone = true; + unsubscribe = true; + } finally { + lock.unlock(); + } + if (unsubscribe) { + cancel.unsubscribe(); + } + } + + @Override + public void onError(Throwable e) { + boolean unsubscribe = false; + lock.lock(); + try { + if (hasDone || hasError) { + return; + } + hasError = true; + error = e; + unsubscribe = true; + } finally { + lock.unlock(); + } + if (unsubscribe) { + cancel.unsubscribe(); + } + } + + @Override + public boolean hasNext() { + if (iError != null) { + throw Exceptions.propagate(iError); + } + if (!iHasValue) { + if (!iDone) { + lock.lock(); + try { + if (hasError) { + iError = error; + iDone = true; + current = null; + iCurrent = null; + } else { + iCurrent = current; + iHasValue = true; + if (hasDone) { + current = null; + iDone = true; + } else { + try { + current = replaceCollector.call(iCurrent); + } catch (Throwable t) { + iError = t; + iDone = true; + } + } + } + } finally { + lock.unlock(); + } + if (iDone && iError != null) { + cancel.unsubscribe(); + throw Exceptions.propagate(iError); + } + return true; + } + return false; + } + return true; + } + + @Override + public U next() { + if (hasNext()) { + U value = iCurrent; + iCurrent = null; + iHasValue = false; + return value; + } + throw new NoSuchElementException(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException("Read-only sequence"); + } + } +} diff --git a/rxjava-core/src/test/java/rx/operators/OperationCollectTest.java b/rxjava-core/src/test/java/rx/operators/OperationCollectTest.java new file mode 100644 index 0000000000..9708ff17f2 --- /dev/null +++ b/rxjava-core/src/test/java/rx/operators/OperationCollectTest.java @@ -0,0 +1,220 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import static org.junit.Assert.*; +import org.junit.Test; +import rx.Observable; +import rx.subjects.PublishSubject; +import rx.util.functions.Func0; +import rx.util.functions.Func1; +import rx.util.functions.Func2; + +public class OperationCollectTest { + @Test + public void testChunkify() { + PublishSubject source = PublishSubject.create(); + + Iterable> iterable = source.toBlockingObservable().chunkify(); + + Iterator> it = iterable.iterator(); + + assertEquals(Arrays.asList(), it.next()); + + source.onNext(1); + + assertEquals(Arrays.asList(1), it.next()); + + source.onNext(2); + source.onNext(3); + + assertEquals(Arrays.asList(2, 3), it.next()); + + assertEquals(Arrays.asList(), it.next()); + + source.onNext(4); + source.onNext(5); + source.onNext(6); + + it.hasNext(); + + source.onNext(7); + source.onNext(8); + source.onNext(9); + source.onNext(10); + + it.hasNext(); + assertEquals(Arrays.asList(4, 5, 6), it.next()); + + assertEquals(Arrays.asList(7, 8, 9, 10), it.next()); + + source.onCompleted(); + + assertEquals(Arrays.asList(), it.next()); + + try { + it.next(); + fail("Should have thrown NoSuchElementException"); + } catch (NoSuchElementException ex) { + // this is fine + } + } + @Test + public void testIterateManyTimes() { + Observable source = Observable.from(1, 2, 3); + + Iterable> iter = source.toBlockingObservable().chunkify(); + + for (int i = 0; i < 3; i++) { + Iterator> it = iter.iterator(); + + assertTrue(it.hasNext()); + + List list = it.next(); + + assertEquals(Arrays.asList(1, 2, 3), list); + } + } + static final class CustomException extends RuntimeException { + public CustomException(String message) { + super(message); + } + } + @Test + public void testInitialBufferThrows() { + Observable source = Observable.from(1, 2, 3); + + Func0 initialBuffer = new Func0() { + @Override + public Integer call() { + throw new CustomException("Forced failure!"); + } + }; + + Func2 collector = new Func2() { + @Override + public Integer call(Integer t1, Integer t2) { + return t1 + t2; + } + }; + + Func1 replaceBuffer = new Func1() { + + @Override + public Integer call(Integer t1) { + return 0; + } + + }; + + Iterable iter = source.toBlockingObservable().collect(initialBuffer, collector, replaceBuffer); + + Iterator it = iter.iterator(); + + try { + it.next(); + fail("Failed to throw CustomException"); + } catch (CustomException ex) { + // okay to get here + + } + } + + @Test + public void testCollectorThrows() { + Observable source = Observable.from(1, 2, 3); + + Func0 initialBuffer = new Func0() { + @Override + public Integer call() { + return 0; + } + }; + + Func2 collector = new Func2() { + @Override + public Integer call(Integer t1, Integer t2) { + throw new CustomException("Forced failure!"); + } + }; + + Func1 replaceBuffer = new Func1() { + + @Override + public Integer call(Integer t1) { + return 0; + } + + }; + + Iterable iter = source.toBlockingObservable().collect(initialBuffer, collector, replaceBuffer); + + Iterator it = iter.iterator(); + + try { + it.next(); + fail("Failed to throw CustomException"); + } catch (CustomException ex) { + // okay to get here + } + } + @Test + public void testReplaceBufferThrows() { + PublishSubject source = PublishSubject.create(); + + Func0 initialBuffer = new Func0() { + @Override + public Integer call() { + return 0; + } + }; + + Func2 collector = new Func2() { + @Override + public Integer call(Integer t1, Integer t2) { + return t1 + t2; + } + }; + + Func1 replaceBuffer = new Func1() { + + @Override + public Integer call(Integer t1) { + throw new CustomException("Forced failure!"); + } + + }; + + Iterable iter = source.toBlockingObservable().collect(initialBuffer, collector, replaceBuffer); + + Iterator it = iter.iterator(); + + source.onNext(1); + source.onNext(2); + source.onNext(3); + + try { + it.next(); + fail("Failed to throw CustomException"); + } catch (CustomException ex) { + // okay to get here + } + } +}