Skip to content

Commit 80d13de

Browse files
committed
feat: respect throttling
Closes #182.
1 parent 12c8d4c commit 80d13de

File tree

7 files changed

+235
-114
lines changed

7 files changed

+235
-114
lines changed

src/backoff.rs

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,16 @@ impl Default for BackoffConfig {
2929
pub type BackoffError = std::convert::Infallible;
3030
pub type BackoffResult<T> = Result<T, BackoffError>;
3131

32+
/// Error (which should increase backoff) or throttle for a specific duration (as asked for by the broker).
33+
#[derive(Debug)]
34+
pub enum ErrorOrThrottle<E>
35+
where
36+
E: std::error::Error + Send,
37+
{
38+
Error(E),
39+
Throttle(Duration),
40+
}
41+
3242
/// [`Backoff`] can be created from a [`BackoffConfig`]
3343
///
3444
/// Consecutive calls to [`Backoff::next`] will return the next backoff interval
@@ -99,23 +109,32 @@ impl Backoff {
99109
) -> BackoffResult<B>
100110
where
101111
F: (Fn() -> F1) + Send + Sync,
102-
F1: std::future::Future<Output = ControlFlow<B, E>> + Send,
112+
F1: std::future::Future<Output = ControlFlow<B, ErrorOrThrottle<E>>> + Send,
103113
E: std::error::Error + Send,
104114
{
105115
loop {
106-
let e = match do_stuff().await {
107-
ControlFlow::Break(r) => break Ok(r),
108-
ControlFlow::Continue(e) => e,
116+
// split match statement from `tokio::time::sleep`, because otherwise rustc requires `B: Send`
117+
let sleep_time = match do_stuff().await {
118+
ControlFlow::Break(r) => {
119+
break Ok(r);
120+
}
121+
ControlFlow::Continue(ErrorOrThrottle::Error(e)) => {
122+
let backoff = self.next();
123+
info!(
124+
e=%e,
125+
request_name,
126+
backoff_secs = backoff.as_secs(),
127+
"request encountered non-fatal error - backing off",
128+
);
129+
backoff
130+
}
131+
ControlFlow::Continue(ErrorOrThrottle::Throttle(throttle)) => {
132+
info!(?throttle, request_name, "broker asked us to throttle",);
133+
throttle
134+
}
109135
};
110136

111-
let backoff = self.next();
112-
info!(
113-
e=%e,
114-
request_name,
115-
backoff_secs = backoff.as_secs(),
116-
"request encountered non-fatal error - backing off",
117-
);
118-
tokio::time::sleep(backoff).await;
137+
tokio::time::sleep(sleep_time).await;
119138
}
120139
}
121140
}

src/client/controller.rs

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use tokio::sync::Mutex;
55
use tracing::{debug, error, info};
66

77
use crate::{
8-
backoff::{Backoff, BackoffConfig},
8+
backoff::{Backoff, BackoffConfig, ErrorOrThrottle},
99
client::{Error, Result},
1010
connection::{
1111
BrokerCache, BrokerConnection, BrokerConnector, MessengerTransport, MetadataLookupMode,
@@ -16,6 +16,7 @@ use crate::{
1616
messages::{CreateTopicRequest, CreateTopicsRequest},
1717
primitives::{Int16, Int32, String_},
1818
},
19+
throttle::maybe_throttle,
1920
validation::ExactlyOne,
2021
};
2122

@@ -63,23 +64,28 @@ impl ControllerClient {
6364
};
6465

6566
maybe_retry(&self.backoff_config, self, "create_topic", || async move {
66-
let broker = self.get().await?;
67-
let response = broker.request(request).await?;
67+
let broker = self.get().await.map_err(ErrorOrThrottle::Error)?;
68+
let response = broker
69+
.request(request)
70+
.await
71+
.map_err(|e| ErrorOrThrottle::Error(e.into()))?;
72+
73+
maybe_throttle(response.throttle_time_ms)?;
6874

6975
let topic = response
7076
.topics
7177
.exactly_one()
72-
.map_err(Error::exactly_one_topic)?;
78+
.map_err(|e| ErrorOrThrottle::Error(Error::exactly_one_topic(e)))?;
7379

7480
match topic.error {
7581
None => Ok(()),
76-
Some(protocol_error) => Err(Error::ServerError {
82+
Some(protocol_error) => Err(ErrorOrThrottle::Error(Error::ServerError {
7783
protocol_error,
7884
error_message: topic.error_message.and_then(|s| s.0),
7985
request: RequestContext::Topic(topic.name.0),
8086
response: None,
8187
is_virtual: false,
82-
}),
88+
})),
8389
}
8490
})
8591
.await?;
@@ -150,15 +156,20 @@ async fn maybe_retry<B, R, F, T>(
150156
where
151157
B: BrokerCache,
152158
R: (Fn() -> F) + Send + Sync,
153-
F: std::future::Future<Output = Result<T>> + Send,
159+
F: std::future::Future<Output = Result<T, ErrorOrThrottle<Error>>> + Send,
154160
{
155161
let mut backoff = Backoff::new(backoff_config);
156162

157163
backoff
158164
.retry_with_backoff(request_name, || async {
159165
let error = match f().await {
160-
Ok(v) => return ControlFlow::Break(Ok(v)),
161-
Err(e) => e,
166+
Ok(v) => {
167+
return ControlFlow::Break(Ok(v));
168+
}
169+
Err(ErrorOrThrottle::Throttle(t)) => {
170+
return ControlFlow::Continue(ErrorOrThrottle::Throttle(t));
171+
}
172+
Err(ErrorOrThrottle::Error(e)) => e,
162173
};
163174

164175
match error {
@@ -184,7 +195,7 @@ where
184195
return ControlFlow::Break(Err(error));
185196
}
186197
}
187-
ControlFlow::Continue(error)
198+
ControlFlow::Continue(ErrorOrThrottle::Error(error))
188199
})
189200
.await
190201
.map_err(Error::RetryFailed)?

src/client/partition.rs

Lines changed: 46 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::{
2-
backoff::{Backoff, BackoffConfig},
2+
backoff::{Backoff, BackoffConfig, ErrorOrThrottle},
33
client::error::{Error, RequestContext, Result},
44
connection::{
55
BrokerCache, BrokerConnection, BrokerConnector, MessengerTransport, MetadataLookupMode,
@@ -19,6 +19,7 @@ use crate::{
1919
record::{Record as ProtocolRecord, *},
2020
},
2121
record::{Record, RecordAndOffset},
22+
throttle::maybe_throttle,
2223
validation::ExactlyOne,
2324
};
2425
use async_trait::async_trait;
@@ -151,7 +152,7 @@ impl PartitionClient {
151152
&*brokers,
152153
"leader_detection",
153154
|| async move {
154-
scope.get().await?;
155+
scope.get().await.map_err(ErrorOrThrottle::Error)?;
155156
Ok(())
156157
},
157158
)
@@ -190,9 +191,14 @@ impl PartitionClient {
190191
self,
191192
"produce",
192193
|| async move {
193-
let broker = self.get().await?;
194-
let response = broker.request(&request).await?;
194+
let broker = self.get().await.map_err(ErrorOrThrottle::Error)?;
195+
let response = broker
196+
.request(&request)
197+
.await
198+
.map_err(|e| ErrorOrThrottle::Error(e.into()))?;
199+
maybe_throttle(response.throttle_time_ms)?;
195200
process_produce_response(self.partition, &self.topic, n, response)
201+
.map_err(ErrorOrThrottle::Error)
196202
},
197203
)
198204
.await
@@ -221,8 +227,16 @@ impl PartitionClient {
221227
self,
222228
"fetch_records",
223229
|| async move {
224-
let response = self.get().await?.request(&request).await?;
230+
let response = self
231+
.get()
232+
.await
233+
.map_err(ErrorOrThrottle::Error)?
234+
.request(&request)
235+
.await
236+
.map_err(|e| ErrorOrThrottle::Error(e.into()))?;
237+
maybe_throttle(response.throttle_time_ms)?;
225238
process_fetch_response(self.partition, &self.topic, response, offset)
239+
.map_err(ErrorOrThrottle::Error)
226240
},
227241
)
228242
.await?;
@@ -248,8 +262,16 @@ impl PartitionClient {
248262
self,
249263
"get_offset",
250264
|| async move {
251-
let response = self.get().await?.request(&request).await?;
265+
let response = self
266+
.get()
267+
.await
268+
.map_err(ErrorOrThrottle::Error)?
269+
.request(&request)
270+
.await
271+
.map_err(|e| ErrorOrThrottle::Error(e.into()))?;
272+
maybe_throttle(response.throttle_time_ms)?;
252273
process_list_offsets_response(self.partition, &self.topic, response)
274+
.map_err(ErrorOrThrottle::Error)
253275
},
254276
)
255277
.await?;
@@ -272,8 +294,16 @@ impl PartitionClient {
272294
self,
273295
"delete_records",
274296
|| async move {
275-
let response = self.get().await?.request(&request).await?;
297+
let response = self
298+
.get()
299+
.await
300+
.map_err(ErrorOrThrottle::Error)?
301+
.request(&request)
302+
.await
303+
.map_err(|e| ErrorOrThrottle::Error(e.into()))?;
304+
maybe_throttle(Some(response.throttle_time_ms))?;
276305
process_delete_records_response(&self.topic, self.partition, response)
306+
.map_err(ErrorOrThrottle::Error)
277307
},
278308
)
279309
.await?;
@@ -462,15 +492,20 @@ async fn maybe_retry<B, R, F, T>(
462492
where
463493
B: BrokerCache,
464494
R: (Fn() -> F) + Send + Sync,
465-
F: std::future::Future<Output = Result<T>> + Send,
495+
F: std::future::Future<Output = Result<T, ErrorOrThrottle<Error>>> + Send,
466496
{
467497
let mut backoff = Backoff::new(backoff_config);
468498

469499
backoff
470500
.retry_with_backoff(request_name, || async {
471501
let error = match f().await {
472-
Ok(v) => return ControlFlow::Break(Ok(v)),
473-
Err(e) => e,
502+
Ok(v) => {
503+
return ControlFlow::Break(Ok(v));
504+
}
505+
Err(ErrorOrThrottle::Throttle(throttle)) => {
506+
return ControlFlow::Continue(ErrorOrThrottle::Throttle(throttle));
507+
}
508+
Err(ErrorOrThrottle::Error(e)) => e,
474509
};
475510

476511
let retry = match error {
@@ -504,7 +539,7 @@ where
504539
};
505540

506541
if retry {
507-
ControlFlow::Continue(error)
542+
ControlFlow::Continue(ErrorOrThrottle::Error(error))
508543
} else {
509544
error!(
510545
e=%error,

src/connection.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,13 @@ use thiserror::Error;
66
use tokio::{io::BufStream, sync::Mutex};
77
use tracing::{debug, error, info, warn};
88

9+
use crate::backoff::ErrorOrThrottle;
910
use crate::connection::topology::{Broker, BrokerTopology};
1011
use crate::connection::transport::Transport;
1112
use crate::messenger::{Messenger, RequestError};
1213
use crate::protocol::messages::{MetadataRequest, MetadataRequestTopic, MetadataResponse};
1314
use crate::protocol::primitives::String_;
15+
use crate::throttle::maybe_throttle;
1416
use crate::{
1517
backoff::{Backoff, BackoffConfig, BackoffError},
1618
client::metadata_cache::MetadataCache,
@@ -418,7 +420,7 @@ where
418420
"Failed to connect to any broker, backing off".to_string(),
419421
);
420422
let err: Arc<dyn std::error::Error + Send + Sync> = err.into();
421-
ControlFlow::Continue(err)
423+
ControlFlow::Continue(ErrorOrThrottle::Error(err))
422424
})
423425
.await
424426
.map_err(Error::RetryFailed)
@@ -449,12 +451,18 @@ where
449451
};
450452

451453
match broker.metadata_request(request_params).await {
452-
Ok(response) => ControlFlow::Break(Ok(response)),
454+
Ok(response) => {
455+
if let Err(e) = maybe_throttle(response.throttle_time_ms) {
456+
return ControlFlow::Continue(e);
457+
}
458+
459+
ControlFlow::Break(Ok(response))
460+
}
453461
Err(e @ RequestError::Poisoned(_) | e @ RequestError::IO(_))
454462
if !matches!(metadata_mode, MetadataLookupMode::SpecificBroker(_)) =>
455463
{
456464
arbitrary_broker_cache.invalidate().await;
457-
ControlFlow::Continue(e)
465+
ControlFlow::Continue(ErrorOrThrottle::Error(e))
458466
}
459467
Err(error) => {
460468
error!(

src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ mod protocol;
3636

3737
pub mod record;
3838

39+
mod throttle;
40+
3941
pub mod topic;
4042

4143
// re-exports

0 commit comments

Comments
 (0)