Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion src/protocol/record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -630,7 +630,8 @@ where
// There are "normal" compression libs, and there is Java
// See https://github.com/edenhill/librdkafka/blob/2b76b65212e5efda213961d5f84e565038036270/src/rdkafka_msgset_reader.c#L307-L318
let output = if input.starts_with(JAVA_MAGIC) {
let mut cursor = Cursor::new(&input[JAVA_MAGIC.len()..]);
let cursor_content = &input[JAVA_MAGIC.len()..];
let mut cursor = Cursor::new(cursor_content);

let mut buf_version = [0u8; 4];
cursor.read_exact(&mut buf_version)?;
Expand All @@ -653,6 +654,11 @@ where
let mut buf_chunk_length = [0u8; 4];
cursor.read_exact(&mut buf_chunk_length)?;
let chunk_length = u32::from_be_bytes(buf_chunk_length) as usize;
let bytes_left = cursor_content.len() - (cursor.position() as usize);
if chunk_length > bytes_left {
// do NOT try to allocate massive buffer for `chunk_data` but instead fail early
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

return Err(ReadError::Malformed(format!("Java-specific Snappy-compressed data has illegal chunk length, got {chunk_length} bytes but only {bytes_left} bytes are left.").into()));
}

let mut chunk_data = vec![0u8; chunk_length];
cursor.read_exact(&mut chunk_data)?;
Expand Down Expand Up @@ -1312,6 +1318,26 @@ mod tests {
assert_eq!(actual2, expected);
}

#[test]
fn test_decode_java_specific_oom() {
// Found by the fuzzer, this should return an error instead of OOM.
let data = [
0x0a, 0x0a, 0x83, 0x00, 0xd4, 0x00, 0x00, 0x22, 0x00, 0x4b, 0x08, 0xd2, 0x22, 0xfb,
0xff, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x9b, 0x00, 0x9b, 0x0a, 0x40,
0x00, 0x00, 0x4b, 0x00, 0x00, 0x00, 0x40, 0x30, 0x00, 0x57, 0x00, 0xd3, 0x82, 0x53,
0x4e, 0x41, 0x50, 0x50, 0x59, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x00, 0x03, 0x01, 0x00, 0x00, 0xfc, 0x00, 0x09, 0x09, 0x09, 0x09, 0x09,
0x09, 0x50, 0x50, 0x59, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x40, 0x30, 0x00, 0x57, 0x00, 0x00, 0x80,
0x00, 0x00, 0x00, 0x00, 0xb0, 0x9b, 0x00,
]
.to_vec();

let err = RecordBatchBody::read(&mut Cursor::new(data)).unwrap_err();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

assert_matches!(err, ReadError::Malformed(_));
assert_eq!(err.to_string(), "Malformed data: Java-specific Snappy-compressed data has illegal chunk length, got 4227860745 bytes but only 38 bytes are left.");
}

#[test]
fn test_carefully_decompress_snappy_empty_input() {
let err = carefully_decompress_snappy(&[], 1).unwrap_err();
Expand Down