Skip to content

Commit e09ca96

Browse files
artembilangaryrussell
authored andcommitted
GH-794: add after-receive-post-processors for XML
Fixes #794 * Expose an `after-receive-post-processors` in the `<rabbit:listener-container>` component * Upgrade to the latest dependencies
1 parent 4cf27c9 commit e09ca96

File tree

5 files changed

+87
-30
lines changed

5 files changed

+87
-30
lines changed

build.gradle

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
buildscript {
2-
ext.kotlinVersion = '1.2.51'
2+
ext.kotlinVersion = '1.2.71'
33
repositories {
44
maven { url 'https://repo.spring.io/plugins-release' }
55
}
@@ -71,22 +71,22 @@ subprojects { subproject ->
7171

7272
ext {
7373
assertjVersion = '3.9.1'
74-
assertkVersion = '0.10'
74+
assertkVersion = '0.12'
7575
commonsHttpClientVersion = '4.5.5'
7676
googleJsr305Version = '3.0.2'
7777
hamcrestVersion = '1.3'
78-
jackson2Version = '2.9.5'
78+
jackson2Version = '2.9.6'
7979
junit4Version = '4.12'
80-
junitJupiterVersion = '5.1.1'
81-
junitPlatformVersion = '1.1.1'
80+
junitJupiterVersion = '5.3.1'
81+
junitPlatformVersion = '1.3.1'
8282
log4jVersion = '2.11.0'
8383
logbackVersion = '1.2.3'
84-
mockitoVersion = '2.18.0'
84+
mockitoVersion = '2.22.0'
8585
rabbitmqVersion = project.hasProperty('rabbitmqVersion') ? project.rabbitmqVersion : '5.4.1'
8686
rabbitmqHttpClientVersion = '2.1.0.RELEASE'
87-
reactorVersion = '3.1.6.RELEASE'
87+
reactorVersion = '3.2.1.RELEASE'
8888

89-
springVersion = project.hasProperty('springVersion') ? project.springVersion : '5.1.0.RELEASE'
89+
springVersion = project.hasProperty('springVersion') ? project.springVersion : '5.1.1.BUILD-SNAPSHOT'
9090

9191
springRetryVersion = '1.2.2.RELEASE'
9292
}
@@ -272,12 +272,11 @@ project('spring-rabbit') {
272272
compile ("org.apache.logging.log4j:log4j-core:$log4jVersion", optional)
273273

274274
testCompile project(":spring-rabbit-junit")
275-
testCompile "org.jetbrains.kotlin:kotlin-stdlib:$kotlinVersion"
276-
testCompile("com.willowtreeapps.assertk:assertk:$assertkVersion") {
277-
exclude group: 'org.jetbrains.kotlin', module: 'kotlin-reflect'
278-
}
275+
testCompile("com.willowtreeapps.assertk:assertk-jvm:$assertkVersion")
276+
testCompile "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlinVersion"
279277

280278
testRuntime "org.jetbrains.kotlin:kotlin-reflect:$kotlinVersion"
279+
281280
testRuntime "org.springframework:spring-web:$springVersion"
282281
testRuntime "org.apache.httpcomponents:httpclient:$commonsHttpClientVersion"
283282
}

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/RabbitNamespaceUtils.java

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,12 +16,15 @@
1616

1717
package org.springframework.amqp.rabbit.config;
1818

19+
import java.util.List;
20+
1921
import org.w3c.dom.Element;
2022

2123
import org.springframework.amqp.core.AcknowledgeMode;
2224
import org.springframework.beans.factory.config.BeanDefinition;
2325
import org.springframework.beans.factory.config.RuntimeBeanReference;
2426
import org.springframework.beans.factory.config.TypedStringValue;
27+
import org.springframework.beans.factory.support.ManagedList;
2528
import org.springframework.beans.factory.support.RootBeanDefinition;
2629
import org.springframework.beans.factory.xml.ParserContext;
2730
import org.springframework.util.StringUtils;
@@ -109,6 +112,9 @@ public final class RabbitNamespaceUtils {
109112

110113
private static final String TYPE = "type";
111114

115+
private static final String AFTER_RECEIVE_POST_PROCESSORS = "after-receive-post-processors";
116+
117+
112118
private RabbitNamespaceUtils() {
113119
super();
114120
}
@@ -174,7 +180,8 @@ public static BeanDefinition parseContainer(Element containerEle, ParserContext
174180

175181
String minConsecutiveMessages = containerEle.getAttribute(MIN_CONSECUTIVE_ACTIVE_ATTRIBUTE);
176182
if (StringUtils.hasText(minConsecutiveMessages)) {
177-
containerDef.getPropertyValues().add("consecutiveActiveTrigger", new TypedStringValue(minConsecutiveMessages));
183+
containerDef.getPropertyValues().add("consecutiveActiveTrigger",
184+
new TypedStringValue(minConsecutiveMessages));
178185
}
179186

180187
String minConsecutiveIdle = containerEle.getAttribute(MIN_CONSECUTIVE_IDLE_ATTRIBUTE);
@@ -270,12 +277,14 @@ public static BeanDefinition parseContainer(Element containerEle, ParserContext
270277

271278
String failedDeclarationRetryInterval = containerEle.getAttribute(FAILED_DECLARATION_RETRY_INTERVAL);
272279
if (StringUtils.hasText(failedDeclarationRetryInterval)) {
273-
containerDef.getPropertyValues().add("failedDeclarationRetryInterval", new TypedStringValue(failedDeclarationRetryInterval));
280+
containerDef.getPropertyValues().add("failedDeclarationRetryInterval",
281+
new TypedStringValue(failedDeclarationRetryInterval));
274282
}
275283

276284
String retryDeclarationInterval = containerEle.getAttribute(MISSING_QUEUE_RETRY_INTERVAL);
277285
if (StringUtils.hasText(retryDeclarationInterval)) {
278-
containerDef.getPropertyValues().add("retryDeclarationInterval", new TypedStringValue(retryDeclarationInterval));
286+
containerDef.getPropertyValues().add("retryDeclarationInterval",
287+
new TypedStringValue(retryDeclarationInterval));
279288
}
280289

281290
String consumerTagStrategy = containerEle.getAttribute(CONSUMER_TAG_STRATEGY);
@@ -304,6 +313,16 @@ public static BeanDefinition parseContainer(Element containerEle, ParserContext
304313
containerDef.getPropertyValues().add("monitorInterval", new TypedStringValue(monitorInterval));
305314
}
306315

316+
String afterReceivePostProcessors = containerEle.getAttribute(AFTER_RECEIVE_POST_PROCESSORS);
317+
if (StringUtils.hasText(afterReceivePostProcessors)) {
318+
String[] names = StringUtils.delimitedListToStringArray(afterReceivePostProcessors, ",", " ");
319+
List<RuntimeBeanReference> values = new ManagedList<>();
320+
for (String name : names) {
321+
values.add(new RuntimeBeanReference(name));
322+
}
323+
containerDef.getPropertyValues().add("afterReceivePostProcessors", values);
324+
}
325+
307326
return containerDef;
308327
}
309328

spring-rabbit/src/main/resources/org/springframework/amqp/rabbit/config/spring-rabbit-2.1.xsd

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -544,6 +544,18 @@
544544
</xsd:appinfo>
545545
</xsd:annotation>
546546
</xsd:attribute>
547+
<xsd:attribute name="after-receive-post-processors" type="xsd:string">
548+
<xsd:annotation>
549+
<xsd:documentation><![CDATA[
550+
The MessagePostProcessors (bean references) for this listener as a comma-separated list.
551+
]]></xsd:documentation>
552+
<xsd:appinfo>
553+
<tool:annotation kind="ref">
554+
<tool:expected-type type="org.springframework.amqp.core.MessagePostProcessor" />
555+
</tool:annotation>
556+
</xsd:appinfo>
557+
</xsd:annotation>
558+
</xsd:attribute>
547559
</xsd:extension>
548560
</xsd:complexContent>
549561
</xsd:complexType>

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/config/ListenerContainerParserTests.java

Lines changed: 31 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2010-2017 the original author or authors.
2+
* Copyright 2010-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -35,6 +35,7 @@
3535
import org.junit.Test;
3636

3737
import org.springframework.amqp.core.AcknowledgeMode;
38+
import org.springframework.amqp.core.MessagePostProcessor;
3839
import org.springframework.amqp.core.Queue;
3940
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
4041
import org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer;
@@ -62,16 +63,17 @@ public class ListenerContainerParserTests {
6263
private DefaultListableBeanFactory beanFactory;
6364

6465
@Before
65-
public void setUp() throws Exception {
66+
public void setUp() {
6667
beanFactory = new DefaultListableBeanFactory();
6768
XmlBeanDefinitionReader reader = new XmlBeanDefinitionReader(beanFactory);
6869
reader.loadBeanDefinitions(new ClassPathResource(getClass().getSimpleName() + "-context.xml", getClass()));
6970
beanFactory.setBeanExpressionResolver(new StandardBeanExpressionResolver());
7071
}
7172

7273
@Test
73-
public void testParseWithQueueNames() throws Exception {
74-
SimpleMessageListenerContainer container = beanFactory.getBean("container1", SimpleMessageListenerContainer.class);
74+
public void testParseWithQueueNames() {
75+
SimpleMessageListenerContainer container =
76+
this.beanFactory.getBean("container1", SimpleMessageListenerContainer.class);
7577
assertEquals(AcknowledgeMode.MANUAL, container.getAcknowledgeMode());
7678
assertEquals(beanFactory.getBean(ConnectionFactory.class), container.getConnectionFactory());
7779
assertEquals(MessageListenerAdapter.class, container.getMessageListener().getClass());
@@ -111,7 +113,7 @@ public void testParseWithQueueNames() throws Exception {
111113
}
112114

113115
@Test
114-
public void testParseWithDirect() throws Exception {
116+
public void testParseWithDirect() {
115117
DirectMessageListenerContainer container = beanFactory.getBean("direct1", DirectMessageListenerContainer.class);
116118
assertEquals(AcknowledgeMode.MANUAL, container.getAcknowledgeMode());
117119
assertEquals(beanFactory.getBean(ConnectionFactory.class), container.getConnectionFactory());
@@ -146,7 +148,7 @@ public void testParseWithDirect() throws Exception {
146148
}
147149

148150
@Test
149-
public void testParseWithQueues() throws Exception {
151+
public void testParseWithQueues() {
150152
SimpleMessageListenerContainer container = beanFactory.getBean("container2", SimpleMessageListenerContainer.class);
151153
Queue queue = beanFactory.getBean("bar", Queue.class);
152154
assertEquals("[foo, " + queue.getName() + "]", Arrays.asList(container.getQueueNames()).toString());
@@ -155,7 +157,7 @@ public void testParseWithQueues() throws Exception {
155157
}
156158

157159
@Test
158-
public void testParseWithAdviceChain() throws Exception {
160+
public void testParseWithAdviceChain() {
159161
SimpleMessageListenerContainer container = beanFactory.getBean("container3", SimpleMessageListenerContainer.class);
160162
Object adviceChain = ReflectionTestUtils.getField(container, "adviceChain");
161163
assertNotNull(adviceChain);
@@ -164,35 +166,35 @@ public void testParseWithAdviceChain() throws Exception {
164166
}
165167

166168
@Test
167-
public void testParseWithDefaults() throws Exception {
169+
public void testParseWithDefaults() {
168170
SimpleMessageListenerContainer container = beanFactory.getBean("container4", SimpleMessageListenerContainer.class);
169171
assertEquals(1, ReflectionTestUtils.getField(container, "concurrentConsumers"));
170172
assertEquals(true, ReflectionTestUtils.getField(container, "defaultRequeueRejected"));
171173
}
172174

173175
@Test
174-
public void testParseWithDefaultQueueRejectedFalse() throws Exception {
176+
public void testParseWithDefaultQueueRejectedFalse() {
175177
SimpleMessageListenerContainer container = beanFactory.getBean("container5", SimpleMessageListenerContainer.class);
176178
assertEquals(1, ReflectionTestUtils.getField(container, "concurrentConsumers"));
177179
assertEquals(false, ReflectionTestUtils.getField(container, "defaultRequeueRejected"));
178180
assertFalse(container.isChannelTransacted());
179181
}
180182

181183
@Test
182-
public void testParseWithTx() throws Exception {
184+
public void testParseWithTx() {
183185
SimpleMessageListenerContainer container = beanFactory.getBean("container6", SimpleMessageListenerContainer.class);
184186
assertTrue(container.isChannelTransacted());
185187
assertEquals(5, ReflectionTestUtils.getField(container, "txSize"));
186188
}
187189

188190
@Test
189-
public void testNamedListeners() throws Exception {
191+
public void testNamedListeners() {
190192
beanFactory.getBean("testListener1", SimpleMessageListenerContainer.class);
191193
beanFactory.getBean("testListener2", SimpleMessageListenerContainer.class);
192194
}
193195

194196
@Test
195-
public void testAnonListeners() throws Exception {
197+
public void testAnonListeners() {
196198
beanFactory.getBean("org.springframework.amqp.rabbit.config.ListenerContainerFactoryBean#0",
197199
SimpleMessageListenerContainer.class);
198200
beanFactory.getBean("org.springframework.amqp.rabbit.config.ListenerContainerFactoryBean#1",
@@ -203,7 +205,7 @@ public void testAnonListeners() throws Exception {
203205
}
204206

205207
@Test
206-
public void testAnonEverything() throws Exception {
208+
public void testAnonEverything() {
207209
SimpleMessageListenerContainer container = beanFactory.getBean(
208210
"org.springframework.amqp.rabbit.config.ListenerContainerFactoryBean#3",
209211
SimpleMessageListenerContainer.class);
@@ -217,7 +219,7 @@ public void testAnonEverything() throws Exception {
217219
}
218220

219221
@Test
220-
public void testAnonParent() throws Exception {
222+
public void testAnonParent() {
221223
beanFactory.getBean("anonParentL1", SimpleMessageListenerContainer.class);
222224
beanFactory.getBean("anonParentL2", SimpleMessageListenerContainer.class);
223225
}
@@ -234,6 +236,21 @@ public void testIncompatibleTxAtts() {
234236
}
235237
}
236238

239+
@Test
240+
@SuppressWarnings("unchecked")
241+
public void testParseMessagePostProcessor() {
242+
SimpleMessageListenerContainer listenerContainer =
243+
this.beanFactory.getBean("testMessagePostProcessor", SimpleMessageListenerContainer.class);
244+
245+
Collection<MessagePostProcessor> messagePostProcessors =
246+
TestUtils.getPropertyValue(listenerContainer, "afterReceivePostProcessors", Collection.class);
247+
248+
assertFalse(messagePostProcessors.isEmpty());
249+
assertThat(messagePostProcessors,
250+
contains(this.beanFactory.getBean("unzipPostProcessor"),
251+
this.beanFactory.getBean("gUnzipPostProcessor")));
252+
}
253+
237254
static class TestBean {
238255

239256
public void handle(String s) {

spring-rabbit/src/test/resources/org/springframework/amqp/rabbit/config/ListenerContainerParserTests-context.xml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,4 +120,14 @@
120120

121121
<task:executor id="exec" />
122122

123+
<rabbit:listener-container connection-factory="connectionFactory" auto-startup="false"
124+
after-receive-post-processors="unzipPostProcessor, gUnzipPostProcessor">
125+
<rabbit:listener id="testMessagePostProcessor" queues="foo" ref="testBean" method="handle" />
126+
</rabbit:listener-container>
127+
128+
129+
<bean id="unzipPostProcessor" class="org.springframework.amqp.support.postprocessor.UnzipPostProcessor" />
130+
131+
<bean id="gUnzipPostProcessor" class="org.springframework.amqp.support.postprocessor.GUnzipPostProcessor" />
132+
123133
</beans>

0 commit comments

Comments
 (0)