Skip to content

modify ReplaySubject (with specified capacity) to comply with original reactive extensions behaviour #753

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 7 commits into from
Closed

Conversation

wangsha
Copy link

@wangsha wangsha commented Jan 15, 2014

modify ReplaySubject (with specified capacity) to comply with original reactive extensions behaviour

the capacity of ReplaySubject specifies maximum element count of the replay buffer

code:
https://rx.codeplex.com/SourceControl/latest#Rx.NET/Source/System.Reactive.Linq/Reactive/Subjects/ReplaySubject.cs
test:
https://rx.codeplex.com/SourceControl/latest#Rx.NET/Source/Tests.System.Reactive/Tests/Linq/Subjects/ReplaySubjectTest.cs

@cloudbees-pull-request-builder

RxJava-pull-requests #668 FAILURE
Looks like there's a problem with this pull request

…y-subject-with-capacity

Conflicts:
	rxjava-core/src/main/java/rx/subjects/ReplaySubject.java
@cloudbees-pull-request-builder

RxJava-pull-requests #669 FAILURE
Looks like there's a problem with this pull request

@cloudbees-pull-request-builder

RxJava-pull-requests #670 SUCCESS
This pull request looks good

@akarnokd
Copy link
Member

Hi, not really necessary as OperationReplay.replayBuffered() allows this but it is not in the public API area. @benjchristensen could a rx.subject.Subjects utility class collect all subject factory methods from the other subjects?

Edit, btw, the create(initialCapacity) isn't meant for bounded buffering but to set the capacity of the internal buffer to avoid constant reallocation if it needs to grow.

@wangsha
Copy link
Author

wangsha commented Jan 16, 2014

I agree the initialCapacity isn't meant for bounded buffering. This is where the confusion comes from. I am writing the same application on both iOS (ReactiveCocoa) and Android(RxJava). The different behaviours of ReplaySubject with capacity is confusing. ReplaySubject on ReactiveCocoa

I think it is good to keep the behaviour consistent across different platforms, following the original .Net reactive extensions design. :)

@benjchristensen
Copy link
Member

ReplaySubject should behave the same as Rx.Net. Any different behavior is a mistake.

@benjchristensen
Copy link
Member

could a rx.subject.Subjects utility class collect all subject factory methods from the other subjects?

Are you referring to something like this?

Subjects.createAsync();
Subjects.createBehavior();
Subjects.createPublish();
Subjects.createReplay();
Subjects.createReplayBuffered(size);
Subjects.createReplayBuffered(size, time);

However, in this case the following seems clearer:

AsyncSubject.create();
BehaviorSubject.create();
PublishSubject.create();
ReplaySubject.create();
ReplaySubject.createBuffered(size);
ReplaySubject.createBuffered(size, time);

In general I prefer the latter. On Subscriptions it does not offer factory methods for each Subscription impl, it is still each class that has it's own factory methods such as BooleanSubscription.create(). The Subscriptions factories are generic, all returning Subscription.

Are there generic things that belong on a Subjects class?

@benjchristensen
Copy link
Member

Thank you @wangsha for getting involved.

I believe we have all of the replay functionality available on Observable:

