Skip to content

Commit 6519cd0

Browse files
Merge pull request #185 from influxdata/crepererum/add_cache_invalidation_logs
feat: log reason about WHY we invalidate caches
2 parents 8c98c56 + 35d04fb commit 6519cd0

File tree

4 files changed: 49 additions and 26 deletions.

src/client/controller.rs

Lines changed: 11 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -2,7 +2,7 @@ use async_trait::async_trait;
22
use std::ops::ControlFlow;
33
use std::sync::Arc;
44
use tokio::sync::Mutex;
5-
use tracing::{debug, error, info};
5+
use tracing::{error, info};
66

77
use crate::{
88
backoff::{Backoff, BackoffConfig, ErrorOrThrottle},
@@ -139,8 +139,8 @@ impl BrokerCache for &ControllerClient {
139139
Ok(broker)
140140
}
141141

142-
async fn invalidate(&self) {
143-
debug!("Invalidating cached controller broker");
142+
async fn invalidate(&self, reason: &'static str) {
143+
info!(reason, "Invalidating cached controller broker",);
144144
self.current_broker.lock().await.take();
145145
}
146146
}
@@ -175,14 +175,20 @@ where
175175
match error {
176176
// broken connection
177177
Error::Request(RequestError::Poisoned(_) | RequestError::IO(_))
178-
| Error::Connection(_) => broker_cache.invalidate().await,
178+
| Error::Connection(_) => {
179+
broker_cache
180+
.invalidate("controller client: connection broken")
181+
.await
182+
}
179183

180184
// our broker is actually not the controller
181185
Error::ServerError {
182186
protocol_error: ProtocolError::NotController,
183187
..
184188
} => {
185-
broker_cache.invalidate().await;
189+
broker_cache
190+
.invalidate("controller client: server error: not controller")
191+
.await;
186192
}
187193

188194
// fatal

src/client/metadata_cache.rs

Lines changed: 5 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -1,5 +1,5 @@
11
use parking_lot::Mutex;
2-
use tracing::debug;
2+
use tracing::{debug, info};
33

44
use crate::protocol::messages::MetadataResponse;
55

@@ -39,7 +39,7 @@ impl MetadataCache {
3939
// if a caller keeps requesting metadata for a non-existent
4040
// topic.
4141
debug!("cached metadata query for unknown topic");
42-
self.invalidate();
42+
self.invalidate("get from metadata cache: unknown topic");
4343
return None;
4444
}
4545
}
@@ -49,9 +49,9 @@ impl MetadataCache {
4949
Some(m)
5050
}
5151

52-
pub(crate) fn invalidate(&self) {
52+
pub(crate) fn invalidate(&self, reason: &'static str) {
5353
*self.cache.lock() = None;
54-
debug!("invalidated metadata cache");
54+
info!(reason, "invalidated metadata cache",);
5555
}
5656

5757
pub(crate) fn update(&self, m: MetadataResponse) {
@@ -148,7 +148,7 @@ mod tests {
148148
});
149149

150150
assert!(cache.get(&None).is_some());
151-
cache.invalidate();
151+
cache.invalidate("test");
152152
assert!(cache.get(&None).is_none());
153153
}
154154
}

src/client/partition.rs

