4 changes: 2 additions & 2 deletions .circleci/config.yml
@@ -190,7 +190,7 @@ jobs:
- cache_restore
- run:
name: Cargo test
command: cargo test --all-features --workspace
command: cargo test --all-features --all-targets
- cache_save
- store_artifacts:
path: proptest-regressions
@@ -255,7 +255,7 @@ jobs:
- cache_restore
- run:
name: Cargo test
command: cargo test --all-features --workspace
command: cargo test --all-features --all-targets
- cache_save
- store_artifacts:
path: proptest-regressions
1 change: 1 addition & 0 deletions .gitignore
@@ -1,2 +1,3 @@
/target
Cargo.lock
perf.data*
12 changes: 10 additions & 2 deletions Cargo.toml
@@ -21,7 +21,7 @@ documentation = "https://docs.rs/rskafka/"
async-socks5 = { version = "0.5", optional = true }
async-trait = "0.1"
bytes = "1.1"
crc = "2"
crc32c = "0.6"
futures = "0.3"
integer-encoding = "3"
parking_lot = "0.11"
@@ -36,13 +36,14 @@ tracing = "0.1"

[dev-dependencies]
assert_matches = "1.5"
criterion = { version = "0.3", features = ["async_tokio"] }
dotenv = "0.15"
futures = "0.3"
proptest = "1"
proptest-derive = "0.3"
rustls-pemfile = "0.2"
rdkafka = "0.28"
tokio = { version = "1.14", features = ["macros"] }
tokio = { version = "1.14", features = ["macros", "rt-multi-thread"] }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
uuid = { version = "0.8", features = ["v4"] }

@@ -52,11 +53,18 @@ fuzzing = []
transport-tls = ["rustls", "tokio-rustls"]
transport-socks5 = ["async-socks5"]

[[bench]]
name = "write_throughput"
harness = false

[workspace]
members = [
".",
"fuzz",
]

[profile.bench]
debug = true

[package.metadata.docs.rs]
all-features = true
26 changes: 26 additions & 0 deletions README.md
@@ -200,6 +200,29 @@ execution that hooks right into the place where it is about to exit:
(lldb) b fuzzer::PrintStackTrace()
```

### Benchmarks

Install [cargo-criterion], make sure you have some Kafka cluster running, and then you can run all benchmarks with:

```console
$ TEST_INTEGRATION=1 KAFKA_CONNECT=localhost:9093 cargo criterion --all-features
```
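
If you only want to run a single benchmark (here `parallel/rskafka`), you should be able to pass it as a filter to the plain `cargo bench` harness as well:

```console
$ TEST_INTEGRATION=1 KAFKA_CONNECT=localhost:9093 cargo bench --all-features --bench write_throughput -- parallel/rskafka
```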

If you find a benchmark that is too slow, you may want to profile it. Get [cargo-with] and [perf], then run (here
for the `parallel/rskafka` benchmark):

```console
$ TEST_INTEGRATION=1 KAFKA_CONNECT=localhost:9093 cargo with 'perf record --call-graph dwarf -- {bin}' -- \
bench --all-features --bench write_throughput -- \
--bench --noplot parallel/rskafka
```

Have a look at the report:

```console
$ perf report
```


## License

Licensed under either of these:
@@ -219,8 +242,11 @@ e.g. by batching writes to multiple partitions in a single ProduceRequest


[Apache Kafka]: https://kafka.apache.org/
[cargo-criterion]: https://github.com/bheisler/cargo-criterion
[cargo-fuzz]: https://github.com/rust-fuzz/cargo-fuzz
[cargo-with]: https://github.com/cbourjau/cargo-with
[IOx]: https://github.com/influxdata/influxdb_iox/
[LLDB]: https://lldb.llvm.org/
[perf]: https://perf.wiki.kernel.org/index.php/Main_Page
[Redpanda]: https://vectorized.io/redpanda
[rustls]: https://github.com/rustls/rustls
289 changes: 289 additions & 0 deletions benches/write_throughput.rs
@@ -0,0 +1,289 @@
use std::{
collections::BTreeMap,
sync::Arc,
time::{Duration, Instant},
};

use criterion::{
criterion_group, criterion_main, measurement::WallTime, BenchmarkGroup, Criterion, SamplingMode,
};
use futures::{stream::FuturesUnordered, StreamExt};
use rdkafka::producer::FutureProducer;
use rskafka::{
client::{
partition::PartitionClient,
producer::{aggregator::RecordAggregator, BatchProducerBuilder},
ClientBuilder,
},
record::Record,
};
use time::OffsetDateTime;
use tokio::runtime::Runtime;

const PARALLEL_BATCH_SIZE: usize = 1_000_000;
const PARALLEL_LINGER_MS: u64 = 10;

pub fn criterion_benchmark(c: &mut Criterion) {
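// Returns the Kafka connection string, or skips the whole benchmark by returning early
// when the integration-test environment variables are not set.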
let connection = maybe_skip_kafka_integration!();

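// Every record uses a 10 byte key and a 10 kB payload.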
let key = vec![b'k'; 10];
let value = vec![b'x'; 10_000];

{
let mut group_sequential = benchmark_group(c, "sequential");

group_sequential.bench_function("rdkafka", |b| {
let connection = connection.clone();
let key = key.clone();
let value = value.clone();

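// `iter_custom` lets the closure do its own setup (client and topic creation) and report
// only the time spent in the send loop back to Criterion.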
b.to_async(runtime()).iter_custom(|iters| {
let connection = connection.clone();
let key = key.clone();
let value = value.clone();

async move {
use rdkafka::{producer::FutureRecord, util::Timeout};

let (client, topic) = setup_rdkafka(connection, false).await;

let start = Instant::now();

for _ in 0..iters {
let f_record = FutureRecord::to(&topic).key(&key).payload(&value);
client.send(f_record, Timeout::Never).await.unwrap();
}

start.elapsed()
}
});
});

group_sequential.bench_function("rskafka", |b| {
let connection = connection.clone();
let key = key.clone();
let value = value.clone();

b.to_async(runtime()).iter_custom(|iters| {
let connection = connection.clone();
let key = key.clone();
let value = value.clone();

async move {
let client = setup_rskafka(connection).await;
let record = Record {
key,
value,
headers: BTreeMap::default(),
timestamp: OffsetDateTime::now_utc(),
};

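// Each iteration awaits a single produce request, so this measures sequential round trips.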
let start = Instant::now();

for _ in 0..iters {
client.produce(vec![record.clone()]).await.unwrap();
}

start.elapsed()
}
});
});
}

{
let mut group_parallel = benchmark_group(c, "parallel");

group_parallel.bench_function("rdkafka", |b| {
let connection = connection.clone();
let key = key.clone();
let value = value.clone();

b.to_async(runtime()).iter_custom(|iters| {
let connection = connection.clone();
let key = key.clone();
let value = value.clone();

async move {
use rdkafka::{producer::FutureRecord, util::Timeout};

let (client, topic) = setup_rdkafka(connection, true).await;

let start = Instant::now();

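// Issue all requests concurrently and drain them as they complete.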
let mut tasks: FuturesUnordered<_> = (0..iters)
.map(|_| async {
let f_record = FutureRecord::to(&topic).key(&key).payload(&value);
client.send(f_record, Timeout::Never).await.unwrap();
})
.collect();
while tasks.next().await.is_some() {}

start.elapsed()
}
});
});

group_parallel.bench_function("rskafka", |b| {
let connection = connection.clone();
let key = key.clone();
let value = value.clone();

b.to_async(runtime()).iter_custom(|iters| {
let connection = connection.clone();
let key = key.clone();
let value = value.clone();

async move {
let client = setup_rskafka(connection).await;
let record = Record {
key,
value,
headers: BTreeMap::default(),
timestamp: OffsetDateTime::now_utc(),
};
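// The batch producer aggregates records that arrive within the linger window into
// batched produce requests.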
let producer = BatchProducerBuilder::new(Arc::new(client))
.with_linger(Duration::from_millis(PARALLEL_LINGER_MS))
.build(RecordAggregator::new(PARALLEL_BATCH_SIZE));

let start = Instant::now();

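// Submit all records concurrently; the aggregator batches them under the hood.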
let mut tasks: FuturesUnordered<_> = (0..iters)
.map(|_| async {
producer.produce(record.clone()).await.unwrap();
})
.collect();
while tasks.next().await.is_some() {}

start.elapsed()
}
});
});
}
}

/// Get the testing Kafka connection string or return early from the calling function.
///
/// If `TEST_INTEGRATION` and `KAFKA_CONNECT` are set, return the Kafka connection URL to the
/// caller.
///
/// If `TEST_INTEGRATION` is set but `KAFKA_CONNECT` is not set, panic and provide
/// guidance for setting `KAFKA_CONNECT`.
///
/// If `TEST_INTEGRATION` is not set, skip the caller by returning early.
#[macro_export]
macro_rules! maybe_skip_kafka_integration {
() => {{
use std::env;
dotenv::dotenv().ok();

match (
env::var("TEST_INTEGRATION").is_ok(),
env::var("KAFKA_CONNECT").ok(),
) {
(true, Some(kafka_connection)) => kafka_connection,
(true, None) => {
panic!(
"TEST_INTEGRATION is set which requires running integration tests, but \
KAFKA_CONNECT is not set. Please run Kafka or Redpanda then \
set KAFKA_CONNECT as directed in README.md."
)
}
(false, Some(_)) => {
eprintln!("skipping Kafka integration tests - set TEST_INTEGRATION to run");
return;
}
(false, None) => {
eprintln!(
"skipping Kafka integration tests - set TEST_INTEGRATION and KAFKA_CONNECT to \
run"
);
return;
}
}
}};
}

fn benchmark_group<'a>(c: &'a mut Criterion, name: &str) -> BenchmarkGroup<'a, WallTime> {
let mut group = c.benchmark_group(name);
group.measurement_time(Duration::from_secs(60));
group.sample_size(15);
group.sampling_mode(SamplingMode::Linear);
group
}

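/// Multi-threaded tokio runtime for the async benchmarks.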
fn runtime() -> Runtime {
tokio::runtime::Builder::new_multi_thread()
.enable_io()
.enable_time()
.build()
.unwrap()
}

/// Generate a random topic name for testing.
fn random_topic_name() -> String {
format!("test_topic_{}", uuid::Uuid::new_v4())
}

async fn setup_rdkafka(connection: String, buffering: bool) -> (FutureProducer, String) {
use rdkafka::{
admin::{AdminClient, AdminOptions, NewTopic, TopicReplication},
producer::FutureRecord,
util::Timeout,
ClientConfig,
};

let topic_name = random_topic_name();

// configure clients
let mut cfg = ClientConfig::new();
cfg.set("bootstrap.servers", connection);
cfg.set("message.timeout.ms", "5000");
if buffering {
cfg.set("batch.num.messages", PARALLEL_BATCH_SIZE.to_string()); // = loads
cfg.set("batch.size", 1_000_000.to_string());
cfg.set("queue.buffering.max.ms", PARALLEL_LINGER_MS.to_string());
} else {
cfg.set("batch.num.messages", "1");
cfg.set("queue.buffering.max.ms", "0");
}

// create topic
let admin_client: AdminClient<_> = cfg.create().unwrap();
let topic = NewTopic::new(&topic_name, 1, TopicReplication::Fixed(1));
let opts = AdminOptions::default();
let mut results = admin_client.create_topics([&topic], &opts).await.unwrap();
assert_eq!(results.len(), 1, "created exactly one topic");
let result = results.pop().expect("just checked the vector length");
result.unwrap();

let producer_client: FutureProducer = cfg.create().unwrap();

// warm up connection
let key = vec![b'k'; 1];
let payload = vec![b'x'; 10];
let f_record = FutureRecord::to(&topic_name).key(&key).payload(&payload);
producer_client
.send(f_record, Timeout::Never)
.await
.unwrap();

(producer_client, topic_name)
}

async fn setup_rskafka(connection: String) -> PartitionClient {
let topic_name = random_topic_name();

let client = ClientBuilder::new(vec![connection]).build().await.unwrap();
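// Create a fresh single-partition topic for this benchmark run.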
client
.controller_client()
.await
.unwrap()
.create_topic(topic_name.clone(), 1, 1, 5_000)
.await
.unwrap();

client.partition_client(topic_name, 0).await.unwrap()
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);