Skip to content

Commit 878d6ef

Browse files
authored
Merge pull request #614 from FortnoxAB/patch-5
Closing connection instead of just creating observable that is not subscribed to
2 parents ca6d0e1 + d67b6b0 commit 878d6ef

File tree

4 files changed

+117
-14
lines changed

4 files changed

+117
-14
lines changed

rxnetty-common/src/main/java/io/reactivex/netty/channel/Connection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,7 @@ protected void connectCloseToChannelClose() {
317317
.addListener(new ChannelFutureListener() {
318318
@Override
319319
public void operationComplete(ChannelFuture future) throws Exception {
320-
close(false); // Close this connection when the channel is closed.
320+
closeNow(); // Close this connection when the channel is closed.
321321
}
322322
});
323323
nettyChannel.attr(CONNECTION_ATTRIBUTE_KEY).set(this);

rxnetty-common/src/test/java/io/reactivex/netty/client/pool/PooledConnectionProviderImplTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ public void testMetricEventCallback() throws Throwable {
213213
assertThat("Unexpected release attempted count.", eventsListener.getReleaseAttemptedCount(), is(2L));
214214
assertThat("Unexpected release succeeded count.", eventsListener.getReleaseSucceededCount(), is(2L));
215215
assertThat("Unexpected release failed count.", eventsListener.getReleaseFailedCount(), is(0L));
216-
assertThat("Unexpected connection eviction count.", eventsListener.getEvictionCount(), is(1L));
216+
assertThat("Unexpected connection eviction count.", eventsListener.getEvictionCount(), is(2L));
217217
}
218218

219219
private PooledConnection<String, String> _testRelease() throws Exception {
@@ -379,4 +379,4 @@ public void call(Subscriber<? super Connection<String, String>> s) {
379379
public @interface MaxConnections {
380380
int value() default DEFAULT_MAX_CONNECTIONS;
381381
}
382-
}
382+
}

rxnetty-tcp/src/test/java/io/reactivex/netty/protocol/tcp/client/PoolingWithRealChannelTest.java

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,11 @@
2323
import org.junit.Rule;
2424
import org.junit.Test;
2525
import rx.Observable;
26+
import rx.functions.Func0;
27+
import rx.functions.Func1;
28+
import rx.observers.AssertableSubscriber;
29+
import org.junit.Test;
30+
import rx.Observable;
2631
import rx.functions.Action1;
2732
import rx.functions.Func0;
2833
import rx.functions.Func1;
@@ -31,6 +36,14 @@
3136
import java.util.ArrayList;
3237
import java.util.List;
3338

39+
import java.util.ArrayList;
40+
import java.util.List;
41+
42+
import static org.hamcrest.MatcherAssert.assertThat;
43+
import static org.hamcrest.Matchers.equalTo;
44+
import static org.hamcrest.Matchers.is;
45+
import static rx.Observable.fromCallable;
46+
import static rx.Observable.just;
3447
import static org.hamcrest.MatcherAssert.*;
3548
import static org.hamcrest.Matchers.*;
3649
import static rx.Observable.fromCallable;
@@ -134,4 +147,83 @@ public void call(Throwable throwable) {
134147
test.assertNoErrors();
135148
}
136149
}
150+
151+
@Test
152+
/**
153+
*
154+
* Load test to prove concurrency issues mainly seen on heavy load.
155+
*
156+
*/
157+
public void assertPermitsAreReleasedWhenMergingObservablesWithExceptions() {
158+
clientRule.startServer(10, true);
159+
160+
MockTcpClientEventListener listener = new MockTcpClientEventListener();
161+
clientRule.getClient().subscribe(listener);
162+
163+
int number_of_iterations = 1;
164+
int numberOfRequests = 3;
165+
166+
makeRequests(number_of_iterations, numberOfRequests);
167+
168+
sleep(clientRule.getPoolConfig().getMaxIdleTimeMillis());
169+
170+
assertThat("Permits should be 10", clientRule.getPoolConfig().getPoolLimitDeterminationStrategy().getAvailablePermits(), equalTo(10));
171+
}
172+
173+
private void sleep(long i) {
174+
try {
175+
Thread.sleep(i);
176+
} catch (InterruptedException e) {
177+
e.printStackTrace();
178+
}
179+
}
180+
181+
private void makeRequests(int number_of_iterations, int numberOfRequests) {
182+
for (int j = 0; j < number_of_iterations; j++) {
183+
184+
//List<Observable<String>> results = new ArrayList<>();
185+
186+
sleep(100);
187+
188+
List<Observable<String>> results = new ArrayList<>();
189+
190+
//Just giving the client some time to recover
191+
sleep(100);
192+
193+
for (int i = 0; i < numberOfRequests; i++) {
194+
results.add(
195+
fromCallable(new Func0<PooledConnection<ByteBuf, ByteBuf>>() {
196+
@Override
197+
public PooledConnection<ByteBuf, ByteBuf> call() {
198+
return clientRule.connect();
199+
}
200+
})
201+
.flatMap(new Func1<PooledConnection<ByteBuf, ByteBuf>, Observable<String>>() {
202+
@Override
203+
public Observable<String> call(PooledConnection<ByteBuf, ByteBuf> connection) {
204+
return connection.writeStringAndFlushOnEach(just("Hello"))
205+
.toCompletable()
206+
.<ByteBuf>toObservable()
207+
.concatWith(connection.getInput())
208+
.take(1)
209+
.single()
210+
.map(new Func1<ByteBuf, String>() {
211+
@Override
212+
public String call(ByteBuf byteBuf) {
213+
try {
214+
byte[] bytes = new byte[byteBuf.readableBytes()];
215+
byteBuf.readBytes(bytes);
216+
return new String(bytes);
217+
} finally {
218+
byteBuf.release();
219+
}
220+
}
221+
});
222+
}
223+
}));
224+
}
225+
AssertableSubscriber<String> test = Observable.merge(results).test();
226+
test.awaitTerminalEvent();
227+
}
228+
}
137229
}

