Skip to content

Commit 3208e47

Browse files
Merge pull request #176 from influxdata/crepererum/chrono
refactor: replace `time` w/ `chono`
2 parents 6c887e5 + d7e14a8 commit 3208e47

File tree

14 files changed

+72
-61
lines changed

14 files changed

+72
-61
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ documentation = "https://docs.rs/rskafka/"
2121
async-socks5 = { version = "0.5", optional = true }
2222
async-trait = "0.1"
2323
bytes = "1.1"
24+
chrono = { version = "0.4", default-features = false }
2425
crc32c = "0.6"
2526
flate2 = { version = "1", optional = true }
2627
futures = "0.3"
@@ -32,7 +33,6 @@ rand = "0.8"
3233
rustls = { version = "0.20", optional = true }
3334
snap = { version = "1", optional = true }
3435
thiserror = "1.0"
35-
time = "0.3"
3636
tokio = { version = "1.19", default-features = false, features = ["io-util", "net", "rt", "sync", "time", "macros"] }
3737
tokio-rustls = { version = "0.23", optional = true }
3838
tracing = "0.1"

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ use rskafka::{
3636
},
3737
record::Record,
3838
};
39-
use time::OffsetDateTime;
39+
use chrono::{TimeZone, Utc};
4040
use std::collections::BTreeMap;
4141
4242
// setup client
@@ -70,7 +70,7 @@ let record = Record {
7070
headers: BTreeMap::from([
7171
("foo".to_owned(), b"bar".to_vec()),
7272
]),
73-
timestamp: OffsetDateTime::now_utc(),
73+
timestamp: Utc.timestamp_millis(42),
7474
};
7575
partition_client.produce(vec![record], Compression::default()).await.unwrap();
7676

benches/throughput.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use std::{
77
time::{Duration, Instant},
88
};
99

10+
use chrono::{TimeZone, Utc};
1011
use criterion::{
1112
criterion_group, criterion_main, measurement::WallTime, BenchmarkGroup, Criterion, SamplingMode,
1213
};
@@ -28,7 +29,6 @@ use rskafka::{
2829
},
2930
record::Record,
3031
};
31-
use time::OffsetDateTime;
3232
use tokio::runtime::Runtime;
3333

3434
const PARALLEL_BATCH_SIZE: usize = 1_000_000;
@@ -42,7 +42,7 @@ pub fn criterion_benchmark(c: &mut Criterion) {
4242
key: Some(vec![b'k'; 10]),
4343
value: Some(vec![b'x'; 10_000]),
4444
headers: BTreeMap::default(),
45-
timestamp: OffsetDateTime::now_utc(),
45+
timestamp: Utc.timestamp_millis(1337),
4646
};
4747

