diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java index d6472ee012c1..5d526fdfd9d0 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java @@ -199,6 +199,8 @@ private void configureContainer(ContainerProperties container) { map.from(properties::getNoPollThreshold).to(container::setNoPollThreshold); map.from(properties.getIdleBetweenPolls()).as(Duration::toMillis).to(container::setIdleBetweenPolls); map.from(properties::getIdleEventInterval).as(Duration::toMillis).to(container::setIdleEventInterval); + map.from(properties::getIdlePartitionEventInterval).as(Duration::toMillis) + .to(container::setIdlePartitionEventInterval); map.from(properties::getMonitorInterval).as(Duration::getSeconds).as(Number::intValue) .to(container::setMonitorInterval); map.from(properties::getLogContainerConfig).to(container::setLogContainerConfig); diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java index e76dd85de209..2a142a920e79 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java @@ -901,6 +901,12 @@ public enum Type { */ private Duration idleEventInterval; + /** + * Time between publishing idle partition consumer events (no data received for + * partition). + */ + private Duration idlePartitionEventInterval; + /** * Time between checks for non-responsive consumers. If a duration suffix is not * specified, seconds will be used. @@ -1005,6 +1011,14 @@ public void setIdleEventInterval(Duration idleEventInterval) { this.idleEventInterval = idleEventInterval; } + public Duration getIdlePartitionEventInterval() { + return this.idlePartitionEventInterval; + } + + public void setIdlePartitionEventInterval(Duration idlePartitionEventInterval) { + this.idlePartitionEventInterval = idlePartitionEventInterval; + } + public Duration getMonitorInterval() { return this.monitorInterval; } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java index 320a53ea0934..3379f9e6efac 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java @@ -389,6 +389,7 @@ void listenerProperties() { "spring.kafka.listener.concurrency=3", "spring.kafka.listener.poll-timeout=2000", "spring.kafka.listener.no-poll-threshold=2.5", "spring.kafka.listener.type=batch", "spring.kafka.listener.idle-between-polls=1s", "spring.kafka.listener.idle-event-interval=1s", + "spring.kafka.listener.idle-partition-event-interval=1s", "spring.kafka.listener.monitor-interval=45", "spring.kafka.listener.log-container-config=true", "spring.kafka.listener.only-log-record-metadata=true", "spring.kafka.listener.missing-topics-fatal=true", "spring.kafka.jaas.enabled=true", @@ -415,6 +416,7 @@ void listenerProperties() { assertThat(containerProperties.getNoPollThreshold()).isEqualTo(2.5f); assertThat(containerProperties.getIdleBetweenPolls()).isEqualTo(1000L); assertThat(containerProperties.getIdleEventInterval()).isEqualTo(1000L); + assertThat(containerProperties.getIdlePartitionEventInterval()).isEqualTo(1000L); assertThat(containerProperties.getMonitorInterval()).isEqualTo(45); assertThat(containerProperties.isLogContainerConfig()).isTrue(); assertThat(containerProperties.isOnlyLogRecordMetadata()).isTrue();