README.md: 4 changes (2 additions, 2 deletions)
@@ -64,8 +64,8 @@ let partition_client = client

 // produce some data
 let record = Record {
-    key: b"".to_vec(),
-    value: b"hello kafka".to_vec(),
+    key: None,
+    value: Some(b"hello kafka".to_vec()),
     headers: BTreeMap::from([
         ("foo".to_owned(), b"bar".to_vec()),
     ]),
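A note on the API change shown here: with `key` and `value` now typed as `Option<Vec<u8>>`, an absent payload is distinguishable from an empty one. A minimal sketch of what this unlocks (assuming the crate's public `rskafka::record::Record` path; the key bytes are illustrative): a log-compaction tombstone, which the old `Vec<u8>` fields could not express.

```rust
use std::collections::BTreeMap;

use rskafka::record::Record;
use time::OffsetDateTime;

fn tombstone() -> Record {
    Record {
        // A present key with a `None` value is what Kafka log compaction
        // treats as a deletion marker (tombstone) for that key.
        key: Some(b"user-42".to_vec()),
        // `None` is wire-encoded as length -1; `Some(vec![])` would be a
        // present-but-empty value, encoded as length 0.
        value: None,
        headers: BTreeMap::default(),
        timestamp: OffsetDateTime::now_utc(),
    }
}
```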
benches/write_throughput.rs: 8 changes (4 additions, 4 deletions)
@@ -72,8 +72,8 @@ pub fn criterion_benchmark(c: &mut Criterion) {
             async move {
                 let client = setup_rskafka(connection).await;
                 let record = Record {
-                    key,
-                    value,
+                    key: Some(key),
+                    value: Some(value),
                     headers: BTreeMap::default(),
                     timestamp: OffsetDateTime::now_utc(),
                 };
@@ -139,8 +139,8 @@ pub fn criterion_benchmark(c: &mut Criterion) {
             async move {
                 let client = setup_rskafka(connection).await;
                 let record = Record {
-                    key,
-                    value,
+                    key: Some(key),
+                    value: Some(value),
                     headers: BTreeMap::default(),
                     timestamp: OffsetDateTime::now_utc(),
                 };
src/client/consumer.rs: 8 changes (4 additions, 4 deletions)
@@ -351,8 +351,8 @@ mod tests {
     #[tokio::test]
     async fn test_consumer() {
         let record = Record {
-            key: vec![0; 4],
-            value: vec![0; 6],
+            key: Some(vec![0; 4]),
+            value: Some(vec![0; 6]),
             headers: Default::default(),
             timestamp: OffsetDateTime::now_utc(),
         };
@@ -412,8 +412,8 @@ mod tests {
     #[tokio::test]
     async fn test_consumer_timeout() {
         let record = Record {
-            key: vec![0; 4],
-            value: vec![0; 6],
+            key: Some(vec![0; 4]),
+            value: Some(vec![0; 6]),
             headers: Default::default(),
             timestamp: OffsetDateTime::now_utc(),
         };
src/client/producer.rs: 12 changes (6 additions, 6 deletions)
@@ -72,8 +72,8 @@
 //!
 //! // produce data
 //! let record = Record {
-//!     key: b"".to_vec(),
-//!     value: b"hello kafka".to_vec(),
+//!     key: None,
+//!     value: Some(b"hello kafka".to_vec()),
 //!     headers: BTreeMap::from([
 //!         ("foo".to_owned(), b"bar".to_vec()),
 //!     ]),
@@ -145,8 +145,8 @@
 //!         let data = std::mem::take(&mut self.data);
 //!         let records = vec![
 //!             Record {
-//!                 key: b"".to_vec(),
-//!                 value: data,
+//!                 key: None,
+//!                 value: Some(data),
 //!                 headers: BTreeMap::from([
 //!                     ("foo".to_owned(), b"bar".to_vec()),
 //!                 ]),
@@ -542,8 +542,8 @@ mod tests {

     fn record() -> Record {
         Record {
-            key: vec![0; 4],
-            value: vec![0; 6],
+            key: Some(vec![0; 4]),
+            value: Some(vec![0; 6]),
             headers: Default::default(),
             timestamp: OffsetDateTime::from_unix_timestamp(320).unwrap(),
         }
src/client/producer/aggregator.rs: 6 changes (3 additions, 3 deletions)
@@ -149,14 +149,14 @@ mod tests {
     #[test]
     fn test_record_aggregator() {
         let r1 = Record {
-            key: vec![0; 45],
-            value: vec![0; 2],
+            key: Some(vec![0; 45]),
+            value: Some(vec![0; 2]),
             headers: Default::default(),
             timestamp: OffsetDateTime::from_unix_timestamp(20).unwrap(),
         };
 
         let r2 = Record {
-            value: vec![0; 34],
+            value: Some(vec![0; 34]),
             ..r1.clone()
         };

src/protocol/record.rs: 148 changes (120 additions, 28 deletions)
@@ -24,8 +24,6 @@ use std::io::{Cursor, Read, Write};
 #[cfg(test)]
 use proptest::prelude::*;
 
-use crate::protocol::vec_builder::DEFAULT_BLOCK_SIZE;
-
 use super::{
     primitives::{Int16, Int32, Int64, Int8, Varint, Varlong},
     traits::{ReadError, ReadType, WriteError, WriteType},
@@ -94,8 +92,8 @@ where
 pub struct Record {
     pub timestamp_delta: i64,
     pub offset_delta: i32,
-    pub key: Vec<u8>,
-    pub value: Vec<u8>,
+    pub key: Option<Vec<u8>>,
+    pub value: Option<Vec<u8>>,
     pub headers: Vec<RecordHeader>,
 }

@@ -119,18 +117,26 @@ where
         let offset_delta = Varint::read(reader)?.0;
 
         // key
-        let len = Varint::read(reader)?;
-        let len = usize::try_from(len.0).map_err(|e| ReadError::Malformed(Box::new(e)))?;
-        let mut key = VecBuilder::new(len);
-        key = key.read_exact(reader)?;
-        let key = key.into();
+        let len = Varint::read(reader)?.0;
+        let key = if len == -1 {
+            None
+        } else {
+            let len = usize::try_from(len).map_err(|e| ReadError::Malformed(Box::new(e)))?;
+            let mut key = VecBuilder::new(len);
+            key = key.read_exact(reader)?;
+            Some(key.into())
+        };
 
         // value
-        let len = Varint::read(reader)?;
-        let len = usize::try_from(len.0).map_err(|e| ReadError::Malformed(Box::new(e)))?;
-        let mut value = VecBuilder::new(len);
-        value = value.read_exact(reader)?;
-        let value = value.into();
+        let len = Varint::read(reader)?.0;
+        let value = if len == -1 {
+            None
+        } else {
+            let len = usize::try_from(len).map_err(|e| ReadError::Malformed(Box::new(e)))?;
+            let mut value = VecBuilder::new(len);
+            value = value.read_exact(reader)?;
+            Some(value.into())
+        };
 
         // headers
         // Note: This is NOT a normal array but uses a Varint instead.
@@ -179,14 +185,29 @@ where
         Varint(self.offset_delta).write(&mut data)?;
 
         // key
-        let l = i32::try_from(self.key.len()).map_err(|e| WriteError::Malformed(Box::new(e)))?;
-        Varint(l).write(&mut data)?;
-        data.write_all(&self.key)?;
+        match &self.key {
+            Some(key) => {
+                let l = i32::try_from(key.len()).map_err(|e| WriteError::Malformed(Box::new(e)))?;
+                Varint(l).write(&mut data)?;
+                data.write_all(key)?;
+            }
+            None => {
+                Varint(-1).write(&mut data)?;
+            }
+        }
 
         // value
-        let l = i32::try_from(self.value.len()).map_err(|e| WriteError::Malformed(Box::new(e)))?;
-        Varint(l).write(&mut data)?;
-        data.write_all(&self.value)?;
+        match &self.value {
+            Some(value) => {
+                let l =
+                    i32::try_from(value.len()).map_err(|e| WriteError::Malformed(Box::new(e)))?;
+                Varint(l).write(&mut data)?;
+                data.write_all(value)?;
+            }
+            None => {
+                Varint(-1).write(&mut data)?;
+            }
+        }
 
         // headers
         // Note: This is NOT a normal array but uses a Varint instead.
@@ -598,6 +619,7 @@ where
             }
             #[cfg(feature = "compression-snappy")]
             RecordBatchCompression::Snappy => {
+                use crate::protocol::vec_builder::DEFAULT_BLOCK_SIZE;
                 use snap::raw::{decompress_len, Decoder};
 
                 // Construct the input for the raw decoder.
@@ -924,8 +946,8 @@ mod tests {
             records: ControlBatchOrRecords::Records(vec![Record {
                 timestamp_delta: 0,
                 offset_delta: 0,
-                key: vec![],
-                value: b"hello kafka".to_vec(),
+                key: Some(vec![]),
+                value: Some(b"hello kafka".to_vec()),
                 headers: vec![RecordHeader {
                     key: "foo".to_owned(),
                     value: b"bar".to_vec(),
@@ -970,8 +992,8 @@ mod tests {
             records: ControlBatchOrRecords::Records(vec![Record {
                 timestamp_delta: 0,
                 offset_delta: 0,
-                key: vec![b'x'; 100],
-                value: b"hello kafka".to_vec(),
+                key: Some(vec![b'x'; 100]),
+                value: Some(b"hello kafka".to_vec()),
                 headers: vec![RecordHeader {
                     key: "foo".to_owned(),
                     value: b"bar".to_vec(),
@@ -1020,8 +1042,8 @@ mod tests {
             records: ControlBatchOrRecords::Records(vec![Record {
                 timestamp_delta: 0,
                 offset_delta: 0,
-                key: vec![b'x'; 100],
-                value: b"hello kafka".to_vec(),
+                key: Some(vec![b'x'; 100]),
+                value: Some(b"hello kafka".to_vec()),
                 headers: vec![RecordHeader {
                     key: "foo".to_owned(),
                     value: b"bar".to_vec(),
@@ -1070,8 +1092,8 @@ mod tests {
             records: ControlBatchOrRecords::Records(vec![Record {
                 timestamp_delta: 0,
                 offset_delta: 0,
-                key: vec![b'x'; 100],
-                value: b"hello kafka".to_vec(),
+                key: Some(vec![b'x'; 100]),
+                value: Some(b"hello kafka".to_vec()),
                 headers: vec![RecordHeader {
                     key: "foo".to_owned(),
                     value: b"bar".to_vec(),
@@ -1091,4 +1113,74 @@ mod tests {
         let actual2 = RecordBatch::read(&mut Cursor::new(data2)).unwrap();
         assert_eq!(actual2, expected);
     }
+
+    #[test]
+    fn test_decode_fixture_null_key() {
+        // This data was obtained by watching rdkafka driven by IOx.
+        let data = [
+            b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x1a\x00\x00\x00\x00".to_vec(),
+            b"\x02\x67\x98\xb9\x54\x00\x00\x00\x00\x00\x00\x00\x00\x01\x7e\xbe".to_vec(),
+            b"\xdc\x91\xf6\x00\x00\x01\x7e\xbe\xdc\x91\xf6\xff\xff\xff\xff\xff".to_vec(),
+            b"\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x00\x00\x01\xce\x03\x00".to_vec(),
+            b"\x00\x00\x01\xce\x01\x0a\x65\x0a\x2f\x74\x65\x73\x74\x5f\x74\x6f".to_vec(),
+            b"\x70\x69\x63\x5f\x33\x37\x33\x39\x38\x66\x38\x64\x2d\x39\x35\x66".to_vec(),
+            b"\x38\x2d\x34\x34\x65\x65\x2d\x38\x33\x61\x34\x2d\x34\x64\x30\x63".to_vec(),
+            b"\x35\x39\x32\x62\x34\x34\x36\x64\x12\x32\x0a\x03\x75\x70\x63\x12".to_vec(),
+            b"\x17\x0a\x04\x75\x73\x65\x72\x10\x03\x1a\x0a\x12\x08\x00\x00\x00".to_vec(),
+            b"\x00\x00\x00\xf0\x3f\x22\x01\x00\x12\x10\x0a\x04\x74\x69\x6d\x65".to_vec(),
+            b"\x10\x04\x1a\x03\x0a\x01\x64\x22\x01\x00\x18\x01\x04\x18\x63\x6f".to_vec(),
+            b"\x6e\x74\x65\x6e\x74\x2d\x74\x79\x70\x65\xa4\x01\x61\x70\x70\x6c".to_vec(),
+            b"\x69\x63\x61\x74\x69\x6f\x6e\x2f\x78\x2d\x70\x72\x6f\x74\x6f\x62".to_vec(),
+            b"\x75\x66\x3b\x20\x73\x63\x68\x65\x6d\x61\x3d\x22\x69\x6e\x66\x6c".to_vec(),
+            b"\x75\x78\x64\x61\x74\x61\x2e\x69\x6f\x78\x2e\x77\x72\x69\x74\x65".to_vec(),
+            b"\x5f\x62\x75\x66\x66\x65\x72\x2e\x76\x31\x2e\x57\x72\x69\x74\x65".to_vec(),
+            b"\x42\x75\x66\x66\x65\x72\x50\x61\x79\x6c\x6f\x61\x64\x22\x1a\x69".to_vec(),
+            b"\x6f\x78\x2d\x6e\x61\x6d\x65\x73\x70\x61\x63\x65\x12\x6e\x61\x6d".to_vec(),
+            b"\x65\x73\x70\x61\x63\x65".to_vec(),
+        ]
+        .concat();
+
+        let actual = RecordBatch::read(&mut Cursor::new(data.clone())).unwrap();
+        let expected = RecordBatch {
+            base_offset: 0,
+            partition_leader_epoch: 0,
+            last_offset_delta: 0,
+            first_timestamp: 1643879633398,
+            max_timestamp: 1643879633398,
+            producer_id: -1,
+            producer_epoch: -1,
+            base_sequence: -1,
+            records: ControlBatchOrRecords::Records(vec![Record {
+                timestamp_delta: 0,
+                offset_delta: 0,
+                key: None,
+                value: Some(vec![
+                    10, 101, 10, 47, 116, 101, 115, 116, 95, 116, 111, 112, 105, 99, 95, 51, 55,
+                    51, 57, 56, 102, 56, 100, 45, 57, 53, 102, 56, 45, 52, 52, 101, 101, 45, 56,
+                    51, 97, 52, 45, 52, 100, 48, 99, 53, 57, 50, 98, 52, 52, 54, 100, 18, 50, 10,
+                    3, 117, 112, 99, 18, 23, 10, 4, 117, 115, 101, 114, 16, 3, 26, 10, 18, 8, 0, 0,
+                    0, 0, 0, 0, 240, 63, 34, 1, 0, 18, 16, 10, 4, 116, 105, 109, 101, 16, 4, 26, 3,
+                    10, 1, 100, 34, 1, 0, 24, 1,
+                ]),
+                headers: vec![
+                    RecordHeader {
+                        key: "content-type".to_owned(),
+                        value: br#"application/x-protobuf; schema="influxdata.iox.write_buffer.v1.WriteBufferPayload""#.to_vec(),
+                    },
+                    RecordHeader {
+                        key: "iox-namespace".to_owned(),
+                        value: b"namespace".to_vec(),
+                    },
+                ],
+            }]),
+            compression: RecordBatchCompression::NoCompression,
+            is_transactional: false,
+            timestamp_type: RecordBatchTimestampType::CreateTime,
+        };
+        assert_eq!(actual, expected);
+
+        let mut data2 = vec![];
+        actual.write(&mut data2).unwrap();
+        assert_eq!(data, data2);
+    }
 }
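The wire-level rule behind the read/write changes above: record keys and values are length-prefixed with a zig-zag varint, and a length of -1 marks null, which is distinct from a zero-length array. A standalone sketch of that convention (the helper names are made up for illustration; the crate itself goes through its `Varint` primitive):

```rust
/// Zig-zag varint encoding as used by the Kafka record format.
fn write_varint(out: &mut Vec<u8>, v: i32) {
    // Zig-zag maps small negatives to small positives: -1 becomes 1.
    let mut z = ((v << 1) ^ (v >> 31)) as u32;
    loop {
        let byte = (z & 0x7f) as u8;
        z >>= 7;
        if z == 0 {
            out.push(byte);
            break;
        }
        out.push(byte | 0x80); // set the continuation bit
    }
}

/// Nullable bytes: a non-negative varint length followed by the payload,
/// or a bare -1 length for null (hypothetical helper, for illustration).
fn write_nullable_bytes(out: &mut Vec<u8>, data: Option<&[u8]>) {
    match data {
        Some(d) => {
            write_varint(out, d.len() as i32);
            out.extend_from_slice(d);
        }
        None => write_varint(out, -1),
    }
}

fn main() {
    let mut null_field = vec![];
    write_nullable_bytes(&mut null_field, None);
    assert_eq!(null_field, [0x01]); // zig-zag(-1) = 1

    let mut empty_field = vec![];
    write_nullable_bytes(&mut empty_field, Some(&[]));
    assert_eq!(empty_field, [0x00]); // zig-zag(0) = 0: present but empty
}
```

Under this encoding `None` and `Some(vec![])` serialize to different single bytes, which is why the new `test_decode_fixture_null_key` test can round-trip a null key byte-for-byte.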
src/record.rs: 12 changes (6 additions, 6 deletions)
@@ -5,17 +5,17 @@ use time::OffsetDateTime;
 /// High-level record.
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub struct Record {
-    pub key: Vec<u8>,
-    pub value: Vec<u8>,
+    pub key: Option<Vec<u8>>,
+    pub value: Option<Vec<u8>>,
     pub headers: BTreeMap<String, Vec<u8>>,
     pub timestamp: OffsetDateTime,
 }
 
 impl Record {
     /// Returns the approximate uncompressed size of this [`Record`]
     pub fn approximate_size(&self) -> usize {
-        self.key.len()
-            + self.value.len()
+        self.key.as_ref().map(|k| k.len()).unwrap_or_default()
+            + self.value.as_ref().map(|v| v.len()).unwrap_or_default()
             + self
                 .headers
                 .iter()
@@ -38,8 +38,8 @@ mod tests {
     #[test]
     fn test_approximate_size() {
         let record = Record {
-            key: vec![0; 23],
-            value: vec![0; 45],
+            key: Some(vec![0; 23]),
+            value: Some(vec![0; 45]),
             headers: vec![("a".to_string(), vec![0; 5]), ("b".to_string(), vec![0; 7])]
                 .into_iter()
                 .collect(),
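A quick check on the new size accounting: a `None` key or value contributes zero bytes, so a tombstone is sized by its key and headers alone. A usage sketch (assuming the `rskafka::record::Record` path):

```rust
use std::collections::BTreeMap;

use rskafka::record::Record;
use time::OffsetDateTime;

fn main() {
    let tombstone = Record {
        key: Some(vec![0; 23]),
        value: None, // contributes 0 to the estimate
        headers: BTreeMap::from([("a".to_string(), vec![0; 5])]),
        timestamp: OffsetDateTime::now_utc(),
    };
    // 23 (key) + 0 (value) + 1 (header key "a") + 5 (header value)
    assert_eq!(tombstone.approximate_size(), 29);
}
```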
tests/client.rs: 4 changes (2 additions, 2 deletions)
@@ -282,8 +282,8 @@ async fn test_produce_consume_size_cutoff() {

 pub fn large_record() -> Record {
     Record {
-        key: b"".to_vec(),
-        value: vec![b'x'; 1024],
+        key: Some(b"".to_vec()),
+        value: Some(vec![b'x'; 1024]),
         headers: BTreeMap::from([("foo".to_owned(), b"bar".to_vec())]),
         timestamp: now(),
     }
tests/produce_consume.rs: 6 changes (3 additions, 3 deletions)
@@ -146,19 +146,19 @@ async fn assert_produce_consume<F1, G1, F2, G2>(
             // add a bit more data to encourage rdkafka to actually use compression, otherwise the compressed data
             // is larger than the uncompressed version and rdkafka will not use compression at all
             Record {
-                key: vec![b'x'; 100],
+                key: Some(vec![b'x'; 100]),
                 ..record
             }
         }
     }
 };
 let record_2 = Record {
-    value: b"some value".to_vec(),
+    value: Some(b"some value".to_vec()),
     timestamp: now(),
     ..record_1.clone()
 };
 let record_3 = Record {
-    value: b"more value".to_vec(),
+    value: Some(b"more value".to_vec()),
     timestamp: now(),
     ..record_1.clone()
 };
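One consumer-facing consequence of this change: fetched records now expose `Option<Vec<u8>>`, so downstream code must decide what `None` means to it. Two illustrative helpers (not part of this PR) for callers migrating from the old always-present buffers:

```rust
use rskafka::record::Record;

/// Treat a missing value as empty bytes, restoring the old behaviour
/// for callers that do not care about tombstones.
fn payload_or_empty(record: &Record) -> &[u8] {
    record.value.as_deref().unwrap_or_default()
}

/// Callers that do care can test for a tombstone explicitly: a present
/// key with no value.
fn is_tombstone(record: &Record) -> bool {
    record.key.is_some() && record.value.is_none()
}
```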