
Commit 2cbcc3a

Merge pull request #85 from influxdata/crepererum/bench

feat: benchmark + fix CRC performance

2 parents 17e851a + 37dd884

6 files changed (+349, −25 lines)


.circleci/config.yml (2 additions, 2 deletions)

```diff
@@ -190,7 +190,7 @@ jobs:
       - cache_restore
       - run:
           name: Cargo test
-          command: cargo test --all-features --workspace
+          command: cargo test --all-features --all-targets
       - cache_save
       - store_artifacts:
           path: proptest-regressions
@@ -255,7 +255,7 @@ jobs:
       - cache_restore
       - run:
           name: Cargo test
-          command: cargo test --all-features --workspace
+          command: cargo test --all-features --all-targets
       - cache_save
       - store_artifacts:
           path: proptest-regressions
```
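Why this flag change: the switch from `--workspace` to `--all-targets` trades testing every workspace member for testing every target kind of the crate, which brings the new bench target under compile coverage. Per cargo's documentation, `--all-targets` is shorthand for the individual target flags:

```console
# Equivalent spelled-out form of --all-targets (cargo's documented expansion);
# bench targets are compiled, which catches build breakage in benches/ early.
$ cargo test --all-features --lib --bins --tests --benches --examples
```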

.gitignore (1 addition, 0 deletions)

```diff
@@ -1,2 +1,3 @@
 /target
 Cargo.lock
+perf.data*
```

Cargo.toml (10 additions, 2 deletions)

```diff
@@ -21,7 +21,7 @@ documentation = "https://docs.rs/rskafka/"
 async-socks5 = { version = "0.5", optional = true }
 async-trait = "0.1"
 bytes = "1.1"
-crc = "2"
+crc32c = "0.6"
 futures = "0.3"
 integer-encoding = "3"
 parking_lot = "0.11"
@@ -36,13 +36,14 @@ tracing = "0.1"
 
 [dev-dependencies]
 assert_matches = "1.5"
+criterion = { version = "0.3", features = ["async_tokio"] }
 dotenv = "0.15"
 futures = "0.3"
 proptest = "1"
 proptest-derive = "0.3"
 rustls-pemfile = "0.2"
 rdkafka = "0.28"
-tokio = { version = "1.14", features = ["macros"] }
+tokio = { version = "1.14", features = ["macros", "rt-multi-thread"] }
 tracing-subscriber = { version = "0.3", features = ["env-filter"] }
 uuid = { version = "0.8", features = ["v4"] }
 
@@ -52,11 +53,18 @@ fuzzing = []
 transport-tls = ["rustls", "tokio-rustls"]
 transport-socks5 = ["async-socks5"]
 
+[[bench]]
+name = "write_throughput"
+harness = false
+
 [workspace]
 members = [
     ".",
     "fuzz",
 ]
 
+[profile.bench]
+debug = true
+
 [package.metadata.docs.rs]
 all-features = true
```
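Two of these changes carry the commit's intent: the `crc` → `crc32c` swap is the CRC performance fix (Kafka's record batch checksum is CRC-32C, the Castagnoli polynomial, and the `crc32c` crate can use the SSE 4.2 `crc32` instruction rather than a table-driven loop), and `harness = false` is required so criterion's own `main` replaces the default libtest bench harness. A minimal sketch of what the checksum call-site swap might look like; the `checksum` helper here is hypothetical, not rskafka's actual code:

```rust
// Hypothetical call site, for illustration only.

// Before, with the generic `crc` crate (CRC_32_ISCSI is the Castagnoli polynomial):
// use crc::{Crc, CRC_32_ISCSI};
// fn checksum(data: &[u8]) -> u32 {
//     Crc::<u32>::new(&CRC_32_ISCSI).checksum(data)
// }

// After, with the dedicated `crc32c` crate (hardware-accelerated where available):
fn checksum(data: &[u8]) -> u32 {
    crc32c::crc32c(data)
}
```

The `rt-multi-thread` tokio feature is needed because the benchmark builds a multi-threaded runtime (see `runtime()` in the bench file below).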

README.md (26 additions, 0 deletions)

````diff
@@ -200,6 +200,29 @@ execution that hooks right into the place where it is about to exit:
 (lldb) b fuzzer::PrintStackTrace()
 ```
 
+### Benchmarks
+Install [cargo-criterion], make sure you have some Kafka cluster running, and then you can run all benchmarks with:
+
+```console
+$ TEST_INTEGRATION=1 KAFKA_CONNECT=localhost:9093 cargo criterion --all-features
+```
+
+If you find a benchmark that is too slow, you may want to profile it. Get [cargo-with] and [perf], then run (here
+for the `parallel/rskafka` benchmark):
+
+```console
+$ TEST_INTEGRATION=1 KAFKA_CONNECT=localhost:9093 cargo with 'perf record --call-graph dwarf -- {bin}' -- \
+    bench --all-features --bench write_throughput -- \
+    --bench --noplot parallel/rskafka
+```
+
+Have a look at the report:
+
+```console
+$ perf report
+```
+
 ## License
 
 Licensed under either of these:
@@ -219,8 +242,11 @@ e.g. by batching writes to multiple partitions in a single ProduceRequest
 
 
 [Apache Kafka]: https://kafka.apache.org/
+[cargo-criterion]: https://github.com/bheisler/cargo-criterion
 [cargo-fuzz]: https://github.com/rust-fuzz/cargo-fuzz
+[cargo-with]: https://github.com/cbourjau/cargo-with
 [IOx]: https://github.com/influxdata/influxdb_iox/
 [LLDB]: https://lldb.llvm.org/
+[perf]: https://perf.wiki.kernel.org/index.php/Main_Page
 [Redpanda]: https://vectorized.io/redpanda
 [rustls]: https://github.com/rustls/rustls
````
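A note on the profiling recipe: `perf record --call-graph dwarf` unwinds through debug info, which is why the commit also sets `[profile.bench] debug = true` in Cargo.toml. Once a `perf.data` file exists (hence the new `.gitignore` entry), the report can be narrowed with standard perf options, for example:

```console
# Text-mode report, aggregated by symbol (standard perf flags).
$ perf report --stdio --sort=symbol
```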

benches/write_throughput.rs (289 additions, 0 deletions; new file)

```rust
use std::{
    collections::BTreeMap,
    sync::Arc,
    time::{Duration, Instant},
};

use criterion::{
    criterion_group, criterion_main, measurement::WallTime, BenchmarkGroup, Criterion, SamplingMode,
};
use futures::{stream::FuturesUnordered, StreamExt};
use rdkafka::producer::FutureProducer;
use rskafka::{
    client::{
        partition::PartitionClient,
        producer::{aggregator::RecordAggregator, BatchProducerBuilder},
        ClientBuilder,
    },
    record::Record,
};
use time::OffsetDateTime;
use tokio::runtime::Runtime;

const PARALLEL_BATCH_SIZE: usize = 1_000_000;
const PARALLEL_LINGER_MS: u64 = 10;

pub fn criterion_benchmark(c: &mut Criterion) {
    let connection = maybe_skip_kafka_integration!();

    let key = vec![b'k'; 10];
    let value = vec![b'x'; 10_000];

    {
        let mut group_sequential = benchmark_group(c, "sequential");

        group_sequential.bench_function("rdkafka", |b| {
            let connection = connection.clone();
            let key = key.clone();
            let value = value.clone();

            b.to_async(runtime()).iter_custom(|iters| {
                let connection = connection.clone();
                let key = key.clone();
                let value = value.clone();

                async move {
                    use rdkafka::{producer::FutureRecord, util::Timeout};

                    let (client, topic) = setup_rdkafka(connection, false).await;

                    let start = Instant::now();

                    for _ in 0..iters {
                        let f_record = FutureRecord::to(&topic).key(&key).payload(&value);
                        client.send(f_record, Timeout::Never).await.unwrap();
                    }

                    start.elapsed()
                }
            });
        });

        group_sequential.bench_function("rskafka", |b| {
            let connection = connection.clone();
            let key = key.clone();
            let value = value.clone();

            b.to_async(runtime()).iter_custom(|iters| {
                let connection = connection.clone();
                let key = key.clone();
                let value = value.clone();

                async move {
                    let client = setup_rskafka(connection).await;
                    let record = Record {
                        key,
                        value,
                        headers: BTreeMap::default(),
                        timestamp: OffsetDateTime::now_utc(),
                    };

                    let start = Instant::now();

                    for _ in 0..iters {
                        client.produce(vec![record.clone()]).await.unwrap();
                    }

                    start.elapsed()
                }
            });
        });
    }

    {
        let mut group_parallel = benchmark_group(c, "parallel");

        group_parallel.bench_function("rdkafka", |b| {
            let connection = connection.clone();
            let key = key.clone();
            let value = value.clone();

            b.to_async(runtime()).iter_custom(|iters| {
                let connection = connection.clone();
                let key = key.clone();
                let value = value.clone();

                async move {
                    use rdkafka::{producer::FutureRecord, util::Timeout};

                    let (client, topic) = setup_rdkafka(connection, true).await;

                    let start = Instant::now();

                    let mut tasks: FuturesUnordered<_> = (0..iters)
                        .map(|_| async {
                            let f_record = FutureRecord::to(&topic).key(&key).payload(&value);
                            client.send(f_record, Timeout::Never).await.unwrap();
                        })
                        .collect();
                    while tasks.next().await.is_some() {}

                    start.elapsed()
                }
            });
        });

        group_parallel.bench_function("rskafka", |b| {
            let connection = connection.clone();
            let key = key.clone();
            let value = value.clone();

            b.to_async(runtime()).iter_custom(|iters| {
                let connection = connection.clone();
                let key = key.clone();
                let value = value.clone();

                async move {
                    let client = setup_rskafka(connection).await;
                    let record = Record {
                        key,
                        value,
                        headers: BTreeMap::default(),
                        timestamp: OffsetDateTime::now_utc(),
                    };
                    let producer = BatchProducerBuilder::new(Arc::new(client))
                        .with_linger(Duration::from_millis(PARALLEL_LINGER_MS))
                        .build(RecordAggregator::new(PARALLEL_BATCH_SIZE));

                    let start = Instant::now();

                    let mut tasks: FuturesUnordered<_> = (0..iters)
                        .map(|_| async {
                            producer.produce(record.clone()).await.unwrap();
                        })
                        .collect();
                    while tasks.next().await.is_some() {}

                    start.elapsed()
                }
            });
        });
    }
}

/// Get the testing Kafka connection string or return from the current scope.
///
/// If `TEST_INTEGRATION` and `KAFKA_CONNECT` are set, return the Kafka connection URL to the
/// caller.
///
/// If `TEST_INTEGRATION` is set but `KAFKA_CONNECT` is not set, fail the tests and provide
/// guidance for setting `KAFKA_CONNECT`.
///
/// If `TEST_INTEGRATION` is not set, skip the calling test by returning early.
#[macro_export]
macro_rules! maybe_skip_kafka_integration {
    () => {{
        use std::env;
        dotenv::dotenv().ok();

        match (
            env::var("TEST_INTEGRATION").is_ok(),
            env::var("KAFKA_CONNECT").ok(),
        ) {
            (true, Some(kafka_connection)) => kafka_connection,
            (true, None) => {
                panic!(
                    "TEST_INTEGRATION is set which requires running integration tests, but \
                     KAFKA_CONNECT is not set. Please run Kafka or Redpanda then \
                     set KAFKA_CONNECT as directed in README.md."
                )
            }
            (false, Some(_)) => {
                eprintln!("skipping Kafka integration tests - set TEST_INTEGRATION to run");
                return;
            }
            (false, None) => {
                eprintln!(
                    "skipping Kafka integration tests - set TEST_INTEGRATION and KAFKA_CONNECT to \
                     run"
                );
                return;
            }
        }
    }};
}

fn benchmark_group<'a>(c: &'a mut Criterion, name: &str) -> BenchmarkGroup<'a, WallTime> {
    let mut group = c.benchmark_group(name);
    group.measurement_time(Duration::from_secs(60));
    group.sample_size(15);
    group.sampling_mode(SamplingMode::Linear);
    group
}

fn runtime() -> Runtime {
    tokio::runtime::Builder::new_multi_thread()
        .enable_io()
        .enable_time()
        .build()
        .unwrap()
}

/// Generate a random topic name for testing.
fn random_topic_name() -> String {
    format!("test_topic_{}", uuid::Uuid::new_v4())
}

async fn setup_rdkafka(connection: String, buffering: bool) -> (FutureProducer, String) {
    use rdkafka::{
        admin::{AdminClient, AdminOptions, NewTopic, TopicReplication},
        producer::FutureRecord,
        util::Timeout,
        ClientConfig,
    };

    let topic_name = random_topic_name();

    // configure clients
    let mut cfg = ClientConfig::new();
    cfg.set("bootstrap.servers", connection);
    cfg.set("message.timeout.ms", "5000");
    if buffering {
        cfg.set("batch.num.messages", PARALLEL_BATCH_SIZE.to_string()); // = loads
        cfg.set("batch.size", 1_000_000.to_string());
        cfg.set("queue.buffering.max.ms", PARALLEL_LINGER_MS.to_string());
    } else {
        cfg.set("batch.num.messages", "1");
        cfg.set("queue.buffering.max.ms", "0");
    }

    // create topic
    let admin_client: AdminClient<_> = cfg.create().unwrap();
    let topic = NewTopic::new(&topic_name, 1, TopicReplication::Fixed(1));
    let opts = AdminOptions::default();
    let mut results = admin_client.create_topics([&topic], &opts).await.unwrap();
    assert_eq!(results.len(), 1, "created exactly one topic");
    let result = results.pop().expect("just checked the vector length");
    result.unwrap();

    let producer_client: FutureProducer = cfg.create().unwrap();

    // warm up connection
    let key = vec![b'k'; 1];
    let payload = vec![b'x'; 10];
    let f_record = FutureRecord::to(&topic_name).key(&key).payload(&payload);
    producer_client
        .send(f_record, Timeout::Never)
        .await
        .unwrap();

    (producer_client, topic_name)
}

async fn setup_rskafka(connection: String) -> PartitionClient {
    let topic_name = random_topic_name();

    let client = ClientBuilder::new(vec![connection]).build().await.unwrap();
    client
        .controller_client()
        .await
        .unwrap()
        .create_topic(topic_name.clone(), 1, 1, 5_000)
        .await
        .unwrap();

    client.partition_client(topic_name, 0).await.unwrap()
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
```
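All four benchmark bodies above use criterion's `iter_custom`, which passes the closure an iteration count and expects the measured `Duration` back; that keeps per-run setup (client construction, topic creation, connection warm-up) out of the measurement. A self-contained sketch of the same pattern with a synchronous body, so it runs without a Kafka cluster:

```rust
use std::time::Instant;

use criterion::{black_box, criterion_group, criterion_main, Criterion};

fn bench_pattern(c: &mut Criterion) {
    c.bench_function("iter_custom_pattern", |b| {
        b.iter_custom(|iters| {
            // Setup runs before the clock starts, so it is excluded from the measurement.
            let data = vec![0u8; 1024];

            let start = Instant::now();
            for _ in 0..iters {
                // black_box keeps the optimizer from deleting the measured work.
                black_box(crc32c::crc32c(&data));
            }
            start.elapsed()
        })
    });
}

criterion_group!(benches, bench_pattern);
criterion_main!(benches);
```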
