Skip to content

Commit fad1a83

Browse files
authored
Merge pull request #245 from influxdata/crepererum/rust_180
chore: rust 1.80
2 parents 329431a + 51877d6 commit fad1a83

File tree

8 files changed

+26
-37
lines changed

8 files changed

+26
-37
lines changed

Cargo.toml

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
name = "rskafka"
33
version = "0.5.0"
44
edition = "2021"
5-
rust-version = "1.72"
5+
rust-version = "1.80"
66
license = "MIT OR Apache-2.0"
77
readme = "README.md"
88
keywords = [
@@ -20,7 +20,6 @@ documentation = "https://docs.rs/rskafka/"
2020

2121
[dependencies]
2222
async-socks5 = { version = "0.6", optional = true }
23-
async-trait = "0.1"
2423
bytes = "1.1"
2524
chrono = { version = "0.4", default-features = false }
2625
crc32c = "0.6.5"
@@ -45,7 +44,6 @@ criterion = { version = "0.5", features = ["async_tokio"] }
4544
dotenvy = "0.15.1"
4645
futures = "0.3"
4746
j4rs = "0.20.0"
48-
once_cell = "1.9"
4947
proptest = "1"
5048
proptest-derive = "0.5"
5149
rustls-pemfile = "2.0"

rust-toolchain.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
[toolchain]
2-
channel = "1.78"
2+
channel = "1.80"
33
components = [ "rustfmt", "clippy" ]

src/client/controller.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
use async_trait::async_trait;
21
use std::ops::ControlFlow;
32
use std::sync::Arc;
43
use tokio::sync::Mutex;
@@ -172,7 +171,6 @@ impl ControllerClient {
172171
}
173172

174173
/// Caches the cluster controller broker.
175-
#[async_trait]
176174
impl BrokerCache for &ControllerClient {
177175
type R = MessengerTransport;
178176
type E = Error;

src/client/partition.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ use crate::{
2323
throttle::maybe_throttle,
2424
validation::ExactlyOne,
2525
};
26-
use async_trait::async_trait;
2726
use chrono::{LocalResult, TimeZone, Utc};
2827
use std::{
2928
ops::{ControlFlow, Deref, Range},
@@ -412,7 +411,6 @@ impl PartitionClient {
412411
}
413412

414413
/// Caches the partition leader broker.
415-
#[async_trait]
416414
impl BrokerCache for &PartitionClient {
417415
type R = MessengerTransport;
418416
type E = Error;

src/connection.rs

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
use async_trait::async_trait;
21
use rand::prelude::*;
32
use std::fmt::Display;
3+
use std::future::Future;
44
use std::ops::ControlFlow;
55
use std::sync::Arc;
66
use thiserror::Error;
@@ -75,18 +75,17 @@ impl Display for MultiError {
7575
}
7676

7777
/// How to connect to a `Transport`
78-
#[async_trait]
7978
trait ConnectionHandler {
8079
type R: RequestHandler + Send + Sync;
8180

82-
async fn connect(
81+
fn connect(
8382
&self,
8483
client_id: Arc<str>,
8584
tls_config: TlsConfig,
8685
socks5_proxy: Option<String>,
8786
sasl_config: Option<SaslConfig>,
8887
max_message_size: usize,
89-
) -> Result<Arc<Self::R>>;
88+
) -> impl Future<Output = Result<Arc<Self::R>>> + Send;
9089
}
9190

9291
/// Defines the possible request modes of metadata retrieval.
@@ -138,7 +137,6 @@ impl BrokerRepresentation {
138137
}
139138
}
140139

141-
#[async_trait]
142140
impl ConnectionHandler for BrokerRepresentation {
143141
type R = MessengerTransport;
144142

@@ -373,15 +371,13 @@ impl std::fmt::Debug for BrokerConnector {
373371
}
374372
}
375373

376-
#[async_trait]
377374
trait RequestHandler {
378-
async fn metadata_request(
375+
fn metadata_request(
379376
&self,
380377
request_params: &MetadataRequest,
381-
) -> Result<MetadataResponse, RequestError>;
378+
) -> impl Future<Output = Result<MetadataResponse, RequestError>> + Send;
382379
}
383380

384-
#[async_trait]
385381
impl RequestHandler for MessengerTransport {
386382
async fn metadata_request(
387383
&self,
@@ -415,18 +411,22 @@ impl BrokerCacheGeneration {
415411
}
416412
}
417413

418-
#[async_trait]
419414
pub trait BrokerCache: Send + Sync {
420415
type R: Send + Sync;
421416
type E: std::error::Error + Send + Sync;
422417

423-
async fn get(&self) -> Result<(Arc<Self::R>, BrokerCacheGeneration), Self::E>;
418+
fn get(
419+
&self,
420+
) -> impl Future<Output = Result<(Arc<Self::R>, BrokerCacheGeneration), Self::E>> + Send;
424421

425-
async fn invalidate(&self, reason: &'static str, gen: BrokerCacheGeneration);
422+
fn invalidate(
423+
&self,
424+
reason: &'static str,
425+
gen: BrokerCacheGeneration,
426+
) -> impl Future<Output = ()> + Send;
426427
}
427428

428429
/// BrokerConnector caches an arbitrary broker that can successfully connect.
429-
#[async_trait]
430430
impl BrokerCache for &BrokerConnector {
431431
type R = MessengerTransport;
432432
type E = Error;
@@ -602,7 +602,6 @@ mod tests {
602602
}
603603
}
604604

605-
#[async_trait]
606605
impl RequestHandler for FakeBroker {
607606
async fn metadata_request(
608607
&self,
@@ -617,7 +616,6 @@ mod tests {
617616
invalidate: Box<dyn Fn() + Send + Sync>,
618617
}
619618

620-
#[async_trait]
621619
impl BrokerCache for FakeBrokerCache {
622620
type R = FakeBroker;
623621
type E = Error;
@@ -794,7 +792,6 @@ mod tests {
794792
#[derive(Debug, PartialEq)]
795793
struct FakeConn;
796794

797-
#[async_trait]
798795
impl RequestHandler for FakeConn {
799796
async fn metadata_request(
800797
&self,
@@ -804,7 +801,6 @@ mod tests {
804801
}
805802
}
806803

807-
#[async_trait]
808804
impl ConnectionHandler for FakeBrokerRepresentation {
809805
type R = FakeConn;
810806

src/protocol/frame.rs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,8 @@
33
//! # References
44
//! - <https://kafka.apache.org/protocol#protocol_common>
55
6-
use std::io::Cursor;
6+
use std::{future::Future, io::Cursor};
77

8-
use async_trait::async_trait;
98
use thiserror::Error;
109
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
1110

@@ -27,12 +26,13 @@ pub enum ReadError {
2726
MessageTooLarge { limit: usize, actual: usize },
2827
}
2928

30-
#[async_trait]
3129
pub trait AsyncMessageRead {
32-
async fn read_message(&mut self, max_message_size: usize) -> Result<Vec<u8>, ReadError>;
30+
fn read_message(
31+
&mut self,
32+
max_message_size: usize,
33+
) -> impl Future<Output = Result<Vec<u8>, ReadError>> + Send;
3334
}
3435

35-
#[async_trait]
3636
impl<R> AsyncMessageRead for R
3737
where
3838
R: AsyncRead + Send + Unpin,
@@ -85,12 +85,10 @@ pub enum WriteError {
8585
TooLarge { size: usize },
8686
}
8787

88-
#[async_trait]
8988
pub trait AsyncMessageWrite {
90-
async fn write_message(&mut self, msg: &[u8]) -> Result<(), WriteError>;
89+
fn write_message(&mut self, msg: &[u8]) -> impl Future<Output = Result<(), WriteError>> + Send;
9190
}
9291

93-
#[async_trait]
9492
impl<W> AsyncMessageWrite for W
9593
where
9694
W: AsyncWrite + Send + Unpin,

tests/java_helper.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
1-
use std::collections::BTreeMap;
1+
use std::{collections::BTreeMap, sync::LazyLock};
22

33
use chrono::{TimeZone, Utc};
44
use j4rs::{Instance, InvocationArg, Jvm, JvmBuilder, MavenArtifact};
5-
use once_cell::sync::Lazy;
65
use rskafka::{
76
client::partition::Compression,
87
record::{Record, RecordAndOffset},
@@ -311,7 +310,7 @@ pub async fn consume(
311310
}
312311

313312
/// Lazy static that tracks if we already installed all JVM dependencies.
314-
static JVM_SETUP: Lazy<()> = Lazy::new(|| {
313+
static JVM_SETUP: LazyLock<()> = LazyLock::new(|| {
315314
let jvm_installation = JvmBuilder::new().build().expect("setup JVM");
316315

317316
for artifact_name in [
@@ -338,7 +337,7 @@ static JVM_SETUP: Lazy<()> = Lazy::new(|| {
338337
});
339338

340339
fn setup_jvm() -> Jvm {
341-
Lazy::force(&JVM_SETUP);
340+
LazyLock::force(&JVM_SETUP);
342341

343342
let jvm = JvmBuilder::new().build().expect("setup JVM");
344343
jvm

tests/test_helpers.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,9 @@ impl BrokerImpl {
4343
#[derive(Debug)]
4444
pub struct TestConfig {
4545
pub bootstrap_brokers: Vec<String>,
46+
#[allow(dead_code)]
4647
pub broker_impl: BrokerImpl,
48+
#[allow(dead_code)]
4749
pub socks5_proxy: Option<String>,
4850
}
4951

0 commit comments

Comments
 (0)