diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java index 80d140f60c03..84722f388e78 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java @@ -42,6 +42,7 @@ * @author Gary Russell * @author Artsiom Yudovin * @author Franjo Zilic + * @author Eddú Meléndez * @since 1.0.0 */ @ConfigurationProperties(prefix = "spring.rabbitmq") @@ -1193,6 +1194,11 @@ public static final class Stream { */ private String password; + /** + * Name of the stream. + */ + private String name; + public String getHost() { return this.host; } @@ -1225,6 +1231,14 @@ public void setPassword(String password) { this.password = password; } + public String getName() { + return this.name; + } + + public void setName(String name) { + this.name = name; + } + } } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamConfiguration.java index 1f402da6a414..15e92de55934 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamConfiguration.java @@ -23,6 +23,7 @@ import com.rabbitmq.stream.EnvironmentBuilder; import org.springframework.amqp.rabbit.config.ContainerCustomizer; +import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; @@ -33,11 +34,16 @@ import org.springframework.rabbit.stream.config.StreamRabbitListenerContainerFactory; import org.springframework.rabbit.stream.listener.ConsumerCustomizer; import org.springframework.rabbit.stream.listener.StreamListenerContainer; +import org.springframework.rabbit.stream.producer.ProducerCustomizer; +import org.springframework.rabbit.stream.producer.RabbitStreamOperations; +import org.springframework.rabbit.stream.producer.RabbitStreamTemplate; +import org.springframework.rabbit.stream.support.converter.StreamMessageConverter; /** * Configuration for Spring RabbitMQ Stream plugin support. * * @author Gary Russell + * @author Eddú Meléndez */ @Configuration(proxyBeanMethods = false) @ConditionalOnClass(StreamRabbitListenerContainerFactory.class) @@ -78,4 +84,28 @@ private static Function withFallback(Supplier fallback) return (value) -> (value != null) ? value : fallback.get(); } + @Bean + @ConditionalOnMissingBean + RabbitStreamTemplateConfigurer rabbitStreamTemplateConfigurer(RabbitProperties properties, + ObjectProvider messageConverter, + ObjectProvider streamMessageConverter, + ObjectProvider producerCustomizer) { + RabbitStreamTemplateConfigurer configurer = new RabbitStreamTemplateConfigurer(); + configurer.setMessageConverter(messageConverter.getIfUnique()); + configurer.setStreamMessageConverter(streamMessageConverter.getIfUnique()); + configurer.setProducerCustomizer(producerCustomizer.getIfUnique()); + return configurer; + } + + @Bean + @ConditionalOnMissingBean(RabbitStreamOperations.class) + @ConditionalOnProperty(prefix = "spring.rabbitmq.stream", name = "name") + RabbitStreamTemplate rabbitStreamTemplate(Environment rabbitStreamEnvironment, RabbitProperties properties, + RabbitStreamTemplateConfigurer configurer) { + RabbitStreamTemplate template = new RabbitStreamTemplate(rabbitStreamEnvironment, + properties.getStream().getName()); + configurer.configure(template); + return template; + } + } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamTemplateConfigurer.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamTemplateConfigurer.java new file mode 100644 index 000000000000..7456c8d8de64 --- /dev/null +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamTemplateConfigurer.java @@ -0,0 +1,74 @@ +/* + * Copyright 2012-2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.boot.autoconfigure.amqp; + +import org.springframework.amqp.support.converter.MessageConverter; +import org.springframework.boot.context.properties.PropertyMapper; +import org.springframework.rabbit.stream.producer.ProducerCustomizer; +import org.springframework.rabbit.stream.producer.RabbitStreamTemplate; +import org.springframework.rabbit.stream.support.converter.StreamMessageConverter; + +/** + * Configure {@link RabbitStreamTemplate} with sensible defaults. + * + * @author Eddú Meléndez + * @since 2.7.0 + */ +public class RabbitStreamTemplateConfigurer { + + private MessageConverter messageConverter; + + private StreamMessageConverter streamMessageConverter; + + private ProducerCustomizer producerCustomizer; + + /** + * Set the {@link MessageConverter} to use or {@code null} if the out-of-the-box + * converter should be used. + * @param messageConverter the {@link MessageConverter} + */ + public void setMessageConverter(MessageConverter messageConverter) { + this.messageConverter = messageConverter; + } + + public void setStreamMessageConverter(StreamMessageConverter streamMessageConverter) { + this.streamMessageConverter = streamMessageConverter; + } + + public void setProducerCustomizer(ProducerCustomizer producerCustomizer) { + this.producerCustomizer = producerCustomizer; + } + + /** + * Configure the specified {@link RabbitStreamTemplate}. The template can be further + * tuned and default settings can be overridden. + * @param template the {@link RabbitStreamTemplate} instance to configure + */ + public void configure(RabbitStreamTemplate template) { + PropertyMapper map = PropertyMapper.get(); + if (this.messageConverter != null) { + template.setMessageConverter(this.messageConverter); + } + if (this.streamMessageConverter != null) { + template.setStreamConverter(this.streamMessageConverter); + } + if (this.producerCustomizer != null) { + template.setProducerCustomizer(this.producerCustomizer); + } + } + +} diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamConfigurationTests.java index f002d1c09b13..348137747f43 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamConfigurationTests.java @@ -26,6 +26,7 @@ import org.springframework.amqp.rabbit.listener.MessageListenerContainer; import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory; import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry; +import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.boot.autoconfigure.AutoConfigurations; import org.springframework.boot.test.context.runner.ApplicationContextRunner; import org.springframework.context.annotation.Bean; @@ -33,6 +34,10 @@ import org.springframework.rabbit.stream.config.StreamRabbitListenerContainerFactory; import org.springframework.rabbit.stream.listener.ConsumerCustomizer; import org.springframework.rabbit.stream.listener.StreamListenerContainer; +import org.springframework.rabbit.stream.producer.ProducerCustomizer; +import org.springframework.rabbit.stream.producer.RabbitStreamTemplate; +import org.springframework.rabbit.stream.support.converter.StreamMessageConverter; +import org.springframework.test.util.ReflectionTestUtils; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; @@ -44,6 +49,7 @@ * * @author Gary Russell * @author Andy Wilkinson + * @author Eddú Meléndez */ class RabbitStreamConfigurationTests { @@ -149,6 +155,66 @@ void whenStreamCredentialsAreSetThenEnvironmentUsesStreamCredentials() { verify(builder).password("confidential"); } + @Test + void testDefaultRabbitStreamTemplateConfiguration() { + this.contextRunner + .withPropertyValues("spring.rabbitmq.listener.type:stream", "spring.rabbitmq.stream.name:stream-test") + .run((context) -> { + RabbitStreamTemplate streamTemplate = context.getBean(RabbitStreamTemplate.class); + String streamName = (String) ReflectionTestUtils.getField(streamTemplate, "streamName"); + assertThat(context).hasSingleBean(RabbitStreamTemplate.class); + assertThat(streamName).isEqualTo("stream-test"); + }); + } + + @Test + void testDefaultRabbitStreamTemplateConfigurationWithoutStreamName() { + this.contextRunner.withPropertyValues("spring.rabbitmq.listener.type:stream") + .run((context) -> assertThat(context).doesNotHaveBean(RabbitStreamTemplate.class)); + } + + @Test + void testRabbitStreamTemplateConfigurationWithCustomMessageConverter() { + this.contextRunner.withUserConfiguration(RabbitAutoConfigurationTests.MessageConvertersConfiguration.class) + .withPropertyValues("spring.rabbitmq.listener.type:stream", "spring.rabbitmq.stream.name:stream-test") + .run((context) -> { + RabbitStreamTemplate streamTemplate = context.getBean(RabbitStreamTemplate.class); + MessageConverter messageConverter = (MessageConverter) ReflectionTestUtils.getField(streamTemplate, + "messageConverter"); + String streamName = (String) ReflectionTestUtils.getField(streamTemplate, "streamName"); + assertThat(messageConverter).isSameAs(context.getBean(MessageConverter.class)); + assertThat(streamName).isEqualTo("stream-test"); + }); + } + + @Test + void testRabbitStreamTemplateConfigurationWithCustomStreamMessageConverter() { + this.contextRunner.withUserConfiguration(StreamMessageConverterConfiguration.class) + .withPropertyValues("spring.rabbitmq.listener.type:stream", "spring.rabbitmq.stream.name:stream-test") + .run((context) -> { + RabbitStreamTemplate streamTemplate = context.getBean(RabbitStreamTemplate.class); + StreamMessageConverter messageConverter = (StreamMessageConverter) ReflectionTestUtils + .getField(streamTemplate, "streamConverter"); + String streamName = (String) ReflectionTestUtils.getField(streamTemplate, "streamName"); + assertThat(messageConverter).isSameAs(context.getBean(StreamMessageConverter.class)); + assertThat(streamName).isEqualTo("stream-test"); + }); + } + + @Test + void testRabbitStreamTemplateConfigurationWithCustomProducerCustomizer() { + this.contextRunner.withUserConfiguration(ProducerCustomizerConfiguration.class) + .withPropertyValues("spring.rabbitmq.listener.type:stream", "spring.rabbitmq.stream.name:stream-test") + .run((context) -> { + RabbitStreamTemplate streamTemplate = context.getBean(RabbitStreamTemplate.class); + ProducerCustomizer producerCustomizer = (ProducerCustomizer) ReflectionTestUtils + .getField(streamTemplate, "producerCustomizer"); + String streamName = (String) ReflectionTestUtils.getField(streamTemplate, "streamName"); + assertThat(producerCustomizer).isSameAs(context.getBean(ProducerCustomizer.class)); + assertThat(streamName).isEqualTo("stream-test"); + }); + } + @Configuration(proxyBeanMethods = false) static class TestConfiguration { @@ -196,4 +262,24 @@ RabbitListenerContainerFactory rabbitListenerContainer } + @Configuration(proxyBeanMethods = false) + static class StreamMessageConverterConfiguration { + + @Bean + StreamMessageConverter myStreamMessageConverter() { + return mock(StreamMessageConverter.class); + } + + } + + @Configuration(proxyBeanMethods = false) + static class ProducerCustomizerConfiguration { + + @Bean + ProducerCustomizer myProducerCustomizer() { + return mock(ProducerCustomizer.class); + } + + } + }