diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java index 6a6e0183293f..3486f39506bf 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java @@ -25,6 +25,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Jaas; import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.boot.context.properties.PropertyMapper; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; @@ -66,10 +67,13 @@ public KafkaAutoConfiguration(KafkaProperties properties) { public KafkaTemplate kafkaTemplate(ProducerFactory kafkaProducerFactory, ProducerListener kafkaProducerListener, ObjectProvider messageConverter) { + + PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); KafkaTemplate kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory); messageConverter.ifUnique(kafkaTemplate::setMessageConverter); - kafkaTemplate.setProducerListener(kafkaProducerListener); - kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic()); + map.from(kafkaProducerListener).to(kafkaTemplate::setProducerListener); + map.from(this.properties.getTemplate().getDefaultTopic()).to(kafkaTemplate::setDefaultTopic); + map.from(this.properties.getTemplate().getTransactionIdPrefix()).to(kafkaTemplate::setTransactionIdPrefix); return kafkaTemplate; } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java index e7054900f573..43bd131347f8 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java @@ -824,6 +824,11 @@ public static class Template { */ private String defaultTopic; + /** + * Override the transaction id prefix in the producer factory. + */ + private String transactionIdPrefix; + public String getDefaultTopic() { return this.defaultTopic; } @@ -832,6 +837,14 @@ public void setDefaultTopic(String defaultTopic) { this.defaultTopic = defaultTopic; } + public String getTransactionIdPrefix() { + return this.transactionIdPrefix; + } + + public void setTransactionIdPrefix(String transactionIdPrefix) { + this.transactionIdPrefix = transactionIdPrefix; + } + } public static class Listener { diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java index 320a53ea0934..4e09226f42f9 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java @@ -384,6 +384,7 @@ void streamsApplicationIdIsNotMandatoryIfEnableKafkaStreamsIsNotSet() { void listenerProperties() { this.contextRunner .withPropertyValues("spring.kafka.template.default-topic=testTopic", + "spring.kafka.template.transaction-id-prefix=txOverride", "spring.kafka.listener.ack-mode=MANUAL", "spring.kafka.listener.client-id=client", "spring.kafka.listener.ack-count=123", "spring.kafka.listener.ack-time=456", "spring.kafka.listener.concurrency=3", "spring.kafka.listener.poll-timeout=2000", @@ -405,6 +406,7 @@ void listenerProperties() { assertThat(kafkaTemplate.getMessageConverter()).isInstanceOf(MessagingMessageConverter.class); assertThat(kafkaTemplate).hasFieldOrPropertyWithValue("producerFactory", producerFactory); assertThat(kafkaTemplate.getDefaultTopic()).isEqualTo("testTopic"); + assertThat(kafkaTemplate).hasFieldOrPropertyWithValue("transactionIdPrefix", "txOverride"); assertThat(kafkaListenerContainerFactory.getConsumerFactory()).isEqualTo(consumerFactory); ContainerProperties containerProperties = kafkaListenerContainerFactory.getContainerProperties(); assertThat(containerProperties.getAckMode()).isEqualTo(AckMode.MANUAL);