Lines changed: 21 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -414,14 +414,18 @@ impl BrokerCache for &PartitionClient {
414414
let broker = match self.brokers.connect(leader).await {
415415
Ok(Some(c)) => Ok(c),
416416
Ok(None) => {
417-
self.brokers.invalidate_metadata_cache();
417+
self.brokers.invalidate_metadata_cache(
418+
"partition client: broker that is leader is unknown",
419+
);
418420
Err(Error::InvalidResponse(format!(
419421
"Partition leader {} not found in metadata response",
420422
leader
421423
)))
422424
}
423425
Err(e) => {
424-
self.brokers.invalidate_metadata_cache();
426+
self.brokers.invalidate_metadata_cache(
427+
"partition client: error connecting to partition leader",
428+
);
425429
Err(e.into())
426430
}
427431
}?;
@@ -442,7 +446,9 @@ impl BrokerCache for &PartitionClient {
442446
.await?;
443447
if leader != leader_self {
444448
// The cached metadata identified an incorrect leader - it is stale and should be refreshed.
445-
self.brokers.invalidate_metadata_cache();
449+
self.brokers.invalidate_metadata_cache(
450+
"partition client: broker that should be leader does treat itself as a leader",
451+
);
446452

447453
// this might happen if the leader changed after we got the hint from an arbitrary broker and this specific
448454
// metadata call.
@@ -469,13 +475,14 @@ impl BrokerCache for &PartitionClient {
469475
Ok(broker)
470476
}
471477

472-
async fn invalidate(&self) {
478+
async fn invalidate(&self, reason: &'static str) {
473479
info!(
474480
topic = self.topic.deref(),
475481
partition = self.partition,
482+
reason,
476483
"Invaliding cached leader",
477484
);
478-
self.brokers.invalidate_metadata_cache();
485+
self.brokers.invalidate_metadata_cache(reason);
479486
*self.current_broker.lock().await = None
480487
}
481488
}
@@ -511,7 +518,9 @@ where
511518
let retry = match error {
512519
Error::Request(RequestError::Poisoned(_) | RequestError::IO(_))
513520
| Error::Connection(_) => {
514-
broker_cache.invalidate().await;
521+
broker_cache
522+
.invalidate("partition client: connection broken")
523+
.await;
515524
true
516525
}
517526
Error::ServerError {
@@ -525,14 +534,18 @@ where
525534
protocol_error: ProtocolError::NotLeaderOrFollower,
526535
..
527536
} => {
528-
broker_cache.invalidate().await;
537+
broker_cache
538+
.invalidate("partition client: server error: not leader or follower")
539+
.await;
529540
true
530541
}
531542
Error::ServerError {
532543
protocol_error: ProtocolError::UnknownTopicOrPartition,
533544
..
534545
} => {
535-
broker_cache.invalidate().await;
546+
broker_cache
547+
.invalidate("partition client: server error: unknown topic or partition")
548+
.await;
536549
unknown_topic_handling == UnknownTopicHandling::Retry
537550
}
538551
_ => false,

src/connection.rs

Lines changed: 12 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -4,7 +4,7 @@ use std::ops::ControlFlow;
44
use std::sync::Arc;
55
use thiserror::Error;
66
use tokio::{io::BufStream, sync::Mutex};
7-
use tracing::{debug, error, info, warn};
7+
use tracing::{error, info, warn};
88

99
use crate::backoff::ErrorOrThrottle;
1010
use crate::connection::topology::{Broker, BrokerTopology};
@@ -264,8 +264,8 @@ impl BrokerConnector {
264264
Ok(response)
265265
}
266266

267-
pub(crate) fn invalidate_metadata_cache(&self) {
268-
self.cached_metadata.invalidate()
267+
pub(crate) fn invalidate_metadata_cache(&self, reason: &'static str) {
268+
self.cached_metadata.invalidate(reason)
269269
}
270270

271271
/// Returns a new connection to the broker with the provided id
@@ -343,7 +343,7 @@ pub trait BrokerCache: Send + Sync {
343343

344344
async fn get(&self) -> Result<Arc<Self::R>, Self::E>;
345345

346-
async fn invalidate(&self);
346+
async fn invalidate(&self, reason: &'static str);
347347
}
348348

349349
/// BrokerConnector caches an arbitrary broker that can successfully connect.
@@ -372,8 +372,8 @@ impl BrokerCache for &BrokerConnector {
372372
Ok(connection)
373373
}
374374

375-
async fn invalidate(&self) {
376-
debug!("Invalidating cached arbitrary broker");
375+
async fn invalidate(&self, reason: &'static str) {
376+
info!(reason, "Invalidating cached arbitrary broker",);
377377
self.cached_arbitrary_broker.lock().await.take();
378378
}
379379
}
@@ -461,7 +461,11 @@ where
461461
Err(e @ RequestError::Poisoned(_) | e @ RequestError::IO(_))
462462
if !matches!(metadata_mode, MetadataLookupMode::SpecificBroker(_)) =>
463463
{
464-
arbitrary_broker_cache.invalidate().await;
464+
arbitrary_broker_cache
465+
.invalidate(
466+
"metadata request: arbitrary/cached broker is connection is broken",
467+
)
468+
.await;
465469
ControlFlow::Continue(ErrorOrThrottle::Error(e))
466470
}
467471
Err(error) => {
@@ -523,7 +527,7 @@ mod tests {
523527
(self.get)()
524528
}
525529

526-
async fn invalidate(&self) {
530+
async fn invalidate(&self, _reason: &'static str) {
527531
(self.invalidate)()
528532
}
529533
}

0 commit comments

Comments
 (0)