Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ $ docker-compose -f docker-compose-kafka.yml up
in one session, and then run:

```console
$ TEST_INTEGRATION=1 TEST_BROKER_IMPL=kafka KAFKA_CONNECT=localhost:9011 cargo test
$ TEST_INTEGRATION=1 TEST_BROKER_IMPL=kafka KAFKA_CONNECT=localhost:9011 KAFKA_SASL_CONNECT=localhost:9097 cargo test
```

in another session. Note that Apache Kafka supports a different set of features then redpanda, so we pass other
Expand Down
24 changes: 15 additions & 9 deletions docker-compose-kafka.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,51 +14,57 @@ services:
image: docker.io/bitnami/kafka:3
ports:
- "9010:9010"
- "9096:9096"
environment:
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_CFG_BROKER_ID=0
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT,FOR_PROXY:PLAINTEXT
- KAFKA_CFG_LISTENERS=CLIENT://:9000,EXTERNAL://:9010,FOR_PROXY://:9020
- KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka-0:9000,EXTERNAL://localhost:9010,FOR_PROXY://kafka-0:9020
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT,FOR_PROXY:PLAINTEXT,SECURE:SASL_PLAINTEXT
- KAFKA_CFG_LISTENERS=CLIENT://:9000,EXTERNAL://:9010,FOR_PROXY://:9020,SECURE://:9096
- KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka-0:9000,EXTERNAL://localhost:9010,FOR_PROXY://kafka-0:9020,SECURE://localhost:9096
- KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
volumes:
- kafka_0_data:/bitnami/kafka
- ./kafka_jaas.conf:/opt/bitnami/kafka/config/kafka_jaas.conf
depends_on:
- zookeeper
kafka-1:
image: docker.io/bitnami/kafka:3
ports:
- "9011:9011"
- "9097:9097"
environment:
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_CFG_BROKER_ID=1
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT,FOR_PROXY:PLAINTEXT
- KAFKA_CFG_LISTENERS=CLIENT://:9000,EXTERNAL://:9011,FOR_PROXY://:9021
- KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka-1:9000,EXTERNAL://localhost:9011,FOR_PROXY://kafka-1:9021
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT,FOR_PROXY:PLAINTEXT,SECURE:SASL_PLAINTEXT
- KAFKA_CFG_LISTENERS=CLIENT://:9000,EXTERNAL://:9011,FOR_PROXY://:9021,SECURE://:9097
- KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka-1:9000,EXTERNAL://localhost:9011,FOR_PROXY://kafka-1:9021,SECURE://localhost:9097
- KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
volumes:
- kafka_1_data:/bitnami/kafka
- ./kafka_jaas.conf:/opt/bitnami/kafka/config/kafka_jaas.conf
depends_on:
- zookeeper
kafka-2:
image: docker.io/bitnami/kafka:3
ports:
- "9012:9012"
- "9098:9098"
environment:
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_CFG_BROKER_ID=2
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT,FOR_PROXY:PLAINTEXT
- KAFKA_CFG_LISTENERS=CLIENT://:9000,EXTERNAL://:9012,FOR_PROXY://:9022
- KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka-2:9000,EXTERNAL://localhost:9012,FOR_PROXY://kafka-2:9022
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT,FOR_PROXY:PLAINTEXT,SECURE:SASL_PLAINTEXT
- KAFKA_CFG_LISTENERS=CLIENT://:9000,EXTERNAL://:9012,FOR_PROXY://:9022,SECURE://:9098
- KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka-2:9000,EXTERNAL://localhost:9012,FOR_PROXY://kafka-2:9022,SECURE://localhost:9098
- KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
volumes:
- kafka_2_data:/bitnami/kafka
- ./kafka_jaas.conf:/opt/bitnami/kafka/config/kafka_jaas.conf
depends_on:
- zookeeper
proxy:
Expand Down
10 changes: 10 additions & 0 deletions kafka_jaas.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
user_admin="admin-secret";
};
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret";
};
Client{};
11 changes: 11 additions & 0 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ use error::{Error, Result};

use self::{controller::ControllerClient, partition::UnknownTopicHandling};

pub use crate::connection::SaslConfig;

