Skip to content

Commit 373732e

Browse files
artembilangaryrussell
authored andcommitted
AMQP-847: Close channel in RabbitTemplate.receive
JIRA: https://jira.spring.io/browse/AMQP-847 To avoid unacked messages race condition when client timeouts, but at this moment the message becomes available in queue, physically close a receive channel on the `TimeoutException` from the `Future.get()` **Cherry-pick to 2.0.x & 1.7.x**
1 parent 639bddf commit 373732e

File tree

2 files changed

+11
-1
lines changed

2 files changed

+11
-1
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@
138138
* @author Artem Bilan
139139
* @author Ernest Sadykov
140140
* @author Mark Norkin
141+
*
141142
* @since 1.0
142143
*/
143144
public class RabbitTemplate extends RabbitAccessor implements BeanFactoryAware, RabbitOperations, MessageListener,
@@ -1214,7 +1215,7 @@ private Delivery consumeDelivery(Channel channel, String queueName, long timeout
12141215
Thread.currentThread().interrupt();
12151216
}
12161217
catch (TimeoutException e) {
1217-
// no result in time
1218+
RabbitUtils.setPhysicalCloseRequired(channel, true);
12181219
}
12191220
finally {
12201221
if ((exception == null || !(exception instanceof ConsumerCancelledException))

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplateIntegrationTests.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import java.util.Arrays;
4949
import java.util.Collection;
5050
import java.util.HashMap;
51+
import java.util.List;
5152
import java.util.Map;
5253
import java.util.Properties;
5354
import java.util.UUID;
@@ -391,6 +392,14 @@ public void testReceiveBlockingNoTimeout() throws Exception {
391392
}
392393
}
393394

395+
@Test
396+
public void testReceiveTimeoutRequeue() {
397+
assertNull(this.template.receiveAndConvert(ROUTE, 1));
398+
assertEquals(0,
399+
TestUtils.getPropertyValue(this.connectionFactory, "cachedChannelsNonTransactional", List.class)
400+
.size());
401+
}
402+
394403
@Test
395404
public void testReceiveBlockingTx() throws Exception {
396405
this.template.convertAndSend(ROUTE, "blockTX");

0 commit comments

Comments
 (0)