-
Notifications
You must be signed in to change notification settings - Fork 44
WIP: Add Sasl authentication - PLAIN #129
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
src/protocol/messages/sasl_auth.rs
Outdated
{ | ||
fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> { | ||
let v = version.0 .0; | ||
assert!(v <= 1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not in line w/ API_VERSION_RANGE
: you don't support version 0.
src/protocol/messages/sasl_auth.rs
Outdated
version: ApiVersion, | ||
) -> Result<(), WriteVersionedError> { | ||
let v = version.0 .0; | ||
assert!(v == 2); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is version 1 not supported?
Edit: reading https://kafka.apache.org/protocol#sasl_handshake again I think we can also support version 0 for the SaslAuthenticate
message.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not super familiar with the protocol.
Based on this:
If SaslHandshakeRequest version is v0, a series of SASL client and server tokens corresponding to the mechanism are sent as opaque packets without wrapping the messages with Kafka protocol headers.
I wasn't sure if this library could handle messages without Kafka protocol headers.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The sentence you're quoting is about the SaslHandshakeRequest
(for which I agree that we should only support version 1), not about the SaslAuthenticateRequest
. The versions for these two request-response types are different. The version selection will be done by the Messenger
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I realize that your comment was regarding SaslAuthenticate
. The issue I ran in to here was that the type of the fields got modified through the different versions. For e.g. auth_bytes
was BYTES
in version 0 but become COMPACT_BYTES
from version 1 onwards. Is there an example for this is handled anywhere else in the library?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. TBH we don't have a beautiful solution of it. I would go for Bytes
as a type within the struct and then do some type conversion during read/write. See this example:
rskafka/src/protocol/messages/delete_records.rs
Lines 63 to 66 in ad8de3c
#[derive(Debug)] | |
pub struct DeleteRequestTopic { | |
/// The topic name. | |
pub name: String_, |
Here the name
is an ordinary string, but is compact after a certain version. This is then handled here:
rskafka/src/protocol/messages/delete_records.rs
Lines 89 to 93 in ad8de3c
if v >= 2 { | |
CompactStringRef(&self.name.0).write(writer)? | |
} else { | |
self.name.write(writer)?; | |
} |
I think you might want to introduce CompactBytesRef
for that, so you don't have to copy the payload during type conversion. This is similar how it is done for CompactString
:
rskafka/src/protocol/primitives.rs
Lines 362 to 365 in ad8de3c
/// Represents a string whose length is expressed as a variable-length integer rather than a fixed 2-byte length. | |
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Default)] | |
#[cfg_attr(test, derive(proptest_derive::Arbitrary))] | |
pub struct CompactString(pub String); |
rskafka/src/protocol/primitives.rs
Lines 392 to 399 in ad8de3c
impl<W> WriteType<W> for CompactString | |
where | |
W: Write, | |
{ | |
fn write(&self, writer: &mut W) -> Result<(), WriteError> { | |
CompactStringRef(&self.0).write(writer) | |
} | |
} |
(note how the "write" operation is only implemented once, because CompactString
can use CompactStringRef
)
rskafka/src/protocol/primitives.rs
Lines 401 to 417 in ad8de3c
/// Same as [`CompactString`] but contains referenced data. | |
/// | |
/// This only supports writing. | |
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] | |
pub struct CompactStringRef<'a>(pub &'a str); | |
impl<'a, W> WriteType<W> for CompactStringRef<'a> | |
where | |
W: Write, | |
{ | |
fn write(&self, writer: &mut W) -> Result<(), WriteError> { | |
let len = u64::try_from(self.0.len() + 1).map_err(WriteError::Overflow)?; | |
UnsignedVarint(len).write(writer)?; | |
writer.write_all(self.0.as_bytes())?; | |
Ok(()) | |
} | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Neat! I will follow a similar pattern.
#[derive(Debug)] | ||
pub struct SaslHandshakeRequest { | ||
pub mechanism: String_, | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would be nice if you could copy over the docstrings for the official message description. If a field was added at a specific version, this should also be documented, e.g.
/// Some description as written in <https://kafka.apache.org/protocol>
///
/// Added in version X.
pub my_field: String_,
} | ||
|
||
#[derive(Debug)] | ||
pub struct Bytes(pub Vec<u8>); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good. Please copy over the docs from https://kafka.apache.org/protocol#protocol_types and add a roundtrip test.
src/protocol/messages/sasl_auth.rs
Outdated
#[derive(Debug, Clone)] | ||
pub struct SaslConfig { | ||
pub username: String, | ||
pub password: String, | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not really a part of the underlying protocol but our own API. Should probably go to src/connection/transport/sasl.rs
. This would also allow you to use this type in CliendBuilder::sasl_config
w/o leaking protocol details. I would expect that a struct/enum instead of username+password parameters are also more future-proof (in light of all the different SASL auth methods).
src/messenger.rs
Outdated
pub async fn sasl_handshake(&self, sasl_config: &SaslConfig) -> Result<(), RequestError> { | ||
let req = SaslHandshakeRequest::new(); | ||
self.request(req).await?; | ||
let req = SaslAuthenticateRequest::new(sasl_config); | ||
self.request(req).await?; | ||
Ok(()) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So if there is a SaslConfig
, then we need to call this each time after sync_versions
. I think you can just grep throw the code and see where is this done. Maybe you wanna make the two methods private and instead offer a setup
method that does both, so we don't forget to call one or the other a certain callsides.
@ramrengaswamy this is a good start 👍 |
|
||
let messenger = Arc::new(Messenger::new(BufStream::new(transport), max_message_size)); | ||
messenger.sync_versions().await?; | ||
if let Some(sasl_config) = sasl_config { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@crepererum I didn't refactor this into a separate setup
function because I wasn't sure SaslConfig
should leak into the Messenger
. Right now SaslConfig
is known to transport, connection and ClientBuilder
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good to me. I've just checked: this is the only place where we call sync_versions
and also the place where we set up SOCKS5 and TLS. So I think this is a good spot to call sasl_handshake
👍
@crepererum ... Ready for another look. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks solid 👍
Now how do we test this? 😁
/// Setup SASL username and password. Mechanism is assumed to be PLAIN. | ||
pub fn sasl_config(mut self, username: &str, password: &str) -> Self { | ||
self.sasl_config = Some(SaslConfig::Plain{ | ||
username: username.to_string(), | ||
password: password.to_string(), | ||
}); | ||
self | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would take SaslConfig
as an argument here so we can extend the config in the future w/o overloading the ClientBuilder
w/ too many methods.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Had to pub use crate::connection::SaslConfig
in client/mod.rs
to publicly expose SaslConfig
src/connection/transport/sasl.rs
Outdated
let mut auth: Vec<u8> = Vec::new(); | ||
auth.push(0); | ||
auth.extend(username.bytes()); | ||
auth.push(0); | ||
auth.extend(password.bytes()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add a link/reference for this binary format?
I think for PLAIN
it's OK to hand-roll this, in the future however I would like to offload this to another crate because I don't wanna deal w/ all the SASL crypto stuff in rskafka.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you okay depending on a pure rust crate such as this one? https://docs.rs/sasl/latest/sasl/
The other SASL packages are rust wrappers around C implementations.
src/protocol/messages/sasl_msg.rs
Outdated
/// | ||
/// Added in version 0. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you don't have to mention that a field was always there ("Added in version 0."), but it's OK.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got rid of it to keep it consistent with the rest of the codebase.
src/connection/transport/sasl.rs
Outdated
} | ||
|
||
impl SaslConfig { | ||
pub fn auth_bytes(&self) -> Vec<u8> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pub fn auth_bytes(&self) -> Vec<u8> { | |
pub(crate) fn auth_bytes(&self) -> Vec<u8> { |
This is not a public API that falls under semantic versioning and that users should rely on. Same goes for fn mechanism
.
I am currently testing these changes against a simple application that I am running against a Confluent stream. |
@crepererum - Seems that the local Kafka and Redpanda have sasl enabled. So I added a |
pub async fn sasl_handshake( | ||
&self, | ||
mechanism: &str, | ||
auth_bytes: Vec<u8>, | ||
) -> Result<(), RequestError> { | ||
let req = SaslHandshakeRequest::new(mechanism); | ||
self.request(req).await?; | ||
let req = SaslAuthenticateRequest::new(auth_bytes); | ||
self.request(req).await?; | ||
Ok(()) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was really surprised that this "just works" in our CI tests. In short: It doesn't. You have to check the error codes within the messages (SaslHandshakeResponse::error_code
and SaslAuthenticateResponse::error_code
) and return an error if the codes are set.
You may wonder how I found that out that quickly: Wireshark:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ugh, can't believe I missed that! Fixing.
@crepererum tl;dr - I got the sasl integration test working locally but ran into an issue getting to work on CircleCI. The following command will run all Kafka integration tests (including sasl) To get Sasl integration test to work I made the following changes:
I have not used CircleCI before. It seems there is no way to volume mount into a Docker container. |
@ramrengaswamy Didn't have time to look into this today, sorry. I'll be out of office for two weeks, I'll get back to this on the 16th of May. |
BTW: we're testing against Kafka and really redpanda. Could you have a look if you can get SASL to work with redpanda as well? |
@ramrengaswamy any progress on this? |
I tried to contribute this by fixing the test for Redpanda but seems like they don't support SASL/PLAIN at the moment. (only SASL/SCRAM) My PR will be like this. #198 |
Is there any progress on this? Or is there anything that can be done to help with this? |
This is now supported, see #216. |
Closes #125
Describe your proposed changes here.
PLAIN
SASL auth.SaslHandshakeRequest
andSaslAuthenticateRequest
Bytes
(not used) andCompactBytes
.SaslConfig
struct and that can be set in theClientBuilder
and used during connection establishment.