Skip to content

Commit cfcf90b

Browse files
authored
Merge pull request #606 from FortnoxAB/patch-1
Fix writes out of order
2 parents b5a747b + 86ce35b commit cfcf90b

File tree

2 files changed

+223
-3
lines changed

2 files changed

+223
-3
lines changed

rxnetty-common/src/main/java/io/reactivex/netty/channel/BackpressureManagingHandler.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -482,6 +482,7 @@ private void recalculateMaxPerSubscriber(int oldSubCount, int newSubCount) {
482482
private boolean isDone; /*Guarded by guard*/
483483
private Scheduler.Worker writeWorker; /*Guarded by guard*/
484484
private boolean atleastOneWriteEnqueued; /*Guarded by guard*/
485+
private int enqueued; /*Guarded by guard*/
485486

486487
private boolean isPromiseCompletedOnWriteComplete; /*Guarded by guard. Only transition should be false->true*/
487488

@@ -533,7 +534,11 @@ public void onNext(Object nextItem) {
533534
}
534535
}
535536

536-
enqueue = null != writeWorker && inEL;
537+
enqueue = null != writeWorker && (inEL || enqueued > 0);
538+
539+
if (enqueue) {
540+
enqueued++;
541+
}
537542
}
538543

539544
final ChannelFuture channelFuture = enqueue ? enqueueWrite(nextItem) : ctx.write(nextItem);
@@ -611,6 +616,9 @@ private ChannelFuture enqueueWrite(final Object nextItem) {
611616
@Override
612617
public void call() {
613618
ctx.write(nextItem, toReturn);
619+
synchronized (guard) {
620+
enqueued--;
621+
}
614622
}
615623
});
616624
return toReturn;

rxnetty-common/src/test/java/io/reactivex/netty/channel/BytesWriteInterceptorTest.java

Lines changed: 214 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,12 @@
1616
*/
1717
package io.reactivex.netty.channel;
1818

19-
import io.netty.channel.WriteBufferWaterMark;
19+
import io.netty.buffer.ByteBuf;
20+
import io.netty.channel.*;
2021
import io.netty.channel.embedded.EmbeddedChannel;
22+
import io.netty.util.concurrent.AbstractScheduledEventExecutor;
23+
import io.netty.util.concurrent.Future;
24+
import io.netty.util.internal.ObjectUtil;
2125
import io.reactivex.netty.channel.BackpressureManagingHandler.BytesWriteInterceptor;
2226
import io.reactivex.netty.channel.BackpressureManagingHandler.WriteStreamSubscriber;
2327
import io.reactivex.netty.test.util.MockProducer;
@@ -26,11 +30,21 @@
2630
import org.junit.rules.ExternalResource;
2731
import org.junit.runner.Description;
2832
import org.junit.runners.model.Statement;
33+
import rx.Scheduler;
34+
import rx.functions.Action0;
35+
import rx.schedulers.Schedulers;
36+
37+
import java.nio.charset.Charset;
38+
import java.util.ArrayDeque;
39+
import java.util.Queue;
40+
import java.util.concurrent.CountDownLatch;
41+
import java.util.concurrent.TimeUnit;
2942

3043
import static io.reactivex.netty.channel.BackpressureManagingHandler.BytesWriteInterceptor.MAX_PER_SUBSCRIBER_REQUEST;
3144
import static io.reactivex.netty.channel.BytesWriteInterceptorTest.InspectorRule.defaultRequestN;
3245
import static org.hamcrest.MatcherAssert.*;
3346
import static org.hamcrest.Matchers.*;
47+
import static rx.Observable.just;
3448

