Skip to content

Commit 3789b71

Browse files
committed
GH-3110: Improve nullability for RabbitTemplate
Fixes: #3110 * Also improve nullability for `RabbitMessagingTemplate` It is easier for consumers of the API to use only one contract with possible nulls instead of going for choice with `if..else`
1 parent 8f248af commit 3789b71

File tree

3 files changed

+45
-38
lines changed

3 files changed

+45
-38
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitMessagingTemplate.java

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -133,31 +133,35 @@ public void afterPropertiesSet() {
133133
}
134134

135135
@Override
136-
public void send(String exchange, String routingKey, Message<?> message) throws MessagingException {
136+
public void send(@Nullable String exchange, @Nullable String routingKey, Message<?> message)
137+
throws MessagingException {
138+
137139
doSend(exchange, routingKey, message);
138140
}
139141

140142
@Override
141-
public void convertAndSend(String exchange, String routingKey, Object payload) throws MessagingException {
143+
public void convertAndSend(@Nullable String exchange, @Nullable String routingKey, Object payload)
144+
throws MessagingException {
145+
142146
convertAndSend(exchange, routingKey, payload, (Map<String, Object>) null);
143147
}
144148

145149
@Override
146-
public void convertAndSend(String exchange, String routingKey, Object payload,
150+
public void convertAndSend(@Nullable String exchange, @Nullable String routingKey, Object payload,
147151
@Nullable Map<String, Object> headers) throws MessagingException {
148152

149153
convertAndSend(exchange, routingKey, payload, headers, null);
150154
}
151155

152156
@Override
153-
public void convertAndSend(String exchange, String routingKey, Object payload,
157+
public void convertAndSend(@Nullable String exchange, @Nullable String routingKey, Object payload,
154158
@Nullable MessagePostProcessor postProcessor) throws MessagingException {
155159

156160
convertAndSend(exchange, routingKey, payload, null, postProcessor);
157161
}
158162

159163
@Override
160-
public void convertAndSend(String exchange, String routingKey, Object payload,
164+
public void convertAndSend(@Nullable String exchange, @Nullable String routingKey, Object payload,
161165
@Nullable Map<String, Object> headers, @Nullable MessagePostProcessor postProcessor)
162166
throws MessagingException {
163167

@@ -166,36 +170,36 @@ public void convertAndSend(String exchange, String routingKey, Object payload,
166170
}
167171

168172
@Override
169-
public @Nullable Message<?> sendAndReceive(String exchange, String routingKey, Message<?> requestMessage)
170-
throws MessagingException {
173+
public @Nullable Message<?> sendAndReceive(@Nullable String exchange, @Nullable String routingKey,
174+
Message<?> requestMessage) throws MessagingException {
171175

172176
return doSendAndReceive(exchange, routingKey, requestMessage);
173177
}
174178

175179
@Override
176-
public <T> @Nullable T convertSendAndReceive(String exchange, String routingKey, Object request,
180+
public <T> @Nullable T convertSendAndReceive(@Nullable String exchange, @Nullable String routingKey, Object request,
177181
Class<T> targetClass) throws MessagingException {
178182

179183
return convertSendAndReceive(exchange, routingKey, request, null, targetClass);
180184
}
181185

182186
@Override
183-
public <T> @Nullable T convertSendAndReceive(String exchange, String routingKey, Object request,
187+
public <T> @Nullable T convertSendAndReceive(@Nullable String exchange, @Nullable String routingKey, Object request,
184188
@Nullable Map<String, Object> headers, Class<T> targetClass) throws MessagingException {
185189

186190
return convertSendAndReceive(exchange, routingKey, request, headers, targetClass, null);
187191
}
188192

189193
@Override
190-
public <T> @Nullable T convertSendAndReceive(String exchange, String routingKey, Object request,
194+
public <T> @Nullable T convertSendAndReceive(@Nullable String exchange, @Nullable String routingKey, Object request,
191195
Class<T> targetClass, @Nullable MessagePostProcessor requestPostProcessor) throws MessagingException {
192196

193197
return convertSendAndReceive(exchange, routingKey, request, null, targetClass, requestPostProcessor);
194198
}
195199

196200
@SuppressWarnings("unchecked")
197201
@Override
198-
public <T> @Nullable T convertSendAndReceive(String exchange, String routingKey, Object request,
202+
public <T> @Nullable T convertSendAndReceive(@Nullable String exchange, @Nullable String routingKey, Object request,
199203
@Nullable Map<String, Object> headers,
200204
Class<T> targetClass, @Nullable MessagePostProcessor requestPostProcessor) throws MessagingException {
201205

@@ -205,7 +209,7 @@ public void convertAndSend(String exchange, String routingKey, Object payload,
205209
}
206210

207211
@Override
208-
protected void doSend(String destination, Message<?> message) {
212+
protected void doSend(@Nullable String destination, Message<?> message) {
209213
try {
210214
Object correlation = message.getHeaders().get(AmqpHeaders.PUBLISH_CONFIRM_CORRELATION);
211215
if (correlation instanceof CorrelationData corrData) {
@@ -220,7 +224,7 @@ protected void doSend(String destination, Message<?> message) {
220224
}
221225
}
222226

223-
protected void doSend(String exchange, String routingKey, Message<?> message) {
227+
protected void doSend(@Nullable String exchange, @Nullable String routingKey, Message<?> message) {
224228
try {
225229
Object correlation = message.getHeaders().get(AmqpHeaders.PUBLISH_CONFIRM_CORRELATION);
226230
if (correlation instanceof CorrelationData corrData) {
@@ -268,7 +272,7 @@ private String resolveDestination() {
268272
}
269273

270274
@Override
271-
protected @Nullable Message<?> doSendAndReceive(String destination, Message<?> requestMessage) {
275+
protected @Nullable Message<?> doSendAndReceive(@Nullable String destination, Message<?> requestMessage) {
272276
try {
273277
org.springframework.amqp.core.Message amqpMessage = this.rabbitTemplate.sendAndReceive(
274278
destination, createMessage(requestMessage));
@@ -279,7 +283,8 @@ private String resolveDestination() {
279283
}
280284
}
281285

282-
protected @Nullable Message<?> doSendAndReceive(String exchange, String routingKey, Message<?> requestMessage) {
286+
protected @Nullable Message<?> doSendAndReceive(@Nullable String exchange, @Nullable String routingKey,
287+
Message<?> requestMessage) {
283288
try {
284289
org.springframework.amqp.core.Message amqpMessage = this.rabbitTemplate.sendAndReceive(
285290
exchange, routingKey, createMessage(requestMessage));

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitOperations.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public interface RabbitOperations extends AmqpTemplate, Lifecycle {
4545
* {@link ChannelCallback#doInRabbit(com.rabbitmq.client.Channel)}.
4646
* @throws AmqpException if one occurs.
4747
*/
48-
<T> @Nullable T execute(ChannelCallback<T> action) throws AmqpException;
48+
<T> @Nullable T execute(ChannelCallback<? extends @Nullable T> action) throws AmqpException;
4949

5050
/**
5151
* Invoke the callback and run all operations on the template argument in a dedicated
@@ -57,7 +57,7 @@ public interface RabbitOperations extends AmqpTemplate, Lifecycle {
5757
* @throws AmqpException if one occurs.
5858
* @since 2.0
5959
*/
60-
default <T> @Nullable T invoke(OperationsCallback<T> action) throws AmqpException {
60+
default <T> @Nullable T invoke(OperationsCallback<? extends @Nullable T> action) throws AmqpException {
6161
return invoke(action, null, null);
6262
}
6363

@@ -71,8 +71,8 @@ public interface RabbitOperations extends AmqpTemplate, Lifecycle {
7171
* @return the result of the action method.
7272
* @since 2.1
7373
*/
74-
<T> @Nullable T invoke(OperationsCallback<T> action, com.rabbitmq.client.@Nullable ConfirmCallback acks,
75-
com.rabbitmq.client.@Nullable ConfirmCallback nacks);
74+
<T> @Nullable T invoke(OperationsCallback<? extends @Nullable T> action,
75+
com.rabbitmq.client.@Nullable ConfirmCallback acks, com.rabbitmq.client.@Nullable ConfirmCallback nacks);
7676

7777
/**
7878
* Delegate to the underlying dedicated channel to wait for confirms. The connection
@@ -100,7 +100,7 @@ public interface RabbitOperations extends AmqpTemplate, Lifecycle {
100100
void waitForConfirmsOrDie(long timeout) throws AmqpException;
101101

102102
/**
103-
* Return the connection factory for this operations.
103+
* Return the connection factory for these operations.
104104
* @return the connection factory.
105105
* @since 2.0
106106
*/

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,6 @@
113113
import org.springframework.expression.spel.standard.SpelExpressionParser;
114114
import org.springframework.expression.spel.support.StandardEvaluationContext;
115115
import org.springframework.retry.RecoveryCallback;
116-
import org.springframework.retry.RetryCallback;
117116
import org.springframework.retry.support.RetryTemplate;
118117
import org.springframework.util.Assert;
119118
import org.springframework.util.ErrorHandler;
@@ -1111,17 +1110,17 @@ public void send(Message message) throws AmqpException {
11111110
}
11121111

11131112
@Override
1114-
public void send(String routingKey, Message message) throws AmqpException {
1113+
public void send(@Nullable String routingKey, Message message) throws AmqpException {
11151114
send(this.exchange, routingKey, message);
11161115
}
11171116

11181117
@Override
1119-
public void send(String routingKey, Message message, CorrelationData correlationData) throws AmqpException {
1118+
public void send(@Nullable String routingKey, Message message, CorrelationData correlationData) throws AmqpException {
11201119
send(this.exchange, routingKey, message, correlationData);
11211120
}
11221121

11231122
@Override
1124-
public void send(final String exchange, final String routingKey, final Message message) throws AmqpException {
1123+
public void send(@Nullable String exchange, @Nullable String routingKey, final Message message) throws AmqpException {
11251124
send(exchange, routingKey, message, null);
11261125
}
11271126

@@ -1630,24 +1629,24 @@ private <S> void doSendReply(final ReplyToAddressCallback<S> replyToAddressCallb
16301629
}
16311630

16321631
@Override
1633-
public @Nullable Message sendAndReceive(final String routingKey, final Message message) throws AmqpException {
1632+
public @Nullable Message sendAndReceive(@Nullable String routingKey, final Message message) throws AmqpException {
16341633
return sendAndReceive(routingKey, message, null);
16351634
}
16361635

1637-
public @Nullable Message sendAndReceive(final String routingKey, final Message message,
1636+
public @Nullable Message sendAndReceive(@Nullable String routingKey, Message message,
16381637
@Nullable CorrelationData correlationData) throws AmqpException {
16391638

16401639
return doSendAndReceive(this.exchange, routingKey, message, correlationData);
16411640
}
16421641

16431642
@Override
1644-
public @Nullable Message sendAndReceive(final String exchange, final String routingKey, final Message message)
1643+
public @Nullable Message sendAndReceive(@Nullable String exchange, @Nullable String routingKey, Message message)
16451644
throws AmqpException {
16461645

16471646
return sendAndReceive(exchange, routingKey, message, null);
16481647
}
16491648

1650-
public @Nullable Message sendAndReceive(final String exchange, final String routingKey, final Message message,
1649+
public @Nullable Message sendAndReceive(@Nullable String exchange, @Nullable String routingKey, Message message,
16511650
@Nullable CorrelationData correlationData) throws AmqpException {
16521651

16531652
return doSendAndReceive(exchange, routingKey, message, correlationData);
@@ -2165,16 +2164,18 @@ public Boolean isMandatoryFor(final Message message) {
21652164
}
21662165

21672166
@Override
2168-
public <T> @Nullable T execute(ChannelCallback<T> action) {
2167+
public <T> @Nullable T execute(ChannelCallback<? extends @Nullable T> action) {
21692168
return execute(action, getConnectionFactory());
21702169
}
21712170

21722171
@SuppressWarnings(UNCHECKED)
2173-
private <T> @Nullable T execute(final ChannelCallback<T> action, final ConnectionFactory connectionFactory) {
2172+
private <T> @Nullable T execute(ChannelCallback<? extends @Nullable T> action,
2173+
ConnectionFactory connectionFactory) {
2174+
21742175
if (this.retryTemplate != null) {
21752176
try {
21762177
return this.retryTemplate.execute(
2177-
(RetryCallback<T, Exception>) context -> doExecute(action, connectionFactory),
2178+
context -> doExecute(action, connectionFactory),
21782179
(RecoveryCallback<T>) this.recoveryCallback);
21792180
}
21802181
catch (RuntimeException e) { // NOSONAR catch and rethrow needed to avoid next catch
@@ -2189,7 +2190,9 @@ public Boolean isMandatoryFor(final Message message) {
21892190
}
21902191
}
21912192

2192-
private <T> @Nullable T doExecute(ChannelCallback<T> action, ConnectionFactory connectionFactory) { // NOSONAR complexity
2193+
private <T> @Nullable T doExecute(ChannelCallback<? extends @Nullable T> action,
2194+
ConnectionFactory connectionFactory) {
2195+
21932196
Assert.notNull(action, "Callback object must not be null");
21942197
Channel channel = null;
21952198
boolean invokeScope = false;
@@ -2211,8 +2214,7 @@ public Boolean isMandatoryFor(final Message message) {
22112214
}
22122215
}
22132216
else {
2214-
connection = ConnectionFactoryUtils.createConnection(connectionFactory,
2215-
this.usePublisherConnection); // NOSONAR - RabbitUtils closes
2217+
connection = ConnectionFactoryUtils.createConnection(connectionFactory, this.usePublisherConnection);
22162218
try {
22172219
channel = connection.createChannel(false);
22182220
}
@@ -2253,8 +2255,8 @@ private void cleanUpAfterAction(@Nullable Channel channel, boolean invokeScope,
22532255
}
22542256
}
22552257

2256-
private <T> @Nullable T invokeAction(ChannelCallback<T> action, ConnectionFactory connectionFactory, Channel channel)
2257-
throws Exception { // NOSONAR see the callback
2258+
private <T> @Nullable T invokeAction(ChannelCallback<? extends @Nullable T> action,
2259+
ConnectionFactory connectionFactory, Channel channel) throws Exception {
22582260

22592261
if (isPublisherConfirmsOrReturns(connectionFactory)) {
22602262
addListener(channel);
@@ -2268,8 +2270,8 @@ private void cleanUpAfterAction(@Nullable Channel channel, boolean invokeScope,
22682270
}
22692271

22702272
@Override
2271-
public <T> @Nullable T invoke(OperationsCallback<T> action, com.rabbitmq.client.@Nullable ConfirmCallback acks,
2272-
com.rabbitmq.client.@Nullable ConfirmCallback nacks) {
2273+
public <T> @Nullable T invoke(OperationsCallback<? extends @Nullable T> action,
2274+
com.rabbitmq.client.@Nullable ConfirmCallback acks, com.rabbitmq.client.@Nullable ConfirmCallback nacks) {
22732275

22742276
final Channel currentChannel = this.dedicatedChannels.get();
22752277
Assert.state(currentChannel == null, () -> "Nested invoke() calls are not supported; channel '" + currentChannel

0 commit comments

Comments
 (0)