Skip to content

Commit 9ff4ea5

Browse files
committed
Merge pull request #29089 from garyrussell
* pr/29089: Polish "Add transactionIdPrefix Property to KafkaTemplate" Add transactionIdPrefix Property to KafkaTemplate Closes gh-29089
2 parents af933f2 + 4799d2a commit 9ff4ea5

File tree

3 files changed

+22
-3
lines changed

3 files changed

+22
-3
lines changed

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2012-2020 the original author or authors.
2+
* Copyright 2012-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -25,6 +25,7 @@
2525
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
2626
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Jaas;
2727
import org.springframework.boot.context.properties.EnableConfigurationProperties;
28+
import org.springframework.boot.context.properties.PropertyMapper;
2829
import org.springframework.context.annotation.Bean;
2930
import org.springframework.context.annotation.Configuration;
3031
import org.springframework.context.annotation.Import;
@@ -66,10 +67,12 @@ public KafkaAutoConfiguration(KafkaProperties properties) {
6667
public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory,
6768
ProducerListener<Object, Object> kafkaProducerListener,
6869
ObjectProvider<RecordMessageConverter> messageConverter) {
70+
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
6971
KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
7072
messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
71-
kafkaTemplate.setProducerListener(kafkaProducerListener);
72-
kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
73+
map.from(kafkaProducerListener).to(kafkaTemplate::setProducerListener);
74+
map.from(this.properties.getTemplate().getDefaultTopic()).to(kafkaTemplate::setDefaultTopic);
75+
map.from(this.properties.getTemplate().getTransactionIdPrefix()).to(kafkaTemplate::setTransactionIdPrefix);
7376
return kafkaTemplate;
7477
}
7578

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -824,6 +824,12 @@ public static class Template {
824824
*/
825825
private String defaultTopic;
826826

827+
/**
828+
* Transaction id prefix, override the transaction id prefix in the producer
829+
* factory.
830+
*/
831+
private String transactionIdPrefix;
832+
827833
public String getDefaultTopic() {
828834
return this.defaultTopic;
829835
}
@@ -832,6 +838,14 @@ public void setDefaultTopic(String defaultTopic) {
832838
this.defaultTopic = defaultTopic;
833839
}
834840

841+
public String getTransactionIdPrefix() {
842+
return this.transactionIdPrefix;
843+
}
844+
845+
public void setTransactionIdPrefix(String transactionIdPrefix) {
846+
this.transactionIdPrefix = transactionIdPrefix;
847+
}
848+
835849
}
836850

837851
public static class Listener {

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,7 @@ void streamsApplicationIdIsNotMandatoryIfEnableKafkaStreamsIsNotSet() {
384384
void listenerProperties() {
385385
this.contextRunner
386386
.withPropertyValues("spring.kafka.template.default-topic=testTopic",
387+
"spring.kafka.template.transaction-id-prefix=txOverride",
387388
"spring.kafka.listener.ack-mode=MANUAL", "spring.kafka.listener.client-id=client",
388389
"spring.kafka.listener.ack-count=123", "spring.kafka.listener.ack-time=456",
389390
"spring.kafka.listener.concurrency=3", "spring.kafka.listener.poll-timeout=2000",
@@ -406,6 +407,7 @@ void listenerProperties() {
406407
assertThat(kafkaTemplate.getMessageConverter()).isInstanceOf(MessagingMessageConverter.class);
407408
assertThat(kafkaTemplate).hasFieldOrPropertyWithValue("producerFactory", producerFactory);
408409
assertThat(kafkaTemplate.getDefaultTopic()).isEqualTo("testTopic");
410+
assertThat(kafkaTemplate).hasFieldOrPropertyWithValue("transactionIdPrefix", "txOverride");
409411
assertThat(kafkaListenerContainerFactory.getConsumerFactory()).isEqualTo(consumerFactory);
410412
ContainerProperties containerProperties = kafkaListenerContainerFactory.getContainerProperties();
411413
assertThat(containerProperties.getAckMode()).isEqualTo(AckMode.MANUAL);

0 commit comments

Comments
 (0)