ConnectableObservable<T>    replay() 
<R> Observable<R> replay(Func1<? super Observable<T>,? extends Observable<R>> selector) 
<R> Observable<R> replay(Func1<? super Observable<T>,? extends Observable<R>> selector, int bufferSize) 
<R> Observable<R> replay(Func1<? super Observable<T>,? extends Observable<R>> selector, int bufferSize, long time, java.util.concurrent.TimeUnit unit) 
<R> Observable<R> replay(Func1<? super Observable<T>,? extends Observable<R>> selector, int bufferSize, long time, java.util.concurrent.TimeUnit unit, Scheduler scheduler) 
<R> Observable<R> replay(Func1<? super Observable<T>,? extends Observable<R>> selector, int bufferSize, Scheduler scheduler) 
<R> Observable<R> replay(Func1<? super Observable<T>,? extends Observable<R>> selector, long time, java.util.concurrent.TimeUnit unit) 
<R> Observable<R> replay(Func1<? super Observable<T>,? extends Observable<R>> selector, long time, java.util.concurrent.TimeUnit unit, Scheduler scheduler) 
<R> Observable<R> replay(Func1<? super Observable<T>,? extends Observable<R>> selector, Scheduler scheduler) 
 ConnectableObservable<T>   replay(int bufferSize) 
 ConnectableObservable<T>   replay(int bufferSize, long time, java.util.concurrent.TimeUnit unit) 
 ConnectableObservable<T>   replay(int bufferSize, long time, java.util.concurrent.TimeUnit unit, Scheduler scheduler) 
 ConnectableObservable<T>   replay(int bufferSize, Scheduler scheduler) 
 ConnectableObservable<T>   replay(long time, java.util.concurrent.TimeUnit unit) 
 ConnectableObservable<T>   replay(long time, java.util.concurrent.TimeUnit unit, Scheduler scheduler) 
 ConnectableObservable<T>   replay(Scheduler scheduler) 

Unfortunately though, I don't think it's all been implemented using ReplaySubject, but instead some other Subject implementations as internal implementation details of making Observable work, and not ReplaySubject.

We need to fix this and make ReplaySubject be the backing implementation for all of these so it matches Rx.Net with these signatures: http://msdn.microsoft.com/en-us/library/hh211810(v=vs.103).aspx

ReplaySubject<T>()  Creates a replayable subject.
ReplaySubject<T>(Integer)   Initializes a new instance of the ReplaySubject<T> class with the specified buffer size.
ReplaySubject<T>(TimeSpan)  Initializes a new instance of the ReplaySubject<T> class with the specified window.
ReplaySubject<T>(Scheduler) Initializes a new instance of the ReplaySubject<T> class with the specified scheduler.
ReplaySubject<T>(Integer, Scheduler)    Initializes a new instance of the ReplaySubject<T> class with the specified buffer size and scheduler.
ReplaySubject<T>(Integer, TimeSpan) Initializes a new instance of the ReplaySubject<T> class with the specified buffer size and window.
ReplaySubject<T>(TimeSpan, Scheduler)   Initializes a new instance of the ReplaySubject<T> class with the specified window and scheduler.
ReplaySubject<T>(Integer, TimeSpan, Scheduler)  Initializes a new instance of the ReplaySubject<T> class with the specified buffer size, window and scheduler.

@benjchristensen
Copy link
Member

@akarnokd Since you have implemented most of the replay overloads, are you interested and do you have the time to migrate or re-implement the functionality on ReplaySubject as discussed above from where it currently resides inside OperationReplay?

This should of course not lose the benefits we gained from the 0.16.0 work such as in #651

@akarnokd
Copy link
Member

A simple thing is to expose OperationReplay.replayBuffered in ReplaySubject. Rewriting CustomReplaySubject to use similar state machine is more complicated and I can't do it this week.

@wangsha
Copy link
Author

wangsha commented Jan 20, 2014

@akarnokd , @benjchristensen cool. Thanks for looking into this issue. Looking forward to the update.

@cloudbees-pull-request-builder

RxJava-pull-requests #683 SUCCESS
This pull request looks good

@cloudbees-pull-request-builder

RxJava-pull-requests #709 FAILURE
Looks like there's a problem with this pull request

…ubject-with-capacity

Conflicts:
	rxjava-core/src/main/java/rx/subjects/ReplaySubject.java
	rxjava-core/src/test/java/rx/subjects/ReplaySubjectTest.java
@cloudbees-pull-request-builder

RxJava-pull-requests #710 FAILURE
Looks like there's a problem with this pull request

@benjchristensen
Copy link
Member

Keeping this open for now as I will merge this onto my branch when I work on ReplaySubject. As is though I'm not ready to merge.

@benjchristensen
Copy link
Member

Closing out. This is referenced from #865

jihoonson pushed a commit to jihoonson/RxJava that referenced this pull request Mar 6, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants