Skip to content

Commit be12144

Browse files
committed
RequestN reduces with completed subscribers.
#### Problem Re-calculation of max `requestN` per subscriber did not take into account removed subscribers correctly. This causes the `requestN` value to decrease over time when subscribers are completed. #### Modification - Modified `recalculateMaxPerSubscriber` to take old and new subscriber count instead of trying to determine the correct value from the current subscriber queue size. - `subscribers.remove()` does not happen on unsubscribe but only from the task that recalculates the max `requestN` values. This also coalesces multiple removes into a single task run. #### Result Consistent `requestN` values with new subscribes and completions.
1 parent 208b804 commit be12144

File tree

2 files changed

+126
-44
lines changed

2 files changed

+126
-44
lines changed

rxnetty-common/src/main/java/io/reactivex/netty/channel/BackpressureManagingHandler.java

Lines changed: 40 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636

3737
import java.util.ArrayList;
3838
import java.util.Collections;
39+
import java.util.Iterator;
3940
import java.util.List;
4041
import java.util.concurrent.ConcurrentLinkedQueue;
4142

@@ -358,6 +359,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
358359
*/
359360
private int perSubscriberMaxRequest = MAX_PER_SUBSCRIBER_REQUEST;
360361
private Channel channel;
362+
private boolean removeTaskScheduled; // Guarded by this
361363

362364
BytesWriteInterceptor(String parentHandlerName) {
363365
this.parentHandlerName = parentHandlerName;
@@ -389,16 +391,22 @@ public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exceptio
389391
}
390392

