Skip to content

Commit 284a039

Browse files
committed
Merge pull request #189 from NiteshKant/master
Fixes #187
2 parents 1f012bc + ca710a6 commit 284a039

File tree

8 files changed

+143
-46
lines changed

8 files changed

+143
-46
lines changed

rx-netty-examples/build.gradle

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,6 @@
1616

1717

1818

19-
20-
21-
22-
2319
sourceCompatibility = JavaVersion.VERSION_1_6
2420
targetCompatibility = JavaVersion.VERSION_1_6
2521

rx-netty-examples/src/main/java/io/reactivex/netty/examples/http/helloworld/HelloWorldClient.java

Lines changed: 18 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,17 @@
1717
package io.reactivex.netty.examples.http.helloworld;
1818

1919
import io.netty.buffer.ByteBuf;
20-
import io.netty.handler.codec.http.HttpResponseStatus;
2120
import io.reactivex.netty.RxNetty;
21+
import io.reactivex.netty.protocol.http.client.FlatResponseOperator;
2222
import io.reactivex.netty.protocol.http.client.HttpClientResponse;
23-
import rx.Observable;
24-
import rx.functions.Action0;
23+
import io.reactivex.netty.protocol.http.client.ResponseHolder;
2524
import rx.functions.Func1;
26-
import rx.functions.Func2;
2725

2826
import java.nio.charset.Charset;
2927
import java.util.Map;
28+
import java.util.concurrent.ExecutionException;
29+
import java.util.concurrent.TimeUnit;
30+
import java.util.concurrent.TimeoutException;
3031

3132
import static io.reactivex.netty.examples.http.helloworld.HelloWorldServer.DEFAULT_PORT;
3233

@@ -41,29 +42,18 @@ public HelloWorldClient(int port) {
4142
this.port = port;
4243
}
4344

