Skip to content

Commit 24783cb

Browse files
artembilangaryrussell
authored andcommitted
GH-841: Close executor in the PublisherConChannel (#843)
* GH-841: Close executor in the PublisherConChannel Fixes #841 **cherry-pick to 2.0.x & 1.7.x** * * `.mapToInt(Map::size)` instead of two mappers * * Rely in the `PublisherCallbackChannelImpl` on the provided executor * Rename `deferredCloseExecutor` to the `channelsExecutor` in the `CachingConnectionFactory` and use it for the `PublisherCallbackChannelImpl` instances * Deprecate a ctor in the `PublisherCallbackChannelImpl` without an executor * Polishing tests according deprecation * Fix race condition in the `DirectMessageListenerContainerMockTests`
1 parent dcce036 commit 24783cb

File tree

8 files changed

+119
-68
lines changed

8 files changed

+119
-68
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/AbstractConnectionFactory.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.springframework.context.ApplicationEventPublisherAware;
4343
import org.springframework.context.ApplicationListener;
4444
import org.springframework.context.event.ContextClosedEvent;
45+
import org.springframework.lang.Nullable;
4546
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
4647
import org.springframework.util.Assert;
4748
import org.springframework.util.ObjectUtils;
@@ -387,6 +388,7 @@ public void setExecutor(Executor executor) {
387388
}
388389
}
389390

