Skip to content

Commit daa0afd

Browse files
committed
Fix MessageRecoverer.recover() for nullable cause
1 parent 3789b71 commit daa0afd

File tree

7 files changed

+36
-20
lines changed

7 files changed

+36
-20
lines changed

spring-amqp/src/main/java/org/springframework/amqp/AmqpRejectAndDontRequeueException.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public AmqpRejectAndDontRequeueException(String message) {
4646
* Construct an instance with the supplied argument.
4747
* @param cause the cause.
4848
*/
49-
public AmqpRejectAndDontRequeueException(Throwable cause) {
49+
public AmqpRejectAndDontRequeueException(@Nullable Throwable cause) {
5050
this(null, false, cause);
5151
}
5252

@@ -55,7 +55,7 @@ public AmqpRejectAndDontRequeueException(Throwable cause) {
5555
* @param message A message describing the problem.
5656
* @param cause the cause.
5757
*/
58-
public AmqpRejectAndDontRequeueException(String message, Throwable cause) {
58+
public AmqpRejectAndDontRequeueException(String message, @Nullable Throwable cause) {
5959
this(message, false, cause);
6060
}
6161

spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/retry/StreamMessageRecoverer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.rabbit.stream.retry;
1818

1919
import com.rabbitmq.stream.MessageHandler.Context;
20+
import org.jspecify.annotations.Nullable;
2021

2122
import org.springframework.amqp.core.Message;
2223
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
@@ -33,7 +34,7 @@
3334
public interface StreamMessageRecoverer extends MessageRecoverer {
3435

3536
@Override
36-
default void recover(Message message, Throwable cause) {
37+
default void recover(Message message, @Nullable Throwable cause) {
3738
}
3839

3940
/**

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/retry/ImmediateRequeueMessageRecoverer.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import org.apache.commons.logging.Log;
2020
import org.apache.commons.logging.LogFactory;
21+
import org.jspecify.annotations.Nullable;
2122

2223
import org.springframework.amqp.ImmediateRequeueAmqpException;
2324
import org.springframework.amqp.core.Message;
@@ -36,11 +37,16 @@ public class ImmediateRequeueMessageRecoverer implements MessageRecoverer {
3637
protected Log logger = LogFactory.getLog(ImmediateRequeueMessageRecoverer.class); // NOSONAR protected
3738

3839
@Override
39-
public void recover(Message message, Throwable cause) {
40+
public void recover(Message message, @Nullable Throwable cause) {
4041
if (this.logger.isWarnEnabled()) {
4142
this.logger.warn("Retries exhausted for message " + message + "; requeuing...", cause);
4243
}
43-
throw new ImmediateRequeueAmqpException(cause);
44+
if (cause != null) {
45+
throw new ImmediateRequeueAmqpException(cause);
46+
}
47+
else {
48+
throw new ImmediateRequeueAmqpException("Re-queueing for message: " + message);
49+
}
4450
}
4551

4652
}

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/retry/MessageBatchRecoverer.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
import java.util.List;
2020

21+
import org.jspecify.annotations.Nullable;
22+
2123
import org.springframework.amqp.core.Message;
2224

2325
/**
@@ -26,23 +28,24 @@
2628
* the recoverer to properly recover the remaining records.
2729
*
2830
* @author Gary Russell
31+
* @author Artem Bilan
32+
*
2933
* @since 2.2
3034
*
3135
*/
3236
@FunctionalInterface
3337
public interface MessageBatchRecoverer extends MessageRecoverer {
3438

3539
@Override
36-
default void recover(Message message, Throwable cause) {
40+
default void recover(Message message, @Nullable Throwable cause) {
3741
throw new IllegalStateException("MessageBatchRecoverer configured with a non-batch listener");
3842
}
3943

4044
/**
4145
* Callback for message batch that was consumed but failed all retry attempts.
42-
*
4346
* @param messages the messages to recover
4447
* @param cause the cause of the error
4548
*/
46-
void recover(List<Message> messages, Throwable cause);
49+
void recover(List<Message> messages, @Nullable Throwable cause);
4750

4851
}

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/retry/MessageRecoverer.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.amqp.rabbit.retry;
1818

19+
import org.jspecify.annotations.Nullable;
20+
1921
import org.springframework.amqp.core.Message;
2022

2123
/**
@@ -24,6 +26,7 @@
2426
*
2527
* @author Dave Syer
2628
* @author Gary Russell
29+
* @author Artem Bilan
2730
*
2831
*/
2932
@FunctionalInterface
@@ -35,6 +38,6 @@ public interface MessageRecoverer {
3538
* @param message the message to recover
3639
* @param cause the cause of the error
3740
*/
38-
void recover(Message message, Throwable cause);
41+
void recover(Message message, @Nullable Throwable cause);
3942

4043
}

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/retry/RejectAndDontRequeueRecoverer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.apache.commons.logging.Log;
2222
import org.apache.commons.logging.LogFactory;
23+
import org.jspecify.annotations.Nullable;
2324

2425
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
2526
import org.springframework.amqp.core.Message;
@@ -69,7 +70,7 @@ public RejectAndDontRequeueRecoverer(Supplier<String> messageSupplier) {
6970
}
7071

7172
@Override
72-
public void recover(Message message, Throwable cause) {
73+
public void recover(Message message, @Nullable Throwable cause) {
7374
if (this.logger.isWarnEnabled()) {
7475
this.logger.warn("Retries exhausted for message " + message, cause);
7576
}

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/retry/RepublishMessageRecoverer.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -189,18 +189,20 @@ protected MessageDeliveryMode getDeliveryMode() {
189189
}
190190

191191
@Override
192-
public void recover(Message message, Throwable cause) {
192+
public void recover(Message message, @Nullable Throwable cause) {
193193
MessageProperties messageProperties = message.getMessageProperties();
194194
Map<String, @Nullable Object> headers = messageProperties.getHeaders();
195-
String exceptionMessage = cause.getCause() != null ? cause.getCause().getMessage() : cause.getMessage();
196-
@Nullable String[] processed = processStackTrace(cause, exceptionMessage);
197-
String stackTraceAsString = processed[0];
198-
String truncatedExceptionMessage = processed[1];
199-
if (truncatedExceptionMessage != null) {
200-
exceptionMessage = truncatedExceptionMessage;
195+
if (cause != null) {
196+
String exceptionMessage = cause.getCause() != null ? cause.getCause().getMessage() : cause.getMessage();
197+
@Nullable String[] processed = processStackTrace(cause, exceptionMessage);
198+
String stackTraceAsString = processed[0];
199+
String truncatedExceptionMessage = processed[1];
200+
if (truncatedExceptionMessage != null) {
201+
exceptionMessage = truncatedExceptionMessage;
202+
}
203+
headers.put(X_EXCEPTION_STACKTRACE, stackTraceAsString);
204+
headers.put(X_EXCEPTION_MESSAGE, exceptionMessage);
201205
}
202-
headers.put(X_EXCEPTION_STACKTRACE, stackTraceAsString);
203-
headers.put(X_EXCEPTION_MESSAGE, exceptionMessage);
204206
headers.put(X_ORIGINAL_EXCHANGE, messageProperties.getReceivedExchange());
205207
headers.put(X_ORIGINAL_ROUTING_KEY, messageProperties.getReceivedRoutingKey());
206208
Map<? extends String, ?> additionalHeaders = additionalHeaders(message, cause);
@@ -304,7 +306,7 @@ else if (stackTraceAsString.length() + exceptionMessage.length() > this.maxStack
304306
* @param cause The cause.
305307
* @return A {@link Map} of additional headers to add.
306308
*/
307-
protected @Nullable Map<? extends String, ?> additionalHeaders(Message message, Throwable cause) {
309+
protected @Nullable Map<? extends String, ?> additionalHeaders(Message message, @Nullable Throwable cause) {
308310
return null;
309311
}
310312

0 commit comments

Comments
 (0)