@@ -2,13 +2,14 @@ use async_trait::async_trait;
2
2
use std:: ops:: ControlFlow ;
3
3
use std:: sync:: Arc ;
4
4
use tokio:: sync:: Mutex ;
5
- use tracing:: { error, info} ;
5
+ use tracing:: { debug , error, info} ;
6
6
7
7
use crate :: {
8
8
backoff:: { Backoff , BackoffConfig , ErrorOrThrottle } ,
9
9
client:: { Error , Result } ,
10
10
connection:: {
11
- BrokerCache , BrokerConnection , BrokerConnector , MessengerTransport , MetadataLookupMode ,
11
+ BrokerCache , BrokerCacheGeneration , BrokerConnection , BrokerConnector , MessengerTransport ,
12
+ MetadataLookupMode ,
12
13
} ,
13
14
messenger:: RequestError ,
14
15
protocol:: {
@@ -29,15 +30,15 @@ pub struct ControllerClient {
29
30
backoff_config : BackoffConfig ,
30
31
31
32
/// Current broker connection if any
32
- current_broker : Mutex < Option < BrokerConnection > > ,
33
+ current_broker : Mutex < ( Option < BrokerConnection > , BrokerCacheGeneration ) > ,
33
34
}
34
35
35
36
impl ControllerClient {
36
37
pub ( super ) fn new ( brokers : Arc < BrokerConnector > ) -> Self {
37
38
Self {
38
39
brokers,
39
40
backoff_config : Default :: default ( ) ,
40
- current_broker : Mutex :: new ( None ) ,
41
+ current_broker : Mutex :: new ( ( None , BrokerCacheGeneration :: START ) ) ,
41
42
}
42
43
}
43
44
@@ -64,28 +65,34 @@ impl ControllerClient {
64
65
} ;
65
66
66
67
maybe_retry ( & self . backoff_config , self , "create_topic" , || async move {
67
- let broker = self . get ( ) . await . map_err ( ErrorOrThrottle :: Error ) ?;
68
+ let ( broker, gen) = self
69
+ . get ( )
70
+ . await
71
+ . map_err ( |e| ErrorOrThrottle :: Error ( ( e, None ) ) ) ?;
68
72
let response = broker
69
73
. request ( request)
70
74
. await
71
- . map_err ( |e| ErrorOrThrottle :: Error ( e. into ( ) ) ) ?;
75
+ . map_err ( |e| ErrorOrThrottle :: Error ( ( e. into ( ) , Some ( gen ) ) ) ) ?;
72
76
73
77
maybe_throttle ( response. throttle_time_ms ) ?;
74
78
75
79
let topic = response
76
80
. topics
77
81
. exactly_one ( )
78
- . map_err ( |e| ErrorOrThrottle :: Error ( Error :: exactly_one_topic ( e) ) ) ?;
82
+ . map_err ( |e| ErrorOrThrottle :: Error ( ( Error :: exactly_one_topic ( e) , Some ( gen ) ) ) ) ?;
79
83
80
84
match topic. error {
81
85
None => Ok ( ( ) ) ,
82
- Some ( protocol_error) => Err ( ErrorOrThrottle :: Error ( Error :: ServerError {
83
- protocol_error,
84
- error_message : topic. error_message . and_then ( |s| s. 0 ) ,
85
- request : RequestContext :: Topic ( topic. name . 0 ) ,
86
- response : None ,
87
- is_virtual : false ,
88
- } ) ) ,
86
+ Some ( protocol_error) => Err ( ErrorOrThrottle :: Error ( (
87
+ Error :: ServerError {
88
+ protocol_error,
89
+ error_message : topic. error_message . and_then ( |s| s. 0 ) ,
90
+ request : RequestContext :: Topic ( topic. name . 0 ) ,
91
+ response : None ,
92
+ is_virtual : false ,
93
+ } ,
94
+ Some ( gen) ,
95
+ ) ) ) ,
89
96
}
90
97
} )
91
98
. await ?;
@@ -119,10 +126,10 @@ impl BrokerCache for &ControllerClient {
119
126
type R = MessengerTransport ;
120
127
type E = Error ;
121
128
122
- async fn get ( & self ) -> Result < Arc < Self :: R > > {
129
+ async fn get ( & self ) -> Result < ( Arc < Self :: R > , BrokerCacheGeneration ) > {
123
130
let mut current_broker = self . current_broker . lock ( ) . await ;
124
- if let Some ( broker) = & * current_broker {
125
- return Ok ( Arc :: clone ( broker) ) ;
131
+ if let Some ( broker) = & current_broker. 0 {
132
+ return Ok ( ( Arc :: clone ( broker) , current_broker . 1 ) ) ;
126
133
}
127
134
128
135
info ! ( "Creating new controller broker connection" , ) ;
@@ -135,13 +142,28 @@ impl BrokerCache for &ControllerClient {
135
142
) )
136
143
} ) ?;
137
144
138
- * current_broker = Some ( Arc :: clone ( & broker) ) ;
139
- Ok ( broker)
145
+ current_broker. 0 = Some ( Arc :: clone ( & broker) ) ;
146
+ current_broker. 1 . bump ( ) ;
147
+
148
+ Ok ( ( broker, current_broker. 1 ) )
140
149
}
141
150
142
- async fn invalidate ( & self , reason : & ' static str ) {
151
+ async fn invalidate ( & self , reason : & ' static str , gen : BrokerCacheGeneration ) {
152
+ let mut guard = self . current_broker . lock ( ) . await ;
153
+
154
+ if guard. 1 != gen {
155
+ // stale request
156
+ debug ! (
157
+ reason,
158
+ current_gen = guard. 1 . get( ) ,
159
+ request_gen = gen . get( ) ,
160
+ "stale invalidation request for arbitrary broker cache" ,
161
+ ) ;
162
+ return ;
163
+ }
164
+
143
165
info ! ( reason, "Invalidating cached controller broker" , ) ;
144
- self . current_broker . lock ( ) . await . take ( ) ;
166
+ guard . 0 . take ( ) ;
145
167
}
146
168
}
147
169
@@ -156,13 +178,15 @@ async fn maybe_retry<B, R, F, T>(
156
178
where
157
179
B : BrokerCache ,
158
180
R : ( Fn ( ) -> F ) + Send + Sync ,
159
- F : std:: future:: Future < Output = Result < T , ErrorOrThrottle < Error > > > + Send ,
181
+ F : std:: future:: Future <
182
+ Output = Result < T , ErrorOrThrottle < ( Error , Option < BrokerCacheGeneration > ) > > ,
183
+ > + Send ,
160
184
{
161
185
let mut backoff = Backoff :: new ( backoff_config) ;
162
186
163
187
backoff
164
188
. retry_with_backoff ( request_name, || async {
165
- let error = match f ( ) . await {
189
+ let ( error, cache_gen ) = match f ( ) . await {
166
190
Ok ( v) => {
167
191
return ControlFlow :: Break ( Ok ( v) ) ;
168
192
}
@@ -176,19 +200,26 @@ where
176
200
// broken connection
177
201
Error :: Request ( RequestError :: Poisoned ( _) | RequestError :: IO ( _) )
178
202
| Error :: Connection ( _) => {
179
- broker_cache
180
- . invalidate ( "controller client: connection broken" )
181
- . await
203
+ if let Some ( cache_gen) = cache_gen {
204
+ broker_cache
205
+ . invalidate ( "controller client: connection broken" , cache_gen)
206
+ . await
207
+ }
182
208
}
183
209
184
210
// our broker is actually not the controller
185
211
Error :: ServerError {
186
212
protocol_error : ProtocolError :: NotController ,
187
213
..
188
214
} => {
189
- broker_cache
190
- . invalidate ( "controller client: server error: not controller" )
191
- . await ;
215
+ if let Some ( cache_gen) = cache_gen {
216
+ broker_cache
217
+ . invalidate (
218
+ "controller client: server error: not controller" ,
219
+ cache_gen,
220
+ )
221
+ . await ;
222
+ }
192
223
}
193
224
194
225
// fatal
0 commit comments