391+
@Nullable
390392
protected ExecutorService getExecutorService() {
391393
return this.executorService;
392394
}
@@ -434,9 +436,12 @@ public void setBeanName(String name) {
434436
* @return the bean name or null.
435437
* @since 1.7.9
436438
*/
439+
@Nullable
437440
protected String getBeanName() {
438441
return this.beanName;
439-
} public boolean hasPublisherConnectionFactory() {
442+
}
443+
444+
public boolean hasPublisherConnectionFactory() {
440445
return this.publisherConnectionFactory != null;
441446
}
442447

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactory.java

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -180,9 +180,9 @@ public enum CacheMode {
180180

181181
private volatile boolean initialized;
182182
/**
183-
* Executor used for deferred close if no explicit executor set.
183+
* Executor used for channels if no explicit executor set.
184184
*/
185-
private ExecutorService deferredCloseExecutor;
185+
private volatile ExecutorService channelsExecutor;
186186

187187
private volatile boolean stopped;
188188

@@ -638,7 +638,7 @@ private Channel doCreateBareChannel(ChannelCachingConnectionProxy connection, bo
638638
}
639639
if (this.publisherConfirms || this.publisherReturns) {
640640
if (!(channel instanceof PublisherCallbackChannelImpl)) {
641-
channel = new PublisherCallbackChannelImpl(channel, getExecutorService());
641+
channel = new PublisherCallbackChannelImpl(channel, getChannelsExecutor());
642642
}
643643
}
644644
if (channel != null) {
@@ -781,8 +781,8 @@ public final void destroy() {
781781
resetConnection();
782782
if (getContextStopped()) {
783783
this.stopped = true;
784-
if (this.deferredCloseExecutor != null) {
785-
this.deferredCloseExecutor.shutdownNow();
784+
if (this.channelsExecutor != null) {
785+
this.channelsExecutor.shutdownNow();
786786
}
787787
}
788788
}
@@ -930,25 +930,27 @@ private int countOpenConnections() {
930930
}
931931

932932
/**
933-
* Determine the executor service used to close connections.
933+
* Determine the executor service used for target channels.
934934
* @return specified executor service otherwise the default one is created and returned.
935935
* @since 1.7.9
936936
*/
937-
protected ExecutorService getDeferredCloseExecutor() {
937+
protected ExecutorService getChannelsExecutor() {
938938
if (getExecutorService() != null) {
939939
return getExecutorService();
940940
}
941-
synchronized (this.connectionMonitor) {
942-
if (this.deferredCloseExecutor == null) {
943-
final String threadPrefix =
944-
getBeanName() == null
945-
? DEFAULT_DEFERRED_POOL_PREFIX + threadPoolId.incrementAndGet()
946-
: getBeanName();
947-
ThreadFactory threadPoolFactory = new CustomizableThreadFactory(threadPrefix);
948-
this.deferredCloseExecutor = Executors.newCachedThreadPool(threadPoolFactory);
941+
if (this.channelsExecutor == null) {
942+
synchronized (this.connectionMonitor) {
943+
if (this.channelsExecutor == null) {
944+
final String threadPrefix =
945+
getBeanName() == null
946+
? DEFAULT_DEFERRED_POOL_PREFIX + threadPoolId.incrementAndGet()
947+
: getBeanName();
948+
ThreadFactory threadPoolFactory = new CustomizableThreadFactory(threadPrefix);
949+
this.channelsExecutor = Executors.newCachedThreadPool(threadPoolFactory);
950+
}
949951
}
950952
}
951-
return this.deferredCloseExecutor;
953+
return this.channelsExecutor;
952954
}
953955

954956
@Override
@@ -1231,7 +1233,7 @@ private void physicalClose() throws Exception {
12311233
}
12321234

12331235
private void asyncClose() {
1234-
ExecutorService executorService = getDeferredCloseExecutor();
1236+
ExecutorService executorService = getChannelsExecutor();
12351237
final Channel channel = CachedChannelInvocationHandler.this.target;
12361238
executorService.execute(() -> {
12371239
try {

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/PublisherCallbackChannelImpl.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import java.util.concurrent.ConcurrentMap;
3434
import java.util.concurrent.ConcurrentSkipListMap;
3535
import java.util.concurrent.ExecutorService;
36-
import java.util.concurrent.Executors;
3736
import java.util.concurrent.TimeoutException;
3837

3938
import org.apache.commons.logging.Log;
@@ -44,7 +43,6 @@
4443
import org.springframework.amqp.rabbit.connection.CorrelationData.Confirm;
4544
import org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter;
4645
import org.springframework.amqp.rabbit.support.MessagePropertiesConverter;
47-
import org.springframework.lang.Nullable;
4846
import org.springframework.util.Assert;
4947
import org.springframework.util.StringUtils;
5048

@@ -86,14 +84,14 @@
8684
*
8785
* @author Gary Russell
8886
* @author Arnaud Cogoluègnes
87+
* @author Artem Bilan
88+
*
8989
* @since 1.0.1
9090
*
9191
*/
9292
public class PublisherCallbackChannelImpl
9393
implements PublisherCallbackChannel, ConfirmListener, ReturnListener, ShutdownListener {
9494

95-
private static final ExecutorService DEFAULT_EXECUTOR = Executors.newSingleThreadExecutor();
96-
9795
private static final MessagePropertiesConverter converter = new DefaultMessagePropertiesConverter();
9896

9997
private final Log logger = LogFactory.getLog(this.getClass());
@@ -112,14 +110,20 @@ public class PublisherCallbackChannelImpl
112110

113111
private volatile java.util.function.Consumer<Channel> afterAckCallback;
114112

113+
/**
114+
* Create a {@link PublisherCallbackChannelImpl} instance based on the provided delegate.
115+
* @param delegate the {@link Channel} to delegate.
116+
* @deprecated since 2.2.1 in favor of {@link #PublisherCallbackChannelImpl(Channel, ExecutorService)}
117+
*/
115118
public PublisherCallbackChannelImpl(Channel delegate) {
116119
this(delegate, null);
117120
}
118121

119-
public PublisherCallbackChannelImpl(Channel delegate, @Nullable ExecutorService executor) {
122+
public PublisherCallbackChannelImpl(Channel delegate, ExecutorService executor) {
123+
Assert.notNull(executor, "'executor' must not be null");
120124
delegate.addShutdownListener(this);
121125
this.delegate = delegate;
122-
this.executor = executor != null ? executor : DEFAULT_EXECUTOR;
126+
this.executor = executor;
123127
}
124128

125129
@Override
@@ -854,8 +858,7 @@ public synchronized int getPendingConfirmsCount(Listener listener) {
854858
@Override
855859
public synchronized int getPendingConfirmsCount() {
856860
return this.pendingConfirms.values().stream()
857-
.map(m -> m.size())
858-
.mapToInt(Integer::valueOf)
861+
.mapToInt(Map::size)
859862
.sum();
860863
}
861864

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/config/ListenerContainerPlaceholderParserTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
* @author Dave Syer
4545
* @author Gary Russell
4646
* @author Will Droste
47+
* @author Artem Bilan
4748
*/
4849
public final class ListenerContainerPlaceholderParserTests {
4950

@@ -60,7 +61,7 @@ public void closeBeanFactory() throws Exception {
6061
if (this.context != null) {
6162
CachingConnectionFactory cf = this.context.getBean(CachingConnectionFactory.class);
6263
this.context.close();
63-
ExecutorService es = TestUtils.getPropertyValue(cf, "deferredCloseExecutor", ThreadPoolExecutor.class);
64+
ExecutorService es = TestUtils.getPropertyValue(cf, "channelsExecutor", ThreadPoolExecutor.class);
6465
if (es != null) {
6566
// if it gets started make sure its terminated..
6667
assertTrue(es.isTerminated());

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplateIntegrationTests.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -110,7 +110,6 @@
110110
import org.springframework.amqp.utils.test.TestUtils;
111111
import org.springframework.beans.DirectFieldAccessor;
112112
import org.springframework.beans.factory.BeanFactory;
113-
import org.springframework.beans.factory.DisposableBean;
114113
import org.springframework.beans.factory.annotation.Autowired;
115114
import org.springframework.context.annotation.Bean;
116115
import org.springframework.expression.common.LiteralExpression;
@@ -201,9 +200,9 @@ public void create() {
201200
}
202201

203202
@After
204-
public void cleanup() throws Exception {
203+
public void cleanup() {
205204
this.template.stop();
206-
((DisposableBean) template.getConnectionFactory()).destroy();
205+
this.connectionFactory.destroy();
207206
this.brokerIsRunning.removeTestQueues();
208207
}
209208

@@ -289,7 +288,7 @@ public void testReceiveNonBlocking() throws Exception {
289288
}
290289

291290
@Test(expected = ConsumerCancelledException.class)
292-
public void testReceiveConsumerCanceled() throws Exception {
291+
public void testReceiveConsumerCanceled() {
293292
ConnectionFactory connectionFactory = new SingleConnectionFactory("localhost", BrokerTestUtils.getPort());
294293

295294
class MockConsumer implements Consumer {
@@ -339,10 +338,12 @@ public void handleDelivery(String consumerTag, Envelope envelope, BasicPropertie
339338

340339
}
341340

341+
ExecutorService executorService = Executors.newSingleThreadExecutor();
342+
342343
class MockChannel extends PublisherCallbackChannelImpl {
343344

344345
MockChannel(Channel delegate) {
345-
super(delegate);
346+
super(delegate, executorService);
346347
}
347348

348349
@Override
@@ -361,7 +362,12 @@ public String basicConsume(String queue, Consumer callback) throws IOException {
361362

362363
this.template = new RabbitTemplate(connectionFactory);
363364
this.template.setReceiveTimeout(10000);
364-
this.template.receive(ROUTE);
365+
try {
366+
this.template.receive(ROUTE);
367+
}
368+
finally {
369+
executorService.shutdown();
370+
}
365371
}
366372

367373
@Test

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplatePublisherCallbacksIntegrationTests.java

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,8 @@ public class RabbitTemplatePublisherCallbacksIntegrationTests {
103103
@Rule
104104
public BrokerRunning brokerIsRunning = BrokerRunning.isRunningWithEmptyQueues(ROUTE);
105105

106+
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
107+
106108
private CachingConnectionFactory connectionFactory;
107109

108110
private CachingConnectionFactory connectionFactoryWithConfirmsEnabled;
@@ -153,6 +155,8 @@ public void cleanUp() {
153155
this.connectionFactoryWithConfirmsEnabled.destroy();
154156
this.connectionFactoryWithReturnsEnabled.destroy();
155157
this.brokerIsRunning.removeTestQueues();
158+
159+
this.executorService.shutdown();
156160
}
157161

158162
@Test
@@ -327,7 +331,10 @@ public void testPublisherConfirmNotReceived() throws Exception {
327331

328332
when(mockConnectionFactory.newConnection(any(ExecutorService.class), anyString())).thenReturn(mockConnection);
329333
when(mockConnection.isOpen()).thenReturn(true);
330-
doReturn(new PublisherCallbackChannelImpl(mockChannel)).when(mockConnection).createChannel();
334+
335+
doReturn(new PublisherCallbackChannelImpl(mockChannel, this.executorService))
336+
.when(mockConnection)
337+
.createChannel();
331338

332339
CachingConnectionFactory ccf = new CachingConnectionFactory(mockConnectionFactory);
333340
ccf.setExecutor(mock(ExecutorService.class));
@@ -359,8 +366,9 @@ public void testPublisherConfirmNotReceivedMultiThreads() throws Exception {
359366

360367
when(mockConnectionFactory.newConnection(any(ExecutorService.class), anyString())).thenReturn(mockConnection);
361368
when(mockConnection.isOpen()).thenReturn(true);
362-
PublisherCallbackChannelImpl channel1 = new PublisherCallbackChannelImpl(mockChannel1);
363-
PublisherCallbackChannelImpl channel2 = new PublisherCallbackChannelImpl(mockChannel2);
369+
370+
PublisherCallbackChannelImpl channel1 = new PublisherCallbackChannelImpl(mockChannel1, this.executorService);
371+
PublisherCallbackChannelImpl channel2 = new PublisherCallbackChannelImpl(mockChannel2, this.executorService);
364372
when(mockConnection.createChannel()).thenReturn(channel1).thenReturn(channel2);
365373

366374
CachingConnectionFactory ccf = new CachingConnectionFactory(mockConnectionFactory);
@@ -429,7 +437,10 @@ public void testPublisherConfirmNotReceivedAged() throws Exception {
429437

430438
when(mockConnectionFactory.newConnection(any(ExecutorService.class), anyString())).thenReturn(mockConnection);
431439
when(mockConnection.isOpen()).thenReturn(true);
432-
doReturn(new PublisherCallbackChannelImpl(mockChannel)).when(mockConnection).createChannel();
440+
441+
doReturn(new PublisherCallbackChannelImpl(mockChannel, this.executorService))
442+
.when(mockConnection)
443+
.createChannel();
433444

434445
final AtomicLong count = new AtomicLong();
435446
doAnswer(invocation -> count.incrementAndGet()).when(mockChannel).getNextPublishSeqNo();
@@ -469,7 +480,8 @@ public void testPublisherConfirmMultiple() throws Exception {
469480

470481
when(mockConnectionFactory.newConnection(any(ExecutorService.class), anyString())).thenReturn(mockConnection);
471482
when(mockConnection.isOpen()).thenReturn(true);
472-
PublisherCallbackChannelImpl callbackChannel = new PublisherCallbackChannelImpl(mockChannel);
483+
PublisherCallbackChannelImpl callbackChannel =
484+
new PublisherCallbackChannelImpl(mockChannel, this.executorService);
473485
when(mockConnection.createChannel()).thenReturn(callbackChannel);
474486

475487
final AtomicLong count = new AtomicLong();
@@ -508,7 +520,8 @@ public void testPublisherConfirmMultipleWithTwoListeners() throws Exception {
508520

509521
when(mockConnectionFactory.newConnection(any(ExecutorService.class), anyString())).thenReturn(mockConnection);
510522
when(mockConnection.isOpen()).thenReturn(true);
511-
PublisherCallbackChannelImpl callbackChannel = new PublisherCallbackChannelImpl(mockChannel);
523+
PublisherCallbackChannelImpl callbackChannel =
524+
new PublisherCallbackChannelImpl(mockChannel, this.executorService);
512525
when(mockConnection.createChannel()).thenReturn(callbackChannel);
513526

514527
final AtomicLong count = new AtomicLong();
@@ -569,7 +582,8 @@ public void testConcurrentConfirms() throws Exception {
569582

570583
when(mockConnectionFactory.newConnection(any(ExecutorService.class), anyString())).thenReturn(mockConnection);
571584
when(mockConnection.isOpen()).thenReturn(true);
572-
final PublisherCallbackChannelImpl channel = new PublisherCallbackChannelImpl(mockChannel);
585+
final PublisherCallbackChannelImpl channel =
586+
new PublisherCallbackChannelImpl(mockChannel, this.executorService);
573587
when(mockConnection.createChannel()).thenReturn(channel);
574588

575589
CachingConnectionFactory ccf = new CachingConnectionFactory(mockConnectionFactory);
@@ -798,7 +812,7 @@ public void testPublisherCallbackChannelImplCloseWithPending() throws Exception
798812

799813
Channel channelMock = mock(Channel.class);
800814

801-
PublisherCallbackChannelImpl channel = new PublisherCallbackChannelImpl(channelMock);
815+
PublisherCallbackChannelImpl channel = new PublisherCallbackChannelImpl(channelMock, this.executorService);
802816

803817
channel.addListener(listener);
804818

0 commit comments

Comments
 (0)