Skip to content

Commit 8e615d0

Browse files
garyrussellartembilan
authored andcommitted
GH-1198: Support AddressResolver
Resolves #1198 **cherry-pick to 2.2.x, 2.1.x** # Conflicts: # spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/ConnectionFactoryParser.java # spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactoryTests.java # Conflicts: # spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/AbstractConnectionFactory.java # spring-rabbit/src/test/java/org/springframework/amqp/rabbit/config/ConnectionFactoryParserTests.java # spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactoryTests.java
1 parent 2496b53 commit 8e615d0

File tree

7 files changed

+120
-4
lines changed

7 files changed

+120
-4
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/ConnectionFactoryParser.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ class ConnectionFactoryParser extends AbstractSingleBeanDefinitionParser {
4242

4343
private static final String SHUFFLE_ADDRESSES = "shuffle-addresses";
4444

45+
private static final String ADDRESS_RESOLVER = "address-resolver";
46+
4547
private static final String VIRTUAL_HOST_ATTRIBUTE = "virtual-host";
4648

4749
private static final String USER_ATTRIBUTE = "username";
@@ -101,6 +103,7 @@ protected void doParse(Element element, ParserContext parserContext, BeanDefinit
101103
NamespaceUtils.setReferenceIfAttributeDefined(builder, element, EXECUTOR_ATTRIBUTE);
102104
NamespaceUtils.setValueIfAttributeDefined(builder, element, ADDRESSES);
103105
NamespaceUtils.setValueIfAttributeDefined(builder, element, SHUFFLE_ADDRESSES);
106+
NamespaceUtils.setReferenceIfAttributeDefined(builder, element, ADDRESS_RESOLVER);
104107
NamespaceUtils.setValueIfAttributeDefined(builder, element, PUBLISHER_CONFIRMS);
105108
NamespaceUtils.setValueIfAttributeDefined(builder, element, PUBLISHER_RETURNS);
106109
NamespaceUtils.setValueIfAttributeDefined(builder, element, REQUESTED_HEARTBEAT, "requestedHeartBeat");

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/AbstractConnectionFactory.java

Lines changed: 60 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2018 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -51,6 +51,7 @@
5151
import org.springframework.util.StringUtils;
5252

5353
import com.rabbitmq.client.Address;
54+
import com.rabbitmq.client.AddressResolver;
5455
import com.rabbitmq.client.BlockedListener;
5556
import com.rabbitmq.client.Recoverable;
5657
import com.rabbitmq.client.RecoveryListener;
@@ -122,6 +123,8 @@ public void handleRecovery(Recoverable recoverable) {
122123

123124
private ApplicationEventPublisher applicationEventPublisher;
124125

126+
private AddressResolver addressResolver;
127+
125128
private volatile boolean contextStopped;
126129

127130
/**
@@ -217,6 +220,16 @@ public void setConnectionThreadFactory(ThreadFactory threadFactory) {
217220
this.rabbitConnectionFactory.setThreadFactory(threadFactory);
218221
}
219222

223+
/**
224+
* Set an {@link AddressResolver} to use when creating connections; overrides
225+
* {@link #setAddresses(String)}, {@link #setHost(String)}, and {@link #setPort(int)}.
226+
* @param addressResolver the resolver.
227+
* @since 2.1.15
228+
*/
229+
public void setAddressResolver(AddressResolver addressResolver) {
230+
this.addressResolver = addressResolver;
231+
}
232+
220233
/**
221234
* @param uri the URI
222235
* @since 1.5
@@ -292,7 +305,8 @@ public void setAddresses(String addresses) {
292305
return;
293306
}
294307
}
295-
this.logger.info("setAddresses() called with an empty value, will be using the host+port properties for connections");
308+
this.logger.info("setAddresses() called with an empty value, will be using the host+port "
309+
+ " or addressResolver properties for connections");
296310
this.addresses = null;
297311
}
298312

@@ -531,6 +545,50 @@ public void handleRecovery(Recoverable recoverable) {
531545
}
532546
}
533547

548+
private com.rabbitmq.client.Connection connect(String connectionName) throws IOException, TimeoutException {
549+
if (this.addressResolver != null) {
550+
return connectResolver(connectionName);
551+
}
552+
if (this.addresses != null) {
553+
return connectAddresses(connectionName);
554+
}
555+
else {
556+
return connectHostPort(connectionName);
557+
}
558+
}
559+
560+
private com.rabbitmq.client.Connection connectResolver(String connectionName) throws IOException, TimeoutException {
561+
if (this.logger.isInfoEnabled()) {
562+
this.logger.info("Attempting to connect with: " + this.addressResolver);
563+
}
564+
return this.rabbitConnectionFactory.newConnection(this.executorService, this.addressResolver,
565+
connectionName);
566+
}
567+
568+
private com.rabbitmq.client.Connection connectAddresses(String connectionName)
569+
throws IOException, TimeoutException {
570+
571+
List<Address> addressesToConnect = this.addresses;
572+
if (this.shuffleAddresses && addressesToConnect.size() > 1) {
573+
List<Address> list = new ArrayList<>(addressesToConnect);
574+
Collections.shuffle(list);
575+
addressesToConnect = list;
576+
}
577+
if (this.logger.isInfoEnabled()) {
578+
this.logger.info("Attempting to connect to: " + addressesToConnect);
579+
}
580+
return this.rabbitConnectionFactory.newConnection(this.executorService, addressesToConnect,
581+
connectionName);
582+
}
583+
584+
private com.rabbitmq.client.Connection connectHostPort(String connectionName) throws IOException, TimeoutException {
585+
if (this.logger.isInfoEnabled()) {
586+
this.logger.info("Attempting to connect to: " + this.rabbitConnectionFactory.getHost()
587+
+ ":" + this.rabbitConnectionFactory.getPort());
588+
}
589+
return this.rabbitConnectionFactory.newConnection(this.executorService, connectionName);
590+
}
591+
534592
protected final String getDefaultHostName() {
535593
String temp;
536594
try {

spring-rabbit/src/main/resources/org/springframework/amqp/rabbit/config/spring-rabbit-2.1.xsd

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1408,6 +1408,18 @@
14081408
]]></xsd:documentation>
14091409
</xsd:annotation>
14101410
</xsd:attribute>
1411+
<xsd:attribute name="address-resolver" type="xsd:string" use="optional">
1412+
<xsd:annotation>
1413+
<xsd:documentation><![CDATA[
1414+
An address resolver bean; overrides 'addresses' and 'host/port'.
1415+
]]></xsd:documentation>
1416+
<xsd:appinfo>
1417+
<tool:annotation kind="ref">
1418+
<tool:expected-type type="com.rabbitmq.client.AddressResolver" />
1419+
</tool:annotation>
1420+
</xsd:appinfo>
1421+
</xsd:annotation>
1422+
</xsd:attribute>
14111423
<xsd:attribute name="username" type="xsd:string" use="optional">
14121424
<xsd:annotation>
14131425
<xsd:documentation><![CDATA[

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/config/ConnectionFactoryParserTests.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2016 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -129,4 +129,10 @@ public void testMultiHost() {
129129
"rabbitConnectionFactory.threadFactory"));
130130
}
131131

132+
@Test
133+
void testResolver() {
134+
CachingConnectionFactory connectionFactory = beanFactory.getBean("resolved", CachingConnectionFactory.class);
135+
assertThat(TestUtils.getPropertyValue(connectionFactory, "addressResolver"))
136+
.isSameAs(this.beanFactory.getBean("resolver"));
137+
}
132138
}

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactoryTests.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@
8989
import org.springframework.test.util.ReflectionTestUtils;
9090

9191
import com.rabbitmq.client.Address;
92+
import com.rabbitmq.client.AddressResolver;
9293
import com.rabbitmq.client.Channel;
9394
import com.rabbitmq.client.ConfirmListener;
9495
import com.rabbitmq.client.ConnectionFactory;
@@ -1895,4 +1896,27 @@ public void testShuffle() throws IOException, TimeoutException {
18951896
assertThat(firstAddress).containsExactly("host1", "host2", "host3");
18961897
}
18971898

1899+
@Test
1900+
void testResolver() throws Exception {
1901+
com.rabbitmq.client.ConnectionFactory mockConnectionFactory = mock(com.rabbitmq.client.ConnectionFactory.class);
1902+
com.rabbitmq.client.Connection mockConnection = mock(com.rabbitmq.client.Connection.class);
1903+
Channel mockChannel = mock(Channel.class);
1904+
1905+
AddressResolver resolver = () -> Collections.singletonList(Address.parseAddress("foo:5672"));
1906+
when(mockConnectionFactory.newConnection(any(ExecutorService.class), eq(resolver), anyString()))
1907+
.thenReturn(mockConnection);
1908+
when(mockConnection.createChannel()).thenReturn(mockChannel);
1909+
when(mockChannel.isOpen()).thenReturn(true);
1910+
when(mockConnection.isOpen()).thenReturn(true);
1911+
1912+
CachingConnectionFactory ccf = new CachingConnectionFactory(mockConnectionFactory);
1913+
ccf.setExecutor(mock(ExecutorService.class));
1914+
ccf.setAddressResolver(resolver);
1915+
Connection con = ccf.createConnection();
1916+
assertThat(con).isNotNull();
1917+
assertThat(TestUtils.getPropertyValue(con, "target", SimpleConnection.class).getDelegate())
1918+
.isEqualTo(mockConnection);
1919+
verify(mockConnectionFactory).newConnection(any(ExecutorService.class), eq(resolver), anyString());
1920+
}
1921+
18981922
}

spring-rabbit/src/test/resources/org/springframework/amqp/rabbit/config/ConnectionFactoryParserTests-context.xml

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,15 @@
2020

2121
<rabbit:connection-factory id="native" connection-factory="connectionFactory" channel-cache-size="10" />
2222

23-
<bean id="connectionFactory" class="com.rabbitmq.client.ConnectionFactory"/>
23+
24+
<rabbit:connection-factory id="resolved" connection-factory="connectionFactory"
25+
address-resolver="resolver"/>
26+
27+
<bean id="resolver" class="com.rabbitmq.client.ListAddressResolver">
28+
<constructor-arg value="null"/>
29+
</bean>
30+
31+
<bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.RabbitConnectionFactoryBean"/>
2432

2533
<rabbit:connection-factory id="withExecutor" host="foo" virtual-host="/bar"
2634
connection-cache-size="10" port="6888" username="user" password="password"

src/reference/asciidoc/amqp.adoc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -378,6 +378,11 @@ The following example with a custom thread factory that prefixes thread names wi
378378
----
379379
====
380380

381+
===== AddressResolver
382+
383+
Starting with version 2.1.15, you can now use an `AddressResover` to resolve the connection address(es).
384+
This will override any settings of the `addresses` and `host/port` properties.
385+
381386
===== Naming Connections
382387

383388
Starting with version 1.7, a `ConnectionNameStrategy` is provided for the injection into the `AbstractionConnectionFactory`.

0 commit comments

Comments
 (0)