From 96300a368f7746be6cfa023d54d96cbf683a77d2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edd=C3=BA=20Mel=C3=A9ndez?= Date: Fri, 17 Sep 2021 17:41:04 -0500 Subject: [PATCH 1/2] Add RabbitStreamTemplate configuration `RabbitStreamTemplate` is provided at spring-amqp 2.4. --- .../amqp/RabbitAutoConfiguration.java | 38 +++++++++ .../autoconfigure/amqp/RabbitProperties.java | 14 ++++ .../amqp/RabbitAutoConfigurationTests.java | 84 +++++++++++++++++++ 3 files changed, 136 insertions(+) diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfiguration.java index e1f68c7f3ba8..f12fc90161ed 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfiguration.java @@ -21,6 +21,7 @@ import com.rabbitmq.client.Channel; import com.rabbitmq.client.impl.CredentialsProvider; import com.rabbitmq.client.impl.CredentialsRefreshService; +import com.rabbitmq.stream.Environment; import org.springframework.amqp.core.AmqpAdmin; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; @@ -43,6 +44,10 @@ import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; import org.springframework.core.io.ResourceLoader; +import org.springframework.rabbit.stream.producer.ProducerCustomizer; +import org.springframework.rabbit.stream.producer.RabbitStreamOperations; +import org.springframework.rabbit.stream.producer.RabbitStreamTemplate; +import org.springframework.rabbit.stream.support.converter.StreamMessageConverter; /** * {@link EnableAutoConfiguration Auto-configuration} for {@link RabbitTemplate}. @@ -83,6 +88,7 @@ * @author Phillip Webb * @author Artsiom Yudovin * @author Chris Bono + * @author Eddú Meléndez * @since 1.0.0 */ @Configuration(proxyBeanMethods = false) @@ -186,4 +192,36 @@ public RabbitMessagingTemplate rabbitMessagingTemplate(RabbitTemplate rabbitTemp } + @Configuration(proxyBeanMethods = false) + @ConditionalOnClass(RabbitStreamTemplate.class) + @ConditionalOnMissingBean(RabbitStreamTemplate.class) + @ConditionalOnProperty(prefix = "spring.rabbitmq.listener", name = "type", havingValue = "stream") + protected static class RabbitStreamTemplateConfiguration { + + @Bean + @ConditionalOnMissingBean(RabbitStreamOperations.class) + @ConditionalOnProperty(prefix = "spring.rabbitmq.stream", name = "name") + public RabbitStreamTemplate rabbitStreamTemplate(Environment rabbitStreamEnvironment, + RabbitProperties properties, ObjectProvider messageConverters, + ObjectProvider streamMessageConverters, + ObjectProvider producerCustomizers) { + RabbitStreamTemplate template = new RabbitStreamTemplate(rabbitStreamEnvironment, + properties.getStream().getName()); + MessageConverter messageConverter = messageConverters.getIfUnique(); + if (messageConverter != null) { + template.setMessageConverter(messageConverter); + } + StreamMessageConverter streamMessageConverter = streamMessageConverters.getIfUnique(); + if (streamMessageConverter != null) { + template.setStreamConverter(streamMessageConverter); + } + ProducerCustomizer producerCustomizer = producerCustomizers.getIfUnique(); + if (producerCustomizer != null) { + template.setProducerCustomizer(producerCustomizer); + } + return template; + } + + } + } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java index 80d140f60c03..84722f388e78 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java @@ -42,6 +42,7 @@ * @author Gary Russell * @author Artsiom Yudovin * @author Franjo Zilic + * @author Eddú Meléndez * @since 1.0.0 */ @ConfigurationProperties(prefix = "spring.rabbitmq") @@ -1193,6 +1194,11 @@ public static final class Stream { */ private String password; + /** + * Name of the stream. + */ + private String name; + public String getHost() { return this.host; } @@ -1225,6 +1231,14 @@ public void setPassword(String password) { this.password = password; } + public String getName() { + return this.name; + } + + public void setName(String name) { + this.name = name; + } + } } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java index db625a906a03..087e019105f8 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java @@ -68,6 +68,9 @@ import org.springframework.context.annotation.Primary; import org.springframework.core.Ordered; import org.springframework.core.annotation.Order; +import org.springframework.rabbit.stream.producer.ProducerCustomizer; +import org.springframework.rabbit.stream.producer.RabbitStreamTemplate; +import org.springframework.rabbit.stream.support.converter.StreamMessageConverter; import org.springframework.retry.RetryPolicy; import org.springframework.retry.backoff.BackOffPolicy; import org.springframework.retry.backoff.ExponentialBackOffPolicy; @@ -96,6 +99,7 @@ * @author Gary Russell * @author HaiTao Zhang * @author Franjo Zilic + * @author Eddú Meléndez */ @ExtendWith(OutputCaptureExtension.class) class RabbitAutoConfigurationTests { @@ -872,6 +876,66 @@ void whenADirectContainerCustomizerIsDefinedThenItIsCalledToConfigureTheContaine .configure(any(DirectMessageListenerContainer.class))); } + @Test + void testDefaultRabbitStreamTemplateConfiguration() { + this.contextRunner + .withPropertyValues("spring.rabbitmq.listener.type:stream", "spring.rabbitmq.stream.name:stream-test") + .run((context) -> { + RabbitStreamTemplate streamTemplate = context.getBean(RabbitStreamTemplate.class); + String streamName = (String) ReflectionTestUtils.getField(streamTemplate, "streamName"); + assertThat(context).hasSingleBean(RabbitStreamTemplate.class); + assertThat(streamName).isEqualTo("stream-test"); + }); + } + + @Test + void testDefaultRabbitStreamTemplateConfigurationWithoutStreamName() { + this.contextRunner.withPropertyValues("spring.rabbitmq.listener.type:stream") + .run((context) -> assertThat(context).doesNotHaveBean(RabbitStreamTemplate.class)); + } + + @Test + void testRabbitStreamTemplateConfigurationWithCustomMessageConverter() { + this.contextRunner.withUserConfiguration(MessageConvertersConfiguration.class) + .withPropertyValues("spring.rabbitmq.listener.type:stream", "spring.rabbitmq.stream.name:stream-test") + .run((context) -> { + RabbitStreamTemplate streamTemplate = context.getBean(RabbitStreamTemplate.class); + MessageConverter messageConverter = (MessageConverter) ReflectionTestUtils.getField(streamTemplate, + "messageConverter"); + String streamName = (String) ReflectionTestUtils.getField(streamTemplate, "streamName"); + assertThat(messageConverter).isSameAs(context.getBean(MessageConverter.class)); + assertThat(streamName).isEqualTo("stream-test"); + }); + } + + @Test + void testRabbitStreamTemplateConfigurationWithCustomStreamMessageConverter() { + this.contextRunner.withUserConfiguration(StreamMessageConverterConfiguration.class) + .withPropertyValues("spring.rabbitmq.listener.type:stream", "spring.rabbitmq.stream.name:stream-test") + .run((context) -> { + RabbitStreamTemplate streamTemplate = context.getBean(RabbitStreamTemplate.class); + StreamMessageConverter messageConverter = (StreamMessageConverter) ReflectionTestUtils + .getField(streamTemplate, "streamConverter"); + String streamName = (String) ReflectionTestUtils.getField(streamTemplate, "streamName"); + assertThat(messageConverter).isSameAs(context.getBean(StreamMessageConverter.class)); + assertThat(streamName).isEqualTo("stream-test"); + }); + } + + @Test + void testRabbitStreamTemplateConfigurationWithCustomProducerCustomizer() { + this.contextRunner.withUserConfiguration(ProducerCustomizerConfiguration.class) + .withPropertyValues("spring.rabbitmq.listener.type:stream", "spring.rabbitmq.stream.name:stream-test") + .run((context) -> { + RabbitStreamTemplate streamTemplate = context.getBean(RabbitStreamTemplate.class); + ProducerCustomizer producerCustomizer = (ProducerCustomizer) ReflectionTestUtils + .getField(streamTemplate, "producerCustomizer"); + String streamName = (String) ReflectionTestUtils.getField(streamTemplate, "streamName"); + assertThat(producerCustomizer).isSameAs(context.getBean(ProducerCustomizer.class)); + assertThat(streamName).isEqualTo("stream-test"); + }); + } + private com.rabbitmq.client.ConnectionFactory getTargetConnectionFactory(AssertableApplicationContext context) { CachingConnectionFactory connectionFactory = context.getBean(CachingConnectionFactory.class); return connectionFactory.getRabbitConnectionFactory(); @@ -1168,4 +1232,24 @@ void listen(String in) { } + @Configuration(proxyBeanMethods = false) + static class StreamMessageConverterConfiguration { + + @Bean + StreamMessageConverter myStreamMessageConverter() { + return mock(StreamMessageConverter.class); + } + + } + + @Configuration(proxyBeanMethods = false) + static class ProducerCustomizerConfiguration { + + @Bean + ProducerCustomizer myProducerCustomizer() { + return mock(ProducerCustomizer.class); + } + + } + } From d8af4d5ea53df3b3cbf75424771072d9c5d69887 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edd=C3=BA=20Mel=C3=A9ndez?= Date: Tue, 9 Nov 2021 20:32:38 -0600 Subject: [PATCH 2/2] Move changes to RabbitStreamConfiguration --- .../amqp/RabbitAutoConfiguration.java | 38 -------- .../amqp/RabbitStreamConfiguration.java | 30 +++++++ .../amqp/RabbitStreamTemplateConfigurer.java | 74 ++++++++++++++++ .../amqp/RabbitAutoConfigurationTests.java | 84 ------------------ .../amqp/RabbitStreamConfigurationTests.java | 86 +++++++++++++++++++ 5 files changed, 190 insertions(+), 122 deletions(-) create mode 100644 spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamTemplateConfigurer.java diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfiguration.java index f12fc90161ed..e1f68c7f3ba8 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfiguration.java @@ -21,7 +21,6 @@ import com.rabbitmq.client.Channel; import com.rabbitmq.client.impl.CredentialsProvider; import com.rabbitmq.client.impl.CredentialsRefreshService; -import com.rabbitmq.stream.Environment; import org.springframework.amqp.core.AmqpAdmin; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; @@ -44,10 +43,6 @@ import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; import org.springframework.core.io.ResourceLoader; -import org.springframework.rabbit.stream.producer.ProducerCustomizer; -import org.springframework.rabbit.stream.producer.RabbitStreamOperations; -import org.springframework.rabbit.stream.producer.RabbitStreamTemplate; -import org.springframework.rabbit.stream.support.converter.StreamMessageConverter; /** * {@link EnableAutoConfiguration Auto-configuration} for {@link RabbitTemplate}. @@ -88,7 +83,6 @@ * @author Phillip Webb * @author Artsiom Yudovin * @author Chris Bono - * @author Eddú Meléndez * @since 1.0.0 */ @Configuration(proxyBeanMethods = false) @@ -192,36 +186,4 @@ public RabbitMessagingTemplate rabbitMessagingTemplate(RabbitTemplate rabbitTemp } - @Configuration(proxyBeanMethods = false) - @ConditionalOnClass(RabbitStreamTemplate.class) - @ConditionalOnMissingBean(RabbitStreamTemplate.class) - @ConditionalOnProperty(prefix = "spring.rabbitmq.listener", name = "type", havingValue = "stream") - protected static class RabbitStreamTemplateConfiguration { - - @Bean - @ConditionalOnMissingBean(RabbitStreamOperations.class) - @ConditionalOnProperty(prefix = "spring.rabbitmq.stream", name = "name") - public RabbitStreamTemplate rabbitStreamTemplate(Environment rabbitStreamEnvironment, - RabbitProperties properties, ObjectProvider messageConverters, - ObjectProvider streamMessageConverters, - ObjectProvider producerCustomizers) { - RabbitStreamTemplate template = new RabbitStreamTemplate(rabbitStreamEnvironment, - properties.getStream().getName()); - MessageConverter messageConverter = messageConverters.getIfUnique(); - if (messageConverter != null) { - template.setMessageConverter(messageConverter); - } - StreamMessageConverter streamMessageConverter = streamMessageConverters.getIfUnique(); - if (streamMessageConverter != null) { - template.setStreamConverter(streamMessageConverter); - } - ProducerCustomizer producerCustomizer = producerCustomizers.getIfUnique(); - if (producerCustomizer != null) { - template.setProducerCustomizer(producerCustomizer); - } - return template; - } - - } - } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamConfiguration.java index 1f402da6a414..15e92de55934 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamConfiguration.java @@ -23,6 +23,7 @@ import com.rabbitmq.stream.EnvironmentBuilder; import org.springframework.amqp.rabbit.config.ContainerCustomizer; +import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; @@ -33,11 +34,16 @@ import org.springframework.rabbit.stream.config.StreamRabbitListenerContainerFactory; import org.springframework.rabbit.stream.listener.ConsumerCustomizer; import org.springframework.rabbit.stream.listener.StreamListenerContainer; +import org.springframework.rabbit.stream.producer.ProducerCustomizer; +import org.springframework.rabbit.stream.producer.RabbitStreamOperations; +import org.springframework.rabbit.stream.producer.RabbitStreamTemplate; +import org.springframework.rabbit.stream.support.converter.StreamMessageConverter; /** * Configuration for Spring RabbitMQ Stream plugin support. * * @author Gary Russell + * @author Eddú Meléndez */ @Configuration(proxyBeanMethods = false) @ConditionalOnClass(StreamRabbitListenerContainerFactory.class) @@ -78,4 +84,28 @@ private static Function withFallback(Supplier fallback) return (value) -> (value != null) ? value : fallback.get(); } + @Bean + @ConditionalOnMissingBean + RabbitStreamTemplateConfigurer rabbitStreamTemplateConfigurer(RabbitProperties properties, + ObjectProvider messageConverter, + ObjectProvider streamMessageConverter, + ObjectProvider producerCustomizer) { + RabbitStreamTemplateConfigurer configurer = new RabbitStreamTemplateConfigurer(); + configurer.setMessageConverter(messageConverter.getIfUnique()); + configurer.setStreamMessageConverter(streamMessageConverter.getIfUnique()); + configurer.setProducerCustomizer(producerCustomizer.getIfUnique()); + return configurer; + } + + @Bean + @ConditionalOnMissingBean(RabbitStreamOperations.class) + @ConditionalOnProperty(prefix = "spring.rabbitmq.stream", name = "name") + RabbitStreamTemplate rabbitStreamTemplate(Environment rabbitStreamEnvironment, RabbitProperties properties, + RabbitStreamTemplateConfigurer configurer) { + RabbitStreamTemplate template = new RabbitStreamTemplate(rabbitStreamEnvironment, + properties.getStream().getName()); + configurer.configure(template); + return template; + } + } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamTemplateConfigurer.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamTemplateConfigurer.java new file mode 100644 index 000000000000..7456c8d8de64 --- /dev/null +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamTemplateConfigurer.java @@ -0,0 +1,74 @@ +/* + * Copyright 2012-2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.boot.autoconfigure.amqp; + +import org.springframework.amqp.support.converter.MessageConverter; +import org.springframework.boot.context.properties.PropertyMapper; +import org.springframework.rabbit.stream.producer.ProducerCustomizer; +import org.springframework.rabbit.stream.producer.RabbitStreamTemplate; +import org.springframework.rabbit.stream.support.converter.StreamMessageConverter; + +/** + * Configure {@link RabbitStreamTemplate} with sensible defaults. + * + * @author Eddú Meléndez + * @since 2.7.0 + */ +public class RabbitStreamTemplateConfigurer { + + private MessageConverter messageConverter; + + private StreamMessageConverter streamMessageConverter; + + private ProducerCustomizer producerCustomizer; + + /** + * Set the {@link MessageConverter} to use or {@code null} if the out-of-the-box + * converter should be used. + * @param messageConverter the {@link MessageConverter} + */ + public void setMessageConverter(MessageConverter messageConverter) { + this.messageConverter = messageConverter; + } + + public void setStreamMessageConverter(StreamMessageConverter streamMessageConverter) { + this.streamMessageConverter = streamMessageConverter; + } + + public void setProducerCustomizer(ProducerCustomizer producerCustomizer) { + this.producerCustomizer = producerCustomizer; + } + + /** + * Configure the specified {@link RabbitStreamTemplate}. The template can be further + * tuned and default settings can be overridden. + * @param template the {@link RabbitStreamTemplate} instance to configure + */ + public void configure(RabbitStreamTemplate template) { + PropertyMapper map = PropertyMapper.get(); + if (this.messageConverter != null) { + template.setMessageConverter(this.messageConverter); + } + if (this.streamMessageConverter != null) { + template.setStreamConverter(this.streamMessageConverter); + } + if (this.producerCustomizer != null) { + template.setProducerCustomizer(this.producerCustomizer); + } + } + +} diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java index 087e019105f8..db625a906a03 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java @@ -68,9 +68,6 @@ import org.springframework.context.annotation.Primary; import org.springframework.core.Ordered; import org.springframework.core.annotation.Order; -import org.springframework.rabbit.stream.producer.ProducerCustomizer; -import org.springframework.rabbit.stream.producer.RabbitStreamTemplate; -import org.springframework.rabbit.stream.support.converter.StreamMessageConverter; import org.springframework.retry.RetryPolicy; import org.springframework.retry.backoff.BackOffPolicy; import org.springframework.retry.backoff.ExponentialBackOffPolicy; @@ -99,7 +96,6 @@ * @author Gary Russell * @author HaiTao Zhang * @author Franjo Zilic - * @author Eddú Meléndez */ @ExtendWith(OutputCaptureExtension.class) class RabbitAutoConfigurationTests { @@ -876,66 +872,6 @@ void whenADirectContainerCustomizerIsDefinedThenItIsCalledToConfigureTheContaine .configure(any(DirectMessageListenerContainer.class))); } - @Test - void testDefaultRabbitStreamTemplateConfiguration() { - this.contextRunner - .withPropertyValues("spring.rabbitmq.listener.type:stream", "spring.rabbitmq.stream.name:stream-test") - .run((context) -> { - RabbitStreamTemplate streamTemplate = context.getBean(RabbitStreamTemplate.class); - String streamName = (String) ReflectionTestUtils.getField(streamTemplate, "streamName"); - assertThat(context).hasSingleBean(RabbitStreamTemplate.class); - assertThat(streamName).isEqualTo("stream-test"); - }); - } - - @Test - void testDefaultRabbitStreamTemplateConfigurationWithoutStreamName() { - this.contextRunner.withPropertyValues("spring.rabbitmq.listener.type:stream") - .run((context) -> assertThat(context).doesNotHaveBean(RabbitStreamTemplate.class)); - } - - @Test - void testRabbitStreamTemplateConfigurationWithCustomMessageConverter() { - this.contextRunner.withUserConfiguration(MessageConvertersConfiguration.class) - .withPropertyValues("spring.rabbitmq.listener.type:stream", "spring.rabbitmq.stream.name:stream-test") - .run((context) -> { - RabbitStreamTemplate streamTemplate = context.getBean(RabbitStreamTemplate.class); - MessageConverter messageConverter = (MessageConverter) ReflectionTestUtils.getField(streamTemplate, - "messageConverter"); - String streamName = (String) ReflectionTestUtils.getField(streamTemplate, "streamName"); - assertThat(messageConverter).isSameAs(context.getBean(MessageConverter.class)); - assertThat(streamName).isEqualTo("stream-test"); - }); - } - - @Test - void testRabbitStreamTemplateConfigurationWithCustomStreamMessageConverter() { - this.contextRunner.withUserConfiguration(StreamMessageConverterConfiguration.class) - .withPropertyValues("spring.rabbitmq.listener.type:stream", "spring.rabbitmq.stream.name:stream-test") - .run((context) -> { - RabbitStreamTemplate streamTemplate = context.getBean(RabbitStreamTemplate.class); - StreamMessageConverter messageConverter = (StreamMessageConverter) ReflectionTestUtils - .getField(streamTemplate, "streamConverter"); - String streamName = (String) ReflectionTestUtils.getField(streamTemplate, "streamName"); - assertThat(messageConverter).isSameAs(context.getBean(StreamMessageConverter.class)); - assertThat(streamName).isEqualTo("stream-test"); - }); - } - - @Test - void testRabbitStreamTemplateConfigurationWithCustomProducerCustomizer() { - this.contextRunner.withUserConfiguration(ProducerCustomizerConfiguration.class) - .withPropertyValues("spring.rabbitmq.listener.type:stream", "spring.rabbitmq.stream.name:stream-test") - .run((context) -> { - RabbitStreamTemplate streamTemplate = context.getBean(RabbitStreamTemplate.class); - ProducerCustomizer producerCustomizer = (ProducerCustomizer) ReflectionTestUtils - .getField(streamTemplate, "producerCustomizer"); - String streamName = (String) ReflectionTestUtils.getField(streamTemplate, "streamName"); - assertThat(producerCustomizer).isSameAs(context.getBean(ProducerCustomizer.class)); - assertThat(streamName).isEqualTo("stream-test"); - }); - } - private com.rabbitmq.client.ConnectionFactory getTargetConnectionFactory(AssertableApplicationContext context) { CachingConnectionFactory connectionFactory = context.getBean(CachingConnectionFactory.class); return connectionFactory.getRabbitConnectionFactory(); @@ -1232,24 +1168,4 @@ void listen(String in) { } - @Configuration(proxyBeanMethods = false) - static class StreamMessageConverterConfiguration { - - @Bean - StreamMessageConverter myStreamMessageConverter() { - return mock(StreamMessageConverter.class); - } - - } - - @Configuration(proxyBeanMethods = false) - static class ProducerCustomizerConfiguration { - - @Bean - ProducerCustomizer myProducerCustomizer() { - return mock(ProducerCustomizer.class); - } - - } - } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamConfigurationTests.java index f002d1c09b13..348137747f43 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamConfigurationTests.java @@ -26,6 +26,7 @@ import org.springframework.amqp.rabbit.listener.MessageListenerContainer; import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory; import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry; +import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.boot.autoconfigure.AutoConfigurations; import org.springframework.boot.test.context.runner.ApplicationContextRunner; import org.springframework.context.annotation.Bean; @@ -33,6 +34,10 @@ import org.springframework.rabbit.stream.config.StreamRabbitListenerContainerFactory; import org.springframework.rabbit.stream.listener.ConsumerCustomizer; import org.springframework.rabbit.stream.listener.StreamListenerContainer; +import org.springframework.rabbit.stream.producer.ProducerCustomizer; +import org.springframework.rabbit.stream.producer.RabbitStreamTemplate; +import org.springframework.rabbit.stream.support.converter.StreamMessageConverter; +import org.springframework.test.util.ReflectionTestUtils; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; @@ -44,6 +49,7 @@ * * @author Gary Russell * @author Andy Wilkinson + * @author Eddú Meléndez */ class RabbitStreamConfigurationTests { @@ -149,6 +155,66 @@ void whenStreamCredentialsAreSetThenEnvironmentUsesStreamCredentials() { verify(builder).password("confidential"); } + @Test + void testDefaultRabbitStreamTemplateConfiguration() { + this.contextRunner + .withPropertyValues("spring.rabbitmq.listener.type:stream", "spring.rabbitmq.stream.name:stream-test") + .run((context) -> { + RabbitStreamTemplate streamTemplate = context.getBean(RabbitStreamTemplate.class); + String streamName = (String) ReflectionTestUtils.getField(streamTemplate, "streamName"); + assertThat(context).hasSingleBean(RabbitStreamTemplate.class); + assertThat(streamName).isEqualTo("stream-test"); + }); + } + + @Test + void testDefaultRabbitStreamTemplateConfigurationWithoutStreamName() { + this.contextRunner.withPropertyValues("spring.rabbitmq.listener.type:stream") + .run((context) -> assertThat(context).doesNotHaveBean(RabbitStreamTemplate.class)); + } + + @Test + void testRabbitStreamTemplateConfigurationWithCustomMessageConverter() { + this.contextRunner.withUserConfiguration(RabbitAutoConfigurationTests.MessageConvertersConfiguration.class) + .withPropertyValues("spring.rabbitmq.listener.type:stream", "spring.rabbitmq.stream.name:stream-test") + .run((context) -> { + RabbitStreamTemplate streamTemplate = context.getBean(RabbitStreamTemplate.class); + MessageConverter messageConverter = (MessageConverter) ReflectionTestUtils.getField(streamTemplate, + "messageConverter"); + String streamName = (String) ReflectionTestUtils.getField(streamTemplate, "streamName"); + assertThat(messageConverter).isSameAs(context.getBean(MessageConverter.class)); + assertThat(streamName).isEqualTo("stream-test"); + }); + } + + @Test + void testRabbitStreamTemplateConfigurationWithCustomStreamMessageConverter() { + this.contextRunner.withUserConfiguration(StreamMessageConverterConfiguration.class) + .withPropertyValues("spring.rabbitmq.listener.type:stream", "spring.rabbitmq.stream.name:stream-test") + .run((context) -> { + RabbitStreamTemplate streamTemplate = context.getBean(RabbitStreamTemplate.class); + StreamMessageConverter messageConverter = (StreamMessageConverter) ReflectionTestUtils + .getField(streamTemplate, "streamConverter"); + String streamName = (String) ReflectionTestUtils.getField(streamTemplate, "streamName"); + assertThat(messageConverter).isSameAs(context.getBean(StreamMessageConverter.class)); + assertThat(streamName).isEqualTo("stream-test"); + }); + } + + @Test + void testRabbitStreamTemplateConfigurationWithCustomProducerCustomizer() { + this.contextRunner.withUserConfiguration(ProducerCustomizerConfiguration.class) + .withPropertyValues("spring.rabbitmq.listener.type:stream", "spring.rabbitmq.stream.name:stream-test") + .run((context) -> { + RabbitStreamTemplate streamTemplate = context.getBean(RabbitStreamTemplate.class); + ProducerCustomizer producerCustomizer = (ProducerCustomizer) ReflectionTestUtils + .getField(streamTemplate, "producerCustomizer"); + String streamName = (String) ReflectionTestUtils.getField(streamTemplate, "streamName"); + assertThat(producerCustomizer).isSameAs(context.getBean(ProducerCustomizer.class)); + assertThat(streamName).isEqualTo("stream-test"); + }); + } + @Configuration(proxyBeanMethods = false) static class TestConfiguration { @@ -196,4 +262,24 @@ RabbitListenerContainerFactory rabbitListenerContainer } + @Configuration(proxyBeanMethods = false) + static class StreamMessageConverterConfiguration { + + @Bean + StreamMessageConverter myStreamMessageConverter() { + return mock(StreamMessageConverter.class); + } + + } + + @Configuration(proxyBeanMethods = false) + static class ProducerCustomizerConfiguration { + + @Bean + ProducerCustomizer myProducerCustomizer() { + return mock(ProducerCustomizer.class); + } + + } + }