Skip to content

Commit e68bed8

Browse files
Merge pull request #146 from influxdata/crepererum/test_produce_ts_handling
test: ensure that record timestamps can be produced/consumed out of order
2 parents 822186d + 326d9ab commit e68bed8

File tree

2 files changed

+17
-6
lines changed

2 files changed

+17
-6
lines changed

src/client/partition.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -399,7 +399,6 @@ fn build_produce_request(
399399

400400
// TODO: Retry on failure
401401

402-
// TODO: Verify this is the first timestamp in the batch and not the min
403402
let first_timestamp = records.first().unwrap().timestamp;
404403
let mut max_timestamp = first_timestamp;
405404

tests/produce_consume.rs

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::sync::Arc;
1+
use std::{sync::Arc, time::Duration};
22

33
use rskafka::{
44
client::{
@@ -260,8 +260,16 @@ async fn assert_produce_consume<F1, G1, F2, G2>(
260260
.unwrap();
261261
let partition_client = Arc::new(client.partition_client(topic_name.clone(), 1).unwrap());
262262

263+
// timestamps for records. We'll reorder the messages though to ts2, ts1, ts3
264+
let ts1 = now();
265+
let ts2 = ts1 + Duration::from_millis(1);
266+
let ts3 = ts2 + Duration::from_millis(1);
267+
263268
let record_1 = {
264-
let record = record(b"");
269+
let record = Record {
270+
timestamp: ts2,
271+
..record(b"")
272+
};
265273
match compression {
266274
Compression::NoCompression => record,
267275
#[allow(unreachable_patterns)]
@@ -277,12 +285,12 @@ async fn assert_produce_consume<F1, G1, F2, G2>(
277285
};
278286
let record_2 = Record {
279287
value: Some(b"some value".to_vec()),
280-
timestamp: now(),
288+
timestamp: ts1,
281289
..record_1.clone()
282290
};
283291
let record_3 = Record {
284292
value: Some(b"more value".to_vec()),
285-
timestamp: now(),
293+
timestamp: ts3,
286294
..record_1.clone()
287295
};
288296

@@ -322,7 +330,11 @@ async fn assert_produce_consume<F1, G1, F2, G2>(
322330
.zip([record_1, record_2, record_3])
323331
.map(|(offset, record)| RecordAndOffset { record, offset })
324332
.collect();
325-
assert_eq!(actual, expected);
333+
assert_eq!(
334+
actual, expected,
335+
"Records are different.\n\nActual:\n{:#?}\n\nExpected:\n{:#?}",
336+
actual, expected,
337+
);
326338
}
327339

328340
async fn produce_java(

0 commit comments

Comments
 (0)