rxnetty-tcp/src/test/java/io/reactivex/netty/protocol/tcp/client/TcpClientRule.java

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,9 @@
1818

1919
import io.netty.buffer.ByteBuf;
2020
import io.reactivex.netty.channel.Connection;
21-
import io.reactivex.netty.client.ConnectionProvider;
22-
import io.reactivex.netty.client.ConnectionProviderFactory;
2321
import io.reactivex.netty.client.Host;
24-
import io.reactivex.netty.client.HostConnector;
22+
import io.reactivex.netty.client.pool.PoolConfig;
2523
import io.reactivex.netty.client.pool.PooledConnection;
26-
import io.reactivex.netty.client.pool.PooledConnectionProvider;
2724
import io.reactivex.netty.client.pool.SingleHostPoolingProviderFactory;
2825
import io.reactivex.netty.protocol.tcp.server.ConnectionHandler;
2926
import io.reactivex.netty.protocol.tcp.server.TcpServer;
@@ -39,13 +36,14 @@
3936
import java.net.InetSocketAddress;
4037
import java.util.concurrent.atomic.AtomicBoolean;
4138

42-
import static org.hamcrest.MatcherAssert.*;
43-
import static org.hamcrest.Matchers.*;
39+
import static org.hamcrest.MatcherAssert.assertThat;
40+
import static org.hamcrest.Matchers.hasSize;
4441

4542
public class TcpClientRule extends ExternalResource {
4643

47-
private TcpServer<ByteBuf, ByteBuf> server;
48-
private TcpClient<ByteBuf, ByteBuf> client;
44+
private TcpServer<ByteBuf, ByteBuf> server;
45+
private TcpClient<ByteBuf, ByteBuf> client;
46+
private PoolConfig<ByteBuf, ByteBuf> poolConfig;
4947

5048
@Override
5149
public Statement apply(final Statement base, Description description) {
@@ -59,10 +57,18 @@ public void evaluate() throws Throwable {
5957
}
6058

6159
public void startServer(int maxConnections) {
60+
startServer(maxConnections, false);
61+
}
62+
63+
public void startServer(int maxConnections, final boolean failing) {
6264
server.start(new ConnectionHandler<ByteBuf, ByteBuf>() {
6365
@Override
6466
public Observable<Void> handle(Connection<ByteBuf, ByteBuf> newConnection) {
65-
return newConnection.writeAndFlushOnEach(newConnection.getInput());
67+
if(failing) {
68+
throw new RuntimeException("exception");
69+
} else {
70+
return newConnection.writeAndFlushOnEach(newConnection.getInput());
71+
}
6672
}
6773
});
6874
createClient(maxConnections);
@@ -131,8 +137,9 @@ public Observable<PooledConnection<ByteBuf, ByteBuf>> call() {
131137

132138
private void createClient(final int maxConnections) {
133139
InetSocketAddress serverAddr = new InetSocketAddress("127.0.0.1", server.getServerPort());
134-
client = TcpClient.newClient(SingleHostPoolingProviderFactory.<ByteBuf, ByteBuf>createBounded(maxConnections),
135-
Observable.just(new Host(serverAddr)));
140+
poolConfig = new PoolConfig<ByteBuf, ByteBuf>().maxConnections(maxConnections);
141+
SingleHostPoolingProviderFactory<ByteBuf, ByteBuf> bounded = SingleHostPoolingProviderFactory.create(poolConfig);
142+
client = TcpClient.newClient(bounded, Observable.just(new Host(serverAddr)));
136143
}
137144

138145
public TcpServer<ByteBuf, ByteBuf> getServer() {
@@ -142,4 +149,8 @@ public TcpServer<ByteBuf, ByteBuf> getServer() {
142149
public TcpClient<ByteBuf, ByteBuf> getClient() {
143150
return client;
144151
}
152+
153+
public PoolConfig<ByteBuf, ByteBuf> getPoolConfig() {
154+
return poolConfig;
155+
}
145156
}

0 commit comments

Comments
 (0)