Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/client/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ impl ControllerClient {
/// Retrieve the broker ID of the controller
async fn get_controller_id(&self) -> Result<i32> {
// Request an uncached, fresh copy of the metadata.
let metadata = self
let (metadata, _gen) = self
.brokers
.request_metadata(MetadataLookupMode::ArbitraryBroker, Some(vec![]))
.await?;
Expand Down
84 changes: 70 additions & 14 deletions src/client/metadata_cache.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,29 @@
use std::ops::Deref;

use parking_lot::Mutex;
use tracing::{debug, info};

use crate::protocol::messages::MetadataResponse;

/// Cache generation for [`MetadataCache`].
///
/// This is used to avoid double-invalidating a cache.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct MetadataCacheGeneration(usize);

/// A [`MetadataCache`] provides look-aside caching of [`MetadataResponse`]
/// instances.
#[derive(Debug, Default)]
#[derive(Debug)]
pub(crate) struct MetadataCache {
cache: Mutex<Option<MetadataResponse>>,
cache: Mutex<(Option<MetadataResponse>, MetadataCacheGeneration)>,
}

impl Default for MetadataCache {
fn default() -> Self {
Self {
cache: Mutex::new((None, MetadataCacheGeneration(0))),
}
}
}

impl MetadataCache {
Expand All @@ -16,8 +32,16 @@ impl MetadataCache {
/// If `topics` is `Some` the returned metadata contains topics that are
/// filtered to match by name. If a topic name is specified that doesn't
/// exist in the cached metadata, the cache is invalidated.
pub(crate) fn get(&self, topics: &Option<Vec<String>>) -> Option<MetadataResponse> {
let mut m = self.cache.lock().clone()?;
pub(crate) fn get(
&self,
topics: &Option<Vec<String>>,
) -> Option<(MetadataResponse, MetadataCacheGeneration)> {
let (mut m, gen) = match self.cache.lock().deref() {
(Some(m), gen) => (m.clone(), *gen),
(None, _) => {
return None;
}
};

// If the caller requested a subset of topics, filter the cached result
// to ensure only the expected topics are present.
Expand All @@ -39,23 +63,37 @@ impl MetadataCache {
// if a caller keeps requesting metadata for a non-existent
// topic.
debug!("cached metadata query for unknown topic");
self.invalidate("get from metadata cache: unknown topic");
self.invalidate("get from metadata cache: unknown topic", gen);
return None;
}
}

debug!(?m, "using cached metadata response");

Some(m)
Some((m, gen))
}

pub(crate) fn invalidate(&self, reason: &'static str) {
*self.cache.lock() = None;
pub(crate) fn invalidate(&self, reason: &'static str, gen: MetadataCacheGeneration) {
let mut guard = self.cache.lock();
if guard.1 != gen {
// stale request
debug!(
reason,
current_gen = guard.1 .0,
request_gen = gen.0,
"stale invalidation request for metadata cache",
);
return;
}

guard.0 = None;
info!(reason, "invalidated metadata cache",);
}

pub(crate) fn update(&self, m: MetadataResponse) {
*self.cache.lock() = Some(m);
let mut guard = self.cache.lock();
guard.0 = Some(m);
guard.1 .0 += 1;
debug!("updated metadata cache");
}
}
Expand Down Expand Up @@ -99,7 +137,7 @@ mod tests {
let m = response_with_topics(None);
cache.update(m.clone());

let got = cache.get(&None).expect("should have cached entry");
let (got, _gen) = cache.get(&None).expect("should have cached entry");
assert_eq!(m, got);
}

Expand All @@ -109,16 +147,16 @@ mod tests {
cache.update(response_with_topics(Some(&["bananas", "platanos"])));

// Request a subset of the topics
let got = cache
let (got, _gen) = cache
.get(&Some(vec!["bananas".to_string()]))
.expect("should have cached entry");
assert_eq!(response_with_topics(Some(&["bananas"])), got);

let got = cache.get(&Some(vec![])).expect("should have cached entry");
let (got, _gen) = cache.get(&Some(vec![])).expect("should have cached entry");
assert_eq!(response_with_topics(Some(&[])), got);

// A request for "None" actually means "all of them".
let got = cache.get(&None).expect("should have cached entry");
let (got, _gen) = cache.get(&None).expect("should have cached entry");
assert_eq!(response_with_topics(Some(&["bananas", "platanos"])), got);
}

Expand Down Expand Up @@ -147,8 +185,26 @@ mod tests {
topics: Default::default(),
});

let (_data, gen1) = cache.get(&None).unwrap();
cache.invalidate("test", gen1);
assert!(cache.get(&None).is_none());

cache.update(MetadataResponse {
throttle_time_ms: Default::default(),
brokers: Default::default(),
cluster_id: Default::default(),
controller_id: Default::default(),
topics: Default::default(),
});

let (_data, gen2) = cache.get(&None).unwrap();

// outdated gen
cache.invalidate("test", gen1);
assert!(cache.get(&None).is_some());
cache.invalidate("test");

// the actual gen
cache.invalidate("test", gen2);
assert!(cache.get(&None).is_none());
}
}
2 changes: 1 addition & 1 deletion src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ impl Client {
//
// Because this is an unconstrained metadata request (all topics) it
// will update the cached metadata entry.
let response = self
let (response, _gen) = self
.brokers
.request_metadata(MetadataLookupMode::ArbitraryBroker, None)
.await?;
Expand Down
77 changes: 55 additions & 22 deletions src/client/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use std::{
use tokio::sync::Mutex;
use tracing::{error, info};

use super::error::ServerErrorResponse;
use super::{error::ServerErrorResponse, metadata_cache::MetadataCacheGeneration};

/// How strongly a [`PartitionClient`] is bound to a partition.
///
Expand Down Expand Up @@ -99,6 +99,13 @@ pub enum OffsetAt {
Latest,
}

#[derive(Debug, Default)]
struct CurrentBroker {
broker: Option<BrokerConnection>,
gen_leader_from_arbitrary: Option<MetadataCacheGeneration>,
gen_leader_from_self: Option<MetadataCacheGeneration>,
}

/// Many operations must be performed on the leader for a partition
///
/// Additionally a partition is the unit of concurrency within Kafka
Expand All @@ -117,7 +124,7 @@ pub struct PartitionClient {
backoff_config: BackoffConfig,

/// Current broker connection if any
current_broker: Mutex<Option<BrokerConnection>>,
current_broker: Mutex<CurrentBroker>,

unknown_topic_handling: UnknownTopicHandling,
}
Expand All @@ -140,7 +147,7 @@ impl PartitionClient {
partition,
brokers: Arc::clone(&brokers),
backoff_config: Default::default(),
current_broker: Mutex::new(None),
current_broker: Mutex::new(CurrentBroker::default()),
unknown_topic_handling,
};

Expand Down Expand Up @@ -312,8 +319,11 @@ impl PartitionClient {
}

/// Retrieve the broker ID of the partition leader
async fn get_leader(&self, metadata_mode: MetadataLookupMode) -> Result<i32> {
let metadata = self
async fn get_leader(
&self,
metadata_mode: MetadataLookupMode,
) -> Result<(i32, Option<MetadataCacheGeneration>)> {
let (metadata, gen) = self
.brokers
.request_metadata(metadata_mode, Some(vec![self.topic.clone()]))
.await?;
Expand Down Expand Up @@ -379,7 +389,7 @@ impl PartitionClient {
leader=partition.leader_id.0,
"Detected leader",
);
Ok(partition.leader_id.0)
Ok((partition.leader_id.0, gen))
}
}

Expand All @@ -391,7 +401,7 @@ impl BrokerCache for &PartitionClient {

async fn get(&self) -> Result<Arc<Self::R>> {
let mut current_broker = self.current_broker.lock().await;
if let Some(broker) = &*current_broker {
if let Some(broker) = &current_broker.broker {
return Ok(Arc::clone(broker));
}

Expand All @@ -410,22 +420,29 @@ impl BrokerCache for &PartitionClient {
// * A subsequent query is performed against the leader that does not use the cached entry - this validates
// the correctness of the cached entry.
//
let leader = self.get_leader(MetadataLookupMode::CachedArbitrary).await?;
let (leader, gen_leader_from_arbitrary) =
self.get_leader(MetadataLookupMode::CachedArbitrary).await?;
let broker = match self.brokers.connect(leader).await {
Ok(Some(c)) => Ok(c),
Ok(None) => {
self.brokers.invalidate_metadata_cache(
"partition client: broker that is leader is unknown",
);
if let Some(gen) = gen_leader_from_arbitrary {
self.brokers.invalidate_metadata_cache(
"partition client: broker that is leader is unknown",
gen,
);
}
Err(Error::InvalidResponse(format!(
"Partition leader {} not found in metadata response",
leader
)))
}
Err(e) => {
self.brokers.invalidate_metadata_cache(
"partition client: error connecting to partition leader",
);
if let Some(gen) = gen_leader_from_arbitrary {
self.brokers.invalidate_metadata_cache(
"partition client: error connecting to partition leader",
gen,
);
}
Err(e.into())
}
}?;
Expand All @@ -441,14 +458,17 @@ impl BrokerCache for &PartitionClient {
//
// Because this check requires the most up-to-date metadata from a specific broker, do not accept a cached
// metadata response.
let leader_self = self
let (leader_self, gen_leader_from_self) = self
.get_leader(MetadataLookupMode::SpecificBroker(Arc::clone(&broker)))
.await?;
if leader != leader_self {
// The cached metadata identified an incorrect leader - it is stale and should be refreshed.
self.brokers.invalidate_metadata_cache(
"partition client: broker that should be leader does treat itself as a leader",
);
if let Some(gen) = gen_leader_from_self {
// The cached metadata identified an incorrect leader - it is stale and should be refreshed.
self.brokers.invalidate_metadata_cache(
"partition client: broker that should be leader does treat itself as a leader",
gen,
);
}

// this might happen if the leader changed after we got the hint from a arbitrary broker and this specific
// metadata call.
Expand All @@ -464,7 +484,11 @@ impl BrokerCache for &PartitionClient {
});
}

*current_broker = Some(Arc::clone(&broker));
*current_broker = CurrentBroker {
broker: Some(Arc::clone(&broker)),
gen_leader_from_arbitrary,
gen_leader_from_self,
};

info!(
topic=%self.topic,
Expand All @@ -482,8 +506,17 @@ impl BrokerCache for &PartitionClient {
reason,
"Invaliding cached leader",
);
self.brokers.invalidate_metadata_cache(reason);
*self.current_broker.lock().await = None

let mut current_broker = self.current_broker.lock().await;

if let Some(gen) = current_broker.gen_leader_from_arbitrary {
self.brokers.invalidate_metadata_cache(reason, gen);
}
if let Some(gen) = current_broker.gen_leader_from_self {
self.brokers.invalidate_metadata_cache(reason, gen);
}

current_broker.broker = None
}
}

Expand Down
17 changes: 11 additions & 6 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use tokio::{io::BufStream, sync::Mutex};
use tracing::{error, info, warn};

use crate::backoff::ErrorOrThrottle;
use crate::client::metadata_cache::MetadataCacheGeneration;
use crate::connection::topology::{Broker, BrokerTopology};
use crate::connection::transport::Transport;
use crate::messenger::{Messenger, RequestError};
Expand Down Expand Up @@ -224,7 +225,7 @@ impl BrokerConnector {
&self,
metadata_mode: MetadataLookupMode,
topics: Option<Vec<String>>,
) -> Result<MetadataResponse> {
) -> Result<(MetadataResponse, Option<MetadataCacheGeneration>)> {
// Return a cached metadata response as an optimisation to prevent
// multiple successive metadata queries for the same topic across
// multiple PartitionClient instances.
Expand All @@ -235,8 +236,8 @@ impl BrokerConnector {
// Client initialises this cache at construction time, so unless
// invalidated, there will always be a cached entry available.
if matches!(metadata_mode, MetadataLookupMode::CachedArbitrary) {
if let Some(m) = self.cached_metadata.get(&topics) {
return Ok(m);
if let Some((m, gen)) = self.cached_metadata.get(&topics) {
return Ok((m, Some(gen)));
}
}

Expand All @@ -261,11 +262,15 @@ impl BrokerConnector {
// Since the metadata request contains information about the cluster state, use it to update our view.
self.topology.update(&response.brokers);

Ok(response)
Ok((response, None))
}

pub(crate) fn invalidate_metadata_cache(&self, reason: &'static str) {
self.cached_metadata.invalidate(reason)
pub(crate) fn invalidate_metadata_cache(
&self,
reason: &'static str,
gen: MetadataCacheGeneration,
) {
self.cached_metadata.invalidate(reason, gen)
}

/// Returns a new connection to the broker with the provided id
Expand Down