4848
{

src/client/consumer.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -360,8 +360,8 @@ mod tests {
360360
use std::time::Duration;
361361

362362
use assert_matches::assert_matches;
363+
use chrono::{TimeZone, Utc};
363364
use futures::{pin_mut, StreamExt};
364-
use time::OffsetDateTime;
365365
use tokio::sync::{mpsc, Mutex};
366366

367367
use crate::{
@@ -509,7 +509,7 @@ mod tests {
509509
key: Some(vec![0; 4]),
510510
value: Some(vec![0; 6]),
511511
headers: Default::default(),
512-
timestamp: OffsetDateTime::now_utc(),
512+
timestamp: Utc.timestamp_millis(1337),
513513
};
514514

515515
let (sender, receiver) = mpsc::channel(10);
@@ -576,7 +576,7 @@ mod tests {
576576
key: Some(vec![0; 4]),
577577
value: Some(vec![0; 6]),
578578
headers: Default::default(),
579-
timestamp: OffsetDateTime::now_utc(),
579+
timestamp: Utc.timestamp_millis(1337),
580580
};
581581

582582
let (sender, receiver) = mpsc::channel(10);
@@ -667,7 +667,7 @@ mod tests {
667667
key: Some(vec![0; 4]),
668668
value: Some(vec![0; 6]),
669669
headers: Default::default(),
670-
timestamp: OffsetDateTime::now_utc(),
670+
timestamp: Utc.timestamp_millis(1337),
671671
};
672672

673673
// Simulate an error on first fetch to encourage an offset update
@@ -714,7 +714,7 @@ mod tests {
714714
key: Some(vec![0; 4]),
715715
value: Some(vec![0; 6]),
716716
headers: Default::default(),
717-
timestamp: OffsetDateTime::now_utc(),
717+
timestamp: Utc.timestamp_millis(1337),
718718
};
719719

720720
// Simulate an error on first fetch to encourage an offset update

src/client/partition.rs

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,11 @@ use crate::{
2222
validation::ExactlyOne,
2323
};
2424
use async_trait::async_trait;
25+
use chrono::{LocalResult, TimeZone, Utc};
2526
use std::{
2627
ops::{ControlFlow, Deref, Range},
2728
sync::Arc,
2829
};
29-
use time::OffsetDateTime;
3030
use tokio::sync::Mutex;
3131
use tracing::{error, info};
3232

@@ -540,7 +540,7 @@ fn build_produce_request(
540540
ProtocolRecord {
541541
key: record.key,
542542
value: record.value,
543-
timestamp_delta: (record.timestamp - first_timestamp).whole_milliseconds() as i64,
543+
timestamp_delta: (record.timestamp - first_timestamp).num_milliseconds() as i64,
544544
offset_delta: offset_delta as i32,
545545
headers: record
546546
.headers
@@ -573,8 +573,8 @@ fn build_produce_request(
573573
timestamp_type: RecordBatchTimestampType::CreateTime,
574574
producer_id: -1,
575575
producer_epoch: -1,
576-
first_timestamp: (first_timestamp.unix_timestamp_nanos() / 1_000_000) as i64,
577-
max_timestamp: (max_timestamp.unix_timestamp_nanos() / 1_000_000) as i64,
576+
first_timestamp: first_timestamp.timestamp_millis(),
577+
max_timestamp: max_timestamp.timestamp_millis(),
578578
records: ControlBatchOrRecords::Records(records),
579579
}]),
580580
};
@@ -730,12 +730,29 @@ fn extract_records(
730730
continue;
731731
}
732732

733-
let timestamp = OffsetDateTime::from_unix_timestamp_nanos(
734-
(batch.first_timestamp + record.timestamp_delta) as i128 * 1_000_000,
735-
)
736-
.map_err(|e| {
737-
Error::InvalidResponse(format!("Cannot parse timestamp: {}", e))
738-
})?;
733+
let timestamp_millis =
734+
match batch.first_timestamp.checked_add(record.timestamp_delta) {
735+
Some(ts) => ts,
736+
None => {
737+
return Err(Error::InvalidResponse(format!(
738+
"Timestamp overflow (first_timestamp={}, delta={}",
739+
batch.first_timestamp, record.timestamp_delta
740+
)));
741+
}
742+
};
743+
let timestamp = match Utc.timestamp_millis_opt(timestamp_millis) {
744+
LocalResult::None => {
745+
return Err(Error::InvalidResponse(format!(
746+
"Not a valid timestamp ({timestamp_millis})"
747+
)));
748+
}
749+
LocalResult::Single(ts) => ts,
750+
LocalResult::Ambiguous(a, b) => {
751+
return Err(Error::InvalidResponse(format!(
752+
"Ambiguous timestamp ({timestamp_millis}): {a} or {b}"
753+
)));
754+
}
755+
};
739756

740757
records.push(RecordAndOffset {
741758
record: Record {

src/client/producer.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@
5050
//! },
5151
//! record::Record,
5252
//! };
53-
//! use time::OffsetDateTime;
53+
//! use chrono::{TimeZone, Utc};
5454
//! use std::{
5555
//! collections::BTreeMap,
5656
//! sync::Arc,
@@ -82,7 +82,7 @@
8282
//! headers: BTreeMap::from([
8383
//! ("foo".to_owned(), b"bar".to_vec()),
8484
//! ]),
85-
//! timestamp: OffsetDateTime::now_utc(),
85+
//! timestamp: Utc.timestamp_millis(42),
8686
//! };
8787
//! producer.produce(record.clone()).await.unwrap();
8888
//! # }
@@ -109,7 +109,7 @@
109109
//! },
110110
//! record::Record,
111111
//! };
112-
//! use time::OffsetDateTime;
112+
//! use chrono::{TimeZone, Utc};
113113
//! use std::{
114114
//! collections::BTreeMap,
115115
//! sync::Arc,
@@ -156,7 +156,7 @@
156156
//! headers: BTreeMap::from([
157157
//! ("foo".to_owned(), b"bar".to_vec()),
158158
//! ]),
159-
//! timestamp: OffsetDateTime::now_utc(),
159+
//! timestamp: Utc.timestamp_millis(42),
160160
//! },
161161
//! ];
162162
//! Ok((
@@ -686,9 +686,9 @@ mod tests {
686686
use crate::{
687687
client::producer::aggregator::RecordAggregator, protocol::error::Error as ProtocolError,
688688
};
689+
use chrono::{TimeZone, Utc};
689690
use futures::stream::{FuturesOrdered, FuturesUnordered};
690691
use futures::{pin_mut, FutureExt, StreamExt};
691-
use time::OffsetDateTime;
692692

693693
#[derive(Debug)]
694694
struct MockClient {
@@ -737,7 +737,7 @@ mod tests {
737737
key: Some(vec![0; 4]),
738738
value: Some(vec![0; 6]),
739739
headers: Default::default(),
740-
timestamp: OffsetDateTime::from_unix_timestamp(320).unwrap(),
740+
timestamp: Utc.timestamp_millis(320),
741741
}
742742
}
743743

src/client/producer/aggregator.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,16 +143,17 @@ impl StatusDeaggregator for RecordAggregatorStatusDeaggregator {
143143

144144
#[cfg(test)]
145145
mod tests {
146+
use chrono::{TimeZone, Utc};
147+
146148
use super::*;
147-
use time::OffsetDateTime;
148149

149150
#[test]
150151
fn test_record_aggregator() {
151152
let r1 = Record {
152153
key: Some(vec![0; 45]),
153154
value: Some(vec![0; 2]),
154155
headers: Default::default(),
155-
timestamp: OffsetDateTime::from_unix_timestamp(20).unwrap(),
156+
timestamp: Utc.timestamp_millis(1337),
156157
};
157158

158159
let r2 = Record {

src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,6 @@ pub mod record;
3737
pub mod topic;
3838

3939
// re-exports
40-
pub use time;
40+
pub use chrono;
4141

4242
mod validation;

src/record.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
use std::collections::BTreeMap;
22

3-
use time::OffsetDateTime;
3+
use chrono::{DateTime, Utc};
44

55
/// High-level record.
66
#[derive(Debug, Clone, PartialEq, Eq)]
77
pub struct Record {
88
pub key: Option<Vec<u8>>,
99
pub value: Option<Vec<u8>>,
1010
pub headers: BTreeMap<String, Vec<u8>>,
11-
pub timestamp: OffsetDateTime,
11+
pub timestamp: DateTime<Utc>,
1212
}
1313

1414
impl Record {
@@ -33,6 +33,8 @@ pub struct RecordAndOffset {
3333

3434
#[cfg(test)]
3535
mod tests {
36+
use chrono::TimeZone;
37+
3638
use super::*;
3739

3840
#[test]
@@ -43,7 +45,7 @@ mod tests {
4345
headers: vec![("a".to_string(), vec![0; 5]), ("b".to_string(), vec![0; 7])]
4446
.into_iter()
4547
.collect(),
46-
timestamp: OffsetDateTime::now_utc(),
48+
timestamp: Utc.timestamp_millis(1337),
4749
};
4850

4951
assert_eq!(record.approximate_size(), 23 + 45 + 1 + 5 + 1 + 7);

tests/client.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use assert_matches::assert_matches;
2+
use chrono::{TimeZone, Utc};
23
use rskafka::{
34
client::{
45
error::{Error as ClientError, ProtocolError, ServerErrorResponse},
@@ -10,7 +11,7 @@ use rskafka::{
1011
use std::{collections::BTreeMap, str::FromStr, sync::Arc, time::Duration};
1112

1213
mod test_helpers;
13-
use test_helpers::{maybe_start_logging, now, random_topic_name, record, TEST_TIMEOUT};
14+
use test_helpers::{maybe_start_logging, random_topic_name, record, TEST_TIMEOUT};
1415

1516
#[tokio::test]
1617
async fn test_plain() {
@@ -368,7 +369,7 @@ async fn test_get_offset() {
368369
// use out-of order timestamps to ensure our "lastest offset" logic works
369370
let record_early = record(b"");
370371
let record_late = Record {
371-
timestamp: record_early.timestamp + time::Duration::SECOND,
372+
timestamp: record_early.timestamp + chrono::Duration::seconds(1),
372373
..record_early.clone()
373374
};
374375
let offsets = partition_client
@@ -653,6 +654,6 @@ pub fn large_record() -> Record {
653654
key: Some(b"".to_vec()),
654655
value: Some(vec![b'x'; 1024]),
655656
headers: BTreeMap::from([("foo".to_owned(), b"bar".to_vec())]),
656-
timestamp: now(),
657+
timestamp: Utc.timestamp_millis(1337),
657658
}
658659
}

0 commit comments

Comments
 (0)