3549
public class BytesWriteInterceptorTest {
3650

@@ -113,6 +127,35 @@ public void testOneLongWriteAndManySmallWrites() throws Exception {
113127
assertThat("Unexpected items requested.", producer1.getRequested(), is(97L));
114128
}
115129

130+
@Test(timeout = 100000)
131+
public void testWritesInOrderFromDifferentThreads() throws Exception {
132+
final WriteStreamSubscriber sub1 = inspectorRule.newSubscriber();
133+
134+
// Set the current thread to be the thread of the event loop
135+
inspectorRule.setEventLoopThread();
136+
137+
// Send 1000 messages from two different threads
138+
int msgCount = 1000;
139+
Scheduler.Worker worker = Schedulers.computation().createWorker();
140+
for (int i = 1; i < msgCount; i+=2) {
141+
sub1.onNext(String.valueOf(i));
142+
143+
// Send from other thread
144+
inspectorRule.sendFromOtherThread(sub1, worker, String.valueOf(i+1));
145+
}
146+
147+
// In lack of a way of running all pending tasks on computation scheduler
148+
Thread.sleep(500);
149+
150+
// Ensure messages are in order
151+
Queue<Object> written = inspectorRule.getWrittenMessages();
152+
for (int i = 1; i <= msgCount; i++) {
153+
Object msg = written.poll();
154+
String strMsg = ((ByteBuf) msg).toString(Charset.defaultCharset());
155+
assertThat("Not in order ", strMsg, is(String.valueOf(i)));
156+
}
157+
}
158+
116159
@Test(timeout = 10000)
117160
public void testBatchedSubscriberRemoves() throws Exception {
118161
WriteStreamSubscriber sub1 = inspectorRule.newSubscriber();
@@ -153,7 +196,7 @@ public Statement apply(final Statement base, Description description) {
153196
@Override
154197
public void evaluate() throws Throwable {
155198
interceptor = new BytesWriteInterceptor("foo");
156-
channel = new EmbeddedChannel(new WriteTransformer(), interceptor);
199+
channel = new TestEmbeddedChannel(new WriteTransformer(), interceptor);
157200
base.evaluate();
158201
}
159202
};
@@ -207,5 +250,174 @@ public void setupNewSubscriberAndComplete(int expectedSubCount, boolean runPendi
207250
channel.runPendingTasks();
208251
}
209252
}
253+
254+
public Queue<Object> getWrittenMessages() {
255+
channel.runPendingTasks();
256+
channel.flush();
257+
return channel.outboundMessages();
258+
}
259+
260+
public void setEventLoopThread() {
261+
ChannelPromise deregisterPromise = channel.newPromise();
262+
channel.deregister(deregisterPromise);
263+
channel.runPendingTasks();
264+
assertThat("failed to deregister", deregisterPromise.isDone() && deregisterPromise.isSuccess());
265+
266+
ThreadAwareEmbeddedEventLoop loop = new ThreadAwareEmbeddedEventLoop(Thread.currentThread());
267+
ChannelFuture registerPromise = loop.register(channel);
268+
assertThat("failed to register", registerPromise.isDone() && registerPromise.isSuccess());
269+
}
270+
271+
private void sendFromOtherThread(final WriteStreamSubscriber subscriber, Scheduler.Worker worker, final Object msg) throws InterruptedException {
272+
final CountDownLatch countDown = new CountDownLatch(1);
273+
worker.schedule(new Action0() {
274+
@Override
275+
public void call() {
276+
subscriber.onNext(msg);
277+
countDown.countDown();
278+
}
279+
});
280+
countDown.await();
281+
}
282+
}
283+
284+
/**
285+
* A custom EmbeddedChannel allowing a special EventLoop, so that we can simulate calls not coming from the event loop.
286+
*/
287+
private static class TestEmbeddedChannel extends EmbeddedChannel {
288+
289+
public TestEmbeddedChannel(WriteTransformer writeTransformer, BytesWriteInterceptor interceptor) {
290+
super(writeTransformer, interceptor);
291+
}
292+
293+
@Override
294+
protected boolean isCompatible(EventLoop loop) {
295+
return loop instanceof ThreadAwareEmbeddedEventLoop || super.isCompatible(loop);
296+
}
297+
298+
@Override
299+
public void runPendingTasks() {
300+
if (super.eventLoop() instanceof ThreadAwareEmbeddedEventLoop) {
301+
ThreadAwareEmbeddedEventLoop loop = (ThreadAwareEmbeddedEventLoop) super.eventLoop();
302+
loop.runTasks();
303+
} else {
304+
super.runPendingTasks();
305+
}
306+
}
210307
}
308+
309+
/**
310+
* Need an embedded event loop that considers a single thread to be "on the loop" in order to have writes from
311+
* outside the event loop.
312+
* Due to final modifier of EmbeddedEventLoop there was some copying needed.
313+
*/
314+
private static class ThreadAwareEmbeddedEventLoop extends AbstractScheduledEventExecutor implements EventLoop {
315+
316+
private final Queue<Runnable> tasks = new ArrayDeque<Runnable>(2);
317+
private final Thread loopThread;
318+
319+
public ThreadAwareEmbeddedEventLoop(Thread loopThread) {
320+
this.loopThread = loopThread;
321+
}
322+
323+
@Override
324+
public EventLoopGroup parent() {
325+
return (EventLoopGroup) super.parent();
326+
}
327+
328+
@Override
329+
public EventLoop next() {
330+
return (EventLoop) super.next();
331+
}
332+
333+
@Override
334+
public void execute(Runnable command) {
335+
if (command == null) {
336+
throw new NullPointerException("command");
337+
}
338+
tasks.add(command);
339+
}
340+
341+
void runTasks() {
342+
for (;;) {
343+
Runnable task = tasks.poll();
344+
if (task == null) {
345+
break;
346+
}
347+
348+
task.run();
349+
}
350+
}
351+
352+
@Override
353+
protected void cancelScheduledTasks() {
354+
super.cancelScheduledTasks();
355+
}
356+
357+
@Override
358+
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
359+
throw new UnsupportedOperationException();
360+
}
361+
362+
@Override
363+
public Future<?> terminationFuture() {
364+
throw new UnsupportedOperationException();
365+
}
366+
367+
@Override
368+
@Deprecated
369+
public void shutdown() {
370+
throw new UnsupportedOperationException();
371+
}
372+
373+
@Override
374+
public boolean isShuttingDown() {
375+
return false;
376+
}
377+
378+
@Override
379+
public boolean isShutdown() {
380+
return false;
381+
}
382+
383+
@Override
384+
public boolean isTerminated() {
385+
return false;
386+
}
387+
388+
@Override
389+
public boolean awaitTermination(long timeout, TimeUnit unit) {
390+
return false;
391+
}
392+
393+
@Override
394+
public ChannelFuture register(Channel channel) {
395+
return register(new DefaultChannelPromise(channel, this));
396+
}
397+
398+
@Override
399+
public ChannelFuture register(ChannelPromise promise) {
400+
ObjectUtil.checkNotNull(promise, "promise");
401+
promise.channel().unsafe().register(this, promise);
402+
return promise;
403+
}
404+
405+
@Deprecated
406+
@Override
407+
public ChannelFuture register(Channel channel, ChannelPromise promise) {
408+
channel.unsafe().register(this, promise);
409+
return promise;
410+
}
411+
412+
@Override
413+
public boolean inEventLoop() {
414+
return Thread.currentThread() == loopThread;
415+
}
416+
417+
@Override
418+
public boolean inEventLoop(Thread thread) {
419+
return thread == loopThread;
420+
}
421+
}
422+
211423
}

0 commit comments

Comments
 (0)