Skip to content

Commit 895fd7b

Browse files
committed
Merge pull request #178 from NiteshKant/master
Fixes issue #177
2 parents 726f901 + 09aec45 commit 895fd7b

File tree

17 files changed

+209
-52
lines changed

17 files changed

+209
-52
lines changed

rx-netty-contexts/src/test/java/io/reactivex/netty/contexts/http/ContextPropagationTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16+
1617
package io.reactivex.netty.contexts.http;
1718

1819
import com.netflix.server.context.ContextSerializationException;
@@ -249,7 +250,7 @@ public Observable<Void> handle(HttpServerRequest<ByteBuf> request,
249250
@Override
250251
public Observable<Void> call(HttpClientResponse<ByteBuf> response) {
251252
serverResponse.setStatus(response.getStatus());
252-
return Observable.empty();
253+
return serverResponse.close(true);
253254
}
254255
});
255256
}

rx-netty-examples/src/main/java/io/reactivex/netty/examples/http/helloworld/HelloWorldServer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public HttpServer<ByteBuf, ByteBuf> createServer() {
4646
public Observable<Void> handle(HttpServerRequest<ByteBuf> request, final HttpServerResponse<ByteBuf> response) {
4747
printRequestHeader(request);
4848
response.writeString("Welcome!!");
49-
return response.close();
49+
return response.close(false);
5050
}
5151
}).pipelineConfigurator(PipelineConfigurators.<ByteBuf, ByteBuf>httpServerConfigurator()).build();
5252

rx-netty-examples/src/main/java/io/reactivex/netty/examples/http/post/SimplePostServer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public String call(String accumulator, String value) {
5959
@Override
6060
public Observable<Void> call(String clientMessage) {
6161
response.writeString(clientMessage.toUpperCase());
62-
return response.close();
62+
return response.close(false);
6363
}
6464
});
6565
}

rx-netty-examples/src/main/java/io/reactivex/netty/examples/http/ssl/SslHelloWorldServer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public HttpServer<ByteBuf, ByteBuf> createServer() {
4343
@Override
4444
public Observable<Void> handle(HttpServerRequest<ByteBuf> request, final HttpServerResponse<ByteBuf> response) {
4545
response.writeStringAndFlush("Welcome!!");
46-
return response.close();
46+
return response.close(false);
4747
}
4848
}).withSslEngineFactory(DefaultFactories.SELF_SIGNED).build();
4949

rx-netty-examples/src/main/java/io/reactivex/netty/examples/http/wordcounter/WordCounterServer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public String call(ByteBuf content) {
6060
@Override
6161
public Observable<Void> call(Integer counter) {
6262
response.writeString(counter.toString());
63-
return response.close();
63+
return response.close(false);
6464
}
6565
});
6666
}

rx-netty/src/main/java/io/reactivex/netty/channel/ChannelWriter.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,4 +47,8 @@ public interface ChannelWriter<O> {
4747
Observable<Void> writeBytesAndFlush(byte[] msg);
4848

4949
Observable<Void> writeStringAndFlush(String msg);
50+
51+
Observable<Void> close();
52+
53+
Observable<Void> close(boolean flush);
5054
}

rx-netty/src/main/java/io/reactivex/netty/channel/DefaultChannelWriter.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import io.netty.channel.ChannelHandlerContext;
2424
import io.reactivex.netty.metrics.Clock;
2525
import io.reactivex.netty.metrics.MetricEventsSubject;
26-
import io.reactivex.netty.protocol.http.MultipleFutureListener;
26+
import io.reactivex.netty.util.MultipleFutureListener;
2727
import rx.Observable;
2828
import rx.functions.Action0;
2929
import rx.functions.Action1;
@@ -147,7 +147,6 @@ public ChannelHandlerContext getChannelHandlerContext() {
147147
return ctx;
148148
}
149149

