@@ -24,8 +24,6 @@ use std::io::{Cursor, Read, Write};
24
24
#[ cfg( test) ]
25
25
use proptest:: prelude:: * ;
26
26
27
- use crate :: protocol:: vec_builder:: DEFAULT_BLOCK_SIZE ;
28
-
29
27
use super :: {
30
28
primitives:: { Int16 , Int32 , Int64 , Int8 , Varint , Varlong } ,
31
29
traits:: { ReadError , ReadType , WriteError , WriteType } ,
94
92
pub struct Record {
95
93
pub timestamp_delta : i64 ,
96
94
pub offset_delta : i32 ,
97
- pub key : Vec < u8 > ,
98
- pub value : Vec < u8 > ,
95
+ pub key : Option < Vec < u8 > > ,
96
+ pub value : Option < Vec < u8 > > ,
99
97
pub headers : Vec < RecordHeader > ,
100
98
}
101
99
@@ -119,18 +117,26 @@ where
119
117
let offset_delta = Varint :: read ( reader) ?. 0 ;
120
118
121
119
// key
122
- let len = Varint :: read ( reader) ?;
123
- let len = usize:: try_from ( len. 0 ) . map_err ( |e| ReadError :: Malformed ( Box :: new ( e) ) ) ?;
124
- let mut key = VecBuilder :: new ( len) ;
125
- key = key. read_exact ( reader) ?;
126
- let key = key. into ( ) ;
120
+ let len = Varint :: read ( reader) ?. 0 ;
121
+ let key = if len == -1 {
122
+ None
123
+ } else {
124
+ let len = usize:: try_from ( len) . map_err ( |e| ReadError :: Malformed ( Box :: new ( e) ) ) ?;
125
+ let mut key = VecBuilder :: new ( len) ;
126
+ key = key. read_exact ( reader) ?;
127
+ Some ( key. into ( ) )
128
+ } ;
127
129
128
130
// value
129
- let len = Varint :: read ( reader) ?;
130
- let len = usize:: try_from ( len. 0 ) . map_err ( |e| ReadError :: Malformed ( Box :: new ( e) ) ) ?;
131
- let mut value = VecBuilder :: new ( len) ;
132
- value = value. read_exact ( reader) ?;
133
- let value = value. into ( ) ;
131
+ let len = Varint :: read ( reader) ?. 0 ;
132
+ let value = if len == -1 {
133
+ None
134
+ } else {
135
+ let len = usize:: try_from ( len) . map_err ( |e| ReadError :: Malformed ( Box :: new ( e) ) ) ?;
136
+ let mut value = VecBuilder :: new ( len) ;
137
+ value = value. read_exact ( reader) ?;
138
+ Some ( value. into ( ) )
139
+ } ;
134
140
135
141
// headers
136
142
// Note: This is NOT a normal array but uses a Varint instead.
@@ -179,14 +185,29 @@ where
179
185
Varint ( self . offset_delta ) . write ( & mut data) ?;
180
186
181
187
// key
182
- let l = i32:: try_from ( self . key . len ( ) ) . map_err ( |e| WriteError :: Malformed ( Box :: new ( e) ) ) ?;
183
- Varint ( l) . write ( & mut data) ?;
184
- data. write_all ( & self . key ) ?;
188
+ match & self . key {
189
+ Some ( key) => {
190
+ let l = i32:: try_from ( key. len ( ) ) . map_err ( |e| WriteError :: Malformed ( Box :: new ( e) ) ) ?;
191
+ Varint ( l) . write ( & mut data) ?;
192
+ data. write_all ( key) ?;
193
+ }
194
+ None => {
195
+ Varint ( -1 ) . write ( & mut data) ?;
196
+ }
197
+ }
185
198
186
199
// value
187
- let l = i32:: try_from ( self . value . len ( ) ) . map_err ( |e| WriteError :: Malformed ( Box :: new ( e) ) ) ?;
188
- Varint ( l) . write ( & mut data) ?;
189
- data. write_all ( & self . value ) ?;
200
+ match & self . value {
201
+ Some ( value) => {
202
+ let l =
203
+ i32:: try_from ( value. len ( ) ) . map_err ( |e| WriteError :: Malformed ( Box :: new ( e) ) ) ?;
204
+ Varint ( l) . write ( & mut data) ?;
205
+ data. write_all ( value) ?;
206
+ }
207
+ None => {
208
+ Varint ( -1 ) . write ( & mut data) ?;
209
+ }
210
+ }
190
211
191
212
// headers
192
213
// Note: This is NOT a normal array but uses a Varint instead.
@@ -598,6 +619,7 @@ where
598
619
}
599
620
#[ cfg( feature = "compression-snappy" ) ]
600
621
RecordBatchCompression :: Snappy => {
622
+ use crate :: protocol:: vec_builder:: DEFAULT_BLOCK_SIZE ;
601
623
use snap:: raw:: { decompress_len, Decoder } ;
602
624
603
625
// Construct the input for the raw decoder.
@@ -924,8 +946,8 @@ mod tests {
924
946
records : ControlBatchOrRecords :: Records ( vec ! [ Record {
925
947
timestamp_delta: 0 ,
926
948
offset_delta: 0 ,
927
- key: vec![ ] ,
928
- value: b"hello kafka" . to_vec( ) ,
949
+ key: Some ( vec![ ] ) ,
950
+ value: Some ( b"hello kafka" . to_vec( ) ) ,
929
951
headers: vec![ RecordHeader {
930
952
key: "foo" . to_owned( ) ,
931
953
value: b"bar" . to_vec( ) ,
@@ -970,8 +992,8 @@ mod tests {
970
992
records : ControlBatchOrRecords :: Records ( vec ! [ Record {
971
993
timestamp_delta: 0 ,
972
994
offset_delta: 0 ,
973
- key: vec![ b'x' ; 100 ] ,
974
- value: b"hello kafka" . to_vec( ) ,
995
+ key: Some ( vec![ b'x' ; 100 ] ) ,
996
+ value: Some ( b"hello kafka" . to_vec( ) ) ,
975
997
headers: vec![ RecordHeader {
976
998
key: "foo" . to_owned( ) ,
977
999
value: b"bar" . to_vec( ) ,
@@ -1020,8 +1042,8 @@ mod tests {
1020
1042
records : ControlBatchOrRecords :: Records ( vec ! [ Record {
1021
1043
timestamp_delta: 0 ,
1022
1044
offset_delta: 0 ,
1023
- key: vec![ b'x' ; 100 ] ,
1024
- value: b"hello kafka" . to_vec( ) ,
1045
+ key: Some ( vec![ b'x' ; 100 ] ) ,
1046
+ value: Some ( b"hello kafka" . to_vec( ) ) ,
1025
1047
headers: vec![ RecordHeader {
1026
1048
key: "foo" . to_owned( ) ,
1027
1049
value: b"bar" . to_vec( ) ,
@@ -1070,8 +1092,8 @@ mod tests {
1070
1092
records : ControlBatchOrRecords :: Records ( vec ! [ Record {
1071
1093
timestamp_delta: 0 ,
1072
1094
offset_delta: 0 ,
1073
- key: vec![ b'x' ; 100 ] ,
1074
- value: b"hello kafka" . to_vec( ) ,
1095
+ key: Some ( vec![ b'x' ; 100 ] ) ,
1096
+ value: Some ( b"hello kafka" . to_vec( ) ) ,
1075
1097
headers: vec![ RecordHeader {
1076
1098
key: "foo" . to_owned( ) ,
1077
1099
value: b"bar" . to_vec( ) ,
@@ -1091,4 +1113,74 @@ mod tests {
1091
1113
let actual2 = RecordBatch :: read ( & mut Cursor :: new ( data2) ) . unwrap ( ) ;
1092
1114
assert_eq ! ( actual2, expected) ;
1093
1115
}
1116
+
1117
+ #[ test]
1118
+ fn test_decode_fixture_null_key ( ) {
1119
+ // This data was obtained by watching rdkafka driven by IOx.
1120
+ let data = [
1121
+ b"\x00 \x00 \x00 \x00 \x00 \x00 \x00 \x00 \x00 \x00 \x01 \x1a \x00 \x00 \x00 \x00 " . to_vec ( ) ,
1122
+ b"\x02 \x67 \x98 \xb9 \x54 \x00 \x00 \x00 \x00 \x00 \x00 \x00 \x00 \x01 \x7e \xbe " . to_vec ( ) ,
1123
+ b"\xdc \x91 \xf6 \x00 \x00 \x01 \x7e \xbe \xdc \x91 \xf6 \xff \xff \xff \xff \xff " . to_vec ( ) ,
1124
+ b"\xff \xff \xff \xff \xff \xff \xff \xff \xff \x00 \x00 \x00 \x01 \xce \x03 \x00 " . to_vec ( ) ,
1125
+ b"\x00 \x00 \x01 \xce \x01 \x0a \x65 \x0a \x2f \x74 \x65 \x73 \x74 \x5f \x74 \x6f " . to_vec ( ) ,
1126
+ b"\x70 \x69 \x63 \x5f \x33 \x37 \x33 \x39 \x38 \x66 \x38 \x64 \x2d \x39 \x35 \x66 " . to_vec ( ) ,
1127
+ b"\x38 \x2d \x34 \x34 \x65 \x65 \x2d \x38 \x33 \x61 \x34 \x2d \x34 \x64 \x30 \x63 " . to_vec ( ) ,
1128
+ b"\x35 \x39 \x32 \x62 \x34 \x34 \x36 \x64 \x12 \x32 \x0a \x03 \x75 \x70 \x63 \x12 " . to_vec ( ) ,
1129
+ b"\x17 \x0a \x04 \x75 \x73 \x65 \x72 \x10 \x03 \x1a \x0a \x12 \x08 \x00 \x00 \x00 " . to_vec ( ) ,
1130
+ b"\x00 \x00 \x00 \xf0 \x3f \x22 \x01 \x00 \x12 \x10 \x0a \x04 \x74 \x69 \x6d \x65 " . to_vec ( ) ,
1131
+ b"\x10 \x04 \x1a \x03 \x0a \x01 \x64 \x22 \x01 \x00 \x18 \x01 \x04 \x18 \x63 \x6f " . to_vec ( ) ,
1132
+ b"\x6e \x74 \x65 \x6e \x74 \x2d \x74 \x79 \x70 \x65 \xa4 \x01 \x61 \x70 \x70 \x6c " . to_vec ( ) ,
1133
+ b"\x69 \x63 \x61 \x74 \x69 \x6f \x6e \x2f \x78 \x2d \x70 \x72 \x6f \x74 \x6f \x62 " . to_vec ( ) ,
1134
+ b"\x75 \x66 \x3b \x20 \x73 \x63 \x68 \x65 \x6d \x61 \x3d \x22 \x69 \x6e \x66 \x6c " . to_vec ( ) ,
1135
+ b"\x75 \x78 \x64 \x61 \x74 \x61 \x2e \x69 \x6f \x78 \x2e \x77 \x72 \x69 \x74 \x65 " . to_vec ( ) ,
1136
+ b"\x5f \x62 \x75 \x66 \x66 \x65 \x72 \x2e \x76 \x31 \x2e \x57 \x72 \x69 \x74 \x65 " . to_vec ( ) ,
1137
+ b"\x42 \x75 \x66 \x66 \x65 \x72 \x50 \x61 \x79 \x6c \x6f \x61 \x64 \x22 \x1a \x69 " . to_vec ( ) ,
1138
+ b"\x6f \x78 \x2d \x6e \x61 \x6d \x65 \x73 \x70 \x61 \x63 \x65 \x12 \x6e \x61 \x6d " . to_vec ( ) ,
1139
+ b"\x65 \x73 \x70 \x61 \x63 \x65 " . to_vec ( ) ,
1140
+ ]
1141
+ . concat ( ) ;
1142
+
1143
+ let actual = RecordBatch :: read ( & mut Cursor :: new ( data. clone ( ) ) ) . unwrap ( ) ;
1144
+ let expected = RecordBatch {
1145
+ base_offset : 0 ,
1146
+ partition_leader_epoch : 0 ,
1147
+ last_offset_delta : 0 ,
1148
+ first_timestamp : 1643879633398 ,
1149
+ max_timestamp : 1643879633398 ,
1150
+ producer_id : -1 ,
1151
+ producer_epoch : -1 ,
1152
+ base_sequence : -1 ,
1153
+ records : ControlBatchOrRecords :: Records ( vec ! [ Record {
1154
+ timestamp_delta: 0 ,
1155
+ offset_delta: 0 ,
1156
+ key: None ,
1157
+ value: Some ( vec![
1158
+ 10 , 101 , 10 , 47 , 116 , 101 , 115 , 116 , 95 , 116 , 111 , 112 , 105 , 99 , 95 , 51 , 55 ,
1159
+ 51 , 57 , 56 , 102 , 56 , 100 , 45 , 57 , 53 , 102 , 56 , 45 , 52 , 52 , 101 , 101 , 45 , 56 ,
1160
+ 51 , 97 , 52 , 45 , 52 , 100 , 48 , 99 , 53 , 57 , 50 , 98 , 52 , 52 , 54 , 100 , 18 , 50 , 10 ,
1161
+ 3 , 117 , 112 , 99 , 18 , 23 , 10 , 4 , 117 , 115 , 101 , 114 , 16 , 3 , 26 , 10 , 18 , 8 , 0 , 0 ,
1162
+ 0 , 0 , 0 , 0 , 240 , 63 , 34 , 1 , 0 , 18 , 16 , 10 , 4 , 116 , 105 , 109 , 101 , 16 , 4 , 26 , 3 ,
1163
+ 10 , 1 , 100 , 34 , 1 , 0 , 24 , 1 ,
1164
+ ] ) ,
1165
+ headers: vec![
1166
+ RecordHeader {
1167
+ key: "content-type" . to_owned( ) ,
1168
+ value: br#"application/x-protobuf; schema="influxdata.iox.write_buffer.v1.WriteBufferPayload""# . to_vec( ) ,
1169
+ } ,
1170
+ RecordHeader {
1171
+ key: "iox-namespace" . to_owned( ) ,
1172
+ value: b"namespace" . to_vec( ) ,
1173
+ } ,
1174
+ ] ,
1175
+ } ] ) ,
1176
+ compression : RecordBatchCompression :: NoCompression ,
1177
+ is_transactional : false ,
1178
+ timestamp_type : RecordBatchTimestampType :: CreateTime ,
1179
+ } ;
1180
+ assert_eq ! ( actual, expected) ;
1181
+
1182
+ let mut data2 = vec ! [ ] ;
1183
+ actual. write ( & mut data2) . unwrap ( ) ;
1184
+ assert_eq ! ( data, data2) ;
1185
+ }
1094
1186
}
0 commit comments