Skip to content

Commit 3ec7000

Browse files
authored
Delayed write streams race condition. (#517)
`BackpressureManagingHandler` was not correctly enqueuing the writes to the eventloop when done from outside the eventloop. Fixed the problem and also added test.
1 parent 6ebcf23 commit 3ec7000

File tree

3 files changed

+71
-7
lines changed

3 files changed

+71
-7
lines changed

rxnetty-common/src/main/java/io/reactivex/netty/channel/BackpressureManagingHandler.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -503,7 +503,8 @@ public void onNext(Object nextItem) {
503503
if (null == writeWorker) {
504504
if (!inEL) {
505505
atleastOneWriteEnqueued = true;
506-
} else if (atleastOneWriteEnqueued) {
506+
}
507+
if (atleastOneWriteEnqueued) {
507508
writeWorker = Schedulers.computation().createWorker();
508509
}
509510
}
@@ -594,7 +595,7 @@ public void call() {
594595
private void onTermination(Throwable throwableIfAny) {
595596
int _listeningTo;
596597
boolean _shouldCompletePromise;
597-
final boolean flush;
598+
final boolean enqueueFlush;
598599

599600
/**
600601
* The intent here is to NOT give listener callbacks via promise completion within the sync block.
@@ -607,7 +608,7 @@ private void onTermination(Throwable throwableIfAny) {
607608
* This co-oridantion is done via the flag isPromiseCompletedOnWriteComplete
608609
*/
609610
synchronized (guard) {
610-
flush = atleastOneWriteEnqueued;
611+
enqueueFlush = atleastOneWriteEnqueued;
611612
isDone = true;
612613
_listeningTo = listeningTo;
613614
/**
@@ -617,7 +618,7 @@ private void onTermination(Throwable throwableIfAny) {
617618
_shouldCompletePromise = 0 == _listeningTo && !isPromiseCompletedOnWriteComplete;
618619
}
619620

620-
if (flush) {
621+
if (enqueueFlush) {
621622
writeWorker.schedule(new Action0() {
622623
@Override
623624
public void call() {
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package io.reactivex.netty.protocol.http.server;
2+
3+
import io.netty.buffer.ByteBuf;
4+
import io.netty.handler.codec.http.HttpResponseStatus;
5+
import io.reactivex.netty.protocol.http.client.HttpClientResponse;
6+
import org.junit.Rule;
7+
import org.junit.Test;
8+
import rx.Observable;
9+
import rx.Observable.OnSubscribe;
10+
import rx.Scheduler.Worker;
11+
import rx.Subscriber;
12+
import rx.functions.Action0;
13+
import rx.schedulers.Schedulers;
14+
15+
import java.util.concurrent.TimeUnit;
16+
import java.util.concurrent.atomic.AtomicReference;
17+
18+
import static io.reactivex.netty.protocol.http.server.HttpServerRule.*;
19+
import static org.hamcrest.MatcherAssert.*;
20+
import static org.hamcrest.Matchers.*;
21+
22+
public class HttpEndToEndTest {
23+
24+
@Rule
25+
public final HttpServerRule rule = new HttpServerRule();
26+
27+
@Test(timeout = 60000)
28+
public void testDelayedWrites() throws Exception {
29+
30+
final AtomicReference<Throwable> errorFromWriteStreamCompletion = new AtomicReference<>();
31+
final Worker worker = Schedulers.computation().createWorker();
32+
rule.startServer(new RequestHandler<ByteBuf, ByteBuf>() {
33+
@Override
34+
public Observable<Void> handle(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {
35+
return response.writeString(Observable.create(new OnSubscribe<String>() {
36+
@Override
37+
public void call(final Subscriber<? super String> subscriber) {
38+
worker.schedule(new Action0() {
39+
@Override
40+
public void call() {
41+
try {
42+
subscriber.onNext(WELCOME_SERVER_MSG);
43+
subscriber.onCompleted();
44+
} catch (Exception e) {
45+
errorFromWriteStreamCompletion.set(e);
46+
}
47+
}
48+
}, 1, TimeUnit.MILLISECONDS);
49+
50+
}
51+
}));
52+
}
53+
});
54+
55+
final HttpClientResponse<ByteBuf> response = rule.sendRequest(rule.getClient().createGet("/"));
56+
57+
assertThat("Unexpected response code.", response.getStatus(), is(HttpResponseStatus.OK));
58+
59+
rule.assertResponseContent(response);
60+
61+
assertThat("Unexpected exception on server.", errorFromWriteStreamCompletion.get(), is(nullValue()));
62+
}
63+
}

rxnetty-http/src/test/java/io/reactivex/netty/protocol/http/server/HttpServerRule.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717
package io.reactivex.netty.protocol.http.server;
1818

1919
import io.netty.buffer.ByteBuf;
20+
import io.netty.handler.logging.LogLevel;
2021
import io.reactivex.netty.protocol.http.client.HttpClient;
21-
import io.reactivex.netty.protocol.http.client.HttpClientRequest;
2222
import io.reactivex.netty.protocol.http.client.HttpClientResponse;
2323
import org.junit.rules.ExternalResource;
2424
import org.junit.runner.Description;
@@ -46,7 +46,7 @@ public Statement apply(final Statement base, Description description) {
4646
return new Statement() {
4747
@Override
4848
public void evaluate() throws Throwable {
49-
server = HttpServer.newServer();
49+
server = HttpServer.newServer().enableWireLogging("test", LogLevel.INFO);
5050
base.evaluate();
5151
}
5252
};
@@ -74,7 +74,7 @@ public void setupClient(HttpClient<ByteBuf, ByteBuf> client) {
7474
this.client = client;
7575
}
7676

77-
public HttpClientResponse<ByteBuf> sendRequest(HttpClientRequest<ByteBuf, ByteBuf> request) {
77+
public HttpClientResponse<ByteBuf> sendRequest(Observable<HttpClientResponse<ByteBuf>> request) {
7878
TestSubscriber<HttpClientResponse<ByteBuf>> subscriber = new TestSubscriber<>();
7979

8080
request.subscribe(subscriber);

0 commit comments

Comments
 (0)