Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/client/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ impl ControllerClient {
// Request an uncached, fresh copy of the metadata.
let (metadata, _gen) = self
.brokers
.request_metadata(MetadataLookupMode::ArbitraryBroker, Some(vec![]))
.request_metadata(&MetadataLookupMode::ArbitraryBroker, Some(vec![]))
.await?;

let controller_id = metadata
Expand Down
2 changes: 1 addition & 1 deletion src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ impl Client {
// will update the cached metadata entry.
let (response, _gen) = self
.brokers
.request_metadata(MetadataLookupMode::ArbitraryBroker, None)
.request_metadata(&MetadataLookupMode::ArbitraryBroker, None)
.await?;

Ok(response
Expand Down
3 changes: 2 additions & 1 deletion src/client/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ impl PartitionClient {
) -> Result<(i32, Option<MetadataCacheGeneration>)> {
let (metadata, gen) = self
.brokers
.request_metadata(metadata_mode, Some(vec![self.topic.clone()]))
.request_metadata(&metadata_mode, Some(vec![self.topic.clone()]))
.await?;

let topic = metadata
Expand Down Expand Up @@ -403,6 +403,7 @@ impl PartitionClient {
topic=%self.topic,
partition=%self.partition,
leader=partition.leader_id.0,
%metadata_mode,
"Detected leader",
);
Ok((partition.leader_id.0, gen))
Expand Down
16 changes: 13 additions & 3 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,16 @@ pub enum MetadataLookupMode<B = BrokerConnection> {
CachedArbitrary,
}

impl<B> std::fmt::Display for MetadataLookupMode<B> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::ArbitraryBroker => write!(f, "ArbitraryBroker"),
Self::SpecificBroker(_) => f.debug_tuple("SpecificBroker").field(&"...").finish(),
Self::CachedArbitrary => write!(f, "CachedArbitrary"),
}
}
}

/// Info needed to connect to a broker, with [optional broker ID](Self::id) for debugging
enum BrokerRepresentation {
/// URL specified as a bootstrap broker
Expand Down Expand Up @@ -193,7 +203,7 @@ impl BrokerConnector {

/// Fetch and cache metadata
pub async fn refresh_metadata(&self) -> Result<()> {
self.request_metadata(MetadataLookupMode::ArbitraryBroker, None)
self.request_metadata(&MetadataLookupMode::ArbitraryBroker, None)
.await?;

Ok(())
Expand Down Expand Up @@ -223,7 +233,7 @@ impl BrokerConnector {
/// entry - doing so will panic.
pub async fn request_metadata(
&self,
metadata_mode: MetadataLookupMode,
metadata_mode: &MetadataLookupMode,
topics: Option<Vec<String>>,
) -> Result<(MetadataResponse, Option<MetadataCacheGeneration>)> {
// Return a cached metadata response as an optimisation to prevent
Expand Down Expand Up @@ -251,7 +261,7 @@ impl BrokerConnector {
allow_auto_topic_creation: None,
};

let response = metadata_request_with_retry(&metadata_mode, &request, backoff, self).await?;
let response = metadata_request_with_retry(metadata_mode, &request, backoff, self).await?;

// If the request was for a full, unfiltered set of topics, cache the
// response for later calls to make use of.
Expand Down