
Commit 67fa4ce

fix: ensure Java interopt
The only issue found by the new interop tests is that we could not read Snappy-compressed data produced by Java clients, because Java uses its own framing format. Interestingly, Java clients can read the Snappy data that we produce. This is also in line with the rdkafka implementation, which can consume both the "normal" and the special "Java" format but only produces the "normal" one.
1 parent 3af1939 commit 67fa4ce
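
For context, here is a minimal sketch of the "Java" (xerial) Snappy framing this commit handles, following the layout used in the record.rs diff below: a magic header, two big-endian 32-bit version fields, and then a sequence of chunks, each a big-endian 32-bit length followed by an independent raw Snappy block. The helper name `decompress_java_framed` and the error mapping are illustrative, not part of rskafka's API; only the `snap` crate calls (`decompress_len`, `Decoder::decompress`) are real library calls.

use std::io::{self, Cursor, Read};

use snap::raw::{decompress_len, Decoder};

/// Magic bytes that the Java ("xerial") framing prepends to the payload.
const JAVA_MAGIC: &[u8] = &[0x82, b'S', b'N', b'A', b'P', b'P', b'Y', 0];

/// Illustrative helper (not rskafka API): decompress a Java-framed Snappy payload.
/// The caller is expected to have checked `input.starts_with(JAVA_MAGIC)` first.
fn decompress_java_framed(input: &[u8]) -> io::Result<Vec<u8>> {
    let mut cursor = Cursor::new(&input[JAVA_MAGIC.len()..]);

    // Skip "version" and "compatible version", 4 bytes each, big-endian.
    let mut versions = [0u8; 8];
    cursor.read_exact(&mut versions)?;

    let mut output = vec![];
    while cursor.position() < cursor.get_ref().len() as u64 {
        // Each chunk: 4-byte big-endian length, then one raw Snappy block.
        let mut len_buf = [0u8; 4];
        cursor.read_exact(&mut len_buf)?;
        let chunk_len = u32::from_be_bytes(len_buf) as usize;

        let mut chunk = vec![0u8; chunk_len];
        cursor.read_exact(&mut chunk)?;

        // The chunk itself is ordinary "raw" (unframed) Snappy.
        let uncompressed_len = decompress_len(&chunk)
            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
        let mut buf = vec![0u8; uncompressed_len];
        Decoder::new()
            .decompress(&chunk, &mut buf)
            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
        output.append(&mut buf);
    }

    Ok(output)
}

Payloads without the magic header are plain raw Snappy and go straight to the normal decoder, which is exactly what both rskafka and rdkafka produce.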

File tree

6 files changed (+689, -43 lines)


.circleci/config.yml

Lines changed: 8 additions & 6 deletions
@@ -31,14 +31,14 @@ commands:
             - /usr/local/cargo/registry
           key: cargo-cache-{{ arch }}-{{ .Branch }}-{{ checksum "Cargo.toml" }}
 
-  install_zlib_dev:
-    description: Install zlib-dev
+  install_packages:
+    description: Install openjdk-11-jre and zlib-dev
     steps:
       - run:
-          name: Install zlib-dev
+          name: Install openjdk-11-jre and zlib-dev
          command: |
            sudo apt-get update
-           sudo apt-get install -y zlib1g-dev
+           sudo apt-get install -y openjdk-11-jre zlib1g-dev
 
 jobs:
   fmt:
@@ -190,14 +190,15 @@ jobs:
       RUST_LOG: "trace"
       # Run integration tests
       TEST_INTEGRATION: 1
+      TEST_JAVA_INTEROPT: 1
       # Don't use the first node here since this is likely the controller and we want to ensure that we automatically
       # pick the controller for certain actions (e.g. topic creation) and don't just get lucky.
       KAFKA_CONNECT: "redpanda-1:9092"
     steps:
       - checkout
       - rust_components
       - cache_restore
-      - install_zlib_dev
+      - install_packages
       - run:
           name: Cargo test
           command: cargo test --all-features --all-targets
@@ -258,14 +259,15 @@ jobs:
       TEST_INTEGRATION: 1
       # Kafka support DeleteRecords
       TEST_DELETE_RECORDS: 1
+      TEST_JAVA_INTEROPT: 1
       # Don't use the first node here since this is likely the controller and we want to ensure that we automatically
       # pick the controller for certain actions (e.g. topic creation) and don't just get lucky.
       KAFKA_CONNECT: "kafka-1:9093"
     steps:
       - checkout
       - rust_components
       - cache_restore
-      - install_zlib_dev
+      - install_packages
       - run:
           name: Cargo test
           command: cargo test --all-features --all-targets

Cargo.toml

Lines changed: 3 additions & 0 deletions
@@ -43,6 +43,9 @@ assert_matches = "1.5"
 criterion = { version = "0.3", features = ["async_tokio"] }
 dotenv = "0.15"
 futures = "0.3"
+j4rs = "0.13"
+once_cell = "1.9"
+procspawn = "0.10"
 proptest = "1"
 proptest-derive = "0.3"
 rustls-pemfile = "0.3"

README.md

Lines changed: 4 additions & 0 deletions
@@ -136,6 +136,10 @@ $ TEST_INTEGRATION=1 TEST_DELETE_RECORDS=1 KAFKA_CONNECT=localhost:9094 cargo te
 in another session. Note that Apache Kafka supports a different set of features then redpanda, so we pass other
 environment variables.
 
+### Java Interopt
+To test if RSKafka can produce/consume records to/from the official Java client, you need to have Java installed and the
+`TEST_JAVA_INTEROPT=1` environment variable set.
+
 ### Fuzzing
 RSKafka offers fuzz targets for certain protocol parsing steps. To build them make sure you have [cargo-fuzz] installed.
 Select one of the following fuzzers:

src/protocol/record.rs

Lines changed: 138 additions & 37 deletions
@@ -619,49 +619,39 @@ where
             }
             #[cfg(feature = "compression-snappy")]
             RecordBatchCompression::Snappy => {
-                use crate::protocol::vec_builder::DEFAULT_BLOCK_SIZE;
-                use snap::raw::{decompress_len, Decoder};
-
                 // Construct the input for the raw decoder.
                 let mut input = vec![];
                 reader.read_to_end(&mut input)?;
 
-                // The snappy compression used here is unframed aka "raw". So we first need to figure out the
-                // uncompressed length. See
-                //
-                // - https://github.com/edenhill/librdkafka/blob/2b76b65212e5efda213961d5f84e565038036270/src/rdkafka_msgset_reader.c#L345-L348
-                // - https://github.com/edenhill/librdkafka/blob/747f77c98fbddf7dc6508f76398e0fc9ee91450f/src/snappy.c#L779
-                let uncompressed_size = decompress_len(&input).unwrap();
-
-                // Decode snappy payload.
-                // The uncompressed length is unchecked and can be up to 2^32-1 bytes. To avoid a DDoS vector we try to
-                // limit it to a small size and if that fails we double that size;
-                let mut max_uncompressed_size = DEFAULT_BLOCK_SIZE;
-                let output = loop {
-                    let try_uncompressed_size = uncompressed_size.min(max_uncompressed_size);
-
-                    let mut decoder = Decoder::new();
-                    let mut output = vec![0; try_uncompressed_size];
-                    let actual_uncompressed_size = match decoder.decompress(&input, &mut output) {
-                        Ok(size) => size,
-                        Err(snap::Error::BufferTooSmall { .. })
-                            if max_uncompressed_size < uncompressed_size =>
-                        {
-                            // try larger buffer
-                            max_uncompressed_size *= 2;
-                            continue;
-                        }
-                        Err(e) => {
-                            return Err(ReadError::Malformed(Box::new(e)));
-                        }
-                    };
-                    if actual_uncompressed_size != uncompressed_size {
-                        return Err(ReadError::Malformed(
-                            "broken snappy data".to_string().into(),
-                        ));
+                const JAVA_MAGIC: &[u8] = &[0x82, b'S', b'N', b'A', b'P', b'P', b'Y', 0];
+
+                // There are "normal" compression libs, and there is Java
+                // See https://github.com/edenhill/librdkafka/blob/2b76b65212e5efda213961d5f84e565038036270/src/rdkafka_msgset_reader.c#L307-L318
+                let output = if input.starts_with(JAVA_MAGIC) {
+                    let mut cursor = Cursor::new(&input[JAVA_MAGIC.len()..]);
+
+                    let mut buf_version = [0u8; 4];
+                    cursor.read_exact(&mut buf_version)?;
+
+                    let mut buf_compatible = [0u8; 4];
+                    cursor.read_exact(&mut buf_compatible)?;
+
+                    let mut output = vec![];
+                    while cursor.position() < cursor.get_ref().len() as u64 {
+                        let mut buf_chunk_length = [0u8; 4];
+                        cursor.read_exact(&mut buf_chunk_length)?;
+                        let chunk_length = u32::from_be_bytes(buf_chunk_length) as usize;
+
+                        let mut chunk_data = vec![0u8; chunk_length];
+                        cursor.read_exact(&mut chunk_data)?;
+
+                        let mut buf = carefully_decompress_snappy(&chunk_data)?;
+                        output.append(&mut buf);
                     }
 
-                    break output;
+                    output
+                } else {
+                    carefully_decompress_snappy(&input)?
                 };
 
                 // Read uncompressed records.
@@ -889,6 +879,51 @@ where
     }
 }
 
+#[cfg(feature = "compression-snappy")]
+fn carefully_decompress_snappy(input: &[u8]) -> Result<Vec<u8>, ReadError> {
+    use crate::protocol::vec_builder::DEFAULT_BLOCK_SIZE;
+    use snap::raw::{decompress_len, Decoder};
+
+    // The snappy compression used here is unframed aka "raw". So we first need to figure out the
+    // uncompressed length. See
+    //
+    // - https://github.com/edenhill/librdkafka/blob/2b76b65212e5efda213961d5f84e565038036270/src/rdkafka_msgset_reader.c#L345-L348
+    // - https://github.com/edenhill/librdkafka/blob/747f77c98fbddf7dc6508f76398e0fc9ee91450f/src/snappy.c#L779
+    let uncompressed_size = decompress_len(input).map_err(|e| ReadError::Malformed(Box::new(e)))?;
+
+    // Decode snappy payload.
+    // The uncompressed length is unchecked and can be up to 2^32-1 bytes. To avoid a DDoS vector we try to
+    // limit it to a small size and if that fails we double that size;
+    let mut max_uncompressed_size = DEFAULT_BLOCK_SIZE;
+
+    loop {
+        let try_uncompressed_size = uncompressed_size.min(max_uncompressed_size);
+
+        let mut decoder = Decoder::new();
+        let mut output = vec![0; try_uncompressed_size];
+        let actual_uncompressed_size = match decoder.decompress(input, &mut output) {
+            Ok(size) => size,
+            Err(snap::Error::BufferTooSmall { .. })
+                if max_uncompressed_size < uncompressed_size =>
+            {
+                // try larger buffer
+                max_uncompressed_size *= 2;
+                continue;
+            }
+            Err(e) => {
+                return Err(ReadError::Malformed(Box::new(e)));
+            }
+        };
+        if actual_uncompressed_size != uncompressed_size {
+            return Err(ReadError::Malformed(
+                "broken snappy data".to_string().into(),
+            ));
+        }
+
+        break Ok(output);
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use std::io::Cursor;
@@ -1133,6 +1168,72 @@ mod tests {
         assert_eq!(actual2, expected);
     }
 
+    #[cfg(feature = "compression-snappy")]
+    #[test]
+    fn test_decode_fixture_snappy_java() {
+        // This data was obtained by watching Kafka returning a recording to rskafka that was produced by the official
+        // Java client.
+        let data = [
+            b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x8c\x00\x00\x00\x00".to_vec(),
+            b"\x02\x79\x1e\x2d\xce\x00\x02\x00\x00\x00\x01\x00\x00\x01\x7f\x07".to_vec(),
+            b"\x25\x7a\xb1\x00\x00\x01\x7f\x07\x25\x7a\xb1\xff\xff\xff\xff\xff".to_vec(),
+            b"\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x00\x00\x02\x82\x53\x4e".to_vec(),
+            b"\x41\x50\x50\x59\x00\x00\x00\x00\x01\x00\x00\x00\x01\x00\x00\x00".to_vec(),
+            b"\x47\xff\x01\x1c\xfc\x01\x00\x00\x00\xc8\x01\x78\xfe\x01\x00\x8a".to_vec(),
+            b"\x01\x00\x64\x16\x68\x65\x6c\x6c\x6f\x20\x6b\x61\x66\x6b\x61\x02".to_vec(),
+            b"\x06\x66\x6f\x6f\x06\x62\x61\x72\xfa\x01\x00\x00\x02\xfe\x80\x00".to_vec(),
+            b"\x96\x80\x00\x4c\x14\x73\x6f\x6d\x65\x20\x76\x61\x6c\x75\x65\x02".to_vec(),
+            b"\x06\x66\x6f\x6f\x06\x62\x61\x72".to_vec(),
+        ]
+        .concat();
+
+        let actual = RecordBatch::read(&mut Cursor::new(data)).unwrap();
+        let expected = RecordBatch {
+            base_offset: 0,
+            partition_leader_epoch: 0,
+            last_offset_delta: 1,
+            first_timestamp: 1645092371121,
+            max_timestamp: 1645092371121,
+            producer_id: -1,
+            producer_epoch: -1,
+            base_sequence: -1,
+            records: ControlBatchOrRecords::Records(vec![
+                Record {
+                    timestamp_delta: 0,
+                    offset_delta: 0,
+                    key: Some(vec![b'x'; 100]),
+                    value: Some(b"hello kafka".to_vec()),
+                    headers: vec![RecordHeader {
+                        key: "foo".to_owned(),
+                        value: b"bar".to_vec(),
+                    }],
+                },
+                Record {
+                    timestamp_delta: 0,
+                    offset_delta: 1,
+                    key: Some(vec![b'x'; 100]),
+                    value: Some(b"some value".to_vec()),
+                    headers: vec![RecordHeader {
+                        key: "foo".to_owned(),
+                        value: b"bar".to_vec(),
+                    }],
+                },
+            ]),
+            compression: RecordBatchCompression::Snappy,
+            is_transactional: false,
+            timestamp_type: RecordBatchTimestampType::CreateTime,
+        };
+        assert_eq!(actual, expected);
+
+        let mut data2 = vec![];
+        actual.write(&mut data2).unwrap();
+
+        // don't compare if the data is equal because compression encoder might work slightly differently, use another
+        // roundtrip instead
+        let actual2 = RecordBatch::read(&mut Cursor::new(data2)).unwrap();
+        assert_eq!(actual2, expected);
+    }
+
     #[cfg(feature = "compression-zstd")]
     #[test]
     fn test_decode_fixture_zstd() {
