2 changes: 1 addition & 1 deletion Cargo.toml
@@ -21,6 +21,7 @@ documentation = "https://docs.rs/rskafka/"
 async-socks5 = { version = "0.5", optional = true }
 async-trait = "0.1"
 bytes = "1.1"
+chrono = { version = "0.4", default-features = false }
 crc32c = "0.6"
 flate2 = { version = "1", optional = true }
 futures = "0.3"
@@ -32,7 +33,6 @@ rand = "0.8"
 rustls = { version = "0.20", optional = true }
 snap = { version = "1", optional = true }
 thiserror = "1.0"
-time = "0.3"
 tokio = { version = "1.19", default-features = false, features = ["io-util", "net", "rt", "sync", "time", "macros"] }
 tokio-rustls = { version = "0.23", optional = true }
 tracing = "0.1"

4 changes: 2 additions & 2 deletions README.md
@@ -36,7 +36,7 @@ use rskafka::{
     },
     record::Record,
 };
-use time::OffsetDateTime;
+use chrono::{TimeZone, Utc};
 use std::collections::BTreeMap;
 
 // setup client
@@ -70,7 +70,7 @@ let record = Record {
     headers: BTreeMap::from([
         ("foo".to_owned(), b"bar".to_vec()),
     ]),
-    timestamp: OffsetDateTime::now_utc(),
+    timestamp: Utc.timestamp_millis(42),
 };
 partition_client.produce(vec![record], Compression::default()).await.unwrap();
 
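A quick note on the new README snippet: `Utc.timestamp_millis(42)` pins the example to a fixed, millisecond-precision instant instead of sampling the clock with `OffsetDateTime::now_utc()`. A minimal standalone sketch of the chrono 0.4 calls the example relies on (the `42` is just the README's placeholder value):

```rust
use chrono::{DateTime, TimeZone, Utc};
use std::collections::BTreeMap;

fn main() {
    // `Utc.timestamp_millis(n)` interprets `n` as milliseconds since the
    // Unix epoch, the same resolution Kafka stores on the wire.
    let ts: DateTime<Utc> = Utc.timestamp_millis(42);
    assert_eq!(ts.timestamp_millis(), 42);

    // The header map has the same shape as the README's record literal.
    let headers: BTreeMap<String, Vec<u8>> =
        BTreeMap::from([("foo".to_owned(), b"bar".to_vec())]);
    println!("{} header(s) at {}", headers.len(), ts);
}
```
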
4 changes: 2 additions & 2 deletions benches/throughput.rs
@@ -7,6 +7,7 @@ use std::{
     time::{Duration, Instant},
 };
 
+use chrono::{TimeZone, Utc};
 use criterion::{
     criterion_group, criterion_main, measurement::WallTime, BenchmarkGroup, Criterion, SamplingMode,
 };
@@ -28,7 +29,6 @@ use rskafka::{
     },
     record::Record,
 };
-use time::OffsetDateTime;
 use tokio::runtime::Runtime;
 
 const PARALLEL_BATCH_SIZE: usize = 1_000_000;
@@ -42,7 +42,7 @@ pub fn criterion_benchmark(c: &mut Criterion) {
         key: Some(vec![b'k'; 10]),
         value: Some(vec![b'x'; 10_000]),
         headers: BTreeMap::default(),
-        timestamp: OffsetDateTime::now_utc(),
+        timestamp: Utc.timestamp_millis(1337),
     };
 
     {

10 changes: 5 additions & 5 deletions src/client/consumer.rs
@@ -360,8 +360,8 @@ mod tests {
     use std::time::Duration;
 
     use assert_matches::assert_matches;
+    use chrono::{TimeZone, Utc};
     use futures::{pin_mut, StreamExt};
-    use time::OffsetDateTime;
     use tokio::sync::{mpsc, Mutex};
 
     use crate::{
@@ -509,7 +509,7 @@
             key: Some(vec![0; 4]),
             value: Some(vec![0; 6]),
             headers: Default::default(),
-            timestamp: OffsetDateTime::now_utc(),
+            timestamp: Utc.timestamp_millis(1337),
         };
 
         let (sender, receiver) = mpsc::channel(10);
@@ -576,7 +576,7 @@
             key: Some(vec![0; 4]),
             value: Some(vec![0; 6]),
             headers: Default::default(),
-            timestamp: OffsetDateTime::now_utc(),
+            timestamp: Utc.timestamp_millis(1337),
         };
 
         let (sender, receiver) = mpsc::channel(10);
@@ -667,7 +667,7 @@
             key: Some(vec![0; 4]),
             value: Some(vec![0; 6]),
             headers: Default::default(),
-            timestamp: OffsetDateTime::now_utc(),
+            timestamp: Utc.timestamp_millis(1337),
         };
 
         // Simulate an error on first fetch to encourage an offset update
@@ -714,7 +714,7 @@
             key: Some(vec![0; 4]),
             value: Some(vec![0; 6]),
             headers: Default::default(),
-            timestamp: OffsetDateTime::now_utc(),
+            timestamp: Utc.timestamp_millis(1337),
         };
 
         // Simulate an error on first fetch to encourage an offset update

37 changes: 27 additions & 10 deletions src/client/partition.rs
@@ -22,11 +22,11 @@ use crate::{
     validation::ExactlyOne,
 };
 use async_trait::async_trait;
+use chrono::{LocalResult, TimeZone, Utc};
 use std::{
     ops::{ControlFlow, Deref, Range},
     sync::Arc,
 };
-use time::OffsetDateTime;
 use tokio::sync::Mutex;
 use tracing::{error, info};
 
@@ -540,7 +540,7 @@ fn build_produce_request(
             ProtocolRecord {
                 key: record.key,
                 value: record.value,
-                timestamp_delta: (record.timestamp - first_timestamp).whole_milliseconds() as i64,
+                timestamp_delta: (record.timestamp - first_timestamp).num_milliseconds() as i64,
                 offset_delta: offset_delta as i32,
                 headers: record
                     .headers
@@ -573,8 +573,8 @@
         timestamp_type: RecordBatchTimestampType::CreateTime,
         producer_id: -1,
         producer_epoch: -1,
-        first_timestamp: (first_timestamp.unix_timestamp_nanos() / 1_000_000) as i64,
-        max_timestamp: (max_timestamp.unix_timestamp_nanos() / 1_000_000) as i64,
+        first_timestamp: first_timestamp.timestamp_millis(),
+        max_timestamp: max_timestamp.timestamp_millis(),
         records: ControlBatchOrRecords::Records(records),
     }]),
 };
@@ -730,12 +730,29 @@ fn extract_records(
                 continue;
             }
-            let timestamp = OffsetDateTime::from_unix_timestamp_nanos(
-                (batch.first_timestamp + record.timestamp_delta) as i128 * 1_000_000,
-            )
-            .map_err(|e| {
-                Error::InvalidResponse(format!("Cannot parse timestamp: {}", e))
-            })?;
+            let timestamp_millis =
+                match batch.first_timestamp.checked_add(record.timestamp_delta) {
+                    Some(ts) => ts,
+                    None => {
+                        return Err(Error::InvalidResponse(format!(
+                            "Timestamp overflow (first_timestamp={}, delta={})",
+                            batch.first_timestamp, record.timestamp_delta
+                        )));
+                    }
+                };
+            let timestamp = match Utc.timestamp_millis_opt(timestamp_millis) {
+                LocalResult::None => {
+                    return Err(Error::InvalidResponse(format!(
+                        "Not a valid timestamp ({timestamp_millis})"
+                    )));
+                }
+                LocalResult::Single(ts) => ts,
+                LocalResult::Ambiguous(a, b) => {
+                    return Err(Error::InvalidResponse(format!(
+                        "Ambiguous timestamp ({timestamp_millis}): {a} or {b}"
+                    )));
+                }
+            };

             records.push(RecordAndOffset {
                 record: Record {

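The `extract_records` hunk above replaces a fallible nanosecond round-trip with chrono's explicit `LocalResult` handling. A standalone sketch of that pattern (chrono 0.4 assumed; `Utc` has no DST transitions, so `Ambiguous` cannot occur in practice and out-of-range inputs surface as `LocalResult::None`):

```rust
use chrono::{LocalResult, TimeZone, Utc};

fn main() {
    // An in-range millisecond value maps to exactly one instant.
    assert!(matches!(
        Utc.timestamp_millis_opt(1_337),
        LocalResult::Single(_)
    ));

    // A value far outside chrono's representable range yields None;
    // the new code turns this into Error::InvalidResponse.
    assert!(matches!(
        Utc.timestamp_millis_opt(i64::MAX),
        LocalResult::None
    ));
}
```
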
12 changes: 6 additions & 6 deletions src/client/producer.rs
@@ -50,7 +50,7 @@
 //!     },
 //!     record::Record,
 //! };
-//! use time::OffsetDateTime;
+//! use chrono::{TimeZone, Utc};
 //! use std::{
 //!     collections::BTreeMap,
 //!     sync::Arc,
@@ -82,7 +82,7 @@
 //!     headers: BTreeMap::from([
 //!         ("foo".to_owned(), b"bar".to_vec()),
 //!     ]),
-//!     timestamp: OffsetDateTime::now_utc(),
+//!     timestamp: Utc.timestamp_millis(42),
 //! };
 //! producer.produce(record.clone()).await.unwrap();
 //! # }
@@ -109,7 +109,7 @@
 //!     },
 //!     record::Record,
 //! };
-//! use time::OffsetDateTime;
+//! use chrono::{TimeZone, Utc};
 //! use std::{
 //!     collections::BTreeMap,
 //!     sync::Arc,
@@ -156,7 +156,7 @@
 //!         headers: BTreeMap::from([
 //!             ("foo".to_owned(), b"bar".to_vec()),
 //!         ]),
-//!         timestamp: OffsetDateTime::now_utc(),
+//!         timestamp: Utc.timestamp_millis(42),
 //!     },
 //! ];
 //! Ok((
@@ -686,9 +686,9 @@ mod tests {
     use crate::{
         client::producer::aggregator::RecordAggregator, protocol::error::Error as ProtocolError,
     };
+    use chrono::{TimeZone, Utc};
     use futures::stream::{FuturesOrdered, FuturesUnordered};
     use futures::{pin_mut, FutureExt, StreamExt};
-    use time::OffsetDateTime;
 
     #[derive(Debug)]
     struct MockClient {
@@ -737,7 +737,7 @@
             key: Some(vec![0; 4]),
             value: Some(vec![0; 6]),
             headers: Default::default(),
-            timestamp: OffsetDateTime::from_unix_timestamp(320).unwrap(),
+            timestamp: Utc.timestamp_millis(320),
         }
     }
 

5 changes: 3 additions & 2 deletions src/client/producer/aggregator.rs
@@ -143,16 +143,17 @@ impl StatusDeaggregator for RecordAggregatorStatusDeaggregator {
 
 #[cfg(test)]
 mod tests {
+    use chrono::{TimeZone, Utc};
+
     use super::*;
-    use time::OffsetDateTime;
 
     #[test]
     fn test_record_aggregator() {
         let r1 = Record {
             key: Some(vec![0; 45]),
             value: Some(vec![0; 2]),
             headers: Default::default(),
-            timestamp: OffsetDateTime::from_unix_timestamp(20).unwrap(),
+            timestamp: Utc.timestamp_millis(1337),
         };
 
         let r2 = Record {

2 changes: 1 addition & 1 deletion src/lib.rs
@@ -37,6 +37,6 @@ pub mod record;
 pub mod topic;
 
 // re-exports
-pub use time;
+pub use chrono;
 
 mod validation;

8 changes: 5 additions & 3 deletions src/record.rs
@@ -1,14 +1,14 @@
 use std::collections::BTreeMap;
 
-use time::OffsetDateTime;
+use chrono::{DateTime, Utc};
 
 /// High-level record.
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub struct Record {
     pub key: Option<Vec<u8>>,
     pub value: Option<Vec<u8>>,
     pub headers: BTreeMap<String, Vec<u8>>,
-    pub timestamp: OffsetDateTime,
+    pub timestamp: DateTime<Utc>,
 }
 
 impl Record {
@@ -33,6 +33,8 @@ pub struct RecordAndOffset {
 
 #[cfg(test)]
 mod tests {
+    use chrono::TimeZone;
+
     use super::*;
 
     #[test]
@@ -43,7 +45,7 @@ mod tests {
             headers: vec![("a".to_string(), vec![0; 5]), ("b".to_string(), vec![0; 7])]
                 .into_iter()
                 .collect(),
-            timestamp: OffsetDateTime::now_utc(),
+            timestamp: Utc.timestamp_millis(1337),
         };
 
         assert_eq!(record.approximate_size(), 23 + 45 + 1 + 5 + 1 + 7);

7 changes: 4 additions & 3 deletions tests/client.rs
@@ -1,4 +1,5 @@
 use assert_matches::assert_matches;
+use chrono::{TimeZone, Utc};
 use rskafka::{
     client::{
         error::{Error as ClientError, ProtocolError, ServerErrorResponse},
@@ -10,7 +11,7 @@ use rskafka::{
 use std::{collections::BTreeMap, str::FromStr, sync::Arc, time::Duration};
 
 mod test_helpers;
-use test_helpers::{maybe_start_logging, now, random_topic_name, record, TEST_TIMEOUT};
+use test_helpers::{maybe_start_logging, random_topic_name, record, TEST_TIMEOUT};
 
 #[tokio::test]
 async fn test_plain() {
@@ -368,7 +369,7 @@ async fn test_get_offset() {
     // use out-of-order timestamps to ensure our "latest offset" logic works
     let record_early = record(b"");
     let record_late = Record {
-        timestamp: record_early.timestamp + time::Duration::SECOND,
+        timestamp: record_early.timestamp + chrono::Duration::seconds(1),
         ..record_early.clone()
     };
     let offsets = partition_client
@@ -653,6 +654,6 @@ pub fn large_record() -> Record {
         key: Some(b"".to_vec()),
         value: Some(vec![b'x'; 1024]),
         headers: BTreeMap::from([("foo".to_owned(), b"bar".to_vec())]),
-        timestamp: now(),
+        timestamp: Utc.timestamp_millis(1337),
     }
 }
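The `test_get_offset` change swaps `time::Duration::SECOND` for `chrono::Duration::seconds(1)`. A small sketch of the equivalent `DateTime<Utc>` arithmetic (chrono 0.4 assumed):

```rust
use chrono::{Duration, TimeZone, Utc};

fn main() {
    let early = Utc.timestamp_millis(1_337);
    // DateTime<Utc> supports direct addition of a signed chrono::Duration,
    // and subtracting two DateTimes yields a chrono::Duration back.
    let late = early + Duration::seconds(1);
    assert_eq!((late - early).num_milliseconds(), 1_000);
}
```
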
7 changes: 3 additions & 4 deletions tests/java_helper.rs
@@ -1,12 +1,12 @@
 use std::collections::BTreeMap;
 
+use chrono::{TimeZone, Utc};
 use j4rs::{Instance, InvocationArg, Jvm, JvmBuilder, MavenArtifact};
 use once_cell::sync::Lazy;
 use rskafka::{
     client::partition::Compression,
     record::{Record, RecordAndOffset},
 };
-use time::OffsetDateTime;
 
 /// If `TEST_JAVA_INTEROPT` is not set, skip the calling test by returning early.
 #[macro_export]
@@ -68,7 +68,7 @@ pub async fn produce(
 
     let mut futures = vec![];
     for (topic_name, partition_index, record) in records {
-        let ts = (record.timestamp.unix_timestamp_nanos() / 1_000_000) as i64;
+        let ts = record.timestamp.timestamp_millis();
         let k = String::from_utf8(record.key.unwrap()).unwrap();
         let v = String::from_utf8(record.value.unwrap()).unwrap();
 
@@ -278,8 +278,7 @@ pub async fn consume(
             key: Some(key.as_bytes().to_vec()),
             value: Some(value.as_bytes().to_vec()),
             headers,
-            timestamp: OffsetDateTime::from_unix_timestamp_nanos(timestamp as i128 * 1_000_000)
-                .unwrap(),
+            timestamp: Utc.timestamp_millis(timestamp),
         };
         let record_and_offset = RecordAndOffset { record, offset };
         results.push(record_and_offset);

11 changes: 6 additions & 5 deletions tests/produce_consume.rs
@@ -1,5 +1,6 @@
-use std::{sync::Arc, time::Duration};
+use std::sync::Arc;
 
+use chrono::{Duration, TimeZone, Utc};
 use rskafka::{
     client::{
         partition::{Compression, PartitionClient, UnknownTopicHandling},
@@ -12,7 +13,7 @@ mod java_helper;
 mod rdkafka_helper;
 mod test_helpers;
 
-use test_helpers::{maybe_start_logging, now, random_topic_name, record};
+use test_helpers::{maybe_start_logging, random_topic_name, record};
 
 #[tokio::test]
 async fn test_produce_java_consume_java_nocompression() {
@@ -268,9 +269,9 @@ async fn assert_produce_consume<F1, G1, F2, G2>(
     );
 
     // timestamps for records. We'll reorder the messages though to ts2, ts1, ts3
-    let ts1 = now();
-    let ts2 = ts1 + Duration::from_millis(1);
-    let ts3 = ts2 + Duration::from_millis(1);
+    let ts1 = Utc.timestamp_millis(1337);
+    let ts2 = ts1 + Duration::milliseconds(1);
+    let ts3 = ts2 + Duration::milliseconds(1);
 
     let record_1 = {
         let record = Record {