Skip to content

Commit a0a5d0b

Browse files
committed
Move default subscription name to factory
This commit moves the default subscription name from the `@PulsarListener` and `@ReactivePulsarListener` annotation to the corresponding container factory (props) which allows the `spring.pulsar.consumer.subscription.name` config prop to be respected. See spring-projects/spring-boot#42053
1 parent 5946c95 commit a0a5d0b

File tree

12 files changed

+391
-179
lines changed

12 files changed

+391
-179
lines changed

spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/pulsar/message-consumption.adoc

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,6 @@ When you use Spring Boot support, it automatically enables this annotation and c
1111
`PulsarMessageListenerContainer` uses a `PulsarConsumerFactory` to create and manage the Pulsar consumer the underlying Pulsar consumer that it uses to consume messages.
1212

1313
Spring Boot provides this consumer factory which you can further configure by specifying the {spring-boot-pulsar-config-props}[`spring.pulsar.consumer.*`] application properties.
14-
**Most** of the configured properties on the factory will be respected in the listener with the following **exceptions**:
15-
16-
TIP: The `spring.pulsar.consumer.subscription.name` property is ignored and is instead generated when not specified on the annotation.
1714

1815
Let us revisit the `PulsarListener` code snippet we saw in the quick-tour section:
1916

spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/reactive-pulsar/reactive-message-consumption.adoc

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -118,11 +118,6 @@ NOTE: There is no support for using `org.apache.pulsar.client.api.Messages<T>` i
118118
=== Configuration - Application Properties
119119
The listener relies on the `ReactivePulsarConsumerFactory` to create and manage the underlying Pulsar consumer that it uses to consume messages.
120120
Spring Boot provides this consumer factory which you can further configure by specifying the {spring-boot-pulsar-config-props}[`spring.pulsar.consumer.*`] application properties.
121-
**Most** of the configured properties on the factory will be respected in the listener with the following **exceptions**:
122-
123-
TIP: The `spring.pulsar.consumer.subscription.name` property is ignored and is instead generated when not specified on the annotation.
124-
125-
TIP: The `spring.pulsar.consumer.subscription.type` property is ignored and is instead taken from the value on the annotation. However, you can set the `subscriptionType = {}` on the annotation to instead use the property value as the default.
126121

127122
=== Generic records with AUTO_CONSUME
128123
If there is no chance to know the type of schema of a Pulsar topic in advance, you can use the `AUTO_CONSUME` schema type to consume generic records.

spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/config/DefaultReactivePulsarListenerContainerFactory.java

Lines changed: 39 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import java.util.Arrays;
2020
import java.util.List;
21+
import java.util.concurrent.atomic.AtomicInteger;
2122

