Skip to content

Commit 92e4cfe

Browse files
Merge pull request #213 from cognitedata/clientbuilder-accept-backoffconfig
refactor: Clientbuilder accept backoffconfig
2 parents fe0cc3b + 861c444 commit 92e4cfe

File tree

6 files changed

+32
-9
lines changed

6 files changed

+32
-9
lines changed

src/backoff.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use tracing::info;
66
/// Exponential backoff with jitter
77
///
88
/// See <https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/>
9+
#[allow(missing_copy_implementations)]
910
#[derive(Debug, Clone)]
1011
pub struct BackoffConfig {
1112
pub init_backoff: Duration,

src/client/controller.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,17 +27,17 @@ use super::error::RequestContext;
2727
pub struct ControllerClient {
2828
brokers: Arc<BrokerConnector>,
2929

30-
backoff_config: BackoffConfig,
30+
backoff_config: Arc<BackoffConfig>,
3131

3232
/// Current broker connection if any
3333
current_broker: Mutex<(Option<BrokerConnection>, BrokerCacheGeneration)>,
3434
}
3535

3636
impl ControllerClient {
37-
pub(super) fn new(brokers: Arc<BrokerConnector>) -> Self {
37+
pub(super) fn new(brokers: Arc<BrokerConnector>, backoff_config: Arc<BackoffConfig>) -> Self {
3838
Self {
3939
brokers,
40-
backoff_config: Default::default(),
40+
backoff_config,
4141
current_broker: Mutex::new((None, BrokerCacheGeneration::START)),
4242
}
4343
}

src/client/mod.rs

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::sync::Arc;
33
use thiserror::Error;
44

55
use crate::{
6+
backoff::BackoffConfig,
67
build_info::DEFAULT_CLIENT_ID,
78
client::partition::PartitionClient,
89
connection::{BrokerConnector, MetadataLookupMode, TlsConfig},
@@ -43,6 +44,7 @@ pub struct ClientBuilder {
4344
max_message_size: usize,
4445
socks5_proxy: Option<String>,
4546
tls_config: TlsConfig,
47+
backoff_config: Arc<BackoffConfig>,
4648
}
4749

4850
impl ClientBuilder {
@@ -54,6 +56,7 @@ impl ClientBuilder {
5456
max_message_size: 100 * 1024 * 1024, // 100MB
5557
socks5_proxy: None,
5658
tls_config: TlsConfig::default(),
59+
backoff_config: Default::default(),
5760
}
5861
}
5962

@@ -73,6 +76,12 @@ impl ClientBuilder {
7376
self
7477
}
7578

79+
/// Set up backoff configuration
80+
pub fn backoff_config(mut self, backoff_config: BackoffConfig) -> Self {
81+
self.backoff_config = Arc::from(backoff_config);
82+
self
83+
}
84+
7685
/// Use SOCKS5 proxy.
7786
#[cfg(feature = "transport-socks5")]
7887
pub fn socks5_proxy(mut self, proxy: String) -> Self {
@@ -96,10 +105,14 @@ impl ClientBuilder {
96105
self.tls_config,
97106
self.socks5_proxy,
98107
self.max_message_size,
108+
Arc::clone(&self.backoff_config),
99109
));
100110
brokers.refresh_metadata().await?;
101111

102-
Ok(Client { brokers })
112+
Ok(Client {
113+
brokers,
114+
backoff_config: self.backoff_config,
115+
})
103116
}
104117
}
105118

@@ -118,12 +131,16 @@ impl std::fmt::Debug for ClientBuilder {
118131
#[derive(Debug)]
119132
pub struct Client {
120133
brokers: Arc<BrokerConnector>,
134+
backoff_config: Arc<BackoffConfig>,
121135
}
122136

123137
impl Client {
124138
/// Returns a client for performing certain cluster-wide operations.
125139
pub fn controller_client(&self) -> Result<ControllerClient> {
126-
Ok(ControllerClient::new(Arc::clone(&self.brokers)))
140+
Ok(ControllerClient::new(
141+
Arc::clone(&self.brokers),
142+
Arc::clone(&self.backoff_config),
143+
))
127144
}
128145

129146
/// Returns a client for performing operations on a specific partition
@@ -138,6 +155,7 @@ impl Client {
138155
partition,
139156
Arc::clone(&self.brokers),
140157
unknown_topic_handling,
158+
Arc::clone(&self.backoff_config),
141159
)
142160
.await
143161
}

src/client/partition.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ pub struct PartitionClient {
123123
partition: i32,
124124
brokers: Arc<BrokerConnector>,
125125

126-
backoff_config: BackoffConfig,
126+
backoff_config: Arc<BackoffConfig>,
127127

128128
/// Current broker connection if any
129129
current_broker: Mutex<CurrentBroker>,
@@ -143,12 +143,13 @@ impl PartitionClient {
143143
partition: i32,
144144
brokers: Arc<BrokerConnector>,
145145
unknown_topic_handling: UnknownTopicHandling,
146+
backoff_config: Arc<BackoffConfig>,
146147
) -> Result<Self> {
147148
let p = Self {
148149
topic,
149150
partition,
150151
brokers: Arc::clone(&brokers),
151-
backoff_config: Default::default(),
152+
backoff_config,
152153
current_broker: Mutex::new(CurrentBroker {
153154
broker: None,
154155
gen_broker: BrokerCacheGeneration::START,

src/connection.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ pub struct BrokerConnector {
168168
cached_metadata: MetadataCache,
169169

170170
/// The backoff configuration on error
171-
backoff_config: BackoffConfig,
171+
backoff_config: Arc<BackoffConfig>,
172172

173173
/// TLS configuration if any
174174
tls_config: TlsConfig,
@@ -187,14 +187,15 @@ impl BrokerConnector {
187187
tls_config: TlsConfig,
188188
socks5_proxy: Option<String>,
189189
max_message_size: usize,
190+
backoff_config: Arc<BackoffConfig>,
190191
) -> Self {
191192
Self {
192193
bootstrap_brokers,
193194
client_id,
194195
topology: Default::default(),
195196
cached_arbitrary_broker: Mutex::new((None, BrokerCacheGeneration::START)),
196197
cached_metadata: Default::default(),
197-
backoff_config: Default::default(),
198+
backoff_config,
198199
tls_config,
199200
socks5_proxy,
200201
max_message_size,

src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
mod backoff;
2121

22+
pub use backoff::BackoffConfig;
23+
2224
pub mod build_info;
2325

2426
pub mod client;

0 commit comments

Comments
 (0)