Skip to content

Commit ecda725

Browse files
committed
feat: support zstd (de)compression
1 parent 1de7759 commit ecda725

File tree

6 files changed

+106
-1
lines changed

6 files changed

+106
-1
lines changed

Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ time = "0.3"
3636
tokio = { version = "1.14", default-features = false, features = ["io-util", "net", "rt", "sync", "time"] }
3737
tokio-rustls = { version = "0.23", optional = true }
3838
tracing = "0.1"
39+
zstd = { version = "0.10", optional = true }
3940

4041
[dev-dependencies]
4142
assert_matches = "1.5"
@@ -45,7 +46,7 @@ futures = "0.3"
4546
proptest = "1"
4647
proptest-derive = "0.3"
4748
rustls-pemfile = "0.2"
48-
rdkafka = "0.28"
49+
rdkafka = { version = "0.28", default-features = false, features = ["libz", "tokio", "zstd"] }
4950
tokio = { version = "1.14", features = ["macros", "rt-multi-thread"] }
5051
tracing-log = "0.1"
5152
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
@@ -57,6 +58,7 @@ default = ["transport-tls"]
5758
compression-gzip = ["flate2"]
5859
compression-lz4 = ["lz4"]
5960
compression-snappy = ["snap"]
61+
compression-zstd = ["zstd"]
6062
fuzzing = []
6163
transport-socks5 = ["async-socks5"]
6264
transport-tls = ["rustls", "tokio-rustls"]

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ For more advanced production and consumption, see [`crate::client::producer`] an
9393
- **`compression-gzip`:** Support compression and decompression of messages using [gzip].
9494
- **`compression-lz4`:** Support compression and decompression of messages using [LZ4].
9595
- **`compression-snappy`:** Support compression and decompression of messages using [Snappy].
96+
- **`compression-zstd`:** Support compression and decompression of messages using [zstd].
9697
- **`fuzzing`:** Exposes some internal data structures so that they can be used by our fuzzers. This is NOT a stable
9798
feature / API!
9899
- **`transport-socks5`:** Allow transport via SOCKS5 proxy.
@@ -259,3 +260,4 @@ e.g. by batching writes to multiple partitions in a single ProduceRequest
259260
[Redpanda]: https://vectorized.io/redpanda
260261
[rustls]: https://github.com/rustls/rustls
261262
[Snappy]: https://github.com/google/snappy
263+
[zstd]: https://github.com/facebook/zstd

src/client/partition.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ pub enum Compression {
3434
Lz4,
3535
#[cfg(feature = "compression-snappy")]
3636
Snappy,
37+
#[cfg(feature = "compression-zstd")]
38+
Zstd,
3739
}
3840

