Skip to content

Commit 9047b36

Browse files
author
Nitesh Kant
committed
ClientChannelFactoryImpl was not catching errors inside operationComplete()
This caused any errors not getting propagated to the connect() subscriber. Also added some debug for a flaky test.
1 parent 52a8dd8 commit 9047b36

File tree

3 files changed

+38
-24
lines changed

3 files changed

+38
-24
lines changed

rx-netty/src/main/java/io/reactivex/netty/client/ClientChannelFactoryImpl.java

Lines changed: 28 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16+
1617
package io.reactivex.netty.client;
1718

1819
import io.netty.bootstrap.Bootstrap;
@@ -74,34 +75,38 @@ public void call() {
7475
connectFuture.addListener(new ChannelFutureListener() {
7576
@Override
7677
public void operationComplete(ChannelFuture future) throws Exception {
77-
if (!future.isSuccess()) {
78-
eventsSubject.onEvent(ClientMetricsEvent.CONNECT_FAILED, Clock.onEndMillis(startTimeMillis),
79-
future.cause());
80-
subscriber.onError(future.cause());
81-
} else {
82-
eventsSubject.onEvent(ClientMetricsEvent.CONNECT_SUCCESS, Clock.onEndMillis(startTimeMillis));
83-
ChannelPipeline pipeline = future.channel().pipeline();
84-
ChannelHandlerContext ctx = pipeline.lastContext(); // The connection uses the context for write which should always start from the tail.
85-
final ObservableConnection<I, O> newConnection = connectionFactory.newConnection(ctx);
86-
ChannelHandler lifecycleHandler = pipeline.get(RxRequiredConfigurator.CONN_LIFECYCLE_HANDLER_NAME);
87-
if (null == lifecycleHandler) {
88-
onNewConnection(newConnection, subscriber);
78+
try {
79+
if (!future.isSuccess()) {
80+
eventsSubject.onEvent(ClientMetricsEvent.CONNECT_FAILED, Clock.onEndMillis(startTimeMillis),
81+
future.cause());
82+
subscriber.onError(future.cause());
8983
} else {
90-
@SuppressWarnings("unchecked")
91-
ConnectionLifecycleHandler<I, O> handler = (ConnectionLifecycleHandler<I, O>) lifecycleHandler;
92-
SslHandler sslHandler = pipeline.get(SslHandler.class);
93-
if (null == sslHandler) {
94-
handler.setConnection(newConnection);
84+
eventsSubject.onEvent(ClientMetricsEvent.CONNECT_SUCCESS, Clock.onEndMillis(startTimeMillis));
85+
ChannelPipeline pipeline = future.channel().pipeline();
86+
ChannelHandlerContext ctx = pipeline.lastContext(); // The connection uses the context for write which should always start from the tail.
87+
final ObservableConnection<I, O> newConnection = connectionFactory.newConnection(ctx);
88+
ChannelHandler lifecycleHandler = pipeline.get(RxRequiredConfigurator.CONN_LIFECYCLE_HANDLER_NAME);
89+
if (null == lifecycleHandler) {
9590
onNewConnection(newConnection, subscriber);
9691
} else {
97-
sslHandler.handshakeFuture().addListener(new GenericFutureListener<Future<? super Channel>>() {
98-
@Override
99-
public void operationComplete(Future<? super Channel> future) throws Exception {
100-
onNewConnection(newConnection, subscriber);
101-
}
102-
});
92+
@SuppressWarnings("unchecked")
93+
ConnectionLifecycleHandler<I, O> handler = (ConnectionLifecycleHandler<I, O>) lifecycleHandler;
94+
SslHandler sslHandler = pipeline.get(SslHandler.class);
95+
if (null == sslHandler) {
96+
handler.setConnection(newConnection);
97+
onNewConnection(newConnection, subscriber);
98+
} else {
99+
sslHandler.handshakeFuture().addListener(new GenericFutureListener<Future<? super Channel>>() {
100+
@Override
101+
public void operationComplete(Future<? super Channel> future) throws Exception {
102+
onNewConnection(newConnection, subscriber);
103+
}
104+
});
105+
}
103106
}
104107
}
108+
} catch (Throwable throwable) {
109+
subscriber.onError(throwable);
105110
}
106111
}
107112
});

rx-netty/src/main/java/io/reactivex/netty/server/AbstractServer.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
import io.reactivex.netty.metrics.MetricEventsSubject;
2929
import io.reactivex.netty.pipeline.PipelineConfigurator;
3030
import io.reactivex.netty.pipeline.PipelineConfiguratorComposite;
31+
import org.slf4j.Logger;
32+
import org.slf4j.LoggerFactory;
3133
import rx.Subscription;
3234

3335
import java.net.InetSocketAddress;
@@ -42,6 +44,8 @@
4244
public class AbstractServer<I, O, B extends AbstractBootstrap<B, C>, C extends Channel, S extends AbstractServer>
4345
implements MetricEventsPublisher<ServerMetricsEvent<?>> {
4446

47+
private static final Logger logger = LoggerFactory.getLogger(AbstractServer.class);
48+
4549
protected enum ServerState {Created, Starting, Started, Shutdown}
4650

4751
protected final UnpooledConnectionFactory<I,O> connectionFactory;
@@ -88,6 +92,8 @@ public S start() {
8892

8993
serverStateRef.set(ServerState.Started); // It will come here only if this was the thread that transitioned to Starting
9094

95+
logger.info("Rx server started at port: " + getServerPort());
96+
9197
return returnServer();
9298
}
9399

rx-netty/src/test/java/io/reactivex/netty/protocol/http/server/UnexpectedErrorsTest.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16+
1617
package io.reactivex.netty.protocol.http.server;
1718

1819
import io.netty.buffer.ByteBuf;
@@ -72,7 +73,7 @@ public void tearDown() throws Exception {
7273
public void testErrorHandlerReturnsNull() throws Exception {
7374
TestableErrorHandler errorHandler = new TestableErrorHandler(null);
7475
server.withErrorHandler(errorHandler).start();
75-
76+
System.err.println("[testErrorHandlerReturnsNull] Server port: " + server.getServerPort());
7677
blockTillConnected(server.getServerPort());
7778
channelCloseListener.waitForClose(1, TimeUnit.MINUTES);
7879

@@ -86,6 +87,8 @@ public void testConnectionHandlerReturnsError() throws Exception {
8687

8788
server.withErrorHandler(errorHandler).start();
8889

90+
System.err.println("[testConnectionHandlerReturnsError] Server port: " + server.getServerPort());
91+
8992
blockTillConnected(server.getServerPort());
9093

9194
channelCloseListener.waitForClose(1, TimeUnit.MINUTES);

0 commit comments

Comments
 (0)