Skip to content

Commit 4151626

Browse files
committed
feat: support gzip compression
1 parent a01889a commit 4151626

File tree

9 files changed

+465
-220
lines changed

9 files changed

+465
-220
lines changed

Cargo.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ async-socks5 = { version = "0.5", optional = true }
1111
async-trait = "0.1"
1212
bytes = "1.1"
1313
crc = "2"
14+
flate2 = { version = "1", optional = true }
1415
futures = "0.3"
1516
integer-encoding = "3"
1617
parking_lot = "0.11"
@@ -38,9 +39,11 @@ uuid = { version = "0.8", features = ["v4"] }
3839

3940
[features]
4041
default = ["transport-tls"]
42+
43+
compression-gzip = ["flate2"]
4144
fuzzing = []
42-
transport-tls = ["rustls", "tokio-rustls"]
4345
transport-socks5 = ["async-socks5"]
46+
transport-tls = ["rustls", "tokio-rustls"]
4447

4548
[workspace]
4649
members = [

README.md

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,10 @@ It will be a good fit for workloads that:
2525
```rust,no_run
2626
# async fn test() {
2727
use rskafka::{
28-
client::ClientBuilder,
28+
client::{
29+
ClientBuilder,
30+
partition::Compression,
31+
},
2932
record::Record,
3033
};
3134
use time::OffsetDateTime;
@@ -63,7 +66,7 @@ let record = Record {
6366
]),
6467
timestamp: OffsetDateTime::now_utc(),
6568
};
66-
partition_client.produce(vec![record]).await.unwrap();
69+
partition_client.produce(vec![record], Compression::default()).await.unwrap();
6770
6871
// consume data
6972
let (records, high_watermark) = partition_client
@@ -82,10 +85,11 @@ For more advanced production and consumption, see [`crate::client::producer`] an
8285

8386
## Features
8487

88+
- **`compression-gzip`:** Support compression and decompression of messages using [gzip].
8589
- **`fuzzing`:** Exposes some internal data structures so that they can be used by our fuzzers. This is NOT a stable
8690
feature / API!
87-
- **`transport-tls` (default):** Allows TLS transport via [rustls].
8891
- **`transport-socks5`:** Allow transport via SOCKS5 proxy.
92+
- **`transport-tls` (default):** Allows TLS transport via [rustls].
8993

9094
## Testing
9195

@@ -215,6 +219,7 @@ e.g. by batching writes to multiple partitions in a single ProduceRequest
215219

216220
[Apache Kafka]: https://kafka.apache.org/
217221
[cargo-fuzz]: https://github.com/rust-fuzz/cargo-fuzz
222+
[gzip]: https://en.wikipedia.org/wiki/Gzip
218223
[IOx]: https://github.com/influxdata/influxdb_iox/
219224
[LLDB]: https://lldb.llvm.org/
220225
[Redpanda]: https://vectorized.io/redpanda

src/client/partition.rs

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,19 @@ use time::OffsetDateTime;
2323
use tokio::sync::Mutex;
2424
use tracing::{error, info};
2525

26+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
27+
pub enum Compression {
28+
NoCompression,
29+
#[cfg(feature = "compression-gzip")]
30+
Gzip,
31+
}
32+
33+
impl Default for Compression {
34+
fn default() -> Self {
35+
Self::NoCompression
36+
}
37+
}
38+
2639
/// Many operations must be performed on the leader for a partition
2740
///
2841
/// Additionally a partition is the unit of concurrency within Kafka
@@ -62,7 +75,11 @@ impl PartitionClient {
6275
}
6376

6477
/// Produce a batch of records to the partition
65-
pub async fn produce(&self, records: Vec<Record>) -> Result<Vec<i64>> {
78+
pub async fn produce(
79+
&self,
80+
records: Vec<Record>,
81+
compression: Compression,
82+
) -> Result<Vec<i64>> {
6683
// skip request entirely if `records` is empty
6784
if records.is_empty() {
6885
return Ok(vec![]);
@@ -105,7 +122,11 @@ impl PartitionClient {
105122
last_offset_delta: n as i32 - 1,
106123
is_transactional: false,
107124
base_sequence: -1,
108-
compression: RecordBatchCompression::NoCompression,
125+
compression: match compression {
126+
Compression::NoCompression => RecordBatchCompression::NoCompression,
127+
#[cfg(feature = "compression-gzip")]
128+
Compression::Gzip => RecordBatchCompression::Gzip,
129+
},
109130
timestamp_type: RecordBatchTimestampType::CreateTime,
110131
producer_id: -1,
111132
producer_epoch: -1,

src/client/producer.rs

Lines changed: 41 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,8 @@ use crate::client::producer::aggregator::TryPush;
207207
use crate::client::{error::Error as ClientError, partition::PartitionClient};
208208
use crate::record::Record;
209209

210+
use super::partition::Compression;
211+
210212
pub mod aggregator;
211213
mod broadcast;
212214

@@ -230,6 +232,8 @@ pub struct BatchProducerBuilder {
230232
client: Arc<dyn ProducerClient>,
231233

232234
linger: Duration,
235+
236+
compression: Compression,
233237
}
234238

235239
impl BatchProducerBuilder {
@@ -243,6 +247,7 @@ impl BatchProducerBuilder {
243247
Self {
244248
client,
245249
linger: Duration::from_millis(5),
250+
compression: Compression::default(),
246251
}
247252
}
248253

@@ -251,12 +256,21 @@ impl BatchProducerBuilder {
251256
Self { linger, ..self }
252257
}
253258

259+
/// Sets compression.
260+
pub fn with_compression(self, compression: Compression) -> Self {
261+
Self {
262+
compression,
263+
..self
264+
}
265+
}
266+
254267
pub fn build<A>(self, aggregator: A) -> BatchProducer<A>
255268
where
256269
A: aggregator::Aggregator,
257270
{
258271
BatchProducer {
259272
linger: self.linger,
273+
compression: self.compression,
260274
client: self.client,
261275
inner: Mutex::new(ProducerInner {
262276
aggregator,
@@ -268,12 +282,20 @@ impl BatchProducerBuilder {
268282

269283
// A trait wrapper to allow mocking
270284
trait ProducerClient: std::fmt::Debug + Send + Sync {
271-
fn produce(&self, records: Vec<Record>) -> BoxFuture<'_, Result<Vec<i64>, ClientError>>;
285+
fn produce(
286+
&self,
287+
records: Vec<Record>,
288+
compression: Compression,
289+
) -> BoxFuture<'_, Result<Vec<i64>, ClientError>>;
272290
}
273291

274292
impl ProducerClient for PartitionClient {
275-
fn produce(&self, records: Vec<Record>) -> BoxFuture<'_, Result<Vec<i64>, ClientError>> {
276-
Box::pin(self.produce(records))
293+
fn produce(
294+
&self,
295+
records: Vec<Record>,
296+
compression: Compression,
297+
) -> BoxFuture<'_, Result<Vec<i64>, ClientError>> {
298+
Box::pin(self.produce(records, compression))
277299
}
278300
}
279301

@@ -293,6 +315,8 @@ where
293315
{
294316
linger: Duration,
295317

318+
compression: Compression,
319+
296320
client: Arc<dyn ProducerClient>,
297321

298322
inner: Mutex<ProducerInner<A>>,
@@ -370,7 +394,7 @@ where
370394
TryPush::NoCapacity(data) => {
371395
debug!("Insufficient capacity in aggregator - flushing");
372396

373-
Self::flush_impl(&mut inner, self.client.as_ref()).await;
397+
Self::flush_impl(&mut inner, self.client.as_ref(), self.compression).await;
374398
match inner
375399
.aggregator
376400
.try_push(data)
@@ -413,7 +437,7 @@ where
413437
debug!("Linger expired - flushing");
414438

415439
// Flush data
416-
Self::flush_impl(&mut inner, self.client.as_ref()).await;
440+
Self::flush_impl(&mut inner, self.client.as_ref(), self.compression).await;
417441

418442
extract(&result_slot.now_or_never().expect("just flushed"), tag)
419443
}
@@ -422,12 +446,16 @@ where
422446
pub async fn flush(&self) {
423447
let mut inner = self.inner.lock().await;
424448
debug!("Manual flush");
425-
Self::flush_impl(&mut inner, self.client.as_ref()).await;
449+
Self::flush_impl(&mut inner, self.client.as_ref(), self.compression).await;
426450
}
427451

428452
/// Flushes out the data from the aggregator, publishes the result to the result slot,
429453
/// and creates a fresh result slot for future writes to use
430-
async fn flush_impl(inner: &mut ProducerInner<A>, client: &dyn ProducerClient) {
454+
async fn flush_impl(
455+
inner: &mut ProducerInner<A>,
456+
client: &dyn ProducerClient,
457+
compression: Compression,
458+
) {
431459
trace!("Flushing batch producer");
432460

433461
let (output, status_deagg) = match inner.aggregator.flush() {
@@ -444,7 +472,7 @@ where
444472
return;
445473
}
446474

447-
let r = client.produce(output).await;
475+
let r = client.produce(output, compression).await;
448476

449477
// Reset result slot
450478
let slot = std::mem::take(&mut inner.result_slot);
@@ -483,7 +511,11 @@ mod tests {
483511
}
484512

485513
impl ProducerClient for MockClient {
486-
fn produce(&self, records: Vec<Record>) -> BoxFuture<'_, Result<Vec<i64>, ClientError>> {
514+
fn produce(
515+
&self,
516+
records: Vec<Record>,
517+
_compression: Compression,
518+
) -> BoxFuture<'_, Result<Vec<i64>, ClientError>> {
487519
Box::pin(async move {
488520
tokio::time::sleep(self.delay).await;
489521

0 commit comments

Comments
 (0)