From 55f3b9a565567db1992dcc2241d83718793b52e7 Mon Sep 17 00:00:00 2001 From: Pascal Ayotte Date: Mon, 4 Oct 2021 09:37:34 -0400 Subject: [PATCH 1/4] Support IdlePartitionEventInterval configuration IdlePartitionEventInterval was added, but the value is not retrieved from configuration. Adding support to set the value from configuration. --- .../kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java | 1 + 1 file changed, 1 insertion(+) diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java index d6472ee012c1..ae5d49e5eaeb 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java @@ -199,6 +199,7 @@ private void configureContainer(ContainerProperties container) { map.from(properties::getNoPollThreshold).to(container::setNoPollThreshold); map.from(properties.getIdleBetweenPolls()).as(Duration::toMillis).to(container::setIdleBetweenPolls); map.from(properties::getIdleEventInterval).as(Duration::toMillis).to(container::setIdleEventInterval); + map.from(properties::getIdlePartitionEventInterval).as(Duration::toMillis).to(container::setIdlePartitionEventInterval); map.from(properties::getMonitorInterval).as(Duration::getSeconds).as(Number::intValue) .to(container::setMonitorInterval); map.from(properties::getLogContainerConfig).to(container::setLogContainerConfig); From 93acd80bf7ea53cd1a961b7fb039bce3bd187f4f Mon Sep 17 00:00:00 2001 From: Pascal Ayotte Date: Mon, 4 Oct 2021 21:58:17 -0400 Subject: [PATCH 2/4] Support idle-partition-event-interval tests --- .../boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java index 320a53ea0934..3379f9e6efac 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java @@ -389,6 +389,7 @@ void listenerProperties() { "spring.kafka.listener.concurrency=3", "spring.kafka.listener.poll-timeout=2000", "spring.kafka.listener.no-poll-threshold=2.5", "spring.kafka.listener.type=batch", "spring.kafka.listener.idle-between-polls=1s", "spring.kafka.listener.idle-event-interval=1s", + "spring.kafka.listener.idle-partition-event-interval=1s", "spring.kafka.listener.monitor-interval=45", "spring.kafka.listener.log-container-config=true", "spring.kafka.listener.only-log-record-metadata=true", "spring.kafka.listener.missing-topics-fatal=true", "spring.kafka.jaas.enabled=true", @@ -415,6 +416,7 @@ void listenerProperties() { assertThat(containerProperties.getNoPollThreshold()).isEqualTo(2.5f); assertThat(containerProperties.getIdleBetweenPolls()).isEqualTo(1000L); assertThat(containerProperties.getIdleEventInterval()).isEqualTo(1000L); + assertThat(containerProperties.getIdlePartitionEventInterval()).isEqualTo(1000L); assertThat(containerProperties.getMonitorInterval()).isEqualTo(45); assertThat(containerProperties.isLogContainerConfig()).isTrue(); assertThat(containerProperties.isOnlyLogRecordMetadata()).isTrue(); From 214bc25ac135521458db7c9db43652c0f4db6fd3 Mon Sep 17 00:00:00 2001 From: radiodeer Date: Mon, 25 Oct 2021 20:25:50 -0400 Subject: [PATCH 3/4] add idlePartitionEventInterval to KafkaProperties --- .../boot/autoconfigure/kafka/KafkaProperties.java | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java index e76dd85de209..efe49691c0cf 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java @@ -901,6 +901,11 @@ public enum Type { */ private Duration idleEventInterval; + /** + * Time between publishing idle partition consumer events (no data received for partition). + */ + private Duration idlePartitionEventInterval; + /** * Time between checks for non-responsive consumers. If a duration suffix is not * specified, seconds will be used. @@ -1005,6 +1010,14 @@ public void setIdleEventInterval(Duration idleEventInterval) { this.idleEventInterval = idleEventInterval; } + public Duration getIdlePartitionEventInterval() { + return this.idlePartitionEventInterval; + } + + public void setIdlePartitionEventInterval(Duration idlePartitionEventInterval) { + this.idlePartitionEventInterval = idlePartitionEventInterval; + } + public Duration getMonitorInterval() { return this.monitorInterval; } From bf27330816490fb6e2b4dd98e2a4276e1ba058f6 Mon Sep 17 00:00:00 2001 From: Pascal Ayotte Date: Wed, 27 Oct 2021 11:37:19 -0400 Subject: [PATCH 4/4] run format --- .../ConcurrentKafkaListenerContainerFactoryConfigurer.java | 3 ++- .../boot/autoconfigure/kafka/KafkaProperties.java | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java index ae5d49e5eaeb..5d526fdfd9d0 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java @@ -199,7 +199,8 @@ private void configureContainer(ContainerProperties container) { map.from(properties::getNoPollThreshold).to(container::setNoPollThreshold); map.from(properties.getIdleBetweenPolls()).as(Duration::toMillis).to(container::setIdleBetweenPolls); map.from(properties::getIdleEventInterval).as(Duration::toMillis).to(container::setIdleEventInterval); - map.from(properties::getIdlePartitionEventInterval).as(Duration::toMillis).to(container::setIdlePartitionEventInterval); + map.from(properties::getIdlePartitionEventInterval).as(Duration::toMillis) + .to(container::setIdlePartitionEventInterval); map.from(properties::getMonitorInterval).as(Duration::getSeconds).as(Number::intValue) .to(container::setMonitorInterval); map.from(properties::getLogContainerConfig).to(container::setLogContainerConfig); diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java index efe49691c0cf..2a142a920e79 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java @@ -902,7 +902,8 @@ public enum Type { private Duration idleEventInterval; /** - * Time between publishing idle partition consumer events (no data received for partition). + * Time between publishing idle partition consumer events (no data received for + * partition). */ private Duration idlePartitionEventInterval;