@@ -25,7 +25,7 @@ use crate::error::{IsError, KafkaError, KafkaResult};
25
25
use crate :: producer:: DefaultProducerContext ;
26
26
use crate :: ClientContext ;
27
27
28
- /// Used internally by `MockCluster` to distinguish whether the mock cluster is owned or referenced .
28
+ /// Used internally by `MockCluster` to distinguish whether the mock cluster is owned or borrowed .
29
29
///
30
30
/// The mock cluster can be created in two ways:
31
31
///
@@ -34,13 +34,13 @@ use crate::ClientContext;
34
34
///
35
35
/// - By setting `test.mock.num.brokers` in a configuration of a producer/consumer client.
36
36
/// In this case, the client creates the mock cluster internally and destroys it in its d-tor,
37
- /// and we only hold a reference to the mock cluster obtained with `rd_kafka_handle_mock_cluster()` (cf. `Client::mock_cluser ()`).
37
+ /// and we only hold a reference to the mock cluster obtained with `rd_kafka_handle_mock_cluster()` (cf. `Client::mock_cluster ()`).
38
38
///
39
- /// In this case, we **must neither** destroy the mock clsuter in `MockCluster`'s `drop()`,
39
+ /// In this case, we **must neither** destroy the mock cluster in `MockCluster`'s `drop()`,
40
40
/// **nor** outlive the `Client` from which the reference is obtained, hence the lifetime.
41
41
enum MockClusterClient < ' c , C : ClientContext > {
42
42
Owned ( Client < C > ) ,
43
- Ref ( & ' c Client < C > ) ,
43
+ Borrowed ( & ' c Client < C > ) ,
44
44
}
45
45
46
46
/// Mock Kafka cluster with a configurable number of brokers that support a reasonable subset of
@@ -90,7 +90,7 @@ pub enum MockCoordinator {
90
90
}
91
91
92
92
impl MockCluster < ' static , DefaultProducerContext > {
93
- /// Create new mock cluster with a given number of brokers
93
+ /// Creates a new mock cluster with the given number of brokers
94
94
pub fn new ( broker_count : i32 ) -> KafkaResult < Self > {
95
95
let config = ClientConfig :: new ( ) ;
96
96
let native_config = config. create_native_config ( ) ?;
@@ -103,8 +103,8 @@ impl MockCluster<'static, DefaultProducerContext> {
103
103
context,
104
104
) ?;
105
105
106
- let kafka_ptr = client . native_ptr ( ) ;
107
- let mock_cluster = unsafe { rdsys:: rd_kafka_mock_cluster_new ( kafka_ptr , broker_count) } ;
106
+ let mock_cluster =
107
+ unsafe { rdsys:: rd_kafka_mock_cluster_new ( client . native_ptr ( ) , broker_count) } ;
108
108
if mock_cluster. is_null ( ) {
109
109
return Err ( KafkaError :: MockCluster ( rdsys:: RDKafkaErrorCode :: Fail ) ) ;
110
110
}
@@ -120,20 +120,24 @@ impl<'c, C> MockCluster<'c, C>
120
120
where
121
121
C : ClientContext ,
122
122
{
123
- pub ( crate ) fn new_ref ( mock_cluster : * mut RDKafkaMockCluster , client : & ' c Client < C > ) -> Self {
124
- Self {
125
- mock_cluster,
126
- client : MockClusterClient :: Ref ( client) ,
123
+ /// Returns the mock cluster associated with the given client if any
124
+ pub ( crate ) fn from_client ( client : & ' c Client < C > ) -> Option < Self > {
125
+ let mock_cluster = unsafe { rdsys:: rd_kafka_handle_mock_cluster ( client. native_ptr ( ) ) } ;
126
+ if mock_cluster. is_null ( ) {
127
+ return None ;
127
128
}
129
+
130
+ Some ( MockCluster {
131
+ mock_cluster,
132
+ client : MockClusterClient :: Borrowed ( client) ,
133
+ } )
128
134
}
129
135
130
- /// Obtain the bootstrap address for the mock cluster
136
+ /// Returns the mock cluster's bootstrap.servers list
131
137
pub fn bootstrap_servers ( & self ) -> String {
132
- let raw =
138
+ let bootstrap =
133
139
unsafe { CStr :: from_ptr ( rdsys:: rd_kafka_mock_cluster_bootstraps ( self . mock_cluster ) ) } ;
134
- raw. to_str ( )
135
- . expect ( "Unexpected non-Unicode characters in bootstrap servers" )
136
- . to_string ( )
140
+ bootstrap. to_string_lossy ( ) . to_string ( )
137
141
}
138
142
139
143
/// Create a topic
@@ -147,20 +151,20 @@ where
147
151
partition_count : i32 ,
148
152
replication_factor : i32 ,
149
153
) -> KafkaResult < ( ) > {
150
- let raw_topic = CString :: new ( topic) . unwrap ( ) ;
154
+ let topic_c = CString :: new ( topic) ? ;
151
155
return_mock_op ! {
152
156
unsafe {
153
157
rdsys:: rd_kafka_mock_topic_create(
154
158
self . mock_cluster,
155
- raw_topic . as_ptr( ) ,
159
+ topic_c . as_ptr( ) ,
156
160
partition_count,
157
161
replication_factor,
158
162
)
159
163
}
160
164
}
161
165
}
162
166
163
- /// Sets the parititon leader
167
+ /// Sets the partition leader
164
168
///
165
169
/// The topic will be created if it does not exist.
166
170
///
@@ -171,22 +175,22 @@ where
171
175
partition : i32 ,
172
176
broker_id : Option < i32 > ,
173
177
) -> KafkaResult < ( ) > {
174
- let raw_topic = CString :: new ( topic) . unwrap ( ) ;
178
+ let topic_c = CString :: new ( topic) ? ;
175
179
let broker_id = broker_id. unwrap_or ( -1 ) ;
176
180
177
181
return_mock_op ! {
178
182
unsafe {
179
183
rdsys:: rd_kafka_mock_partition_set_leader(
180
184
self . mock_cluster,
181
- raw_topic . as_ptr( ) ,
185
+ topic_c . as_ptr( ) ,
182
186
partition,
183
187
broker_id,
184
188
)
185
189
}
186
190
}
187
191
}
188
192
189
- /// Sets the partitions preferred replica / follower.
193
+ /// Sets the partition's preferred replica / follower.
190
194
///
191
195
/// The topic will be created if it does not exist.
192
196
///
@@ -197,37 +201,37 @@ where
197
201
partition : i32 ,
198
202
broker_id : i32 ,
199
203
) -> KafkaResult < ( ) > {
200
- let raw_topic = CString :: new ( topic) . unwrap ( ) ;
204
+ let topic_c = CString :: new ( topic) ? ;
201
205
202
206
return_mock_op ! {
203
207
unsafe {
204
208
rdsys:: rd_kafka_mock_partition_set_follower(
205
- self . mock_cluster, raw_topic . as_ptr( ) , partition, broker_id)
209
+ self . mock_cluster, topic_c . as_ptr( ) , partition, broker_id)
206
210
}
207
211
}
208
212
}
209
213
210
- /// Set the partitions preferred replicate / follower low and high watermarks.
214
+ /// Sets the partition's preferred replica / follower low and high watermarks.
211
215
///
212
216
/// The topic will be created if it does not exist.
213
217
///
214
- /// Setting an offset to `Non ` will revert back to the leaders corresponding watermark.
218
+ /// Setting an offset to `None ` will revert back to the leader's corresponding watermark.
215
219
pub fn follower_watermarks (
216
220
& self ,
217
221
topic : & str ,
218
222
partition : i32 ,
219
223
low_watermark : Option < i64 > ,
220
224
high_watermark : Option < i64 > ,
221
225
) -> KafkaResult < ( ) > {
222
- let raw_topic = CString :: new ( topic) . unwrap ( ) ;
226
+ let topic_c = CString :: new ( topic) ? ;
223
227
let low_watermark = low_watermark. unwrap_or ( -1 ) ;
224
228
let high_watermark = high_watermark. unwrap_or ( -1 ) ;
225
229
226
230
return_mock_op ! {
227
231
unsafe {
228
232
rdsys:: rd_kafka_mock_partition_set_follower_wmarks(
229
233
self . mock_cluster,
230
- raw_topic . as_ptr( ) ,
234
+ topic_c . as_ptr( ) ,
231
235
partition,
232
236
low_watermark,
233
237
high_watermark
@@ -237,6 +241,7 @@ where
237
241
}
238
242
239
243
/// Disconnects the broker and disallows any new connections.
244
+ /// Use -1 for all brokers, or >= 0 for a specific broker.
240
245
///
241
246
/// NOTE: This does NOT trigger leader change.
242
247
pub fn broker_down ( & self , broker_id : i32 ) -> KafkaResult < ( ) > {
@@ -247,7 +252,8 @@ where
247
252
}
248
253
}
249
254
250
- /// brief Makes the broker accept connections again.
255
+ /// Makes the broker accept connections again.
256
+ /// Use -1 for all brokers, or >= 0 for a specific broker.
251
257
///
252
258
/// NOTE: This does NOT trigger leader change.
253
259
pub fn broker_up ( & self , broker_id : i32 ) -> KafkaResult < ( ) > {
@@ -259,6 +265,7 @@ where
259
265
}
260
266
261
267
/// Set broker round-trip-time delay in milliseconds.
268
+ /// Use -1 for all brokers, or >= 0 for a specific broker.
262
269
pub fn broker_round_trip_time ( & self , broker_id : i32 , delay : Duration ) -> KafkaResult < ( ) > {
263
270
let rtt_ms = delay. as_millis ( ) . try_into ( ) . unwrap_or ( c_int:: MAX ) ;
264
271
@@ -274,14 +281,15 @@ where
274
281
}
275
282
276
283
/// Sets the broker's rack as reported in Metadata to the client.
284
+ /// Use -1 for all brokers, or >= 0 for a specific broker.
277
285
pub fn broker_rack ( & self , broker_id : i32 , rack : & str ) -> KafkaResult < ( ) > {
278
- let raw_rack = CString :: new ( rack) . unwrap ( ) ;
286
+ let rack_c = CString :: new ( rack) ? ;
279
287
return_mock_op ! {
280
288
unsafe {
281
289
rdsys:: rd_kafka_mock_broker_set_rack(
282
290
self . mock_cluster,
283
291
broker_id,
284
- raw_rack . as_ptr( )
292
+ rack_c . as_ptr( )
285
293
)
286
294
}
287
295
}
@@ -291,22 +299,22 @@ where
291
299
///
292
300
/// If this API is not a standard hashing scheme will be used.
293
301
///
294
- /// `broker_id` does not need to point to an existing broker.`
302
+ /// `broker_id` does not need to point to an existing broker.
295
303
pub fn coordinator ( & self , coordinator : MockCoordinator , broker_id : i32 ) -> KafkaResult < ( ) > {
296
304
let ( kind, key) = match coordinator {
297
305
MockCoordinator :: Transaction ( key) => ( "transaction" , key) ,
298
306
MockCoordinator :: Group ( key) => ( "group" , key) ,
299
307
} ;
300
308
301
- let raw_kind = CString :: new ( kind) . unwrap ( ) ;
302
- let raw_key = CString :: new ( key) . unwrap ( ) ;
309
+ let kind_c = CString :: new ( kind) ? ;
310
+ let raw_c = CString :: new ( key) ? ;
303
311
304
312
return_mock_op ! {
305
313
unsafe {
306
314
rdsys:: rd_kafka_mock_coordinator_set(
307
315
self . mock_cluster,
308
- raw_kind . as_ptr( ) ,
309
- raw_key . as_ptr( ) ,
316
+ kind_c . as_ptr( ) ,
317
+ raw_c . as_ptr( ) ,
310
318
broker_id
311
319
)
312
320
}
@@ -356,7 +364,7 @@ mod tests {
356
364
. create ( )
357
365
. expect ( "Client creation error" ) ;
358
366
359
- let rec = FutureRecord :: to ( TOPIC ) . key ( b "msg1") . payload ( b "test") ;
367
+ let rec = FutureRecord :: to ( TOPIC ) . key ( "msg1" ) . payload ( "test" ) ;
360
368
producer. send_result ( rec) . unwrap ( ) . await . unwrap ( ) . unwrap ( ) ;
361
369
362
370
consumer. subscribe ( & [ TOPIC ] ) . unwrap ( ) ;
0 commit comments