Skip to content

Commit 5ce8529

Browse files
artembilangaryrussell
authored andcommitted
AMQP-801-2: Introduce ConsumerDecorator
JIRA: https://jira.spring.io/browse/AMQP-801 To properly assign the queue to the `ConsumeOkEvent`, we need perform such a logic in the `Consumer.handleConsumeOk()`. * Introduce `BlockingQueueConsumer.ConsumerDecorator` to be created on each `channel.basicConsume()` for wrapping the target `InternalConsumer` per queue * Add getters to the `ConsumeOkEvent` for better interoperability * Assert assigned queue names for the `ConsumeOkEvent`s in the `SimpleMessageListenerContainerIntegration2Tests` **Cherry-pick to 1.7.x** * Add `ConsumerDecorator.consumerTag` property * Add `ConsumerDecorator.toString()` * Add JavaDocs for the `ConsumeOkEvent`
1 parent d547a86 commit 5ce8529

File tree

3 files changed

+105
-15
lines changed

3 files changed

+105
-15
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/BlockingQueueConsumer.java

Lines changed: 68 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
import com.rabbitmq.client.AMQP;
6464
import com.rabbitmq.client.AlreadyClosedException;
6565
import com.rabbitmq.client.Channel;
66+
import com.rabbitmq.client.Consumer;
6667
import com.rabbitmq.client.DefaultConsumer;
6768
import com.rabbitmq.client.Envelope;
6869
import com.rabbitmq.client.Recoverable;
@@ -239,10 +240,10 @@ public BlockingQueueConsumer(ConnectionFactory connectionFactory,
239240
* @param queues The queues.
240241
*/
241242
public BlockingQueueConsumer(ConnectionFactory connectionFactory,
242-
MessagePropertiesConverter messagePropertiesConverter,
243-
ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode,
244-
boolean transactional, int prefetchCount, boolean defaultRequeueRejected,
245-
Map<String, Object> consumerArgs, boolean exclusive, String... queues) {
243+
MessagePropertiesConverter messagePropertiesConverter,
244+
ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode,
245+
boolean transactional, int prefetchCount, boolean defaultRequeueRejected,
246+
Map<String, Object> consumerArgs, boolean exclusive, String... queues) {
246247
this(connectionFactory, messagePropertiesConverter, activeObjectCounter, acknowledgeMode, transactional,
247248
prefetchCount, defaultRequeueRejected, consumerArgs, false, exclusive, queues);
248249
}
@@ -475,8 +476,8 @@ private Message handle(Delivery delivery) throws InterruptedException {
475476
*/
476477
public Message nextMessage() throws InterruptedException, ShutdownSignalException {
477478
if (logger.isTraceEnabled()) {
478-
logger.trace("Retrieving delivery for " + this);
479-
}
479+
logger.trace("Retrieving delivery for " + this);
480+
}
480481
return handle(this.queue.take());
481482
}
482483

@@ -666,8 +667,10 @@ private void addRecoveryListener() {
666667

667668
private void consumeFromQueue(String queue) throws IOException {
668669
String consumerTag = this.channel.basicConsume(queue, this.acknowledgeMode.isAutoAck(),
669-
(this.tagStrategy != null ? this.tagStrategy.createConsumerTag(queue) : ""), this.noLocal, this.exclusive,
670-
this.consumerArgs, this.consumer);
670+
(this.tagStrategy != null ? this.tagStrategy.createConsumerTag(queue) : ""), this.noLocal,
671+
this.exclusive, this.consumerArgs,
672+
new ConsumerDecorator(queue, this.consumer, this.applicationEventPublisher));
673+
671674
if (consumerTag != null) {
672675
this.consumerTags.put(consumerTag, queue);
673676
if (logger.isDebugEnabled()) {
@@ -810,7 +813,7 @@ public boolean commitIfNecessary(boolean locallyTransacted) throws IOException {
810813
*/
811814
boolean isLocallyTransacted = locallyTransacted
812815
|| (this.transactional
813-
&& TransactionSynchronizationManager.getResource(this.connectionFactory) == null);
816+
&& TransactionSynchronizationManager.getResource(this.connectionFactory) == null);
814817
try {
815818

816819
boolean ackRequired = !this.acknowledgeMode.isAutoAck() && !this.acknowledgeMode.isManual();
@@ -874,11 +877,6 @@ public void handleConsumeOk(String consumerTag) {
874877
if (logger.isDebugEnabled()) {
875878
logger.debug("ConsumeOK: " + BlockingQueueConsumer.this);
876879
}
877-
if (BlockingQueueConsumer.this.applicationEventPublisher != null) {
878-
String queueName = BlockingQueueConsumer.this.consumerTags.get(consumerTag);
879-
BlockingQueueConsumer.this.applicationEventPublisher
880-
.publishEvent(new ConsumeOkEvent(this, queueName, consumerTag));
881-
}
882880
}
883881

884882
@Override
@@ -952,6 +950,62 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp
952950

953951
}
954952

953+
private static final class ConsumerDecorator implements Consumer {
954+
955+
private final String queue;
956+
957+
private final Consumer delegate;
958+
959+
private final ApplicationEventPublisher applicationEventPublisher;
960+
961+
private String consumerTag;
962+
963+
ConsumerDecorator(String queue, Consumer delegate, ApplicationEventPublisher applicationEventPublisher) {
964+
this.queue = queue;
965+
this.delegate = delegate;
966+
this.applicationEventPublisher = applicationEventPublisher;
967+
}
968+
969+
970+
public void handleConsumeOk(String consumerTag) {
971+
this.consumerTag = consumerTag;
972+
this.delegate.handleConsumeOk(consumerTag);
973+
if (this.applicationEventPublisher != null) {
974+
this.applicationEventPublisher.publishEvent(new ConsumeOkEvent(this.delegate, this.queue, consumerTag));
975+
}
976+
}
977+
978+
public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
979+
this.delegate.handleShutdownSignal(consumerTag, sig);
980+
}
981+
982+
public void handleCancel(String consumerTag) throws IOException {
983+
this.delegate.handleCancel(consumerTag);
984+
}
985+
986+
public void handleCancelOk(String consumerTag) {
987+
this.delegate.handleCancelOk(consumerTag);
988+
}
989+
990+
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
991+
byte[] body) throws IOException {
992+
993+
this.delegate.handleDelivery(consumerTag, envelope, properties, body);
994+
}
995+
996+
public void handleRecoverOk(String consumerTag) {
997+
this.delegate.handleRecoverOk(consumerTag);
998+
}
999+
1000+
@Override
1001+
public String toString() {
1002+
return "ConsumerDecorator{" + "queue='" + this.queue + '\'' +
1003+
", consumerTag='" + this.consumerTag + '\'' +
1004+
'}';
1005+
}
1006+
1007+
}
1008+
9551009
@SuppressWarnings("serial")
9561010
private static final class DeclarationException extends AmqpException {
9571011

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/ConsumeOkEvent.java

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017 the original author or authors.
2+
* Copyright 2017-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -19,7 +19,12 @@
1919
import org.springframework.amqp.event.AmqpEvent;
2020

2121
/**
22+
* An {@link AmqpEvent} emitted by the listener container
23+
* when consumer is subscribed to the queue.
24+
*
2225
* @author Gary Russell
26+
* @author Artem Bilan
27+
*
2328
* @since 1.7.5
2429
*
2530
*/
@@ -30,12 +35,37 @@ public class ConsumeOkEvent extends AmqpEvent {
3035

3136
private final String consumerTag;
3237

38+
/**
39+
* Instantiate a {@link ConsumeOkEvent} based on the provided
40+
* consumer, queue and consumer tag.
41+
* @param source the consumer subscribed to the queue
42+
* @param queue the queue to consume
43+
* @param consumerTag the tag indicate a consumer subscription
44+
*/
3345
public ConsumeOkEvent(Object source, String queue, String consumerTag) {
3446
super(source);
3547
this.queue = queue;
3648
this.consumerTag = consumerTag;
3749
}
3850

51+
/**
52+
* Obtain the queue name a consumer has been subscribed.
53+
* @return the queue name a consumer subscribed.
54+
* @since 1.7.7
55+
*/
56+
public String getQueue() {
57+
return this.queue;
58+
}
59+
60+
/**
61+
* Obtain the consumer tag assigned to the consumer.
62+
* @return the consumer tag for subscription.
63+
* @since 1.7.7
64+
*/
65+
public String getConsumerTag() {
66+
return this.consumerTag;
67+
}
68+
3969
@Override
4070
public String toString() {
4171
return "ConsumeOkEvent [queue=" + this.queue + ", consumerTag=" + this.consumerTag

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerIntegration2Tests.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static org.hamcrest.Matchers.equalTo;
2121
import static org.hamcrest.Matchers.hasItem;
2222
import static org.hamcrest.Matchers.instanceOf;
23+
import static org.hamcrest.Matchers.isOneOf;
2324
import static org.junit.Assert.assertEquals;
2425
import static org.junit.Assert.assertFalse;
2526
import static org.junit.Assert.assertNotNull;
@@ -92,6 +93,7 @@
9293
* @author Gunnar Hillert
9394
* @author Gary Russell
9495
* @author Artem Bilan
96+
*
9597
* @since 1.3
9698
*
9799
*/
@@ -277,7 +279,11 @@ public void testDeleteOneQueue() throws Exception {
277279
assertThat(events.size(), equalTo(8));
278280
assertThat(events.get(0), instanceOf(AsyncConsumerStartedEvent.class));
279281
assertThat(events.get(1), instanceOf(ConsumeOkEvent.class));
282+
ConsumeOkEvent consumeOkEvent = (ConsumeOkEvent) events.get(1);
283+
assertThat(consumeOkEvent.getQueue(), isOneOf(this.queue.getName(), this.queue1.getName()));
280284
assertThat(events.get(2), instanceOf(ConsumeOkEvent.class));
285+
consumeOkEvent = (ConsumeOkEvent) events.get(2);
286+
assertThat(consumeOkEvent.getQueue(), isOneOf(this.queue.getName(), this.queue1.getName()));
281287
assertSame(events.get(3), eventRef.get());
282288
assertThat(events.get(4), instanceOf(AsyncConsumerRestartedEvent.class));
283289
assertThat(events.get(5), instanceOf(ConsumeOkEvent.class));

0 commit comments

Comments
 (0)