Skip to content

Commit 4167e93

Browse files
garyrussellartembilan
authored andcommitted
GH-1026: Fix Delay with CacheMode.CONNECTION
Fixes #1026 When using a `channelCheckoutTimeout` with `CacheModeConnection`, we incorrectly spin waiting for a connection until the timeout expires. We should only wait for a connection if the limit is exceeded. **cherry-pick to all supported** (cherry picked from commit dde7a37) # Conflicts: # spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactory.java # spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactoryTests.java
1 parent af76b70 commit 4167e93

File tree

2 files changed

+33
-9
lines changed

2 files changed

+33
-9
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactory.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -638,15 +638,17 @@ public final Connection createConnection() throws AmqpException {
638638
else if (this.cacheMode == CacheMode.CONNECTION) {
639639
ChannelCachingConnectionProxy connection = findIdleConnection();
640640
long now = System.currentTimeMillis();
641-
while (connection == null && System.currentTimeMillis() - now < this.channelCheckoutTimeout) {
642-
if (countOpenConnections() >= this.connectionLimit) {
643-
try {
644-
this.connectionMonitor.wait(this.channelCheckoutTimeout);
645-
connection = findIdleConnection();
646-
}
647-
catch (InterruptedException e) {
648-
Thread.currentThread().interrupt();
649-
throw new AmqpException("Interrupted while waiting for a connection", e);
641+
if (connection == null && countOpenConnections() >= this.connectionLimit) {
642+
while (connection == null && System.currentTimeMillis() - now < this.channelCheckoutTimeout) {
643+
if (countOpenConnections() >= this.connectionLimit) {
644+
try {
645+
this.connectionMonitor.wait(this.channelCheckoutTimeout);
646+
connection = findIdleConnection();
647+
}
648+
catch (InterruptedException e) {
649+
Thread.currentThread().interrupt();
650+
throw new AmqpException("Interrupted while waiting for a connection", e);
651+
}
650652
}
651653
}
652654
}

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactoryTests.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,21 @@
1616

1717
package org.springframework.amqp.rabbit.connection;
1818

19+
import static org.hamcrest.Matchers.lessThan;
1920
import static org.junit.Assert.assertEquals;
2021
import static org.junit.Assert.assertNotNull;
2122
import static org.junit.Assert.assertNotSame;
2223
import static org.junit.Assert.assertNull;
2324
import static org.junit.Assert.assertSame;
25+
import static org.junit.Assert.assertThat;
2426
import static org.junit.Assert.assertTrue;
2527
import static org.junit.Assert.fail;
2628
import static org.mockito.AdditionalMatchers.aryEq;
2729
import static org.mockito.ArgumentMatchers.any;
2830
import static org.mockito.ArgumentMatchers.anyInt;
2931
import static org.mockito.ArgumentMatchers.anyString;
3032
import static org.mockito.ArgumentMatchers.isNull;
33+
import static org.mockito.BDDMockito.given;
3134
import static org.mockito.BDDMockito.willAnswer;
3235
import static org.mockito.Mockito.atLeastOnce;
3336
import static org.mockito.Mockito.doAnswer;
@@ -1654,4 +1657,23 @@ public void testReturnsNormalCloseDeferredClose() throws Exception {
16541657
Thread.sleep(6000);
16551658
}
16561659

1660+
@Test
1661+
public void testFirstConnectionDoesntWait() throws IOException, TimeoutException {
1662+
com.rabbitmq.client.ConnectionFactory mockConnectionFactory = mock(com.rabbitmq.client.ConnectionFactory.class);
1663+
com.rabbitmq.client.Connection mockConnection = mock(com.rabbitmq.client.Connection.class);
1664+
Channel mockChannel = mock(Channel.class);
1665+
1666+
given(mockConnectionFactory.newConnection((ExecutorService) isNull(), anyString())).willReturn(mockConnection);
1667+
given(mockConnection.createChannel()).willReturn(mockChannel);
1668+
given(mockChannel.isOpen()).willReturn(true);
1669+
given(mockConnection.isOpen()).willReturn(true);
1670+
1671+
CachingConnectionFactory ccf = new CachingConnectionFactory(mockConnectionFactory);
1672+
ccf.setCacheMode(CacheMode.CONNECTION);
1673+
ccf.setChannelCheckoutTimeout(60000);
1674+
long t1 = System.currentTimeMillis();
1675+
ccf.createConnection();
1676+
assertThat(System.currentTimeMillis() - t1, lessThan(30_000L));
1677+
}
1678+
16571679
}

0 commit comments

Comments
 (0)