Skip to content

Commit c7897ed

Browse files
authored
GH-891 Introduced MultiRabbit to handle multiple brokers (#1111)
* GH-891 Introduced MultiRabbit for handle multiple brokers * GH-891 Fixed formatting and conventions * GH-891 Avoid premature load of factory beans and some minor refactoring * GH-891 Introduced tests for SpringMultirabbit * GH-891 Modified BPP to return list of Declarables created * GH-891 Minor refactoring * GH-891 Updated copyright year * GH-891 Introduced default setAdminsThatShouldDeclare() into Declarable * GH-891 Avoiding premature instantiation of RabbitAdmin by providing the bean name instead * GH-891 Changed to non-default * GH-891 Removed MultiRabbitBootstrapConfiguration and its test
1 parent 4529429 commit c7897ed

File tree

11 files changed

+719
-42
lines changed

11 files changed

+719
-42
lines changed

spring-amqp/src/main/java/org/springframework/amqp/core/AbstractDeclarable.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -94,15 +94,7 @@ public void setIgnoreDeclarationExceptions(boolean ignoreDeclarationExceptions)
9494
this.ignoreDeclarationExceptions = ignoreDeclarationExceptions;
9595
}
9696

97-
/**
98-
* The {@code AmqpAdmin}s that should declare this object; default is
99-
* all admins.
100-
* <br><br>A null argument, or an array/varArg with a single null argument, clears the collection
101-
* ({@code setAdminsThatShouldDeclare((AmqpAdmin) null)} or
102-
* {@code setAdminsThatShouldDeclare((AmqpAdmin[]) null)}). Clearing the collection resets
103-
* the behavior such that all admins will declare the object.
104-
* @param adminArgs The admins.
105-
*/
97+
@Override
10698
public void setAdminsThatShouldDeclare(Object... adminArgs) {
10799
Collection<Object> admins = new ArrayList<Object>();
108100
if (adminArgs != null) {

spring-amqp/src/main/java/org/springframework/amqp/core/Declarable.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,17 @@ public interface Declarable {
5252
*/
5353
boolean isIgnoreDeclarationExceptions();
5454

55+
/**
56+
* The {@code AmqpAdmin}s that should declare this object; default is
57+
* all admins.
58+
* <br><br>A null argument, or an array/varArg with a single null argument, clears the collection
59+
* ({@code setAdminsThatShouldDeclare((AmqpAdmin) null)} or
60+
* {@code setAdminsThatShouldDeclare((AmqpAdmin[]) null)}). Clearing the collection resets
61+
* the behavior such that all admins will declare the object.
62+
* @param adminArgs The admins.
63+
*/
64+
void setAdminsThatShouldDeclare(Object... adminArgs);
65+
5566
/**
5667
* Add an argument to the declarable.
5768
* @param name the argument name.

spring-rabbit-test/src/main/java/org/springframework/amqp/rabbit/test/RabbitListenerTestHarness.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.amqp.rabbit.test;
1818

19+
import java.util.Collection;
1920
import java.util.HashMap;
2021
import java.util.Map;
2122
import java.util.concurrent.BlockingQueue;
@@ -30,6 +31,7 @@
3031
import org.mockito.Mockito;
3132

3233
import org.springframework.amqp.AmqpException;
34+
import org.springframework.amqp.core.Declarable;
3335
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
3436
import org.springframework.amqp.rabbit.annotation.RabbitListener;
3537
import org.springframework.amqp.rabbit.annotation.RabbitListenerAnnotationBeanPostProcessor;
@@ -73,8 +75,8 @@ public RabbitListenerTestHarness(AnnotationMetadata importMetadata) {
7375
}
7476

7577
@Override
76-
protected void processListener(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener, Object bean,
77-
Object target, String beanName) {
78+
protected Collection<Declarable> processListener(MethodRabbitListenerEndpoint endpoint,
79+
RabbitListener rabbitListener, Object bean, Object target, String beanName) {
7880

7981
Object proxy = bean;
8082
String id = rabbitListener.id();
@@ -102,7 +104,7 @@ protected void processListener(MethodRabbitListenerEndpoint endpoint, RabbitList
102104
else {
103105
logger.info("The test harness can only proxy @RabbitListeners with an 'id' attribute");
104106
}
105-
super.processListener(endpoint, rabbitListener, proxy, target, beanName); // NOSONAR proxy is not null
107+
return super.processListener(endpoint, rabbitListener, proxy, target, beanName); // NOSONAR proxy is not null
106108
}
107109

108110
public InvocationData getNextInvocationDataFor(String id, long wait, TimeUnit unit) throws InterruptedException {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.amqp.rabbit.annotation;
18+
19+
import java.lang.reflect.Method;
20+
import java.util.Collection;
21+
22+
import org.springframework.amqp.core.Declarable;
23+
import org.springframework.amqp.rabbit.core.RabbitAdmin;
24+
import org.springframework.util.StringUtils;
25+
26+
/**
27+
* An extension of {@link RabbitListenerAnnotationBeanPostProcessor} that associates the
28+
* proper RabbitAdmin to the beans of Exchanges, Queues, and Bindings after they are
29+
* created.
30+
* <p>
31+
* This processing restricts the {@link RabbitAdmin} according to the related
32+
* configuration, preventing the server from automatic binding non-related structures.
33+
*
34+
* @author Wander Costa
35+
*/
36+
public class MultiRabbitListenerAnnotationBeanPostProcessor extends RabbitListenerAnnotationBeanPostProcessor {
37+
38+
public static final String CONNECTION_FACTORY_BEAN_NAME = "multiRabbitConnectionFactory";
39+
40+
public static final String CONNECTION_FACTORY_CREATOR_BEAN_NAME = "rabbitConnectionFactoryCreator";
41+
42+
private static final String DEFAULT_RABBIT_ADMIN_BEAN_NAME = "defaultRabbitAdmin";
43+
44+
private static final String RABBIT_ADMIN_SUFFIX = "-admin";
45+
46+
@Override
47+
protected Collection<Declarable> processAmqpListener(RabbitListener rabbitListener, Method method,
48+
Object bean, String beanName) {
49+
final Collection<Declarable> declarables = super.processAmqpListener(rabbitListener, method, bean, beanName);
50+
final String rabbitAdmin = resolveMultiRabbitAdminName(rabbitListener);
51+
for (final Declarable declarable : declarables) {
52+
if (declarable.getDeclaringAdmins().isEmpty()) {
53+
declarable.setAdminsThatShouldDeclare(rabbitAdmin);
54+
}
55+
}
56+
return declarables;
57+
}
58+
59+
/**
60+
* Resolves the name of the RabbitAdmin bean based on the RabbitListener, or falls back to
61+
* the default RabbitAdmin name provided by MultiRabbit.
62+
*
63+
* @param rabbitListener The RabbitListener to process the name from.
64+
* @return The name of the RabbitAdmin bean.
65+
*/
66+
protected String resolveMultiRabbitAdminName(RabbitListener rabbitListener) {
67+
String admin = super.resolveExpressionAsString(rabbitListener.admin(), "admin");
68+
if (!StringUtils.hasText(admin) && StringUtils.hasText(rabbitListener.containerFactory())) {
69+
admin = rabbitListener.containerFactory()
70+
+ MultiRabbitListenerAnnotationBeanPostProcessor.RABBIT_ADMIN_SUFFIX;
71+
}
72+
if (!StringUtils.hasText(admin)) {
73+
admin = MultiRabbitListenerAnnotationBeanPostProcessor.DEFAULT_RABBIT_ADMIN_BEAN_NAME;
74+
}
75+
return admin;
76+
}
77+
}

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/annotation/RabbitListenerAnnotationBeanPostProcessor.java

Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.springframework.amqp.core.Base64UrlNamingStrategy;
4242
import org.springframework.amqp.core.Binding;
4343
import org.springframework.amqp.core.Binding.DestinationType;
44+
import org.springframework.amqp.core.Declarable;
4445
import org.springframework.amqp.core.ExchangeBuilder;
4546
import org.springframework.amqp.core.ExchangeTypes;
4647
import org.springframework.amqp.core.Queue;
@@ -362,11 +363,12 @@ private void processMultiMethodListeners(RabbitListener[] classLevelListeners, M
362363
}
363364
}
364365

365-
protected void processAmqpListener(RabbitListener rabbitListener, Method method, Object bean, String beanName) {
366+
protected Collection<Declarable> processAmqpListener(RabbitListener rabbitListener, Method method, Object bean,
367+
String beanName) {
366368
Method methodToUse = checkProxy(method, bean);
367369
MethodRabbitListenerEndpoint endpoint = new MethodRabbitListenerEndpoint();
368370
endpoint.setMethod(methodToUse);
369-
processListener(endpoint, rabbitListener, bean, methodToUse, beanName);
371+
return processListener(endpoint, rabbitListener, bean, methodToUse, beanName);
370372
}
371373

372374
private Method checkProxy(Method methodArg, Object bean) {
@@ -401,13 +403,14 @@ private Method checkProxy(Method methodArg, Object bean) {
401403
return method;
402404
}
403405

404-
protected void processListener(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener, Object bean,
405-
Object target, String beanName) {
406+
protected Collection<Declarable> processListener(MethodRabbitListenerEndpoint endpoint,
407+
RabbitListener rabbitListener, Object bean, Object target, String beanName) {
406408

409+
final List<Declarable> declarables = new ArrayList<>();
407410
endpoint.setBean(bean);
408411
endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
409412
endpoint.setId(getEndpointId(rabbitListener));
410-
endpoint.setQueueNames(resolveQueues(rabbitListener));
413+
endpoint.setQueueNames(resolveQueues(rabbitListener, declarables));
411414
endpoint.setConcurrency(resolveExpressionAsStringOrInteger(rabbitListener.concurrency(), "concurrency"));
412415
endpoint.setBeanFactory(this.beanFactory);
413416
endpoint.setReturnExceptions(resolveExpressionAsBoolean(rabbitListener.returnExceptions()));
@@ -456,6 +459,7 @@ else if (errorHandler instanceof String) {
456459
RabbitListenerContainerFactory<?> factory = resolveContainerFactory(rabbitListener, target, beanName);
457460

458461
this.registrar.registerEndpoint(endpoint, factory);
462+
return declarables;
459463
}
460464

461465
private void resolveAckMode(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener) {
@@ -562,7 +566,7 @@ private String getEndpointId(RabbitListener rabbitListener) {
562566
}
563567
}
564568

565-
private String[] resolveQueues(RabbitListener rabbitListener) {
569+
private String[] resolveQueues(RabbitListener rabbitListener, Collection<Declarable> declarables) {
566570
String[] queues = rabbitListener.queues();
567571
QueueBinding[] bindings = rabbitListener.bindings();
568572
org.springframework.amqp.rabbit.annotation.Queue[] queuesToDeclare = rabbitListener.queuesToDeclare();
@@ -578,15 +582,15 @@ private String[] resolveQueues(RabbitListener rabbitListener) {
578582
"@RabbitListener can have only one of 'queues', 'queuesToDeclare', or 'bindings'");
579583
}
580584
for (int i = 0; i < queuesToDeclare.length; i++) {
581-
result.add(declareQueue(queuesToDeclare[i]));
585+
result.add(declareQueue(queuesToDeclare[i], declarables));
582586
}
583587
}
584588
if (bindings.length > 0) {
585589
if (queues.length > 0 || queuesToDeclare.length > 0) {
586590
throw new BeanInitializationException(
587591
"@RabbitListener can have only one of 'queues', 'queuesToDeclare', or 'bindings'");
588592
}
589-
return registerBeansForDeclaration(rabbitListener);
593+
return registerBeansForDeclaration(rabbitListener, declarables);
590594
}
591595
return result.toArray(new String[result.size()]);
592596
}
@@ -618,19 +622,20 @@ else if (resolvedValueToUse instanceof Iterable) {
618622
}
619623
}
620624

621-
private String[] registerBeansForDeclaration(RabbitListener rabbitListener) {
625+
private String[] registerBeansForDeclaration(RabbitListener rabbitListener, Collection<Declarable> declarables) {
622626
List<String> queues = new ArrayList<String>();
623627
if (this.beanFactory instanceof ConfigurableBeanFactory) {
624628
for (QueueBinding binding : rabbitListener.bindings()) {
625-
String queueName = declareQueue(binding.value());
629+
String queueName = declareQueue(binding.value(), declarables);
626630
queues.add(queueName);
627-
declareExchangeAndBinding(binding, queueName);
631+
declareExchangeAndBinding(binding, queueName, declarables);
628632
}
629633
}
630634
return queues.toArray(new String[queues.size()]);
631635
}
632636

633-
private String declareQueue(org.springframework.amqp.rabbit.annotation.Queue bindingQueue) {
637+
private String declareQueue(org.springframework.amqp.rabbit.annotation.Queue bindingQueue,
638+
Collection<Declarable> declarables) {
634639
String queueName = (String) resolveExpression(bindingQueue.value());
635640
boolean isAnonymous = false;
636641
if (!StringUtils.hasText(queueName)) {
@@ -649,10 +654,11 @@ private String declareQueue(org.springframework.amqp.rabbit.annotation.Queue bin
649654
queue.setAdminsThatShouldDeclare((Object[]) bindingQueue.admins());
650655
}
651656
queue.setShouldDeclare(resolveExpressionAsBoolean(bindingQueue.declare()));
657+
declarables.add(queue);
652658
return queueName;
653659
}
654660

655-
private void declareExchangeAndBinding(QueueBinding binding, String queueName) {
661+
private void declareExchangeAndBinding(QueueBinding binding, String queueName, Collection<Declarable> declarables) {
656662
org.springframework.amqp.rabbit.annotation.Exchange bindingExchange = binding.exchange();
657663
String exchangeName = resolveExpressionAsString(bindingExchange.value(), "@Exchange.exchange");
658664
Assert.isTrue(StringUtils.hasText(exchangeName), () -> "Exchange name required; binding queue " + queueName);
@@ -697,10 +703,12 @@ private void declareExchangeAndBinding(QueueBinding binding, String queueName) {
697703

698704
((ConfigurableBeanFactory) this.beanFactory)
699705
.registerSingleton(exchangeName + ++this.increment, exchange);
700-
registerBindings(binding, queueName, exchangeName, exchangeType);
706+
registerBindings(binding, queueName, exchangeName, exchangeType, declarables);
707+
declarables.add(exchange);
701708
}
702709

703-
private void registerBindings(QueueBinding binding, String queueName, String exchangeName, String exchangeType) {
710+
private void registerBindings(QueueBinding binding, String queueName, String exchangeName, String exchangeType,
711+
Collection<Declarable> declarables) {
704712
final List<String> routingKeys;
705713
if (exchangeType.equals(ExchangeTypes.FANOUT) || binding.key().length == 0) {
706714
routingKeys = Collections.singletonList("");
@@ -725,6 +733,7 @@ private void registerBindings(QueueBinding binding, String queueName, String exc
725733
}
726734
((ConfigurableBeanFactory) this.beanFactory)
727735
.registerSingleton(exchangeName + "." + queueName + ++this.increment, actualBinding);
736+
declarables.add(actualBinding);
728737
}
729738
}
730739

@@ -815,7 +824,7 @@ else if (resolved instanceof String) {
815824
}
816825
}
817826

818-
private String resolveExpressionAsString(String value, String attribute) {
827+
protected String resolveExpressionAsString(String value, String attribute) {
819828
Object resolved = resolveExpression(value);
820829
if (resolved instanceof String) {
821830
return (String) resolved;

0 commit comments

Comments
 (0)