Skip to content

Subjects reimplemented. #605

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions rxjava-core/src/main/java/rx/Notification.java
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,33 @@ public void accept(Observer<? super T> observer) {
observer.onError(getThrowable());
}
}
/**
* Emits the notification value to the observer
* and captures the exception thrown by the onNext method.
* @param observer the observer to send the notification to
* @return false if a terminal condition was reached, i.e.,
* this is an isOnCompleted, isOnError or the observer.onNext threw
*/
public boolean acceptSafe(Observer<? super T> observer) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is repeating what SafeObserver does. A Notification doesn't need to do anything different and thus we shouldn't be replicating that logic.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay.

if (isOnNext()) {
try {
observer.onNext(getValue());
return true;
} catch (Throwable t) {
observer.onError(t);
return false;
}
} else
if (isOnCompleted()) {
observer.onCompleted();
return false;
} else
if (isOnError()) {
observer.onError(getThrowable());
return false;
}
return false;
}

public static enum Kind {
OnNext, OnError, OnCompleted
Expand Down
150 changes: 149 additions & 1 deletion rxjava-core/src/main/java/rx/subjects/AbstractSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import rx.Notification;
Expand All @@ -30,7 +34,151 @@
import rx.util.functions.Action2;

public abstract class AbstractSubject<T> extends Subject<T, T> {

/** Base state with lock. */
static class BaseState {
/** The lock to protect the other fields. */
private final Lock lock = new ReentrantLock();
/** Lock. */
public void lock() {
lock.lock();
}
/** Unlock. */
public void unlock() {
lock.unlock();
}

}
/** The default state of Subjects.*/
static class DefaultState<T> extends BaseState {
/** The currently subscribed observers. */
private final Map<Subscription, Observer<? super T>> observers = new LinkedHashMap<Subscription, Observer<? super T>>();
/** Indicator that the subject has completed. */
public boolean done;
/** If not null, the subject completed with an error. */
public Throwable error;
/**
* Add an observer to the observers and create a Subscription for it.
* Caller should hold the lock.
* @param obs
* @return
*/
public Subscription addObserver(Observer<? super T> obs) {
Subscription s = new Subscription() {
final AtomicBoolean once = new AtomicBoolean();
@Override
public void unsubscribe() {
if (once.compareAndSet(false, true)) {
remove(this);
}
}

};
observers.put(s, obs);
return s;
}
/**
* Returns a live collection of the observers.
* <p>
* Caller should hold the lock.
* @return
*/
public Collection<Observer<? super T>> observers() {
return new ArrayList<Observer<? super T>>(observers.values());
}
/**
* Removes and returns all observers from the mapping.
* <p>
* Caller should hold the lock.
* @return
*/
public Collection<Observer<? super T>> removeAll() {
List<Observer<? super T>> list = new ArrayList<Observer<? super T>>(observers.values());
observers.clear();
return list;
}
/**
* Remove the subscription.
* @param s
*/
protected void remove(Subscription s) {
lock();
try {
observers.remove(s);
} finally {
unlock();
}
}
/**
* Set the error state and dispatch it to the observers.
* @param e
*/
public void defaultOnError(Throwable e) {
lock();
try {
if (done) {
return;
}
defaultDispatchError(e);
} finally {
unlock();
}
}
/**
* Set the completion state and dispatch it to the observers.
*/
public void defaultOnCompleted() {
lock();
try {
if (done) {
return;
}
done = true;
for (Observer<? super T> o : removeAll()) {
o.onCompleted();
}
} finally {
unlock();
}
}
/**
* Dispatch the value to all subscribed observers.
* @param value
*/
public void defaultOnNext(T value) {
lock();
try {
if (done) {
return;
}
defaultDispatch(value);
} finally {
unlock();
}
}
/**
* Dispatch the value to all subscribed observers.
* <p>
* Caller should hold the lock.
* @param value
*/
public void defaultDispatch(T value) {
for (Observer<? super T> o : observers()) {
o.onNext(value);
}
}
/**
* Dispatch the exception to all subscribed observers and
* remove them.
* @param e
*/
public void defaultDispatchError(Throwable e) {
done = true;
error = e;
for (Observer<? super T> o : removeAll()) {
o.onError(e);
}
}
}
protected AbstractSubject(rx.Observable.OnSubscribeFunc<T> onSubscribe) {
super(onSubscribe);
}
Expand Down
135 changes: 89 additions & 46 deletions rxjava-core/src/main/java/rx/subjects/AsyncSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
*/
package rx.subjects;

import rx.Notification;
import rx.Observable;
import rx.Observer;
import rx.util.functions.Action2;
import rx.Subscription;
import rx.subjects.AbstractSubject.DefaultState;
import rx.subscriptions.Subscriptions;

/**
* Subject that publishes only the last event to each {@link Observer} that has subscribed when the
Expand Down Expand Up @@ -48,68 +50,109 @@
*
* @param <T>
*/
public class AsyncSubject<T> extends AbstractSubject<T> {

public class AsyncSubject<T> extends Subject<T, T> {
/** The inner state. */
protected static final class State<T> extends DefaultState<T> {
protected boolean hasValue;
protected T value;
}
/**
* Create a new AsyncSubject
*
* @return a new AsyncSubject
*/
public static <T> AsyncSubject<T> create() {
final SubjectState<T> state = new SubjectState<T>();
OnSubscribeFunc<T> onSubscribe = getOnSubscribeFunc(state, new Action2<SubjectState<T>, Observer<? super T>>() {

@Override
public void call(SubjectState<T> state, Observer<? super T> o) {
// we want the last value + completed so add this extra logic
// to send onCompleted if the last value is an onNext
if (state.completed.get()) {
Notification<T> value = state.currentValue.get();
if (value != null && value.isOnNext()) {
o.onCompleted();
}
}
}
});
return new AsyncSubject<T>(onSubscribe, state);
State<T> state = new State<T>();
return new AsyncSubject<T>(new AsyncSubjectSubscribeFunc<T>(state), state);
}

private final SubjectState<T> state;

protected AsyncSubject(OnSubscribeFunc<T> onSubscribe, SubjectState<T> state) {
super(onSubscribe);
/** The state. */
protected final State<T> state;
protected AsyncSubject(Observable.OnSubscribeFunc<T> osf, State<T> state) {
super(osf);
this.state = state;
}

@Override
public void onCompleted() {
/**
* Mark this subject as completed and emit latest value + 'onCompleted' to all Observers
*/
emitNotificationAndTerminate(state, new Action2<SubjectState<T>, Observer<? super T>>() {

@Override
public void call(SubjectState<T> state, Observer<? super T> o) {
o.onCompleted();
public void onNext(T args) {
state.lock();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do not want to lock for onNext calls. That is against Rx Design Guideline 6.8:

6.8. Avoid serializing operators

As all Rx operators are bound to guideline 6.7, operators can safely assume that their inputs are serialized. Adding too much synchronization would clutter the code and can lead to performance degradation.

If an observable sequence is not following the Rx contract (see chapter 0), it is up to the developer writing the end-user application to fix the observable sequence by calling the Synchronize operator at the first place the developer gets a hold of the observable sequence. This way the scope of additional synchronization is limited to where it is needed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this particular case ... I just realized because this is AsyncSubject it doesn't actually emit here, but why not have the value be an AtomicReference like it was before so we don't have locking every time?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR is 9 days old. I have gained more experience with lock-free approaches since then. I'll revisit these classes and will fix the onNext under lock issues.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"The operators can safely assume their inputs are serialized", but aren't the AsyncSubject and co. the entry point of the Observable library? In that case I wouldn't assume anything on their Observer side.

try {
if (state.done) {
return;
}
});
state.hasValue = true;
state.value = args;
} finally {
state.unlock();
}
}

@Override
public void onError(Throwable e) {
/**
* Mark this subject as completed with an error as the last value and emit 'onError' to all Observers
*/
state.currentValue.set(new Notification<T>(e));
emitNotificationAndTerminate(state, null);
state.lock();
try {
if (state.done) {
return;
}
state.value = null;
state.hasValue = false;

state.defaultDispatchError(e);
} finally {
state.unlock();
}
}

@Override
public void onNext(T v) {
/**
* Store the latest value but do not send it. It only gets sent when 'onCompleted' occurs.
*/
state.currentValue.set(new Notification<T>(v));
public void onCompleted() {
state.lock();
try {
if (state.done) {
return;
}
state.done = true;
for (Observer<? super T> o : state.removeAll()) {
if (state.hasValue) {
o.onNext(state.value);
}
o.onCompleted();
}
} finally {
state.unlock();
}
}
/** The subscription function. */
protected static final class AsyncSubjectSubscribeFunc<T> implements OnSubscribeFunc<T> {
protected final State<T> state;
protected AsyncSubjectSubscribeFunc(State<T> state) {
this.state = state;
}

@Override
public Subscription onSubscribe(Observer<? super T> t1) {
Throwable error;
boolean hasValue;
T value;
state.lock();
try {
if (!state.done) {
return state.addObserver(t1);
}
error = state.error;
hasValue = state.hasValue;
value = state.value;
} finally {
state.unlock();
}
if (error != null) {
t1.onError(error);
} else {
if (hasValue) {
t1.onNext(value);
}
t1.onCompleted();
}
return Subscriptions.empty();
}
}
}
Loading