Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ jobs:
# setup multiple docker images (see https://circleci.com/docs/2.0/configuration-reference/#docker)
docker:
- image: quay.io/influxdb/rust:ci
- image: vectorized/redpanda:v22.2.1
- image: redpandadata/redpanda:v22.2.1
name: redpanda-0
command:
- redpanda
Expand All @@ -151,7 +151,7 @@ jobs:
- --kafka-addr redpanda-0:9092
- --rpc-addr redpanda-0:33145
- --set redpanda.auto_create_topics_enabled=false
- image: vectorized/redpanda:v22.2.1
- image: redpandadata/redpanda:v22.2.1
name: redpanda-1
command:
- redpanda
Expand All @@ -166,7 +166,7 @@ jobs:
- --rpc-addr redpanda-1:33145
- --seeds redpanda-0:33145
- --set redpanda.auto_create_topics_enabled=false
- image: vectorized/redpanda:v22.2.1
- image: redpandadata/redpanda:v22.2.1
name: redpanda-2
command:
- redpanda
Expand Down
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
[package]
name = "rskafka"
version = "0.5.0"
edition = "2021"
rust-version = "1.80"
edition = "2024"
rust-version = "1.85"
license = "MIT OR Apache-2.0"
readme = "README.md"
keywords = [
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ e.g. by batching writes to multiple partitions in a single ProduceRequest
[LLDB]: https://lldb.llvm.org/
[LZ4]: https://lz4.github.io/lz4/
[perf]: https://perf.wiki.kernel.org/index.php/Main_Page
[Redpanda]: https://vectorized.io/redpanda
[Redpanda]: https://www.redpanda.com/
[rustls]: https://github.com/rustls/rustls
[Snappy]: https://github.com/google/snappy
[zstd]: https://github.com/facebook/zstd
22 changes: 10 additions & 12 deletions benches/throughput.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,22 @@ use std::{

use chrono::{TimeZone, Utc};
use criterion::{
criterion_group, criterion_main, measurement::WallTime, BenchmarkGroup, Criterion, SamplingMode,
BenchmarkGroup, Criterion, SamplingMode, criterion_group, criterion_main, measurement::WallTime,
};
use futures::{stream::FuturesUnordered, FutureExt, StreamExt, TryStreamExt};
use futures::{FutureExt, StreamExt, TryStreamExt, stream::FuturesUnordered};
use parking_lot::Once;
use rdkafka::{
ClientConfig, TopicPartitionList,
consumer::{Consumer, StreamConsumer as RdStreamConsumer},
producer::{FutureProducer, FutureRecord},
util::Timeout,
ClientConfig, TopicPartitionList,
};
use rskafka::{
client::{
ClientBuilder,
consumer::{StartOffset, StreamConsumerBuilder as RsStreamConsumerBuilder},
partition::{Compression, PartitionClient, UnknownTopicHandling},
producer::{aggregator::RecordAggregator, BatchProducerBuilder},
ClientBuilder,
producer::{BatchProducerBuilder, aggregator::RecordAggregator},
},
record::Record,
};
Expand Down Expand Up @@ -219,20 +219,18 @@ pub fn criterion_benchmark(c: &mut Criterion) {
}
}

async fn exec_sequential<F, Fut>(f: F, iters: u64)
async fn exec_sequential<F>(f: F, iters: u64)
where
F: Fn() -> Fut,
Fut: Future<Output = ()>,
F: AsyncFn(),
{
for _ in 0..iters {
f().await;
}
}

async fn exec_parallel<F, Fut>(f: F, iters: u64)
async fn exec_parallel<F>(f: F, iters: u64)
where
F: Fn() -> Fut,
Fut: Future<Output = ()>,
F: AsyncFn(),
{
let mut tasks: FuturesUnordered<_> = (0..iters).map(|_| f()).collect();
while tasks.next().await.is_some() {}
Expand Down Expand Up @@ -462,7 +460,7 @@ pub fn maybe_start_logging() {
/// Start logging.
pub fn start_logging() {
use tracing_log::LogTracer;
use tracing_subscriber::{filter::EnvFilter, FmtSubscriber};
use tracing_subscriber::{FmtSubscriber, filter::EnvFilter};

LOG_SETUP.call_once(|| {
LogTracer::init().unwrap();
Expand Down
6 changes: 3 additions & 3 deletions docker-compose-redpanda.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
version: '3.7'
services:
redpanda-0:
image: vectorized/redpanda:v22.2.1
image: redpandadata/redpanda:v22.2.1
container_name: redpanda-0
ports:
- '9010:9010'
Expand All @@ -21,7 +21,7 @@ services:
- --advertise-rpc-addr redpanda-0:33145
- --set redpanda.auto_create_topics_enabled=false
redpanda-1:
image: vectorized/redpanda:v22.2.1
image: redpandadata/redpanda:v22.2.1
container_name: redpanda-1
ports:
- '9011:9011'
Expand All @@ -41,7 +41,7 @@ services:
- --advertise-rpc-addr redpanda-1:33146
- --set redpanda.auto_create_topics_enabled=false
redpanda-2:
image: vectorized/redpanda:v22.2.1
image: redpandadata/redpanda:v22.2.1
container_name: redpanda-2
ports:
- '9012:9012'
Expand Down
2 changes: 1 addition & 1 deletion rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[toolchain]
channel = "1.80"
channel = "1.85"
components = [ "rustfmt", "clippy" ]
2 changes: 1 addition & 1 deletion src/backoff.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ impl Backoff {
break Err(BackoffError::DeadlineExceded {
deadline: Duration::from_secs_f64(self.deadline.unwrap()),
source: Box::new(e),
})
});
}
},
ErrorOrThrottle::Throttle(throttle) => {
Expand Down
6 changes: 3 additions & 3 deletions src/client/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;

use futures::future::{BoxFuture, Fuse, FusedFuture, FutureExt};
use futures::Stream;
use futures::future::{BoxFuture, Fuse, FusedFuture, FutureExt};
use tracing::{debug, trace, warn};

use crate::{
Expand Down Expand Up @@ -365,8 +365,8 @@ mod tests {

use assert_matches::assert_matches;
use chrono::{TimeZone, Utc};
use futures::{pin_mut, StreamExt};
use tokio::sync::{mpsc, Mutex};
use futures::{StreamExt, pin_mut};
use tokio::sync::{Mutex, mpsc};

use crate::{
client::error::{Error, ProtocolError, RequestContext},
Expand Down
22 changes: 11 additions & 11 deletions src/client/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,21 +64,21 @@ impl ControllerClient {
};

maybe_retry(&self.backoff_config, self, "create_topic", || async move {
let (broker, gen) = self
let (broker, r#gen) = self
.get()
.await
.map_err(|e| ErrorOrThrottle::Error((e, None)))?;
let response = broker
.request(request)
.await
.map_err(|e| ErrorOrThrottle::Error((e.into(), Some(gen))))?;
.map_err(|e| ErrorOrThrottle::Error((e.into(), Some(r#gen))))?;

maybe_throttle(response.throttle_time_ms)?;

let topic = response
.topics
.exactly_one()
.map_err(|e| ErrorOrThrottle::Error((Error::exactly_one_topic(e), Some(gen))))?;
.map_err(|e| ErrorOrThrottle::Error((Error::exactly_one_topic(e), Some(r#gen))))?;

match topic.error {
None => Ok(()),
Expand All @@ -90,7 +90,7 @@ impl ControllerClient {
response: None,
is_virtual: false,
},
Some(gen),
Some(r#gen),
))),
}
})
Expand All @@ -115,21 +115,21 @@ impl ControllerClient {
};

maybe_retry(&self.backoff_config, self, "delete_topic", || async move {
let (broker, gen) = self
let (broker, r#gen) = self
.get()
.await
.map_err(|e| ErrorOrThrottle::Error((e, None)))?;
let response = broker
.request(request)
.await
.map_err(|e| ErrorOrThrottle::Error((e.into(), Some(gen))))?;
.map_err(|e| ErrorOrThrottle::Error((e.into(), Some(r#gen))))?;

maybe_throttle(response.throttle_time_ms)?;

let topic = response
.responses
.exactly_one()
.map_err(|e| ErrorOrThrottle::Error((Error::exactly_one_topic(e), Some(gen))))?;
.map_err(|e| ErrorOrThrottle::Error((Error::exactly_one_topic(e), Some(r#gen))))?;

match topic.error {
None => Ok(()),
Expand All @@ -141,7 +141,7 @@ impl ControllerClient {
response: None,
is_virtual: false,
},
Some(gen),
Some(r#gen),
))),
}
})
Expand Down Expand Up @@ -197,15 +197,15 @@ impl BrokerCache for &ControllerClient {
Ok((broker, current_broker.1))
}

async fn invalidate(&self, reason: &'static str, gen: BrokerCacheGeneration) {
async fn invalidate(&self, reason: &'static str, r#gen: BrokerCacheGeneration) {
let mut guard = self.current_broker.lock().await;

if guard.1 != gen {
if guard.1 != r#gen {
// stale request
debug!(
reason,
current_gen = guard.1.get(),
request_gen = gen.get(),
request_gen = r#gen.get(),
"stale invalidation request for arbitrary broker cache",
);
return;
Expand Down
18 changes: 9 additions & 9 deletions src/client/metadata_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ impl MetadataCache {
&self,
topics: &Option<Vec<String>>,
) -> Option<(MetadataResponse, MetadataCacheGeneration)> {
let (mut m, gen) = match self.cache.lock().deref() {
(Some(m), gen) => (m.clone(), *gen),
let (mut m, r#gen) = match self.cache.lock().deref() {
(Some(m), r#gen) => (m.clone(), *r#gen),
(None, _) => {
return None;
}
Expand All @@ -59,24 +59,24 @@ impl MetadataCache {
// if a caller keeps requesting metadata for a non-existent
// topic.
debug!("cached metadata query for unknown topic");
self.invalidate("get from metadata cache: unknown topic", gen);
self.invalidate("get from metadata cache: unknown topic", r#gen);
return None;
}
}

debug!(?m, "using cached metadata response");

Some((m, gen))
Some((m, r#gen))
}

pub(crate) fn invalidate(&self, reason: &'static str, gen: MetadataCacheGeneration) {
pub(crate) fn invalidate(&self, reason: &'static str, r#gen: MetadataCacheGeneration) {
let mut guard = self.cache.lock();
if guard.1 != gen {
if guard.1 != r#gen {
// stale request
debug!(
reason,
current_gen = guard.1 .0,
request_gen = gen.0,
current_gen = guard.1.0,
request_gen = r#gen.0,
"stale invalidation request for metadata cache",
);
return;
Expand All @@ -89,7 +89,7 @@ impl MetadataCache {
pub(crate) fn update(&self, m: MetadataResponse) {
let mut guard = self.cache.lock();
guard.0 = Some(m);
guard.1 .0 += 1;
guard.1.0 += 1;
debug!("updated metadata cache");
}
}
Expand Down
Loading