diff --git a/Cargo.toml b/Cargo.toml
index ae4017a5..dbf1a6f9 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -25,6 +25,7 @@ crc32c = "0.6"
 flate2 = { version = "1", optional = true }
 futures = "0.3"
 integer-encoding = "3"
+lz4 = { version = "1.23", optional = true }
 parking_lot = "0.12"
 pin-project-lite = "0.2"
 rand = "0.8"
@@ -53,6 +54,7 @@ uuid = { version = "0.8", features = ["v4"] }
 default = ["transport-tls"]
 
 compression-gzip = ["flate2"]
+compression-lz4 = ["lz4"]
 fuzzing = []
 transport-socks5 = ["async-socks5"]
 transport-tls = ["rustls", "tokio-rustls"]
diff --git a/README.md b/README.md
index 568459d6..37006297 100644
--- a/README.md
+++ b/README.md
@@ -91,6 +91,7 @@ For more advanced production and consumption, see [`crate::client::producer`] an
 ## Features
 
 - **`compression-gzip`:** Support compression and decompression of messages using [gzip].
+- **`compression-lz4`:** Support compression and decompression of messages using [LZ4].
 - **`fuzzing`:** Exposes some internal data structures so that they can be used by our fuzzers. This is NOT a stable feature / API!
 - **`transport-socks5`:** Allow transport via SOCKS5 proxy.
@@ -252,6 +253,7 @@ e.g. by batching writes to multiple partitions in a single ProduceRequest
 [gzip]: https://en.wikipedia.org/wiki/Gzip
 [IOx]: https://github.com/influxdata/influxdb_iox/
 [LLDB]: https://lldb.llvm.org/
+[LZ4]: https://lz4.github.io/lz4/
 [perf]: https://perf.wiki.kernel.org/index.php/Main_Page
 [Redpanda]: https://vectorized.io/redpanda
 [rustls]: https://github.com/rustls/rustls
diff --git a/src/client/partition.rs b/src/client/partition.rs
index 7dd2b773..316cff95 100644
--- a/src/client/partition.rs
+++ b/src/client/partition.rs
@@ -30,6 +30,8 @@ pub enum Compression {
     NoCompression,
     #[cfg(feature = "compression-gzip")]
     Gzip,
+    #[cfg(feature = "compression-lz4")]
+    Lz4,
 }
 
 impl Default for Compression {
@@ -367,6 +369,8 @@ fn build_produce_request(
             Compression::NoCompression => RecordBatchCompression::NoCompression,
             #[cfg(feature = "compression-gzip")]
             Compression::Gzip => RecordBatchCompression::Gzip,
+            #[cfg(feature = "compression-lz4")]
+            Compression::Lz4 => RecordBatchCompression::Lz4,
         },
         timestamp_type: RecordBatchTimestampType::CreateTime,
         producer_id: -1,
diff --git a/src/protocol/record.rs b/src/protocol/record.rs
index 16fb7641..357794cb 100644
--- a/src/protocol/record.rs
+++ b/src/protocol/record.rs
@@ -571,7 +571,26 @@ where
             #[cfg(feature = "compression-gzip")]
             RecordBatchCompression::Gzip => {
                 use flate2::read::GzDecoder;
-                Self::read_records(&mut GzDecoder::new(reader), is_control, n_records)?
+                let mut decoder = GzDecoder::new(reader);
+                let records = Self::read_records(&mut decoder, is_control, n_records)?;
+
+                ensure_eof(&mut decoder, "Data left in gzip block")?;
+
+                records
+            }
+            #[cfg(feature = "compression-lz4")]
+            RecordBatchCompression::Lz4 => {
+                use lz4::Decoder;
+                let mut decoder = Decoder::new(reader)?;
+                let records = Self::read_records(&mut decoder, is_control, n_records)?;
+
+                // the lz4 decoder requires us to consume the whole inner stream until we reach EOF
+                ensure_eof(&mut decoder, "Data left in LZ4 block")?;
+
+                let (_reader, res) = decoder.finish();
+                res?;
+
+                records
             }
             #[allow(unreachable_patterns)]
             _ => {
@@ -709,10 +728,22 @@ where
             #[cfg(feature = "compression-gzip")]
             RecordBatchCompression::Gzip => {
                 use flate2::{write::GzEncoder, Compression};
-                Self::write_records(
-                    &mut GzEncoder::new(writer, Compression::default()),
-                    self.records,
-                )?;
+                let mut encoder = GzEncoder::new(writer, Compression::default());
+                Self::write_records(&mut encoder, self.records)?;
+                encoder.finish()?;
+            }
+            #[cfg(feature = "compression-lz4")]
+            RecordBatchCompression::Lz4 => {
+                use lz4::{liblz4::BlockMode, EncoderBuilder};
+                let mut encoder = EncoderBuilder::new()
+                    .block_mode(
+                        // the only one supported by Kafka
+                        BlockMode::Independent,
+                    )
+                    .build(writer)?;
+                Self::write_records(&mut encoder, self.records)?;
+                let (_writer, res) = encoder.finish();
+                res?;
             }
             #[allow(unreachable_patterns)]
             _ => {
@@ -726,6 +757,21 @@ where
     }
 }
 
+/// Ensure that the given reader is at EOF.
+#[allow(dead_code)] // only used by some features
+fn ensure_eof<R>(reader: &mut R, msg: &str) -> Result<(), ReadError>
+where
+    R: Read,
+{
+    let mut buf = [0u8; 1];
+    match reader.read(&mut buf) {
+        Ok(0) => Ok(()),
+        Ok(_) => Err(ReadError::Malformed(msg.to_string().into())),
+        Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => Ok(()),
+        Err(e) => Err(ReadError::IO(e)),
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use std::io::Cursor;
@@ -869,4 +915,54 @@ mod tests {
         let actual2 = RecordBatch::read(&mut Cursor::new(data2)).unwrap();
         assert_eq!(actual2, expected);
     }
+
+    #[cfg(feature = "compression-lz4")]
+    #[test]
+    fn test_decode_fixture_lz4() {
+        // This data was obtained by watching rdkafka.
+        let data = [
+            b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x63\x00\x00\x00\x00".to_vec(),
+            b"\x02\x1b\xa5\x92\x35\x00\x03\x00\x00\x00\x00\x00\x00\x01\x7e\xb1".to_vec(),
+            b"\x1f\xc7\x24\x00\x00\x01\x7e\xb1\x1f\xc7\x24\xff\xff\xff\xff\xff".to_vec(),
+            b"\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x00\x00\x01\x04\x22\x4d".to_vec(),
+            b"\x18\x60\x40\x82\x23\x00\x00\x00\x8f\xfc\x01\x00\x00\x00\xc8\x01".to_vec(),
+            b"\x78\x01\x00\x50\xf0\x06\x16\x68\x65\x6c\x6c\x6f\x20\x6b\x61\x66".to_vec(),
+            b"\x6b\x61\x02\x06\x66\x6f\x6f\x06\x62\x61\x72\x00\x00\x00\x00".to_vec(),
+        ]
+        .concat();
+
+        let actual = RecordBatch::read(&mut Cursor::new(data)).unwrap();
+        let expected = RecordBatch {
+            base_offset: 0,
+            partition_leader_epoch: 0,
+            last_offset_delta: 0,
+            first_timestamp: 1643649156900,
+            max_timestamp: 1643649156900,
+            producer_id: -1,
+            producer_epoch: -1,
+            base_sequence: -1,
+            records: ControlBatchOrRecords::Records(vec![Record {
+                timestamp_delta: 0,
+                offset_delta: 0,
+                key: vec![b'x'; 100],
+                value: b"hello kafka".to_vec(),
+                headers: vec![RecordHeader {
+                    key: "foo".to_owned(),
+                    value: b"bar".to_vec(),
+                }],
+            }]),
+            compression: RecordBatchCompression::Lz4,
+            is_transactional: false,
+            timestamp_type: RecordBatchTimestampType::CreateTime,
+        };
+        assert_eq!(actual, expected);
+
+        let mut data2 = vec![];
+        actual.write(&mut data2).unwrap();
+
+        // don't compare if the data is equal because compression encoder might work slightly differently, use another
+        // roundtrip instead
+        let actual2 = RecordBatch::read(&mut Cursor::new(data2)).unwrap();
+        assert_eq!(actual2, expected);
+    }
 }
diff --git a/tests/produce_consume.rs b/tests/produce_consume.rs
index c77463ef..19216b7c 100644
--- a/tests/produce_consume.rs
+++ b/tests/produce_consume.rs
@@ -57,6 +57,30 @@ async fn test_produce_rskafka_consume_rskafka_gzip() {
     assert_produce_consume(produce_rskafka, consume_rskafka, Compression::Gzip).await;
 }
 
+#[cfg(feature = "compression-lz4")]
+#[tokio::test]
+async fn test_produce_rdkafka_consume_rdkafka_lz4() {
+    assert_produce_consume(produce_rdkafka, consume_rdkafka, Compression::Lz4).await;
+}
+
+#[cfg(feature = "compression-lz4")]
+#[tokio::test]
+async fn test_produce_rskafka_consume_rdkafka_lz4() {
+    assert_produce_consume(produce_rskafka, consume_rdkafka, Compression::Lz4).await;
+}
+
+#[cfg(feature = "compression-lz4")]
+#[tokio::test]
+async fn test_produce_rdkafka_consume_rskafka_lz4() {
+    assert_produce_consume(produce_rdkafka, consume_rskafka, Compression::Lz4).await;
+}
+
+#[cfg(feature = "compression-lz4")]
+#[tokio::test]
+async fn test_produce_rskafka_consume_rskafka_lz4() {
+    assert_produce_consume(produce_rskafka, consume_rskafka, Compression::Lz4).await;
+}
+
 async fn assert_produce_consume(
     f_produce: F1,
     f_consume: F2,
diff --git a/tests/rdkafka_helper.rs b/tests/rdkafka_helper.rs
index 9e2f6e71..a7adce0e 100644
--- a/tests/rdkafka_helper.rs
+++ b/tests/rdkafka_helper.rs
@@ -29,6 +29,10 @@ pub async fn produce(
         Compression::Gzip => {
             cfg.set("compression.codec", "gzip");
         }
+        #[cfg(feature = "compression-lz4")]
+        Compression::Lz4 => {
+            cfg.set("compression.codec", "lz4");
+        }
     }
 
     let client: FutureProducer<_> = cfg.create().unwrap();
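
For context, here is a standalone round-trip sketch of the LZ4 framing the new read/write paths rely on. It is not part of the change itself; it only uses the `lz4` crate calls that already appear in the diff (`EncoderBuilder`, `BlockMode::Independent`, `Decoder`, `finish()`), and the payload bytes are purely illustrative:

```rust
use std::io::{Read, Write};

use lz4::{liblz4::BlockMode, Decoder, EncoderBuilder};

fn main() -> std::io::Result<()> {
    // Compress into an in-memory buffer. Kafka only supports independently
    // compressed LZ4 blocks, hence BlockMode::Independent (as in the diff).
    let mut encoder = EncoderBuilder::new()
        .block_mode(BlockMode::Independent)
        .build(Vec::new())?;
    encoder.write_all(b"hello kafka")?;
    // finish() hands back the inner writer together with the flush result.
    let (compressed, res) = encoder.finish();
    res?;

    // Decompress: drive the decoder all the way to EOF before calling
    // finish(), mirroring the ensure_eof() check added to the record reader.
    let mut decoder = Decoder::new(&compressed[..])?;
    let mut decompressed = Vec::new();
    decoder.read_to_end(&mut decompressed)?;
    let (_reader, res) = decoder.finish();
    res?;

    assert_eq!(decompressed, b"hello kafka".to_vec());
    Ok(())
}
```

With the `compression-lz4` feature enabled, callers can pass `Compression::Lz4` wherever they previously passed `Compression::Gzip` or `Compression::NoCompression`, which is exactly what the new integration tests above exercise.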