Commit 9fb7581

feat: support LZ4 (de)compression
1 parent 9d1ddd9 commit 9fb7581

6 files changed: +123 -4 lines changed

Cargo.toml

Lines changed: 2 additions & 0 deletions
@@ -25,6 +25,7 @@ crc32c = "0.6"
 flate2 = { version = "1", optional = true }
 futures = "0.3"
 integer-encoding = "3"
+lz4 = { version = "1.23", optional = true }
 parking_lot = "0.12"
 pin-project-lite = "0.2"
 rand = "0.8"
@@ -53,6 +54,7 @@ uuid = { version = "0.8", features = ["v4"] }
 default = ["transport-tls"]
 
 compression-gzip = ["flate2"]
+compression-lz4 = ["lz4"]
 fuzzing = []
 transport-socks5 = ["async-socks5"]
 transport-tls = ["rustls", "tokio-rustls"]

README.md

Lines changed: 2 additions & 0 deletions
@@ -91,6 +91,7 @@ For more advanced production and consumption, see [`crate::client::producer`] an
 ## Features
 
 - **`compression-gzip`:** Support compression and decompression of messages using [gzip].
+- **`compression-lz4`:** Support compression and decompression of messages using [LZ4].
 - **`fuzzing`:** Exposes some internal data structures so that they can be used by our fuzzers. This is NOT a stable
   feature / API!
 - **`transport-socks5`:** Allow transport via SOCKS5 proxy.
@@ -252,6 +253,7 @@ e.g. by batching writes to multiple partitions in a single ProduceRequest
 [gzip]: https://en.wikipedia.org/wiki/Gzip
 [IOx]: https://github.com/influxdata/influxdb_iox/
 [LLDB]: https://lldb.llvm.org/
+[LZ4]: https://lz4.github.io/lz4/
 [perf]: https://perf.wiki.kernel.org/index.php/Main_Page
 [Redpanda]: https://vectorized.io/redpanda
 [rustls]: https://github.com/rustls/rustls

src/client/partition.rs

Lines changed: 4 additions & 0 deletions
@@ -30,6 +30,8 @@ pub enum Compression {
     NoCompression,
     #[cfg(feature = "compression-gzip")]
     Gzip,
+    #[cfg(feature = "compression-lz4")]
+    Lz4,
 }
 
 impl Default for Compression {
@@ -367,6 +369,8 @@ fn build_produce_request(
             Compression::NoCompression => RecordBatchCompression::NoCompression,
            #[cfg(feature = "compression-gzip")]
             Compression::Gzip => RecordBatchCompression::Gzip,
+            #[cfg(feature = "compression-lz4")]
+            Compression::Lz4 => RecordBatchCompression::Lz4,
         },
         timestamp_type: RecordBatchTimestampType::CreateTime,
         producer_id: -1,
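
With the new variant in place, a caller can opt into LZ4 when producing. A minimal sketch follows (not part of this commit), assuming the crate name `rskafka` implied by the test helpers and a `PartitionClient::produce(records, compression)` method as described in the crate README; exact signatures may differ between versions.

use rskafka::{
    client::partition::{Compression, PartitionClient},
    record::Record,
};

// Sketch only: produce an already-built batch of records with LZ4 compression.
// The crate must be built with the `compression-lz4` feature for this variant to exist.
async fn produce_lz4(
    partition_client: &PartitionClient,
    records: Vec<Record>,
) -> Result<(), Box<dyn std::error::Error>> {
    // Compression::Lz4 is mapped to RecordBatchCompression::Lz4 in build_produce_request
    partition_client.produce(records, Compression::Lz4).await?;
    Ok(())
}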

src/protocol/record.rs

Lines changed: 87 additions & 4 deletions
@@ -573,6 +573,27 @@ where
                 use flate2::read::GzDecoder;
                 Self::read_records(&mut GzDecoder::new(reader), is_control, n_records)?
             }
+            #[cfg(feature = "compression-lz4")]
+            RecordBatchCompression::Lz4 => {
+                use lz4::Decoder;
+                let mut decoder = Decoder::new(reader)?;
+                let records = Self::read_records(&mut decoder, is_control, n_records)?;
+
+                // the lz4 decoder requires us to consume the whole inner stream until we reach EOF
+                let mut buf = [0u8; 1];
+                match decoder.read(&mut buf) {
+                    Ok(_) => {}
+                    Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {}
+                    Err(e) => {
+                        return Err(ReadError::IO(e));
+                    }
+                }
+
+                let (_reader, res) = decoder.finish();
+                res?;
+
+                records
+            }
             #[allow(unreachable_patterns)]
             _ => {
                 return Err(ReadError::Malformed(
@@ -709,10 +730,22 @@ where
             #[cfg(feature = "compression-gzip")]
             RecordBatchCompression::Gzip => {
                 use flate2::{write::GzEncoder, Compression};
-                Self::write_records(
-                    &mut GzEncoder::new(writer, Compression::default()),
-                    self.records,
-                )?;
+                let mut encoder = GzEncoder::new(writer, Compression::default());
+                Self::write_records(&mut encoder, self.records)?;
+                encoder.finish()?;
+            }
+            #[cfg(feature = "compression-lz4")]
+            RecordBatchCompression::Lz4 => {
+                use lz4::{liblz4::BlockMode, EncoderBuilder};
+                let mut encoder = EncoderBuilder::new()
+                    .block_mode(
+                        // the only one supported by Kafka
+                        BlockMode::Independent,
+                    )
+                    .build(writer)?;
+                Self::write_records(&mut encoder, self.records)?;
+                let (_writer, res) = encoder.finish();
+                res?;
             }
             #[allow(unreachable_patterns)]
             _ => {
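
Both new match arms lean on the `lz4` crate's streaming API: the encoder is built with independent blocks (per the comment above, the only block mode Kafka supports), and the decoder wants its inner stream consumed to EOF before `finish()`. Below is a standalone roundtrip sketch, not part of this commit, exercising the same calls against in-memory buffers.

use std::io::{Cursor, Read, Write};

use lz4::{liblz4::BlockMode, Decoder, EncoderBuilder};

fn lz4_roundtrip(payload: &[u8]) -> std::io::Result<Vec<u8>> {
    // compress into a Vec<u8>, using independent blocks as the record batch writer does
    let mut encoder = EncoderBuilder::new()
        .block_mode(BlockMode::Independent)
        .build(Vec::new())?;
    encoder.write_all(payload)?;
    let (compressed, res) = encoder.finish();
    res?;

    // decompress; read_to_end drains the stream to EOF so that finish()
    // can confirm the frame was terminated properly
    let mut decoder = Decoder::new(Cursor::new(compressed))?;
    let mut out = Vec::new();
    decoder.read_to_end(&mut out)?;
    let (_cursor, res) = decoder.finish();
    res?;

    Ok(out)
}
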
@@ -869,4 +902,54 @@ mod tests {
         let actual2 = RecordBatch::read(&mut Cursor::new(data2)).unwrap();
         assert_eq!(actual2, expected);
     }
+
+    #[cfg(feature = "compression-lz4")]
+    #[test]
+    fn test_decode_fixture_lz4() {
+        // This data was obtained by watching rdkafka.
+        let data = [
+            b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x63\x00\x00\x00\x00".to_vec(),
+            b"\x02\x1b\xa5\x92\x35\x00\x03\x00\x00\x00\x00\x00\x00\x01\x7e\xb1".to_vec(),
+            b"\x1f\xc7\x24\x00\x00\x01\x7e\xb1\x1f\xc7\x24\xff\xff\xff\xff\xff".to_vec(),
+            b"\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x00\x00\x01\x04\x22\x4d".to_vec(),
+            b"\x18\x60\x40\x82\x23\x00\x00\x00\x8f\xfc\x01\x00\x00\x00\xc8\x01".to_vec(),
+            b"\x78\x01\x00\x50\xf0\x06\x16\x68\x65\x6c\x6c\x6f\x20\x6b\x61\x66".to_vec(),
+            b"\x6b\x61\x02\x06\x66\x6f\x6f\x06\x62\x61\x72\x00\x00\x00\x00".to_vec(),
+        ]
+        .concat();
+
+        let actual = RecordBatch::read(&mut Cursor::new(data)).unwrap();
+        let expected = RecordBatch {
+            base_offset: 0,
+            partition_leader_epoch: 0,
+            last_offset_delta: 0,
+            first_timestamp: 1643649156900,
+            max_timestamp: 1643649156900,
+            producer_id: -1,
+            producer_epoch: -1,
+            base_sequence: -1,
+            records: ControlBatchOrRecords::Records(vec![Record {
+                timestamp_delta: 0,
+                offset_delta: 0,
+                key: vec![b'x'; 100],
+                value: b"hello kafka".to_vec(),
+                headers: vec![RecordHeader {
+                    key: "foo".to_owned(),
+                    value: b"bar".to_vec(),
+                }],
+            }]),
+            compression: RecordBatchCompression::Lz4,
+            is_transactional: false,
+            timestamp_type: RecordBatchTimestampType::CreateTime,
+        };
+        assert_eq!(actual, expected);
+
+        let mut data2 = vec![];
+        actual.write(&mut data2).unwrap();
+
+        // don't compare if the data is equal because compression encoder might work slightly differently, use another
+        // roundtrip instead
+        let actual2 = RecordBatch::read(&mut Cursor::new(data2)).unwrap();
+        assert_eq!(actual2, expected);
+    }
 }

tests/produce_consume.rs

Lines changed: 24 additions & 0 deletions
@@ -57,6 +57,30 @@ async fn test_produce_rskafka_consume_rskafka_gzip() {
     assert_produce_consume(produce_rskafka, consume_rskafka, Compression::Gzip).await;
 }
 
+#[cfg(feature = "compression-lz4")]
+#[tokio::test]
+async fn test_produce_rdkafka_consume_rdkafka_lz4() {
+    assert_produce_consume(produce_rdkafka, consume_rdkafka, Compression::Lz4).await;
+}
+
+#[cfg(feature = "compression-lz4")]
+#[tokio::test]
+async fn test_produce_rskafka_consume_rdkafka_lz4() {
+    assert_produce_consume(produce_rskafka, consume_rdkafka, Compression::Lz4).await;
+}
+
+#[cfg(feature = "compression-lz4")]
+#[tokio::test]
+async fn test_produce_rdkafka_consume_rskafka_lz4() {
+    assert_produce_consume(produce_rdkafka, consume_rskafka, Compression::Lz4).await;
+}
+
+#[cfg(feature = "compression-lz4")]
+#[tokio::test]
+async fn test_produce_rskafka_consume_rskafka_lz4() {
+    assert_produce_consume(produce_rskafka, consume_rskafka, Compression::Lz4).await;
+}
+
 async fn assert_produce_consume<F1, G1, F2, G2>(
     f_produce: F1,
     f_consume: F2,

tests/rdkafka_helper.rs

Lines changed: 4 additions & 0 deletions
@@ -29,6 +29,10 @@ pub async fn produce(
         Compression::Gzip => {
             cfg.set("compression.codec", "gzip");
         }
+        #[cfg(feature = "compression-lz4")]
+        Compression::Lz4 => {
+            cfg.set("compression.codec", "lz4");
+        }
     }
     let client: FutureProducer<_> = cfg.create().unwrap();
