Skip to content

Commit 2f371bd

Browse files
Merge pull request #216 from cognitedata/sasl_impl
feat: sasl auth impl
2 parents 8c2c097 + 59c2e87 commit 2f371bd

File tree

12 files changed

+485
-13
lines changed

12 files changed

+485
-13
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ $ docker-compose -f docker-compose-kafka.yml up
131131
in one session, and then run:
132132

133133
```console
134-
$ TEST_INTEGRATION=1 TEST_BROKER_IMPL=kafka KAFKA_CONNECT=localhost:9011 cargo test
134+
$ TEST_INTEGRATION=1 TEST_BROKER_IMPL=kafka KAFKA_CONNECT=localhost:9011 KAFKA_SASL_CONNECT=localhost:9097 cargo test
135135
```
136136

137137
in another session. Note that Apache Kafka supports a different set of features then redpanda, so we pass other

docker-compose-kafka.yml

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,51 +14,57 @@ services:
1414
image: docker.io/bitnami/kafka:3
1515
ports:
1616
- "9010:9010"
17+
- "9096:9096"
1718
environment:
1819
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
1920
- KAFKA_CFG_BROKER_ID=0
2021
- ALLOW_PLAINTEXT_LISTENER=yes
21-
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT,FOR_PROXY:PLAINTEXT
22-
- KAFKA_CFG_LISTENERS=CLIENT://:9000,EXTERNAL://:9010,FOR_PROXY://:9020
23-
- KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka-0:9000,EXTERNAL://localhost:9010,FOR_PROXY://kafka-0:9020
22+
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT,FOR_PROXY:PLAINTEXT,SECURE:SASL_PLAINTEXT
23+
- KAFKA_CFG_LISTENERS=CLIENT://:9000,EXTERNAL://:9010,FOR_PROXY://:9020,SECURE://:9096
24+
- KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka-0:9000,EXTERNAL://localhost:9010,FOR_PROXY://kafka-0:9020,SECURE://localhost:9096
2425
- KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
2526
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
2627
volumes:
2728
- kafka_0_data:/bitnami/kafka
29+
- ./kafka_jaas.conf:/opt/bitnami/kafka/config/kafka_jaas.conf
2830
depends_on:
2931
- zookeeper
3032
kafka-1:
3133
image: docker.io/bitnami/kafka:3
3234
ports:
3335
- "9011:9011"
36+
- "9097:9097"
3437
environment:
3538
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
3639
- KAFKA_CFG_BROKER_ID=1
3740
- ALLOW_PLAINTEXT_LISTENER=yes
38-
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT,FOR_PROXY:PLAINTEXT
39-
- KAFKA_CFG_LISTENERS=CLIENT://:9000,EXTERNAL://:9011,FOR_PROXY://:9021
40-
- KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka-1:9000,EXTERNAL://localhost:9011,FOR_PROXY://kafka-1:9021
41+
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT,FOR_PROXY:PLAINTEXT,SECURE:SASL_PLAINTEXT
42+
- KAFKA_CFG_LISTENERS=CLIENT://:9000,EXTERNAL://:9011,FOR_PROXY://:9021,SECURE://:9097
43+
- KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka-1:9000,EXTERNAL://localhost:9011,FOR_PROXY://kafka-1:9021,SECURE://localhost:9097
4144
- KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
4245
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
4346
volumes:
4447
- kafka_1_data:/bitnami/kafka
48+
- ./kafka_jaas.conf:/opt/bitnami/kafka/config/kafka_jaas.conf
4549
depends_on:
4650
- zookeeper
4751
kafka-2:
4852
image: docker.io/bitnami/kafka:3
4953
ports:
5054
- "9012:9012"
55+
- "9098:9098"
5156
environment:
5257
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
5358
- KAFKA_CFG_BROKER_ID=2
5459
- ALLOW_PLAINTEXT_LISTENER=yes
55-
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT,FOR_PROXY:PLAINTEXT
56-
- KAFKA_CFG_LISTENERS=CLIENT://:9000,EXTERNAL://:9012,FOR_PROXY://:9022
57-
- KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka-2:9000,EXTERNAL://localhost:9012,FOR_PROXY://kafka-2:9022
60+
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT,FOR_PROXY:PLAINTEXT,SECURE:SASL_PLAINTEXT
61+
- KAFKA_CFG_LISTENERS=CLIENT://:9000,EXTERNAL://:9012,FOR_PROXY://:9022,SECURE://:9098
62+
- KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka-2:9000,EXTERNAL://localhost:9012,FOR_PROXY://kafka-2:9022,SECURE://localhost:9098
5863
- KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
5964
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
6065
volumes:
6166
- kafka_2_data:/bitnami/kafka
67+
- ./kafka_jaas.conf:/opt/bitnami/kafka/config/kafka_jaas.conf
6268
depends_on:
6369
- zookeeper
6470
proxy:

kafka_jaas.conf

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
KafkaServer {
2+
org.apache.kafka.common.security.plain.PlainLoginModule required
3+
user_admin="admin-secret";
4+
};
5+
KafkaClient {
6+
org.apache.kafka.common.security.plain.PlainLoginModule required
7+
username="admin"
8+
password="admin-secret";
9+
};
10+
Client{};

