|
41 | 41 | import org.junit.Test;
|
42 | 42 | import org.mockito.Mockito;
|
43 | 43 |
|
| 44 | +import org.springframework.amqp.core.AcknowledgeMode; |
44 | 45 | import org.springframework.amqp.core.MessageListener;
|
45 | 46 | import org.springframework.amqp.rabbit.connection.ChannelProxy;
|
46 | 47 | import org.springframework.amqp.rabbit.connection.Connection;
|
@@ -175,7 +176,7 @@ else if (i.getArgument(0).equals(17L)) {
|
175 | 176 | }
|
176 | 177 | Thread.sleep(200);
|
177 | 178 | consumer.get().handleDelivery("consumerTag", envelope(16), props, body);
|
178 |
| - // should get 2 acks #10 and #6 (timeout) |
| 179 | + // should get 2 acks #10 and #16 (timeout) |
179 | 180 | assertThat(latch2.await(10, TimeUnit.SECONDS)).isTrue();
|
180 | 181 | consumer.get().handleDelivery("consumerTag", envelope(17), props, body);
|
181 | 182 | verify(channel).basicAck(10L, true);
|
@@ -309,6 +310,56 @@ public void testMonitorCancelsAfterBadAckEvenIfChannelReportsOpen() throws Excep
|
309 | 310 | container.stop();
|
310 | 311 | }
|
311 | 312 |
|
| 313 | + @Test |
| 314 | + public void testMonitorCancelsAfterTargetChannelChanges() throws Exception { |
| 315 | + ConnectionFactory connectionFactory = mock(ConnectionFactory.class); |
| 316 | + Connection connection = mock(Connection.class); |
| 317 | + ChannelProxy channel = mock(ChannelProxy.class); |
| 318 | + Channel rabbitChannel1 = mock(Channel.class); |
| 319 | + Channel rabbitChannel2 = mock(Channel.class); |
| 320 | + AtomicReference<Channel> target = new AtomicReference<>(rabbitChannel1); |
| 321 | + willAnswer(inv -> { |
| 322 | + return target.get(); |
| 323 | + }).given(channel).getTargetChannel(); |
| 324 | + |
| 325 | + given(connectionFactory.createConnection()).willReturn(connection); |
| 326 | + given(connection.createChannel(anyBoolean())).willReturn(channel); |
| 327 | + given(channel.isOpen()).willReturn(true); |
| 328 | + given(channel.queueDeclarePassive(Mockito.anyString())) |
| 329 | + .willAnswer(invocation -> mock(AMQP.Queue.DeclareOk.class)); |
| 330 | + AtomicReference<Consumer> consumer = new AtomicReference<>(); |
| 331 | + final CountDownLatch latch1 = new CountDownLatch(1); |
| 332 | + final CountDownLatch latch2 = new CountDownLatch(1); |
| 333 | + willAnswer(inv -> { |
| 334 | + consumer.set(inv.getArgument(6)); |
| 335 | + latch1.countDown(); |
| 336 | + return "consumerTag"; |
| 337 | + }).given(channel).basicConsume(anyString(), anyBoolean(), anyString(), anyBoolean(), anyBoolean(), |
| 338 | + anyMap(), any(Consumer.class)); |
| 339 | + |
| 340 | + willAnswer(inv -> { |
| 341 | + consumer.get().handleCancelOk("consumerTag"); |
| 342 | + latch2.countDown(); |
| 343 | + return null; |
| 344 | + }).given(channel).basicCancel("consumerTag"); |
| 345 | + |
| 346 | + DirectMessageListenerContainer container = new DirectMessageListenerContainer(connectionFactory); |
| 347 | + container.setQueueNames("test"); |
| 348 | + container.setPrefetchCount(2); |
| 349 | + container.setMonitorInterval(100); |
| 350 | + container.setMessageListener(msg -> { |
| 351 | + target.set(rabbitChannel2); |
| 352 | + }); |
| 353 | + container.setAcknowledgeMode(AcknowledgeMode.MANUAL); |
| 354 | + container.afterPropertiesSet(); |
| 355 | + container.start(); |
| 356 | + |
| 357 | + assertThat(latch1.await(10, TimeUnit.SECONDS)).isTrue(); |
| 358 | + consumer.get().handleDelivery("consumerTag", envelope(1L), new BasicProperties(), new byte[1]); |
| 359 | + assertThat(latch2.await(10, TimeUnit.SECONDS)).isTrue(); |
| 360 | + container.stop(); |
| 361 | + } |
| 362 | + |
312 | 363 | private Envelope envelope(long tag) {
|
313 | 364 | return new Envelope(tag, false, "", "");
|
314 | 365 | }
|
|
0 commit comments