2 changes: 2 additions & 0 deletions Cargo.toml
@@ -25,6 +25,7 @@ crc32c = "0.6"
flate2 = { version = "1", optional = true }
futures = "0.3"
integer-encoding = "3"
lz4 = { version = "1.23", optional = true }
parking_lot = "0.12"
pin-project-lite = "0.2"
rand = "0.8"
@@ -53,6 +54,7 @@ uuid = { version = "0.8", features = ["v4"] }
default = ["transport-tls"]

compression-gzip = ["flate2"]
compression-lz4 = ["lz4"]
fuzzing = []
transport-socks5 = ["async-socks5"]
transport-tls = ["rustls", "tokio-rustls"]
2 changes: 2 additions & 0 deletions README.md
@@ -91,6 +91,7 @@ For more advanced production and consumption, see [`crate::client::producer`] an
## Features

- **`compression-gzip`:** Support compression and decompression of messages using [gzip].
- **`compression-lz4`:** Support compression and decompression of messages using [LZ4].
- **`fuzzing`:** Exposes some internal data structures so that they can be used by our fuzzers. This is NOT a stable
feature / API!
- **`transport-socks5`:** Allow transport via SOCKS5 proxy.
@@ -252,6 +253,7 @@ e.g. by batching writes to multiple partitions in a single ProduceRequest
[gzip]: https://en.wikipedia.org/wiki/Gzip
[IOx]: https://github.com/influxdata/influxdb_iox/
[LLDB]: https://lldb.llvm.org/
[LZ4]: https://lz4.github.io/lz4/
[perf]: https://perf.wiki.kernel.org/index.php/Main_Page
[Redpanda]: https://vectorized.io/redpanda
[rustls]: https://github.com/rustls/rustls
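
To see the new option from a user's perspective, here is a minimal, hedged sketch of producing with LZ4. The `ClientBuilder` and `partition_client` calls follow the quick-start earlier in this README and are assumptions here, not part of this PR; the dependent crate must also enable the feature (e.g. `features = ["compression-lz4"]` on its `rskafka` dependency).

```rust
use rskafka::client::{partition::Compression, ClientBuilder};

async fn produce_with_lz4() -> Result<(), Box<dyn std::error::Error>> {
    // Placeholder broker address and topic.
    let client = ClientBuilder::new(vec!["localhost:9093".to_owned()])
        .build()
        .await?;
    let partition_client = client.partition_client("my_topic".to_owned(), 0).await?;

    // Record construction elided; see the quick-start for the fields.
    let records = vec![/* rskafka::record::Record { .. } */];

    // The variant added by this PR.
    partition_client.produce(records, Compression::Lz4).await?;
    Ok(())
}
```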
4 changes: 4 additions & 0 deletions src/client/partition.rs
@@ -30,6 +30,8 @@ pub enum Compression {
NoCompression,
#[cfg(feature = "compression-gzip")]
Gzip,
#[cfg(feature = "compression-lz4")]
Lz4,
}

impl Default for Compression {
@@ -367,6 +369,8 @@ fn build_produce_request(
Compression::NoCompression => RecordBatchCompression::NoCompression,
#[cfg(feature = "compression-gzip")]
Compression::Gzip => RecordBatchCompression::Gzip,
#[cfg(feature = "compression-lz4")]
Compression::Lz4 => RecordBatchCompression::Lz4,
},
timestamp_type: RecordBatchTimestampType::CreateTime,
producer_id: -1,
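
Background on where this enum value ends up (a Kafka record batch format v2 protocol fact, not code from this PR): the codec is encoded in the low three bits of the batch's `attributes` field. A minimal sketch:

```rust
/// Codec values defined by the Kafka record batch v2 spec:
/// 0 = none, 1 = gzip, 2 = snappy, 3 = lz4, 4 = zstd.
fn codec_bits(attributes: i16) -> i16 {
    attributes & 0b111
}

fn main() {
    // An LZ4-compressed batch therefore carries the value 3 in its attributes.
    assert_eq!(codec_bits(0b0000_0011), 3);
}
```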
106 changes: 101 additions & 5 deletions src/protocol/record.rs
@@ -571,7 +571,26 @@ where
#[cfg(feature = "compression-gzip")]
RecordBatchCompression::Gzip => {
use flate2::read::GzDecoder;
Self::read_records(&mut GzDecoder::new(reader), is_control, n_records)?
let mut decoder = GzDecoder::new(reader);
let records = Self::read_records(&mut decoder, is_control, n_records)?;

ensure_eof(&mut decoder, "Data left in gzip block")?;

records
}
#[cfg(feature = "compression-lz4")]
RecordBatchCompression::Lz4 => {
use lz4::Decoder;
let mut decoder = Decoder::new(reader)?;
let records = Self::read_records(&mut decoder, is_control, n_records)?;

// the lz4 decoder requires us to consume the whole inner stream until we reach EOF
ensure_eof(&mut decoder, "Data left in LZ4 block")?;

let (_reader, res) = decoder.finish();
res?;

records
}
#[allow(unreachable_patterns)]
_ => {
@@ -709,10 +728,22 @@
#[cfg(feature = "compression-gzip")]
RecordBatchCompression::Gzip => {
use flate2::{write::GzEncoder, Compression};
Self::write_records(
&mut GzEncoder::new(writer, Compression::default()),
self.records,
)?;
let mut encoder = GzEncoder::new(writer, Compression::default());
Self::write_records(&mut encoder, self.records)?;
encoder.finish()?;
}
#[cfg(feature = "compression-lz4")]
RecordBatchCompression::Lz4 => {
use lz4::{liblz4::BlockMode, EncoderBuilder};
let mut encoder = EncoderBuilder::new()
.block_mode(
// the only one supported by Kafka
BlockMode::Independent,
)
.build(writer)?;
Self::write_records(&mut encoder, self.records)?;
let (_writer, res) = encoder.finish();
res?;
}
#[allow(unreachable_patterns)]
_ => {
@@ -726,6 +757,21 @@
}
}

/// Ensure that the given reader is at EOF.
#[allow(dead_code)] // only used by some features
fn ensure_eof<R>(reader: &mut R, msg: &str) -> Result<(), ReadError>
where
R: Read,
{
let mut buf = [0u8; 1];
match reader.read(&mut buf) {
Ok(0) => Ok(()),
Ok(_) => Err(ReadError::Malformed(msg.to_string().into())),
Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => Ok(()),
Err(e) => Err(ReadError::IO(e)),
}
}
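
// A small illustrative test (not part of the original change) pinning down the
// contract of `ensure_eof`: an exhausted reader passes, while a reader with
// bytes left is reported as malformed.
#[test]
fn test_ensure_eof_contract() {
use std::io::Cursor;

let mut empty = Cursor::new(Vec::<u8>::new());
assert!(ensure_eof(&mut empty, "unreachable").is_ok());

let mut trailing = Cursor::new(vec![0u8]);
assert!(ensure_eof(&mut trailing, "data left").is_err());
}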

#[cfg(test)]
mod tests {
use std::io::Cursor;
@@ -869,4 +915,54 @@ mod tests {
let actual2 = RecordBatch::read(&mut Cursor::new(data2)).unwrap();
assert_eq!(actual2, expected);
}

#[cfg(feature = "compression-lz4")]
#[test]
fn test_decode_fixture_lz4() {
// This data was obtained by watching rdkafka.
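// The compressed payload is a standard LZ4 frame: the byte sequence
// 0x04 0x22 0x4d 0x18 below is the little-endian frame magic number
// 0x184D2204.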
let data = [
b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x63\x00\x00\x00\x00".to_vec(),
b"\x02\x1b\xa5\x92\x35\x00\x03\x00\x00\x00\x00\x00\x00\x01\x7e\xb1".to_vec(),
b"\x1f\xc7\x24\x00\x00\x01\x7e\xb1\x1f\xc7\x24\xff\xff\xff\xff\xff".to_vec(),
b"\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x00\x00\x01\x04\x22\x4d".to_vec(),
b"\x18\x60\x40\x82\x23\x00\x00\x00\x8f\xfc\x01\x00\x00\x00\xc8\x01".to_vec(),
b"\x78\x01\x00\x50\xf0\x06\x16\x68\x65\x6c\x6c\x6f\x20\x6b\x61\x66".to_vec(),
b"\x6b\x61\x02\x06\x66\x6f\x6f\x06\x62\x61\x72\x00\x00\x00\x00".to_vec(),
]
.concat();

let actual = RecordBatch::read(&mut Cursor::new(data)).unwrap();
let expected = RecordBatch {
base_offset: 0,
partition_leader_epoch: 0,
last_offset_delta: 0,
first_timestamp: 1643649156900,
max_timestamp: 1643649156900,
producer_id: -1,
producer_epoch: -1,
base_sequence: -1,
records: ControlBatchOrRecords::Records(vec![Record {
timestamp_delta: 0,
offset_delta: 0,
key: vec![b'x'; 100],
value: b"hello kafka".to_vec(),
headers: vec![RecordHeader {
key: "foo".to_owned(),
value: b"bar".to_vec(),
}],
}]),
compression: RecordBatchCompression::Lz4,
is_transactional: false,
timestamp_type: RecordBatchTimestampType::CreateTime,
};
assert_eq!(actual, expected);

let mut data2 = vec![];
actual.write(&mut data2).unwrap();

// don't compare the encoded bytes directly, because the compression encoder
// might produce slightly different output; use another roundtrip instead
let actual2 = RecordBatch::read(&mut Cursor::new(data2)).unwrap();
assert_eq!(actual2, expected);
}
}
24 changes: 24 additions & 0 deletions tests/produce_consume.rs
@@ -57,6 +57,30 @@ async fn test_produce_rskafka_consume_rskafka_gzip() {
assert_produce_consume(produce_rskafka, consume_rskafka, Compression::Gzip).await;
}

#[cfg(feature = "compression-lz4")]
#[tokio::test]
async fn test_produce_rdkafka_consume_rdkafka_lz4() {
assert_produce_consume(produce_rdkafka, consume_rdkafka, Compression::Lz4).await;
}

#[cfg(feature = "compression-lz4")]
#[tokio::test]
async fn test_produce_rskafka_consume_rdkafka_lz4() {
assert_produce_consume(produce_rskafka, consume_rdkafka, Compression::Lz4).await;
}

#[cfg(feature = "compression-lz4")]
#[tokio::test]
async fn test_produce_rdkafka_consume_rskafka_lz4() {
assert_produce_consume(produce_rdkafka, consume_rskafka, Compression::Lz4).await;
}

#[cfg(feature = "compression-lz4")]
#[tokio::test]
async fn test_produce_rskafka_consume_rskafka_lz4() {
assert_produce_consume(produce_rskafka, consume_rskafka, Compression::Lz4).await;
}

async fn assert_produce_consume<F1, G1, F2, G2>(
f_produce: F1,
f_consume: F2,
4 changes: 4 additions & 0 deletions tests/rdkafka_helper.rs
@@ -29,6 +29,10 @@ pub async fn produce(
Compression::Gzip => {
cfg.set("compression.codec", "gzip");
}
#[cfg(feature = "compression-lz4")]
Compression::Lz4 => {
cfg.set("compression.codec", "lz4");
}
}
let client: FutureProducer<_> = cfg.create().unwrap();

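
For reference, a minimal sketch of the producer configuration this helper builds when LZ4 is selected; it assumes rdkafka's `ClientConfig`/`FutureProducer` API as used above, and the broker address is a placeholder:

```rust
use rdkafka::config::ClientConfig;
use rdkafka::producer::FutureProducer;

fn lz4_producer() -> FutureProducer {
    let mut cfg = ClientConfig::new();
    // Placeholder broker; the tests take this from their environment.
    cfg.set("bootstrap.servers", "localhost:9093");
    // librdkafka's equivalent of `Compression::Lz4` on the rskafka side.
    cfg.set("compression.codec", "lz4");
    cfg.create().expect("producer creation failed")
}
```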