Skip to content

Commit d74baa4

Browse files
committed
ensure StringObservable.join does not stall, see #23
1 parent 8ad398b commit d74baa4

File tree

3 files changed

+26
-9
lines changed

3 files changed

+26
-9
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ apply plugin: 'rxjava-project'
77
apply plugin: 'java'
88

99
dependencies {
10-
compile 'io.reactivex:rxjava:1.0.+'
10+
compile 'io.reactivex:rxjava:1.0.12'
1111
testCompile 'junit:junit-dep:4.10'
1212
testCompile 'org.mockito:mockito-core:1.8.5'
1313
}

src/main/java/rx/observables/StringObservable.java

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -460,26 +460,32 @@ private void output(String part) {
460460
public static Observable<String> join(final Observable<String> source, final CharSequence separator) {
461461
return source.lift(new Operator<String, String>() {
462462
@Override
463-
public Subscriber<String> call(final Subscriber<? super String> o) {
464-
return new Subscriber<String>(o) {
463+
public Subscriber<String> call(final Subscriber<? super String> child) {
464+
Subscriber<String> parent = new Subscriber<String>() {
465+
465466
boolean mayAddSeparator;
466467
StringBuilder b = new StringBuilder();
468+
469+
@Override
470+
public void onStart() {
471+
request(Long.MAX_VALUE);
472+
}
467473

468474
@Override
469475
public void onCompleted() {
470476
String str = b.toString();
471477
b = null;
472-
if (!o.isUnsubscribed())
473-
o.onNext(str);
474-
if (!o.isUnsubscribed())
475-
o.onCompleted();
478+
if (!child.isUnsubscribed())
479+
child.onNext(str);
480+
if (!child.isUnsubscribed())
481+
child.onCompleted();
476482
}
477483

478484
@Override
479485
public void onError(Throwable e) {
480486
b = null;
481-
if (!o.isUnsubscribed())
482-
o.onError(e);
487+
if (!child.isUnsubscribed())
488+
child.onError(e);
483489
}
484490

485491
@Override
@@ -491,6 +497,8 @@ public void onNext(String t) {
491497
b.append(t);
492498
}
493499
};
500+
child.add(parent);
501+
return parent;
494502
}
495503
});
496504
}

src/test/java/rx/observables/StringObservableTest.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import static org.mockito.Mockito.spy;
2727
import static org.mockito.Mockito.times;
2828
import static org.mockito.Mockito.verify;
29+
import static rx.Observable.just;
2930
import static rx.observables.StringObservable.byCharacter;
3031
import static rx.observables.StringObservable.byLine;
3132
import static rx.observables.StringObservable.decode;
@@ -456,4 +457,12 @@ public Observable<String> call(Reader reader) {
456457

457458
verify(reader, times(1)).close();
458459
}
460+
461+
@Test(timeout=5000)
462+
public void testJoinDoesNotStallIssue23() {
463+
String s = StringObservable
464+
.join(just("a","b","c"),",")
465+
.toBlocking().single();
466+
assertEquals("a,b,c", s);
467+
}
459468
}

0 commit comments

Comments
 (0)