diff --git a/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/SseEmitter.java b/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/SseEmitter.java index 56ce94231aa3..fd1a6f334e1c 100644 --- a/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/SseEmitter.java +++ b/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/SseEmitter.java @@ -21,6 +21,10 @@ import java.util.Collections; import java.util.LinkedHashSet; import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -52,14 +56,37 @@ public class SseEmitter extends ResponseBodyEmitter { private final Lock writeLock = new ReentrantLock(); /** - * Create a new SseEmitter instance. + * The interval (in milliseconds) at which heartbeat messages are sent to the client. + * A value of 0 means no heartbeat messages will be sent. + */ + private final long heartbeatInterval; + + /** + * The scheduled future for the heartbeat task. Used to cancel the task when needed. + */ + @Nullable + private ScheduledFuture> heartbeatFuture; + + /** + * The scheduler used to execute the heartbeat task at fixed intervals. + * Used to schedule and manage the periodic heartbeat messages. + */ + @Nullable + private ScheduledExecutorService scheduler; + + /** + * Create a new {@code SseEmitter} instance. + *
By default, the timeout is not set (i.e., it depends on the MVC configuration), + * and no heartbeat messages are sent. */ public SseEmitter() { + this.heartbeatInterval = 0; } /** * Create a SseEmitter with a custom timeout value. *
By default not set in which case the default configured in the MVC + *
No heartbeat messages will be sent unless specified. * Java Config or the MVC namespace is used, or if that's not set, then the * timeout depends on the default of the underlying server. * @param timeout the timeout value in milliseconds @@ -67,6 +94,24 @@ public SseEmitter() { */ public SseEmitter(Long timeout) { super(timeout); + heartbeatInterval = 0; + } + + /** + * Create a new {@code SseEmitter} instance with a custom timeout and heartbeat interval. + * @param timeout the timeout value in milliseconds + * @param heartbeatInterval the interval (in milliseconds) at which heartbeat messages are sent. + * A value of 0 means no heartbeat messages will be sent. + */ + public SseEmitter(Long timeout, long heartbeatInterval) { + super(timeout); + this.heartbeatInterval = heartbeatInterval; + if (heartbeatInterval > 0) { + startHeartbeat(); + onCompletion(this::stopHeartbeat); + onTimeout(this::stopHeartbeat); + onError(ex -> stopHeartbeat()); + } } @@ -139,6 +184,40 @@ public void send(SseEventBuilder builder) throws IOException { } } + /** + * Start sending heartbeat messages at the specified interval. + *
Heartbeat messages are sent as comments (":heartbeat") to keep the connection alive + * and to detect client disconnects. + */ + private void startHeartbeat() { + if (heartbeatInterval > 0) { + this.scheduler = Executors.newSingleThreadScheduledExecutor(); + this.heartbeatFuture = this.scheduler.scheduleAtFixedRate(() -> { + try { + send(SseEmitter.event().comment("heartbeat")); + } catch (IOException ex) { + completeWithError(ex); + stopHeartbeat(); + } + }, heartbeatInterval, heartbeatInterval, TimeUnit.MILLISECONDS); + } + } + + /** + * Stop sending heartbeat messages. + *
Cancels the scheduled heartbeat task and shuts down the scheduler to release resources. + */ + private void stopHeartbeat() { + if (heartbeatFuture != null) { + heartbeatFuture.cancel(true); + this.heartbeatFuture = null; + } + if (this.scheduler != null) { + this.scheduler.shutdown(); + this.scheduler = null; + } + } + @Override public String toString() { return "SseEmitter@" + ObjectUtils.getIdentityHexString(this); diff --git a/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/SseEmitterTests.java b/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/SseEmitterTests.java index ba886f626e0a..dbbbaebdea57 100644 --- a/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/SseEmitterTests.java +++ b/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/SseEmitterTests.java @@ -137,6 +137,38 @@ void sendEventFullWithTwoDataLinesInTheMiddle() throws Exception { this.handler.assertWriteCount(1); } + @Test + void heartbeatIsSent() throws Exception { + this.emitter = new SseEmitter(0L, 100L); + this.emitter.initialize(this.handler); + Thread.sleep(250); + + long heartbeatCount = this.handler.objects.stream() + .filter(data -> data.equals(":heartbeat\n\n")) + .count(); + + assertThat(heartbeatCount).isGreaterThanOrEqualTo(2); + } + + @Test + void heartbeatStopsAfterCompletion() throws Exception { + this.emitter = new SseEmitter(0L, 100L); + this.emitter.initialize(this.handler); + Thread.sleep(150); + this.emitter.complete(); + + long heartbeatCountBeforeCompletion = this.handler.objects.stream() + .filter(data -> data.equals(":heartbeat\n\n")) + .count(); + + Thread.sleep(150); + long totalHeartbeatCount = this.handler.objects.stream() + .filter(data -> data.equals(":heartbeat\n\n")) + .count(); + + assertThat(totalHeartbeatCount).isEqualTo(heartbeatCountBeforeCompletion); + } + private static class TestHandler implements ResponseBodyEmitter.Handler {