3941
impl Default for Compression {
@@ -375,6 +377,8 @@ fn build_produce_request(
375377
Compression::Lz4 => RecordBatchCompression::Lz4,
376378
#[cfg(feature = "compression-snappy")]
377379
Compression::Snappy => RecordBatchCompression::Snappy,
380+
#[cfg(feature = "compression-zstd")]
381+
Compression::Zstd => RecordBatchCompression::Zstd,
378382
},
379383
timestamp_type: RecordBatchTimestampType::CreateTime,
380384
producer_id: -1,

src/protocol/record.rs

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -673,6 +673,17 @@ where
673673

674674
records
675675
}
676+
#[cfg(feature = "compression-zstd")]
677+
RecordBatchCompression::Zstd => {
678+
use zstd::Decoder;
679+
680+
let mut decoder = Decoder::new(reader)?;
681+
let records = Self::read_records(&mut decoder, is_control, n_records)?;
682+
683+
ensure_eof(&mut decoder, "Data left in zstd block")?;
684+
685+
records
686+
}
676687
#[allow(unreachable_patterns)]
677688
_ => {
678689
return Err(ReadError::Malformed(
@@ -843,6 +854,14 @@ where
843854

844855
writer.write_all(&output[..len])?;
845856
}
857+
#[cfg(feature = "compression-zstd")]
858+
RecordBatchCompression::Zstd => {
859+
use zstd::Encoder;
860+
861+
let mut encoder = Encoder::new(writer, 0)?;
862+
Self::write_records(&mut encoder, self.records)?;
863+
encoder.finish()?;
864+
}
846865
#[allow(unreachable_patterns)]
847866
_ => {
848867
return Err(WriteError::Malformed(
@@ -1114,6 +1133,56 @@ mod tests {
11141133
assert_eq!(actual2, expected);
11151134
}
11161135

1136+
#[cfg(feature = "compression-zstd")]
1137+
#[test]
1138+
fn test_decode_fixture_zstd() {
1139+
// This data was obtained by watching rdkafka.
1140+
let data = [
1141+
b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x5d\x00\x00\x00\x00".to_vec(),
1142+
b"\x02\xa1\x6e\x4e\x95\x00\x04\x00\x00\x00\x00\x00\x00\x01\x7e\xbf".to_vec(),
1143+
b"\x78\xf3\xad\x00\x00\x01\x7e\xbf\x78\xf3\xad\xff\xff\xff\xff\xff".to_vec(),
1144+
b"\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x00\x00\x01\x28\xb5\x2f".to_vec(),
1145+
b"\xfd\x00\x58\x1d\x01\x00\xe8\xfc\x01\x00\x00\x00\xc8\x01\x78\x16".to_vec(),
1146+
b"\x68\x65\x6c\x6c\x6f\x20\x6b\x61\x66\x6b\x61\x02\x06\x66\x6f\x6f".to_vec(),
1147+
b"\x06\x62\x61\x72\x01\x00\x20\x05\x5c".to_vec(),
1148+
]
1149+
.concat();
1150+
1151+
let actual = RecordBatch::read(&mut Cursor::new(data)).unwrap();
1152+
let expected = RecordBatch {
1153+
base_offset: 0,
1154+
partition_leader_epoch: 0,
1155+
last_offset_delta: 0,
1156+
first_timestamp: 1643889882029,
1157+
max_timestamp: 1643889882029,
1158+
producer_id: -1,
1159+
producer_epoch: -1,
1160+
base_sequence: -1,
1161+
records: ControlBatchOrRecords::Records(vec![Record {
1162+
timestamp_delta: 0,
1163+
offset_delta: 0,
1164+
key: Some(vec![b'x'; 100]),
1165+
value: Some(b"hello kafka".to_vec()),
1166+
headers: vec![RecordHeader {
1167+
key: "foo".to_owned(),
1168+
value: b"bar".to_vec(),
1169+
}],
1170+
}]),
1171+
compression: RecordBatchCompression::Zstd,
1172+
is_transactional: false,
1173+
timestamp_type: RecordBatchTimestampType::CreateTime,
1174+
};
1175+
assert_eq!(actual, expected);
1176+
1177+
let mut data2 = vec![];
1178+
actual.write(&mut data2).unwrap();
1179+
1180+
// don't compare if the data is equal because compression encoder might work slightly differently, use another
1181+
// roundtrip instead
1182+
let actual2 = RecordBatch::read(&mut Cursor::new(data2)).unwrap();
1183+
assert_eq!(actual2, expected);
1184+
}
1185+
11171186
#[test]
11181187
fn test_decode_fixture_null_key() {
11191188
// This data was obtained by watching rdkafka driven by IOx.

tests/produce_consume.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,30 @@ async fn test_produce_rskafka_consume_rskafka_snappy() {
105105
assert_produce_consume(produce_rskafka, consume_rskafka, Compression::Snappy).await;
106106
}
107107

108+
#[cfg(feature = "compression-zstd")]
109+
#[tokio::test]
110+
async fn test_produce_rdkafka_consume_rdkafka_zstd() {
111+
assert_produce_consume(produce_rdkafka, consume_rdkafka, Compression::Zstd).await;
112+
}
113+
114+
#[cfg(feature = "compression-zstd")]
115+
#[tokio::test]
116+
async fn test_produce_rskafka_consume_rdkafka_zstd() {
117+
assert_produce_consume(produce_rskafka, consume_rdkafka, Compression::Zstd).await;
118+
}
119+
120+
#[cfg(feature = "compression-zstd")]
121+
#[tokio::test]
122+
async fn test_produce_rdkafka_consume_rskafka_zstd() {
123+
assert_produce_consume(produce_rdkafka, consume_rskafka, Compression::Zstd).await;
124+
}
125+
126+
#[cfg(feature = "compression-zstd")]
127+
#[tokio::test]
128+
async fn test_produce_rskafka_consume_rskafka_zstd() {
129+
assert_produce_consume(produce_rskafka, consume_rskafka, Compression::Zstd).await;
130+
}
131+
108132
async fn assert_produce_consume<F1, G1, F2, G2>(
109133
f_produce: F1,
110134
f_consume: F2,

tests/rdkafka_helper.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@ pub async fn produce(
3737
Compression::Snappy => {
3838
cfg.set("compression.codec", "snappy");
3939
}
40+
#[cfg(feature = "compression-zstd")]
41+
Compression::Zstd => {
42+
cfg.set("compression.codec", "zstd");
43+
}
4044
}
4145
let client: FutureProducer<_> = cfg.create().unwrap();
4246

0 commit comments

Comments
 (0)