src/client/mod.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ use error::{Error, Result};
2222

2323
use self::{controller::ControllerClient, partition::UnknownTopicHandling};
2424

25+
pub use crate::connection::SaslConfig;
26+
2527
#[derive(Debug, Error)]
2628
pub enum ProduceError {
2729
#[error("Broker error: {0}")]
@@ -44,6 +46,7 @@ pub struct ClientBuilder {
4446
max_message_size: usize,
4547
socks5_proxy: Option<String>,
4648
tls_config: TlsConfig,
49+
sasl_config: Option<SaslConfig>,
4750
backoff_config: Arc<BackoffConfig>,
4851
}
4952

@@ -56,6 +59,7 @@ impl ClientBuilder {
5659
max_message_size: 100 * 1024 * 1024, // 100MB
5760
socks5_proxy: None,
5861
tls_config: TlsConfig::default(),
62+
sasl_config: None,
5963
backoff_config: Default::default(),
6064
}
6165
}
@@ -96,6 +100,12 @@ impl ClientBuilder {
96100
self
97101
}
98102

103+
/// Setup SASL username and password. Mechanism is assumed to be PLAIN.
104+
pub fn sasl_config(mut self, sasl_config: SaslConfig) -> Self {
105+
self.sasl_config = Some(sasl_config);
106+
self
107+
}
108+
99109
/// Build [`Client`].
100110
pub async fn build(self) -> Result<Client> {
101111
let brokers = Arc::new(BrokerConnector::new(
@@ -104,6 +114,7 @@ impl ClientBuilder {
104114
.unwrap_or_else(|| Arc::from(DEFAULT_CLIENT_ID)),
105115
self.tls_config,
106116
self.socks5_proxy,
117+
self.sasl_config,
107118
self.max_message_size,
108119
Arc::clone(&self.backoff_config),
109120
));

src/connection.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use crate::{
1919
client::metadata_cache::MetadataCache,
2020
};
2121

22+
pub use self::transport::SaslConfig;
2223
pub use self::transport::TlsConfig;
2324

2425
mod topology;
@@ -45,6 +46,9 @@ pub enum Error {
4546

4647
#[error("all retries failed: {0}")]
4748
RetryFailed(BackoffError),
49+
50+
#[error("Sasl handshake failed: {0}")]
51+
SaslFailed(#[from] crate::messenger::SaslError),
4852
}
4953

5054
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -59,6 +63,7 @@ trait ConnectionHandler {
5963
client_id: Arc<str>,
6064
tls_config: TlsConfig,
6165
socks5_proxy: Option<String>,
66+
sasl_config: Option<SaslConfig>,
6267
max_message_size: usize,
6368
) -> Result<Arc<Self::R>>;
6469
}
@@ -121,6 +126,7 @@ impl ConnectionHandler for BrokerRepresentation {
121126
client_id: Arc<str>,
122127
tls_config: TlsConfig,
123128
socks5_proxy: Option<String>,
129+
sasl_config: Option<SaslConfig>,
124130
max_message_size: usize,
125131
) -> Result<Arc<Self::R>> {
126132
let url = self.url();
@@ -138,6 +144,11 @@ impl ConnectionHandler for BrokerRepresentation {
138144

139145
let mut messenger = Messenger::new(BufStream::new(transport), max_message_size, client_id);
140146
messenger.sync_versions().await?;
147+
if let Some(sasl_config) = sasl_config {
148+
messenger
149+
.sasl_handshake(sasl_config.mechanism(), sasl_config.auth_bytes())
150+
.await?;
151+
}
141152
Ok(Arc::new(messenger))
142153
}
143154
}
@@ -177,6 +188,9 @@ pub struct BrokerConnector {
177188
/// SOCKS5 proxy.
178189
socks5_proxy: Option<String>,
179190

191+
/// SASL Configuration
192+
sasl_config: Option<SaslConfig>,
193+
180194
/// Maximum message size for framing protocol.
181195
max_message_size: usize,
182196
}
@@ -187,6 +201,7 @@ impl BrokerConnector {
187201
client_id: Arc<str>,
188202
tls_config: TlsConfig,
189203
socks5_proxy: Option<String>,
204+
sasl_config: Option<SaslConfig>,
190205
max_message_size: usize,
191206
backoff_config: Arc<BackoffConfig>,
192207
) -> Self {
@@ -199,6 +214,7 @@ impl BrokerConnector {
199214
backoff_config,
200215
tls_config,
201216
socks5_proxy,
217+
sasl_config,
202218
max_message_size,
203219
}
204220
}
@@ -294,6 +310,7 @@ impl BrokerConnector {
294310
Arc::clone(&self.client_id),
295311
self.tls_config.clone(),
296312
self.socks5_proxy.clone(),
313+
self.sasl_config.clone(),
297314
self.max_message_size,
298315
)
299316
.await?;
@@ -405,6 +422,7 @@ impl BrokerCache for &BrokerConnector {
405422
&self.backoff_config,
406423
self.tls_config.clone(),
407424
self.socks5_proxy.clone(),
425+
self.sasl_config.clone(),
408426
self.max_message_size,
409427
)
410428
.await?;
@@ -440,6 +458,7 @@ async fn connect_to_a_broker_with_retry<B>(
440458
backoff_config: &BackoffConfig,
441459
tls_config: TlsConfig,
442460
socks5_proxy: Option<String>,
461+
sasl_config: Option<SaslConfig>,
443462
max_message_size: usize,
444463
) -> Result<Arc<B::R>>
445464
where
@@ -457,6 +476,7 @@ where
457476
Arc::clone(&client_id),
458477
tls_config.clone(),
459478
socks5_proxy.clone(),
479+
sasl_config.clone(),
460480
max_message_size,
461481
)
462482
.await;
@@ -773,6 +793,7 @@ mod tests {
773793
_client_id: Arc<str>,
774794
_tls_config: TlsConfig,
775795
_socks5_proxy: Option<String>,
796+
_sasl_config: Option<SaslConfig>,
776797
_max_message_size: usize,
777798
) -> Result<Arc<Self::R>> {
778799
(self.conn)()
@@ -801,6 +822,7 @@ mod tests {
801822
Default::default(),
802823
Default::default(),
803824
Default::default(),
825+
Default::default(),
804826
)
805827
.await
806828
.unwrap();

src/connection/transport.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@ use tokio::net::TcpStream;
1010
#[cfg(feature = "transport-tls")]
1111
use tokio_rustls::{client::TlsStream, TlsConnector};
1212

13+
mod sasl;
14+
pub use sasl::SaslConfig;
15+
1316
#[cfg(feature = "transport-tls")]
1417
pub type TlsConfig = Option<Arc<rustls::ClientConfig>>;
1518

src/connection/transport/sasl.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
#[derive(Debug, Clone)]
2+
pub enum SaslConfig {
3+
/// SASL - PLAIN
4+
///
5+
/// # References
6+
/// - <https://datatracker.ietf.org/doc/html/rfc4616>
7+
Plain { username: String, password: String },
8+
}
9+
10+
impl SaslConfig {
11+
pub(crate) fn auth_bytes(&self) -> Vec<u8> {
12+
match self {
13+
Self::Plain { username, password } => {
14+
let mut auth: Vec<u8> = vec![0];
15+
auth.extend(username.bytes());
16+
auth.push(0);
17+
auth.extend(password.bytes());
18+
auth
19+
}
20+
}
21+
}
22+
23+
pub(crate) fn mechanism(&self) -> &str {
24+
match self {
25+
Self::Plain { .. } => "PLAIN",
26+
}
27+
}
28+
}

src/messenger.rs

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,11 @@ use crate::{
3030
protocol::{
3131
api_key::ApiKey,
3232
api_version::ApiVersion,
33+
error::Error as ApiError,
3334
frame::{AsyncMessageRead, AsyncMessageWrite},
3435
messages::{
3536
ReadVersionedError, ReadVersionedType, RequestBody, RequestHeader, ResponseHeader,
36-
WriteVersionedError, WriteVersionedType,
37+
SaslAuthenticateRequest, SaslHandshakeRequest, WriteVersionedError, WriteVersionedType,
3738
},
3839
primitives::{Int16, Int32, NullableString, TaggedFields},
3940
},
@@ -178,6 +179,15 @@ pub enum SyncVersionsError {
178179
},
179180
}
180181

182+
#[derive(Error, Debug)]
183+
pub enum SaslError {
184+
#[error("Request error: {0}")]
185+
RequestError(#[from] RequestError),
186+
187+
#[error("API error: {0}")]
188+
ApiError(#[from] ApiError),
189+
}
190+
181191
impl<RW> Messenger<RW>
182192
where
183193
RW: AsyncRead + AsyncWrite + Send + 'static,
@@ -520,6 +530,27 @@ where
520530

521531
Err(SyncVersionsError::NoWorkingVersion)
522532
}
533+
534+
pub async fn sasl_handshake(
535+
&self,
536+
mechanism: &str,
537+
auth_bytes: Vec<u8>,
538+
) -> Result<(), SaslError> {
539+
let req = SaslHandshakeRequest::new(mechanism);
540+
let resp = self.request(req).await?;
541+
if let Some(err) = resp.error_code {
542+
return Err(SaslError::ApiError(err));
543+
}
544+
let req = SaslAuthenticateRequest::new(auth_bytes);
545+
let resp = self.request(req).await?;
546+
if let Some(err) = resp.error_code {
547+
if let Some(s) = resp.error_message.0 {
548+
debug!("Sasl auth error message: {s}");
549+
}
550+
return Err(SaslError::ApiError(err));
551+
}
552+
Ok(())
553+
}
523554
}
524555

525556
impl<RW> Drop for Messenger<RW> {

src/protocol/messages/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ mod metadata;
3636
pub use metadata::*;
3737
mod produce;
3838
pub use produce::*;
39+
mod sasl_msg;
40+
pub use sasl_msg::*;
3941
#[cfg(test)]
4042
mod test_utils;
4143

0 commit comments

Comments
 (0)