Skip to content

Commit 136af0d

Browse files
committed
Merge pull request #227 from NiteshKant/0.x
Fixes #223 #225 #226 #214 #228 #230
2 parents 4744a2d + 24be8a7 commit 136af0d

35 files changed

+563
-158
lines changed

rx-netty-contexts/src/main/java/io/reactivex/netty/contexts/http/HttpContextClientChannelFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ public void onError(Throwable e) {
100100
@Override
101101
public void onNext(ObservableConnection<HttpClientResponse<O>, HttpClientRequest<I>> connection) {
102102
if (null != requestId && null != container) {
103-
connection.getChannelHandlerContext().pipeline()
103+
connection.getChannel().pipeline()
104104
.fireUserEventTriggered(new NewContextEvent(requestId, container));
105105
}
106106
original.onNext(connection);

rx-netty-contexts/src/test/java/io/reactivex/netty/contexts/http/ContextPropagationTest.java

Lines changed: 23 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,11 @@
2828
import io.reactivex.netty.contexts.RxContexts;
2929
import io.reactivex.netty.contexts.TestContext;
3030
import io.reactivex.netty.contexts.TestContextSerializer;
31+
import io.reactivex.netty.protocol.http.client.FlatResponseOperator;
3132
import io.reactivex.netty.protocol.http.client.HttpClient;
3233
import io.reactivex.netty.protocol.http.client.HttpClientRequest;
3334
import io.reactivex.netty.protocol.http.client.HttpClientResponse;
35+
import io.reactivex.netty.protocol.http.client.ResponseHolder;
3436
import io.reactivex.netty.protocol.http.server.HttpServer;
3537
import io.reactivex.netty.protocol.http.server.HttpServerBuilder;
3638
import io.reactivex.netty.protocol.http.server.HttpServerRequest;
@@ -41,18 +43,15 @@
4143
import org.junit.Before;
4244
import org.junit.Test;
4345
import rx.Observable;
44-
import rx.functions.Action0;
45-
import rx.functions.Action1;
4646
import rx.functions.Func1;
4747

48-
import java.util.ArrayList;
49-
import java.util.List;
5048
import java.util.concurrent.Callable;
51-
import java.util.concurrent.CountDownLatch;
49+
import java.util.concurrent.ExecutionException;
5250
import java.util.concurrent.ExecutorService;
5351
import java.util.concurrent.Executors;
5452
import java.util.concurrent.Future;
5553
import java.util.concurrent.TimeUnit;
54+
import java.util.concurrent.TimeoutException;
5655

5756
import static io.reactivex.netty.contexts.ThreadLocalRequestCorrelator.getCurrentContextContainer;
5857
import static io.reactivex.netty.contexts.ThreadLocalRequestCorrelator.getCurrentRequestId;
@@ -103,7 +102,7 @@ public String getContextValue(String key) {
103102
return Observable.error(e);
104103
}
105104
}
106-
}).enableWireLogging(LogLevel.DEBUG).build();
105+
}).enableWireLogging(LogLevel.ERROR).build();
107106
mockServer.start();
108107
}
109108

@@ -121,10 +120,10 @@ public void testEndToEnd() throws Exception {
121120
public Observable<HttpClientResponse<ByteBuf>> call(HttpClient<ByteBuf, ByteBuf> client) {
122121
return client.submit(HttpClientRequest.createGet("/"));
123122
}
124-
}).enableWireLogging(LogLevel.ERROR).build().start();
123+
}).enableWireLogging(LogLevel.DEBUG).build().start();
125124

126125
HttpClient<ByteBuf, ByteBuf> testClient = RxNetty.<ByteBuf, ByteBuf>newHttpClientBuilder("localhost", server.getServerPort())
127-
.enableWireLogging(LogLevel.DEBUG).build();
126+
.enableWireLogging(LogLevel.ERROR).build();
128127