150-
@SuppressWarnings("unchecked")
151150
protected ChannelFuture writeOnChannel(Object msg) {
152151
ChannelFuture writeFuture = getChannel().write(msg); // Calling write on context will be wrong as the context will be of a component not necessarily, the tail of the pipeline.
153152
unflushedWritesListener.get().listen(writeFuture);
@@ -162,16 +161,21 @@ public boolean isCloseIssued() {
162161
return closeIssued.get();
163162
}
164163

165-
@SuppressWarnings("unchecked")
164+
@Override
166165
public Observable<Void> close() {
166+
return close(false);
167+
}
168+
169+
@Override
170+
public Observable<Void> close(boolean flush) {
167171
if (closeIssued.compareAndSet(false, true)) {
168-
return _close();
172+
return _close(flush);
169173
} else {
170174
return CONNECTION_ALREADY_CLOSED;
171175
}
172176
}
173177

174-
protected Observable<Void> _close() {
178+
protected Observable<Void> _close(boolean flush) {
175179
return Observable.empty();
176180
}
177181
}

rx-netty/src/main/java/io/reactivex/netty/channel/ObservableConnection.java

Lines changed: 44 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16+
1617
package io.reactivex.netty.channel;
1718

1819
import io.netty.channel.ChannelFuture;
@@ -21,6 +22,7 @@
2122
import io.reactivex.netty.metrics.Clock;
2223
import io.reactivex.netty.metrics.MetricEventsSubject;
2324
import io.reactivex.netty.pipeline.ReadTimeoutPipelineConfigurator;
25+
import io.reactivex.netty.util.NoOpSubscriber;
2426
import rx.Observable;
2527
import rx.Subscriber;
2628
import rx.subjects.PublishSubject;
@@ -67,24 +69,55 @@ public Observable<Void> close() {
6769
}
6870

6971
@Override
70-
protected Observable<Void> _close() {
71-
PublishSubject<I> thisSubject = inputSubject;
72-
cleanupConnection(); // Cleanup is required irrespective of close underlying connection (pooled connection)
73-
Observable<Void> toReturn = _closeChannel();
74-
thisSubject.onCompleted(); // This is just to make sure we make the subject as completed after we finish
75-
// closing the channel, results in more deterministic behavior for clients.
76-
return toReturn;
77-
}
78-
79-
protected void cleanupConnection() {
72+
protected Observable<Void> _close(boolean flush) {
73+
final PublishSubject<I> thisSubject = inputSubject;
8074
cancelPendingWrites(true);
8175
ReadTimeoutPipelineConfigurator.disableReadTimeout(getChannelHandlerContext().pipeline());
76+
if (flush) {
77+
Observable<Void> toReturn = flush().lift(new Observable.Operator<Void, Void>() {
78+
@Override
79+
public Subscriber<? super Void> call(final Subscriber<? super Void> child) {
80+
return new Subscriber<Void>() {
81+
@Override
82+
public void onCompleted() {
83+
_closeChannel().subscribe(child);
84+
thisSubject.onCompleted(); // Even though closeChannel() returns an Observable, close itself is eager.
85+
// So this makes sure we send onCompleted() on subject after close is initialized.
86+
// This results in more deterministic behavior for clients.
87+
}
88+
89+
@Override
90+
public void onError(Throwable e) {
91+
child.onError(e);
92+
}
93+
94+
@Override
95+
public void onNext(Void aVoid) {
96+
// Insignificant
97+
}
98+
};
99+
}
100+
});
101+
toReturn.subscribe(new NoOpSubscriber<Void>()); // Since subscribing to returned Observable is not required
102+
// by the caller and we need to be subscribed to trigger the
103+
// close of channel (_closeChannel()), it is required to
104+
// subscribe to the returned Observable. We are not
105+
// interested in the result so NoOpSub is used.
106+
return toReturn;
107+
} else {
108+
Observable<Void> toReturn = _closeChannel();
109+
thisSubject.onCompleted(); // Even though closeChannel() returns an Observable, close itself is eager.
110+
// So this makes sure we send onCompleted() on subject after close is initialized.
111+
// This results in more deterministic behavior for clients.
112+
return toReturn;
113+
}
82114
}
83115

84116
@SuppressWarnings("unchecked")
85117
protected Observable<Void> _closeChannel() {
86118
closeStartTimeMillis = Clock.newStartTimeMillis();
87119
eventsSubject.onEvent(metricEventProvider.getChannelCloseStartEvent());
120+
88121
final ChannelFuture closeFuture = getChannelHandlerContext().close();
89122

90123
/**
@@ -126,4 +159,5 @@ public void operationComplete(ChannelFuture future) throws Exception {
126159
protected void updateInputSubject(PublishSubject<I> newSubject) {
127160
inputSubject = newSubject;
128161
}
162+
129163
}

rx-netty/src/main/java/io/reactivex/netty/protocol/http/client/ClientRequestResponseConverter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
import io.reactivex.netty.client.ConnectionReuseEvent;
3838
import io.reactivex.netty.metrics.Clock;
3939
import io.reactivex.netty.metrics.MetricEventsSubject;
40-
import io.reactivex.netty.protocol.http.MultipleFutureListener;
40+
import io.reactivex.netty.util.MultipleFutureListener;
4141
import rx.Observable;
4242
import rx.Subscriber;
4343
import rx.subjects.PublishSubject;

rx-netty/src/main/java/io/reactivex/netty/protocol/http/server/HttpConnectionHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ public void onNext(I i) {
133133
public void onCompleted() {
134134
eventsSubject.onEvent(HttpServerMetricsEvent.REQUEST_HANDLING_SUCCESS,
135135
Clock.onEndMillis(startTimeMillis));
136-
response.close();
136+
response.close(false);
137137
}
138138

139139
@Override
@@ -143,7 +143,7 @@ public void onError(Throwable throwable) {
143143
if (!response.isHeaderWritten()) {
144144
responseGenerator.updateResponse(response, throwable);
145145
}
146-
response.close();
146+
response.close(false);
147147
}
148148

149149
@Override

0 commit comments

Comments
 (0)