Skip to content

Commit 8678dfe

Browse files
Merge pull request #189 from influxdata/dependabot/cargo/rdkafka-0.29
chore(deps): update rdkafka requirement from 0.28 to 0.29
2 parents 4f05f3b + 035c9e1 commit 8678dfe

File tree

2 files changed

+9
-6
lines changed

2 files changed

+9
-6
lines changed

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ thiserror = "1.0"
3636
tokio = { version = "1.19", default-features = false, features = ["io-util", "net", "rt", "sync", "time", "macros"] }
3737
tokio-rustls = { version = "0.23", optional = true }
3838
tracing = "0.1"
39-
zstd = { version = "0.10", optional = true }
39+
zstd = { version = "0.11", optional = true }
4040

4141
[dev-dependencies]
4242
assert_matches = "1.5"
@@ -49,7 +49,7 @@ procspawn = "0.10"
4949
proptest = "1"
5050
proptest-derive = "0.3"
5151
rustls-pemfile = "1.0"
52-
rdkafka = { version = "0.28", default-features = false, features = ["libz", "tokio", "zstd"] }
52+
rdkafka = { version = "0.29", default-features = false, features = ["libz", "tokio", "zstd"] }
5353
tokio = { version = "1.14", features = ["macros", "rt-multi-thread"] }
5454
tracing-log = "0.1"
5555
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

tests/rdkafka_helper.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use chrono::{TimeZone, Utc};
44
use futures::{StreamExt, TryStreamExt};
55
use rdkafka::{
66
consumer::{Consumer, StreamConsumer},
7-
message::{Headers, OwnedHeaders},
7+
message::{Header, Headers, OwnedHeaders},
88
producer::{FutureProducer, FutureRecord},
99
util::Timeout,
1010
ClientConfig, Message, TopicPartitionList,
@@ -49,7 +49,10 @@ pub async fn produce(
4949
for (topic_name, partition_index, record) in records {
5050
let mut headers = OwnedHeaders::new();
5151
for (k, v) in record.headers {
52-
headers = headers.add(&k, &v);
52+
headers = headers.insert(Header {
53+
key: &k,
54+
value: Some(&v),
55+
});
5356
}
5457

5558
let mut f_record = FutureRecord::to(&topic_name)
@@ -104,8 +107,8 @@ pub async fn consume(
104107
.map(|headers| {
105108
(0..headers.count())
106109
.map(|i| {
107-
let (k, v) = headers.get(i).unwrap();
108-
(k.to_owned(), v.to_vec())
110+
let header = headers.get(i);
111+
(header.key.to_owned(), header.value.unwrap().to_vec())
109112
})
110113
.collect()
111114
})

0 commit comments

Comments
 (0)