diff --git a/spring-integration-core/src/main/java/org/springframework/integration/acks/package-info.java b/spring-integration-core/src/main/java/org/springframework/integration/acks/package-info.java index f341052a884..c263244d02d 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/acks/package-info.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/acks/package-info.java @@ -1,5 +1,5 @@ /** * Provides classes related to message acknowledgment. */ -@org.springframework.lang.NonNullApi +@org.jspecify.annotations.NullMarked package org.springframework.integration.acks; diff --git a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractAggregatingMessageGroupProcessor.java b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractAggregatingMessageGroupProcessor.java index 2e3ad25b6f0..645acfe3c42 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractAggregatingMessageGroupProcessor.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractAggregatingMessageGroupProcessor.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -56,6 +56,7 @@ public abstract class AbstractAggregatingMessageGroupProcessor implements Messag private boolean messageBuilderFactorySet; + @SuppressWarnings("NullAway.Init") private BeanFactory beanFactory; @Override diff --git a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandler.java b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandler.java index 7a7cae7320d..6110ed63ba2 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandler.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandler.java @@ -125,8 +125,10 @@ public abstract class AbstractCorrelatingMessageHandler extends AbstractMessageP private boolean releaseStrategySet; + @SuppressWarnings("NullAway.Init") private MessageChannel discardChannel; + @Nullable private String discardChannelName; private boolean sendPartialResultOnExpiry; @@ -143,18 +145,23 @@ public abstract class AbstractCorrelatingMessageHandler extends AbstractMessageP private boolean releasePartialSequences; + @Nullable private Expression groupTimeoutExpression; + @Nullable private List forceReleaseAdviceChain; private long expireTimeout; + @Nullable private Duration expireDuration; private MessageGroupProcessor forceReleaseProcessor = new ForceReleaseMessageGroupProcessor(); + @SuppressWarnings("NullAway.Init") private EvaluationContext evaluationContext; + @Nullable private ApplicationEventPublisher applicationEventPublisher; private boolean expireGroupsUponTimeout = true; @@ -165,10 +172,11 @@ public abstract class AbstractCorrelatingMessageHandler extends AbstractMessageP private volatile boolean running; + @Nullable private BiFunction, String, String> groupConditionSupplier; public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store, - CorrelationStrategy correlationStrategy, ReleaseStrategy releaseStrategy) { + @Nullable CorrelationStrategy correlationStrategy, @Nullable ReleaseStrategy releaseStrategy) { Assert.notNull(processor, "'processor' must not be null"); Assert.notNull(store, "'store' must not be null"); @@ -504,6 +512,7 @@ public MessageChannel getDiscardChannel() { return this.discardChannel; } + @Nullable protected String getDiscardChannelName() { return this.discardChannelName; } @@ -532,6 +541,7 @@ protected boolean isReleasePartialSequences() { return this.releasePartialSequences; } + @Nullable protected Expression getGroupTimeoutExpression() { return this.groupTimeoutExpression; } @@ -753,7 +763,7 @@ private void discardMessage(Message message) { * @param group The group. * @param completedMessages The completed messages. */ - protected abstract void afterRelease(MessageGroup group, Collection> completedMessages); + protected abstract void afterRelease(MessageGroup group, @Nullable Collection> completedMessages); /** * Subclasses may override if special action is needed because the group was released or discarded @@ -912,14 +922,12 @@ protected void expireGroup(Object correlationKey, MessageGroup group, Lock lock) } protected void completeGroup(Object correlationKey, MessageGroup group, Lock lock) { - Message first = null; - if (group != null) { - first = group.getOne(); - } + Message first = group.getOne(); completeGroup(first, correlationKey, group, lock); } @SuppressWarnings("unchecked") + @Nullable protected Collection> completeGroup(Message message, Object correlationKey, MessageGroup group, Lock lock) { @@ -929,6 +937,7 @@ protected Collection> completeGroup(Message message, Object correl this.logger.debug(() -> "Completing group with correlationKey [" + correlationKey + "]"); result = this.outputProcessor.processMessageGroup(group); + Assert.state(result != null, "The processorMessageGroup returned a null result. Null result is not expected."); if (isResultCollectionOfMessages(result)) { partialSequence = (Collection>) result; } @@ -988,6 +997,7 @@ private static boolean isResultCollectionOfMessages(Object result) { return false; } + @Nullable protected Object obtainGroupTimeout(MessageGroup group) { if (this.groupTimeoutExpression != null) { Object timeout = this.groupTimeoutExpression.getValue(this.evaluationContext, group); @@ -1062,6 +1072,7 @@ public void purgeOrphanedGroups() { protected static class SequenceAwareMessageGroup extends SimpleMessageGroup { + @Nullable private final SimpleMessageGroup sourceGroup; public SequenceAwareMessageGroup(MessageGroup messageGroup) { @@ -1124,6 +1135,7 @@ private class ForceReleaseMessageGroupProcessor implements MessageGroupProcessor } @Override + @Nullable public Object processMessageGroup(MessageGroup group) { forceComplete(group); return null; diff --git a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AggregatingMessageHandler.java b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AggregatingMessageHandler.java index fdeeb7eb819..ad8b69ffee5 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AggregatingMessageHandler.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AggregatingMessageHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2024 the original author or authors. + * Copyright 2002-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,6 +18,8 @@ import java.util.Collection; +import org.jspecify.annotations.Nullable; + import org.springframework.integration.IntegrationPatternType; import org.springframework.integration.store.MessageGroup; import org.springframework.integration.store.MessageGroupStore; @@ -95,7 +97,7 @@ protected boolean shouldSplitOutput(Iterable reply) { * @param completedMessages The completed messages. Ignored in this implementation. */ @Override - protected void afterRelease(MessageGroup messageGroup, Collection> completedMessages) { + protected void afterRelease(MessageGroup messageGroup, @Nullable Collection> completedMessages) { Object groupId = messageGroup.getGroupId(); MessageGroupStore messageStore = getMessageStore(); messageStore.completeGroup(groupId); diff --git a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/BarrierMessageHandler.java b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/BarrierMessageHandler.java index 5d1d28e0ba5..1d479f75f83 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/BarrierMessageHandler.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/BarrierMessageHandler.java @@ -68,8 +68,10 @@ public class BarrierMessageHandler extends AbstractReplyProducingMessageHandler private final MessageGroupProcessor messageGroupProcessor; + @Nullable private String discardChannelName; + @Nullable private MessageChannel discardChannel; /** @@ -159,7 +161,7 @@ public BarrierMessageHandler(long requestTimeout, long triggerTimeout, Correlati * @since 5.4 */ public BarrierMessageHandler(long requestTimeout, long triggerTimeout, MessageGroupProcessor outputProcessor, - CorrelationStrategy correlationStrategy) { + @Nullable CorrelationStrategy correlationStrategy) { Assert.notNull(outputProcessor, "'messageGroupProcessor' cannot be null"); this.messageGroupProcessor = outputProcessor; @@ -218,6 +220,7 @@ public IntegrationPatternType getIntegrationPatternType() { } @Override + @Nullable protected Object handleRequestMessage(Message requestMessage) { Object key = this.correlationStrategy.getCorrelationKey(requestMessage); if (key == null) { @@ -247,6 +250,7 @@ protected Object handleRequestMessage(Message requestMessage) { return null; } + @Nullable private Object processRelease(Object key, Message requestMessage, Message releaseMessage) { this.suspensions.remove(key); if (releaseMessage.getPayload() instanceof Throwable) { @@ -266,6 +270,7 @@ private Object processRelease(Object key, Message requestMessage, Message * @param releaseMessage the release message. * @return the result. */ + @Nullable protected Object buildResult(Object key, Message requestMessage, Message releaseMessage) { SimpleMessageGroup group = new SimpleMessageGroup(key); group.add(requestMessage); diff --git a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/CorrelatingMessageBarrier.java b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/CorrelatingMessageBarrier.java index 5c074928c3e..c3da415afcc 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/CorrelatingMessageBarrier.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/CorrelatingMessageBarrier.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2024 the original author or authors. + * Copyright 2002-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import org.jspecify.annotations.Nullable; + import org.springframework.core.log.LogMessage; import org.springframework.integration.core.MessageSource; import org.springframework.integration.handler.AbstractMessageHandler; @@ -27,6 +29,7 @@ import org.springframework.integration.store.MessageGroupStore; import org.springframework.integration.store.SimpleMessageStore; import org.springframework.messaging.Message; +import org.springframework.util.Assert; /** * This Endpoint serves as a barrier for messages that should not be processed yet. The decision when a message can be @@ -58,8 +61,10 @@ public class CorrelatingMessageBarrier extends AbstractMessageHandler implements private final MessageGroupStore store; + @Nullable private CorrelationStrategy correlationStrategy; + @Nullable private ReleaseStrategy releaseStrategy; public CorrelatingMessageBarrier() { @@ -88,7 +93,9 @@ public void setReleaseStrategy(ReleaseStrategy releaseStrategy) { @Override protected void handleMessageInternal(Message message) { + Assert.notNull(this.correlationStrategy, "'correlationStrategy' must not be null"); Object correlationKey = this.correlationStrategy.getCorrelationKey(message); + Assert.notNull(correlationKey, "The correlation key is required"); Object lock = getLock(correlationKey); synchronized (lock) { this.store.addMessagesToGroup(correlationKey, message); @@ -103,12 +110,14 @@ private Object getLock(Object correlationKey) { @SuppressWarnings("unchecked") @Override + @Nullable public Message receive() { for (Object key : this.correlationLocks.keySet()) { Object lock = getLock(key); synchronized (lock) { MessageGroup group = this.store.getMessageGroup(key); //group might be removed by another thread + Assert.notNull(this.releaseStrategy, "'releaseStrategy' must not be null"); if (group != null && this.releaseStrategy.canRelease(group)) { Message nextMessage = null; diff --git a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/CorrelationStrategy.java b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/CorrelationStrategy.java index f08e87abeca..5bd6e3fcbab 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/CorrelationStrategy.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/CorrelationStrategy.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,8 @@ package org.springframework.integration.aggregator; +import org.jspecify.annotations.Nullable; + import org.springframework.messaging.Message; /** @@ -35,6 +37,7 @@ public interface CorrelationStrategy { * @param message The message. * @return The correlation key. */ + @Nullable Object getCorrelationKey(Message message); } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/DelegatingMessageGroupProcessor.java b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/DelegatingMessageGroupProcessor.java index d6ed43cf8ca..7df1a29d14d 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/DelegatingMessageGroupProcessor.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/DelegatingMessageGroupProcessor.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2020 the original author or authors. + * Copyright 2019-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -61,6 +61,7 @@ public class DelegatingMessageGroupProcessor implements MessageGroupProcessor, B private volatile boolean messageBuilderFactorySet; + @SuppressWarnings("NullAway.Init") private BeanFactory beanFactory; public DelegatingMessageGroupProcessor(MessageGroupProcessor delegate, diff --git a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/ExpressionEvaluatingMessageGroupProcessor.java b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/ExpressionEvaluatingMessageGroupProcessor.java index e4f27b46538..ed842e2b194 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/ExpressionEvaluatingMessageGroupProcessor.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/ExpressionEvaluatingMessageGroupProcessor.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2024 the original author or authors. + * Copyright 2002-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,6 +21,7 @@ import org.springframework.beans.factory.BeanFactory; import org.springframework.core.convert.ConversionService; import org.springframework.integration.store.MessageGroup; +import org.springframework.util.Assert; /** * A {@link MessageGroupProcessor} implementation that evaluates a SpEL expression. The SpEL context root is the list of @@ -59,7 +60,9 @@ public void setExpectedType(Class expectedType) { */ @Override protected Object aggregatePayloads(MessageGroup group, Map headers) { - return this.processor.process(group.getMessages()); + Object object = this.processor.process(group.getMessages()); + Assert.state(object != null, "The process returned a null result. Null result is not expected."); + return object; } } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/ExpressionEvaluatingMessageListProcessor.java b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/ExpressionEvaluatingMessageListProcessor.java index 7ef5c73976f..85d31c1194a 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/ExpressionEvaluatingMessageListProcessor.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/ExpressionEvaluatingMessageListProcessor.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,6 +18,8 @@ import java.util.Collection; +import org.jspecify.annotations.Nullable; + import org.springframework.expression.Expression; import org.springframework.expression.ParseException; import org.springframework.integration.util.AbstractExpressionEvaluator; @@ -37,6 +39,7 @@ public class ExpressionEvaluatingMessageListProcessor extends AbstractExpression private final Expression expression; + @Nullable private volatile Class expectedType = null; /** @@ -102,7 +105,9 @@ public void setExpectedType(Class expectedType) { */ @Override public Object process(Collection> messages) { - return this.evaluateExpression(this.expression, messages, this.expectedType); + Object object = this.evaluateExpression(this.expression, messages, this.expectedType); + Assert.state(object != null, "The evaluation of the expression returned a null. Null result is not expected." + this.expression); + return object; } } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/FluxAggregatorMessageHandler.java b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/FluxAggregatorMessageHandler.java index 275e36ba07d..6a0aef85e89 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/FluxAggregatorMessageHandler.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/FluxAggregatorMessageHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2022 the original author or authors. + * Copyright 2019-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,10 +17,12 @@ package org.springframework.integration.aggregator; import java.time.Duration; +import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import java.util.function.Predicate; +import org.jspecify.annotations.Nullable; import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink; @@ -62,18 +64,23 @@ public class FluxAggregatorMessageHandler extends AbstractMessageProducingHandle private CorrelationStrategy correlationStrategy = new HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID); + @Nullable private Predicate> boundaryTrigger; - private Function, Integer> windowSizeFunction = FluxAggregatorMessageHandler::sequenceSizeHeader; + private Function, @Nullable Integer> windowSizeFunction = FluxAggregatorMessageHandler::sequenceSizeHeader; + @Nullable private Function>, Flux>>> windowConfigurer; + @Nullable private Duration windowTimespan; private Function>, Mono>> combineFunction = this::messageForWindowFlux; + @SuppressWarnings("NullAway.Init") private FluxSink> sink; + @Nullable private volatile Disposable subscription; /** @@ -90,7 +97,9 @@ public FluxAggregatorMessageHandler() { } private Object groupBy(Message message) { - return this.correlationStrategy.getCorrelationKey(message); + Object result = this.correlationStrategy.getCorrelationKey(message); + Assert.notNull(result, "Correlation key cannot be null"); + return result; } private Flux> releaseBy(Flux> groupFlux) { @@ -106,7 +115,8 @@ private Flux>> applyWindowOptions(Flux> groupFlux) { return groupFlux .switchOnFirst((signal, group) -> { if (signal.hasValue()) { - Integer maxSize = this.windowSizeFunction.apply(signal.get()); + Assert.notNull(this.windowSizeFunction, "'windowSizeFunction' must not be null"); + Integer maxSize = this.windowSizeFunction.apply(Objects.requireNonNull(signal.get())); if (maxSize != null) { if (this.windowTimespan != null) { return group.windowTimeout(maxSize, this.windowTimespan); @@ -185,13 +195,13 @@ public void setWindowSize(int windowSize) { /** * Specify a {@link Function} to determine a size for windows to close against the first message in group. * Tne result of the function can be combined with the {@link #setWindowTimespan(Duration)}. - * By default an {@link IntegrationMessageHeaderAccessor#SEQUENCE_SIZE} header is consulted. + * By default, an {@link IntegrationMessageHeaderAccessor#SEQUENCE_SIZE} header is consulted. * @param windowSizeFunction the {@link Function} to use to determine a window size * against a first message in the group. * @see Flux#window(int) * @see Flux#windowTimeout(int, Duration) */ - public void setWindowSizeFunction(Function, Integer> windowSizeFunction) { + public void setWindowSizeFunction(Function, @Nullable Integer> windowSizeFunction) { Assert.notNull(windowSizeFunction, "'windowSizeFunction' must not be null"); this.windowSizeFunction = windowSizeFunction; } @@ -243,8 +253,9 @@ public void start() { @Override public void stop() { - if (this.subscribed.compareAndSet(true, false) && this.subscription != null) { - this.subscription.dispose(); + Disposable subscriptionToDispose = this.subscription; + if (this.subscribed.compareAndSet(true, false) && subscriptionToDispose != null) { + subscriptionToDispose.dispose(); } } @@ -277,8 +288,7 @@ private Mono> messageForWindowFlux(Flux> messageFlux) { .build()); } - private static Integer sequenceSizeHeader(Message message) { + private static @Nullable Integer sequenceSizeHeader(Message message) { return message.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE, Integer.class); } - } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/HeaderAttributeCorrelationStrategy.java b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/HeaderAttributeCorrelationStrategy.java index 777c23bbac3..928a5a2ec6d 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/HeaderAttributeCorrelationStrategy.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/HeaderAttributeCorrelationStrategy.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2024 the original author or authors. + * Copyright 2002-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,8 @@ package org.springframework.integration.aggregator; +import org.jspecify.annotations.Nullable; + import org.springframework.messaging.Message; import org.springframework.util.Assert; @@ -35,6 +37,7 @@ public HeaderAttributeCorrelationStrategy(String attributeName) { this.attributeName = attributeName; } + @Nullable public Object getCorrelationKey(Message message) { return message.getHeaders().get(this.attributeName); } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/MessageGroupProcessor.java b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/MessageGroupProcessor.java index 83b28fce2ee..7ba26209fc9 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/MessageGroupProcessor.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/MessageGroupProcessor.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,8 @@ package org.springframework.integration.aggregator; +import org.jspecify.annotations.Nullable; + import org.springframework.integration.store.MessageGroup; /** @@ -37,6 +39,7 @@ public interface MessageGroupProcessor { * @param group The message group. * @return The result of processing the group. */ + @Nullable Object processMessageGroup(MessageGroup group); } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/MessageListProcessor.java b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/MessageListProcessor.java index a147da20cfa..49271e48f27 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/MessageListProcessor.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/MessageListProcessor.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/MethodInvokingCorrelationStrategy.java b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/MethodInvokingCorrelationStrategy.java index c78e056a028..1a40b7b43a7 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/MethodInvokingCorrelationStrategy.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/MethodInvokingCorrelationStrategy.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,6 +18,8 @@ import java.lang.reflect.Method; +import org.jspecify.annotations.Nullable; + import org.springframework.beans.BeansException; import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.BeanFactoryAware; @@ -57,6 +59,7 @@ public void setBeanFactory(BeanFactory beanFactory) throws BeansException { } @Override + @Nullable public Object getCorrelationKey(Message message) { return this.processor.processMessage(message); } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/MethodInvokingMessageGroupProcessor.java b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/MethodInvokingMessageGroupProcessor.java index 773601ca58a..580f6b1477a 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/MethodInvokingMessageGroupProcessor.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/MethodInvokingMessageGroupProcessor.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,6 +26,7 @@ import org.springframework.integration.store.MessageGroup; import org.springframework.integration.support.management.ManageableLifecycle; import org.springframework.messaging.Message; +import org.springframework.util.Assert; /** * MessageGroupProcessor that serves as an adapter for the invocation of a POJO method. @@ -87,7 +88,9 @@ public void setBeanFactory(BeanFactory beanFactory) { @Override protected final Object aggregatePayloads(MessageGroup group, Map headers) { final Collection> messagesUpForProcessing = group.getMessages(); - return this.processor.process(messagesUpForProcessing, headers); + Object object = this.processor.process(messagesUpForProcessing, headers); + Assert.state(object != null, "The process returned a null result. Null result is not expected."); + return object; } @Override diff --git a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/MethodInvokingMessageListProcessor.java b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/MethodInvokingMessageListProcessor.java index eea61289da7..0eedc66f6b6 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/MethodInvokingMessageListProcessor.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/MethodInvokingMessageListProcessor.java @@ -22,6 +22,7 @@ import java.util.Map; import org.jspecify.annotations.NonNull; +import org.jspecify.annotations.Nullable; import org.springframework.beans.factory.BeanFactory; import org.springframework.integration.handler.support.MessagingMethodInvokerHelper; @@ -89,7 +90,7 @@ public String toString() { } @SuppressWarnings("unchecked") - public T process(Collection> messages, Map aggregateHeaders) { + public @Nullable T process(Collection> messages, @Nullable Map aggregateHeaders) { return (T) this.delegate.process(messages, aggregateHeaders); } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/MethodInvokingReleaseStrategy.java b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/MethodInvokingReleaseStrategy.java index b52864bd31d..952f72d90e3 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/MethodInvokingReleaseStrategy.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/MethodInvokingReleaseStrategy.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2024 the original author or authors. + * Copyright 2002-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -54,7 +54,7 @@ public void setBeanFactory(BeanFactory beanFactory) { @Override public boolean canRelease(MessageGroup messages) { - return this.adapter.process(messages.getMessages(), null); + return Boolean.TRUE.equals(this.adapter.process(messages.getMessages(), null)); } @Override diff --git a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/ResequencingMessageGroupProcessor.java b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/ResequencingMessageGroupProcessor.java index 154351860f8..f6aaaa17304 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/ResequencingMessageGroupProcessor.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/ResequencingMessageGroupProcessor.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2024 the original author or authors. + * Copyright 2002-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,6 +21,8 @@ import java.util.Comparator; import java.util.List; +import org.jspecify.annotations.Nullable; + import org.springframework.integration.StaticMessageHeaderAccessor; import org.springframework.integration.store.MessageGroup; import org.springframework.messaging.Message; @@ -40,6 +42,7 @@ public class ResequencingMessageGroupProcessor implements MessageGroupProcessor private final Comparator> comparator = new MessageSequenceComparator(); + @Nullable public Object processMessageGroup(MessageGroup group) { Collection> messages = group.getMessages(); diff --git a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/ResequencingMessageHandler.java b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/ResequencingMessageHandler.java index 6d986702863..587a597c970 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/ResequencingMessageHandler.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/ResequencingMessageHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2024 the original author or authors. + * Copyright 2002-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,6 +18,8 @@ import java.util.Collection; +import org.jspecify.annotations.Nullable; + import org.springframework.integration.IntegrationPatternType; import org.springframework.integration.store.MessageGroup; import org.springframework.integration.store.MessageGroupStore; @@ -78,7 +80,7 @@ protected boolean shouldCopyRequestHeaders() { } @Override - protected void afterRelease(MessageGroup messageGroup, Collection> completedMessages) { + protected void afterRelease(MessageGroup messageGroup, @Nullable Collection> completedMessages) { afterRelease(messageGroup, completedMessages, false); } @@ -90,7 +92,7 @@ protected void afterRelease(MessageGroup messageGroup, Collection> co * @param timeout True if the release/discard was due to a timeout. */ @Override - protected void afterRelease(MessageGroup messageGroup, Collection> completedMessages, boolean timeout) { + protected void afterRelease(MessageGroup messageGroup, @Nullable Collection> completedMessages, boolean timeout) { int size = messageGroup.size(); int sequenceSize = messageGroup.getSequenceSize(); diff --git a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/package-info.java b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/package-info.java index 2ac1aa53803..7deb51b0ff1 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/package-info.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/package-info.java @@ -1,4 +1,5 @@ /** * Provides classes related to message aggregation. */ +@org.jspecify.annotations.NullMarked package org.springframework.integration.aggregator; diff --git a/spring-integration-core/src/main/java/org/springframework/integration/context/ExpressionCapable.java b/spring-integration-core/src/main/java/org/springframework/integration/context/ExpressionCapable.java index cedeb5bdafb..c80f8240a04 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/context/ExpressionCapable.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/context/ExpressionCapable.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,8 @@ package org.springframework.integration.context; +import org.jspecify.annotations.Nullable; + import org.springframework.expression.Expression; /** @@ -32,6 +34,7 @@ public interface ExpressionCapable { * Return the primary SpEL expression if this component is expression-based. * @return the expression as a String. */ + @Nullable Expression getExpression(); } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/context/IntegrationContextUtils.java b/spring-integration-core/src/main/java/org/springframework/integration/context/IntegrationContextUtils.java index dd605557cf9..f1626b83b5e 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/context/IntegrationContextUtils.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/context/IntegrationContextUtils.java @@ -16,6 +16,8 @@ package org.springframework.integration.context; +import org.jspecify.annotations.Nullable; + import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.NoSuchBeanDefinitionException; import org.springframework.beans.factory.config.BeanDefinition; @@ -120,6 +122,7 @@ public abstract class IntegrationContextUtils { * @param beanFactory BeanFactory for lookup, must not be null. * @return The {@link MetadataStore} bean whose name is "metadataStore". */ + @Nullable public static MetadataStore getMetadataStore(BeanFactory beanFactory) { return getBeanOfType(beanFactory, METADATA_STORE_BEAN_NAME, MetadataStore.class); } @@ -129,7 +132,9 @@ public static MetadataStore getMetadataStore(BeanFactory beanFactory) { * @return The {@link MessageChannel} bean whose name is "errorChannel". */ public static MessageChannel getErrorChannel(BeanFactory beanFactory) { - return getBeanOfType(beanFactory, ERROR_CHANNEL_BEAN_NAME, MessageChannel.class); + MessageChannel channel = getBeanOfType(beanFactory, ERROR_CHANNEL_BEAN_NAME, MessageChannel.class); + Assert.state(channel != null, "Error Channel was not found"); + return channel; } /** @@ -137,7 +142,9 @@ public static MessageChannel getErrorChannel(BeanFactory beanFactory) { * @return The {@link TaskScheduler} bean whose name is "taskScheduler" if available. */ public static TaskScheduler getTaskScheduler(BeanFactory beanFactory) { - return getBeanOfType(beanFactory, TASK_SCHEDULER_BEAN_NAME, TaskScheduler.class); + TaskScheduler taskScheduler = getBeanOfType(beanFactory, TASK_SCHEDULER_BEAN_NAME, TaskScheduler.class); + Assert.state(taskScheduler != null, "No such bean '" + TASK_SCHEDULER_BEAN_NAME + "'"); + return taskScheduler; } /** @@ -156,6 +163,7 @@ public static TaskScheduler getRequiredTaskScheduler(BeanFactory beanFactory) { * @return the instance of {@link StandardEvaluationContext} bean whose name is * {@value #INTEGRATION_EVALUATION_CONTEXT_BEAN_NAME}. */ + @Nullable public static StandardEvaluationContext getEvaluationContext(BeanFactory beanFactory) { return getBeanOfType(beanFactory, INTEGRATION_EVALUATION_CONTEXT_BEAN_NAME, StandardEvaluationContext.class); } @@ -166,11 +174,13 @@ public static StandardEvaluationContext getEvaluationContext(BeanFactory beanFac * {@value #INTEGRATION_SIMPLE_EVALUATION_CONTEXT_BEAN_NAME}. * @since 4.3.15 */ + @Nullable public static SimpleEvaluationContext getSimpleEvaluationContext(BeanFactory beanFactory) { return getBeanOfType(beanFactory, INTEGRATION_SIMPLE_EVALUATION_CONTEXT_BEAN_NAME, SimpleEvaluationContext.class); } + @Nullable private static T getBeanOfType(BeanFactory beanFactory, String beanName, Class type) { Assert.notNull(beanFactory, "BeanFactory must not be null"); if (!beanFactory.containsBean(beanName)) { diff --git a/spring-integration-core/src/main/java/org/springframework/integration/context/IntegrationObjectSupport.java b/spring-integration-core/src/main/java/org/springframework/integration/context/IntegrationObjectSupport.java index de14144d7d7..29326a7f3f9 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/context/IntegrationObjectSupport.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/context/IntegrationObjectSupport.java @@ -72,38 +72,50 @@ public abstract class IntegrationObjectSupport implements ComponentSourceAware, protected final LogAccessor logger = new LogAccessor(getClass()); // NOSONAR protected + @SuppressWarnings("NullAway.Init") private DestinationResolver channelResolver; + @SuppressWarnings("NullAway.Init") private String beanName; + @Nullable private String componentName; + @SuppressWarnings("NullAway.Init") private BeanFactory beanFactory; + @Nullable private TaskScheduler taskScheduler; private IntegrationProperties integrationProperties = new IntegrationProperties(); + @Nullable private ConversionService conversionService; + @SuppressWarnings("NullAway.Init") private ApplicationContext applicationContext; + @SuppressWarnings("NullAway.Init") private MessageBuilderFactory messageBuilderFactory; + @Nullable private Expression expression; + @Nullable private Object beanSource; + @Nullable private String beanDescription; private boolean initialized; @Override - public final void setBeanName(@Nullable String beanName) { + public final void setBeanName(String beanName) { this.beanName = beanName; } @Override + @Nullable public String getBeanName() { return this.beanName; } @@ -113,6 +125,7 @@ public String getBeanName() { * If {@link #componentName} was not set this method will default to the 'beanName' of this component; */ @Override + @Nullable public String getComponentName() { return StringUtils.hasText(this.componentName) ? this.componentName : this.beanName; } @@ -129,6 +142,7 @@ public void setComponentName(String componentName) { * Subclasses may implement this method to provide component type information. */ @Override + @Nullable public String getComponentType() { return null; } @@ -138,8 +152,8 @@ public void setComponentSource(Object source) { this.beanSource = source; } - @Nullable @Override + @Nullable public Object getComponentSource() { return this.beanSource; } @@ -149,8 +163,8 @@ public void setComponentDescription(String description) { this.beanDescription = description; } - @Nullable @Override + @Nullable public String getComponentDescription() { return this.beanDescription; } @@ -195,6 +209,7 @@ public void setChannelResolver(DestinationResolver channelResolv } @Override + @Nullable public Expression getExpression() { return this.expression; } @@ -209,6 +224,7 @@ public final void setPrimaryExpression(Expression expression) { } @Override + @SuppressWarnings("NullAway") public final void afterPropertiesSet() { this.integrationProperties = IntegrationContextUtils.getIntegrationProperties(this.beanFactory); if (this.messageBuilderFactory == null) { @@ -268,6 +284,7 @@ protected TaskScheduler getTaskScheduler() { if (this.taskScheduler == null && this.beanFactory != null) { this.taskScheduler = IntegrationContextUtils.getTaskScheduler(this.beanFactory); } + Assert.notNull(this.taskScheduler, "'taskScheduler' must not be null"); return this.taskScheduler; } @@ -278,6 +295,7 @@ protected DestinationResolver getChannelResolver() { return this.channelResolver; } + @Nullable public ConversionService getConversionService() { if (this.conversionService == null && this.beanFactory != null) { this.conversionService = IntegrationUtils.getConversionService(this.beanFactory); @@ -299,6 +317,7 @@ public void setConversionService(ConversionService conversionService) { * {@link ApplicationContext} is available. * @return The id, or null if there is no application context. */ + @Nullable public String getApplicationContextId() { return this.applicationContext == null ? null : this.applicationContext.getId(); } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/context/IntegrationProperties.java b/spring-integration-core/src/main/java/org/springframework/integration/context/IntegrationProperties.java index d31bd8c9303..3d005ad2076 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/context/IntegrationProperties.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/context/IntegrationProperties.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2024 the original author or authors. + * Copyright 2014-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,6 +19,8 @@ import java.util.Arrays; import java.util.Properties; +import org.jspecify.annotations.Nullable; + import org.springframework.integration.JavaUtils; import org.springframework.util.Assert; import org.springframework.util.StringUtils; @@ -139,6 +141,7 @@ public final class IntegrationProperties { private long endpointsDefaultTimeout = IntegrationContextUtils.DEFAULT_TIMEOUT; + @Nullable private volatile Properties properties; static { diff --git a/spring-integration-core/src/main/java/org/springframework/integration/context/package-info.java b/spring-integration-core/src/main/java/org/springframework/integration/context/package-info.java index 1acc7924df0..e72e965c195 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/context/package-info.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/context/package-info.java @@ -1,4 +1,5 @@ /** * Provides classes relating to application context configuration. */ +@org.jspecify.annotations.NullMarked package org.springframework.integration.context; diff --git a/spring-integration-core/src/main/java/org/springframework/integration/core/ErrorMessagePublisher.java b/spring-integration-core/src/main/java/org/springframework/integration/core/ErrorMessagePublisher.java index fa6c4770b63..27faee4246a 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/core/ErrorMessagePublisher.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/core/ErrorMessagePublisher.java @@ -54,10 +54,13 @@ public class ErrorMessagePublisher implements BeanFactoryAware { protected final MessagingTemplate messagingTemplate = new MessagingTemplate(); // NOSONAR final + @SuppressWarnings("NullAway.Init") private DestinationResolver channelResolver; + @Nullable private MessageChannel channel; + @Nullable private String channelName; private ErrorMessageStrategy errorMessageStrategy = new DefaultErrorMessageStrategy(); @@ -79,6 +82,7 @@ public ErrorMessageStrategy getErrorMessageStrategy() { return this.errorMessageStrategy; } + @Nullable public MessageChannel getChannel() { populateChannel(); return this.channel; @@ -146,7 +150,7 @@ public void publish(Message inputMessage, MessagingException exception) { * @param failedMessage the message. * @param throwable the throwable. */ - public void publish(@Nullable Message inputMessage, Message failedMessage, Throwable throwable) { + public void publish(@Nullable Message inputMessage, @Nullable Message failedMessage, Throwable throwable) { publish(throwable, ErrorMessageUtils.getAttributeAccessor(inputMessage, failedMessage)); } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/core/MessagingTemplate.java b/spring-integration-core/src/main/java/org/springframework/integration/core/MessagingTemplate.java index 95552bb7285..1d2da3fb15a 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/core/MessagingTemplate.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/core/MessagingTemplate.java @@ -44,6 +44,7 @@ public class MessagingTemplate extends GenericMessagingTemplate { private final Lock lock = new ReentrantLock(); + @SuppressWarnings("NullAway.Init") private BeanFactory beanFactory; private volatile boolean throwExceptionOnLateReplySet; @@ -84,7 +85,7 @@ public void setThrowExceptionOnLateReply(boolean throwExceptionOnLateReply) { * backward compatibility. * @param channel the channel to set. */ - public void setDefaultChannel(MessageChannel channel) { + public void setDefaultChannel(@Nullable MessageChannel channel) { super.setDefaultDestination(channel); } @@ -109,6 +110,7 @@ public Message sendAndReceive(MessageChannel destination, Message requestM return super.sendAndReceive(destination, requestMessage); } + @Nullable public Object receiveAndConvert(MessageChannel destination, long timeout) { Message message = doReceive(destination, timeout); if (message != null) { @@ -119,6 +121,7 @@ public Object receiveAndConvert(MessageChannel destination, long timeout) { } } + @Nullable public Message receive(MessageChannel destination, long timeout) { return doReceive(destination, timeout); } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/core/package-info.java b/spring-integration-core/src/main/java/org/springframework/integration/core/package-info.java index b6f850c4881..b1ecdac03f3 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/core/package-info.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/core/package-info.java @@ -1,4 +1,5 @@ /** * Provides core classes. */ +@org.jspecify.annotations.NullMarked package org.springframework.integration.core; diff --git a/spring-integration-core/src/test/java/org/springframework/integration/aggregator/FluxAggregatorMessageHandlerTests.java b/spring-integration-core/src/test/java/org/springframework/integration/aggregator/FluxAggregatorMessageHandlerTests.java index 6ddbb415807..87d91025a2d 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/aggregator/FluxAggregatorMessageHandlerTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/aggregator/FluxAggregatorMessageHandlerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2024 the original author or authors. + * Copyright 2019-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -36,6 +36,7 @@ import org.springframework.messaging.support.GenericMessage; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; /** * @author Artem Bilan @@ -159,6 +160,7 @@ void testCustomCombineFunction() { void testWindowTimespan() { QueueChannel resultChannel = new QueueChannel(); FluxAggregatorMessageHandler fluxAggregatorMessageHandler = new FluxAggregatorMessageHandler(); + fluxAggregatorMessageHandler.setTaskScheduler(mock()); fluxAggregatorMessageHandler.setOutputChannel(resultChannel); fluxAggregatorMessageHandler.setWindowTimespan(Duration.ofMillis(100)); fluxAggregatorMessageHandler.start(); diff --git a/spring-integration-core/src/test/java/org/springframework/integration/channel/registry/HeaderChannelRegistryTests.java b/spring-integration-core/src/test/java/org/springframework/integration/channel/registry/HeaderChannelRegistryTests.java index 502bbd79dde..ab41c2b1766 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/channel/registry/HeaderChannelRegistryTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/channel/registry/HeaderChannelRegistryTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2013-2024 the original author or authors. + * Copyright 2013-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -41,6 +41,7 @@ import org.springframework.messaging.support.ErrorMessage; import org.springframework.messaging.support.GenericMessage; import org.springframework.scheduling.TaskScheduler; +import org.springframework.scheduling.concurrent.SimpleAsyncTaskScheduler; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; @@ -238,6 +239,7 @@ public void testBFCRNoRegistry() { @Test public void testRemoveOnGet() { DefaultHeaderChannelRegistry registry = new DefaultHeaderChannelRegistry(); + registry.setTaskScheduler(new SimpleAsyncTaskScheduler()); MessageChannel channel = new DirectChannel(); String foo = (String) registry.channelToChannelName(channel); Map map = TestUtils.getPropertyValue(registry, "channels", Map.class); diff --git a/spring-integration-core/src/test/java/org/springframework/integration/dsl/reactivestreams/ReactiveStreamsTests.java b/spring-integration-core/src/test/java/org/springframework/integration/dsl/reactivestreams/ReactiveStreamsTests.java index 54a2cdf9477..9e215b32305 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/dsl/reactivestreams/ReactiveStreamsTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/dsl/reactivestreams/ReactiveStreamsTests.java @@ -57,6 +57,7 @@ import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.support.GenericMessage; +import org.springframework.scheduling.concurrent.SimpleAsyncTaskScheduler; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; @@ -130,6 +131,7 @@ void testReactiveFlow() throws Exception { @Test void testPollableReactiveFlow() throws Exception { assertThat(this.reactiveTransformer).isInstanceOf(ReactiveStreamsConsumer.class); + this.reactiveTransformer.setTaskScheduler(new SimpleAsyncTaskScheduler()); this.inputChannel.send(new GenericMessage<>("1,2,3,4,5")); CountDownLatch latch = new CountDownLatch(6); diff --git a/spring-integration-core/src/test/java/org/springframework/integration/gateway/AsyncGatewayTests.java b/spring-integration-core/src/test/java/org/springframework/integration/gateway/AsyncGatewayTests.java index ba168382492..dd4509ed491 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/gateway/AsyncGatewayTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/gateway/AsyncGatewayTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2024 the original author or authors. + * Copyright 2002-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,7 +29,6 @@ import reactor.test.StepVerifier; import org.springframework.beans.DirectFieldAccessor; -import org.springframework.beans.factory.BeanFactory; import org.springframework.integration.MessageDispatchingException; import org.springframework.integration.annotation.Gateway; import org.springframework.integration.annotation.GatewayHeader; @@ -42,6 +41,7 @@ import org.springframework.messaging.support.ChannelInterceptor; import org.springframework.messaging.support.GenericMessage; import org.springframework.messaging.support.MessageBuilder; +import org.springframework.scheduling.concurrent.SimpleAsyncTaskScheduler; import org.springframework.util.ReflectionUtils; import static org.assertj.core.api.Assertions.assertThat; @@ -63,9 +63,7 @@ public void futureWithMessageReturned() throws Exception { QueueChannel requestChannel = new QueueChannel(); startResponder(requestChannel); GatewayProxyFactoryBean proxyFactory = new GatewayProxyFactoryBean<>(TestEchoService.class); - proxyFactory.setDefaultRequestChannel(requestChannel); - proxyFactory.setBeanName("testGateway"); - proxyFactory.setBeanFactory(mock(BeanFactory.class)); + setupProxyFactory(requestChannel, proxyFactory); proxyFactory.afterPropertiesSet(); TestEchoService service = proxyFactory.getObject(); Future> f = service.returnMessage("foo"); @@ -86,9 +84,7 @@ protected boolean doSend(Message message, long timeout) { }; GatewayProxyFactoryBean proxyFactory = new GatewayProxyFactoryBean<>(TestEchoService.class); - proxyFactory.setDefaultRequestChannel(channel); - proxyFactory.setBeanName("testGateway"); - proxyFactory.setBeanFactory(mock(BeanFactory.class)); + setupProxyFactory(channel, proxyFactory); proxyFactory.afterPropertiesSet(); TestEchoService service = proxyFactory.getObject(); Future> f = service.returnMessage("foo"); @@ -104,9 +100,7 @@ public void listenableFutureWithMessageReturned() throws Exception { addThreadEnricher(requestChannel); startResponder(requestChannel); GatewayProxyFactoryBean proxyFactory = new GatewayProxyFactoryBean<>(TestEchoService.class); - proxyFactory.setDefaultRequestChannel(requestChannel); - proxyFactory.setBeanName("testGateway"); - proxyFactory.setBeanFactory(mock(BeanFactory.class)); + setupProxyFactory(requestChannel, proxyFactory); proxyFactory.afterPropertiesSet(); TestEchoService service = proxyFactory.getObject(); CompletableFuture> f = service.returnMessageListenable("foo"); @@ -130,9 +124,7 @@ public void customFutureReturned() { addThreadEnricher(requestChannel); startResponder(requestChannel); GatewayProxyFactoryBean proxyFactory = new GatewayProxyFactoryBean<>(TestEchoService.class); - proxyFactory.setDefaultRequestChannel(requestChannel); - proxyFactory.setBeanName("testGateway"); - proxyFactory.setBeanFactory(mock(BeanFactory.class)); + setupProxyFactory(requestChannel, proxyFactory); proxyFactory.afterPropertiesSet(); TestEchoService service = proxyFactory.getObject(); CustomFuture f = service.returnCustomFuture("foo"); @@ -147,10 +139,7 @@ public void nonAsyncFutureReturned() { addThreadEnricher(requestChannel); startResponder(requestChannel); GatewayProxyFactoryBean proxyFactory = new GatewayProxyFactoryBean<>(TestEchoService.class); - proxyFactory.setDefaultRequestChannel(requestChannel); - proxyFactory.setBeanName("testGateway"); - proxyFactory.setBeanFactory(mock(BeanFactory.class)); - + setupProxyFactory(requestChannel, proxyFactory); proxyFactory.setAsyncExecutor(null); // Not async - user flow returns Future proxyFactory.afterPropertiesSet(); @@ -179,9 +168,7 @@ public void futureWithPayloadReturned() throws Exception { QueueChannel requestChannel = new QueueChannel(); startResponder(requestChannel); GatewayProxyFactoryBean proxyFactory = new GatewayProxyFactoryBean<>(TestEchoService.class); - proxyFactory.setDefaultRequestChannel(requestChannel); - proxyFactory.setBeanName("testGateway"); - proxyFactory.setBeanFactory(mock(BeanFactory.class)); + setupProxyFactory(requestChannel, proxyFactory); proxyFactory.afterPropertiesSet(); TestEchoService service = proxyFactory.getObject(); Future f = service.returnString("foo"); @@ -195,9 +182,7 @@ public void futureWithWildcardReturned() throws Exception { QueueChannel requestChannel = new QueueChannel(); startResponder(requestChannel); GatewayProxyFactoryBean proxyFactory = new GatewayProxyFactoryBean<>(TestEchoService.class); - proxyFactory.setDefaultRequestChannel(requestChannel); - proxyFactory.setBeanName("testGateway"); - proxyFactory.setBeanFactory(mock(BeanFactory.class)); + setupProxyFactory(requestChannel, proxyFactory); proxyFactory.afterPropertiesSet(); TestEchoService service = proxyFactory.getObject(); Future f = service.returnSomething("foo"); @@ -209,9 +194,7 @@ public void futureWithWildcardReturned() throws Exception { @Test public void futureVoid() throws Exception { GatewayProxyFactoryBean proxyFactory = new GatewayProxyFactoryBean<>(TestEchoService.class); - proxyFactory.setDefaultRequestChannel(new NullChannel()); - proxyFactory.setBeanName("testGateway"); - proxyFactory.setBeanFactory(mock(BeanFactory.class)); + setupProxyFactory(new NullChannel(), proxyFactory); proxyFactory.afterPropertiesSet(); TestEchoService service = proxyFactory.getObject(); Future f = service.asyncSendAndForget("test1"); @@ -249,9 +232,7 @@ public void futureVoidReply() throws Exception { } }).start(); GatewayProxyFactoryBean proxyFactory = new GatewayProxyFactoryBean<>(TestEchoService.class); - proxyFactory.setDefaultRequestChannel(requestChannel); - proxyFactory.setBeanName("testGateway"); - proxyFactory.setBeanFactory(mock(BeanFactory.class)); + setupProxyFactory(requestChannel, proxyFactory); proxyFactory.setAsyncExecutor(null); proxyFactory.afterPropertiesSet(); TestEchoService service = proxyFactory.getObject(); @@ -266,9 +247,7 @@ public void monoWithMessageReturned() { QueueChannel requestChannel = new QueueChannel(); startResponder(requestChannel); GatewayProxyFactoryBean proxyFactory = new GatewayProxyFactoryBean<>(TestEchoService.class); - proxyFactory.setDefaultRequestChannel(requestChannel); - proxyFactory.setBeanFactory(mock(BeanFactory.class)); - proxyFactory.setBeanName("testGateway"); + setupProxyFactory(requestChannel, proxyFactory); proxyFactory.afterPropertiesSet(); TestEchoService service = proxyFactory.getObject(); Mono> mono = service.returnMessagePromise("foo"); @@ -281,9 +260,7 @@ public void monoWithPayloadReturned() { QueueChannel requestChannel = new QueueChannel(); startResponder(requestChannel); GatewayProxyFactoryBean proxyFactory = new GatewayProxyFactoryBean<>(TestEchoService.class); - proxyFactory.setDefaultRequestChannel(requestChannel); - proxyFactory.setBeanFactory(mock(BeanFactory.class)); - proxyFactory.setBeanName("testGateway"); + setupProxyFactory(requestChannel, proxyFactory); proxyFactory.afterPropertiesSet(); TestEchoService service = proxyFactory.getObject(); Mono mono = service.returnStringPromise("foo"); @@ -296,9 +273,7 @@ public void monoWithWildcardReturned() { QueueChannel requestChannel = new QueueChannel(); startResponder(requestChannel); GatewayProxyFactoryBean proxyFactory = new GatewayProxyFactoryBean<>(TestEchoService.class); - proxyFactory.setDefaultRequestChannel(requestChannel); - proxyFactory.setBeanFactory(mock(BeanFactory.class)); - proxyFactory.setBeanName("testGateway"); + setupProxyFactory(requestChannel, proxyFactory); proxyFactory.afterPropertiesSet(); TestEchoService service = proxyFactory.getObject(); Mono mono = service.returnSomethingPromise("foo"); @@ -312,9 +287,7 @@ public void monoWithConsumer() { QueueChannel requestChannel = new QueueChannel(); startResponder(requestChannel); GatewayProxyFactoryBean proxyFactory = new GatewayProxyFactoryBean<>(TestEchoService.class); - proxyFactory.setDefaultRequestChannel(requestChannel); - proxyFactory.setBeanFactory(mock(BeanFactory.class)); - proxyFactory.setBeanName("testGateway"); + setupProxyFactory(requestChannel, proxyFactory); proxyFactory.afterPropertiesSet(); TestEchoService service = proxyFactory.getObject(); Mono mono = service.returnStringPromise("foo"); @@ -327,9 +300,7 @@ public void monoWithConsumer() { @Test public void monoVoid() throws InterruptedException { GatewayProxyFactoryBean proxyFactory = new GatewayProxyFactoryBean<>(TestEchoService.class); - proxyFactory.setDefaultRequestChannel(new NullChannel()); - proxyFactory.setBeanName("testGateway"); - proxyFactory.setBeanFactory(mock(BeanFactory.class)); + setupProxyFactory(new NullChannel(), proxyFactory); proxyFactory.afterPropertiesSet(); TestEchoService service = proxyFactory.getObject(); Mono mono = service.monoVoid("test1"); @@ -402,6 +373,13 @@ private interface TestEchoService { } + private static void setupProxyFactory(MessageChannel messageChannel, GatewayProxyFactoryBean proxyFactory) { + proxyFactory.setDefaultRequestChannel(messageChannel); + proxyFactory.setBeanName("testGateway"); + proxyFactory.setTaskScheduler(new SimpleAsyncTaskScheduler()); + proxyFactory.setBeanFactory(mock()); + } + private record CustomFuture(String result, Thread thread) implements Future { @Override diff --git a/spring-integration-core/src/test/java/org/springframework/integration/gateway/GatewayInterfaceTests.java b/spring-integration-core/src/test/java/org/springframework/integration/gateway/GatewayInterfaceTests.java index a4f360bbc16..5249e62f478 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/gateway/GatewayInterfaceTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/gateway/GatewayInterfaceTests.java @@ -89,6 +89,7 @@ import org.springframework.messaging.handler.annotation.Payload; import org.springframework.messaging.support.ChannelInterceptor; import org.springframework.messaging.support.MessageHeaderAccessor; +import org.springframework.scheduling.TaskScheduler; import org.springframework.stereotype.Component; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.ActiveProfiles; @@ -340,6 +341,7 @@ public void testWithServiceEquals() { bf.registerSingleton("requestChannelBar", channel); bf.registerSingleton("requestChannelBaz", channel); bf.registerSingleton("requestChannelFoo", channel); + bf.registerSingleton("taskScheduler", mock(TaskScheduler.class)); fb.setBeanFactory(bf); fb.afterPropertiesSet(); assertThat(fb.getObject()).isNotSameAs(bar); diff --git a/spring-integration-core/src/test/java/org/springframework/integration/gateway/GatewayProxyFactoryBeanTests.java b/spring-integration-core/src/test/java/org/springframework/integration/gateway/GatewayProxyFactoryBeanTests.java index 70f6253252c..d55192f51eb 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/gateway/GatewayProxyFactoryBeanTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/gateway/GatewayProxyFactoryBeanTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2024 the original author or authors. + * Copyright 2002-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -59,15 +59,19 @@ import org.springframework.messaging.PollableChannel; import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.support.GenericMessage; +import org.springframework.scheduling.TaskScheduler; import org.springframework.util.ClassUtils; import org.springframework.util.ReflectionUtils; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.BDDMockito.willReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; /** * @author Mark Fisher @@ -85,7 +89,7 @@ public void testRequestReplyWithAnonymousChannel() { startResponder(requestChannel); GatewayProxyFactoryBean proxyFactory = new GatewayProxyFactoryBean<>(TestService.class); proxyFactory.setDefaultRequestChannel(requestChannel); - proxyFactory.setBeanFactory(mock(BeanFactory.class)); + proxyFactory.setBeanFactory(getBeanFactory()); proxyFactory.setBeanName("testGateway"); proxyFactory.afterPropertiesSet(); TestService service = proxyFactory.getObject(); @@ -113,7 +117,7 @@ public byte[] convert(String source) { GatewayProxyFactoryBean proxyFactory = new GatewayProxyFactoryBean<>(TestService.class); DefaultListableBeanFactory bf = new DefaultListableBeanFactory(); bf.registerSingleton(IntegrationUtils.INTEGRATION_CONVERSION_SERVICE_BEAN_NAME, cs); - + bf.registerSingleton("taskScheduler", mock(TaskScheduler.class)); proxyFactory.setBeanFactory(bf); proxyFactory.setDefaultRequestChannel(requestChannel); proxyFactory.setBeanName("testGateway"); @@ -121,7 +125,7 @@ public byte[] convert(String source) { TestService service = proxyFactory.getObject(); byte[] result = service.requestReplyInBytes("foo"); assertThat(result.length).isEqualTo(6); - Mockito.verify(stringToByteConverter, Mockito.times(1)).convert(Mockito.any(String.class)); + Mockito.verify(stringToByteConverter, Mockito.times(1)).convert(any(String.class)); } @Test @@ -130,7 +134,7 @@ public void testOneWay() { GatewayProxyFactoryBean proxyFactory = new GatewayProxyFactoryBean<>(TestService.class); proxyFactory.setDefaultRequestChannel(requestChannel); proxyFactory.setBeanName("testGateway"); - proxyFactory.setBeanFactory(mock(BeanFactory.class)); + proxyFactory.setBeanFactory(getBeanFactory()); proxyFactory.afterPropertiesSet(); TestService service = proxyFactory.getObject(); service.oneWay("test"); @@ -142,7 +146,7 @@ public void testOneWay() { @Test public void testOneWayIgnoreReply() { DirectChannel requestChannel = new DirectChannel(); - BeanFactory beanFactory = mock(BeanFactory.class); + BeanFactory beanFactory = getBeanFactory(); QueueChannel nullChannel = new QueueChannel(); willReturn(nullChannel) .given(beanFactory) @@ -173,7 +177,7 @@ public void testSolicitResponse() { proxyFactory.setDefaultRequestChannel(new DirectChannel()); proxyFactory.setDefaultReplyChannel(replyChannel); proxyFactory.setBeanName("testGateway"); - proxyFactory.setBeanFactory(mock(BeanFactory.class)); + proxyFactory.setBeanFactory(getBeanFactory()); proxyFactory.afterPropertiesSet(); TestService service = proxyFactory.getObject(); String result = service.solicitResponse(); @@ -188,7 +192,7 @@ public void testReceiveMessage() { GatewayProxyFactoryBean proxyFactory = new GatewayProxyFactoryBean<>(TestService.class); proxyFactory.setDefaultReplyChannel(replyChannel); - proxyFactory.setBeanFactory(mock(BeanFactory.class)); + proxyFactory.setBeanFactory(getBeanFactory()); proxyFactory.afterPropertiesSet(); TestService service = proxyFactory.getObject(); Message message = service.getMessage(); @@ -210,7 +214,7 @@ public void testReactiveReplyChannel() { proxyFactory.setDefaultRequestChannel(requestChannel); proxyFactory.setDefaultReplyChannel(replyChannel); - proxyFactory.setBeanFactory(mock(BeanFactory.class)); + proxyFactory.setBeanFactory(getBeanFactory()); proxyFactory.afterPropertiesSet(); TestService service = proxyFactory.getObject(); @@ -229,7 +233,7 @@ public void testRequestReplyWithTypeConversion() { GatewayProxyFactoryBean proxyFactory = new GatewayProxyFactoryBean<>(TestService.class); proxyFactory.setDefaultRequestChannel(requestChannel); proxyFactory.setBeanName("testGateway"); - proxyFactory.setBeanFactory(mock(BeanFactory.class)); + proxyFactory.setBeanFactory(getBeanFactory()); proxyFactory.afterPropertiesSet(); TestService service = proxyFactory.getObject(); Integer result = service.requestReplyWithIntegers(123); @@ -300,7 +304,7 @@ public void testMessageAsMethodArgument() { GatewayProxyFactoryBean proxyFactory = new GatewayProxyFactoryBean<>(TestService.class); proxyFactory.setDefaultRequestChannel(requestChannel); proxyFactory.setBeanName("testGateway"); - proxyFactory.setBeanFactory(mock(BeanFactory.class)); + proxyFactory.setBeanFactory(getBeanFactory()); proxyFactory.afterPropertiesSet(); TestService service = proxyFactory.getObject(); String result = service.requestReplyWithMessageParameter(new GenericMessage<>("foo")); @@ -314,7 +318,7 @@ public void testNoArgMethodWithPayloadAnnotation() { GatewayProxyFactoryBean proxyFactory = new GatewayProxyFactoryBean<>(TestService.class); proxyFactory.setDefaultRequestChannel(requestChannel); proxyFactory.setBeanName("testGateway"); - proxyFactory.setBeanFactory(mock(BeanFactory.class)); + proxyFactory.setBeanFactory(getBeanFactory()); proxyFactory.afterPropertiesSet(); TestService service = proxyFactory.getObject(); String result = service.requestReplyWithPayloadAnnotation(); @@ -338,7 +342,7 @@ public void testMessageAsReturnValue() { GatewayProxyFactoryBean proxyFactory = new GatewayProxyFactoryBean<>(TestService.class); proxyFactory.setDefaultRequestChannel(requestChannel); proxyFactory.setBeanName("testGateway"); - proxyFactory.setBeanFactory(mock(BeanFactory.class)); + proxyFactory.setBeanFactory(getBeanFactory()); proxyFactory.afterPropertiesSet(); TestService service = proxyFactory.getObject(); Message result = service.requestReplyWithMessageReturnValue("foo"); @@ -356,7 +360,7 @@ public void testProxiedToStringMethod() { GatewayProxyFactoryBean proxyFactory = new GatewayProxyFactoryBean<>(TestService.class); proxyFactory.setDefaultRequestChannel(new DirectChannel()); proxyFactory.setBeanName("testGateway"); - proxyFactory.setBeanFactory(mock(BeanFactory.class)); + proxyFactory.setBeanFactory(getBeanFactory()); proxyFactory.afterPropertiesSet(); Object proxy = proxyFactory.getObject(); String expected = "gateway proxy for"; @@ -380,7 +384,7 @@ public void handleMessage(Message message) { consumer.start(); proxyFactory.setDefaultRequestChannel(channel); proxyFactory.setBeanName("testGateway"); - proxyFactory.setBeanFactory(mock(BeanFactory.class)); + proxyFactory.setBeanFactory(getBeanFactory()); proxyFactory.afterPropertiesSet(); TestExceptionThrowingInterface proxy = proxyFactory.getObject(); assertThatExceptionOfType(TestException.class) @@ -398,7 +402,7 @@ private static void startResponder(final PollableChannel requestChannel) { @Test public void testProgrammaticWiring() { GatewayProxyFactoryBean gpfb = new GatewayProxyFactoryBean<>(TestEchoService.class); - gpfb.setBeanFactory(mock(BeanFactory.class)); + gpfb.setBeanFactory(getBeanFactory()); QueueChannel drc = new QueueChannel(); gpfb.setDefaultRequestChannel(drc); gpfb.setDefaultReplyTimeout(0L); @@ -414,6 +418,15 @@ public void testProgrammaticWiring() { assertThat(bar).isEqualTo("bar"); } + private BeanFactory getBeanFactory() { + BeanFactory beanFactory = mock(BeanFactory.class); + TaskScheduler taskScheduler = mock(TaskScheduler.class); + when(beanFactory.getBean(eq("taskScheduler"), any(Class.class))) + .thenReturn(taskScheduler); + when(beanFactory.containsBean("taskScheduler")).thenReturn(true); + return beanFactory; + } + @Test public void testIdHeaderOverrideHeaderExpression() { GatewayProxyFactoryBean gpfb = new GatewayProxyFactoryBean<>(); @@ -495,7 +508,7 @@ public void autowiredGateway() { @Test public void testOverriddenMethod() { GatewayProxyFactoryBean gpfb = new GatewayProxyFactoryBean<>(InheritChild.class); - gpfb.setBeanFactory(mock(BeanFactory.class)); + gpfb.setBeanFactory(getBeanFactory()); gpfb.afterPropertiesSet(); Map gateways = gpfb.getGateways(); assertThat(gateways.size()).isEqualTo(2); @@ -508,6 +521,7 @@ public void testAliasForSupport() throws NoSuchMethodException { beanFactory.registerSingleton("requestChannel", requestChannel); GatewayProxyFactoryBean gpfb = new GatewayProxyFactoryBean<>( CompositedGatewayService.class); + beanFactory.registerSingleton("taskScheduler", mock(TaskScheduler.class)); gpfb.setBeanFactory(beanFactory); gpfb.afterPropertiesSet(); Map gateways = gpfb.getGateways(); diff --git a/spring-integration-core/src/test/java/org/springframework/integration/gateway/GatewayProxyMessageMappingTests.java b/spring-integration-core/src/test/java/org/springframework/integration/gateway/GatewayProxyMessageMappingTests.java index 47cfb7debcd..11a48f2ec25 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/gateway/GatewayProxyMessageMappingTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/gateway/GatewayProxyMessageMappingTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2024 the original author or authors. + * Copyright 2002-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -32,9 +32,11 @@ import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.handler.annotation.Headers; import org.springframework.messaging.handler.annotation.Payload; +import org.springframework.scheduling.TaskScheduler; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.mockito.Mockito.mock; /** * @author Mark Fisher @@ -58,6 +60,8 @@ public void initializeGateway() { context.registerBeanDefinition(IntegrationContextUtils.INTEGRATION_EVALUATION_CONTEXT_BEAN_NAME, new RootBeanDefinition(IntegrationEvaluationContextFactoryBean.class)); context.refresh(); + context.getBeanFactory().registerSingleton("taskScheduler", mock(TaskScheduler.class)); + factoryBean.setBeanFactory(context); factoryBean.afterPropertiesSet(); this.gateway = factoryBean.getObject(); @@ -144,6 +148,7 @@ public void payloadAnnotationAtMethodLevelUsingBeanResolver() { context.registerBeanDefinition("testBean", new RootBeanDefinition(TestBean.class)); context.registerBeanDefinition(IntegrationContextUtils.INTEGRATION_EVALUATION_CONTEXT_BEAN_NAME, new RootBeanDefinition(IntegrationEvaluationContextFactoryBean.class)); + context.getBeanFactory().registerSingleton("taskScheduler", mock(TaskScheduler.class)); context.refresh(); TestGateway gateway = context.getBean("testGateway", TestGateway.class); gateway.payloadAnnotationAtMethodLevelUsingBeanResolver("foo"); @@ -171,6 +176,7 @@ public void payloadAnnotationWithExpressionUsingBeanResolver() { context.registerBeanDefinition("testBean", new RootBeanDefinition(TestBean.class)); context.registerBeanDefinition(IntegrationContextUtils.INTEGRATION_EVALUATION_CONTEXT_BEAN_NAME, new RootBeanDefinition(IntegrationEvaluationContextFactoryBean.class)); + context.getBeanFactory().registerSingleton("taskScheduler", mock(TaskScheduler.class)); context.refresh(); TestGateway gateway = context.getBean("testGateway", TestGateway.class); gateway.payloadAnnotationWithExpressionUsingBeanResolver("foo"); diff --git a/spring-integration-core/src/test/java/org/springframework/integration/handler/AsyncHandlerTests.java b/spring-integration-core/src/test/java/org/springframework/integration/handler/AsyncHandlerTests.java index c2393ec4c2b..7a981066a4f 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/handler/AsyncHandlerTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/handler/AsyncHandlerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2022 the original author or authors. + * Copyright 2016-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -41,6 +41,7 @@ import org.springframework.messaging.core.DestinationResolutionException; import org.springframework.messaging.support.GenericMessage; import org.springframework.messaging.support.MessageBuilder; +import org.springframework.scheduling.concurrent.SimpleAsyncTaskScheduler; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; @@ -218,7 +219,8 @@ public void testMessagingExceptionNoErrorChannel() throws Exception { public void testGateway() { this.whichTest = 0; GatewayProxyFactoryBean gpfb = new GatewayProxyFactoryBean<>(Foo.class); - gpfb.setBeanFactory(mock(BeanFactory.class)); + gpfb.setBeanFactory(mock()); + gpfb.setTaskScheduler(new SimpleAsyncTaskScheduler()); DirectChannel input = new DirectChannel(); gpfb.setDefaultRequestChannel(input); gpfb.setDefaultReplyTimeout(10000L); @@ -237,7 +239,8 @@ public void testGateway() { public void testGatewayWithException() { this.whichTest = 0; GatewayProxyFactoryBean gpfb = new GatewayProxyFactoryBean<>(Foo.class); - gpfb.setBeanFactory(mock(BeanFactory.class)); + gpfb.setBeanFactory(mock()); + gpfb.setTaskScheduler(new SimpleAsyncTaskScheduler()); DirectChannel input = new DirectChannel(); gpfb.setDefaultRequestChannel(input); gpfb.setDefaultReplyTimeout(10000L); diff --git a/spring-integration-core/src/test/java/org/springframework/integration/handler/MethodInvokingMessageProcessorTests.java b/spring-integration-core/src/test/java/org/springframework/integration/handler/MethodInvokingMessageProcessorTests.java index ef365013d17..10bf9c6f329 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/handler/MethodInvokingMessageProcessorTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/handler/MethodInvokingMessageProcessorTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2024 the original author or authors. + * Copyright 2002-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -79,6 +79,7 @@ import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory; import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver; import org.springframework.messaging.support.GenericMessage; +import org.springframework.scheduling.concurrent.SimpleAsyncTaskScheduler; import org.springframework.util.StopWatch; import static org.assertj.core.api.Assertions.assertThat; @@ -605,7 +606,8 @@ public void testOverloadedNonVoidReturningMethodsWithExactMatchForType() { @Test public void gatewayTest() throws Exception { GatewayProxyFactoryBean gwFactoryBean = new GatewayProxyFactoryBean<>(); - gwFactoryBean.setBeanFactory(mock(BeanFactory.class)); + gwFactoryBean.setTaskScheduler(new SimpleAsyncTaskScheduler()); + gwFactoryBean.setBeanFactory(mock()); gwFactoryBean.afterPropertiesSet(); Object target = gwFactoryBean.getObject(); // just instantiate a helper with a simple target; we're going to invoke getTargetClass with reflection diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/dsl/ConnectionFactoryTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/dsl/ConnectionFactoryTests.java index bd03f34dd7f..ac99f0a6a1b 100644 --- a/spring-integration-ip/src/test/java/org/springframework/integration/ip/dsl/ConnectionFactoryTests.java +++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/dsl/ConnectionFactoryTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2023 the original author or authors. + * Copyright 2016-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -38,6 +38,7 @@ import org.springframework.integration.transformer.ObjectToStringTransformer; import org.springframework.messaging.Message; import org.springframework.messaging.support.GenericMessage; +import org.springframework.scheduling.concurrent.SimpleAsyncTaskScheduler; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; @@ -57,6 +58,7 @@ public void test() throws Exception { ApplicationEventPublisher publisher = e -> { }; AbstractServerConnectionFactory server = Tcp.netServer(0).backlog(2).soTimeout(5000).getObject(); + server.setTaskScheduler(new SimpleAsyncTaskScheduler()); final AtomicReference> received = new AtomicReference<>(); final CountDownLatch latch = new CountDownLatch(1); server.registerListener(m -> { diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/dsl/IpIntegrationTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/dsl/IpIntegrationTests.java index a243d6b4684..c68fcd447fe 100644 --- a/spring-integration-ip/src/test/java/org/springframework/integration/ip/dsl/IpIntegrationTests.java +++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/dsl/IpIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2024 the original author or authors. + * Copyright 2016-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -68,11 +68,14 @@ import org.springframework.messaging.MessageChannel; import org.springframework.messaging.PollableChannel; import org.springframework.messaging.support.GenericMessage; +import org.springframework.scheduling.TaskScheduler; +import org.springframework.scheduling.concurrent.SimpleAsyncTaskScheduler; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; import org.springframework.util.ReflectionUtils; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; /** * @author Gary Russell @@ -134,6 +137,7 @@ void testTcpAdapters() { AbstractServerConnectionFactory server = Tcp.netServer(0).backlog(2).soTimeout(5000).id("server").getObject(); assertThat(server.getComponentName()).isEqualTo("server"); server.setApplicationEventPublisher(publisher); + server.setTaskScheduler(mock(TaskScheduler.class)); server.afterPropertiesSet(); TcpReceivingChannelAdapter inbound = Tcp.inboundAdapter(server).getObject(); QueueChannel received = new QueueChannel(); @@ -182,6 +186,7 @@ void testUdp() throws Exception { .setHeader("udp_dest", "udp://localhost:" + this.udpInbound.getPort()) .build(); this.udpOut.send(outMessage); + this.udpIn.setTaskScheduler(new SimpleAsyncTaskScheduler()); Message received = this.udpIn.receive(10000); assertThat(received).isNotNull(); assertThat(Transformers.objectToString().transform(received).getPayload()).isEqualTo("foo"); diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/TcpReceivingChannelAdapterTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/TcpReceivingChannelAdapterTests.java index 785237a5bc3..6b05834bf0a 100644 --- a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/TcpReceivingChannelAdapterTests.java +++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/TcpReceivingChannelAdapterTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2024 the original author or authors. + * Copyright 2002-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -55,6 +55,8 @@ import org.springframework.integration.ip.util.TestingUtilities; import org.springframework.messaging.Message; import org.springframework.messaging.SubscribableChannel; +import org.springframework.scheduling.TaskScheduler; +import org.springframework.scheduling.concurrent.SimpleAsyncTaskScheduler; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import static org.assertj.core.api.Assertions.assertThat; @@ -68,7 +70,7 @@ public class TcpReceivingChannelAdapterTests extends AbstractTcpChannelAdapterTe @Test public void testNet() throws Exception { - AbstractServerConnectionFactory scf = new TcpNetServerConnectionFactory(0); + AbstractServerConnectionFactory scf = getDefaultServerConnectionFactory(); noopPublisher(scf); ByteArrayCrLfSerializer serializer = new ByteArrayCrLfSerializer(); scf.setSerializer(serializer); @@ -80,7 +82,8 @@ public void testNet() throws Exception { int port = scf.getPort(); QueueChannel channel = new QueueChannel(); adapter.setOutputChannel(channel); - adapter.setBeanFactory(mock(BeanFactory.class)); + adapter.setBeanFactory(mock()); + adapter.setTaskScheduler(new SimpleAsyncTaskScheduler()); adapter.afterPropertiesSet(); Socket socket = SocketFactory.getDefault().createSocket("localhost", port); socket.getOutputStream().write("Test1\r\n".getBytes()); @@ -155,7 +158,7 @@ public void testNetClientMode() throws Exception { @Test public void testNio() throws Exception { - TcpNioServerConnectionFactory scf = new TcpNioServerConnectionFactory(0); + AbstractServerConnectionFactory scf = getDefaultServerConnectionFactory(); noopPublisher(scf); ByteArrayCrLfSerializer serializer = new ByteArrayCrLfSerializer(); scf.setSerializer(serializer); @@ -186,7 +189,7 @@ public void testNio() throws Exception { @Test public void testNetShared() throws Exception { - AbstractServerConnectionFactory scf = new TcpNetServerConnectionFactory(0); + AbstractServerConnectionFactory scf = getDefaultServerConnectionFactory(); noopPublisher(scf); ByteArrayCrLfSerializer serializer = new ByteArrayCrLfSerializer(); scf.setSerializer(serializer); @@ -220,7 +223,7 @@ public void testNetShared() throws Exception { @Test public void testNioShared() throws Exception { - TcpNioServerConnectionFactory scf = new TcpNioServerConnectionFactory(0); + AbstractServerConnectionFactory scf = getDefaultServerConnectionFactory(); noopPublisher(scf); ByteArrayCrLfSerializer serializer = new ByteArrayCrLfSerializer(); scf.setSerializer(serializer); @@ -254,7 +257,7 @@ public void testNioShared() throws Exception { @Test public void testNetSingleNoOutbound() throws Exception { - AbstractServerConnectionFactory scf = new TcpNetServerConnectionFactory(0); + AbstractServerConnectionFactory scf = getDefaultServerConnectionFactory(); noopPublisher(scf); ByteArrayCrLfSerializer serializer = new ByteArrayCrLfSerializer(); scf.setSerializer(serializer); @@ -286,7 +289,7 @@ public void testNetSingleNoOutbound() throws Exception { @Test public void testNioSingleNoOutbound() throws Exception { - TcpNioServerConnectionFactory scf = new TcpNioServerConnectionFactory(0); + AbstractServerConnectionFactory scf = getDefaultServerConnectionFactory(); noopPublisher(scf); ByteArrayCrLfSerializer serializer = new ByteArrayCrLfSerializer(); scf.setSerializer(serializer); @@ -328,7 +331,7 @@ private void readFully(InputStream is, byte[] buff) throws IOException { @Test public void testNetSingleShared() throws Exception { - AbstractServerConnectionFactory scf = new TcpNetServerConnectionFactory(0); + AbstractServerConnectionFactory scf = getDefaultServerConnectionFactory(); noopPublisher(scf); ByteArrayCrLfSerializer serializer = new ByteArrayCrLfSerializer(); scf.setSerializer(serializer); @@ -365,7 +368,7 @@ public void testNetSingleShared() throws Exception { @Test public void testNioSingleShared() throws Exception { - TcpNioServerConnectionFactory scf = new TcpNioServerConnectionFactory(0); + AbstractServerConnectionFactory scf = getDefaultServerConnectionFactory(); noopPublisher(scf); ByteArrayCrLfSerializer serializer = new ByteArrayCrLfSerializer(); scf.setSerializer(serializer); @@ -402,7 +405,7 @@ public void testNioSingleShared() throws Exception { @Test public void testNioSingleSharedMany() throws Exception { - TcpNioServerConnectionFactory scf = new TcpNioServerConnectionFactory(0); + AbstractServerConnectionFactory scf = getDefaultServerConnectionFactory(); noopPublisher(scf); ByteArrayCrLfSerializer serializer = new ByteArrayCrLfSerializer(); scf.setSerializer(serializer); @@ -442,7 +445,7 @@ public void testNioSingleSharedMany() throws Exception { @Test public void testNetInterceptors() throws Exception { - AbstractServerConnectionFactory scf = new TcpNetServerConnectionFactory(0); + AbstractServerConnectionFactory scf = getDefaultServerConnectionFactory(); noopPublisher(scf); interceptorsGuts(scf); scf.stop(); @@ -450,7 +453,7 @@ public void testNetInterceptors() throws Exception { @Test public void testNetSingleNoOutboundInterceptors() throws Exception { - AbstractServerConnectionFactory scf = new TcpNetServerConnectionFactory(0); + AbstractServerConnectionFactory scf = getDefaultServerConnectionFactory(); noopPublisher(scf); singleNoOutboundInterceptorsGuts(scf); scf.stop(); @@ -458,7 +461,7 @@ public void testNetSingleNoOutboundInterceptors() throws Exception { @Test public void testNetSingleSharedInterceptors() throws Exception { - AbstractServerConnectionFactory scf = new TcpNetServerConnectionFactory(0); + AbstractServerConnectionFactory scf = getDefaultServerConnectionFactory(); noopPublisher(scf); singleSharedInterceptorsGuts(scf); scf.stop(); @@ -466,7 +469,7 @@ public void testNetSingleSharedInterceptors() throws Exception { @Test public void testNioInterceptors() throws Exception { - AbstractServerConnectionFactory scf = new TcpNioServerConnectionFactory(0); + AbstractServerConnectionFactory scf = getDefaultServerConnectionFactory(); noopPublisher(scf); interceptorsGuts(scf); scf.stop(); @@ -474,7 +477,7 @@ public void testNioInterceptors() throws Exception { @Test public void testNioSingleNoOutboundInterceptors() throws Exception { - AbstractServerConnectionFactory scf = new TcpNioServerConnectionFactory(0); + AbstractServerConnectionFactory scf = getDefaultServerConnectionFactory(); noopPublisher(scf); singleNoOutboundInterceptorsGuts(scf); scf.stop(); @@ -482,7 +485,7 @@ public void testNioSingleNoOutboundInterceptors() throws Exception { @Test public void testNioSingleSharedInterceptors() throws Exception { - AbstractServerConnectionFactory scf = new TcpNioServerConnectionFactory(0); + AbstractServerConnectionFactory scf = getDefaultServerConnectionFactory(); noopPublisher(scf); singleSharedInterceptorsGuts(scf); scf.stop(); @@ -623,8 +626,11 @@ public void testException() throws Exception { ByteArrayCrLfSerializer serializer = new ByteArrayCrLfSerializer(); scf.setSerializer(serializer); scf.setDeserializer(serializer); + scf.setTaskScheduler(mock(TaskScheduler.class)); + TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter(); adapter.setConnectionFactory(scf); + adapter.setTaskScheduler(mock(TaskScheduler.class)); scf.start(); TestingUtilities.waitListening(scf, null); int port = scf.getPort(); @@ -648,6 +654,12 @@ public void testException() throws Exception { scf.stop(); } + private static AbstractServerConnectionFactory getDefaultServerConnectionFactory() { + AbstractServerConnectionFactory scf = new TcpNioServerConnectionFactory(0); + scf.setTaskScheduler(new SimpleAsyncTaskScheduler()); + return scf; + } + private class FailingService { @SuppressWarnings("unused") diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/TcpSendingMessageHandlerTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/TcpSendingMessageHandlerTests.java index 8c6bb858c60..8d9b93ff9b9 100644 --- a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/TcpSendingMessageHandlerTests.java +++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/TcpSendingMessageHandlerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2024 the original author or authors. + * Copyright 2002-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -73,6 +73,7 @@ import org.springframework.messaging.MessagingException; import org.springframework.messaging.PollableChannel; import org.springframework.messaging.support.GenericMessage; +import org.springframework.scheduling.concurrent.SimpleAsyncTaskScheduler; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import static org.assertj.core.api.Assertions.assertThat; @@ -1203,6 +1204,7 @@ public void testConnectionException() throws Exception { public void testInterceptedConnection() throws Exception { final CountDownLatch latch = new CountDownLatch(1); AbstractServerConnectionFactory scf = new TcpNetServerConnectionFactory(0); + scf.setTaskScheduler(new SimpleAsyncTaskScheduler()); ByteArrayCrLfSerializer serializer = new ByteArrayCrLfSerializer(); scf.setSerializer(serializer); scf.setDeserializer(serializer); @@ -1236,6 +1238,7 @@ public void testInterceptedConnection() throws Exception { public void testInterceptedCleanup() throws Exception { final CountDownLatch latch = new CountDownLatch(1); AbstractServerConnectionFactory scf = new TcpNetServerConnectionFactory(0); + scf.setTaskScheduler(new SimpleAsyncTaskScheduler()); ByteArrayCrLfSerializer serializer = new ByteArrayCrLfSerializer(); scf.setSerializer(serializer); scf.setDeserializer(serializer); @@ -1260,5 +1263,4 @@ public void testInterceptedCleanup() throws Exception { await().untilAsserted(() -> assertThat(handler.getConnections()).isEmpty()); scf.stop(); } - } diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/CachingClientConnectionFactoryTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/CachingClientConnectionFactoryTests.java index 190fcf92d32..4520a0c61c2 100644 --- a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/CachingClientConnectionFactoryTests.java +++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/CachingClientConnectionFactoryTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2024 the original author or authors. + * Copyright 2002-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -68,6 +68,7 @@ import org.springframework.messaging.SubscribableChannel; import org.springframework.messaging.support.ErrorMessage; import org.springframework.messaging.support.GenericMessage; +import org.springframework.scheduling.concurrent.SimpleAsyncTaskScheduler; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; @@ -698,6 +699,7 @@ public void testRealConnection() throws Exception { @Test //INT-3722 public void testGatewayRelease() { TcpNetServerConnectionFactory in = new TcpNetServerConnectionFactory(0); + in.setTaskScheduler(new SimpleAsyncTaskScheduler()); in.setApplicationEventPublisher(mock(ApplicationEventPublisher.class)); final TcpSendingMessageHandler handler = new TcpSendingMessageHandler(); handler.setConnectionFactory(in); @@ -820,5 +822,4 @@ private static AbstractClientConnectionFactory createFactoryWithMockConnection(T when(factory.isActive()).thenReturn(true); return factory; } - } diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/ConnectionFactoryTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/ConnectionFactoryTests.java index c298fcf994d..93c3ba3fd79 100644 --- a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/ConnectionFactoryTests.java +++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/ConnectionFactoryTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2024 the original author or authors. + * Copyright 2002-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -40,14 +40,12 @@ import org.mockito.ArgumentCaptor; import org.springframework.beans.DirectFieldAccessor; -import org.springframework.beans.factory.BeanFactory; import org.springframework.context.ApplicationEventPublisher; import org.springframework.core.log.LogAccessor; import org.springframework.core.log.LogMessage; import org.springframework.core.task.SimpleAsyncTaskExecutor; import org.springframework.integration.channel.NullChannel; import org.springframework.integration.channel.QueueChannel; -import org.springframework.integration.context.IntegrationContextUtils; import org.springframework.integration.ip.config.TcpConnectionFactoryFactoryBean; import org.springframework.integration.ip.event.IpIntegrationEvent; import org.springframework.integration.ip.tcp.TcpOutboundGateway; @@ -57,7 +55,7 @@ import org.springframework.messaging.MessagingException; import org.springframework.messaging.support.ErrorMessage; import org.springframework.messaging.support.GenericMessage; -import org.springframework.scheduling.TaskScheduler; +import org.springframework.scheduling.concurrent.SimpleAsyncTaskScheduler; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import static org.assertj.core.api.Assertions.assertThat; @@ -84,7 +82,7 @@ public class ConnectionFactoryTests { @Test void netOpenEventOnReadThread() throws InterruptedException, IOException { - TcpNetServerConnectionFactory server = new TcpNetServerConnectionFactory(0); + TcpNetServerConnectionFactory server = getTcpNetServerConnectionFactory(0); AtomicReference readThread = new AtomicReference<>(); AtomicReference openEventThread = new AtomicReference<>(); CountDownLatch latch1 = new CountDownLatch(1); @@ -162,10 +160,8 @@ public void testObtainConnectionIds(AbstractServerConnectionFactory serverFactor ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); scheduler.setPoolSize(10); scheduler.afterPropertiesSet(); - BeanFactory bf = mock(BeanFactory.class); - when(bf.containsBean(IntegrationContextUtils.TASK_SCHEDULER_BEAN_NAME)).thenReturn(true); - when(bf.getBean(IntegrationContextUtils.TASK_SCHEDULER_BEAN_NAME, TaskScheduler.class)).thenReturn(scheduler); - serverFactory.setBeanFactory(bf); + + serverFactory.setTaskScheduler(new SimpleAsyncTaskScheduler()); TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter(); adapter.setOutputChannel(new NullChannel()); adapter.setConnectionFactory(serverFactory); @@ -273,22 +269,22 @@ private void testEarlyClose(final AbstractServerConnectionFactory factory, Strin @Test void healthCheckSuccessNet() throws InterruptedException { - healthCheckSuccess(new TcpNetServerConnectionFactory(0), false); + healthCheckSuccess(getTcpNetServerConnectionFactory(0), false); } @Test void healthCheckSuccessNio() throws InterruptedException { - healthCheckSuccess(new TcpNioServerConnectionFactory(0), false); + healthCheckSuccess(getTcpNetServerConnectionFactory(0), false); } @Test void healthCheckFailureNet() throws InterruptedException { - healthCheckSuccess(new TcpNetServerConnectionFactory(0), true); + healthCheckSuccess(getTcpNetServerConnectionFactory(0), true); } @Test void healthCheckFailureNio() throws InterruptedException { - healthCheckSuccess(new TcpNioServerConnectionFactory(0), true); + healthCheckSuccess(getTcpNetServerConnectionFactory(0), true); } private void healthCheckSuccess(AbstractServerConnectionFactory server, boolean fail) throws InterruptedException { @@ -298,7 +294,7 @@ private void healthCheckSuccess(AbstractServerConnectionFactory server, boolean serverUp.countDown(); } }); - server.setBeanFactory(mock(BeanFactory.class)); + server.setTaskScheduler(new SimpleAsyncTaskScheduler()); AtomicReference connection = new AtomicReference<>(); server.registerSender(conn -> { connection.set(conn); @@ -367,6 +363,12 @@ private void healthCheckSuccess(AbstractServerConnectionFactory server, boolean server.stop(); } + private TcpNetServerConnectionFactory getTcpNetServerConnectionFactory(int port) { + TcpNetServerConnectionFactory result = new TcpNetServerConnectionFactory(port); + result.setTaskScheduler(new SimpleAsyncTaskScheduler()); + return result; + } + @SuppressWarnings("serial") private class FooEvent extends TcpConnectionOpenEvent { @@ -375,5 +377,4 @@ private class FooEvent extends TcpConnectionOpenEvent { } } - } diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/FailoverClientConnectionFactoryTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/FailoverClientConnectionFactoryTests.java index 2733937d29d..f7249518fd2 100644 --- a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/FailoverClientConnectionFactoryTests.java +++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/FailoverClientConnectionFactoryTests.java @@ -54,6 +54,7 @@ import org.springframework.messaging.MessageChannel; import org.springframework.messaging.SubscribableChannel; import org.springframework.messaging.support.GenericMessage; +import org.springframework.scheduling.concurrent.SimpleAsyncTaskScheduler; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import static org.assertj.core.api.Assertions.assertThat; @@ -328,39 +329,39 @@ public TcpConnectionSupport makeMockConnection() { @Test public void testRealNet() throws Exception { - AbstractServerConnectionFactory server1 = new TcpNetServerConnectionFactory(0); - AbstractServerConnectionFactory server2 = new TcpNetServerConnectionFactory(0); + AbstractServerConnectionFactory server1 = getTcpNetServerConnectionFactory(0); + AbstractServerConnectionFactory server2 = getTcpNetServerConnectionFactory(0); Holder holder = setupAndStartServers(server1, server2); - AbstractClientConnectionFactory client1 = new TcpNetClientConnectionFactory("localhost", server1.getPort()); - AbstractClientConnectionFactory client2 = new TcpNetClientConnectionFactory("localhost", server2.getPort()); + AbstractClientConnectionFactory client1 = getTcpNetServerConnectionFactory("localhost", server1.getPort()); + AbstractClientConnectionFactory client2 = getTcpNetServerConnectionFactory("localhost", server2.getPort()); testRealGuts(client1, client2, holder); } @Test public void testRealNio() throws Exception { - AbstractServerConnectionFactory server1 = new TcpNioServerConnectionFactory(0); - AbstractServerConnectionFactory server2 = new TcpNioServerConnectionFactory(0); + AbstractServerConnectionFactory server1 = getTcpNetServerConnectionFactory(0); + AbstractServerConnectionFactory server2 = getTcpNetServerConnectionFactory(0); Holder holder = setupAndStartServers(server1, server2); - AbstractClientConnectionFactory client1 = new TcpNioClientConnectionFactory("localhost", server1.getPort()); - AbstractClientConnectionFactory client2 = new TcpNioClientConnectionFactory("localhost", server2.getPort()); + AbstractClientConnectionFactory client1 = getTcpNetServerConnectionFactory("localhost", server1.getPort()); + AbstractClientConnectionFactory client2 = getTcpNetServerConnectionFactory("localhost", server2.getPort()); testRealGuts(client1, client2, holder); } @Test public void testRealNetSingleUse() throws Exception { - AbstractServerConnectionFactory server1 = new TcpNetServerConnectionFactory(0); - AbstractServerConnectionFactory server2 = new TcpNetServerConnectionFactory(0); + AbstractServerConnectionFactory server1 = getTcpNetServerConnectionFactory(0); + AbstractServerConnectionFactory server2 = getTcpNetServerConnectionFactory(0); Holder holder = setupAndStartServers(server1, server2); - AbstractClientConnectionFactory client1 = new TcpNetClientConnectionFactory("localhost", server1.getPort()); - AbstractClientConnectionFactory client2 = new TcpNetClientConnectionFactory("localhost", server2.getPort()); + AbstractClientConnectionFactory client1 = getTcpNetServerConnectionFactory("localhost", server1.getPort()); + AbstractClientConnectionFactory client2 = getTcpNetServerConnectionFactory("localhost", server2.getPort()); client1.setSingleUse(true); client2.setSingleUse(true); testRealGuts(client1, client2, holder); @@ -369,13 +370,13 @@ public void testRealNetSingleUse() throws Exception { @Test public void testRealNioSingleUse() throws Exception { - AbstractServerConnectionFactory server1 = new TcpNioServerConnectionFactory(0); - AbstractServerConnectionFactory server2 = new TcpNioServerConnectionFactory(0); + AbstractServerConnectionFactory server1 = getTcpNetServerConnectionFactory(0); + AbstractServerConnectionFactory server2 = getTcpNetServerConnectionFactory(0); Holder holder = setupAndStartServers(server1, server2); - AbstractClientConnectionFactory client1 = new TcpNioClientConnectionFactory("localhost", server1.getPort()); - AbstractClientConnectionFactory client2 = new TcpNioClientConnectionFactory("localhost", server2.getPort()); + AbstractClientConnectionFactory client1 = getTcpNetServerConnectionFactory("localhost", server1.getPort()); + AbstractClientConnectionFactory client2 = getTcpNetServerConnectionFactory("localhost", server2.getPort()); client1.setSingleUse(true); client2.setSingleUse(true); testRealGuts(client1, client2, holder); @@ -698,5 +699,19 @@ private static AbstractClientConnectionFactory createFactoryWithMockConnection(T return factory; } + private TcpNetServerConnectionFactory getTcpNetServerConnectionFactory(int port) { + TcpNetServerConnectionFactory result = new TcpNetServerConnectionFactory(port); + result.setTaskScheduler(new SimpleAsyncTaskScheduler()); + + return result; + } + + private static TcpNetClientConnectionFactory getTcpNetServerConnectionFactory(String host, int port) { + TcpNetClientConnectionFactory result = new TcpNetClientConnectionFactory(host, port); + result.setTaskScheduler(new SimpleAsyncTaskScheduler()); + + return result; + } + } diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/SocketSupportTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/SocketSupportTests.java index 8a7c8da7748..096b58d165e 100644 --- a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/SocketSupportTests.java +++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/SocketSupportTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -48,6 +48,7 @@ import org.springframework.messaging.MessagingException; import org.springframework.messaging.support.ErrorMessage; import org.springframework.messaging.support.GenericMessage; +import org.springframework.scheduling.concurrent.SimpleAsyncTaskScheduler; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; @@ -470,6 +471,7 @@ public void postProcessServerSocket(ServerSocket serverSocket) { public void testNioClientAndServerSSL() throws Exception { System.setProperty("javax.net.debug", "all"); // SSL activity in the console TcpNioServerConnectionFactory server = new TcpNioServerConnectionFactory(0); + server.setTaskScheduler(new SimpleAsyncTaskScheduler()); server.setSslHandshakeTimeout(43); DefaultTcpSSLContextSupport sslContextSupport = new DefaultTcpSSLContextSupport("test.ks", "test.truststore.ks", "secret", "secret"); @@ -576,6 +578,7 @@ protected void postProcessSSLEngine(SSLEngine sslEngine) { public void testNioClientAndServerSSLDifferentContextsLargeDataWithReply() throws Exception { System.setProperty("javax.net.debug", "all"); // SSL activity in the console TcpNioServerConnectionFactory server = new TcpNioServerConnectionFactory(0); + server.setTaskScheduler(new SimpleAsyncTaskScheduler()); TcpSSLContextSupport serverSslContextSupport = new DefaultTcpSSLContextSupport("server.ks", "server.truststore.ks", "secret", "secret"); DefaultTcpNioSSLConnectionSupport serverTcpNioConnectionSupport = diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpNetConnectionSupportTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpNetConnectionSupportTests.java index b5b39b4445b..8b84ab66a23 100644 --- a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpNetConnectionSupportTests.java +++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpNetConnectionSupportTests.java @@ -28,6 +28,7 @@ import org.springframework.context.ApplicationEventPublisher; import org.springframework.messaging.Message; +import org.springframework.scheduling.concurrent.SimpleAsyncTaskScheduler; import static org.assertj.core.api.Assertions.assertThat; @@ -41,6 +42,7 @@ public class TcpNetConnectionSupportTests { @Test public void testBadCode() throws Exception { TcpNetServerConnectionFactory server = new TcpNetServerConnectionFactory(0); + server.setTaskScheduler(new SimpleAsyncTaskScheduler()); AtomicReference> message = new AtomicReference<>(); CountDownLatch latch1 = new CountDownLatch(1); server.registerListener(m -> { @@ -80,5 +82,4 @@ public TcpNetConnection createNewConnection(Socket socket, boolean isServer, boo assertThat(message.get()).isNotNull(); server.stop(); } - } diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpNetConnectionTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpNetConnectionTests.java index 0ca13eed336..207f7bf4b7d 100644 --- a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpNetConnectionTests.java +++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpNetConnectionTests.java @@ -46,6 +46,7 @@ import org.springframework.integration.test.util.TestUtils; import org.springframework.messaging.Message; import org.springframework.messaging.support.ErrorMessage; +import org.springframework.scheduling.concurrent.SimpleAsyncTaskScheduler; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -153,6 +154,7 @@ public void transferHeaders() throws Exception { @Test public void socketClosedNextRead() throws InterruptedException, IOException { TcpNetServerConnectionFactory server = new TcpNetServerConnectionFactory(0); + server.setTaskScheduler(new SimpleAsyncTaskScheduler()); AtomicInteger port = new AtomicInteger(); CountDownLatch latch = new CountDownLatch(1); ApplicationEventPublisher publisher = ev -> { diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpNioConnectionReadTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpNioConnectionReadTests.java index 2489780414c..a3aac38a7fa 100644 --- a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpNioConnectionReadTests.java +++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpNioConnectionReadTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2024 the original author or authors. + * Copyright 2002-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -38,6 +38,7 @@ import org.springframework.integration.ip.util.TestingUtilities; import org.springframework.messaging.Message; import org.springframework.messaging.support.ErrorMessage; +import org.springframework.scheduling.concurrent.SimpleAsyncTaskScheduler; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.with; @@ -62,6 +63,7 @@ private AbstractServerConnectionFactory getConnectionFactory( AbstractByteArraySerializer serializer, TcpListener listener, TcpSender sender) { TcpNioServerConnectionFactory scf = new TcpNioServerConnectionFactory(0); + scf.setTaskScheduler(new SimpleAsyncTaskScheduler()); scf.setUsingDirectBuffers(true); scf.setApplicationEventPublisher(e -> { }); @@ -119,7 +121,6 @@ public void testFragmented() throws Exception { semaphore.release(); return false; }); - int howMany = 2; scf.setBacklog(howMany + 5); // Fire up the sender. @@ -144,7 +145,6 @@ public void testReadStxEtx() throws Exception { semaphore.release(); return false; }); - // Fire up the sender. CountDownLatch done = SocketTestUtils.testSendStxEtx(scf.getPort(), latch); @@ -533,5 +533,4 @@ private void whileOpen(Semaphore semaphore, List added) .atMost(Duration.ofSeconds(20)) .until(() -> !added.get(0).isOpen()); } - } diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpNioConnectionTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpNioConnectionTests.java index 3f7f3b002f0..a86233d012b 100644 --- a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpNioConnectionTests.java +++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpNioConnectionTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2024 the original author or authors. + * Copyright 2002-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -76,6 +76,7 @@ import org.springframework.messaging.Message; import org.springframework.messaging.MessagingException; import org.springframework.messaging.support.ErrorMessage; +import org.springframework.scheduling.concurrent.SimpleAsyncTaskScheduler; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.util.ReflectionUtils; import org.springframework.util.StopWatch; @@ -543,9 +544,8 @@ public Integer answer(InvocationOnMock invocation) { @Test public void testAssemblerUsesSecondaryExecutor() throws Exception { - TcpNioServerConnectionFactory factory = new TcpNioServerConnectionFactory(0); + TcpNioServerConnectionFactory factory = newTcpNioServerConnectionFactory(); factory.setApplicationEventPublisher(nullPublisher); - CompositeExecutor compositeExec = compositeExecutor(); factory.setSoTimeout(1000); @@ -586,6 +586,12 @@ public void testAssemblerUsesSecondaryExecutor() throws Exception { cleanupCompositeExecutor(compositeExec); } + private static TcpNioServerConnectionFactory newTcpNioServerConnectionFactory() { + TcpNioServerConnectionFactory tcpNioServerConnectionFactory = new TcpNioServerConnectionFactory(0); + tcpNioServerConnectionFactory.setTaskScheduler(new SimpleAsyncTaskScheduler()); + return tcpNioServerConnectionFactory; + } + private void cleanupCompositeExecutor(CompositeExecutor compositeExec) throws Exception { TestUtils.getPropertyValue(compositeExec, "primaryTaskExecutor", DisposableBean.class).destroy(); TestUtils.getPropertyValue(compositeExec, "secondaryTaskExecutor", DisposableBean.class).destroy(); @@ -595,6 +601,7 @@ private void cleanupCompositeExecutor(CompositeExecutor compositeExec) throws Ex public void testAllMessagesDelivered() throws Exception { final int numberOfSockets = 10; TcpNioServerConnectionFactory factory = new TcpNioServerConnectionFactory(0); + factory.setTaskScheduler(mock()); factory.setApplicationEventPublisher(nullPublisher); CompositeExecutor compositeExec = compositeExecutor(); @@ -682,6 +689,7 @@ private CompositeExecutor compositeExecutor() { @Test public void int3453RaceTest() throws Exception { TcpNioServerConnectionFactory factory = new TcpNioServerConnectionFactory(0); + factory.setTaskScheduler(mock()); final CountDownLatch connectionLatch = new CountDownLatch(1); factory.setApplicationEventPublisher(new ApplicationEventPublisher() { @@ -768,7 +776,7 @@ public void publishEvent(Object event) { @Test public void testNoDelayOnClose() throws Exception { - TcpNioServerConnectionFactory cf = new TcpNioServerConnectionFactory(0); + TcpNioServerConnectionFactory cf = newTcpNioServerConnectionFactory(); final CountDownLatch reading = new CountDownLatch(1); final StopWatch watch = new StopWatch(); cf.setDeserializer(is -> { @@ -807,7 +815,8 @@ private void testMulti(boolean multiAccept) throws InterruptedException, IOExcep CountDownLatch serverReadyLatch = new CountDownLatch(1); CountDownLatch latch = new CountDownLatch(21); List sockets = new ArrayList<>(); - TcpNioServerConnectionFactory server = new TcpNioServerConnectionFactory(0); + TcpNioServerConnectionFactory server = newTcpNioServerConnectionFactory(); + try { List events = Collections.synchronizedList(new ArrayList<>()); List> messages = Collections.synchronizedList(new ArrayList<>()); diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpSenderTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpSenderTests.java index 76300c4bd07..85e0bce8d88 100644 --- a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpSenderTests.java +++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpSenderTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2022-2024 the original author or authors. + * Copyright 2022-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,6 +27,8 @@ import org.junit.jupiter.api.Test; +import org.springframework.scheduling.concurrent.SimpleAsyncTaskScheduler; + import static org.assertj.core.api.Assertions.assertThat; /** @@ -43,6 +45,7 @@ public class TcpSenderTests { void senderCalledForDeadConnectionClientNet() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); TcpNetServerConnectionFactory server = new TcpNetServerConnectionFactory(0); + server.setTaskScheduler(new SimpleAsyncTaskScheduler()); server.registerListener(msg -> false); server.afterPropertiesSet(); server.setApplicationEventPublisher(event -> { @@ -61,6 +64,7 @@ void senderCalledForDeadConnectionClientNet() throws InterruptedException { void senderCalledForDeadConnectionClientNio() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); TcpNetServerConnectionFactory server = new TcpNetServerConnectionFactory(0); + server.setTaskScheduler(new SimpleAsyncTaskScheduler()); server.registerListener(msg -> false); server.afterPropertiesSet(); server.setApplicationEventPublisher(event -> { @@ -160,5 +164,4 @@ public synchronized void removeDeadConnection(TcpConnection connection) { assertThat(passedConnectionsToSenderViaAddNewConnection.get(0)).isSameAs(interceptorsPerInstance.get(3)); assertThat(passedConnectionsToSenderViaAddNewConnection.get(1)).isSameAs(interceptorsPerInstance.get(6)); } - } diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/serializer/DeserializationTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/serializer/DeserializationTests.java index 551b1e965e8..d36430bca18 100644 --- a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/serializer/DeserializationTests.java +++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/serializer/DeserializationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2024 the original author or authors. + * Copyright 2002-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -44,6 +44,7 @@ import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.support.GenericMessage; +import org.springframework.scheduling.concurrent.SimpleAsyncTaskScheduler; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatException; @@ -308,9 +309,15 @@ public void testTimeoutWithRawDeserializer() { testTimeoutWhileDecoding(new ByteArrayRawSerializer(), "reply"); } + private TcpNioServerConnectionFactory getTcpNioServerConnectionFactory(int port) { + TcpNioServerConnectionFactory result = new TcpNioServerConnectionFactory(port); + result.setTaskScheduler(new SimpleAsyncTaskScheduler()); + return result; + } + private void testTimeoutWhileDecoding(AbstractByteArraySerializer deserializer, String reply) { ByteArrayRawSerializer serializer = new ByteArrayRawSerializer(); - TcpNioServerConnectionFactory serverNio = new TcpNioServerConnectionFactory(0); + TcpNioServerConnectionFactory serverNio = getTcpNioServerConnectionFactory(0); serverNio.setApplicationEventPublisher(event -> { }); ByteArrayLengthHeaderSerializer lengthHeaderSerializer = new ByteArrayLengthHeaderSerializer(1); @@ -364,7 +371,7 @@ private void testTimeoutWhileDecoding(AbstractByteArraySerializer deserializer, @Test public void testTimeoutWithRawDeserializerEofIsTerminator() { ByteArrayRawSerializer serializer = new ByteArrayRawSerializer(); - TcpNioServerConnectionFactory serverNio = new TcpNioServerConnectionFactory(0); + TcpNioServerConnectionFactory serverNio = getTcpNioServerConnectionFactory(0); ByteArrayLengthHeaderSerializer lengthHeaderSerializer = new ByteArrayLengthHeaderSerializer(1); serverNio.setDeserializer(lengthHeaderSerializer); serverNio.setSerializer(serializer); diff --git a/spring-integration-mail/src/test/java/org/springframework/integration/mail/ImapMailReceiverTests.java b/spring-integration-mail/src/test/java/org/springframework/integration/mail/ImapMailReceiverTests.java index 76f5a90a662..152986fdc51 100644 --- a/spring-integration-mail/src/test/java/org/springframework/integration/mail/ImapMailReceiverTests.java +++ b/spring-integration-mail/src/test/java/org/springframework/integration/mail/ImapMailReceiverTests.java @@ -83,6 +83,7 @@ import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.PollableChannel; import org.springframework.scheduling.TaskScheduler; +import org.springframework.scheduling.concurrent.SimpleAsyncTaskScheduler; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.ContextConfiguration; @@ -93,6 +94,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.BDDMockito.given; import static org.mockito.BDDMockito.willAnswer; import static org.mockito.BDDMockito.willDoNothing; @@ -170,6 +172,7 @@ void stopImapServer() { public void testIdleWithServerCustomSearch() throws Exception { ImapMailReceiver receiver = new ImapMailReceiver("imap://user:pw@localhost:" + imapIdleServer.getImap().getPort() + "/INBOX"); + receiver.setTaskScheduler(new SimpleAsyncTaskScheduler()); receiver.setSearchTermStrategy((supportedFlags, folder) -> { try { FromTerm fromTerm = new FromTerm(new InternetAddress("bar@baz")); @@ -186,6 +189,7 @@ public void testIdleWithServerCustomSearch() throws Exception { public void testIdleWithServerDefaultSearch() throws Exception { ImapMailReceiver receiver = new ImapMailReceiver("imap://user:pw@localhost:" + imapIdleServer.getImap().getPort() + "/INBOX"); + receiver.setTaskScheduler(new SimpleAsyncTaskScheduler()); testIdleWithServerGuts(receiver, false); assertThat(imapSearches.searches.get(0)).contains("testSIUserFlag"); } @@ -213,6 +217,7 @@ public void testIdleWithMessageMappingSimple() throws Exception { ImapMailReceiver receiver = new ImapMailReceiver("imap://user:pw@localhost:" + imapIdleServer.getImap().getPort() + "/INBOX"); receiver.setSimpleContent(true); + receiver.setTaskScheduler(new SimpleAsyncTaskScheduler()); receiver.setHeaderMapper(new DefaultMailHeaderMapper()); testIdleWithServerGuts(receiver, true, true); } @@ -311,7 +316,7 @@ private AbstractMailReceiver receiveAndMarkAsReadDontDeleteGuts(AbstractMailRece ((ImapMailReceiver) receiver).setShouldMarkMessagesAsRead(true); receiver = spy(receiver); - receiver.setBeanFactory(mock(BeanFactory.class)); + receiver.setBeanFactory(getBeanFactory()); receiver.afterPropertiesSet(); Field folderField = AbstractMailReceiver.class.getDeclaredField("folder"); folderField.setAccessible(true); @@ -419,7 +424,7 @@ public void receiveMarkAsReadAndDelete() throws Exception { ((ImapMailReceiver) receiver).setShouldMarkMessagesAsRead(true); receiver.setShouldDeleteMessages(true); receiver = spy(receiver); - receiver.setBeanFactory(mock(BeanFactory.class)); + receiver.setBeanFactory(getBeanFactory()); receiver.afterPropertiesSet(); Field folderField = AbstractMailReceiver.class.getDeclaredField("folder"); @@ -457,7 +462,7 @@ public void receiveAndDontMarkAsRead() throws Exception { AbstractMailReceiver receiver = new ImapMailReceiver(); ((ImapMailReceiver) receiver).setShouldMarkMessagesAsRead(false); receiver = spy(receiver); - receiver.setBeanFactory(mock(BeanFactory.class)); + receiver.setBeanFactory(getBeanFactory()); receiver.afterPropertiesSet(); Field folderField = AbstractMailReceiver.class.getDeclaredField("folder"); @@ -487,7 +492,7 @@ public void receiveAndDontMarkAsReadButDelete() throws Exception { receiver.setShouldDeleteMessages(true); ((ImapMailReceiver) receiver).setShouldMarkMessagesAsRead(false); receiver = spy(receiver); - receiver.setBeanFactory(mock(BeanFactory.class)); + receiver.setBeanFactory(getBeanFactory()); receiver.afterPropertiesSet(); Field folderField = AbstractMailReceiver.class.getDeclaredField("folder"); @@ -525,7 +530,7 @@ public void receiveAndDontMarkAsReadButDelete() throws Exception { public void receiveAndIgnoreMarkAsReadDontDelete() throws Exception { AbstractMailReceiver receiver = new ImapMailReceiver(); receiver = spy(receiver); - receiver.setBeanFactory(mock(BeanFactory.class)); + receiver.setBeanFactory(getBeanFactory()); receiver.afterPropertiesSet(); Field folderField = AbstractMailReceiver.class.getDeclaredField("folder"); @@ -563,7 +568,7 @@ public void testMessageHistory() throws Exception { AbstractMailReceiver receiver = new ImapMailReceiver(); receiver = spy(receiver); - receiver.setBeanFactory(mock(BeanFactory.class)); + receiver.setBeanFactory(getBeanFactory()); receiver.afterPropertiesSet(); DirectFieldAccessor adapterAccessor = new DirectFieldAccessor(adapter); @@ -619,7 +624,7 @@ protected Object handleRequestMessage(org.springframework.messaging.Message r adapter.setReconnectDelay(10); AbstractMailReceiver receiver = new ImapMailReceiver(); - receiver.setBeanFactory(mock(BeanFactory.class)); + receiver.setBeanFactory(getBeanFactory()); receiver.afterPropertiesSet(); Field folderField = AbstractMailReceiver.class.getDeclaredField("folder"); @@ -810,7 +815,7 @@ public void testImapLifecycleForRaceCondition() throws Exception { DirectFieldAccessor df = new DirectFieldAccessor(receiver); df.setPropertyValue("store", store); - receiver.setBeanFactory(mock(BeanFactory.class)); + receiver.setBeanFactory(getBeanFactory()); receiver.afterPropertiesSet(); new Thread(() -> { @@ -890,7 +895,7 @@ private Folder testAttachmentsGuts(final ImapMailReceiver receiver) throws Messa given(folder.getPermanentFlags()).willReturn(new Flags(Flags.Flag.USER)); DirectFieldAccessor df = new DirectFieldAccessor(receiver); df.setPropertyValue("store", store); - receiver.setBeanFactory(mock(BeanFactory.class)); + receiver.setBeanFactory(getBeanFactory()); receiver.afterPropertiesSet(); return folder; @@ -946,7 +951,7 @@ public Message[] receive() throws MessagingException { @Test public void testIdleReconnects() throws Exception { ImapMailReceiver receiver = spy(new ImapMailReceiver("imap:foo")); - receiver.setBeanFactory(mock(BeanFactory.class)); + receiver.setBeanFactory(getBeanFactory()); receiver.afterPropertiesSet(); IMAPFolder folder = mock(IMAPFolder.class); given(folder.getPermanentFlags()).willReturn(new Flags(Flags.Flag.USER)); @@ -982,7 +987,7 @@ public void testIdleReconnects() throws Exception { private void setUpScheduler(ImapMailReceiver mailReceiver, ThreadPoolTaskScheduler taskScheduler) { taskScheduler.setPoolSize(5); taskScheduler.initialize(); - BeanFactory bf = mock(BeanFactory.class); + BeanFactory bf = getBeanFactory(taskScheduler); given(bf.containsBean("taskScheduler")).willReturn(true); given(bf.getBean("taskScheduler", TaskScheduler.class)).willReturn(taskScheduler); mailReceiver.setBeanFactory(bf); @@ -1013,6 +1018,23 @@ public void receiveAndMarkAsReadDontDeleteWithThrowingWhenCopying() throws Excep verify(receiver, times(0)).deleteMessages(Mockito.any()); } + private BeanFactory getBeanFactory(TaskScheduler taskScheduler) { + BeanFactory beanFactory = mock(BeanFactory.class); + when(beanFactory.getBean(eq("taskScheduler"), any(Class.class))) + .thenReturn(taskScheduler); + when(beanFactory.containsBean("taskScheduler")).thenReturn(true); + return beanFactory; + } + + private BeanFactory getBeanFactory() { + BeanFactory beanFactory = mock(BeanFactory.class); + TaskScheduler taskScheduler = mock(TaskScheduler.class); + when(beanFactory.getBean(eq("taskScheduler"), any(Class.class))) + .thenReturn(taskScheduler); + when(beanFactory.containsBean("taskScheduler")).thenReturn(true); + return beanFactory; + } + private static class ImapSearchLoggingHandler extends Handler { private final List searches = new ArrayList<>(); diff --git a/spring-integration-mail/src/test/java/org/springframework/integration/mail/ImapMailSearchTermsTests.java b/spring-integration-mail/src/test/java/org/springframework/integration/mail/ImapMailSearchTermsTests.java index 376f80aff2e..d0bbc0553a1 100644 --- a/spring-integration-mail/src/test/java/org/springframework/integration/mail/ImapMailSearchTermsTests.java +++ b/spring-integration-mail/src/test/java/org/springframework/integration/mail/ImapMailSearchTermsTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,7 +29,7 @@ import jakarta.mail.search.SearchTerm; import org.junit.jupiter.api.Test; -import org.springframework.beans.factory.BeanFactory; +import org.springframework.scheduling.concurrent.SimpleAsyncTaskScheduler; import org.springframework.util.ReflectionUtils; import static org.assertj.core.api.Assertions.assertThat; @@ -62,7 +62,7 @@ public void validateSearchTermsWhenShouldMarkAsReadNoExistingFlagsCustom() throw public void validateSearchTermsWhenShouldMarkAsReadNoExistingFlagsGuts(String userFlag, ImapMailReceiver receiver) throws NoSuchFieldException, IllegalAccessException, InvocationTargetException { receiver.setShouldMarkMessagesAsRead(true); - receiver.setBeanFactory(mock(BeanFactory.class)); + receiver.setTaskScheduler(new SimpleAsyncTaskScheduler()); Field folderField = AbstractMailReceiver.class.getDeclaredField("folder"); folderField.setAccessible(true); @@ -85,7 +85,7 @@ public void validateSearchTermsWhenShouldMarkAsReadNoExistingFlagsGuts(String us public void validateSearchTermsWhenShouldMarkAsReadWithExistingFlags() throws Exception { ImapMailReceiver receiver = new ImapMailReceiver(); receiver.setShouldMarkMessagesAsRead(true); - receiver.setBeanFactory(mock(BeanFactory.class)); + receiver.setTaskScheduler(new SimpleAsyncTaskScheduler()); receiver.afterPropertiesSet(); Field folderField = AbstractMailReceiver.class.getDeclaredField("folder"); @@ -115,7 +115,7 @@ public void validateSearchTermsWhenShouldMarkAsReadWithExistingFlags() throws Ex public void validateSearchTermsWhenShouldNotMarkAsReadNoExistingFlags() throws Exception { ImapMailReceiver receiver = new ImapMailReceiver(); receiver.setShouldMarkMessagesAsRead(false); - receiver.setBeanFactory(mock(BeanFactory.class)); + receiver.setTaskScheduler(new SimpleAsyncTaskScheduler()); receiver.afterPropertiesSet(); Field folderField = AbstractMailReceiver.class.getDeclaredField("folder"); diff --git a/spring-integration-syslog/src/test/java/org/springframework/integration/syslog/inbound/SyslogReceivingChannelAdapterTests.java b/spring-integration-syslog/src/test/java/org/springframework/integration/syslog/inbound/SyslogReceivingChannelAdapterTests.java index 0c92034e3b3..4eeb56f1e2d 100644 --- a/spring-integration-syslog/src/test/java/org/springframework/integration/syslog/inbound/SyslogReceivingChannelAdapterTests.java +++ b/spring-integration-syslog/src/test/java/org/springframework/integration/syslog/inbound/SyslogReceivingChannelAdapterTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -46,14 +46,17 @@ import org.springframework.integration.test.util.TestUtils; import org.springframework.messaging.Message; import org.springframework.messaging.PollableChannel; +import org.springframework.scheduling.TaskScheduler; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; /** * @author Gary Russell @@ -72,13 +75,14 @@ public void testUdp() throws Exception { PollableChannel outputChannel = new QueueChannel(); factory.setPort(0); factory.setOutputChannel(outputChannel); - factory.setBeanFactory(mock(BeanFactory.class)); + factory.setBeanFactory(mock()); factory.afterPropertiesSet(); factory.start(); UnicastReceivingChannelAdapter server = TestUtils.getPropertyValue(factory, "syslogAdapter.udpAdapter", UnicastReceivingChannelAdapter.class); TestingUtilities.waitListening(server, null); UdpSyslogReceivingChannelAdapter adapter = (UdpSyslogReceivingChannelAdapter) factory.getObject(); + adapter.setBeanFactory(mock()); byte[] buf = "<157>JUL 26 22:08:35 WEBERN TESTING[70729]: TEST SYSLOG MESSAGE".getBytes(StandardCharsets.UTF_8); DatagramPacket packet = new DatagramPacket(buf, buf.length, new InetSocketAddress("localhost", server.getPort())); @@ -106,7 +110,7 @@ public void testTcp() throws Exception { return null; }).when(publisher).publishEvent(any(ApplicationEvent.class)); factory.setApplicationEventPublisher(publisher); - factory.setBeanFactory(mock(BeanFactory.class)); + factory.setBeanFactory(getBeanFactory()); factory.afterPropertiesSet(); factory.start(); AbstractServerConnectionFactory server = TestUtils.getPropertyValue(factory, "syslogAdapter.connectionFactory", @@ -144,15 +148,18 @@ public void testAsMapFalse() throws Exception { factory.setPort(0); PollableChannel outputChannel = new QueueChannel(); factory.setOutputChannel(outputChannel); - factory.setBeanFactory(mock(BeanFactory.class)); + factory.setBeanFactory(mock()); factory.afterPropertiesSet(); factory.start(); UnicastReceivingChannelAdapter server = TestUtils.getPropertyValue(factory, "syslogAdapter.udpAdapter", UnicastReceivingChannelAdapter.class); + server.setBeanFactory(mock()); TestingUtilities.waitListening(server, null); UdpSyslogReceivingChannelAdapter adapter = (UdpSyslogReceivingChannelAdapter) factory.getObject(); + adapter.setBeanFactory(mock()); DefaultMessageConverter defaultMessageConverter = new DefaultMessageConverter(); defaultMessageConverter.setAsMap(false); + defaultMessageConverter.setBeanFactory(mock()); adapter.setConverter(defaultMessageConverter); byte[] buf = "<157>JUL 26 22:08:35 WEBERN TESTING[70729]: TEST SYSLOG MESSAGE".getBytes(StandardCharsets.UTF_8); DatagramPacket packet = new DatagramPacket(buf, buf.length, new InetSocketAddress("localhost", @@ -173,6 +180,7 @@ public void testAsMapFalse() throws Exception { public void testTcpRFC5424() throws Exception { SyslogReceivingChannelAdapterFactoryBean factory = new SyslogReceivingChannelAdapterFactoryBean( SyslogReceivingChannelAdapterFactoryBean.Protocol.tcp); + factory.setBeanFactory(getBeanFactory()); PollableChannel outputChannel = new QueueChannel(); factory.setOutputChannel(outputChannel); ApplicationEventPublisher publisher = mock(ApplicationEventPublisher.class); @@ -181,8 +189,9 @@ public void testTcpRFC5424() throws Exception { latch.countDown(); return null; }).when(publisher).publishEvent(any(ApplicationEvent.class)); - factory.setBeanFactory(mock(BeanFactory.class)); + factory.setBeanFactory(getBeanFactory()); AbstractServerConnectionFactory connectionFactory = new TcpNioServerConnectionFactory(0); + connectionFactory.setBeanFactory(getBeanFactory()); connectionFactory.setDeserializer(new RFC6587SyslogDeserializer()); connectionFactory.setApplicationEventPublisher(publisher); factory.setConnectionFactory(connectionFactory); @@ -191,6 +200,7 @@ public void testTcpRFC5424() throws Exception { factory.start(); TestingUtilities.waitListening(connectionFactory, null); TcpSyslogReceivingChannelAdapter adapter = (TcpSyslogReceivingChannelAdapter) factory.getObject(); + adapter.setBeanFactory(getBeanFactory()); LogAccessor logger = spy(TestUtils.getPropertyValue(adapter, "logger", LogAccessor.class)); doReturn(true).when(logger).isDebugEnabled(); final CountDownLatch sawLog = new CountDownLatch(1); @@ -223,21 +233,23 @@ public void testTcpRFC5424() throws Exception { public void testUdpRFC5424() throws Exception { SyslogReceivingChannelAdapterFactoryBean factory = new SyslogReceivingChannelAdapterFactoryBean( SyslogReceivingChannelAdapterFactoryBean.Protocol.udp); + factory.setBeanFactory(mock()); factory.setPort(0); PollableChannel outputChannel = new QueueChannel(); factory.setOutputChannel(outputChannel); - factory.setBeanFactory(mock(BeanFactory.class)); factory.setConverter(new RFC5424MessageConverter()); factory.afterPropertiesSet(); factory.start(); UnicastReceivingChannelAdapter server = TestUtils.getPropertyValue(factory, "syslogAdapter.udpAdapter", UnicastReceivingChannelAdapter.class); + server.setBeanFactory(mock()); TestingUtilities.waitListening(server, null); UdpSyslogReceivingChannelAdapter adapter = (UdpSyslogReceivingChannelAdapter) factory.getObject(); byte[] buf = ("<14>1 2014-06-20T09:14:07+00:00 loggregator d0602076-b14a-4c55-852a-981e7afeed38 DEA - " + "[exampleSDID@32473 iut=\\\"3\\\" eventSource=\\\"Application\\\" eventID=\\\"1011\\\"]" + "[exampleSDID@32473 iut=\\\"3\\\" eventSource=\\\"Application\\\" eventID=\\\"1011\\\"] Removing instance") .getBytes(StandardCharsets.UTF_8); + adapter.setBeanFactory(mock()); DatagramPacket packet = new DatagramPacket(buf, buf.length, new InetSocketAddress("localhost", adapter.getPort())); DatagramSocket socket = new DatagramSocket(); @@ -251,4 +263,12 @@ public void testUdpRFC5424() throws Exception { adapter.stop(); } + private BeanFactory getBeanFactory() { + BeanFactory beanFactory = mock(BeanFactory.class); + TaskScheduler taskScheduler = mock(TaskScheduler.class); + when(beanFactory.getBean(eq("taskScheduler"), any(Class.class))) + .thenReturn(taskScheduler); + when(beanFactory.containsBean("taskScheduler")).thenReturn(true); + return beanFactory; + } }