Skip to content

Commit 55af4a3

Browse files
garyrussellartembilan
authored andcommitted
Add well-known arguments to Queue/Exchange Builder
- also add type parameter to the exchange `build()` method
1 parent dde7a37 commit 55af4a3

File tree

5 files changed

+356
-9
lines changed

5 files changed

+356
-9
lines changed

spring-amqp/src/main/java/org/springframework/amqp/core/ExchangeBuilder.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -110,11 +110,11 @@ public ExchangeBuilder autoDelete() {
110110

111111
/**
112112
* Set the durable flag.
113-
* @param durable the durable flag (default true).
113+
* @param isDurable the durable flag (default true).
114114
* @return the builder.
115115
*/
116-
public ExchangeBuilder durable(boolean durable) {
117-
this.durable = durable;
116+
public ExchangeBuilder durable(boolean isDurable) {
117+
this.durable = isDurable;
118118
return this;
119119
}
120120

@@ -139,6 +139,10 @@ public ExchangeBuilder withArguments(Map<String, Object> arguments) {
139139
return this;
140140
}
141141

142+
public ExchangeBuilder alternate(String exchange) {
143+
return withArgument("alternate-exchange", exchange);
144+
}
145+
142146
/**
143147
* Set the internal flag.
144148
* @return the builder.
@@ -190,7 +194,8 @@ public ExchangeBuilder admins(Object... admins) {
190194
return this;
191195
}
192196

193-
public Exchange build() {
197+
@SuppressWarnings("unchecked")
198+
public <T extends Exchange> T build() {
194199
AbstractExchange exchange;
195200
if (ExchangeTypes.DIRECT.equals(this.type)) {
196201
exchange = new DirectExchange(this.name, this.durable, this.autoDelete, getArguments());
@@ -214,7 +219,7 @@ else if (ExchangeTypes.HEADERS.equals(this.type)) {
214219
if (!ObjectUtils.isEmpty(this.declaringAdmins)) {
215220
exchange.setAdminsThatShouldDeclare(this.declaringAdmins);
216221
}
217-
return exchange;
222+
return (T) exchange;
218223
}
219224

220225
}

spring-amqp/src/main/java/org/springframework/amqp/core/QueueBuilder.java

Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,119 @@ public QueueBuilder withArguments(Map<String, Object> arguments) {
120120
return this;
121121
}
122122

123+
/**
124+
* Set the message time-to-live after which it will be discarded, or routed to the
125+
* dead-letter-exchange, if so configured.
126+
* @param ttl the time to live (milliseconds).
127+
* @return the builder.
128+
* @since 2.2
129+
* @see #deadLetterExchange(String)
130+
*/
131+
public QueueBuilder ttl(int ttl) {
132+
return withArgument("x-message-ttl", ttl);
133+
}
134+
135+
/**
136+
* Set the time that the queue can remain unused before being deleted.
137+
* @param expires the expiration (milliseconds).
138+
* @return the builder.
139+
* @since 2.2
140+
*/
141+
public QueueBuilder expires(int expires) {
142+
return withArgument("x-expires", expires);
143+
}
144+
145+
/**
146+
* Set the number of (ready) messages allowed in the queue before it starts to drop
147+
* them.
148+
* @param count the number of (ready) messages allowed.
149+
* @return the builder.
150+
* @since 2.2
151+
* @see #overflow(Overflow)
152+
*/
153+
public QueueBuilder maxLength(int count) {
154+
return withArgument("x-max-length", count);
155+
}
156+
157+
/**
158+
* Set the total aggregate body size allowed in the queue before it starts to drop
159+
* them.
160+
* @param bytes the total aggregate body size.
161+
* @return the builder.
162+
* @since 2.2
163+
*/
164+
public QueueBuilder maxLengthBytes(int bytes) {
165+
return withArgument("x-max-length-bytes", bytes);
166+
}
167+
168+
/**
169+
* Set the overflow mode when messages are dropped due to max messages or max message
170+
* size is exceeded.
171+
* @param overflow {@link Overflow#dropHead} or {@link Overflow#rejectPublish}.
172+
* @return the builder.
173+
* @since 2.2
174+
*/
175+
public QueueBuilder overflow(Overflow overflow) {
176+
return withArgument("x-overflow", overflow.getValue());
177+
}
178+
179+
/**
180+
* Set the dead-letter exchange to which to route expired or rejected messages.
181+
* @param dlx the dead-letter exchange.
182+
* @return the builder.
183+
* @since 2.2
184+
* @see #deadLetterRoutingKey(String)
185+
*/
186+
public QueueBuilder deadLetterExchange(String dlx) {
187+
return withArgument("x-dead-letter-exchange", dlx);
188+
}
189+
190+
/**
191+
* Set the routing key to use when routing expired or rejected messages to the
192+
* dead-letter exchange.
193+
* @param dlrk the dead-letter routing key.
194+
* @return the builder.
195+
* @since 2.2
196+
* @see #deadLetterExchange(String)
197+
*/
198+
public QueueBuilder deadLetterRoutingKey(String dlrk) {
199+
return withArgument("x-dead-letter-routing-key", dlrk);
200+
}
201+
202+
/**
203+
* Set the maximum number if priority levels for the queue to support; if not set, the
204+
* queue will not support message priorities.
205+
* @param maxPriority the maximum priority.
206+
* @return the builder.
207+
* @since 2.2
208+
*/
209+
public QueueBuilder maxPriority(int maxPriority) {
210+
return withArgument("x-max-priority", maxPriority);
211+
}
212+
213+
/**
214+
* Set the queue into lazy mode, keeping as many messages as possible on disk to
215+
* reduce RAM usage on the broker. if not set, the queue will keep an in-memory cache
216+
* to deliver messages as fast as possible.
217+
* @return the builder.
218+
* @since 2.2
219+
*/
220+
public QueueBuilder lazy() {
221+
return withArgument("x-queue-mode", "lazy");
222+
}
223+
224+
/**
225+
* Set the master locator mode which determines which node a queue master will be
226+
* located on a cluster of nodes.
227+
* @param locator {@link MasterLocator#minMasters}, {@link MasterLocator#clientLocal}
228+
* or {@link MasterLocator#random}.
229+
* @return the builder.
230+
* @since 2.2
231+
*/
232+
public QueueBuilder masterLocator(MasterLocator locator) {
233+
return withArgument("x-queue-master-locator", locator.getValue());
234+
}
235+
123236
/**
124237
* Builds a final queue.
125238
* @return the Queue instance.
@@ -128,4 +241,57 @@ public Queue build() {
128241
return new Queue(this.name, this.durable, this.exclusive, this.autoDelete, getArguments());
129242
}
130243

244+
public enum Overflow {
245+
246+
/**
247+
* Drop the oldest message.
248+
*/
249+
dropHead("drop-head"),
250+
251+
/**
252+
* Reject the new message.
253+
*/
254+
rejectPublish("reject-publish");
255+
256+
private final String value;
257+
258+
Overflow(String value) {
259+
this.value = value;
260+
}
261+
262+
public String getValue() {
263+
return this.value;
264+
}
265+
266+
}
267+
268+
public enum MasterLocator {
269+
270+
/**
271+
* Deploy on the node with the fewest masters.
272+
*/
273+
minMasters("min-masters"),
274+
275+
/**
276+
* Deploy on the node we are connected to.
277+
*/
278+
clientLocal("client-local"),
279+
280+
/**
281+
* Deploy on a random node.
282+
*/
283+
random("random");
284+
285+
private final String value;
286+
287+
MasterLocator(String value) {
288+
this.value = value;
289+
}
290+
291+
public String getValue() {
292+
return this.value;
293+
}
294+
295+
}
296+
131297
}

0 commit comments

Comments
 (0)