2223
import org.apache.pulsar.client.api.Schema;
2324
import org.apache.pulsar.client.api.SubscriptionType;
@@ -39,6 +40,10 @@
3940
*/
4041
public class DefaultReactivePulsarListenerContainerFactory<T> implements ReactivePulsarListenerContainerFactory<T> {
4142

43+
private static final String SUBSCRIPTION_NAME_PREFIX = "org.springframework.Pulsar.ReactivePulsarListenerEndpointContainer#";
44+
45+
private static final AtomicInteger COUNTER = new AtomicInteger();
46+
4247
protected final LogAccessor logger = new LogAccessor(this.getClass());
4348

4449
private final ReactivePulsarConsumerFactory<T> consumerFactory;
@@ -84,58 +89,54 @@ public void setFluxListener(Boolean fluxListener) {
8489
@SuppressWarnings("unchecked")
8590
public DefaultReactivePulsarMessageListenerContainer<T> createContainerInstance(
8691
ReactivePulsarListenerEndpoint<T> endpoint) {
87-
88-
ReactivePulsarContainerProperties<T> properties = new ReactivePulsarContainerProperties<>();
89-
properties.setSchemaResolver(this.getContainerProperties().getSchemaResolver());
90-
properties.setTopicResolver(this.getContainerProperties().getTopicResolver());
91-
properties.setSubscriptionType(this.getContainerProperties().getSubscriptionType());
92-
92+
var containerProps = new ReactivePulsarContainerProperties<T>();
93+
var factoryProps = this.getContainerProperties();
94+
95+
// Map factory props (defaults) to the container props
96+
containerProps.setSchemaResolver(factoryProps.getSchemaResolver());
97+
containerProps.setTopicResolver(factoryProps.getTopicResolver());
98+
containerProps.setSubscriptionType(factoryProps.getSubscriptionType());
99+
containerProps.setSubscriptionName(factoryProps.getSubscriptionName());
100+
containerProps.setSchemaType(factoryProps.getSchemaType());
101+
containerProps.setConcurrency(factoryProps.getConcurrency());
102+
containerProps.setUseKeyOrderedProcessing(factoryProps.isUseKeyOrderedProcessing());
103+
104+
// Map relevant props from the endpoint to the container props
93105
if (!CollectionUtils.isEmpty(endpoint.getTopics())) {
94-
properties.setTopics(endpoint.getTopics());
106+
containerProps.setTopics(endpoint.getTopics());
95107
}
96-
97108
if (StringUtils.hasText(endpoint.getTopicPattern())) {
98-
properties.setTopicsPattern(endpoint.getTopicPattern());
109+
containerProps.setTopicsPattern(endpoint.getTopicPattern());
99110
}
100-
101-
if (StringUtils.hasText(endpoint.getSubscriptionName())) {
102-
properties.setSubscriptionName(endpoint.getSubscriptionName());
103-
}
104-
105111
if (endpoint.getSubscriptionType() != null) {
106-
properties.setSubscriptionType(endpoint.getSubscriptionType());
112+
containerProps.setSubscriptionType(endpoint.getSubscriptionType());
107113
}
108-
// Default to Exclusive if not set on container props or endpoint
109-
if (properties.getSubscriptionType() == null) {
110-
properties.setSubscriptionType(SubscriptionType.Exclusive);
114+
// Default subscription type to Exclusive when not set elsewhere
115+
if (containerProps.getSubscriptionType() == null) {
116+
containerProps.setSubscriptionType(SubscriptionType.Exclusive);
111117
}
112-
113-
if (endpoint.getSchemaType() != null) {
114-
properties.setSchemaType(endpoint.getSchemaType());
118+
if (StringUtils.hasText(endpoint.getSubscriptionName())) {
119+
containerProps.setSubscriptionName(endpoint.getSubscriptionName());
115120
}
116-
else {
117-
properties.setSchemaType(this.containerProperties.getSchemaType());
121+
// Default subscription name to generated when not set elsewhere
122+
if (!StringUtils.hasText(containerProps.getSubscriptionName())) {
123+
var generatedName = SUBSCRIPTION_NAME_PREFIX + COUNTER.getAndIncrement();
124+
containerProps.setSubscriptionName(generatedName);
118125
}
119-
120-
if (properties.getSchema() == null) {
121-
properties.setSchema((Schema<T>) Schema.BYTES);
126+
if (endpoint.getSchemaType() != null) {
127+
containerProps.setSchemaType(endpoint.getSchemaType());
122128
}
123-
124-
if (endpoint.getConcurrency() != null) {
125-
properties.setConcurrency(endpoint.getConcurrency());
129+
// Default to BYTES if not set elsewhere
130+
if (containerProps.getSchema() == null) {
131+
containerProps.setSchema((Schema<T>) Schema.BYTES);
126132
}
127-
else {
128-
properties.setConcurrency(this.containerProperties.getConcurrency());
133+
if (endpoint.getConcurrency() != null) {
134+
containerProps.setConcurrency(endpoint.getConcurrency());
129135
}
130-
131136
if (endpoint.getUseKeyOrderedProcessing() != null) {
132-
properties.setUseKeyOrderedProcessing(endpoint.getUseKeyOrderedProcessing());
137+
containerProps.setUseKeyOrderedProcessing(endpoint.getUseKeyOrderedProcessing());
133138
}
134-
else {
135-
properties.setUseKeyOrderedProcessing(this.containerProperties.isUseKeyOrderedProcessing());
136-
}
137-
138-
return new DefaultReactivePulsarMessageListenerContainer<>(this.getConsumerFactory(), properties);
139+
return new DefaultReactivePulsarMessageListenerContainer<>(this.getConsumerFactory(), containerProps);
139140
}
140141

141142
@Override

spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/config/annotation/ReactivePulsarListenerAnnotationBeanPostProcessor.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -230,11 +230,11 @@ private void processReactivePulsarListenerAnnotation(MethodReactivePulsarListene
230230
ReactivePulsarListener reactivePulsarListener, Object bean, String[] topics, String topicPattern) {
231231
endpoint.setBean(bean);
232232
endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
233-
endpoint.setSubscriptionName(getEndpointSubscriptionName(reactivePulsarListener));
234233
endpoint.setId(getEndpointId(reactivePulsarListener));
235234
endpoint.setTopics(topics);
236235
endpoint.setTopicPattern(topicPattern);
237236
resolveSubscriptionType(endpoint, reactivePulsarListener);
237+
resolveSubscriptionName(endpoint, reactivePulsarListener);
238238
endpoint.setSchemaType(reactivePulsarListener.schemaType());
239239
String concurrency = reactivePulsarListener.concurrency();
240240
if (StringUtils.hasText(concurrency)) {
@@ -257,11 +257,18 @@ private void processReactivePulsarListenerAnnotation(MethodReactivePulsarListene
257257
}
258258

259259
private void resolveSubscriptionType(MethodReactivePulsarListenerEndpoint<?> endpoint,
260-
ReactivePulsarListener reactivePulsarListener) {
261-
Assert.state(reactivePulsarListener.subscriptionType().length <= 1,
260+
ReactivePulsarListener listener) {
261+
Assert.state(listener.subscriptionType().length <= 1,
262262
() -> "ReactivePulsarListener.subscriptionType must have 0 or 1 elements");
263-
if (reactivePulsarListener.subscriptionType().length == 1) {
264-
endpoint.setSubscriptionType(reactivePulsarListener.subscriptionType()[0]);
263+
if (listener.subscriptionType().length == 1) {
264+
endpoint.setSubscriptionType(listener.subscriptionType()[0]);
265+
}
266+
}
267+
268+
private void resolveSubscriptionName(MethodReactivePulsarListenerEndpoint<?> endpoint,
269+
ReactivePulsarListener listener) {
270+
if (StringUtils.hasText(listener.subscriptionName())) {
271+
endpoint.setSubscriptionName(resolveExpressionAsString(listener.subscriptionName(), "subscriptionName"));
265272
}
266273
}
267274

@@ -322,13 +329,6 @@ private void resolveConsumerCustomizer(MethodReactivePulsarListenerEndpoint<?> e
322329
}
323330
}
324331

325-
private String getEndpointSubscriptionName(ReactivePulsarListener reactivePulsarListener) {
326-
if (StringUtils.hasText(reactivePulsarListener.subscriptionName())) {
327-
return resolveExpressionAsString(reactivePulsarListener.subscriptionName(), "subscriptionName");
328-
}
329-
return GENERATED_ID_PREFIX + this.counter.getAndIncrement();
330-
}
331-
332332
private String getEndpointId(ReactivePulsarListener reactivePulsarListener) {
333333
if (StringUtils.hasText(reactivePulsarListener.id())) {
334334
return resolveExpressionAsString(reactivePulsarListener.id(), "id");

spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/config/DefaultReactivePulsarListenerContainerFactoryTests.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,4 +78,55 @@ void defaultUsedWhenNotSetOnEndpointNorFactoryProps() {
7878

7979
}
8080

81+
@SuppressWarnings("unchecked")
82+
@Nested
83+
class SubscriptionNameFrom {
84+
85+
@Test
86+
void factoryPropsUsedWhenNotSetOnEndpoint() {
87+
var factoryProps = new ReactivePulsarContainerProperties<String>();
88+
factoryProps.setSubscriptionName("my-factory-subscription");
89+
var containerFactory = new DefaultReactivePulsarListenerContainerFactory<String>(
90+
mock(ReactivePulsarConsumerFactory.class), factoryProps);
91+
var endpoint = mock(ReactivePulsarListenerEndpoint.class);
92+
when(endpoint.getConcurrency()).thenReturn(1);
93+
var createdContainer = containerFactory.createListenerContainer(endpoint);
94+
assertThat(createdContainer.getContainerProperties().getSubscriptionName())
95+
.isEqualTo("my-factory-subscription");
96+
}
97+
98+
@Test
99+
void endpointTakesPrecedenceOverFactoryProps() {
100+
var factoryProps = new ReactivePulsarContainerProperties<String>();
101+
factoryProps.setSubscriptionName("my-factory-subscription");
102+
var containerFactory = new DefaultReactivePulsarListenerContainerFactory<String>(
103+
mock(ReactivePulsarConsumerFactory.class), factoryProps);
104+
var endpoint = mock(ReactivePulsarListenerEndpoint.class);
105+
when(endpoint.getConcurrency()).thenReturn(1);
106+
when(endpoint.getSubscriptionName()).thenReturn("my-endpoint-subscription");
107+
var createdContainer = containerFactory.createListenerContainer(endpoint);
108+
assertThat(createdContainer.getContainerProperties().getSubscriptionName())
109+
.isEqualTo("my-endpoint-subscription");
110+
}
111+
112+
@Test
113+
void defaultUsedWhenNotSetOnEndpointNorFactoryProps() {
114+
var factoryProps = new ReactivePulsarContainerProperties<String>();
115+
var containerFactory = new DefaultReactivePulsarListenerContainerFactory<String>(
116+
mock(ReactivePulsarConsumerFactory.class), factoryProps);
117+
var endpoint = mock(ReactivePulsarListenerEndpoint.class);
118+
when(endpoint.getConcurrency()).thenReturn(1);
119+
120+
var container1 = containerFactory.createListenerContainer(endpoint);
121+
assertThat(container1.getContainerProperties().getSubscriptionName())
122+
.startsWith("org.springframework.Pulsar.ReactivePulsarListenerEndpointContainer#");
123+
var container2 = containerFactory.createListenerContainer(endpoint);
124+
assertThat(container2.getContainerProperties().getSubscriptionName())
125+
.startsWith("org.springframework.Pulsar.ReactivePulsarListenerEndpointContainer#");
126+
assertThat(container1.getContainerProperties().getSubscriptionName())
127+
.isNotEqualTo(container2.getContainerProperties().getSubscriptionName());
128+
}
129+
130+
}
131+
81132
}

spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/ReactivePulsarListenerTests.java

Lines changed: 36 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.apache.pulsar.reactive.client.api.MessageResult;
4545
import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumer;
4646
import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumerSpec;
47+
import org.assertj.core.api.InstanceOfAssertFactories;
4748
import org.junit.jupiter.api.Nested;
4849
import org.junit.jupiter.api.Test;
4950

@@ -72,7 +73,7 @@
7273
import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.PulsarHeadersCustomObjectMapperTest.PulsarHeadersCustomObjectMapperTestConfig;
7374
import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.PulsarHeadersTest.PulsarListenerWithHeadersConfig;
7475
import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.StreamingListenerTestCases.StreamingListenerTestCasesConfig;
75-
import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.SubscriptionTypeTests.SubscriptionTypeTestsConfig;
76+
import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.SubscriptionNameTests.SubscriptionNameTestsConfig;
7677
import org.springframework.pulsar.reactive.support.MessageUtils;
7778
import org.springframework.pulsar.support.PulsarHeaders;
7879
import org.springframework.pulsar.support.header.JsonPulsarHeaderMapper;
@@ -815,80 +816,79 @@ Mono<Void> listen2(String message) {
815816
}
816817

817818
@Nested
818-
@ContextConfiguration(classes = SubscriptionTypeTestsConfig.class)
819-
class SubscriptionTypeTests {
819+
@ContextConfiguration(classes = SubscriptionNameTestsConfig.class)
820+
class SubscriptionNameTests {
820821

821-
static final CountDownLatch latchTypeNotSet = new CountDownLatch(1);
822+
static final CountDownLatch latchNameNotSet = new CountDownLatch(1);
822823

823-
static final CountDownLatch latchTypeSetOnAnnotation = new CountDownLatch(1);
824+
static final CountDownLatch latchNameSetOnAnnotation = new CountDownLatch(1);
824825

825-
static final CountDownLatch latchTypeSetOnCustomizer = new CountDownLatch(1);
826+
static final CountDownLatch latchNameSetOnCustomizer = new CountDownLatch(1);
826827

827828
@Test
828-
void defaultTypeFromContainerFactoryUsedWhenTypeNotSetAnywhere(
829+
void defaultNameFromContainerFactoryUsedWhenNameNotSetAnywhere(
829830
@Autowired ConsumerTrackingReactivePulsarConsumerFactory<String> consumerFactory) throws Exception {
830-
var topic = "rpl-latchTypeNotSet-topic";
831-
assertThat(consumerFactory.getSpec(topic)).extracting(ReactiveMessageConsumerSpec::getSubscriptionType)
832-
.isEqualTo(SubscriptionType.Exclusive);
831+
var topic = "rpl-latchNameNotSet-topic";
832+
assertThat(consumerFactory.getSpec(topic))
833+
.extracting(ReactiveMessageConsumerSpec::getSubscriptionName, InstanceOfAssertFactories.STRING)
834+
.startsWith("org.springframework.Pulsar.ReactivePulsarListenerEndpointContainer#");
833835
pulsarTemplate.send(topic, "hello-" + topic);
834-
assertThat(latchTypeNotSet.await(5, TimeUnit.SECONDS)).isTrue();
836+
assertThat(latchNameNotSet.await(5, TimeUnit.SECONDS)).isTrue();
835837
}
836838

837839
@Test
838-
void typeSetOnAnnotationOverridesDefaultTypeFromContainerFactory(
840+
void nameSetOnAnnotationOverridesDefaultNameFromContainerFactory(
839841
@Autowired ConsumerTrackingReactivePulsarConsumerFactory<String> consumerFactory) throws Exception {
840-
var topic = "rpl-typeSetOnAnnotation-topic";
841-
assertThat(consumerFactory.getSpec(topic)).extracting(ReactiveMessageConsumerSpec::getSubscriptionType)
842-
.isEqualTo(SubscriptionType.Key_Shared);
842+
var topic = "rpl-nameSetOnAnnotation-topic";
843+
assertThat(consumerFactory.getSpec(topic)).extracting(ReactiveMessageConsumerSpec::getSubscriptionName)
844+
.isEqualTo("from-annotation");
843845
pulsarTemplate.send(topic, "hello-" + topic);
844-
assertThat(latchTypeSetOnAnnotation.await(5, TimeUnit.SECONDS)).isTrue();
846+
assertThat(latchNameSetOnAnnotation.await(5, TimeUnit.SECONDS)).isTrue();
845847
}
846848

847849
@Test
848-
void typeSetOnCustomizerOverridesTypeSetOnAnnotation(
850+
void nameSetOnCustomizerOverridesNameSetOnAnnotation(
849851
@Autowired ConsumerTrackingReactivePulsarConsumerFactory<String> consumerFactory) throws Exception {
850-
var topic = "rpl-typeSetOnCustomizer-topic";
851-
assertThat(consumerFactory.getSpec(topic)).extracting(ReactiveMessageConsumerSpec::getSubscriptionType)
852-
.isEqualTo(SubscriptionType.Failover);
852+
var topic = "rpl-nameSetOnCustomizer-topic";
853+
assertThat(consumerFactory.getSpec(topic)).extracting(ReactiveMessageConsumerSpec::getSubscriptionName)
854+
.isEqualTo("from-customizer");
853855
pulsarTemplate.send(topic, "hello-" + topic);
854-
assertThat(latchTypeSetOnCustomizer.await(5, TimeUnit.SECONDS)).isTrue();
856+
assertThat(latchNameSetOnCustomizer.await(5, TimeUnit.SECONDS)).isTrue();
855857
}
856858

857859
@Configuration(proxyBeanMethods = false)
858-
static class SubscriptionTypeTestsConfig {
860+
static class SubscriptionNameTestsConfig {
859861

860862
@Bean
861-
ReactiveMessageConsumerBuilderCustomizer<String> consumerFactoryDefaultSubTypeCustomizer() {
862-
return (b) -> b.subscriptionType(SubscriptionType.Shared);
863+
ReactiveMessageConsumerBuilderCustomizer<String> consumerFactoryDefaultSubNameCustomizer() {
864+
return (b) -> b.subscriptionName("from-consumer-factory");
863865
}
864866

865-
@ReactivePulsarListener(topics = "rpl-latchTypeNotSet-topic", subscriptionName = "rpl-latchTypeNotSet-sub",
867+
@ReactivePulsarListener(topics = "rpl-latchNameNotSet-topic",
866868
consumerCustomizer = "subscriptionInitialPositionEarliest")
867-
Mono<Void> listenWithoutTypeSetAnywhere(String ignored) {
868-
latchTypeNotSet.countDown();
869+
Mono<Void> listenWithoutNameSetAnywhere(String ignored) {
870+
latchNameNotSet.countDown();
869871
return Mono.empty();
870872
}
871873

872-
@ReactivePulsarListener(topics = "rpl-typeSetOnAnnotation-topic",
873-
subscriptionName = "rpl-typeSetOnAnnotation-sub", subscriptionType = SubscriptionType.Key_Shared,
874+
@ReactivePulsarListener(topics = "rpl-nameSetOnAnnotation-topic", subscriptionName = "from-annotation",
874875
consumerCustomizer = "subscriptionInitialPositionEarliest")
875-
Mono<Void> listenWithTypeSetOnAnnotation(String ignored) {
876-
latchTypeSetOnAnnotation.countDown();
876+
Mono<Void> listenWithNameSetOnAnnotation(String ignored) {
877+
latchNameSetOnAnnotation.countDown();
877878
return Mono.empty();
878879
}
879880

880-
@ReactivePulsarListener(topics = "rpl-typeSetOnCustomizer-topic",
881-
subscriptionName = "rpl-typeSetOnCustomizer-sub", subscriptionType = SubscriptionType.Key_Shared,
881+
@ReactivePulsarListener(topics = "rpl-nameSetOnCustomizer-topic", subscriptionName = "from-annotation",
882882
consumerCustomizer = "myCustomizer")
883-
Mono<Void> listenWithTypeSetOnCustomizer(String ignored) {
884-
latchTypeSetOnCustomizer.countDown();
883+
Mono<Void> listenWithNameSetOnCustomizer(String ignored) {
884+
latchNameSetOnCustomizer.countDown();
885885
return Mono.empty();
886886
}
887887

888888
@Bean
889889
public ReactivePulsarListenerMessageConsumerBuilderCustomizer<String> myCustomizer() {
890890
return cb -> cb.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
891-
.subscriptionType(SubscriptionType.Failover);
891+
.subscriptionName("from-customizer");
892892
}
893893

894894
}

0 commit comments

Comments
 (0)