
Commit 1de7759

Merge pull request #93 from influxdata/crepererum/fix_nullable_key_value
fix: record key/value can be NULL
2 parents: c5bd35f + bc666ee; commit 1de7759

File tree

11 files changed: +163, -65 lines

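In short: `Record::key` and `Record::value` change from `Vec<u8>` to `Option<Vec<u8>>`, so Kafka's NULL key/value (for example tombstone records, which carry a key but a NULL value) becomes representable. A minimal sketch of the new call-site shape; the import paths are assumed from the crate layout, not taken from this diff:

use std::collections::BTreeMap;

use rskafka::record::Record; // path assumed from src/record.rs below
use time::OffsetDateTime;

fn main() {
    // NULL key, present value: the shape the README example now uses.
    let record = Record {
        key: None,
        value: Some(b"hello kafka".to_vec()),
        headers: BTreeMap::from([("foo".to_owned(), b"bar".to_vec())]),
        timestamp: OffsetDateTime::now_utc(),
    };

    // A tombstone-style record: key present, value NULL.
    let tombstone = Record {
        key: Some(b"user-123".to_vec()),
        value: None,
        ..record.clone()
    };
    assert!(tombstone.value.is_none());
}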

README.md

Lines changed: 2 additions & 2 deletions
@@ -64,8 +64,8 @@ let partition_client = client
 
 // produce some data
 let record = Record {
-    key: b"".to_vec(),
-    value: b"hello kafka".to_vec(),
+    key: None,
+    value: Some(b"hello kafka".to_vec()),
     headers: BTreeMap::from([
         ("foo".to_owned(), b"bar".to_vec()),
     ]),

benches/write_throughput.rs

Lines changed: 4 additions & 4 deletions
@@ -72,8 +72,8 @@ pub fn criterion_benchmark(c: &mut Criterion) {
         async move {
             let client = setup_rskafka(connection).await;
             let record = Record {
-                key,
-                value,
+                key: Some(key),
+                value: Some(value),
                 headers: BTreeMap::default(),
                 timestamp: OffsetDateTime::now_utc(),
             };
@@ -139,8 +139,8 @@ pub fn criterion_benchmark(c: &mut Criterion) {
         async move {
            let client = setup_rskafka(connection).await;
            let record = Record {
-                key,
-                value,
+                key: Some(key),
+                value: Some(value),
                headers: BTreeMap::default(),
                timestamp: OffsetDateTime::now_utc(),
            };

src/client/consumer.rs

Lines changed: 4 additions & 4 deletions
@@ -351,8 +351,8 @@ mod tests {
     #[tokio::test]
     async fn test_consumer() {
         let record = Record {
-            key: vec![0; 4],
-            value: vec![0; 6],
+            key: Some(vec![0; 4]),
+            value: Some(vec![0; 6]),
             headers: Default::default(),
             timestamp: OffsetDateTime::now_utc(),
         };
@@ -412,8 +412,8 @@ mod tests {
     #[tokio::test]
     async fn test_consumer_timeout() {
         let record = Record {
-            key: vec![0; 4],
-            value: vec![0; 6],
+            key: Some(vec![0; 4]),
+            value: Some(vec![0; 6]),
             headers: Default::default(),
             timestamp: OffsetDateTime::now_utc(),
         };

src/client/producer.rs

Lines changed: 6 additions & 6 deletions
@@ -72,8 +72,8 @@
 //!
 //! // produce data
 //! let record = Record {
-//!     key: b"".to_vec(),
-//!     value: b"hello kafka".to_vec(),
+//!     key: None,
+//!     value: Some(b"hello kafka".to_vec()),
 //!     headers: BTreeMap::from([
 //!         ("foo".to_owned(), b"bar".to_vec()),
 //!     ]),
@@ -145,8 +145,8 @@
 //!         let data = std::mem::take(&mut self.data);
 //!         let records = vec![
 //!             Record {
-//!                 key: b"".to_vec(),
-//!                 value: data,
+//!                 key: None,
+//!                 value: Some(data),
 //!                 headers: BTreeMap::from([
 //!                     ("foo".to_owned(), b"bar".to_vec()),
 //!                 ]),
@@ -542,8 +542,8 @@ mod tests {
 
     fn record() -> Record {
         Record {
-            key: vec![0; 4],
-            value: vec![0; 6],
+            key: Some(vec![0; 4]),
+            value: Some(vec![0; 6]),
             headers: Default::default(),
             timestamp: OffsetDateTime::from_unix_timestamp(320).unwrap(),
         }

src/client/producer/aggregator.rs

Lines changed: 3 additions & 3 deletions
@@ -149,14 +149,14 @@ mod tests {
     #[test]
     fn test_record_aggregator() {
         let r1 = Record {
-            key: vec![0; 45],
-            value: vec![0; 2],
+            key: Some(vec![0; 45]),
+            value: Some(vec![0; 2]),
             headers: Default::default(),
             timestamp: OffsetDateTime::from_unix_timestamp(20).unwrap(),
         };
 
         let r2 = Record {
-            value: vec![0; 34],
+            value: Some(vec![0; 34]),
             ..r1.clone()
         };
 
src/protocol/record.rs

Lines changed: 120 additions & 28 deletions
@@ -24,8 +24,6 @@ use std::io::{Cursor, Read, Write};
 #[cfg(test)]
 use proptest::prelude::*;
 
-use crate::protocol::vec_builder::DEFAULT_BLOCK_SIZE;
-
 use super::{
     primitives::{Int16, Int32, Int64, Int8, Varint, Varlong},
     traits::{ReadError, ReadType, WriteError, WriteType},
@@ -94,8 +92,8 @@ where
 pub struct Record {
     pub timestamp_delta: i64,
     pub offset_delta: i32,
-    pub key: Vec<u8>,
-    pub value: Vec<u8>,
+    pub key: Option<Vec<u8>>,
+    pub value: Option<Vec<u8>>,
     pub headers: Vec<RecordHeader>,
 }
 
@@ -119,18 +117,26 @@ where
         let offset_delta = Varint::read(reader)?.0;
 
         // key
-        let len = Varint::read(reader)?;
-        let len = usize::try_from(len.0).map_err(|e| ReadError::Malformed(Box::new(e)))?;
-        let mut key = VecBuilder::new(len);
-        key = key.read_exact(reader)?;
-        let key = key.into();
+        let len = Varint::read(reader)?.0;
+        let key = if len == -1 {
+            None
+        } else {
+            let len = usize::try_from(len).map_err(|e| ReadError::Malformed(Box::new(e)))?;
+            let mut key = VecBuilder::new(len);
+            key = key.read_exact(reader)?;
+            Some(key.into())
+        };
 
         // value
-        let len = Varint::read(reader)?;
-        let len = usize::try_from(len.0).map_err(|e| ReadError::Malformed(Box::new(e)))?;
-        let mut value = VecBuilder::new(len);
-        value = value.read_exact(reader)?;
-        let value = value.into();
+        let len = Varint::read(reader)?.0;
+        let value = if len == -1 {
+            None
+        } else {
+            let len = usize::try_from(len).map_err(|e| ReadError::Malformed(Box::new(e)))?;
+            let mut value = VecBuilder::new(len);
+            value = value.read_exact(reader)?;
+            Some(value.into())
+        };
 
         // headers
         // Note: This is NOT a normal array but uses a Varint instead.
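For orientation, the convention the new read path follows: key and value are each prefixed by a zigzag-encoded varint length, and a length of -1 marks NULL. Below is a standalone sketch of that decoding using only std types; the crate itself uses its own `Varint` and `VecBuilder` as shown above, and real code should validate the length (as the `usize::try_from` above does) and bound over-long varints:

use std::io::{self, Read};

// Read one zigzag-encoded varint, the integer encoding Kafka records use.
// Sketch only: no guard against over-long (malformed) varints.
fn read_varint(reader: &mut impl Read) -> io::Result<i32> {
    let mut raw: u32 = 0;
    let mut shift = 0;
    loop {
        let mut byte = [0u8; 1];
        reader.read_exact(&mut byte)?;
        raw |= u32::from(byte[0] & 0x7f) << shift;
        if byte[0] & 0x80 == 0 {
            break;
        }
        shift += 7;
    }
    // zigzag decode: 0 -> 0, 1 -> -1, 2 -> 1, 3 -> -2, ...
    Ok(((raw >> 1) as i32) ^ -((raw & 1) as i32))
}

// Read a nullable byte sequence: length -1 decodes to None, anything else
// to Some(payload of exactly `len` bytes).
fn read_nullable_bytes(reader: &mut impl Read) -> io::Result<Option<Vec<u8>>> {
    let len = read_varint(reader)?;
    if len == -1 {
        return Ok(None);
    }
    let mut buf = vec![0u8; len as usize];
    reader.read_exact(&mut buf)?;
    Ok(Some(buf))
}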
@@ -179,14 +185,29 @@ where
         Varint(self.offset_delta).write(&mut data)?;
 
         // key
-        let l = i32::try_from(self.key.len()).map_err(|e| WriteError::Malformed(Box::new(e)))?;
-        Varint(l).write(&mut data)?;
-        data.write_all(&self.key)?;
+        match &self.key {
+            Some(key) => {
+                let l = i32::try_from(key.len()).map_err(|e| WriteError::Malformed(Box::new(e)))?;
+                Varint(l).write(&mut data)?;
+                data.write_all(key)?;
+            }
+            None => {
+                Varint(-1).write(&mut data)?;
+            }
+        }
 
         // value
-        let l = i32::try_from(self.value.len()).map_err(|e| WriteError::Malformed(Box::new(e)))?;
-        Varint(l).write(&mut data)?;
-        data.write_all(&self.value)?;
+        match &self.value {
+            Some(value) => {
+                let l =
+                    i32::try_from(value.len()).map_err(|e| WriteError::Malformed(Box::new(e)))?;
+                Varint(l).write(&mut data)?;
+                data.write_all(value)?;
+            }
+            None => {
+                Varint(-1).write(&mut data)?;
+            }
+        }
 
         // headers
         // Note: This is NOT a normal array but uses a Varint instead.
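The matching write-side convention, again as a standalone sketch rather than the crate's `Varint` type: a present payload is written as its zigzag varint length followed by the bytes, while `None` is written as length -1, which zigzag-encodes to the single byte 0x01:

// Append a nullable byte sequence in the record wire format: zigzag varint
// length, then the payload; NULL is the length -1 with no payload.
fn write_nullable_bytes(out: &mut Vec<u8>, data: Option<&[u8]>) {
    let len: i32 = data.map(|d| d.len() as i32).unwrap_or(-1);
    // zigzag encode, then emit 7 bits at a time, low to high.
    let mut z = ((len << 1) ^ (len >> 31)) as u32;
    loop {
        let mut byte = (z & 0x7f) as u8;
        z >>= 7;
        if z != 0 {
            byte |= 0x80; // continuation bit
        }
        out.push(byte);
        if z == 0 {
            break;
        }
    }
    if let Some(d) = data {
        out.extend_from_slice(d);
    }
}

fn main() {
    let mut buf = vec![];
    write_nullable_bytes(&mut buf, None);
    assert_eq!(buf, [0x01]); // -1 zigzag-encodes to the single byte 0x01
}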
@@ -598,6 +619,7 @@ where
         }
         #[cfg(feature = "compression-snappy")]
         RecordBatchCompression::Snappy => {
+            use crate::protocol::vec_builder::DEFAULT_BLOCK_SIZE;
             use snap::raw::{decompress_len, Decoder};
 
             // Construct the input for the raw decoder.
@@ -924,8 +946,8 @@ mod tests {
             records: ControlBatchOrRecords::Records(vec![Record {
                 timestamp_delta: 0,
                 offset_delta: 0,
-                key: vec![],
-                value: b"hello kafka".to_vec(),
+                key: Some(vec![]),
+                value: Some(b"hello kafka".to_vec()),
                 headers: vec![RecordHeader {
                     key: "foo".to_owned(),
                     value: b"bar".to_vec(),
@@ -970,8 +992,8 @@ mod tests {
             records: ControlBatchOrRecords::Records(vec![Record {
                 timestamp_delta: 0,
                 offset_delta: 0,
-                key: vec![b'x'; 100],
-                value: b"hello kafka".to_vec(),
+                key: Some(vec![b'x'; 100]),
+                value: Some(b"hello kafka".to_vec()),
                 headers: vec![RecordHeader {
                     key: "foo".to_owned(),
                     value: b"bar".to_vec(),
@@ -1020,8 +1042,8 @@ mod tests {
             records: ControlBatchOrRecords::Records(vec![Record {
                 timestamp_delta: 0,
                 offset_delta: 0,
-                key: vec![b'x'; 100],
-                value: b"hello kafka".to_vec(),
+                key: Some(vec![b'x'; 100]),
+                value: Some(b"hello kafka".to_vec()),
                 headers: vec![RecordHeader {
                     key: "foo".to_owned(),
                     value: b"bar".to_vec(),
@@ -1070,8 +1092,8 @@ mod tests {
             records: ControlBatchOrRecords::Records(vec![Record {
                 timestamp_delta: 0,
                 offset_delta: 0,
-                key: vec![b'x'; 100],
-                value: b"hello kafka".to_vec(),
+                key: Some(vec![b'x'; 100]),
+                value: Some(b"hello kafka".to_vec()),
                 headers: vec![RecordHeader {
                     key: "foo".to_owned(),
                     value: b"bar".to_vec(),
@@ -1091,4 +1113,74 @@ mod tests {
         let actual2 = RecordBatch::read(&mut Cursor::new(data2)).unwrap();
         assert_eq!(actual2, expected);
     }
+
+    #[test]
+    fn test_decode_fixture_null_key() {
+        // This data was obtained by watching rdkafka driven by IOx.
+        let data = [
+            b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x1a\x00\x00\x00\x00".to_vec(),
+            b"\x02\x67\x98\xb9\x54\x00\x00\x00\x00\x00\x00\x00\x00\x01\x7e\xbe".to_vec(),
+            b"\xdc\x91\xf6\x00\x00\x01\x7e\xbe\xdc\x91\xf6\xff\xff\xff\xff\xff".to_vec(),
+            b"\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x00\x00\x01\xce\x03\x00".to_vec(),
+            b"\x00\x00\x01\xce\x01\x0a\x65\x0a\x2f\x74\x65\x73\x74\x5f\x74\x6f".to_vec(),
+            b"\x70\x69\x63\x5f\x33\x37\x33\x39\x38\x66\x38\x64\x2d\x39\x35\x66".to_vec(),
+            b"\x38\x2d\x34\x34\x65\x65\x2d\x38\x33\x61\x34\x2d\x34\x64\x30\x63".to_vec(),
+            b"\x35\x39\x32\x62\x34\x34\x36\x64\x12\x32\x0a\x03\x75\x70\x63\x12".to_vec(),
+            b"\x17\x0a\x04\x75\x73\x65\x72\x10\x03\x1a\x0a\x12\x08\x00\x00\x00".to_vec(),
+            b"\x00\x00\x00\xf0\x3f\x22\x01\x00\x12\x10\x0a\x04\x74\x69\x6d\x65".to_vec(),
+            b"\x10\x04\x1a\x03\x0a\x01\x64\x22\x01\x00\x18\x01\x04\x18\x63\x6f".to_vec(),
+            b"\x6e\x74\x65\x6e\x74\x2d\x74\x79\x70\x65\xa4\x01\x61\x70\x70\x6c".to_vec(),
+            b"\x69\x63\x61\x74\x69\x6f\x6e\x2f\x78\x2d\x70\x72\x6f\x74\x6f\x62".to_vec(),
+            b"\x75\x66\x3b\x20\x73\x63\x68\x65\x6d\x61\x3d\x22\x69\x6e\x66\x6c".to_vec(),
+            b"\x75\x78\x64\x61\x74\x61\x2e\x69\x6f\x78\x2e\x77\x72\x69\x74\x65".to_vec(),
+            b"\x5f\x62\x75\x66\x66\x65\x72\x2e\x76\x31\x2e\x57\x72\x69\x74\x65".to_vec(),
+            b"\x42\x75\x66\x66\x65\x72\x50\x61\x79\x6c\x6f\x61\x64\x22\x1a\x69".to_vec(),
+            b"\x6f\x78\x2d\x6e\x61\x6d\x65\x73\x70\x61\x63\x65\x12\x6e\x61\x6d".to_vec(),
+            b"\x65\x73\x70\x61\x63\x65".to_vec(),
+        ]
+        .concat();
+
+        let actual = RecordBatch::read(&mut Cursor::new(data.clone())).unwrap();
+        let expected = RecordBatch {
+            base_offset: 0,
+            partition_leader_epoch: 0,
+            last_offset_delta: 0,
+            first_timestamp: 1643879633398,
+            max_timestamp: 1643879633398,
+            producer_id: -1,
+            producer_epoch: -1,
+            base_sequence: -1,
+            records: ControlBatchOrRecords::Records(vec![Record {
+                timestamp_delta: 0,
+                offset_delta: 0,
+                key: None,
+                value: Some(vec![
+                    10, 101, 10, 47, 116, 101, 115, 116, 95, 116, 111, 112, 105, 99, 95, 51, 55,
+                    51, 57, 56, 102, 56, 100, 45, 57, 53, 102, 56, 45, 52, 52, 101, 101, 45, 56,
+                    51, 97, 52, 45, 52, 100, 48, 99, 53, 57, 50, 98, 52, 52, 54, 100, 18, 50, 10,
+                    3, 117, 112, 99, 18, 23, 10, 4, 117, 115, 101, 114, 16, 3, 26, 10, 18, 8, 0, 0,
+                    0, 0, 0, 0, 240, 63, 34, 1, 0, 18, 16, 10, 4, 116, 105, 109, 101, 16, 4, 26, 3,
+                    10, 1, 100, 34, 1, 0, 24, 1,
+                ]),
+                headers: vec![
+                    RecordHeader {
+                        key: "content-type".to_owned(),
+                        value: br#"application/x-protobuf; schema="influxdata.iox.write_buffer.v1.WriteBufferPayload""#.to_vec(),
+                    },
+                    RecordHeader {
+                        key: "iox-namespace".to_owned(),
+                        value: b"namespace".to_vec(),
+                    },
+                ],
+            }]),
+            compression: RecordBatchCompression::NoCompression,
+            is_transactional: false,
+            timestamp_type: RecordBatchTimestampType::CreateTime,
+        };
+        assert_eq!(actual, expected);
+
+        let mut data2 = vec![];
+        actual.write(&mut data2).unwrap();
+        assert_eq!(data, data2);
+    }
 }
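Decoding the fixture by hand confirms the wire convention: after the batch header and the record's own length prefix (varint 0xce 0x03, i.e. 231 bytes), the attributes byte, and the two zero deltas, the key length is the single byte 0x01 — zigzag for -1, i.e. NULL — while the value length 0xce 0x01 decodes to 103, exactly the length of the protobuf payload listed in `expected`.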

src/record.rs

Lines changed: 6 additions & 6 deletions
@@ -5,17 +5,17 @@ use time::OffsetDateTime;
 /// High-level record.
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub struct Record {
-    pub key: Vec<u8>,
-    pub value: Vec<u8>,
+    pub key: Option<Vec<u8>>,
+    pub value: Option<Vec<u8>>,
     pub headers: BTreeMap<String, Vec<u8>>,
     pub timestamp: OffsetDateTime,
 }
 
 impl Record {
     /// Returns the approximate uncompressed size of this [`Record`]
     pub fn approximate_size(&self) -> usize {
-        self.key.len()
-            + self.value.len()
+        self.key.as_ref().map(|k| k.len()).unwrap_or_default()
+            + self.value.as_ref().map(|v| v.len()).unwrap_or_default()
             + self
                 .headers
                 .iter()
@@ -38,8 +38,8 @@ mod tests {
     #[test]
     fn test_approximate_size() {
         let record = Record {
-            key: vec![0; 23],
-            value: vec![0; 45],
+            key: Some(vec![0; 23]),
+            value: Some(vec![0; 45]),
             headers: vec![("a".to_string(), vec![0; 5]), ("b".to_string(), vec![0; 7])]
                 .into_iter()
                 .collect(),
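With the `Option` fields, `approximate_size` counts a missing key or value as zero bytes. A small illustrative check; the record below is hypothetical, not part of this diff, and the import paths are assumed:

use std::collections::BTreeMap;

use rskafka::record::Record; // path assumed
use time::OffsetDateTime;

fn main() {
    let tombstone = Record {
        key: Some(b"user-123".to_vec()),
        value: None, // NULL value: nothing to count
        headers: BTreeMap::default(),
        timestamp: OffsetDateTime::now_utc(),
    };
    // 8 bytes of key + 0 for the NULL value + 0 for the empty headers.
    assert_eq!(tombstone.approximate_size(), 8);
}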

tests/client.rs

Lines changed: 2 additions & 2 deletions
@@ -282,8 +282,8 @@ async fn test_produce_consume_size_cutoff() {
 
 pub fn large_record() -> Record {
     Record {
-        key: b"".to_vec(),
-        value: vec![b'x'; 1024],
+        key: Some(b"".to_vec()),
+        value: Some(vec![b'x'; 1024]),
         headers: BTreeMap::from([("foo".to_owned(), b"bar".to_vec())]),
         timestamp: now(),
     }

tests/produce_consume.rs

Lines changed: 3 additions & 3 deletions
@@ -146,19 +146,19 @@ async fn assert_produce_consume<F1, G1, F2, G2>(
                 // add a bit more data to encourage rdkafka to actually use compression, otherwise the compressed data
                 // is larger than the uncompressed version and rdkafka will not use compression at all
                 Record {
-                    key: vec![b'x'; 100],
+                    key: Some(vec![b'x'; 100]),
                     ..record
                 }
             }
         }
     };
     let record_2 = Record {
-        value: b"some value".to_vec(),
+        value: Some(b"some value".to_vec()),
         timestamp: now(),
         ..record_1.clone()
     };
     let record_3 = Record {
-        value: b"more value".to_vec(),
+        value: Some(b"more value".to_vec()),
         timestamp: now(),
         ..record_1.clone()
     };