44-
public HttpResponseStatus sendHelloRequest() {
45-
HttpResponseStatus statusCode = RxNetty.createHttpGet("http://localhost:" + port + "/hello")
46-
.mergeMap(new Func1<HttpClientResponse<ByteBuf>, Observable<ByteBuf>>() {
47-
@Override
48-
public Observable<ByteBuf> call(HttpClientResponse<ByteBuf> response) {
49-
return response.getContent();
50-
}
51-
}, new Func2<HttpClientResponse<ByteBuf>, ByteBuf, HttpResponseStatus>() {
52-
@Override
53-
public HttpResponseStatus call(HttpClientResponse<ByteBuf> response, ByteBuf content) {
54-
printResponseHeader(response);
55-
System.out.println(content.toString(Charset.defaultCharset()));
56-
return response.getStatus();
57-
}
58-
})
59-
.doOnTerminate(new Action0() {
60-
@Override
61-
public void call() {
62-
System.out.println("=======================");
63-
}
64-
}).toBlocking().last();
65-
66-
return statusCode;
45+
public HttpClientResponse<ByteBuf> sendHelloRequest() throws InterruptedException, ExecutionException, TimeoutException {
46+
return RxNetty.createHttpGet("http://localhost:" + port + "/hello")
47+
.lift(FlatResponseOperator.<ByteBuf>flatResponse())
48+
.map(new Func1<ResponseHolder<ByteBuf>, HttpClientResponse<ByteBuf>>() {
49+
@Override
50+
public HttpClientResponse<ByteBuf> call(ResponseHolder<ByteBuf> holder) {
51+
printResponseHeader(holder.getResponse());
52+
System.out.println(holder.getContent().toString(Charset.defaultCharset()));
53+
System.out.println("========================");
54+
return holder.getResponse();
55+
}
56+
}).toBlocking().toFuture().get(1, TimeUnit.MINUTES);
6757
}
6858

6959
public void printResponseHeader(HttpClientResponse<ByteBuf> response) {
@@ -76,7 +66,7 @@ public void printResponseHeader(HttpClientResponse<ByteBuf> response) {
7666
}
7767
}
7868

79-
public static void main(String[] args) {
69+
public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
8070
int port = DEFAULT_PORT;
8171
if (args.length > 0) {
8272
port = Integer.parseInt(args[0]);

rx-netty-examples/src/test/java/io/reactivex/netty/examples/http/cpuintensive/CpuIntensiveServerTest.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@
2626
import org.junit.Before;
2727
import org.junit.Test;
2828

29+
import java.util.concurrent.ExecutionException;
30+
import java.util.concurrent.TimeoutException;
31+
2932
import static io.reactivex.netty.examples.http.cpuintensive.CPUIntensiveServer.DEFAULT_PORT;
3033

3134
/**
@@ -47,9 +50,9 @@ public void stopServer() throws InterruptedException {
4750
}
4851

4952
@Test
50-
public void testRequestReplySequence() {
53+
public void testRequestReplySequence() throws InterruptedException, ExecutionException, TimeoutException {
5154
HelloWorldClient client = new HelloWorldClient(DEFAULT_PORT); // The client is no different than hello world.
52-
HttpResponseStatus statusCode = client.sendHelloRequest();
55+
HttpResponseStatus statusCode = client.sendHelloRequest().getStatus();
5356
Assert.assertEquals(HttpResponseStatus.OK, statusCode);
5457
}
5558
}

rx-netty-examples/src/test/java/io/reactivex/netty/examples/http/helloworld/HelloWorldTest.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@
2525
import org.junit.Before;
2626
import org.junit.Test;
2727

28+
import java.util.concurrent.ExecutionException;
29+
import java.util.concurrent.TimeoutException;
30+
2831
import static io.reactivex.netty.examples.http.helloworld.HelloWorldServer.DEFAULT_PORT;
2932

3033
/**
@@ -46,9 +49,9 @@ public void stopServer() throws InterruptedException {
4649
}
4750

4851
@Test
49-
public void testRequestReplySequence() {
52+
public void testRequestReplySequence() throws InterruptedException, ExecutionException, TimeoutException {
5053
HelloWorldClient client = new HelloWorldClient(DEFAULT_PORT);
51-
HttpResponseStatus statusCode = client.sendHelloRequest();
54+
HttpResponseStatus statusCode = client.sendHelloRequest().getStatus();
5255
Assert.assertEquals(HttpResponseStatus.OK, statusCode);
5356
}
5457
}

rx-netty-examples/src/test/java/io/reactivex/netty/examples/http/plaintext/PlainTextServerTest.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@
2626
import org.junit.Before;
2727
import org.junit.Test;
2828

29+
import java.util.concurrent.ExecutionException;
30+
import java.util.concurrent.TimeoutException;
31+
2932
import static io.reactivex.netty.examples.http.plaintext.PlainTextServer.DEFAULT_PORT;
3033

3134
/**
@@ -47,9 +50,9 @@ public void stopServer() throws InterruptedException {
4750
}
4851

4952
@Test
50-
public void testRequestReplySequence() {
53+
public void testRequestReplySequence() throws InterruptedException, ExecutionException, TimeoutException {
5154
HelloWorldClient client = new HelloWorldClient(DEFAULT_PORT); // The client is no different than hello world.
52-
HttpResponseStatus statusCode = client.sendHelloRequest();
55+
HttpResponseStatus statusCode = client.sendHelloRequest().getStatus();
5356
Assert.assertEquals(HttpResponseStatus.OK, statusCode);
5457
}
5558
}

rx-netty/build.gradle

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,3 @@
1-
apply plugin: 'osgi'
2-
apply plugin: 'groovy'
3-
4-
sourceCompatibility = JavaVersion.VERSION_1_6
5-
targetCompatibility = JavaVersion.VERSION_1_6
6-
7-
81
/*
92
* Copyright 2014 Netflix, Inc.
103
*
@@ -20,7 +13,13 @@ targetCompatibility = JavaVersion.VERSION_1_6
2013
* See the License for the specific language governing permissions and
2114
* limitations under the License.
2215
*/
23-
sourceSets.test.java.srcDir 'src/main/java'
16+
17+
18+
apply plugin: 'osgi'
19+
apply plugin: 'groovy'
20+
21+
sourceCompatibility = JavaVersion.VERSION_1_6
22+
targetCompatibility = JavaVersion.VERSION_1_6
2423

2524
tasks.withType(Javadoc).each {
2625
it.classpath = sourceSets.main.compileClasspath
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.reactivex.netty.protocol.http.client;
18+
19+
import rx.Observable;
20+
import rx.Subscriber;
21+
import rx.functions.Func1;
22+
23+
/**
24+
* An operator to be used for a source of {@link HttpClientResponse} containing aggregated responses i.e. which does not
25+
* have multiple HTTP chunks. This operator simplifies the handling of such a responses by flattening the content
26+
* {@link Observable} into a single element producing a {@link ResponseHolder} object.
27+
* See <a href="https://github.com/Netflix/RxNetty/issues/187">related issue</a> for details.
28+
*
29+
* @author Nitesh Kant
30+
*/
31+
public class FlatResponseOperator<T>
32+
implements Observable.Operator<ResponseHolder<T>, HttpClientResponse<T>> {
33+
34+
public static <T> FlatResponseOperator<T> flatResponse() {
35+
return new FlatResponseOperator<T>();
36+
}
37+
38+
@Override
39+
public Subscriber<? super HttpClientResponse<T>> call(final Subscriber<? super ResponseHolder<T>> child) {
40+
return new Subscriber<HttpClientResponse<T>>() {
41+
@Override
42+
public void onCompleted() {
43+
// Content complete propagates to the child subscriber.
44+
}
45+
46+
@Override
47+
public void onError(Throwable e) {
48+
child.onError(e);
49+
}
50+
51+
@Override
52+
public void onNext(final HttpClientResponse<T> response) {
53+
response.getContent()
54+
.take(1)
55+
.map(new Func1<T, ResponseHolder<T>>() {
56+
@Override
57+
public ResponseHolder<T> call(T t) {
58+
return new ResponseHolder<T>(response, t);
59+
}
60+
}).subscribe(child);
61+
}
62+
};
63+
}
64+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.reactivex.netty.protocol.http.client;
18+
19+
/**
20+
* @author Nitesh Kant
21+
*/
22+
public class ResponseHolder<T> {
23+
24+
private final HttpClientResponse<T> response;
25+
private final T content;
26+
27+
public ResponseHolder(HttpClientResponse<T> response, T content) {
28+
this.response = response;
29+
this.content = content;
30+
}
31+
32+
public HttpClientResponse<T> getResponse() {
33+
return response;
34+
}
35+
36+
public T getContent() {
37+
return content;
38+
}
39+
}

0 commit comments

Comments
 (0)