391393
public WriteStreamSubscriber newSubscriber(final ChannelHandlerContext ctx, ChannelPromise promise) {
392-
393-
recalculateMaxPerSubscriber();
394+
int currentSubCount = subscribers.size();
395+
recalculateMaxPerSubscriber(currentSubCount, currentSubCount + 1);
394396

395397
final WriteStreamSubscriber sub = new WriteStreamSubscriber(ctx, promise, perSubscriberMaxRequest);
396398
sub.add(Subscriptions.create(new Action0() {
397399
@Override
398400
public void call() {
399-
/*Remove the subscriber on unsubscribe.*/
400-
subscribers.remove(sub);
401-
ctx.channel().eventLoop().execute(BytesWriteInterceptor.this);
401+
boolean _schedule;
402+
/*Schedule the task once as the task runs through and removes all unsubscribed subscribers*/
403+
synchronized (BytesWriteInterceptor.this) {
404+
_schedule = !removeTaskScheduled;
405+
removeTaskScheduled = true;
406+
}
407+
if (_schedule) {
408+
ctx.channel().eventLoop().execute(BytesWriteInterceptor.this);
409+
}
402410
}
403411
}));
404412

@@ -422,21 +430,37 @@ private void requestMoreIfWritable(Channel channel) {
422430

423431
@Override
424432
public void run() {
425-
recalculateMaxPerSubscriber();
433+
synchronized (this) {
434+
removeTaskScheduled = false;
435+
}
436+
int oldSubCount = subscribers.size();
437+
for (Iterator<WriteStreamSubscriber> iterator = subscribers.iterator(); iterator.hasNext(); ) {
438+
WriteStreamSubscriber subscriber = iterator.next();
439+
if (subscriber.isUnsubscribed()) {
440+
iterator.remove();
441+
}
442+
}
443+
int newSubCount = subscribers.size();
444+
recalculateMaxPerSubscriber(oldSubCount, newSubCount);
426445
}
427446

428447
/**
429448
* Called from within the eventloop, whenever the subscriber queue is modified. This modifies the per subscriber
430449
* request limit by equally distributing the demand. Minimum demand to any subscriber is 1.
431450
*/
432-
private void recalculateMaxPerSubscriber() {
451+
private void recalculateMaxPerSubscriber(int oldSubCount, int newSubCount) {
433452
assert channel.eventLoop().inEventLoop();
434-
435-
int subCount = subscribers.size();
436-
perSubscriberMaxRequest = 0 == subCount ? perSubscriberMaxRequest
437-
: perSubscriberMaxRequest * subCount / (subCount + 1);
453+
perSubscriberMaxRequest = newSubCount == 0 || oldSubCount == 0
454+
? MAX_PER_SUBSCRIBER_REQUEST
455+
: perSubscriberMaxRequest * oldSubCount / newSubCount;
438456

439457
perSubscriberMaxRequest = Math.max(1, perSubscriberMaxRequest);
458+
459+
if (logger.isDebugEnabled()) {
460+
logger.debug("Channel {}. Modifying per subscriber max request. Old subscribers count {}, " +
461+
"new subscribers count {}. New Value {} ", channel, oldSubCount, newSubCount,
462+
perSubscriberMaxRequest);
463+
}
440464
}
441465
}
442466

@@ -540,7 +564,7 @@ public void operationComplete(ChannelFuture future) throws Exception {
540564

541565
boolean _isPromiseCompletedOnWriteComplete;
542566

543-
/**
567+
/*
544568
* The intent here is to NOT give listener callbacks via promise completion within the sync block.
545569
* So, a co-ordination b/w the thread sending Observable terminal event and thread sending write
546570
* completion event is required.
@@ -553,7 +577,7 @@ public void operationComplete(ChannelFuture future) throws Exception {
553577
synchronized (guard) {
554578
listeningTo--;
555579
if (0 == listeningTo && isDone) {
556-
/**
580+
/*
557581
* If the listening count is 0 and no more items will arrive, this thread wins the race of
558582
* completing the overarchingWritePromise
559583
*/
@@ -562,7 +586,7 @@ public void operationComplete(ChannelFuture future) throws Exception {
562586
_isPromiseCompletedOnWriteComplete = isPromiseCompletedOnWriteComplete;
563587
}
564588

565-
/**
589+
/*
566590
* Exceptions are not buffered but completion is only sent when there are no more items to be
567591
* received for write.
568592
*/
@@ -597,7 +621,7 @@ private void onTermination(Throwable throwableIfAny) {
597621
boolean _shouldCompletePromise;
598622
final boolean enqueueFlush;
599623

600-
/**
624+
/*
601625
* The intent here is to NOT give listener callbacks via promise completion within the sync block.
602626
* So, a co-ordination b/w the thread sending Observable terminal event and thread sending write
603627
* completion event is required.
@@ -611,7 +635,7 @@ private void onTermination(Throwable throwableIfAny) {
611635
enqueueFlush = atleastOneWriteEnqueued;
612636
isDone = true;
613637
_listeningTo = listeningTo;
614-
/**
638+
/*
615639
* Flag to indicate whether the write complete thread won the race and will complete the
616640
* overarchingWritePromise
617641
*/

rxnetty-common/src/test/java/io/reactivex/netty/channel/BytesWriteInterceptorTest.java

Lines changed: 86 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import io.reactivex.netty.channel.BackpressureManagingHandler.BytesWriteInterceptor;
2222
import io.reactivex.netty.channel.BackpressureManagingHandler.WriteStreamSubscriber;
2323
import io.reactivex.netty.test.util.MockProducer;
24-
import org.hamcrest.Matcher;
2524
import org.junit.Rule;
2625
import org.junit.Test;
2726
import org.junit.rules.ExternalResource;
@@ -46,23 +45,18 @@ public void testAddSubscriber() throws Exception {
4645
assertThat("Subscriber not added.", inspectorRule.interceptor.getSubscribers(), contains(sub1));
4746

4847
sub1.unsubscribe();
49-
48+
inspectorRule.channel.runPendingTasks();
5049
assertThat("Subscriber not removed post unsubscribe", inspectorRule.interceptor.getSubscribers(), is(empty()));
5150
}
5251

5352
@Test(timeout = 60000)
5453
public void testRequestMore() throws Exception {
5554

5655
WriteStreamSubscriber sub1 = inspectorRule.newSubscriber();
57-
MockProducer mockProducer = InspectorRule.setupSubscriber(sub1);
58-
56+
MockProducer mockProducer = inspectorRule.setupSubscriberAndValidate(sub1, 1);
5957
assertThat("Unexpected items requested from producer.", mockProducer.getRequested(), is(defaultRequestN()));
6058

61-
assertThat("Subscriber not added.", inspectorRule.interceptor.getSubscribers(), hasSize(1));
62-
assertThat("Subscriber not added.", inspectorRule.interceptor.getSubscribers(), contains(sub1));
63-
64-
String msg = "Hello";
65-
inspectorRule.channel.writeAndFlush(msg);
59+
inspectorRule.sendMessages(1);
6660

6761
assertThat("Channel not writable post write.", inspectorRule.channel.isWritable(), is(true));
6862
assertThat("Unexpected items requested.", mockProducer.getRequested(), is(defaultRequestN()));
@@ -72,13 +66,9 @@ public void testRequestMore() throws Exception {
7266
public void testRequestMorePostFlush() throws Exception {
7367

7468
WriteStreamSubscriber sub1 = inspectorRule.newSubscriber();
75-
MockProducer mockProducer = InspectorRule.setupSubscriber(sub1);
76-
69+
MockProducer mockProducer = inspectorRule.setupSubscriberAndValidate(sub1, 1);
7770
assertThat("Unexpected items requested from producer.", mockProducer.getRequested(), is(defaultRequestN()));
7871

79-
assertThat("Subscriber not added.", inspectorRule.interceptor.getSubscribers(), hasSize(1));
80-
assertThat("Subscriber not added.", inspectorRule.interceptor.getSubscribers(), contains(sub1));
81-
8272
inspectorRule.channel.config().setWriteBufferWaterMark(new WriteBufferWaterMark(1, 2)); /*Make sure that the channel is not writable on writing.*/
8373

8474
String msg = "Hello";
@@ -97,19 +87,12 @@ public void testRequestMorePostFlush() throws Exception {
9787
@Test(timeout = 60000)
9888
public void testMultiSubscribers() throws Exception {
9989
WriteStreamSubscriber sub1 = inspectorRule.newSubscriber();
100-
MockProducer producer1 = InspectorRule.setupSubscriber(sub1);
101-
102-
assertThat("Subscriber not added.", inspectorRule.interceptor.getSubscribers(), hasSize(1));
103-
assertThat("Subscriber not added.", inspectorRule.interceptor.getSubscribers(), contains(sub1));
90+
MockProducer producer1 = inspectorRule.setupSubscriberAndValidate(sub1, 1);
10491

10592
WriteStreamSubscriber sub2 = inspectorRule.newSubscriber();
106-
MockProducer producer2 = InspectorRule.setupSubscriber(sub2);
93+
MockProducer producer2 = inspectorRule.setupSubscriberAndValidate(sub2, 2);
10794

108-
assertThat("Subscriber not added.", inspectorRule.interceptor.getSubscribers(), hasSize(2));
109-
assertThat("Subscriber not added.", inspectorRule.interceptor.getSubscribers(), contains(sub1, sub2));
110-
111-
String msg = "Hello";
112-
inspectorRule.channel.writeAndFlush(msg);
95+
inspectorRule.sendMessages(1);
11396

11497
assertThat("Channel not writable post write.", inspectorRule.channel.isWritable(), is(true));
11598
assertThat("Unexpected items requested from first subscriber.", producer1.getRequested(),
@@ -118,6 +101,47 @@ public void testMultiSubscribers() throws Exception {
118101
is(defaultRequestN() / 2));
119102
}
120103

104+
@Test(timeout = 10000)
105+
public void testOneLongWriteAndManySmallWrites() throws Exception {
106+
WriteStreamSubscriber sub1 = inspectorRule.newSubscriber();
107+
MockProducer producer1 = inspectorRule.setupSubscriberAndValidate(sub1, 1);
108+
assertThat("Unexpected items requested from producer.", producer1.getRequested(), is(defaultRequestN()));
109+
inspectorRule.setupNewSubscriberAndComplete(2, true);
110+
inspectorRule.setupNewSubscriberAndComplete(2, true);
111+
112+
inspectorRule.sendMessages(sub1, 33);
113+
assertThat("Unexpected items requested.", producer1.getRequested(), is(97L));
114+
}
115+
116+
@Test(timeout = 10000)
117+
public void testBatchedSubscriberRemoves() throws Exception {
118+
WriteStreamSubscriber sub1 = inspectorRule.newSubscriber();
119+
MockProducer producer1 = inspectorRule.setupSubscriberAndValidate(sub1, 1);
120+
assertThat("Unexpected items requested from producer.", producer1.getRequested(), is(defaultRequestN()));
121+
for (int i=1; i < 5; i++) {
122+
inspectorRule.setupNewSubscriberAndComplete(i+1, false);
123+
}
124+
125+
inspectorRule.channel.runPendingTasks();
126+
127+
inspectorRule.sendMessages(sub1, 35);
128+
assertThat("Unexpected items requested.", producer1.getRequested(), is(95L));
129+
}
130+
131+
@Test(timeout = 10000)
132+
public void testMinRequestN() throws Exception {
133+
for (int i=1; i < 66; i++) {
134+
inspectorRule.setupNewSubscriberAndComplete(i, false);
135+
}
136+
WriteStreamSubscriber sub1 = inspectorRule.newSubscriber();
137+
MockProducer producer1 = inspectorRule.setupSubscriberAndValidate(sub1, 66);
138+
assertThat("Unexpected items requested from producer.", producer1.getRequested(), is(1L));
139+
140+
inspectorRule.channel.runPendingTasks();
141+
inspectorRule.sendMessages(sub1, 35);
142+
assertThat("Unexpected items requested.", producer1.getRequested(), greaterThan(1L));
143+
}
144+
121145
public static class InspectorRule extends ExternalResource {
122146

123147
private BytesWriteInterceptor interceptor;
@@ -136,18 +160,52 @@ public void evaluate() throws Throwable {
136160
}
137161

138162
WriteStreamSubscriber newSubscriber() {
139-
return interceptor.newSubscriber(channel.pipeline().firstContext(), channel.newPromise());
163+
return interceptor.newSubscriber(channel.pipeline().lastContext(), channel.newPromise());
140164
}
141165

142-
private static MockProducer setupSubscriber(WriteStreamSubscriber sub1) {
143-
sub1.onStart();
166+
private MockProducer setupSubscriberAndValidate(WriteStreamSubscriber sub, int expectedSubCount) {
167+
MockProducer mockProducer = setupSubscriber(sub);
168+
assertThat("Subscriber not added.", interceptor.getSubscribers(), hasSize(expectedSubCount));
169+
assertThat("Subscriber not added.", interceptor.getSubscribers().get(expectedSubCount - 1), equalTo(sub));
170+
return mockProducer;
171+
}
172+
173+
private static MockProducer setupSubscriber(WriteStreamSubscriber sub) {
174+
sub.onStart();
144175
MockProducer mockProducer = new MockProducer();
145-
sub1.setProducer(mockProducer);
176+
sub.setProducer(mockProducer);
146177
return mockProducer;
147178
}
148179

149180
public static Long defaultRequestN() {
150181
return Long.valueOf(MAX_PER_SUBSCRIBER_REQUEST);
151182
}
183+
184+
public void sendMessages(WriteStreamSubscriber subscriber, int msgCount) {
185+
for(int i=0; i < msgCount; i++) {
186+
subscriber.onNext("Hello");
187+
channel.write("Hello");
188+
}
189+
channel.flush();
190+
}
191+
192+
public void sendMessages(int msgCount) {
193+
for(int i=0; i < msgCount; i++) {
194+
channel.write("Hello");
195+
}
196+
channel.flush();
197+
}
198+
199+
public void setupNewSubscriberAndComplete(int expectedSubCount, boolean runPendingTasks) {
200+
WriteStreamSubscriber sub2 = newSubscriber();
201+
MockProducer producer2 = setupSubscriberAndValidate(sub2, expectedSubCount);
202+
assertThat("Unexpected items requested from producer.", producer2.getRequested(),
203+
lessThanOrEqualTo(Math.max(1, defaultRequestN()/expectedSubCount)));
204+
sub2.onCompleted();
205+
sub2.unsubscribe();
206+
if (runPendingTasks) {
207+
channel.runPendingTasks();
208+
}
209+
}
152210
}
153211
}

0 commit comments

Comments
 (0)