Skip to content

Commit 21277f3

Browse files
garyrussellartembilan
authored andcommitted
GH-837: Fix DMLC Recovery with queue removal
Fixes #837 - Skip recovery of consumers who's queue has been deleted during recovery - If recovery is aborted (no connection) reset `consumersToRestart` **cherry-pick to 2.0.x**
1 parent 9cc15f7 commit 21277f3

File tree

2 files changed

+80
-2
lines changed

2 files changed

+80
-2
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/DirectMessageListenerContainer.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.Collection;
2424
import java.util.Collections;
2525
import java.util.Date;
26+
import java.util.Iterator;
2627
import java.util.LinkedList;
2728
import java.util.List;
2829
import java.util.Map;
@@ -448,8 +449,18 @@ protected void actualStart() throws Exception {
448449
if (restartableConsumers.size() > 0) {
449450
doRedeclareElementsIfNecessary();
450451
}
451-
for (SimpleConsumer consumer : restartableConsumers) {
452-
if (this.logger.isDebugEnabled() && restartableConsumers.size() > 0) {
452+
Iterator<SimpleConsumer> iterator = restartableConsumers.iterator();
453+
while (iterator.hasNext()) {
454+
SimpleConsumer consumer = iterator.next();
455+
iterator.remove();
456+
if (!DirectMessageListenerContainer.this.consumersByQueue
457+
.containsKey(consumer.getQueue())) {
458+
if (this.logger.isDebugEnabled()) {
459+
this.logger.debug("Skipping restart of consumer " + consumer);
460+
}
461+
continue;
462+
}
463+
if (this.logger.isDebugEnabled()) {
453464
this.logger.debug("Attempting to restart consumer " + consumer);
454465
}
455466
Queue queue = namesToQueues.get(consumer.getQueue());
@@ -471,6 +482,10 @@ protected void actualStart() throws Exception {
471482
this.logger.error("Application context is closed, terminating");
472483
this.taskScheduler.schedule(this::stop, new Date());
473484
}
485+
this.consumersToRestart.addAll(restartableConsumers);
486+
if (this.logger.isTraceEnabled()) {
487+
this.logger.trace("After restart exception, consumers to restart now: " + this.consumersToRestart);
488+
}
474489
break;
475490
}
476491
}
@@ -772,6 +787,9 @@ private void cancelConsumer(SimpleConsumer consumer) {
772787
private void addConsumerToRestart(SimpleConsumer consumer) {
773788
if (this.started) {
774789
this.consumersToRestart.add(consumer);
790+
if (this.logger.isTraceEnabled()) {
791+
this.logger.trace("Consumers to restart now: " + this.consumersToRestart);
792+
}
775793
}
776794
}
777795

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/DirectMessageListenerContainerMockTests.java

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,11 @@
2525
import static org.mockito.ArgumentMatchers.anyLong;
2626
import static org.mockito.ArgumentMatchers.anyMap;
2727
import static org.mockito.ArgumentMatchers.anyString;
28+
import static org.mockito.ArgumentMatchers.eq;
2829
import static org.mockito.BDDMockito.given;
2930
import static org.mockito.BDDMockito.willAnswer;
3031
import static org.mockito.Mockito.mock;
32+
import static org.mockito.Mockito.times;
3133
import static org.mockito.Mockito.verify;
3234

3335
import java.util.concurrent.CountDownLatch;
@@ -198,6 +200,64 @@ else if (i.getArgument(0).equals(17L)) {
198200
verify(channel).basicAck(20L, true);
199201
}
200202

203+
@Test
204+
public void testRemoveQueuesWhileNotConnected() throws Exception {
205+
ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
206+
Connection connection = mock(Connection.class);
207+
ChannelProxy channel = mock(ChannelProxy.class);
208+
Channel rabbitChannel = mock(AutorecoveringChannel.class);
209+
given(channel.getTargetChannel()).willReturn(rabbitChannel);
210+
211+
given(connectionFactory.createConnection()).willReturn(connection);
212+
given(connection.createChannel(anyBoolean())).willReturn(channel);
213+
final AtomicBoolean isOpen = new AtomicBoolean(true);
214+
willAnswer(i -> isOpen.get()).given(channel).isOpen();
215+
given(channel.queueDeclarePassive(Mockito.anyString()))
216+
.willAnswer(invocation -> mock(AMQP.Queue.DeclareOk.class));
217+
given(channel.basicConsume(anyString(), anyBoolean(), anyString(), anyBoolean(), anyBoolean(),
218+
anyMap(), any(Consumer.class))).willReturn("consumerTag");
219+
220+
final CountDownLatch latch1 = new CountDownLatch(2);
221+
final CountDownLatch latch3 = new CountDownLatch(3);
222+
final AtomicInteger qos = new AtomicInteger();
223+
willAnswer(i -> {
224+
qos.set(i.getArgument(0));
225+
latch1.countDown();
226+
latch3.countDown();
227+
return null;
228+
}).given(channel).basicQos(anyInt());
229+
final CountDownLatch latch2 = new CountDownLatch(2);
230+
willAnswer(i -> {
231+
latch2.countDown();
232+
return null;
233+
}).given(channel).basicCancel("consumerTag");
234+
235+
DirectMessageListenerContainer container = new DirectMessageListenerContainer(connectionFactory);
236+
container.setQueueNames("test1", "test2");
237+
container.setPrefetchCount(2);
238+
container.setMonitorInterval(100);
239+
container.setFailedDeclarationRetryInterval(100);
240+
container.setRecoveryInterval(100);
241+
container.setShutdownTimeout(1);
242+
container.afterPropertiesSet();
243+
container.start();
244+
245+
assertTrue(latch1.await(10, TimeUnit.SECONDS));
246+
assertThat(qos.get(), equalTo(2));
247+
isOpen.set(false);
248+
assertTrue(latch2.await(10, TimeUnit.SECONDS));
249+
container.removeQueueNames("test1");
250+
isOpen.set(true);
251+
assertTrue(latch3.await(10, TimeUnit.SECONDS));
252+
253+
verify(channel, times(1)).basicConsume(eq("test1"), anyBoolean(), anyString(), anyBoolean(), anyBoolean(),
254+
anyMap(), any(Consumer.class));
255+
verify(channel, times(2)).basicConsume(eq("test2"), anyBoolean(), anyString(), anyBoolean(), anyBoolean(),
256+
anyMap(), any(Consumer.class));
257+
258+
container.stop();
259+
}
260+
201261
private Envelope envelope(long tag) {
202262
return new Envelope(tag, false, "", "");
203263
}

0 commit comments

Comments
 (0)