129128
String reqId = "testE2E";
130129
sendTestRequest(testClient, reqId);
@@ -184,7 +183,8 @@ public void testWithPooledConnections() throws Exception {
184183
RxContexts.<ByteBuf, ByteBuf>newHttpClientBuilder("localhost", mockServer.getServerPort(),
185184
REQUEST_ID_HEADER_NAME,
186185
RxContexts.DEFAULT_CORRELATOR)
187-
.withMaxConnections(1).withIdleConnectionsTimeoutMillis(100000).build();
186+
.withMaxConnections(1).enableWireLogging(LogLevel.ERROR)
187+
.withIdleConnectionsTimeoutMillis(100000).build();
188188
ContextsContainer container = new ContextsContainerImpl(new MapBackedKeySupplier());
189189
container.addContext(CTX_1_NAME, CTX_1_VAL);
190190
container.addContext(CTX_2_NAME, CTX_2_VAL, new TestContextSerializer());
@@ -203,7 +203,8 @@ public void testNoStateLeakOnThreadReuse() throws Exception {
203203
RxContexts.<ByteBuf, ByteBuf>newHttpClientBuilder("localhost", mockServer.getServerPort(),
204204
REQUEST_ID_HEADER_NAME,
205205
RxContexts.DEFAULT_CORRELATOR)
206-
.withMaxConnections(1).withIdleConnectionsTimeoutMillis(100000).build();
206+
.withMaxConnections(1).enableWireLogging(LogLevel.ERROR)
207+
.withIdleConnectionsTimeoutMillis(100000).build();
207208

208209
ContextsContainer container = new ContextsContainerImpl(new MapBackedKeySupplier());
209210
container.addContext(CTX_1_NAME, CTX_1_VAL);
@@ -259,7 +260,7 @@ public Observable<Void> call(HttpClientResponse<ByteBuf> response) {
259260

260261
private static void invokeMockServer(HttpClient<ByteBuf, ByteBuf> testClient, final String requestId,
261262
boolean finishServerProcessing)
262-
throws MockBackendRequestFailedException, InterruptedException {
263+
throws MockBackendRequestFailedException, InterruptedException, TimeoutException, ExecutionException {
263264
try {
264265
sendTestRequest(testClient, requestId);
265266
} finally {
@@ -277,40 +278,24 @@ private static void invokeMockServer(HttpClient<ByteBuf, ByteBuf> testClient, fi
277278
}
278279

279280
private static void sendTestRequest(HttpClient<ByteBuf, ByteBuf> testClient, final String requestId)
280-
throws MockBackendRequestFailedException, InterruptedException {
281+
throws MockBackendRequestFailedException, InterruptedException, TimeoutException, ExecutionException {
281282
System.err.println("Sending test request to mock server, with request id: " + requestId);
282283
RxContexts.DEFAULT_CORRELATOR.dumpThreadState(System.err);
283-
final CountDownLatch finishLatch = new CountDownLatch(1);
284-
final List<HttpClientResponse<ByteBuf>> responseHolder = new ArrayList<HttpClientResponse<ByteBuf>>();
285-
testClient.submit(HttpClientRequest.createGet("").withHeader(REQUEST_ID_HEADER_NAME, requestId))
286-
.finallyDo(new Action0() {
287-
@Override
288-
public void call() {
289-
finishLatch.countDown();
290-
}
291-
})
292-
.subscribe(new Action1<HttpClientResponse<ByteBuf>>() {
293-
@Override
294-
public void call(HttpClientResponse<ByteBuf> response) {
295-
responseHolder.add(response);
296-
}
297-
});
298-
299-
finishLatch.await(1, TimeUnit.MINUTES);
300-
if (responseHolder.isEmpty()) {
301-
throw new AssertionError("Response not received.");
302-
}
303284

304-
System.err.println("Received response from mock server, with request id: " + requestId
305-
+ ", status: " + responseHolder.get(0).getStatus());
285+
ResponseHolder<ByteBuf> responseHolder =
286+
testClient.submit(HttpClientRequest.createGet("").withHeader(REQUEST_ID_HEADER_NAME, requestId))
287+
.lift(FlatResponseOperator.<ByteBuf>flatResponse())
288+
.toBlocking().toFuture().get(1, TimeUnit.MINUTES);
306289

307-
HttpClientResponse<ByteBuf> response = responseHolder.get(0);
290+
System.err.println("Received response from mock server, with request id: " + requestId
291+
+ ", status: " + responseHolder.getResponse().getStatus());
308292

309-
if (response.getStatus().code() != HttpResponseStatus.OK.code()) {
310-
throw new MockBackendRequestFailedException("Test request failed. Status: " + response.getStatus().code());
293+
if (responseHolder.getResponse().getStatus().code() != HttpResponseStatus.OK.code()) {
294+
throw new MockBackendRequestFailedException("Test request failed. Status: "
295+
+ responseHolder.getResponse().getStatus().code());
311296
}
312297

313-
String requestIdGot = response.getHeaders().get(REQUEST_ID_HEADER_NAME);
298+
String requestIdGot = responseHolder.getResponse().getHeaders().get(REQUEST_ID_HEADER_NAME);
314299

315300
if (!requestId.equals(requestId)) {
316301
throw new MockBackendRequestFailedException("Request Id not sent from mock server. Expected: "

rx-netty-examples/src/main/java/io/reactivex/netty/examples/http/cpuintensive/CPUIntensiveServer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public Observable<Void> handle(HttpServerRequest<ByteBuf> request,
4949
final HttpServerResponse<ByteBuf> response) {
5050
printRequestHeader(request);
5151
response.getHeaders().set(IN_EVENT_LOOP_HEADER_NAME,
52-
response.getChannelHandlerContext().channel().eventLoop()
52+
response.getChannel().eventLoop()
5353
.inEventLoop());
5454
response.writeString("Welcome!!");
5555
return response.close(false);

rx-netty-examples/src/main/java/io/reactivex/netty/examples/udp/HelloUdpClient.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package io.reactivex.netty.examples.udp;
1818

1919
import io.netty.channel.socket.DatagramPacket;
20+
import io.netty.handler.logging.LogLevel;
2021
import io.reactivex.netty.RxNetty;
2122
import io.reactivex.netty.channel.ObservableConnection;
2223
import rx.Observable;
@@ -38,15 +39,17 @@ public HelloUdpClient(int port) {
3839
}
3940

4041
public String sendHello() {
41-
String content = RxNetty.createUdpClient("localhost", port).connect()
42+
String content = RxNetty.<DatagramPacket, DatagramPacket>newUdpClientBuilder("localhost", port)
43+
.enableWireLogging(LogLevel.ERROR).build().connect()
4244
.flatMap(new Func1<ObservableConnection<DatagramPacket, DatagramPacket>,
4345
Observable<DatagramPacket>>() {
4446
@Override
4547
public Observable<DatagramPacket> call(ObservableConnection<DatagramPacket, DatagramPacket> connection) {
4648
connection.writeStringAndFlush("Is there anybody out there?");
4749
return connection.getInput();
4850
}
49-
}).take(1)
51+
})
52+
.take(1)
5053
.map(new Func1<DatagramPacket, String>() {
5154
@Override
5255
public String call(DatagramPacket datagramPacket) {

rx-netty-examples/src/main/java/io/reactivex/netty/examples/udp/HelloUdpServer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public Observable<Void> call(DatagramPacket received) {
5252
InetSocketAddress sender = received.sender();
5353
System.out.println("Received datagram. Sender: " + sender + ", data: "
5454
+ received.content().toString(Charset.defaultCharset()));
55-
ByteBuf data = newConnection.getChannelHandlerContext().alloc().buffer(WELCOME_MSG_BYTES.length);
55+
ByteBuf data = newConnection.getChannel().alloc().buffer(WELCOME_MSG_BYTES.length);
5656
data.writeBytes(WELCOME_MSG_BYTES);
5757
return newConnection.writeAndFlush(new DatagramPacket(data, sender));
5858
}

rx-netty-remote-observable/src/main/java/io/reactivex/netty/ingress/InetAddressWhiteListIngressPolicy.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,13 @@
1717

1818
import io.reactivex.netty.RemoteRxEvent;
1919
import io.reactivex.netty.channel.ObservableConnection;
20+
import rx.Observable;
21+
import rx.functions.Action1;
2022

2123
import java.net.InetSocketAddress;
2224
import java.util.Set;
2325
import java.util.concurrent.atomic.AtomicReference;
2426

25-
import rx.Observable;
26-
import rx.functions.Action1;
27-
2827
public class InetAddressWhiteListIngressPolicy implements IngressPolicy{
2928

3029
private AtomicReference<Set<String>> whiteList = new AtomicReference<Set<String>>();
@@ -42,7 +41,7 @@ public void call(Set<String> newList) {
4241
public boolean allowed(
4342
ObservableConnection<RemoteRxEvent, RemoteRxEvent> connection) {
4443
InetSocketAddress inetSocketAddress
45-
= (InetSocketAddress) connection.getChannelHandlerContext().channel().remoteAddress();
44+
= (InetSocketAddress) connection.getChannel().remoteAddress();
4645
return whiteList.get().contains(inetSocketAddress.getAddress().getHostAddress());
4746
}
4847

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.reactivex.netty.channel;
17+
18+
import rx.Observer;
19+
20+
/**
21+
* @author Nitesh Kant
22+
*/
23+
@SuppressWarnings("rawtypes")
24+
public abstract class AbstractConnectionEvent<T extends ObservableConnection> {
25+
26+
@SuppressWarnings("rawtypes") protected final T observableConnection;
27+
@SuppressWarnings("rawtypes") protected final Observer connectedObserver;
28+
29+
protected AbstractConnectionEvent(@SuppressWarnings("rawtypes") Observer connectedObserver,
30+
final T observableConnection) {
31+
this.connectedObserver = connectedObserver;
32+
this.observableConnection = observableConnection;
33+
}
34+
35+
public @SuppressWarnings("rawtypes")
36+
Observer getConnectedObserver() {
37+
return connectedObserver;
38+
}
39+
40+
public @SuppressWarnings("rawtypes") T getConnection() {
41+
return observableConnection;
42+
}
43+
}

rx-netty/src/main/java/io/reactivex/netty/channel/DefaultChannelWriter.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,10 @@ public class DefaultChannelWriter<O> implements ChannelWriter<O> {
3939
protected static final Observable<Void> CONNECTION_ALREADY_CLOSED =
4040
Observable.error(new IllegalStateException("Connection is already closed."));
4141
protected final AtomicBoolean closeIssued = new AtomicBoolean();
42+
4243
private final ChannelHandlerContext ctx;
44+
private final Channel nettyChannel;
45+
4346
/**
4447
* A listener for all pending writes before a flush.
4548
*/
@@ -55,6 +58,7 @@ protected DefaultChannelWriter(ChannelHandlerContext ctx, MetricEventsSubject<?>
5558
throw new NullPointerException("Channel context can not be null.");
5659
}
5760
this.ctx = ctx;
61+
nettyChannel = ctx.channel();
5862
unflushedWritesListener = new AtomicReference<MultipleFutureListener>(new MultipleFutureListener(ctx.newPromise()));
5963
}
6064

@@ -108,14 +112,14 @@ public Observable<Void> writeStringAndFlush(String msg) {
108112
public Observable<Void> flush() {
109113
final long startTimeMillis = Clock.newStartTimeMillis();
110114
eventsSubject.onEvent(metricEventProvider.getFlushStartEvent());
111-
MultipleFutureListener existingListener = unflushedWritesListener.getAndSet(new MultipleFutureListener(
112-
ctx.newPromise()));
115+
MultipleFutureListener existingListener =
116+
unflushedWritesListener.getAndSet(new MultipleFutureListener(nettyChannel.newPromise()));
113117
/**
114118
* Do flush() after getting the last listener so that we do not wait for a write which is not flushed.
115119
* If we do it before getting the existingListener then the write that happens after the flush() from the user
116120
* will be contained in the retrieved listener and hence we will wait till the next flush() finish.
117121
*/
118-
ctx.flush();
122+
nettyChannel.flush();
119123
return existingListener.asObservable()
120124
.doOnCompleted(new Action0() {
121125
@Override
@@ -140,9 +144,15 @@ public void cancelPendingWrites(boolean mayInterruptIfRunning) {
140144

141145
@Override
142146
public ByteBufAllocator getAllocator() {
143-
return ctx.alloc();
147+
return nettyChannel.alloc();
144148
}
145149

150+
/**
151+
* @deprecated It is misleading to provide {@link ChannelHandlerContext} instance as it is unclear which handler
152+
* this context belongs to. So, instead one should use {@link #getChannel()} and all actions possible from the
153+
* {@link ChannelHandlerContext} are possible via the {@link Channel}.
154+
*/
155+
@Deprecated
146156
public ChannelHandlerContext getChannelHandlerContext() {
147157
return ctx;
148158
}
@@ -153,8 +163,8 @@ protected ChannelFuture writeOnChannel(Object msg) {
153163
return writeFuture;
154164
}
155165

156-
protected Channel getChannel() {
157-
return ctx.channel();
166+
public Channel getChannel() {
167+
return nettyChannel;
158168
}
159169

160170
public boolean isCloseIssued() {

rx-netty/src/main/java/io/reactivex/netty/channel/NewRxConnectionEvent.java

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,12 @@
2222
*
2323
* @author Nitesh Kant
2424
*/
25-
public class NewRxConnectionEvent {
25+
@SuppressWarnings("rawtypes")
26+
public class NewRxConnectionEvent extends AbstractConnectionEvent<ObservableConnection> {
2627

27-
@SuppressWarnings("rawtypes") private final Observer connectedObserver;
28-
29-
public NewRxConnectionEvent(@SuppressWarnings("rawtypes") Observer connectedObserver) {
30-
this.connectedObserver = connectedObserver;
28+
public NewRxConnectionEvent(final ObservableConnection<?, ?> observableConnection,
29+
final Observer connectedObserver) {
30+
super(connectedObserver, observableConnection);
3131
}
3232

33-
public @SuppressWarnings("rawtypes") Observer getConnectedObserver() {
34-
return connectedObserver;
35-
}
3633
}

0 commit comments

Comments
 (0)