Skip to content

Commit ba6875d

Browse files
committed
GH-1034: DMLC: Cancel consumer after failed ack
Resolves #1034 The monitor task now cancels the consumer after a failed ack/nack, whether or not the channel `isOpen()` returns true. Test with a mock channel that stays open after a failed ack. **cherry-pick to 2.1.x, 2.0.x** * Fix tests removing AssertJ dependency
1 parent 47c5baa commit ba6875d

File tree

2 files changed

+69
-4
lines changed

2 files changed

+69
-4
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/DirectMessageListenerContainer.java

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -387,7 +387,7 @@ protected void actualStart() throws Exception {
387387
synchronized (this.consumersMonitor) {
388388
consumersToCancel = this.consumers.stream()
389389
.filter(c -> {
390-
boolean open = c.getChannel().isOpen();
390+
boolean open = c.getChannel().isOpen() && !c.isAckFailed();
391391
if (open && this.messagesPerAck > 1) {
392392
try {
393393
c.ackIfNecessary(now);
@@ -502,7 +502,6 @@ protected void actualStart() throws Exception {
502502
}
503503
}
504504
}
505-
506505
});
507506
}
508507
else {
@@ -717,7 +716,7 @@ private void cancelConsumer(SimpleConsumer consumer) {
717716
this.logger.debug("Canceling " + consumer);
718717
}
719718
synchronized (consumer) {
720-
consumer.canceled = true;
719+
consumer.setCanceled(true);
721720
if (this.messagesPerAck > 1) {
722721
consumer.ackIfNecessary(0L);
723722
}
@@ -791,6 +790,8 @@ final class SimpleConsumer extends DefaultConsumer {
791790

792791
private volatile boolean canceled;
793792

793+
private volatile boolean ackFailed;
794+
794795
private SimpleConsumer(Connection connection, Channel channel, String queue) {
795796
super(channel);
796797
this.connection = connection;
@@ -815,6 +816,23 @@ int getEpoch() {
815816
return this.epoch;
816817
}
817818

819+
/**
820+
* Set to true to indicate this consumer is canceled and should send any pending
821+
* acks.
822+
* @param canceled the canceled to set
823+
*/
824+
void setCanceled(boolean canceled) {
825+
this.canceled = canceled;
826+
}
827+
828+
/**
829+
* True if an ack/nack failed (probably due to a closed channel).
830+
* @return the ackFailed
831+
*/
832+
boolean isAckFailed() {
833+
return this.ackFailed;
834+
}
835+
818836
/**
819837
* Increment and return the current epoch for this consumer; consumersMonitor must
820838
* be held.
@@ -966,6 +984,7 @@ else if (!isChannelTransacted() || isLocallyTransacted) {
966984
}
967985
}
968986
catch (Exception e) {
987+
this.ackFailed = true;
969988
this.logger.error("Error acking", e);
970989
}
971990
}
@@ -976,7 +995,7 @@ else if (!isChannelTransacted() || isLocallyTransacted) {
976995
* @param now the current time.
977996
* @throws IOException if one occurs.
978997
*/
979-
private synchronized void ackIfNecessary(long now) throws IOException {
998+
synchronized void ackIfNecessary(long now) throws IOException {
980999
if (this.pendingAcks >= this.messagesPerAck || (
9811000
this.pendingAcks > 0 && (now - this.lastAck > this.ackTimeout || this.canceled))) {
9821001
sendAck(now);

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/DirectMessageListenerContainerMockTests.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import static org.mockito.ArgumentMatchers.eq;
2929
import static org.mockito.BDDMockito.given;
3030
import static org.mockito.BDDMockito.willAnswer;
31+
import static org.mockito.BDDMockito.willThrow;
3132
import static org.mockito.Mockito.mock;
3233
import static org.mockito.Mockito.times;
3334
import static org.mockito.Mockito.verify;
@@ -42,6 +43,7 @@
4243
import org.junit.Test;
4344
import org.mockito.Mockito;
4445

46+
import org.springframework.amqp.core.MessageListener;
4547
import org.springframework.amqp.rabbit.connection.ChannelProxy;
4648
import org.springframework.amqp.rabbit.connection.Connection;
4749
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
@@ -265,6 +267,50 @@ public void testRemoveQueuesWhileNotConnected() throws Exception {
265267
container.stop();
266268
}
267269

270+
@Test
271+
public void testMonitorCancelsAfterBadAckEvenIfChannelReportsOpen() throws Exception {
272+
ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
273+
Connection connection = mock(Connection.class);
274+
ChannelProxy channel = mock(ChannelProxy.class);
275+
Channel rabbitChannel = mock(Channel.class);
276+
given(channel.getTargetChannel()).willReturn(rabbitChannel);
277+
278+
given(connectionFactory.createConnection()).willReturn(connection);
279+
given(connection.createChannel(anyBoolean())).willReturn(channel);
280+
given(channel.isOpen()).willReturn(true);
281+
given(channel.queueDeclarePassive(Mockito.anyString()))
282+
.willAnswer(invocation -> mock(AMQP.Queue.DeclareOk.class));
283+
AtomicReference<Consumer> consumer = new AtomicReference<>();
284+
final CountDownLatch latch1 = new CountDownLatch(1);
285+
final CountDownLatch latch2 = new CountDownLatch(1);
286+
willAnswer(inv -> {
287+
consumer.set(inv.getArgument(6));
288+
latch1.countDown();
289+
return "consumerTag";
290+
}).given(channel).basicConsume(anyString(), anyBoolean(), anyString(), anyBoolean(), anyBoolean(),
291+
anyMap(), any(Consumer.class));
292+
293+
willThrow(new RuntimeException("bad ack")).given(channel).basicAck(1L, false);
294+
willAnswer(inv -> {
295+
consumer.get().handleCancelOk("consumerTag");
296+
latch2.countDown();
297+
return null;
298+
}).given(channel).basicCancel("consumerTag");
299+
300+
DirectMessageListenerContainer container = new DirectMessageListenerContainer(connectionFactory);
301+
container.setQueueNames("test");
302+
container.setPrefetchCount(2);
303+
container.setMonitorInterval(100);
304+
container.setMessageListener(mock(MessageListener.class));
305+
container.afterPropertiesSet();
306+
container.start();
307+
308+
assertTrue(latch1.await(10, TimeUnit.SECONDS));
309+
consumer.get().handleDelivery("consumerTag", envelope(1L), new BasicProperties(), new byte[1]);
310+
assertTrue(latch2.await(10, TimeUnit.SECONDS));
311+
container.stop();
312+
}
313+
268314
private Envelope envelope(long tag) {
269315
return new Envelope(tag, false, "", "");
270316
}

0 commit comments

Comments
 (0)