@@ -51,6 +51,26 @@ pub struct RetryProps {
5151 pub max_delay : Duration ,
5252}
5353
54+ impl RetryProps {
55+ #[ inline]
56+ fn dl_exchange ( & self , props : & QueueProps ) -> String {
57+ #[ allow( clippy:: drop_ref) ]
58+ std:: mem:: drop ( self ) ; // self isn't used, but is required to exist for
59+ // this method to make sense
60+ format ! ( "dlx.{}" , props. queue)
61+ }
62+
63+ // returns (exchange, queue, triage_queue)
64+ #[ inline]
65+ fn dl_info ( & self , props : & QueueProps ) -> ( String , String , String ) {
66+ (
67+ self . dl_exchange ( props) ,
68+ format ! ( "dlq.{}" , props. queue) ,
69+ format ! ( "triage.dlq.{}" , props. queue) ,
70+ )
71+ }
72+ }
73+
5474#[ derive( Debug , Clone ) ]
5575pub struct QueueProps {
5676 pub exchange : String ,
@@ -123,18 +143,6 @@ impl<'a> QueueInfo<'a> {
123143
124144#[ cfg( feature = "consumer" ) ]
125145impl < ' a > QueueInfo < ' a > {
126- fn dl_exchange ( self ) -> String {
127- format ! ( "dlx.{}" , self . 0 . queue)
128- }
129-
130- fn dl_queue ( self ) -> String {
131- format ! ( "dlq.{}" , self . 0 . queue)
132- }
133-
134- fn dl_triage_queue ( self ) -> String {
135- format ! ( "triage.dlq.{}" , self . 0 . queue)
136- }
137-
138146 async fn queue_declare ( self , chan : & Channel ) -> Result < ( ) > {
139147 let mut queue_fields = FieldTable :: default ( ) ;
140148
@@ -143,15 +151,17 @@ impl<'a> QueueInfo<'a> {
143151 AMQPValue :: LongLongInt ( self . 0 . max_len_bytes ) ,
144152 ) ;
145153
146- queue_fields. insert (
147- "x-dead-letter-exchange" . into ( ) ,
148- AMQPValue :: LongString ( self . dl_exchange ( ) . into ( ) ) ,
149- ) ;
154+ if let Some ( ref retry) = self . 0 . retry {
155+ queue_fields. insert (
156+ "x-dead-letter-exchange" . into ( ) ,
157+ AMQPValue :: LongString ( retry. dl_exchange ( self . 0 ) . into ( ) ) ,
158+ ) ;
150159
151- queue_fields. insert (
152- "x-dead-letter-routing-key" . into ( ) ,
153- AMQPValue :: LongString ( DLX_TRIAGE_KEY . into ( ) ) ,
154- ) ;
160+ queue_fields. insert (
161+ "x-dead-letter-routing-key" . into ( ) ,
162+ AMQPValue :: LongString ( DLX_TRIAGE_KEY . into ( ) ) ,
163+ ) ;
164+ }
155165
156166 chan. queue_declare (
157167 self . 0 . queue . as_ref ( ) ,
@@ -167,24 +177,27 @@ impl<'a> QueueInfo<'a> {
167177 }
168178
169179 /// Returns (`dl_exchange`, `dl_queue`, `dl_triage_queue`)
170- async fn dl_exchange_declare ( self , chan : & Channel ) -> Result < ( String , String , String ) > {
180+ async fn dl_exchange_declare ( self , chan : & Channel ) -> Result < Option < ( String , String , String ) > > {
171181 let mut exchg_fields = FieldTable :: default ( ) ;
172182
183+ let retry = if let Some ( retry) = self . 0 . retry {
184+ retry
185+ } else {
186+ return Ok ( None ) ;
187+ } ;
188+
173189 exchg_fields. insert (
174190 "x-message-ttl" . into ( ) ,
175191 AMQPValue :: LongLongInt (
176- self . 0
177- . retry
178- . as_ref ( )
179- . ok_or ( Error :: InvalidQueueType ( "Missing retry info" ) ) ?
192+ retry
180193 . max_delay
181194 . as_millis ( )
182195 . try_into ( )
183196 . map_err ( |_| Error :: InvalidQueueType ( "Max delay overflowed i64" ) ) ?,
184197 ) ,
185198 ) ;
186199
187- let exchg = self . dl_exchange ( ) ;
200+ let ( exchg, queue , triage ) = retry . dl_info ( self . 0 ) ;
188201
189202 chan. exchange_declare (
190203 exchg. as_ref ( ) ,
@@ -197,15 +210,17 @@ impl<'a> QueueInfo<'a> {
197210 )
198211 . await ?;
199212
200- Ok ( ( exchg, self . dl_queue ( ) , self . dl_triage_queue ( ) ) )
213+ Ok ( Some ( ( exchg, queue , triage ) ) )
201214 }
202215
203216 pub ( crate ) async fn init_consumer (
204217 self ,
205218 chan : & Channel ,
206219 tag : impl AsRef < str > ,
207220 ) -> Result < Consumer > {
208- self . dl_exchange_declare ( chan) . await ?;
221+ if self . 0 . retry . is_some ( ) {
222+ self . dl_exchange_declare ( chan) . await ?;
223+ }
209224 self . exchange_declare ( chan) . await ?;
210225 self . queue_declare ( chan) . await ?;
211226
@@ -235,7 +250,10 @@ impl<'a> QueueInfo<'a> {
235250 self ,
236251 chan : & Channel ,
237252 ) -> Result < ( Consumer , DlConsumerInfo ) > {
238- let ( exchange, queue, triage_queue) = self . dl_exchange_declare ( chan) . await ?;
253+ let ( exchange, queue, triage_queue) = self
254+ . dl_exchange_declare ( chan)
255+ . await ?
256+ . ok_or ( Error :: InvalidQueueType ( "Missing retry info" ) ) ?;
239257
240258 {
241259 let mut queue_fields = FieldTable :: default ( ) ;
0 commit comments