#[derive(Debug, Error)]
pub enum ProduceError {
#[error("Broker error: {0}")]
Expand All @@ -44,6 +46,7 @@ pub struct ClientBuilder {
max_message_size: usize,
socks5_proxy: Option<String>,
tls_config: TlsConfig,
sasl_config: Option<SaslConfig>,
backoff_config: Arc<BackoffConfig>,
}

Expand All @@ -56,6 +59,7 @@ impl ClientBuilder {
max_message_size: 100 * 1024 * 1024, // 100MB
socks5_proxy: None,
tls_config: TlsConfig::default(),
sasl_config: None,
backoff_config: Default::default(),
}
}
Expand Down Expand Up @@ -96,6 +100,12 @@ impl ClientBuilder {
self
}

/// Setup SASL username and password. Mechanism is assumed to be PLAIN.
pub fn sasl_config(mut self, sasl_config: SaslConfig) -> Self {
self.sasl_config = Some(sasl_config);
self
}

/// Build [`Client`].
pub async fn build(self) -> Result<Client> {
let brokers = Arc::new(BrokerConnector::new(
Expand All @@ -104,6 +114,7 @@ impl ClientBuilder {
.unwrap_or_else(|| Arc::from(DEFAULT_CLIENT_ID)),
self.tls_config,
self.socks5_proxy,
self.sasl_config,
self.max_message_size,
Arc::clone(&self.backoff_config),
));
Expand Down
22 changes: 22 additions & 0 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::{
client::metadata_cache::MetadataCache,
};

pub use self::transport::SaslConfig;
pub use self::transport::TlsConfig;

mod topology;
Expand All @@ -45,6 +46,9 @@ pub enum Error {

#[error("all retries failed: {0}")]
RetryFailed(BackoffError),

#[error("Sasl handshake failed: {0}")]
SaslFailed(#[from] crate::messenger::SaslError),
}

pub type Result<T, E = Error> = std::result::Result<T, E>;
Expand All @@ -59,6 +63,7 @@ trait ConnectionHandler {
client_id: Arc<str>,
tls_config: TlsConfig,
socks5_proxy: Option<String>,
sasl_config: Option<SaslConfig>,
max_message_size: usize,
) -> Result<Arc<Self::R>>;
}
Expand Down Expand Up @@ -121,6 +126,7 @@ impl ConnectionHandler for BrokerRepresentation {
client_id: Arc<str>,
tls_config: TlsConfig,
socks5_proxy: Option<String>,
sasl_config: Option<SaslConfig>,
max_message_size: usize,
) -> Result<Arc<Self::R>> {
let url = self.url();
Expand All @@ -138,6 +144,11 @@ impl ConnectionHandler for BrokerRepresentation {

let mut messenger = Messenger::new(BufStream::new(transport), max_message_size, client_id);
messenger.sync_versions().await?;
if let Some(sasl_config) = sasl_config {
messenger
.sasl_handshake(sasl_config.mechanism(), sasl_config.auth_bytes())
.await?;
}
Ok(Arc::new(messenger))
}
}
Expand Down Expand Up @@ -177,6 +188,9 @@ pub struct BrokerConnector {
/// SOCKS5 proxy.
socks5_proxy: Option<String>,

/// SASL Configuration
sasl_config: Option<SaslConfig>,

/// Maximum message size for framing protocol.
max_message_size: usize,
}
Expand All @@ -187,6 +201,7 @@ impl BrokerConnector {
client_id: Arc<str>,
tls_config: TlsConfig,
socks5_proxy: Option<String>,
sasl_config: Option<SaslConfig>,
max_message_size: usize,
backoff_config: Arc<BackoffConfig>,
) -> Self {
Expand All @@ -199,6 +214,7 @@ impl BrokerConnector {
backoff_config,
tls_config,
socks5_proxy,
sasl_config,
max_message_size,
}
}
Expand Down Expand Up @@ -294,6 +310,7 @@ impl BrokerConnector {
Arc::clone(&self.client_id),
self.tls_config.clone(),
self.socks5_proxy.clone(),
self.sasl_config.clone(),
self.max_message_size,
)
.await?;
Expand Down Expand Up @@ -405,6 +422,7 @@ impl BrokerCache for &BrokerConnector {
&self.backoff_config,
self.tls_config.clone(),
self.socks5_proxy.clone(),
self.sasl_config.clone(),
self.max_message_size,
)
.await?;
Expand Down Expand Up @@ -440,6 +458,7 @@ async fn connect_to_a_broker_with_retry<B>(
backoff_config: &BackoffConfig,
tls_config: TlsConfig,
socks5_proxy: Option<String>,
sasl_config: Option<SaslConfig>,
max_message_size: usize,
) -> Result<Arc<B::R>>
where
Expand All @@ -457,6 +476,7 @@ where
Arc::clone(&client_id),
tls_config.clone(),
socks5_proxy.clone(),
sasl_config.clone(),
max_message_size,
)
.await;
Expand Down Expand Up @@ -773,6 +793,7 @@ mod tests {
_client_id: Arc<str>,
_tls_config: TlsConfig,
_socks5_proxy: Option<String>,
_sasl_config: Option<SaslConfig>,
_max_message_size: usize,
) -> Result<Arc<Self::R>> {
(self.conn)()
Expand Down Expand Up @@ -801,6 +822,7 @@ mod tests {
Default::default(),
Default::default(),
Default::default(),
Default::default(),
)
.await
.unwrap();
Expand Down
3 changes: 3 additions & 0 deletions src/connection/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ use tokio::net::TcpStream;
#[cfg(feature = "transport-tls")]
use tokio_rustls::{client::TlsStream, TlsConnector};

mod sasl;
pub use sasl::SaslConfig;

#[cfg(feature = "transport-tls")]
pub type TlsConfig = Option<Arc<rustls::ClientConfig>>;

Expand Down
28 changes: 28 additions & 0 deletions src/connection/transport/sasl.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#[derive(Debug, Clone)]
pub enum SaslConfig {
/// SASL - PLAIN
///
/// # References
/// - <https://datatracker.ietf.org/doc/html/rfc4616>
Plain { username: String, password: String },
}

impl SaslConfig {
pub(crate) fn auth_bytes(&self) -> Vec<u8> {
match self {
Self::Plain { username, password } => {
let mut auth: Vec<u8> = vec![0];
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC I was asking for this in the original PR but it got lost in the meantime: could you add a link to the SASL spec that describes the PLAIN auth. I think this is https://datatracker.ietf.org/doc/html/rfc4616.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes so much sense.

auth.extend(username.bytes());
auth.push(0);
auth.extend(password.bytes());
auth
}
}
}

pub(crate) fn mechanism(&self) -> &str {
match self {
Self::Plain { .. } => "PLAIN",
}
}
}
33 changes: 32 additions & 1 deletion src/messenger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ use crate::{
protocol::{
api_key::ApiKey,
api_version::ApiVersion,
error::Error as ApiError,
frame::{AsyncMessageRead, AsyncMessageWrite},
messages::{
ReadVersionedError, ReadVersionedType, RequestBody, RequestHeader, ResponseHeader,
WriteVersionedError, WriteVersionedType,
SaslAuthenticateRequest, SaslHandshakeRequest, WriteVersionedError, WriteVersionedType,
},
primitives::{Int16, Int32, NullableString, TaggedFields},
},
Expand Down Expand Up @@ -178,6 +179,15 @@ pub enum SyncVersionsError {
},
}

#[derive(Error, Debug)]
pub enum SaslError {
#[error("Request error: {0}")]
RequestError(#[from] RequestError),

#[error("API error: {0}")]
ApiError(#[from] ApiError),
}

impl<RW> Messenger<RW>
where
RW: AsyncRead + AsyncWrite + Send + 'static,
Expand Down Expand Up @@ -520,6 +530,27 @@ where

Err(SyncVersionsError::NoWorkingVersion)
}

pub async fn sasl_handshake(
&self,
mechanism: &str,
auth_bytes: Vec<u8>,
) -> Result<(), SaslError> {
let req = SaslHandshakeRequest::new(mechanism);
let resp = self.request(req).await?;
if let Some(err) = resp.error_code {
return Err(SaslError::ApiError(err));
}
let req = SaslAuthenticateRequest::new(auth_bytes);
let resp = self.request(req).await?;
if let Some(err) = resp.error_code {
if let Some(s) = resp.error_message.0 {
debug!("Sasl auth error message: {s}");
}
return Err(SaslError::ApiError(err));
}
Ok(())
}
}

impl<RW> Drop for Messenger<RW> {
Expand Down
2 changes: 2 additions & 0 deletions src/protocol/messages/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ mod metadata;
pub use metadata::*;
mod produce;
pub use produce::*;
mod sasl_msg;
pub use sasl_msg::*;
#[cfg(test)]
mod test_utils;

Expand Down
Loading