Skip to content

Commit 726f901

Browse files
committed
Merge pull request #176 from NiteshKant/master
Fixes issue #169
2 parents 699b516 + f79af1c commit 726f901

21 files changed

+258
-590
lines changed

rx-netty-examples/src/main/java/io/reactivex/netty/examples/http/post/SimplePostClient.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,14 @@
1818

1919
import io.netty.buffer.ByteBuf;
2020
import io.netty.handler.codec.http.HttpMethod;
21+
import io.netty.handler.logging.LogLevel;
2122
import io.reactivex.netty.RxNetty;
23+
import io.reactivex.netty.channel.StringTransformer;
2224
import io.reactivex.netty.pipeline.PipelineConfigurator;
2325
import io.reactivex.netty.pipeline.PipelineConfigurators;
2426
import io.reactivex.netty.protocol.http.client.HttpClient;
2527
import io.reactivex.netty.protocol.http.client.HttpClientRequest;
2628
import io.reactivex.netty.protocol.http.client.HttpClientResponse;
27-
import io.reactivex.netty.protocol.http.client.RawContentSource;
28-
import io.reactivex.netty.serialization.StringTransformer;
2929
import rx.Observable;
3030
import rx.functions.Func1;
3131

@@ -50,10 +50,12 @@ public String postMessage() {
5050
PipelineConfigurator<HttpClientResponse<ByteBuf>, HttpClientRequest<String>> pipelineConfigurator
5151
= PipelineConfigurators.httpClientConfigurator();
5252

53-
HttpClient<String, ByteBuf> client = RxNetty.createHttpClient("localhost", port, pipelineConfigurator);
53+
HttpClient<String, ByteBuf> client = RxNetty.<String, ByteBuf>newHttpClientBuilder("localhost", port)
54+
.pipelineConfigurator(pipelineConfigurator)
55+
.enableWireLogging(LogLevel.ERROR).build();
5456

5557
HttpClientRequest<String> request = HttpClientRequest.create(HttpMethod.POST, "test/post");
56-
request.withRawContentSource(new RawContentSource.SingletonRawSource<String>(MESSAGE, new StringTransformer()));
58+
request.withRawContentSource(Observable.just(MESSAGE), StringTransformer.DEFAULT_INSTANCE);
5759

5860
String result = client.submit(request).flatMap(new Func1<HttpClientResponse<ByteBuf>, Observable<String>>() {
5961
@Override

rx-netty-examples/src/main/java/io/reactivex/netty/examples/http/post/SimplePostServer.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package io.reactivex.netty.examples.http.post;
1818

1919
import io.netty.buffer.ByteBuf;
20+
import io.netty.handler.logging.LogLevel;
2021
import io.reactivex.netty.RxNetty;
2122
import io.reactivex.netty.protocol.http.server.HttpServer;
2223
import io.reactivex.netty.protocol.http.server.HttpServerRequest;
@@ -41,7 +42,7 @@ public SimplePostServer(int port) {
4142
}
4243

4344
public HttpServer<ByteBuf, ByteBuf> createServer() {
44-
HttpServer<ByteBuf, ByteBuf> server = RxNetty.createHttpServer(port, new RequestHandler<ByteBuf, ByteBuf>() {
45+
HttpServer<ByteBuf, ByteBuf> server = RxNetty.newHttpServerBuilder(port, new RequestHandler<ByteBuf, ByteBuf>() {
4546
@Override
4647
public Observable<Void> handle(HttpServerRequest<ByteBuf> request, final HttpServerResponse<ByteBuf> response) {
4748
return request.getContent().map(new Func1<ByteBuf, String>() {
@@ -62,7 +63,7 @@ public Observable<Void> call(String clientMessage) {
6263
}
6364
});
6465
}
65-
});
66+
}).enableWireLogging(LogLevel.ERROR).build();
6667
System.out.println("Simple POST server started...");
6768
return server;
6869
}

rx-netty-examples/src/main/java/io/reactivex/netty/examples/http/wordcounter/WordCounterClient.java

Lines changed: 44 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,24 @@
1919
import io.netty.buffer.ByteBuf;
2020
import io.netty.handler.codec.http.HttpMethod;
2121
import io.reactivex.netty.RxNetty;
22+
import io.reactivex.netty.channel.StringTransformer;
2223
import io.reactivex.netty.pipeline.PipelineConfigurator;
2324
import io.reactivex.netty.pipeline.PipelineConfigurators;
2425
import io.reactivex.netty.protocol.http.client.HttpClient;
2526
import io.reactivex.netty.protocol.http.client.HttpClientRequest;
2627
import io.reactivex.netty.protocol.http.client.HttpClientResponse;
27-
import io.reactivex.netty.protocol.http.client.RawContentSource;
28-
import io.reactivex.netty.serialization.ContentTransformer;
29-
import io.reactivex.netty.serialization.StringTransformer;
28+
import rx.Observable;
29+
import rx.Subscriber;
30+
import rx.functions.Action0;
3031
import rx.functions.Action1;
31-
32-
import java.io.*;
32+
import rx.subscriptions.Subscriptions;
33+
34+
import java.io.BufferedInputStream;
35+
import java.io.File;
36+
import java.io.FileInputStream;
37+
import java.io.IOException;
38+
import java.io.InputStreamReader;
39+
import java.io.LineNumberReader;
3340
import java.nio.charset.Charset;
3441

3542
import static io.reactivex.netty.examples.http.wordcounter.WordCounterServer.DEFAULT_PORT;
@@ -48,68 +55,52 @@ public WordCounterClient(int port, String textFile) {
4855
}
4956

5057
public int countWords() throws IOException {
51-
PipelineConfigurator<HttpClientResponse<ByteBuf>, HttpClientRequest<String>> pipelineConfigurator
58+
PipelineConfigurator<HttpClientResponse<ByteBuf>, HttpClientRequest<ByteBuf>> pipelineConfigurator
5259
= PipelineConfigurators.httpClientConfigurator();
5360

54-
HttpClient<String, ByteBuf> client = RxNetty.createHttpClient("localhost", port, pipelineConfigurator);
55-
HttpClientRequest<String> request = HttpClientRequest.create(HttpMethod.POST, "test/post");
61+
HttpClient<ByteBuf, ByteBuf> client = RxNetty.createHttpClient("localhost", port, pipelineConfigurator);
62+
HttpClientRequest<ByteBuf> request = HttpClientRequest.create(HttpMethod.POST, "test/post");
5663

5764
FileContentSource fileContentSource = new FileContentSource(new File(textFile));
58-
request.withRawContentSource(fileContentSource);
65+
request.withRawContentSource(fileContentSource, StringTransformer.DEFAULT_INSTANCE);
5966

6067
WordCountAction wAction = new WordCountAction();
6168
client.submit(request).toBlocking().forEach(wAction);
6269

63-
fileContentSource.close();
64-
6570
return wAction.wordCount;
6671
}
6772

68-
static class FileContentSource implements RawContentSource<String> {
73+
static class FileContentSource extends Observable<String> {
6974

70-
private final LineNumberReader fStream;
71-
private boolean opened;
72-
private String nextLine;
73-
74-
FileContentSource(File file) throws IOException {
75-
fStream = new LineNumberReader(new InputStreamReader(new BufferedInputStream(new FileInputStream(file))));
76-
opened = true;
77-
}
75+
FileContentSource(final File file) {
76+
super(new OnSubscribe<String>() {
7877

79-
void close() {
80-
if (fStream != null) {
81-
try {
82-
fStream.close();
83-
} catch (IOException e) {
84-
// IGNORE
78+
@Override
79+
public void call(Subscriber<? super String> subscriber) {
80+
try {
81+
String nextLine;
82+
final LineNumberReader reader =
83+
new LineNumberReader(new InputStreamReader(new BufferedInputStream(new FileInputStream(file))));
84+
subscriber.add(Subscriptions.create(new Action0() {
85+
@Override
86+
public void call() {
87+
try {
88+
reader.close();
89+
} catch (IOException e) {
90+
e.printStackTrace();
91+
}
92+
}
93+
}));
94+
while ((nextLine = reader.readLine()) != null) {
95+
subscriber.onNext(nextLine);
96+
}
97+
98+
subscriber.onCompleted();
99+
} catch (Throwable throwable) {
100+
subscriber.onError(throwable);
101+
}
85102
}
86-
}
87-
}
88-
89-
@Override
90-
public boolean hasNext() {
91-
try {
92-
return opened && (nextLine != null || (nextLine = fStream.readLine()) != null);
93-
} catch (IOException e) {
94-
e.printStackTrace();
95-
opened = false;
96-
return false;
97-
}
98-
}
99-
100-
@Override
101-
public String next() {
102-
if (hasNext()) {
103-
String response = nextLine + ' ';
104-
nextLine = null;
105-
return response;
106-
}
107-
return null;
108-
}
109-
110-
@Override
111-
public ContentTransformer<String> getTransformer() {
112-
return new StringTransformer();
103+
});
113104
}
114105
}
115106

rx-netty-examples/src/main/java/io/reactivex/netty/examples/http/wordcounter/WordCounterServer.java

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -45,23 +45,24 @@ public WordCounterServer(int port) {
4545
public HttpServer<ByteBuf, ByteBuf> createServer() {
4646
HttpServer<ByteBuf, ByteBuf> server = RxNetty.createHttpServer(port, new RequestHandler<ByteBuf, ByteBuf>() {
4747
@Override
48-
public Observable<Void> handle(HttpServerRequest<ByteBuf> request, final HttpServerResponse<ByteBuf> response) {
48+
public Observable<Void> handle(HttpServerRequest<ByteBuf> request,
49+
final HttpServerResponse<ByteBuf> response) {
4950
return request.getContent()
50-
.map(new Func1<ByteBuf, String>() {
51-
@Override
52-
public String call(ByteBuf content) {
53-
return content.toString(Charset.defaultCharset());
54-
}
55-
})
56-
.lift(new WordSplitOperator())
57-
.count()
58-
.flatMap(new Func1<Integer, Observable<Void>>() {
59-
@Override
60-
public Observable<Void> call(Integer counter) {
61-
response.writeString(counter.toString());
62-
return response.close();
63-
}
64-
});
51+
.map(new Func1<ByteBuf, String>() {
52+
@Override
53+
public String call(ByteBuf content) {
54+
return content.toString(Charset.defaultCharset());
55+
}
56+
})
57+
.lift(new WordSplitOperator())
58+
.count()
59+
.flatMap(new Func1<Integer, Observable<Void>>() {
60+
@Override
61+
public Observable<Void> call(Integer counter) {
62+
response.writeString(counter.toString());
63+
return response.close();
64+
}
65+
});
6566
}
6667
});
6768
System.out.println("Started word counter server...");

rx-netty/src/main/java/io/reactivex/netty/RxNetty.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.netty.channel.socket.nio.NioDatagramChannel;
2222
import io.netty.handler.logging.LogLevel;
2323
import io.reactivex.netty.channel.ConnectionHandler;
24+
import io.reactivex.netty.channel.ContentTransformer;
2425
import io.reactivex.netty.channel.RxEventLoopProvider;
2526
import io.reactivex.netty.channel.SingleNioLoopProvider;
2627
import io.reactivex.netty.client.ClientBuilder;
@@ -29,12 +30,10 @@
2930
import io.reactivex.netty.pipeline.PipelineConfigurator;
3031
import io.reactivex.netty.protocol.http.client.CompositeHttpClient;
3132
import io.reactivex.netty.protocol.http.client.CompositeHttpClientBuilder;
32-
import io.reactivex.netty.protocol.http.client.ContentSource;
3333
import io.reactivex.netty.protocol.http.client.HttpClient;
3434
import io.reactivex.netty.protocol.http.client.HttpClientBuilder;
3535
import io.reactivex.netty.protocol.http.client.HttpClientRequest;
3636
import io.reactivex.netty.protocol.http.client.HttpClientResponse;
37-
import io.reactivex.netty.protocol.http.client.RawContentSource;
3837
import io.reactivex.netty.protocol.http.server.HttpServer;
3938
import io.reactivex.netty.protocol.http.server.HttpServerBuilder;
4039
import io.reactivex.netty.protocol.http.server.HttpServerRequest;
@@ -202,20 +201,22 @@ public static Observable<HttpClientResponse<ByteBuf>> createHttpGet(String uri)
202201
return createHttpRequest(HttpClientRequest.createGet(uri));
203202
}
204203

205-
public static Observable<HttpClientResponse<ByteBuf>> createHttpPost(String uri, ContentSource<ByteBuf> content) {
204+
public static Observable<HttpClientResponse<ByteBuf>> createHttpPost(String uri, Observable<ByteBuf> content) {
206205
return createHttpRequest(HttpClientRequest.createPost(uri).withContentSource(content));
207206
}
208207

209-
public static Observable<HttpClientResponse<ByteBuf>> createHttpPut(String uri, ContentSource<ByteBuf> content) {
208+
public static Observable<HttpClientResponse<ByteBuf>> createHttpPut(String uri, Observable<ByteBuf> content) {
210209
return createHttpRequest(HttpClientRequest.createPut(uri).withContentSource(content));
211210
}
212211

213-
public static <T> Observable<HttpClientResponse<ByteBuf>> createHttpPost(String uri, RawContentSource<T> content) {
214-
return createHttpRequest(HttpClientRequest.createPost(uri).withRawContentSource(content));
212+
public static <T> Observable<HttpClientResponse<ByteBuf>> createHttpPost(String uri, Observable<T> content,
213+
ContentTransformer<T> transformer) {
214+
return createHttpRequest(HttpClientRequest.createPost(uri).withRawContentSource(content, transformer));
215215
}
216216

217-
public static <T> Observable<HttpClientResponse<ByteBuf>> createHttpPut(String uri, RawContentSource<T> content) {
218-
return createHttpRequest(HttpClientRequest.createPut(uri).withRawContentSource(content));
217+
public static <T> Observable<HttpClientResponse<ByteBuf>> createHttpPut(String uri, Observable<T> content,
218+
ContentTransformer<T> transformer) {
219+
return createHttpRequest(HttpClientRequest.createPut(uri).withRawContentSource(content, transformer));
219220
}
220221

221222
public static Observable<HttpClientResponse<ByteBuf>> createHttpDelete(String uri) {

rx-netty/src/main/java/io/reactivex/netty/serialization/ByteTransformer.java renamed to rx-netty/src/main/java/io/reactivex/netty/channel/ByteTransformer.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,18 +13,23 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package io.reactivex.netty.serialization;
16+
17+
package io.reactivex.netty.channel;
1718

1819
import io.netty.buffer.ByteBuf;
1920
import io.netty.buffer.ByteBufAllocator;
2021

2122
/**
23+
* A simple implementation of {@link ContentTransformer} to convert a byte array to {@link ByteBuf}
24+
*
2225
* @author Nitesh Kant
2326
*/
2427
public class ByteTransformer implements ContentTransformer<byte[]> {
2528

29+
public static final ByteTransformer DEFAULT_INSTANCE = new ByteTransformer();
30+
2631
@Override
27-
public ByteBuf transform(byte[] toTransform, ByteBufAllocator byteBufAllocator) {
28-
return byteBufAllocator.buffer(toTransform.length).writeBytes(toTransform);
32+
public ByteBuf call(byte[] toTransform, ByteBufAllocator allocator) {
33+
return allocator.buffer(toTransform.length).writeBytes(toTransform);
2934
}
3035
}

rx-netty/src/main/java/io/reactivex/netty/channel/ChannelWriter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,10 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16+
1617
package io.reactivex.netty.channel;
1718

1819
import io.netty.buffer.ByteBufAllocator;
19-
import io.reactivex.netty.serialization.ContentTransformer;
2020
import rx.Observable;
2121

2222
/**

rx-netty/src/main/java/io/reactivex/netty/serialization/ContentTransformer.java renamed to rx-netty/src/main/java/io/reactivex/netty/channel/ContentTransformer.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,18 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package io.reactivex.netty.serialization;
16+
17+
package io.reactivex.netty.channel;
1718

1819
import io.netty.buffer.ByteBuf;
1920
import io.netty.buffer.ByteBufAllocator;
21+
import rx.functions.Func2;
2022

2123
/**
24+
* A contract to transform a java object to {@link ByteBuf} to be used for writing the object on netty's channel.
25+
*
2226
* @author Nitesh Kant
2327
*/
24-
public interface ContentTransformer<T> {
28+
public interface ContentTransformer<S> extends Func2<S, ByteBufAllocator, ByteBuf> {
2529

26-
ByteBuf transform(T toTransform, ByteBufAllocator byteBufAllocator);
2730
}

rx-netty/src/main/java/io/reactivex/netty/channel/DefaultChannelWriter.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16+
1617
package io.reactivex.netty.channel;
1718

1819
import io.netty.buffer.ByteBuf;
@@ -23,9 +24,6 @@
2324
import io.reactivex.netty.metrics.Clock;
2425
import io.reactivex.netty.metrics.MetricEventsSubject;
2526
import io.reactivex.netty.protocol.http.MultipleFutureListener;
26-
import io.reactivex.netty.serialization.ByteTransformer;
27-
import io.reactivex.netty.serialization.ContentTransformer;
28-
import io.reactivex.netty.serialization.StringTransformer;
2927
import rx.Observable;
3028
import rx.functions.Action0;
3129
import rx.functions.Action1;
@@ -79,7 +77,7 @@ public void write(O msg) {
7977

8078
@Override
8179
public <R> void write(R msg, ContentTransformer<R> transformer) {
82-
ByteBuf contentBytes = transformer.transform(msg, getAllocator());
80+
ByteBuf contentBytes = transformer.call(msg, getAllocator());
8381
writeOnChannel(contentBytes);
8482
}
8583

0 commit comments

Comments
 (0)