Skip to content

Commit b3a9e7a

Browse files
vojtechkralscanterog
authored andcommitted
mocking: Add support for mock cluster reference from Client configured with test.mock.num.brokers
1 parent 8d7d98a commit b3a9e7a

File tree

2 files changed

+69
-10
lines changed

2 files changed

+69
-10
lines changed

src/client.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use crate::error::{IsError, KafkaError, KafkaResult};
3030
use crate::groups::GroupList;
3131
use crate::log::{debug, error, info, trace, warn};
3232
use crate::metadata::Metadata;
33+
use crate::mocking::MockCluster;
3334
use crate::statistics::Statistics;
3435
use crate::util::{self, ErrBuf, KafkaDrop, NativePtr, Timeout};
3536

@@ -400,6 +401,20 @@ impl<C: ClientContext> Client<C> {
400401
}
401402
}
402403

404+
/// If this client was configured with `test.mock.num.brokers`,
405+
/// this will return a [`MockCluster`] instance associated with this client,
406+
/// otherwise `None` is returned.
407+
///
408+
/// [`MockCluster`]: crate::mocking::MockCluster
409+
pub fn mock_cluster(&self) -> Option<MockCluster<'_, C>> {
410+
let mc = unsafe { rdsys::rd_kafka_handle_mock_cluster(self.native.ptr()) };
411+
if !mc.is_null() {
412+
Some(MockCluster::new_ref(mc, self))
413+
} else {
414+
None
415+
}
416+
}
417+
403418
/// Returns a NativeTopic from the current client. The NativeTopic shouldn't outlive the client
404419
/// it was generated from.
405420
pub(crate) fn native_topic(&self, topic: &str) -> KafkaResult<NativeTopic> {

src/mocking.rs

Lines changed: 54 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,25 @@ use crate::client::Client;
2323
use crate::config::ClientConfig;
2424
use crate::error::{IsError, KafkaError, KafkaResult};
2525
use crate::producer::DefaultProducerContext;
26+
use crate::ClientContext;
27+
28+
/// Used internally by `MockCluster` to distinguish whether the mock cluster is owned or referenced.
29+
///
30+
/// The mock cluster can be created in two ways:
31+
///
32+
/// - With `rd_kafka_mock_cluster_new()`. In this case the caller of the c-tor is responsible
33+
/// for destroying the returned mock cluster instance.
34+
///
35+
/// - By setting `test.mock.num.brokers` in a configuration of a producer/consumer client.
36+
/// In this case, the client creates the mock cluster internally and destroys it in its d-tor,
37+
/// and we only hold a reference to the mock cluster obtained with `rd_kafka_handle_mock_cluster()` (cf. `Client::mock_cluser()`).
38+
///
39+
/// In this case, we **must neither** destroy the mock clsuter in `MockCluster`'s `drop()`,
40+
/// **nor** outlive the `Client` from which the reference is obtained, hence the lifetime.
41+
enum MockClusterClient<'c, C: ClientContext> {
42+
Owned(Client<C>),
43+
Ref(&'c Client<C>),
44+
}
2645

2746
/// Mock Kafka cluster with a configurable number of brokers that support a reasonable subset of
2847
/// Kafka protocol operations, error injection, etc.
@@ -38,10 +57,18 @@ use crate::producer::DefaultProducerContext;
3857
/// - High-level balanced consumer groups with offset commits
3958
/// - Topic Metadata and auto creation
4059
///
60+
/// The mock cluster can be either created with [`MockCluster::new()`]
61+
/// or by configuring the `test.mock.num.brokers` property when creating a producer/consumer.
62+
/// This will override that producer/consumer's bootstrap servers setting and internally
63+
/// create a mock cluster. You can then obtain this mock cluster using [`Client::mock_cluster()`].
64+
///
4165
/// Warning THIS IS AN EXPERIMENTAL API, SUBJECT TO CHANGE OR REMOVAL.
42-
pub struct MockCluster {
66+
///
67+
/// [`MockCluster::new()`]: MockCluster::new()
68+
/// [`Client::mock_cluster()`]: crate::client::Client::mock_cluster()
69+
pub struct MockCluster<'c, C: ClientContext> {
4370
mock_cluster: *mut RDKafkaMockCluster,
44-
_client: Client<DefaultProducerContext>,
71+
client: MockClusterClient<'c, C>,
4572
}
4673

4774
/// Utility macro to simplify returns for operations done on the mock API
@@ -62,7 +89,7 @@ pub enum MockCoordinator {
6289
Group(String),
6390
}
6491

65-
impl MockCluster {
92+
impl MockCluster<'static, DefaultProducerContext> {
6693
/// Create new mock cluster with a given number of brokers
6794
pub fn new(broker_count: i32) -> KafkaResult<Self> {
6895
let config = ClientConfig::new();
@@ -84,9 +111,21 @@ impl MockCluster {
84111

85112
Ok(MockCluster {
86113
mock_cluster,
87-
_client: client,
114+
client: MockClusterClient::Owned(client),
88115
})
89116
}
117+
}
118+
119+
impl<'c, C> MockCluster<'c, C>
120+
where
121+
C: ClientContext,
122+
{
123+
pub(crate) fn new_ref(mock_cluster: *mut RDKafkaMockCluster, client: &'c Client<C>) -> Self {
124+
Self {
125+
mock_cluster,
126+
client: MockClusterClient::Ref(client),
127+
}
128+
}
90129

91130
/// Obtain the bootstrap address for the mock cluster
92131
pub fn bootstrap_servers(&self) -> String {
@@ -275,21 +314,26 @@ impl MockCluster {
275314
}
276315
}
277316

278-
impl Drop for MockCluster {
317+
impl<'c, C> Drop for MockCluster<'c, C>
318+
where
319+
C: ClientContext,
320+
{
279321
fn drop(&mut self) {
280-
unsafe {
281-
rdsys::rd_kafka_mock_cluster_destroy(self.mock_cluster);
322+
if let MockClusterClient::Owned(..) = self.client {
323+
unsafe {
324+
rdsys::rd_kafka_mock_cluster_destroy(self.mock_cluster);
325+
}
282326
}
283327
}
284328
}
285329

286330
#[cfg(test)]
287331
mod tests {
288-
use tokio;
289-
use crate::Message;
332+
use crate::consumer::{Consumer, StreamConsumer};
290333
use crate::message::ToBytes;
291334
use crate::producer::{FutureProducer, FutureRecord};
292-
use crate::consumer::{Consumer, StreamConsumer};
335+
use crate::Message;
336+
use tokio;
293337

294338
use super::*;
295339

0 commit comments

Comments
 (0)