diff --git a/rxjava-core/src/main/java/rx/operators/SafeObserver.java b/rxjava-core/src/main/java/rx/operators/SafeObserver.java index 3e6508dac9..7f32114dbc 100644 --- a/rxjava-core/src/main/java/rx/operators/SafeObserver.java +++ b/rxjava-core/src/main/java/rx/operators/SafeObserver.java @@ -57,10 +57,25 @@ */ public class SafeObserver implements Observer { - private final Observer actual; + private volatile Observer actual; private final AtomicBoolean isFinished = new AtomicBoolean(false); private final SafeObservableSubscription subscription; + /** + * If the observer completes, this is swapped in place of the actual + * should avoid the overhead of isFinished.get() on every onNext call. */ + private static final Observer nopObserver = new Observer() { + @Override + public void onNext(Object args) { + } + @Override + public void onError(Throwable e) { + } + @Override + public void onCompleted() { + } + + }; public SafeObserver(SafeObservableSubscription subscription, Observer actual) { this.subscription = subscription; this.actual = actual; @@ -69,8 +84,10 @@ public SafeObserver(SafeObservableSubscription subscription, Observer @Override public void onCompleted() { if (isFinished.compareAndSet(false, true)) { + Observer a = actual; + actual = nopObserver; try { - actual.onCompleted(); + a.onCompleted(); } catch (Throwable e) { // handle errors if the onCompleted implementation fails, not just if the Observable fails onError(e); @@ -83,8 +100,11 @@ public void onCompleted() { @Override public void onError(Throwable e) { if (isFinished.compareAndSet(false, true)) { + Observer a = actual; + // will prevent onNext from sending a new value after completion + actual = nopObserver; try { - actual.onError(e); + a.onError(e); } catch (Throwable e2) { if (e2 instanceof OnErrorNotImplementedException) { /** @@ -117,12 +137,10 @@ public void onError(Throwable e) { @Override public void onNext(T args) { try { - if (!isFinished.get()) { - actual.onNext(args); - } - } catch (Throwable e) { + actual.onNext(args); + } catch (Throwable t) { // handle errors if the onNext implementation fails, not just if the Observable fails - onError(e); + onError(t); } } diff --git a/rxjava-core/src/main/java/rx/subjects/AbstractSubject.java b/rxjava-core/src/main/java/rx/subjects/AbstractSubject.java deleted file mode 100644 index 30db331ac2..0000000000 --- a/rxjava-core/src/main/java/rx/subjects/AbstractSubject.java +++ /dev/null @@ -1,167 +0,0 @@ -/** - * Copyright 2013 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.subjects; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.ReentrantLock; - -import rx.Notification; -import rx.Observer; -import rx.Subscription; -import rx.operators.SafeObservableSubscription; -import rx.subscriptions.Subscriptions; -import rx.util.functions.Action2; - -public abstract class AbstractSubject extends Subject { - - protected AbstractSubject(rx.Observable.OnSubscribeFunc onSubscribe) { - super(onSubscribe); - } - - protected static class SubjectState { - protected final ConcurrentHashMap> observers = new ConcurrentHashMap>(); - protected final AtomicReference> currentValue = new AtomicReference>(); - protected final AtomicBoolean completed = new AtomicBoolean(); - protected final ReentrantLock SUBSCRIPTION_LOCK = new ReentrantLock(); - } - - protected static OnSubscribeFunc getOnSubscribeFunc(final SubjectState state, final Action2, Observer> onEach) { - return new OnSubscribeFunc() { - @Override - public Subscription onSubscribe(Observer observer) { - /* - * Subscription needs to be synchronized with terminal states to ensure - * race conditions are handled. When subscribing we must make sure - * onComplete/onError is correctly emitted to all observers, even if it - * comes in while the onComplete/onError is being propagated. - */ - state.SUBSCRIPTION_LOCK.lock(); - try { - if (state.completed.get()) { - emitNotification(state.currentValue.get(), observer); - if (onEach != null) { - onEach.call(state, observer); - } - return Subscriptions.empty(); - } else { - // the subject is not completed so we subscribe - final SafeObservableSubscription subscription = new SafeObservableSubscription(); - - subscription.wrap(new Subscription() { - @Override - public void unsubscribe() { - // on unsubscribe remove it from the map of outbound observers to notify - state.observers.remove(subscription); - } - }); - - // on subscribe add it to the map of outbound observers to notify - state.observers.put(subscription, observer); - - // invoke onSubscribe logic - if (onEach != null) { - onEach.call(state, observer); - } - - return subscription; - } - } finally { - state.SUBSCRIPTION_LOCK.unlock(); - } - - } - - }; - } - - protected static void emitNotification(Notification value, Observer observer) { - // if null that means onNext was never invoked (no Notification set) - if (value != null) { - if (value.isOnNext()) { - observer.onNext(value.getValue()); - } else if (value.isOnError()) { - observer.onError(value.getThrowable()); - } else if (value.isOnCompleted()) { - observer.onCompleted(); - } - } - } - - /** - * Emit the current value. - * - * @param state - */ - protected static void emitNotification(final SubjectState state, final Action2, Observer> onEach) { - for (Subscription s : snapshotOfObservers(state)) { - Observer o = state.observers.get(s); - // emit notifications to this observer - emitNotification(state.currentValue.get(), o); - // onEach action if applicable - if (onEach != null) { - onEach.call(state, o); - } - } - } - - /** - * Emit the current value to all observers and remove their subscription. - * - * @param state - */ - protected void emitNotificationAndTerminate(final SubjectState state, final Action2, Observer> onEach) { - /* - * We can not allow new subscribers to be added while we execute the terminal state. - */ - state.SUBSCRIPTION_LOCK.lock(); - try { - if (state.completed.compareAndSet(false, true)) { - for (Subscription s : snapshotOfObservers(state)) { - Observer o = state.observers.get(s); - // emit notifications to this observer - emitNotification(state.currentValue.get(), o); - // onEach action if applicable - if (onEach != null) { - onEach.call(state, o); - } - - // remove the subscription as it is completed - state.observers.remove(s); - } - } - } finally { - state.SUBSCRIPTION_LOCK.unlock(); - } - } - - /** - * Current snapshot of 'state.observers.keySet()' so that concurrent modifications aren't included. - * - * This makes it behave deterministically in a single-threaded execution when nesting subscribes. - * - * In multi-threaded execution it will cause new subscriptions to wait until the following onNext instead - * of possibly being included in the current onNext iteration. - * - * @return List> - */ - private static Collection snapshotOfObservers(final SubjectState state) { - return new ArrayList(state.observers.keySet()); - } -} diff --git a/rxjava-core/src/main/java/rx/subjects/AsyncSubject.java b/rxjava-core/src/main/java/rx/subjects/AsyncSubject.java index a9c18be805..6e9dce1d35 100644 --- a/rxjava-core/src/main/java/rx/subjects/AsyncSubject.java +++ b/rxjava-core/src/main/java/rx/subjects/AsyncSubject.java @@ -15,9 +15,13 @@ */ package rx.subjects; +import java.util.Collection; +import java.util.concurrent.atomic.AtomicReference; + import rx.Notification; import rx.Observer; -import rx.util.functions.Action2; +import rx.subjects.SubjectSubscriptionManager.SubjectObserver; +import rx.util.functions.Action1; /** * Subject that publishes only the last event to each {@link Observer} that has subscribed when the @@ -28,8 +32,8 @@ * Example usage: *

*

 {@code
-
- * / observer will receive no onNext events because the subject.onCompleted() isn't called.
+ * 
+ * // observer will receive no onNext events because the subject.onCompleted() isn't called.
   AsyncSubject subject = AsyncSubject.create();
   subject.subscribe(observer);
   subject.onNext("one");
@@ -48,68 +52,88 @@
  * 
  * @param 
  */
-public class AsyncSubject extends AbstractSubject {
+public final class AsyncSubject extends Subject {
 
-    /**
-     * Create a new AsyncSubject
-     * 
-     * @return a new AsyncSubject
-     */
     public static  AsyncSubject create() {
-        final SubjectState state = new SubjectState();
-        OnSubscribeFunc onSubscribe = getOnSubscribeFunc(state, new Action2, Observer>() {
+        final SubjectSubscriptionManager subscriptionManager = new SubjectSubscriptionManager();
+        final AtomicReference> lastNotification = new AtomicReference>(new Notification());
 
-            @Override
-            public void call(SubjectState state, Observer o) {
-                // we want the last value + completed so add this extra logic 
-                // to send onCompleted if the last value is an onNext
-                if (state.completed.get()) {
-                    Notification value = state.currentValue.get();
-                    if (value != null && value.isOnNext()) {
-                        o.onCompleted();
+        OnSubscribeFunc onSubscribe = subscriptionManager.getOnSubscribeFunc(
+                /**
+                 * This function executes at beginning of subscription.
+                 * 
+                 * This will always run, even if Subject is in terminal state.
+                 */
+                new Action1>() {
+
+                    @Override
+                    public void call(SubjectObserver o) {
+                        // nothing to do if not terminated
                     }
-                }
-            }
-        });
-        return new AsyncSubject(onSubscribe, state);
+                },
+                /**
+                 * This function executes if the Subject is terminated.
+                 */
+                new Action1>() {
+
+                    @Override
+                    public void call(SubjectObserver o) {
+                        // we want the last value + completed so add this extra logic 
+                        // to send onCompleted if the last value is an onNext
+                        emitValueToObserver(lastNotification.get(), o);
+                    }
+                });
+
+        return new AsyncSubject(onSubscribe, subscriptionManager, lastNotification);
+    }
+
+    protected static  void emitValueToObserver(Notification n, Observer o) {
+        n.accept(o);
+        if (n.isOnNext()) {
+            o.onCompleted();
+        }
     }
 
-    private final SubjectState state;
+    private final SubjectSubscriptionManager subscriptionManager;
+    final AtomicReference> lastNotification;
 
-    protected AsyncSubject(OnSubscribeFunc onSubscribe, SubjectState state) {
+    protected AsyncSubject(OnSubscribeFunc onSubscribe, SubjectSubscriptionManager subscriptionManager, AtomicReference> lastNotification) {
         super(onSubscribe);
-        this.state = state;
+        this.subscriptionManager = subscriptionManager;
+        this.lastNotification = lastNotification;
     }
 
     @Override
     public void onCompleted() {
-        /**
-         * Mark this subject as completed and emit latest value + 'onCompleted' to all Observers
-         */
-        emitNotificationAndTerminate(state, new Action2, Observer>() {
+        subscriptionManager.terminate(new Action1>>() {
 
             @Override
-            public void call(SubjectState state, Observer o) {
-                o.onCompleted();
+            public void call(Collection> observers) {
+                for (Observer o : observers) {
+                    emitValueToObserver(lastNotification.get(), o);
+                }
             }
         });
     }
 
     @Override
-    public void onError(Throwable e) {
-        /**
-         * Mark this subject as completed with an error as the last value and emit 'onError' to all Observers
-         */
-        state.currentValue.set(new Notification(e));
-        emitNotificationAndTerminate(state, null);
+    public void onError(final Throwable e) {
+        subscriptionManager.terminate(new Action1>>() {
+
+            @Override
+            public void call(Collection> observers) {
+                lastNotification.set(new Notification(e));
+                for (Observer o : observers) {
+                    emitValueToObserver(lastNotification.get(), o);
+                }
+            }
+        });
+
     }
 
     @Override
     public void onNext(T v) {
-        /**
-         * Store the latest value but do not send it. It only gets sent when 'onCompleted' occurs.
-         */
-        state.currentValue.set(new Notification(v));
+        lastNotification.set(new Notification(v));
     }
 
 }
diff --git a/rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java b/rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java
index 053a51f0cf..4af0c54b2c 100644
--- a/rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java
+++ b/rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java
@@ -15,9 +15,13 @@
  */
 package rx.subjects;
 
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicReference;
+
 import rx.Notification;
 import rx.Observer;
-import rx.util.functions.Action2;
+import rx.subjects.SubjectSubscriptionManager.SubjectObserver;
+import rx.util.functions.Action1;
 
 /**
  * Subject that publishes the most recent and all subsequent events to each subscribed {@link Observer}.
@@ -29,25 +33,38 @@
  * 
 {@code
 
  * / observer will receive all events.
-  BehaviorSubject subject = BehaviorSubject.createWithDefaultValue("default");
+  BehaviorSubject subject = BehaviorSubject.create("default");
   subject.subscribe(observer);
   subject.onNext("one");
   subject.onNext("two");
   subject.onNext("three");
 
   // observer will receive the "one", "two" and "three" events, but not "zero"
-  BehaviorSubject subject = BehaviorSubject.createWithDefaultValue("default");
+  BehaviorSubject subject = BehaviorSubject.create("default");
   subject.onNext("zero");
   subject.onNext("one");
   subject.subscribe(observer);
   subject.onNext("two");
   subject.onNext("three");
 
+  // observer will receive only onCompleted
+  BehaviorSubject subject = BehaviorSubject.create("default");
+  subject.onNext("zero");
+  subject.onNext("one");
+  subject.onCompleted();
+  subject.subscribe(observer);
+  
+  // observer will receive only onError
+  BehaviorSubject subject = BehaviorSubject.create("default");
+  subject.onNext("zero");
+  subject.onNext("one");
+  subject.onError(new RuntimeException("error"));
+  subject.subscribe(observer);
   } 
  * 
  * @param 
  */
-public class BehaviorSubject extends AbstractSubject {
+public final class BehaviorSubject extends Subject {
 
     /**
      * Creates a {@link BehaviorSubject} which publishes the last and all subsequent events to each {@link Observer} that subscribes to it.
@@ -69,54 +86,96 @@ public static  BehaviorSubject createWithDefaultValue(T defaultValue) {
      * @return the constructed {@link BehaviorSubject}.
      */
     public static  BehaviorSubject create(T defaultValue) {
-        final SubjectState state = new SubjectState();
+        final SubjectSubscriptionManager subscriptionManager = new SubjectSubscriptionManager();
         // set a default value so subscriptions will immediately receive this until a new notification is received
-        state.currentValue.set(new Notification(defaultValue));
-        OnSubscribeFunc onSubscribe = getOnSubscribeFunc(state, new Action2, Observer>() {
+        final AtomicReference> lastNotification = new AtomicReference>(new Notification(defaultValue));
 
-            @Override
-            public void call(SubjectState state, Observer o) {
+        OnSubscribeFunc onSubscribe = subscriptionManager.getOnSubscribeFunc(
                 /**
-                 * When we subscribe we always emit the latest value to the observer, including
-                 * terminal states which are recorded as the last value.
+                 * This function executes at beginning of subscription.
+                 * 
+                 * This will always run, even if Subject is in terminal state.
                  */
-                emitNotification(state.currentValue.get(), o);
-            }
-        });
-        return new BehaviorSubject(onSubscribe, state);
+                new Action1>() {
+
+                    @Override
+                    public void call(SubjectObserver o) {
+                        /*
+                         * When we subscribe we always emit the latest value to the observer.
+                         * 
+                         * Here we only emit if it's an onNext as terminal states are handled in the next function.
+                         */
+                        Notification n = lastNotification.get();
+                        if (n.isOnNext()) {
+                            n.accept(o);
+                        }
+                    }
+                },
+                /**
+                 * This function executes if the Subject is terminated before subscription occurs.
+                 */
+                new Action1>() {
+
+                    @Override
+                    public void call(SubjectObserver o) {
+                        /*
+                         * If we are already terminated, or termination happens while trying to subscribe
+                         * this will be invoked and we emit whatever the last terminal value was.
+                         */
+                        lastNotification.get().accept(o);
+                    }
+                });
+
+        return new BehaviorSubject(onSubscribe, subscriptionManager, lastNotification);
     }
 
-    private final SubjectState state;
+    private final SubjectSubscriptionManager subscriptionManager;
+    final AtomicReference> lastNotification;
 
-    protected BehaviorSubject(OnSubscribeFunc onSubscribe, SubjectState state) {
+    protected BehaviorSubject(OnSubscribeFunc onSubscribe, SubjectSubscriptionManager subscriptionManager, AtomicReference> lastNotification) {
         super(onSubscribe);
-        this.state = state;
+        this.subscriptionManager = subscriptionManager;
+        this.lastNotification = lastNotification;
     }
 
     @Override
     public void onCompleted() {
-        /**
-         * Mark this subject as completed and emit latest value + 'onCompleted' to all Observers
-         */
-        state.currentValue.set(new Notification());
-        emitNotificationAndTerminate(state, null);
+        subscriptionManager.terminate(new Action1>>() {
+
+            @Override
+            public void call(Collection> observers) {
+                lastNotification.set(new Notification());
+                for (Observer o : observers) {
+                    o.onCompleted();
+                }
+            }
+        });
     }
 
     @Override
-    public void onError(Throwable e) {
-        /**
-         * Mark this subject as completed with an error as the last value and emit 'onError' to all Observers
-         */
-        state.currentValue.set(new Notification(e));
-        emitNotificationAndTerminate(state, null);
+    public void onError(final Throwable e) {
+        subscriptionManager.terminate(new Action1>>() {
+
+            @Override
+            public void call(Collection> observers) {
+                lastNotification.set(new Notification(e));
+                for (Observer o : observers) {
+                    o.onError(e);
+                }
+            }
+        });
+
     }
 
     @Override
     public void onNext(T v) {
         /**
-         * Store the latest value and send it to all observers;
+         * Store the latest value but do not send it. It only gets sent when 'onCompleted' occurs.
          */
-        state.currentValue.set(new Notification(v));
-        emitNotification(state, null);
+        lastNotification.set(new Notification(v));
+        for (Observer o : subscriptionManager.rawSnapshot()) {
+            o.onNext(v);
+        }
     }
+
 }
diff --git a/rxjava-core/src/main/java/rx/subjects/PublishSubject.java b/rxjava-core/src/main/java/rx/subjects/PublishSubject.java
index 8cf2d75747..daa9dd6636 100644
--- a/rxjava-core/src/main/java/rx/subjects/PublishSubject.java
+++ b/rxjava-core/src/main/java/rx/subjects/PublishSubject.java
@@ -15,8 +15,13 @@
  */
 package rx.subjects;
 
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicReference;
+
 import rx.Notification;
 import rx.Observer;
+import rx.subjects.SubjectSubscriptionManager.SubjectObserver;
+import rx.util.functions.Action1;
 
 /**
  * Subject that, once and {@link Observer} has subscribed, publishes all subsequent events to the subscriber.
@@ -41,44 +46,86 @@
  * 
  * @param 
  */
-public class PublishSubject extends AbstractSubject {
+public final class PublishSubject extends Subject {
+
     public static  PublishSubject create() {
-        final SubjectState state = new SubjectState();
-        OnSubscribeFunc onSubscribe = getOnSubscribeFunc(state, null);
-        return new PublishSubject(onSubscribe, state);
+        final SubjectSubscriptionManager subscriptionManager = new SubjectSubscriptionManager();
+        // set a default value so subscriptions will immediately receive this until a new notification is received
+        final AtomicReference> lastNotification = new AtomicReference>();
+
+        OnSubscribeFunc onSubscribe = subscriptionManager.getOnSubscribeFunc(
+                /**
+                 * This function executes at beginning of subscription.
+                 * 
+                 * This will always run, even if Subject is in terminal state.
+                 */
+                new Action1>() {
+
+                    @Override
+                    public void call(SubjectObserver o) {
+                        // nothing onSubscribe unless in terminal state which is the next function
+                    }
+                },
+                /**
+                 * This function executes if the Subject is terminated before subscription occurs.
+                 */
+                new Action1>() {
+
+                    @Override
+                    public void call(SubjectObserver o) {
+                        /*
+                         * If we are already terminated, or termination happens while trying to subscribe
+                         * this will be invoked and we emit whatever the last terminal value was.
+                         */
+                        lastNotification.get().accept(o);
+                    }
+                });
+
+        return new PublishSubject(onSubscribe, subscriptionManager, lastNotification);
     }
 
-    private final SubjectState state;
+    private final SubjectSubscriptionManager subscriptionManager;
+    final AtomicReference> lastNotification;
 
-    protected PublishSubject(OnSubscribeFunc onSubscribe, SubjectState state) {
+    protected PublishSubject(OnSubscribeFunc onSubscribe, SubjectSubscriptionManager subscriptionManager, AtomicReference> lastNotification) {
         super(onSubscribe);
-        this.state = state;
+        this.subscriptionManager = subscriptionManager;
+        this.lastNotification = lastNotification;
     }
 
     @Override
     public void onCompleted() {
-        /**
-         * Mark this subject as completed and emit latest value + 'onCompleted' to all Observers
-         */
-        state.currentValue.set(new Notification());
-        emitNotificationAndTerminate(state, null);
+        subscriptionManager.terminate(new Action1>>() {
+
+            @Override
+            public void call(Collection> observers) {
+                lastNotification.set(new Notification());
+                for (Observer o : observers) {
+                    o.onCompleted();
+                }
+            }
+        });
     }
 
     @Override
-    public void onError(Throwable e) {
-        /**
-         * Mark this subject as completed with an error as the last value and emit 'onError' to all Observers
-         */
-        state.currentValue.set(new Notification(e));
-        emitNotificationAndTerminate(state, null);
+    public void onError(final Throwable e) {
+        subscriptionManager.terminate(new Action1>>() {
+
+            @Override
+            public void call(Collection> observers) {
+                lastNotification.set(new Notification(e));
+                for (Observer o : observers) {
+                    o.onError(e);
+                }
+            }
+        });
+
     }
 
     @Override
     public void onNext(T v) {
-        /**
-         * Store the latest value and send it to all observers;
-         */
-        state.currentValue.set(new Notification(v));
-        emitNotification(state, null);
+        for (Observer o : subscriptionManager.rawSnapshot()) {
+            o.onNext(v);
+        }
     }
 }
diff --git a/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java b/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java
index c4985ba84a..0546da9a0b 100644
--- a/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java
+++ b/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java
@@ -16,15 +16,15 @@
 package rx.subjects;
 
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
+import rx.Notification;
 import rx.Observer;
-import rx.Subscription;
-import rx.subscriptions.Subscriptions;
-import rx.util.functions.Func1;
+import rx.subjects.SubjectSubscriptionManager.SubjectObserver;
+import rx.util.functions.Action1;
 
 /**
  * Subject that retains all events and will replay them to an {@link Observer} that subscribes.
@@ -35,7 +35,7 @@
  * 

*

 {@code
 
- * eplaySubject subject = ReplaySubject.create();
+ * ReplaySubject subject = ReplaySubject.create();
   subject.onNext("one");
   subject.onNext("two");
   subject.onNext("three");
@@ -49,128 +49,181 @@
  * 
  * @param 
  */
-public final class ReplaySubject extends Subject
-{
-
-    private boolean isDone = false;
-    private Throwable exception = null;
-    private final Map> subscriptions = new HashMap>();
-    private final List history = Collections.synchronizedList(new ArrayList());
-
+public final class ReplaySubject extends Subject {
     public static  ReplaySubject create() {
-        return new ReplaySubject(new DelegateSubscriptionFunc());
+        return create(16);
     }
+    public static  ReplaySubject create(int initialCapacity) {
+        final SubjectSubscriptionManager subscriptionManager = new SubjectSubscriptionManager();
+        final ReplayState state = new ReplayState(initialCapacity);
+
+        OnSubscribeFunc onSubscribe = subscriptionManager.getOnSubscribeFunc(
+                /**
+                 * This function executes at beginning of subscription.
+                 * We want to replay history with the subscribing thread
+                 * before the Observer gets registered.
+                 * 
+                 * This will always run, even if Subject is in terminal state.
+                 */
+                new Action1>() {
+
+                    @Override
+                    public void call(SubjectObserver o) {
+                        // replay history for this observer using the subscribing thread
+                        int lastIndex = replayObserverFromIndex(state.history, 0, o);
+
+                        // now that it is caught up add to observers
+                        state.replayState.put(o, lastIndex);
+                    }
+                },
+                /**
+                 * This function executes if the Subject is terminated.
+                 */
+                new Action1>() {
+
+                    @Override
+                    public void call(SubjectObserver o) {
+                        // we will finish replaying if there is anything left
+                        replayObserverFromIndex(state.history, state.replayState.get(o), o);
+                    }
+                });
 
-    private ReplaySubject(DelegateSubscriptionFunc onSubscribe) {
-        super(onSubscribe);
-        onSubscribe.wrap(new SubscriptionFunc());
+        return new ReplaySubject(onSubscribe, subscriptionManager, state);
     }
 
-    private static final class DelegateSubscriptionFunc implements OnSubscribeFunc
-    {
-        private Func1, ? extends Subscription> delegate = null;
-
-        public void wrap(Func1, ? extends Subscription> delegate)
-        {
-            if (this.delegate != null) {
-                throw new UnsupportedOperationException("delegate already set");
-            }
-            this.delegate = delegate;
-        }
-
-        @Override
-        public Subscription onSubscribe(Observer observer)
-        {
-            return delegate.call(observer);
+    private static class ReplayState {
+        // single-producer, multi-consumer
+        final History history;
+        // each Observer is tracked here for what events they have received
+        final ConcurrentHashMap, Integer> replayState;
+        public ReplayState(int initialCapacity) {
+            history = new History(initialCapacity);
+            replayState = new ConcurrentHashMap, Integer>();
         }
     }
 
-    private class SubscriptionFunc implements Func1, Subscription>
-    {
-        @Override
-        public Subscription call(Observer observer) {
-            int item = 0;
-            Subscription subscription;
+    private final SubjectSubscriptionManager subscriptionManager;
+    private final ReplayState state;
 
-            for (;;) {
-                while (item < history.size()) {
-                    observer.onNext(history.get(item++));
-                }
+    protected ReplaySubject(OnSubscribeFunc onSubscribe, SubjectSubscriptionManager subscriptionManager, ReplayState state) {
+        super(onSubscribe);
+        this.subscriptionManager = subscriptionManager;
+        this.state = state;
+    }
 
-                synchronized (subscriptions) {
-                    if (item < history.size()) {
-                        continue;
+    @Override
+    public void onCompleted() {
+        subscriptionManager.terminate(new Action1>>() {
+
+            @Override
+            public void call(Collection> observers) {
+                state.history.complete(new Notification());
+                for (SubjectObserver o : observers) {
+                    if (caughtUp(o)) {
+                        o.onCompleted();
                     }
+                }
+            }
+        });
+    }
 
-                    if (exception != null) {
-                        observer.onError(exception);
-                        return Subscriptions.empty();
-                    }
-                    if (isDone) {
-                        observer.onCompleted();
-                        return Subscriptions.empty();
+    @Override
+    public void onError(final Throwable e) {
+        subscriptionManager.terminate(new Action1>>() {
+
+            @Override
+            public void call(Collection> observers) {
+                state.history.complete(new Notification(e));
+                for (SubjectObserver o : observers) {
+                    if (caughtUp(o)) {
+                        o.onError(e);
                     }
-
-                    subscription = new RepeatSubjectSubscription();
-                    subscriptions.put(subscription, observer);
-                    break;
                 }
             }
+        });
+    }
 
-            return subscription;
+    @Override
+    public void onNext(T v) {
+        if (state.history.terminalValue.get() != null) {
+            return;
+        }
+        state.history.next(v);
+        for (SubjectObserver o : subscriptionManager.rawSnapshot()) {
+            if (caughtUp(o)) {
+                o.onNext(v);
+            }
         }
     }
 
-    private class RepeatSubjectSubscription implements Subscription
-    {
-        @Override
-        public void unsubscribe()
-        {
-            synchronized (subscriptions) {
-                subscriptions.remove(this);
-            }
+    /*
+     * This is not very elegant but resulted in non-trivial performance improvement by
+     * eliminating the 'replay' code-path on the normal fast-path of emitting values.
+     * 
+     * With this method: 16,151,174 ops/sec
+     * Without: 8,632,358 ops/sec
+     */
+    private boolean caughtUp(SubjectObserver o) {
+        if (!o.caughtUp) {
+            o.caughtUp = true;
+            replayObserver(o);
+            return false;
+        } else {
+            // it was caught up so proceed the "raw route"
+            return true;
         }
     }
 
-    @Override
-    public void onCompleted()
-    {
-        synchronized (subscriptions) {
-            isDone = true;
-            for (Observer observer : new ArrayList>(subscriptions.values())) {
-                observer.onCompleted();
-            }
-            subscriptions.clear();
+    private void replayObserver(SubjectObserver observer) {
+        Integer lastEmittedLink = state.replayState.get(observer);
+        if (lastEmittedLink != null) {
+            int l = replayObserverFromIndex(state.history, lastEmittedLink, observer);
+            state.replayState.put(observer, l);
+        } else {
+            throw new IllegalStateException("failed to find lastEmittedLink for: " + observer);
         }
     }
 
-    @Override
-    public void onError(Throwable e)
-    {
-        synchronized (subscriptions) {
-            if (isDone) {
-                return;
-            }
-            isDone = true;
-            exception = e;
-            for (Observer observer : new ArrayList>(subscriptions.values())) {
-                observer.onError(e);
-            }
-            subscriptions.clear();
+    private static  int replayObserverFromIndex(History history, Integer l, SubjectObserver observer) {
+        while (l < history.index.get()) {
+            observer.onNext(history.list.get(l));
+            l++;
+        }
+        if (history.terminalValue.get() != null) {
+            history.terminalValue.get().accept(observer);
         }
+
+        return l;
     }
 
-    @Override
-    public void onNext(T args)
-    {
-        synchronized (subscriptions) {
-            if (isDone) {
-                return;
-            }
-            history.add(args);
-            for (Observer observer : new ArrayList>(subscriptions.values())) {
-                observer.onNext(args);
+    /**
+     * NOT thread-safe for multi-writer. Assumes single-writer.
+     * Is thread-safe for multi-reader.
+     * 
+     * @param 
+     */
+    private static class History {
+        private final AtomicInteger index;
+        private final ArrayList list;
+        private final AtomicReference> terminalValue;
+        public History(int initialCapacity) {
+             index = new AtomicInteger(0);
+             list = new ArrayList(initialCapacity);
+             terminalValue = new AtomicReference>();
+        }
+        public boolean next(T n) {
+            if (terminalValue.get() == null) {
+                list.add(n);
+                index.getAndIncrement();
+                return true;
+            } else {
+                return false;
             }
         }
+
+        public void complete(Notification n) {
+            terminalValue.set(n);
+        }
     }
+
 }
diff --git a/rxjava-core/src/main/java/rx/subjects/SubjectSubscriptionManager.java b/rxjava-core/src/main/java/rx/subjects/SubjectSubscriptionManager.java
new file mode 100644
index 0000000000..ff7033c0c5
--- /dev/null
+++ b/rxjava-core/src/main/java/rx/subjects/SubjectSubscriptionManager.java
@@ -0,0 +1,252 @@
+/**
+ * Copyright 2013 Netflix, Inc.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rx.subjects;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+import rx.Observable.OnSubscribeFunc;
+import rx.Observer;
+import rx.Subscription;
+import rx.operators.SafeObservableSubscription;
+import rx.subscriptions.Subscriptions;
+import rx.util.functions.Action1;
+
+/* package */class SubjectSubscriptionManager {
+
+    private AtomicReference> state = new AtomicReference>(new State());
+
+    /**
+     * 
+     * @param onSubscribe
+     *            Always runs at the beginning of 'subscribe' regardless of terminal state.
+     * @param onTerminated
+     *            Only runs if Subject is in terminal state and the Observer ends up not being registered.
+     * @return
+     */
+    public OnSubscribeFunc getOnSubscribeFunc(final Action1> onSubscribe, final Action1> onTerminated) {
+        return new OnSubscribeFunc() {
+            @Override
+            public Subscription onSubscribe(Observer actualObserver) {
+                SubjectObserver observer = new SubjectObserver(actualObserver);
+                // invoke onSubscribe logic 
+                if (onSubscribe != null) {
+                    onSubscribe.call(observer);
+                }
+
+                State current;
+                State newState = null;
+                boolean addedObserver = false;
+                Subscription s;
+                do {
+                    current = state.get();
+                    if (current.terminated) {
+                        // we are terminated so don't need to do anything
+                        s = Subscriptions.empty();
+                        addedObserver = false;
+                        // break out and don't try to modify state
+                        newState = current;
+                        // wait for termination to complete if 
+                        try {
+                            current.terminationLatch.await();
+                        } catch (InterruptedException e) {
+                            throw new RuntimeException("Interrupted waiting for termination.", e);
+                        }
+                        break;
+                    } else {
+                        final SafeObservableSubscription subscription = new SafeObservableSubscription();
+                        s = subscription;
+                        addedObserver = true;
+                        subscription.wrap(new Subscription() {
+                            @Override
+                            public void unsubscribe() {
+                                State current;
+                                State newState;
+                                do {
+                                    current = state.get();
+                                    // on unsubscribe remove it from the map of outbound observers to notify
+                                    newState = current.removeObserver(subscription);
+                                } while (!state.compareAndSet(current, newState));
+                            }
+                        });
+
+                        // on subscribe add it to the map of outbound observers to notify
+                        newState = current.addObserver(subscription, observer);
+                    }
+                } while (!state.compareAndSet(current, newState));
+
+                /**
+                 * Whatever happened above, if we are terminated we run `onTerminated`
+                 */
+                if (newState.terminated && !addedObserver) {
+                    onTerminated.call(observer);
+                }
+
+                return s;
+            }
+
+        };
+    }
+
+    protected void terminate(Action1>> onTerminate) {
+        State current;
+        State newState = null;
+        do {
+            current = state.get();
+            if (current.terminated) {
+                // already terminated so do nothing
+                return;
+            } else {
+                newState = current.terminate();
+            }
+        } while (!state.compareAndSet(current, newState));
+
+        /*
+         * if we get here then we won setting the state to terminated
+         * and have a deterministic set of Observers to emit to (concurrent subscribes
+         * will have failed and will try again and see we are term
+         * inated)
+         */
+        try {
+            // had to circumvent type check, we know what the array contains
+            onTerminate.call((Collection)Arrays.asList(newState.observers));
+        } finally {
+            // mark that termination is completed
+            newState.terminationLatch.countDown();
+        }
+    }
+    /**
+     * Returns the array of observers directly.
+     * Don't modify the array!
+     * @return the array of current observers
+     */
+    public SubjectObserver[] rawSnapshot() {
+        return state.get().observers;
+    }
+
+    @SuppressWarnings("rawtypes")
+    protected static class State {
+        final boolean terminated;
+        final CountDownLatch terminationLatch;
+        final Subscription[] subscriptions;
+        final SubjectObserver[] observers;
+        // to avoid lots of empty arrays
+        final Subscription[] EMPTY_S = new Subscription[0];
+        // to avoid lots of empty arrays
+        final SubjectObserver[] EMPTY_O = new SubjectObserver[0];
+        private State(boolean isTerminated, CountDownLatch terminationLatch, 
+                Subscription[] subscriptions, SubjectObserver[] observers) {
+            this.terminationLatch = terminationLatch;
+            this.terminated = isTerminated;
+            this.subscriptions = subscriptions;
+            this.observers = observers;
+        }
+
+        State() {
+            this.terminated = false;
+            this.terminationLatch = null;
+            this.subscriptions = EMPTY_S;
+            this.observers = EMPTY_O;
+        }
+
+        public State terminate() {
+            if (terminated) {
+                throw new IllegalStateException("Already terminated.");
+            }
+            return new State(true, new CountDownLatch(1), subscriptions, observers);
+        }
+
+        public State addObserver(Subscription s, SubjectObserver observer) {
+            int n = this.observers.length;
+            
+            Subscription[] newsubscriptions = Arrays.copyOf(this.subscriptions, n + 1);
+            SubjectObserver[] newobservers = Arrays.copyOf(this.observers, n + 1);
+            
+            newsubscriptions[n] = s;
+            newobservers[n] = observer;
+            
+            return createNewWith(newsubscriptions, newobservers);
+        }
+        private State createNewWith(Subscription[] newsubscriptions, SubjectObserver[] newobservers) {
+            return new State(terminated, terminationLatch, newsubscriptions, newobservers);
+        }
+
+        public State removeObserver(Subscription s) {
+            // we are empty, nothing to remove
+            if (this.observers.length == 0) {
+                return this;
+            }
+            int n = Math.max(this.observers.length - 1, 1);
+            int copied = 0;
+            Subscription[] newsubscriptions = Arrays.copyOf(this.subscriptions, n);
+            SubjectObserver[] newobservers = Arrays.copyOf(this.observers, n);
+
+            for (int i = 0; i < this.subscriptions.length; i++) {
+                Subscription s0 = this.subscriptions[i];
+                if (s0 != s) {
+                    if (copied == n) {
+                        // if s was not found till the end of the iteration
+                        // we return ourselves since no modification should
+                        // have happened
+                        return this;
+                    }
+                    newsubscriptions[copied] = s0;
+                    newobservers[copied] = this.observers[i];
+                    copied++;
+                }
+            }
+            
+            if (copied == 0) {
+                return createNewWith(EMPTY_S, EMPTY_O);
+            }
+            // if somehow copied less than expected, truncate the arrays
+            // if s is unique, this should never happen
+            if (copied < n) {
+                return createNewWith(Arrays.copyOf(newsubscriptions, copied), Arrays.copyOf(newobservers, copied));
+            }
+            return createNewWith(newsubscriptions, newobservers);
+        }
+    }
+
+    protected static class SubjectObserver implements Observer {
+
+        private final Observer actual;
+        protected volatile boolean caughtUp = false;
+
+        SubjectObserver(Observer actual) {
+            this.actual = actual;
+        }
+
+        @Override
+        public void onCompleted() {
+            this.actual.onCompleted();
+        }
+
+        @Override
+        public void onError(Throwable e) {
+            this.actual.onError(e);
+        }
+
+        @Override
+        public void onNext(T v) {
+            this.actual.onNext(v);
+        }
+
+    }
+
+}
diff --git a/rxjava-core/src/main/java/rx/subjects/UnsubscribeTester.java b/rxjava-core/src/main/java/rx/subjects/UnsubscribeTester.java
deleted file mode 100644
index 0aaf498bc7..0000000000
--- a/rxjava-core/src/main/java/rx/subjects/UnsubscribeTester.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/**
- * Copyright 2013 Netflix, Inc.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package rx.subjects;
-
-import static org.junit.Assert.*;
-import rx.Observable;
-import rx.Observer;
-import rx.Subscription;
-import rx.util.functions.Action1;
-import rx.util.functions.Func0;
-
-/* package */class UnsubscribeTester {
-
-    /*
-     * This is purposefully package-only so it does not leak into the public API outside of this package.
-     * 
-     * This package is implementation details and not part of the Javadocs and thus can change without breaking backwards compatibility.
-     * 
-     * benjchristensen => I'm procrastinating the decision of where and how these types of classes (see rx.operators.OperatorTester) should exist.
-     * If they are only for internal implementations then I don't want them as part of the API.
-     * If they are truly useful for everyone to use then an "rx.testing" package may make sense.
-     */
-
-    private boolean isDone = false;
-    private Subscription subscription;
-
-    public UnsubscribeTester() {
-    }
-
-    /**
-     * Tests the unsubscription semantics of an observable.
-     * 
-     * @param provider
-     *            Function that when called provides an instance of the observable being tested
-     * @param generateOnCompleted
-     *            Causes an observer generated by @param provider to generate an onCompleted event. Null to not test onCompleted.
-     * @param generateOnError
-     *            Causes an observer generated by @param provider to generate an onError event. Null to not test onError.
-     * @param generateOnNext
-     *            Causes an observer generated by @param provider to generate an onNext event. Null to not test onNext.
-     * @param 
-     *            The type of object passed by the Observable
-     */
-    public static > void test(Func0 provider, Action1 generateOnCompleted, Action1 generateOnError, Action1 generateOnNext)
-    {
-        if (generateOnCompleted != null) {
-            O observable = provider.call();
-            UnsubscribeTester tester1 = createOnCompleted(observable);
-            UnsubscribeTester tester2 = createOnCompleted(observable);
-            generateOnCompleted.call(observable);
-            tester1.assertPassed();
-            tester2.assertPassed();
-        }
-        if (generateOnError != null) {
-            O observable = provider.call();
-            UnsubscribeTester tester1 = createOnError(observable);
-            UnsubscribeTester tester2 = createOnError(observable);
-            generateOnError.call(observable);
-            tester1.assertPassed();
-            tester2.assertPassed();
-        }
-        if (generateOnNext != null) {
-            O observable = provider.call();
-            UnsubscribeTester tester1 = createOnNext(observable);
-            UnsubscribeTester tester2 = createOnNext(observable);
-            generateOnNext.call(observable);
-            tester1.assertPassed();
-            tester2.assertPassed();
-        }
-    }
-
-    private static  UnsubscribeTester createOnCompleted(Observable observable)
-    {
-        final UnsubscribeTester test = new UnsubscribeTester();
-        test.setSubscription(observable.subscribe(new Observer()
-        {
-            @Override
-            public void onCompleted()
-            {
-                test.doUnsubscribe("onCompleted");
-            }
-
-            @Override
-            public void onError(Throwable e)
-            {
-                test.gotEvent("onError");
-            }
-
-            @Override
-            public void onNext(T args)
-            {
-                test.gotEvent("onNext");
-            }
-        }));
-        return test;
-    }
-
-    private static  UnsubscribeTester createOnError(Observable observable)
-    {
-        final UnsubscribeTester test = new UnsubscribeTester();
-        test.setSubscription(observable.subscribe(new Observer()
-        {
-            @Override
-            public void onCompleted()
-            {
-                test.gotEvent("onCompleted");
-            }
-
-            @Override
-            public void onError(Throwable e)
-            {
-                test.doUnsubscribe("onError");
-            }
-
-            @Override
-            public void onNext(T args)
-            {
-                test.gotEvent("onNext");
-            }
-        }));
-        return test;
-    }
-
-    private static  UnsubscribeTester createOnNext(Observable observable)
-    {
-        final UnsubscribeTester test = new UnsubscribeTester();
-        test.setSubscription(observable.subscribe(new Observer()
-        {
-            @Override
-            public void onCompleted()
-            {
-                test.gotEvent("onCompleted");
-            }
-
-            @Override
-            public void onError(Throwable e)
-            {
-                test.gotEvent("onError");
-            }
-
-            @Override
-            public void onNext(T args)
-            {
-                test.doUnsubscribe("onNext");
-            }
-        }));
-        return test;
-    }
-
-    private void setSubscription(Subscription subscription)
-    {
-        this.subscription = subscription;
-    }
-
-    private void gotEvent(String event)
-    {
-        assertFalse("received " + event + " after unsubscribe", isDone);
-    }
-
-    private void doUnsubscribe(String event)
-    {
-        gotEvent(event);
-        if (subscription != null) {
-            isDone = true;
-            subscription.unsubscribe();
-        }
-    }
-
-    private void assertPassed()
-    {
-        assertTrue("expected notification was received", isDone);
-    }
-}
diff --git a/rxjava-core/src/test/java/rx/subjects/AsyncSubjectTest.java b/rxjava-core/src/test/java/rx/subjects/AsyncSubjectTest.java
index b483b9c99f..7958d87439 100644
--- a/rxjava-core/src/test/java/rx/subjects/AsyncSubjectTest.java
+++ b/rxjava-core/src/test/java/rx/subjects/AsyncSubjectTest.java
@@ -172,29 +172,6 @@ public void testUnsubscribeBeforeCompleted() {
         verify(aObserver, Mockito.never()).onCompleted();
     }
 
-    @Test
-    public void testUnsubscribe() {
-        UnsubscribeTester.test(
-                new Func0>() {
-                    @Override
-                    public AsyncSubject call() {
-                        return AsyncSubject.create();
-                    }
-                }, new Action1>() {
-                    @Override
-                    public void call(AsyncSubject DefaultSubject) {
-                        DefaultSubject.onCompleted();
-                    }
-                }, new Action1>() {
-                    @Override
-                    public void call(AsyncSubject DefaultSubject) {
-                        DefaultSubject.onError(new Throwable());
-                    }
-                },
-                null
-                );
-    }
-
     @Test
     public void testEmptySubjectCompleted() {
         AsyncSubject subject = AsyncSubject.create();
@@ -215,7 +192,7 @@ public void testEmptySubjectCompleted() {
     /**
      * Can receive timeout if subscribe never receives an onError/onCompleted ... which reveals a race condition.
      */
-    @Test
+    @Test(timeout=10000)
     public void testSubscribeCompletionRaceCondition() {
         /*
          * With non-threadsafe code this fails most of the time on my dev laptop and is non-deterministic enough
diff --git a/rxjava-core/src/test/java/rx/subjects/BehaviorSubjectTest.java b/rxjava-core/src/test/java/rx/subjects/BehaviorSubjectTest.java
index 57958c108f..19badb2d95 100644
--- a/rxjava-core/src/test/java/rx/subjects/BehaviorSubjectTest.java
+++ b/rxjava-core/src/test/java/rx/subjects/BehaviorSubjectTest.java
@@ -24,16 +24,14 @@
 
 import rx.Observer;
 import rx.Subscription;
-import rx.util.functions.Action1;
-import rx.util.functions.Func0;
 
 public class BehaviorSubjectTest {
 
     private final Throwable testException = new Throwable();
 
     @Test
-    public void testThatObserverReceivesDefaultValueIfNothingWasPublished() {
-        BehaviorSubject subject = BehaviorSubject.createWithDefaultValue("default");
+    public void testThatObserverReceivesDefaultValueAndSubsequentEvents() {
+        BehaviorSubject subject = BehaviorSubject.create("default");
 
         @SuppressWarnings("unchecked")
         Observer aObserver = mock(Observer.class);
@@ -52,7 +50,7 @@ public void testThatObserverReceivesDefaultValueIfNothingWasPublished() {
     }
 
     @Test
-    public void testThatObserverDoesNotReceiveDefaultValueIfSomethingWasPublished() {
+    public void testThatObserverReceivesLatestAndThenSubsequentEvents() {
         BehaviorSubject subject = BehaviorSubject.create("default");
 
         subject.onNext("one");
@@ -73,7 +71,7 @@ public void testThatObserverDoesNotReceiveDefaultValueIfSomethingWasPublished()
     }
 
     @Test
-    public void testCompleted() {
+    public void testSubscribeThenOnComplete() {
         BehaviorSubject subject = BehaviorSubject.create("default");
 
         @SuppressWarnings("unchecked")
@@ -89,6 +87,39 @@ public void testCompleted() {
         verify(aObserver, times(1)).onCompleted();
     }
 
+    @Test
+    public void testSubscribeToCompletedOnlyEmitsOnComplete() {
+        BehaviorSubject subject = BehaviorSubject.create("default");
+        subject.onNext("one");
+        subject.onCompleted();
+
+        @SuppressWarnings("unchecked")
+        Observer aObserver = mock(Observer.class);
+        subject.subscribe(aObserver);
+
+        verify(aObserver, never()).onNext("default");
+        verify(aObserver, never()).onNext("one");
+        verify(aObserver, Mockito.never()).onError(any(Throwable.class));
+        verify(aObserver, times(1)).onCompleted();
+    }
+
+    @Test
+    public void testSubscribeToErrorOnlyEmitsOnError() {
+        BehaviorSubject subject = BehaviorSubject.create("default");
+        subject.onNext("one");
+        RuntimeException re = new RuntimeException("test error");
+        subject.onError(re);
+
+        @SuppressWarnings("unchecked")
+        Observer aObserver = mock(Observer.class);
+        subject.subscribe(aObserver);
+
+        verify(aObserver, never()).onNext("default");
+        verify(aObserver, never()).onNext("one");
+        verify(aObserver, times(1)).onError(re);
+        verify(aObserver, never()).onCompleted();
+    }
+
     @Test
     public void testCompletedStopsEmittingData() {
         BehaviorSubject channel = BehaviorSubject.create(2013);
@@ -136,7 +167,7 @@ public void testCompletedStopsEmittingData() {
     }
 
     @Test
-    public void testCompletedAfterError() {
+    public void testCompletedAfterErrorIsNotSent() {
         BehaviorSubject subject = BehaviorSubject.create("default");
 
         @SuppressWarnings("unchecked")
@@ -151,32 +182,8 @@ public void testCompletedAfterError() {
         verify(aObserver, times(1)).onNext("default");
         verify(aObserver, times(1)).onNext("one");
         verify(aObserver, times(1)).onError(testException);
+        verify(aObserver, never()).onNext("two");
+        verify(aObserver, never()).onCompleted();
     }
 
-    @Test
-    public void testUnsubscribe() {
-        UnsubscribeTester.test(
-                new Func0>() {
-                    @Override
-                    public BehaviorSubject call() {
-                        return BehaviorSubject.create("default");
-                    }
-                }, new Action1>() {
-                    @Override
-                    public void call(BehaviorSubject DefaultSubject) {
-                        DefaultSubject.onCompleted();
-                    }
-                }, new Action1>() {
-                    @Override
-                    public void call(BehaviorSubject DefaultSubject) {
-                        DefaultSubject.onError(new Throwable());
-                    }
-                }, new Action1>() {
-                    @Override
-                    public void call(BehaviorSubject DefaultSubject) {
-                        DefaultSubject.onNext("one");
-                    }
-                }
-                );
-    }
 }
diff --git a/rxjava-core/src/test/java/rx/subjects/PublishSubjectTest.java b/rxjava-core/src/test/java/rx/subjects/PublishSubjectTest.java
index 5e9cc2790f..518d9b7b74 100644
--- a/rxjava-core/src/test/java/rx/subjects/PublishSubjectTest.java
+++ b/rxjava-core/src/test/java/rx/subjects/PublishSubjectTest.java
@@ -209,33 +209,6 @@ private void assertObservedUntilTwo(Observer aObserver) {
         verify(aObserver, Mockito.never()).onCompleted();
     }
 
-    @Test
-    public void testUnsubscribe() {
-        UnsubscribeTester.test(
-                new Func0>() {
-                    @Override
-                    public PublishSubject call() {
-                        return PublishSubject.create();
-                    }
-                }, new Action1>() {
-                    @Override
-                    public void call(PublishSubject DefaultSubject) {
-                        DefaultSubject.onCompleted();
-                    }
-                }, new Action1>() {
-                    @Override
-                    public void call(PublishSubject DefaultSubject) {
-                        DefaultSubject.onError(new Throwable());
-                    }
-                }, new Action1>() {
-                    @Override
-                    public void call(PublishSubject DefaultSubject) {
-                        DefaultSubject.onNext("one");
-                    }
-                }
-                );
-    }
-
     @Test
     public void testNestedSubscribe() {
         final PublishSubject s = PublishSubject.create();
diff --git a/rxjava-core/src/test/java/rx/subjects/ReplaySubjectConcurrencyTest.java b/rxjava-core/src/test/java/rx/subjects/ReplaySubjectConcurrencyTest.java
new file mode 100644
index 0000000000..60ff4f0cfd
--- /dev/null
+++ b/rxjava-core/src/test/java/rx/subjects/ReplaySubjectConcurrencyTest.java
@@ -0,0 +1,326 @@
+/**
+ * Copyright 2013 Netflix, Inc.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rx.subjects;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.junit.Test;
+
+import rx.Observable;
+import rx.Observable.OnSubscribeFunc;
+import rx.Observer;
+import rx.Subscription;
+import rx.subscriptions.Subscriptions;
+import rx.util.functions.Action1;
+
+public class ReplaySubjectConcurrencyTest {
+
+    public static void main(String args[]) {
+        try {
+            for (int i = 0; i < 100; i++) {
+                new ReplaySubjectConcurrencyTest().testSubscribeCompletionRaceCondition();
+                new ReplaySubjectConcurrencyTest().testReplaySubjectConcurrentSubscriptions();
+                new ReplaySubjectConcurrencyTest().testReplaySubjectConcurrentSubscribersDoingReplayDontBlockEachOther();
+            }
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Test(timeout = 4000)
+    public void testReplaySubjectConcurrentSubscribersDoingReplayDontBlockEachOther() throws InterruptedException {
+        final ReplaySubject replay = ReplaySubject.create();
+        Thread source = new Thread(new Runnable() {
+
+            @Override
+            public void run() {
+                Observable.create(new OnSubscribeFunc() {
+
+                    @Override
+                    public Subscription onSubscribe(Observer o) {
+                        System.out.println("********* Start Source Data ***********");
+                        for (long l = 1; l <= 10000; l++) {
+                            o.onNext(l);
+                        }
+                        System.out.println("********* Finished Source Data ***********");
+                        o.onCompleted();
+                        return Subscriptions.empty();
+                    }
+                }).subscribe(replay);
+            }
+        });
+        source.start();
+
+        long v = replay.toBlockingObservable().last();
+        assertEquals(10000, v);
+
+        // it's been played through once so now it will all be replays
+        final CountDownLatch slowLatch = new CountDownLatch(1);
+        Thread slowThread = new Thread(new Runnable() {
+
+            @Override
+            public void run() {
+                Observer slow = new Observer() {
+
+                    @Override
+                    public void onCompleted() {
+                        System.out.println("*** Slow Observer completed");
+                        slowLatch.countDown();
+                    }
+
+                    @Override
+                    public void onError(Throwable e) {
+                    }
+
+                    @Override
+                    public void onNext(Long args) {
+                        if (args == 1) {
+                            System.out.println("*** Slow Observer STARTED");
+                        }
+                        try {
+                            if (args % 10 == 0) {
+                                Thread.sleep(1);
+                            }
+                        } catch (InterruptedException e) {
+                            e.printStackTrace();
+                        }
+                    }
+                };
+                replay.subscribe(slow);
+                try {
+                    slowLatch.await();
+                } catch (InterruptedException e1) {
+                    e1.printStackTrace();
+                }
+            }
+        });
+        slowThread.start();
+
+        Thread fastThread = new Thread(new Runnable() {
+
+            @Override
+            public void run() {
+                final CountDownLatch fastLatch = new CountDownLatch(1);
+                Observer fast = new Observer() {
+
+                    @Override
+                    public void onCompleted() {
+                        System.out.println("*** Fast Observer completed");
+                        fastLatch.countDown();
+                    }
+
+                    @Override
+                    public void onError(Throwable e) {
+                    }
+
+                    @Override
+                    public void onNext(Long args) {
+                        if (args == 1) {
+                            System.out.println("*** Fast Observer STARTED");
+                        }
+                    }
+                };
+                replay.subscribe(fast);
+                try {
+                    fastLatch.await();
+                } catch (InterruptedException e1) {
+                    e1.printStackTrace();
+                }
+            }
+        });
+        fastThread.start();
+        fastThread.join();
+
+        // slow should not yet be completed when fast completes
+        assertEquals(1, slowLatch.getCount());
+
+        slowThread.join();
+    }
+
+    @Test
+    public void testReplaySubjectConcurrentSubscriptions() throws InterruptedException {
+        final ReplaySubject replay = ReplaySubject.create();
+        Thread source = new Thread(new Runnable() {
+
+            @Override
+            public void run() {
+                Observable.create(new OnSubscribeFunc() {
+
+                    @Override
+                    public Subscription onSubscribe(Observer o) {
+                        System.out.println("********* Start Source Data ***********");
+                        for (long l = 1; l <= 10000; l++) {
+                            o.onNext(l);
+                        }
+                        System.out.println("********* Finished Source Data ***********");
+                        o.onCompleted();
+                        return Subscriptions.empty();
+                    }
+                }).subscribe(replay);
+            }
+        });
+
+        // used to collect results of each thread
+        final List> listOfListsOfValues = Collections.synchronizedList(new ArrayList>());
+        final List threads = Collections.synchronizedList(new ArrayList());
+
+        for (int i = 1; i <= 200; i++) {
+            final int count = i;
+            if (count == 20) {
+                // start source data after we have some already subscribed
+                // and while others are in process of subscribing
+                source.start();
+            }
+            if (count == 100) {
+                // wait for source to finish then keep adding after it's done
+                source.join();
+            }
+            Thread t = new Thread(new Runnable() {
+
+                @Override
+                public void run() {
+                    List values = replay.toList().toBlockingObservable().last();
+                    listOfListsOfValues.add(values);
+                    System.out.println("Finished thread: " + count);
+                }
+            });
+            t.start();
+            System.out.println("Started thread: " + i);
+            threads.add(t);
+        }
+
+        // wait for all threads to complete
+        for (Thread t : threads) {
+            t.join();
+        }
+
+        // assert all threads got the same results
+        List sums = new ArrayList();
+        for (List values : listOfListsOfValues) {
+            long v = 0;
+            for (long l : values) {
+                v += l;
+            }
+            sums.add(v);
+        }
+
+        long expected = sums.get(0);
+        boolean success = true;
+        for (long l : sums) {
+            if (l != expected) {
+                success = false;
+                System.out.println("FAILURE => Expected " + expected + " but got: " + l);
+            }
+        }
+
+        if (success) {
+            System.out.println("Success! " + sums.size() + " each had the same sum of " + expected);
+        } else {
+            throw new RuntimeException("Concurrency Bug");
+        }
+
+    }
+
+    /**
+     * Can receive timeout if subscribe never receives an onError/onCompleted ... which reveals a race condition.
+     */
+    @Test(timeout = 10000)
+    public void testSubscribeCompletionRaceCondition() {
+        for (int i = 0; i < 50; i++) {
+            final ReplaySubject subject = ReplaySubject.create();
+            final AtomicReference value1 = new AtomicReference();
+
+            subject.subscribe(new Action1() {
+
+                @Override
+                public void call(String t1) {
+                    try {
+                        // simulate a slow observer
+                        Thread.sleep(50);
+                    } catch (InterruptedException e) {
+                        e.printStackTrace();
+                    }
+                    value1.set(t1);
+                }
+
+            });
+
+            Thread t1 = new Thread(new Runnable() {
+
+                @Override
+                public void run() {
+                    subject.onNext("value");
+                    subject.onCompleted();
+                }
+            });
+
+            SubjectObserverThread t2 = new SubjectObserverThread(subject);
+            SubjectObserverThread t3 = new SubjectObserverThread(subject);
+            SubjectObserverThread t4 = new SubjectObserverThread(subject);
+            SubjectObserverThread t5 = new SubjectObserverThread(subject);
+
+            t2.start();
+            t3.start();
+            t1.start();
+            t4.start();
+            t5.start();
+            try {
+                t1.join();
+                t2.join();
+                t3.join();
+                t4.join();
+                t5.join();
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+
+            assertEquals("value", value1.get());
+            assertEquals("value", t2.value.get());
+            assertEquals("value", t3.value.get());
+            assertEquals("value", t4.value.get());
+            assertEquals("value", t5.value.get());
+        }
+
+    }
+
+    private static class SubjectObserverThread extends Thread {
+
+        private final ReplaySubject subject;
+        private final AtomicReference value = new AtomicReference();
+
+        public SubjectObserverThread(ReplaySubject subject) {
+            this.subject = subject;
+        }
+
+        @Override
+        public void run() {
+            try {
+                // a timeout exception will happen if we don't get a terminal state 
+                String v = subject.timeout(2000, TimeUnit.MILLISECONDS).toBlockingObservable().single();
+                value.set(v);
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+    }
+}
diff --git a/rxjava-core/src/test/java/rx/subjects/ReplaySubjectTest.java b/rxjava-core/src/test/java/rx/subjects/ReplaySubjectTest.java
index 005b56ba5a..e853944820 100644
--- a/rxjava-core/src/test/java/rx/subjects/ReplaySubjectTest.java
+++ b/rxjava-core/src/test/java/rx/subjects/ReplaySubjectTest.java
@@ -15,17 +15,20 @@
  */
 package rx.subjects;
 
+import static org.junit.Assert.*;
 import static org.mockito.Matchers.*;
 import static org.mockito.Mockito.*;
 
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
 import org.junit.Test;
 import org.mockito.InOrder;
 import org.mockito.Mockito;
 
 import rx.Observer;
 import rx.Subscription;
-import rx.util.functions.Action1;
-import rx.util.functions.Func0;
+import rx.schedulers.Schedulers;
 
 public class ReplaySubjectTest {
 
@@ -243,30 +246,90 @@ private void assertObservedUntilTwo(Observer aObserver) {
         verify(aObserver, Mockito.never()).onCompleted();
     }
 
-    @Test
-    public void testUnsubscribe() {
-        UnsubscribeTester.test(
-                new Func0>() {
-                    @Override
-                    public ReplaySubject call() {
-                        return ReplaySubject.create();
-                    }
-                }, new Action1>() {
-                    @Override
-                    public void call(ReplaySubject repeatSubject) {
-                        repeatSubject.onCompleted();
-                    }
-                }, new Action1>() {
-                    @Override
-                    public void call(ReplaySubject repeatSubject) {
-                        repeatSubject.onError(new Throwable());
-                    }
-                }, new Action1>() {
-                    @Override
-                    public void call(ReplaySubject repeatSubject) {
-                        repeatSubject.onNext("one");
+    @Test(timeout = 2000)
+    public void testNewSubscriberDoesntBlockExisting() throws InterruptedException {
+
+        final AtomicReference lastValueForObserver1 = new AtomicReference();
+        Observer observer1 = new Observer() {
+
+            @Override
+            public void onCompleted() {
+
+            }
+
+            @Override
+            public void onError(Throwable e) {
+
+            }
+
+            @Override
+            public void onNext(String v) {
+                System.out.println("observer1: " + v);
+                lastValueForObserver1.set(v);
+            }
+
+        };
+
+        final AtomicReference lastValueForObserver2 = new AtomicReference();
+        final CountDownLatch oneReceived = new CountDownLatch(1);
+        final CountDownLatch makeSlow = new CountDownLatch(1);
+        final CountDownLatch completed = new CountDownLatch(1);
+        Observer observer2 = new Observer() {
+
+            @Override
+            public void onCompleted() {
+                completed.countDown();
+            }
+
+            @Override
+            public void onError(Throwable e) {
+
+            }
+
+            @Override
+            public void onNext(String v) {
+                System.out.println("observer2: " + v);
+                if (v.equals("one")) {
+                    oneReceived.countDown();
+                } else {
+                    try {
+                        makeSlow.await();
+                    } catch (InterruptedException e) {
+                        e.printStackTrace();
                     }
+                    lastValueForObserver2.set(v);
                 }
-                );
+            }
+
+        };
+
+        ReplaySubject subject = ReplaySubject.create();
+        Subscription s1 = subject.subscribe(observer1);
+        subject.onNext("one");
+        assertEquals("one", lastValueForObserver1.get());
+        subject.onNext("two");
+        assertEquals("two", lastValueForObserver1.get());
+
+        Subscription s2 = subject.observeOn(Schedulers.newThread()).subscribe(observer2);
+
+        System.out.println("before waiting for one");
+        
+        // wait until observer2 starts having replay occur
+        oneReceived.await();
+        
+        System.out.println("after waiting for one");
+        
+        subject.onNext("three");
+        // if subscription blocked existing subscribers then 'makeSlow' would cause this to not be there yet 
+        assertEquals("three", lastValueForObserver1.get());
+        subject.onCompleted();
+
+        // release 
+        makeSlow.countDown();
+        completed.await();
+        // all of them should be emitted with the last being "three"
+        assertEquals("three", lastValueForObserver2.get());
+
     }
+
 }
diff --git a/rxjava-core/src/test/java/rx/subjects/SubjectPerformanceTests.java b/rxjava-core/src/test/java/rx/subjects/SubjectPerformanceTests.java
new file mode 100644
index 0000000000..2574b1bcd9
--- /dev/null
+++ b/rxjava-core/src/test/java/rx/subjects/SubjectPerformanceTests.java
@@ -0,0 +1,150 @@
+/**
+ * Copyright 2013 Netflix, Inc.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rx.subjects;
+
+import rx.Observer;
+import rx.util.functions.Action0;
+
+public class SubjectPerformanceTests {
+
+    private static final int REPETITIONS = 10 * 1000 * 1000;
+    private static final int NUM_PRODUCERS = 1;
+
+    public static void main(String args[]) {
+
+        final SubjectPerformanceTests spt = new SubjectPerformanceTests();
+        try {
+            spt.runTest(new Action0() {
+
+                @Override
+                public void call() {
+                    //                    spt.baseline();
+                    spt.unboundedReplaySubject();
+                }
+            });
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+
+    }
+
+    private void runTest(Action0 action) throws InterruptedException {
+        for (int runNum = 0; runNum < 15; runNum++) {
+            System.gc();
+            Thread.sleep(1000L);
+
+            final long start = System.nanoTime();
+
+            action.call();
+
+            long duration = System.nanoTime() - start;
+            long opsPerSec = (REPETITIONS * NUM_PRODUCERS * 1000L * 1000L * 1000L) / duration;
+            System.out.printf("Run: %d - %,d ops/sec \n",
+                    Integer.valueOf(runNum),
+                    Long.valueOf(opsPerSec));
+        }
+    }
+
+    /**
+     * Baseline ops/second without a subject.
+     * 
+     * Perf along this order of magnitude:
+     * 
+     * Run: 10 - 316,235,532 ops/sec
+     * Run: 11 - 301,886,792 ops/sec
+     * Run: 12 - 310,472,228 ops/sec
+     * Run: 13 - 313,469,797 ops/sec
+     * Run: 14 - 305,380,809 ops/sec
+     */
+    public long baseline() {
+
+        LongObserver o = new LongObserver();
+        for (long l = 0; l < REPETITIONS; l++) {
+            o.onNext(l);
+        }
+        o.onCompleted();
+        return o.sum;
+    }
+
+    /**
+     * ReplaySubject
+     * 
+     * This is testing pass-thru, not replay speed (though it will be storing all of the history).
+     * 
+     * ArrayList with raw values & synchronized access
+     * 
+     * Run: 10 - 11,993,341 ops/sec
+     * Run: 11 - 11,719,523 ops/sec
+     * Run: 12 - 11,965,214 ops/sec
+     * Run: 13 - 11,814,730 ops/sec
+     * Run: 14 - 11,947,459 ops/sec
+     * 
+     * Custom linked list + Notification (failed experiment)
+     * 
+     * Run: 10 - 2,102,785 ops/sec
+     * Run: 11 - 2,109,057 ops/sec
+     * Run: 12 - 2,094,372 ops/sec
+     * Run: 13 - 2,096,183 ops/sec
+     * Run: 14 - 2,107,019 ops/sec
+     * 
+     * Custom linked list + raw values (elegant code ... but another failed experiment)
+     * 
+     * Run: 10 - 5,131,634 ops/sec
+     * Run: 11 - 5,133,114 ops/sec
+     * Run: 12 - 5,080,652 ops/sec
+     * Run: 13 - 5,072,743 ops/sec
+     * Run: 14 - 5,198,813 ops/sec
+     * 
+     * ArrayList with raw values & non-blocking (no synchronization)
+     * 
+     * Run: 10 - 16,069,678 ops/sec
+     * Run: 11 - 15,954,688 ops/sec
+     * Run: 12 - 16,158,874 ops/sec
+     * Run: 13 - 16,209,504 ops/sec
+     * Run: 14 - 16,151,174 ops/sec
+     */
+    public long unboundedReplaySubject() {
+        ReplaySubject s = ReplaySubject.create();
+        LongObserver o = new LongObserver();
+        s.subscribe(o);
+        for (long l = 0; l < REPETITIONS; l++) {
+            s.onNext(l);
+        }
+        s.onCompleted();
+        return o.sum;
+    }
+
+    private static class LongObserver implements Observer {
+
+        long sum = 0;
+
+        @Override
+        public void onCompleted() {
+
+        }
+
+        @Override
+        public void onError(Throwable e) {
+            throw new RuntimeException(e);
+        }
+
+        @Override
+        public void onNext(Long l) {
+            sum += l;
+        }
+    }
+
+}