Skip to content

Commit ca6d0e1

Browse files
authored
Merge pull request #613 from FortnoxAB/patch-3
Fix sometimes empty response.
2 parents cfcf90b + 5d37d86 commit ca6d0e1

File tree

3 files changed

+132
-1
lines changed

3 files changed

+132
-1
lines changed

rxnetty-common/src/main/java/io/reactivex/netty/client/pool/PooledConnectionProviderImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -421,9 +421,9 @@ public void onNext(PooledConnection<R, W> conn) {
421421
onNextArrived = true;
422422
_terminated = terminated;
423423
_error = error;
424+
delegate.onNext(conn);
424425
}
425426

426-
delegate.onNext(conn);
427427

428428
if (_terminated) {
429429
if (null != error) {

rxnetty-tcp/src/test/java/io/reactivex/netty/protocol/tcp/client/PoolingWithRealChannelTest.java

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,22 @@
1919
import io.netty.buffer.ByteBuf;
2020
import io.netty.channel.embedded.EmbeddedChannel;
2121
import io.reactivex.netty.client.pool.PooledConnection;
22+
import org.junit.Assert;
2223
import org.junit.Rule;
24+
import org.junit.Test;
25+
import rx.Observable;
26+
import rx.functions.Action1;
27+
import rx.functions.Func0;
28+
import rx.functions.Func1;
29+
import rx.observers.AssertableSubscriber;
30+
31+
import java.util.ArrayList;
32+
import java.util.List;
2333

2434
import static org.hamcrest.MatcherAssert.*;
2535
import static org.hamcrest.Matchers.*;
36+
import static rx.Observable.fromCallable;
37+
import static rx.Observable.just;
2638

2739
/**
2840
* This tests the code paths which are not invoked for {@link EmbeddedChannel} as it does not schedule any task
@@ -49,4 +61,77 @@ public void testReuse() throws Exception {
4961

5062
assertThat("Connection is not reused.", connection2, is(connection));
5163
}
64+
65+
@Test
66+
/**
67+
*
68+
* Load test to prove concurrency issues mainly seen on heavy load.
69+
*
70+
*/
71+
public void testLoad() {
72+
clientRule.startServer(1000);
73+
74+
MockTcpClientEventListener listener = new MockTcpClientEventListener();
75+
clientRule.getClient().subscribe(listener);
76+
77+
78+
int number_of_iterations = 300;
79+
int numberOfRequests = 10;
80+
81+
for(int j = 0; j < number_of_iterations; j++) {
82+
83+
List<Observable<String>> results = new ArrayList<>();
84+
85+
//Just giving the client some time to recover
86+
try {
87+
Thread.sleep(100);
88+
} catch (InterruptedException e) {
89+
e.printStackTrace();
90+
}
91+
92+
for (int i = 0; i < numberOfRequests; i++) {
93+
results.add(
94+
fromCallable(new Func0<PooledConnection<ByteBuf, ByteBuf>>() {
95+
@Override
96+
public PooledConnection<ByteBuf, ByteBuf> call() {
97+
return clientRule.connectWithCheck();
98+
}
99+
})
100+
.flatMap(new Func1<PooledConnection<ByteBuf, ByteBuf>, Observable<String>>() {
101+
@Override
102+
public Observable<String> call(PooledConnection<ByteBuf, ByteBuf> connection) {
103+
return connection.writeStringAndFlushOnEach(just("Hello"))
104+
.toCompletable()
105+
.<ByteBuf>toObservable()
106+
.concatWith(connection.getInput())
107+
.take(1)
108+
.single()
109+
.map(new Func1<ByteBuf, String>() {
110+
@Override
111+
public String call(ByteBuf byteBuf) {
112+
try {
113+
114+
byte[] bytes = new byte[byteBuf.readableBytes()];
115+
byteBuf.readBytes(bytes);
116+
String result = new String(bytes);
117+
return result;
118+
} finally {
119+
byteBuf.release();
120+
}
121+
}
122+
}).doOnError(new Action1<Throwable>() {
123+
@Override
124+
public void call(Throwable throwable) {
125+
Assert.fail("Did not expect exception: " + throwable.getMessage());
126+
throwable.printStackTrace();
127+
}
128+
});
129+
}
130+
}));
131+
}
132+
AssertableSubscriber<String> test = Observable.merge(results).test();
133+
test.awaitTerminalEvent();
134+
test.assertNoErrors();
135+
}
136+
}
52137
}

rxnetty-tcp/src/test/java/io/reactivex/netty/protocol/tcp/client/TcpClientRule.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,17 @@
2727
import io.reactivex.netty.client.pool.SingleHostPoolingProviderFactory;
2828
import io.reactivex.netty.protocol.tcp.server.ConnectionHandler;
2929
import io.reactivex.netty.protocol.tcp.server.TcpServer;
30+
import org.junit.Assert;
3031
import org.junit.rules.ExternalResource;
3132
import org.junit.runner.Description;
3233
import org.junit.runners.model.Statement;
3334
import rx.Observable;
35+
import rx.Observer;
36+
import rx.functions.Func0;
3437
import rx.observers.TestSubscriber;
3538

3639
import java.net.InetSocketAddress;
40+
import java.util.concurrent.atomic.AtomicBoolean;
3741

3842
import static org.hamcrest.MatcherAssert.*;
3943
import static org.hamcrest.Matchers.*;
@@ -83,6 +87,48 @@ public PooledConnection<ByteBuf, ByteBuf> connect() {
8387
return (PooledConnection<ByteBuf, ByteBuf>) cSub.getOnNextEvents().get(0);
8488
}
8589

90+
public PooledConnection<ByteBuf, ByteBuf> connectWithCheck() {
91+
92+
final AtomicBoolean gotOnNext = new AtomicBoolean(false);
93+
94+
Observable<Connection<ByteBuf, ByteBuf>> got_no_connection = client.createConnectionRequest()
95+
.doOnEach(new Observer<Connection<ByteBuf, ByteBuf>>() {
96+
@Override
97+
public void onCompleted() {
98+
if(!gotOnNext.get()) {
99+
//A PooledConnection could sometimes send onCompleted before the onNext event occurred.
100+
Assert.fail("Should not get onCompletedBefore onNext");
101+
}
102+
}
103+
104+
@Override
105+
public void onError(Throwable e) {
106+
}
107+
108+
@Override
109+
public void onNext(Connection<ByteBuf, ByteBuf> byteBufByteBufConnection) {
110+
gotOnNext.set(true);
111+
}
112+
})
113+
.switchIfEmpty(Observable.defer(new Func0<Observable<PooledConnection<ByteBuf, ByteBuf>>>() {
114+
@Override
115+
public Observable<PooledConnection<ByteBuf, ByteBuf>> call() {
116+
return Observable.empty();
117+
}
118+
}));
119+
120+
TestSubscriber<Connection<ByteBuf, ByteBuf>> cSub = new TestSubscriber<>();
121+
got_no_connection.subscribe(cSub);
122+
123+
cSub.awaitTerminalEvent();
124+
125+
cSub.assertNoErrors();
126+
127+
assertThat("No connection received.", cSub.getOnNextEvents(), hasSize(1));
128+
129+
return (PooledConnection<ByteBuf, ByteBuf>) cSub.getOnNextEvents().get(0);
130+
}
131+
86132
private void createClient(final int maxConnections) {
87133
InetSocketAddress serverAddr = new InetSocketAddress("127.0.0.1", server.getServerPort());
88134
client = TcpClient.newClient(SingleHostPoolingProviderFactory.<ByteBuf, ByteBuf>createBounded(maxConnections),

0 commit comments

Comments
 (0)