diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 04fa8e5132..cea331d876 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -111,6 +111,7 @@ import rx.plugins.RxJavaPlugins; import rx.schedulers.Schedulers; import rx.subjects.AsyncSubject; +import rx.subjects.BoundedReplaySubject; import rx.subjects.PublishSubject; import rx.subjects.ReplaySubject; import rx.subjects.Subject; @@ -4377,7 +4378,7 @@ public ConnectableObservable replay(Scheduler scheduler) { * @see MSDN: Observable.Replay */ public ConnectableObservable replay(int bufferSize) { - return OperationMulticast.multicast(this, OperationReplay.replayBuffered(bufferSize)); + return OperationMulticast.multicast(this, BoundedReplaySubject.createBuffered(bufferSize)); } /** @@ -4399,7 +4400,7 @@ public ConnectableObservable replay(int bufferSize) { public ConnectableObservable replay(int bufferSize, Scheduler scheduler) { return OperationMulticast.multicast(this, OperationReplay.createScheduledSubject( - OperationReplay.replayBuffered(bufferSize), scheduler)); + BoundedReplaySubject.createBuffered(bufferSize), scheduler)); } /** @@ -4441,7 +4442,7 @@ public ConnectableObservable replay(long time, TimeUnit unit) { * @see MSDN: Observable.Replay */ public ConnectableObservable replay(long time, TimeUnit unit, Scheduler scheduler) { - return OperationMulticast.multicast(this, OperationReplay.replayWindowed(time, unit, -1, scheduler)); + return OperationMulticast.multicast(this, BoundedReplaySubject.createWindowed(time, unit, scheduler)); } /** @@ -4487,7 +4488,7 @@ public ConnectableObservable replay(int bufferSize, long time, TimeUnit unit, if (bufferSize < 0) { throw new IllegalArgumentException("bufferSize < 0"); } - return OperationMulticast.multicast(this, OperationReplay.replayWindowed(time, unit, bufferSize, scheduler)); + return OperationMulticast.multicast(this, BoundedReplaySubject.createWindowedAndBuffered(time, unit, bufferSize, scheduler)); } /** @@ -4563,7 +4564,7 @@ public Observable replay(Func1, ? extends Observabl return OperationMulticast.multicast(this, new Func0>() { @Override public Subject call() { - return OperationReplay.replayBuffered(bufferSize); + return BoundedReplaySubject.createBuffered(bufferSize); } }, selector); } @@ -4591,7 +4592,7 @@ public Observable replay(Func1, ? extends Observabl return OperationMulticast.multicast(this, new Func0>() { @Override public Subject call() { - return OperationReplay.createScheduledSubject(OperationReplay.replayBuffered(bufferSize), scheduler); + return OperationReplay.createScheduledSubject(BoundedReplaySubject.createBuffered(bufferSize), scheduler); } }, selector); } @@ -4644,7 +4645,7 @@ public Observable replay(Func1, ? extends Observabl return OperationMulticast.multicast(this, new Func0>() { @Override public Subject call() { - return OperationReplay.replayWindowed(time, unit, -1, scheduler); + return BoundedReplaySubject.createWindowed(time, unit, scheduler); } }, selector); } @@ -4703,7 +4704,7 @@ public Observable replay(Func1, ? extends Observabl return OperationMulticast.multicast(this, new Func0>() { @Override public Subject call() { - return OperationReplay.replayWindowed(time, unit, bufferSize, scheduler); + return BoundedReplaySubject.createWindowedAndBuffered(time, unit, bufferSize, scheduler); } }, selector); } diff --git a/rxjava-core/src/main/java/rx/operators/OperationReplay.java b/rxjava-core/src/main/java/rx/operators/OperationReplay.java index 1a89b3d2ac..6cf4ff004f 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationReplay.java +++ b/rxjava-core/src/main/java/rx/operators/OperationReplay.java @@ -15,26 +15,13 @@ */ package rx.operators; -import java.util.ArrayList; -import java.util.Collection; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import rx.Observable; import rx.Observable.OnSubscribeFunc; import rx.Observer; import rx.Scheduler; import rx.Subscription; import rx.subjects.Subject; -import rx.subscriptions.Subscriptions; -import rx.util.Timestamped; -import rx.util.functions.Action0; import rx.util.functions.Func1; -import rx.util.functions.Functions; /** * Replay with limited buffer and/or time constraints. @@ -46,12 +33,7 @@ public final class OperationReplay { /** Utility class. */ private OperationReplay() { throw new IllegalStateException("No instances!"); } - /** - * Create a BoundedReplaySubject with the given buffer size. - */ - public static Subject replayBuffered(int bufferSize) { - return CustomReplaySubject.create(bufferSize); - } + /** * Creates a subject whose client observers will observe events * propagated through the given wrapped subject. @@ -62,69 +44,6 @@ public static Subject createScheduledSubject(Subject subject, Sc return s; } - /** - * Create a CustomReplaySubject with the given time window length - * and optional buffer size. - * - * @param the source and return type - * @param time the length of the time window - * @param unit the unit of the time window length - * @param bufferSize the buffer size if >= 0, otherwise, the buffer will be unlimited - * @param scheduler the scheduler from where the current time is retrieved. The - * observers will not observe on this scheduler. - * @return a Subject with the required replay behavior - */ - public static Subject replayWindowed(long time, TimeUnit unit, int bufferSize, final Scheduler scheduler) { - final long ms = unit.toMillis(time); - if (ms <= 0) { - throw new IllegalArgumentException("The time window is less than 1 millisecond!"); - } - Func1> timestamp = new Func1>() { - @Override - public Timestamped call(T t1) { - return new Timestamped(scheduler.now(), t1); - } - }; - Func1, T> untimestamp = new Func1, T>() { - @Override - public T call(Timestamped t1) { - return t1.getValue(); - } - }; - - ReplayState, T> state; - - if (bufferSize >= 0) { - state = new ReplayState, T>(new VirtualBoundedList>(bufferSize), untimestamp); - } else { - state = new ReplayState, T>(new VirtualArrayList>(), untimestamp); - } - final ReplayState, T> fstate = state; - // time based eviction when a value is added - state.onValueAdded = new Action0() { - @Override - public void call() { - long now = scheduler.now(); - long before = now - ms; - for (int i = fstate.values.start(); i < fstate.values.end(); i++) { - Timestamped v = fstate.values.get(i); - if (v.getTimestampMillis() >= before) { - fstate.values.removeBefore(i); - break; - } - } - } - }; - // time based eviction when a client subscribes - state.onSubscription = state.onValueAdded; - - final CustomReplaySubject, T> brs = new CustomReplaySubject, T>( - new CustomReplaySubjectSubscribeFunc, T>(state), state, timestamp - ); - - return brs; - } - /** * Return an OnSubscribeFunc which delegates the subscription to the given observable. */ @@ -195,557 +114,4 @@ public void onCompleted() { } - /** Base state with lock. */ - static class BaseState { - /** The lock to protect the other fields. */ - private final Lock lock = new ReentrantLock(); - /** Lock. */ - public void lock() { - lock.lock(); - } - /** Unlock. */ - public void unlock() { - lock.unlock(); - } - - } - /** - * Base interface for logically indexing a list. - * @param the value type - */ - public interface VirtualList { - /** @return the number of elements in this list */ - int size(); - /** - * Add an element to the list. - * @param value the value to add - */ - void add(T value); - /** - * Retrieve an element at the specified logical index. - * @param index - * @return - */ - T get(int index); - /** - * Remove elements up before the given logical index and move - * the start() to this index. - *

- * For example, a list contains 3 items. Calling removeUntil 2 will - * remove the first two items. - * @param index - */ - void removeBefore(int index); - /** - * Clear the elements of this list and increase the - * start by the number of elements. - */ - void clear(); - /** - * Returns the current head index of this list. - * @return - */ - int start(); - /** - * Returns the current tail index of this list (where the next value would appear). - * @return - */ - int end(); - /** - * Clears and resets the indexes of the list. - */ - void reset(); - /** - * Returns the current content as a list. - * @return - */ - List toList(); - } - /** - * Behaves like a normal, unbounded ArrayList but with virtual index. - */ - public static final class VirtualArrayList implements VirtualList { - /** The backing list .*/ - final List list = new ArrayList(); - /** The virtual start index of the list. */ - int startIndex; - @Override - public int size() { - return list.size(); - } - @Override - public void add(T value) { - list.add(value); - } - - @Override - public T get(int index) { - return list.get(index - startIndex); - } - - @Override - public void removeBefore(int index) { - int j = index - startIndex; - if (j > 0 && j <= list.size()) { - list.subList(0, j).clear(); - } - startIndex = index; - } - - @Override - public void clear() { - startIndex += list.size(); - list.clear(); - } - - @Override - public int start() { - return startIndex; - } - - @Override - public int end() { - return startIndex + list.size(); - } - - @Override - public void reset() { - list.clear(); - startIndex = 0; - } - @Override - public List toList() { - return new ArrayList(list); - } - - } - /** - * A bounded list which increases its size up to a maximum capacity, then - * behaves like a circular buffer with virtual indexes. - */ - public static final class VirtualBoundedList implements VirtualList { - /** A list that grows up to maxSize. */ - private final List list = new ArrayList(); - /** The maximum allowed size. */ - private final int maxSize; - /** The logical start index of the list. */ - int startIndex; - /** The head index inside the list, where the first readable value sits. */ - int head; - /** The tail index inside the list, where the next value will be added. */ - int tail; - /** The number of items in the list. */ - int count; - /** - * Construct a VirtualBoundedList with the given maximum number of elements. - * @param maxSize - */ - public VirtualBoundedList(int maxSize) { - if (maxSize < 0) { - throw new IllegalArgumentException("maxSize < 0"); - } - this.maxSize = maxSize; - } - @Override - public int start() { - return startIndex; - } - - @Override - public int end() { - return startIndex + count; - } - - @Override - public void clear() { - startIndex += count; - list.clear(); - head = 0; - tail = 0; - count = 0; - } - @Override - public int size() { - return count; - } - - @Override - public void add(T value) { - if (list.size() == maxSize) { - list.set(tail, value); - head = (head + 1) % maxSize; - tail = (tail + 1) % maxSize; - startIndex++; - } else { - list.add(value); - tail = (tail + 1) % maxSize; - count++; - } - } - - @Override - public T get(int index) { - if (index < start() || index >= end()) { - throw new ArrayIndexOutOfBoundsException(index); - } - int idx = (head + (index - startIndex)) % maxSize; - return list.get(idx); - } - - @Override - public void removeBefore(int index) { - if (index <= start()) { - return; - } - if (index >= end()) { - clear(); - startIndex = index; - return; - } - int rc = index - startIndex; - int head2 = head + rc; - for (int i = head; i < head2; i++) { - list.set(i % maxSize, null); - count--; - } - startIndex = index; - head = head2 % maxSize; - } - @Override - public List toList() { - List r = new ArrayList(list.size() + 1); - for (int i = head; i < head + count; i++) { - int idx = i % maxSize; - r.add(list.get(idx)); - } - return r; - } - - @Override - public void reset() { - list.clear(); - count = 0; - head = 0; - tail = 0; - } - - } - /** - * The state class. - * @param the intermediate type stored in the values buffer - * @param the result type transformed via the resultSelector - */ - static final class ReplayState extends BaseState { - /** The values observed so far. */ - final VirtualList values; - /** The result selector. */ - final Func1 resultSelector; - /** The received error. */ - Throwable error; - /** General completion indicator. */ - boolean done; - /** The map of replayers. */ - final Map replayers = new LinkedHashMap(); - /** - * Callback once a value has been added but before it is replayed - * (I.e, run a time based eviction policy). - *

- * Called while holding the state lock. - */ - protected Action0 onValueAdded = new Action0() { - @Override - public void call() { - } - }; - /** - * Callback once an error has been called but before it is replayed - * (I.e, run a time based eviction policy). - *

- * Called while holding the state lock. - */ - protected Action0 onErrorAdded = new Action0() { - @Override - public void call() { - } - }; - /** - * Callback once completed has been called but before it is replayed - * (I.e, run a time based eviction policy). - *

- * Called while holding the state lock. - */ - protected Action0 onCompletedAdded = new Action0() { - @Override - public void call() { - } - }; - /** - * Callback to pre-manage the values if an observer unsubscribes - * (I.e, run a time based eviction policy). - *

- * Called while holding the state lock. - */ - protected Action0 onSubscription = new Action0() { - @Override - public void call() { - } - }; - /** - * Construct a ReplayState with the supplied buffer and result selectors. - * @param values - * @param resultSelector - */ - public ReplayState(final VirtualList values, - final Func1 resultSelector) { - this.values = values; - this.resultSelector = resultSelector; - } - /** - * Returns a live collection of the observers. - *

- * Caller should hold the lock. - * @return - */ - Collection replayers() { - return new ArrayList(replayers.values()); - } - /** - * Add a replayer to the replayers and create a Subscription for it. - *

- * Caller should hold the lock. - * - * @param obs - * @return - */ - Subscription addReplayer(Observer obs) { - Subscription s = new Subscription() { - final AtomicBoolean once = new AtomicBoolean(); - @Override - public void unsubscribe() { - if (once.compareAndSet(false, true)) { - remove(this); - } - } - - }; - Replayer rp = new Replayer(obs, s); - replayers.put(s, rp); - rp.replayTill(values.start() + values.size()); - return s; - } - /** The replayer that holds a value where the given observer is currently at. */ - final class Replayer { - protected final Observer wrapped; - /** Where this replayer was in reading the list. */ - protected int index; - /** To cancel and unsubscribe this replayer and observer. */ - protected final Subscription cancel; - protected Replayer(Observer wrapped, Subscription cancel) { - this.wrapped = wrapped; - this.cancel = cancel; - } - /** - * Replay up to the given index - * @param limit - */ - void replayTill(int limit) { - int si = values.start(); - if (index < si) { - index = si; - } - while (index < limit) { - TIntermediate value = values.get(index); - index++; - try { - wrapped.onNext(resultSelector.call(value)); - } catch (Throwable t) { - replayers.remove(cancel); - wrapped.onError(t); - return; - } - } - if (done) { - if (error != null) { - wrapped.onError(error); - } else { - wrapped.onCompleted(); - } - } - } - } - /** - * Remove the subscription. - * @param s - */ - void remove(Subscription s) { - lock(); - try { - replayers.remove(s); - } finally { - unlock(); - } - } - /** - * Add a notification value and limit the size of values. - *

- * Caller should hold the lock. - * @param value - */ - void add(TIntermediate value) { - values.add(value); - } - /** Clears the value list. */ - void clearValues() { - lock(); - try { - values.clear(); - } finally { - unlock(); - } - } - } - /** - * A customizable replay subject with support for transformations. - * - * @param the Observer side's value type - * @param the type of the elements in the replay buffer - * @param the value type of the observers subscribing to this subject - */ - public static final class CustomReplaySubject extends Subject { - /** - * Return a subject that retains all events and will replay them to an {@link Observer} that subscribes. - * @return a subject that retains all events and will replay them to an {@link Observer} that subscribes. - */ - public static CustomReplaySubject create() { - ReplayState state = new ReplayState(new VirtualArrayList(), Functions.identity()); - return new CustomReplaySubject( - new CustomReplaySubjectSubscribeFunc(state), state, - Functions.identity()); - } - /** - * Create a bounded replay subject with the given maximum buffer size. - * @param maxSize the maximum size in number of onNext notifications - * @return - */ - public static CustomReplaySubject create(int maxSize) { - ReplayState state = new ReplayState(new VirtualBoundedList(maxSize), Functions.identity()); - return new CustomReplaySubject( - new CustomReplaySubjectSubscribeFunc(state), state, - Functions.identity()); - } - /** The replay state. */ - protected final ReplayState state; - /** The result selector. */ - protected final Func1 intermediateSelector; - - private CustomReplaySubject( - Observable.OnSubscribeFunc onSubscribe, - ReplayState state, - Func1 intermediateSelector) { - super(onSubscribe); - this.state = state; - this.intermediateSelector = intermediateSelector; - } - - - @Override - public void onCompleted() { - state.lock(); - try { - if (state.done) { - return; - } - state.done = true; - state.onCompletedAdded.call(); - replayValues(); - } finally { - state.unlock(); - } - } - - @Override - public void onError(Throwable e) { - state.lock(); - try { - if (state.done) { - return; - } - state.done = true; - state.error = e; - state.onErrorAdded.call(); - replayValues(); - } finally { - state.unlock(); - } - } - - @Override - public void onNext(TInput args) { - state.lock(); - try { - if (state.done) { - return; - } - state.add(intermediateSelector.call(args)); - state.onValueAdded.call(); - replayValues(); - } finally { - state.unlock(); - } - } - /** - * Replay values up to the current index. - */ - protected void replayValues() { - int s = state.values.start() + state.values.size(); - for (ReplayState.Replayer rp : state.replayers()) { - rp.replayTill(s); - } - } - } - /** - * The subscription function. - * @param the type of the elements in the replay buffer - * @param the value type of the observers subscribing to this subject - */ - protected static final class CustomReplaySubjectSubscribeFunc - implements Observable.OnSubscribeFunc { - - private final ReplayState state; - protected CustomReplaySubjectSubscribeFunc(ReplayState state) { - this.state = state; - } - - @Override - public Subscription onSubscribe(Observer t1) { - VirtualList values; - Throwable error; - state.lock(); - try { - if (!state.done) { - state.onSubscription.call(); - return state.addReplayer(t1); - } - values = state.values; - error = state.error; - } finally { - state.unlock(); - } - // fully replay the subject - for (int i = values.start(); i < values.end(); i++) { - try { - t1.onNext(state.resultSelector.call(values.get(i))); - } catch (Throwable t) { - t1.onError(t); - return Subscriptions.empty(); - } - } - if (error != null) { - t1.onError(error); - } else { - t1.onCompleted(); - } - return Subscriptions.empty(); - } - } } diff --git a/rxjava-core/src/main/java/rx/subjects/BoundedReplaySubject.java b/rxjava-core/src/main/java/rx/subjects/BoundedReplaySubject.java new file mode 100644 index 0000000000..a3c8069bab --- /dev/null +++ b/rxjava-core/src/main/java/rx/subjects/BoundedReplaySubject.java @@ -0,0 +1,782 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.subjects; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import rx.Observable; +import rx.Observer; +import rx.Scheduler; +import rx.Subscription; +import rx.subscriptions.Subscriptions; +import rx.util.Timestamped; +import rx.util.functions.Action0; +import rx.util.functions.Func1; +import rx.util.functions.Functions; + +/** + * A customizable replay subject with support for transformations + * and a time/space bounded buffer. + * + * @param the Observer side's value type + * @param the type of the elements in the replay buffer + * @param the value type of the observers subscribing to this subject + */ +public final class BoundedReplaySubject extends Subject { + /** + * Return a subject that retains all events and will replay them to an {@link Observer} that subscribes. + * @param the input and output type + * @return a subject that retains all events and will replay them to an {@link Observer} that subscribes. + */ + public static Subject create() { + ReplayState state = new ReplayState(new VirtualArrayList(), Functions.identity()); + return new BoundedReplaySubject( + new CustomReplaySubjectSubscribeFunc(state), state, + Functions.identity()); + } + + /** + * Create a bounded replay subject with the given maximum buffer size. + * @param the input and output type + * @param bufferSize the maximum size in number of onNext notifications + * @return the bounded replay subject with the given buffer size + */ + public static Subject createBuffered(int bufferSize) { + ReplayState state = new ReplayState(new VirtualBoundedList(bufferSize), Functions.identity()); + return new BoundedReplaySubject( + new CustomReplaySubjectSubscribeFunc(state), state, + Functions.identity()); + } + /** + * Create a CustomReplaySubject with the given time window length + * and optional buffer size. + * + * @param the source and return type + * @param time the length of the time window + * @param unit the unit of the time window length + * @param scheduler the scheduler from where the current time is retrieved. The + * observers will not observe on this scheduler. + * @return a Subject with the required replay behavior + */ + public static Subject createWindowed(long time, TimeUnit unit, final Scheduler scheduler) { + return create(time, unit, -1, scheduler, Functions.identity()); + } + + /** + * Create a CustomReplaySubject with the given time window length + * and optional buffer size. + * + * @param the source and return type + * @param time the length of the time window + * @param unit the unit of the time window length + * @param bufferSize the buffer size if >= 0, otherwise, the buffer will be unlimited + * @param scheduler the scheduler from where the current time is retrieved. The + * observers will not observe on this scheduler. + * @return a Subject with the required replay behavior + */ + public static Subject createWindowedAndBuffered(long time, TimeUnit unit, int bufferSize, final Scheduler scheduler) { + if (bufferSize < 0) { + throw new IllegalArgumentException("bufferSize >= 0 required"); + } + return create(time, unit, bufferSize, scheduler, Functions.identity()); + } + /** + * Create a CustomReplaySubject with the given time window length + * and optional buffer size. + * + * @param the source and return type + * @param time the length of the time window + * @param unit the unit of the time window length + * @param bufferSize the buffer size if >= 0, otherwise, the buffer will be unlimited + * @param scheduler the scheduler from where the current time is retrieved. The + * observers will not observe on this scheduler. + * @return a Subject with the required replay behavior + */ + private static BoundedReplaySubject, U> create( + long time, TimeUnit unit, int bufferSize, + final Scheduler scheduler, + final Func1 selector) { + final long ms = unit.toMillis(time); + if (ms <= 0) { + throw new IllegalArgumentException("The time window is less than 1 millisecond!"); + } + Func1> timestamp = new Func1>() { + @Override + public Timestamped call(T t1) { + return new Timestamped(scheduler.now(), t1); + } + }; + Func1, U> untimestamp = new Func1, U>() { + @Override + public U call(Timestamped t1) { + return selector.call(t1.getValue()); + } + }; + + ReplayState, U> state; + + if (bufferSize >= 0) { + state = new ReplayState, U>(new VirtualBoundedList>(bufferSize), untimestamp); + } else { + state = new ReplayState, U>(new VirtualArrayList>(), untimestamp); + } + final ReplayState, U> fstate = state; + // time based eviction when a value is added + state.onValueAdded = new Action0() { + @Override + public void call() { + long now = scheduler.now(); + long before = now - ms; + for (int i = fstate.values.start(); i < fstate.values.end(); i++) { + Timestamped v = fstate.values.get(i); + if (v.getTimestampMillis() >= before) { + fstate.values.removeBefore(i); + break; + } + } + } + }; + // time based eviction when a client subscribes + state.onSubscription = state.onValueAdded; + + final BoundedReplaySubject, U> brs = new BoundedReplaySubject, U>( + new CustomReplaySubjectSubscribeFunc, U>(state), state, timestamp + ); + + return brs; + } + + /** + * Create an unbounded BoundedReplaySubject with mapping capability. + * @param the input value type + * @param the output value type + * @param selector the value selector that maps the inputs to the outputs + * @return an unbounded BoundedReplaySubject with mapping capability + */ + public static Subject createMapped( + Func1 selector + ) { + ReplayState state = new ReplayState( + new VirtualArrayList(), Functions.identity()); + return new BoundedReplaySubject( + new CustomReplaySubjectSubscribeFunc(state), state, + selector); + } + + /** + * Create a bounded BoundedReplaySubject with mapping capability. + * @param the input value type + * @param the output value type + * @param bufferSize the maximum number of elements to buffer + * @param selector the value selector that maps the inputs to the outputs + * @return an unbounded BoundedReplaySubject with mapping capability + */ + public static Subject createMappedAndBuffered(int bufferSize, + Func1 selector + ) { + ReplayState state = new ReplayState( + new VirtualBoundedList(bufferSize), Functions.identity()); + return new BoundedReplaySubject( + new CustomReplaySubjectSubscribeFunc(state), state, + selector); + } + + /** + * Create an unbounded BoundedReplaySubject with mapping capability. + * @param the input value type + * @param the output value type + * @param time the length of the time window + * @param unit the unit of the time window length + * @param scheduler the scheduler from where the current time is retrieved. The + * observers will not observe on this scheduler. + * @param selector the value selector that maps the inputs to the outputs + * @return an unbounded BoundedReplaySubject with mapping capability + */ + public static Subject createMappedAndWindowed( + long time, TimeUnit unit, Scheduler scheduler, + Func1 selector + ) { + return create(time, unit, -1, scheduler, selector); + } + + /** + * Create a bounded BoundedReplaySubject with mapping capability. + * @param the input value type + * @param the output value type + * @param bufferSize the maximum number of elements to buffer + * @param time the length of the time window + * @param unit the unit of the time window length + * @param scheduler the scheduler from where the current time is retrieved. The + * observers will not observe on this scheduler. + * @param selector the value selector that maps the inputs to the outputs + * @return an unbounded BoundedReplaySubject with mapping capability + */ + public static Subject createMappedWindowedAndBuffered(int bufferSize, + long time, TimeUnit unit, Scheduler scheduler, + Func1 selector + ) { + if (bufferSize < 0) { + throw new IllegalArgumentException("bufferSize >= 0 required"); + } + return create(time, unit, bufferSize, scheduler, selector); + } + + /** The replay state. */ + protected final ReplayState state; + /** The result selector. */ + protected final Func1 intermediateSelector; + + protected BoundedReplaySubject( + Observable.OnSubscribeFunc onSubscribe, + ReplayState state, + Func1 intermediateSelector) { + super(onSubscribe); + this.state = state; + this.intermediateSelector = intermediateSelector; + } + + + @Override + public void onCompleted() { + state.lockWrite(); + try { + if (state.done) { + return; + } + state.done = true; + state.onCompletedAdded.call(); + } finally { + state.unlockWrite(); + } + state.replayValues(); + } + + @Override + public void onError(Throwable e) { + state.lockWrite(); + try { + if (state.done) { + return; + } + state.done = true; + state.error = e; + state.onErrorAdded.call(); + } finally { + state.unlockWrite(); + } + state.replayValues(); + } + + @Override + public void onNext(TInput args) { + state.lockWrite(); + try { + if (state.done) { + return; + } + state.add(intermediateSelector.call(args)); + state.onValueAdded.call(); + } finally { + state.unlockWrite(); + } + state.replayValues(); + } + /** + * The subscription function. + * @param the type of the elements in the replay buffer + * @param the value type of the observers subscribing to this subject + */ + protected static final class CustomReplaySubjectSubscribeFunc + implements Observable.OnSubscribeFunc { + + protected final ReplayState state; + protected CustomReplaySubjectSubscribeFunc(ReplayState state) { + this.state = state; + } + + @Override + public Subscription onSubscribe(Observer t1) { + VirtualList values; + Throwable error; + state.lockRead(); + try { + if (!state.done) { + state.onSubscription.call(); + return state.addReplayer(t1); + } + values = state.values; + error = state.error; + } finally { + state.unlockRead(); + } + // fully replay the completed subject + for (int i = values.start(); i < values.end(); i++) { + try { + t1.onNext(state.resultSelector.call(values.get(i))); + } catch (Throwable t) { + t1.onError(t); + return Subscriptions.empty(); + } + } + if (error != null) { + t1.onError(error); + } else { + t1.onCompleted(); + } + return Subscriptions.empty(); + } + } + /** + * The state class. + * @param the intermediate type stored in the values buffer + * @param the result type transformed via the resultSelector + */ + static final class ReplayState extends BaseState { + /** The values observed so far. */ + final VirtualList values; + /** The result selector. */ + final Func1 resultSelector; + /** The received error. */ + Throwable error; + /** General completion indicator. */ + boolean done; + /** The map of replayers. */ + final Map replayers = new ConcurrentHashMap(); + /** + * Callback once a value has been added but before it is replayed + * (I.e, run a time based eviction policy). + *

+ * Called while holding the state lock. + */ + protected Action0 onValueAdded = new Action0() { + @Override + public void call() { + } + }; + /** + * Callback once an error has been called but before it is replayed + * (I.e, run a time based eviction policy). + *

+ * Called while holding the state lock. + */ + protected Action0 onErrorAdded = new Action0() { + @Override + public void call() { + } + }; + /** + * Callback once completed has been called but before it is replayed + * (I.e, run a time based eviction policy). + *

+ * Called while holding the state lock. + */ + protected Action0 onCompletedAdded = new Action0() { + @Override + public void call() { + } + }; + /** + * Callback to pre-manage the values if an observer unsubscribes + * (I.e, run a time based eviction policy). + *

+ * Called while holding the state lock. + */ + protected Action0 onSubscription = new Action0() { + @Override + public void call() { + } + }; + /** + * Construct a ReplayState with the supplied buffer and result selectors. + * @param values + * @param resultSelector + */ + public ReplayState(final VirtualList values, + final Func1 resultSelector) { + this.values = values; + this.resultSelector = resultSelector; + } + /** + * Returns a live collection of the observers. + * @return + */ + Collection replayers() { + return new ArrayList(replayers.values()); + } + /** + * Add a replayer to the replayers and create a Subscription for it. + *

+ * Caller should hold the read lock. + * + * @param obs + * @return + */ + Subscription addReplayer(Observer obs) { + Subscription s = new Subscription() { + final AtomicBoolean once = new AtomicBoolean(); + @Override + public void unsubscribe() { + if (once.compareAndSet(false, true)) { + remove(this); + } + } + + }; + Replayer rp = new Replayer(obs, s); + rp.replayTill(values.start() + values.size()); + replayers.put(s, rp); // make it visible to replayValues() + return s; + } + /** + * Replay values up to the current index. + *

+ * Caller should hold the read lock. + */ + protected void replayValues() { + lockRead(); + try { + int s = values.start() + values.size(); + for (Replayer rp : replayers()) { + rp.replayTill(s); + } + } finally { + unlockRead(); + } + } + /** The replayer that holds a value where the given observer is currently at. */ + final class Replayer { + protected final Observer wrapped; + /** Where this replayer was in reading the list. */ + protected int index; + /** To cancel and unsubscribe this replayer and observer. */ + protected final Subscription cancel; + protected Replayer(Observer wrapped, Subscription cancel) { + this.wrapped = wrapped; + this.cancel = cancel; + } + /** + * Replay up to the given index + *

+ * Caller should hold the read lock. + * @param limit + */ + void replayTill(int limit) { + int si = values.start(); + if (index < si) { + index = si; + } + while (index < limit) { + TIntermediate value = values.get(index); + index++; + try { + wrapped.onNext(resultSelector.call(value)); + } catch (Throwable t) { + replayers.remove(cancel); + wrapped.onError(t); + return; + } + } + if (done) { + if (error != null) { + wrapped.onError(error); + } else { + wrapped.onCompleted(); + } + } + } + } + /** + * Remove the subscription. + * @param s + */ + void remove(Subscription s) { + replayers.remove(s); + } + /** + * Add a notification value and limit the size of values. + *

+ * Caller should hold the lock. + * @param value + */ + void add(TIntermediate value) { + values.add(value); + } + /** Clears the value list. */ + void clearValues() { + lockWrite(); + try { + values.clear(); + } finally { + unlockWrite(); + } + } + } + /** Base state with lock. */ + static class BaseState { + /** The lock to protect the other fields. */ + private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); + /** The read lock. */ + private final Lock read = rwLock.readLock(); + /** The write lock. */ + private final Lock write = rwLock.writeLock(); + /** Lock read. */ + public void lockRead() { + read.lock(); + } + /** Unlock read. */ + public void unlockRead() { + read.unlock(); + } + /** Lock write. */ + public void lockWrite() { + write.lock(); + } + /** Unlock write. */ + public void unlockWrite() { + write.unlock(); + } + } + /** + * Base interface for logically indexing a list. + * @param the value type + */ + public interface VirtualList { + /** @return the number of elements in this list */ + int size(); + /** + * Add an element to the list. + * @param value the value to add + */ + void add(T value); + /** + * Retrieve an element at the specified logical index. + * @param index + * @return + */ + T get(int index); + /** + * Remove elements up before the given logical index and move + * the start() to this index. + *

+ * For example, a list contains 3 items. Calling removeUntil 2 will + * remove the first two items. + * @param index + */ + void removeBefore(int index); + /** + * Clear the elements of this list and increase the + * start by the number of elements. + */ + void clear(); + /** + * Returns the current head index of this list. + * @return + */ + int start(); + /** + * Returns the current tail index of this list (where the next value would appear). + * @return + */ + int end(); + /** + * Clears and resets the indexes of the list. + */ + void reset(); + /** + * Returns the current content as a list. + * @return + */ + List toList(); + } + /** + * Behaves like a normal, unbounded ArrayList but with virtual index. + * @param the element type + */ + public static final class VirtualArrayList implements VirtualList { + /** The backing list .*/ + final List list = new ArrayList(); + /** The virtual start index of the list. */ + int startIndex; + @Override + public int size() { + return list.size(); + } + @Override + public void add(T value) { + list.add(value); + } + + @Override + public T get(int index) { + return list.get(index - startIndex); + } + + @Override + public void removeBefore(int index) { + int j = index - startIndex; + if (j > 0 && j <= list.size()) { + list.subList(0, j).clear(); + } + startIndex = index; + } + + @Override + public void clear() { + startIndex += list.size(); + list.clear(); + } + + @Override + public int start() { + return startIndex; + } + + @Override + public int end() { + return startIndex + list.size(); + } + + @Override + public void reset() { + list.clear(); + startIndex = 0; + } + @Override + public List toList() { + return new ArrayList(list); + } + + } + /** + * A bounded list which increases its size up to a maximum capacity, then + * behaves like a circular buffer with virtual indexes. + * @param the element type + */ + public static final class VirtualBoundedList implements VirtualList { + /** A list that grows up to maxSize. */ + private final List list = new ArrayList(); + /** The maximum allowed size. */ + private final int maxSize; + /** The logical start index of the list. */ + int startIndex; + /** The head index inside the list, where the first readable value sits. */ + int head; + /** The tail index inside the list, where the next value will be added. */ + int tail; + /** The number of items in the list. */ + int count; + /** + * Construct a VirtualBoundedList with the given maximum number of elements. + * @param maxSize + */ + public VirtualBoundedList(int maxSize) { + if (maxSize < 0) { + throw new IllegalArgumentException("maxSize < 0"); + } + this.maxSize = maxSize; + } + @Override + public int start() { + return startIndex; + } + + @Override + public int end() { + return startIndex + count; + } + + @Override + public void clear() { + startIndex += count; + list.clear(); + head = 0; + tail = 0; + count = 0; + } + @Override + public int size() { + return count; + } + + @Override + public void add(T value) { + if (list.size() == maxSize) { + list.set(tail, value); + head = (head + 1) % maxSize; + tail = (tail + 1) % maxSize; + startIndex++; + } else { + list.add(value); + tail = (tail + 1) % maxSize; + count++; + } + } + + @Override + public T get(int index) { + if (index < start() || index >= end()) { + throw new ArrayIndexOutOfBoundsException(index); + } + int idx = (head + (index - startIndex)) % maxSize; + return list.get(idx); + } + + @Override + public void removeBefore(int index) { + if (index <= start()) { + return; + } + if (index >= end()) { + clear(); + startIndex = index; + return; + } + int rc = index - startIndex; + int head2 = head + rc; + for (int i = head; i < head2; i++) { + list.set(i % maxSize, null); + count--; + } + startIndex = index; + head = head2 % maxSize; + } + @Override + public List toList() { + List r = new ArrayList(list.size() + 1); + for (int i = head; i < head + count; i++) { + int idx = i % maxSize; + r.add(list.get(idx)); + } + return r; + } + + @Override + public void reset() { + list.clear(); + count = 0; + head = 0; + tail = 0; + } + } +} diff --git a/rxjava-core/src/test/java/rx/operators/OperationReplayTest.java b/rxjava-core/src/test/java/rx/operators/OperationReplayTest.java index 56f540d466..40dfe016a5 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationReplayTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationReplayTest.java @@ -24,8 +24,8 @@ import rx.Observable; import rx.Observer; import rx.observables.ConnectableObservable; -import rx.operators.OperationReplay.VirtualBoundedList; import rx.schedulers.TestScheduler; +import rx.subjects.BoundedReplaySubject.VirtualBoundedList; import rx.subjects.PublishSubject; import rx.util.functions.Func1;