Skip to content

Commit 48a04c8

Browse files
committed
avoid using boxed partitioner
1 parent 5d1c3d4 commit 48a04c8

File tree

4 files changed

+15
-7
lines changed

4 files changed

+15
-7
lines changed

src/client.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -221,9 +221,18 @@ impl<C: ClientContext> Client<C> {
221221
native_config: NativeClientConfig,
222222
rd_kafka_type: RDKafkaType,
223223
context: C,
224+
) -> KafkaResult<Client<C>> {
225+
Self::new_context_arc(config, native_config, rd_kafka_type, Arc::new(context))
226+
}
227+
228+
/// Creates a new `Client` given a configuration, a client type and a context.
229+
pub fn new_context_arc(
230+
config: &ClientConfig,
231+
native_config: NativeClientConfig,
232+
rd_kafka_type: RDKafkaType,
233+
context: Arc<C>,
224234
) -> KafkaResult<Client<C>> {
225235
let mut err_buf = ErrBuf::new();
226-
let context = Arc::new(context);
227236
unsafe {
228237
rdsys::rd_kafka_conf_set_opaque(
229238
native_config.ptr(),

src/producer/base_producer.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -258,10 +258,11 @@ where
258258
context: C,
259259
) -> KafkaResult<BaseProducer<C, Part>> {
260260
let native_config = config.create_native_config()?;
261+
let context = Arc::new(context);
261262

262263
let partitioner = match context.get_custom_partitioner() {
263264
None => null_mut(),
264-
Some(partitioner) => partitioner.as_ref() as *const Part as *mut c_void,
265+
Some(partitioner) => partitioner as *const Part as *mut c_void,
265266
};
266267

267268
if !partitioner.is_null() {
@@ -279,7 +280,7 @@ where
279280
unsafe {
280281
rdsys::rd_kafka_conf_set_dr_msg_cb(native_config.ptr(), Some(delivery_cb::<Part, C>))
281282
};
282-
let client = Client::new(
283+
let client = Client::new_context_arc(
283284
config,
284285
native_config,
285286
RDKafkaType::RD_KAFKA_PRODUCER,

src/producer/mod.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -201,9 +201,7 @@ pub trait ProducerContext<Part: Partitioner = NoCustomPartitioner>: ClientContex
201201
fn delivery(&self, delivery_result: &DeliveryResult<'_>, delivery_opaque: Self::DeliveryOpaque);
202202

203203
/// This method is called when creating producer in order to register custom partitioner.
204-
/// Box is used to make sure data is on the heap as partitioner address will fly across FFI boundary.
205-
#[allow(clippy::borrowed-box)]
206-
fn get_custom_partitioner(&self) -> Option<&Box<Part>> {
204+
fn get_custom_partitioner(&self) -> Option<&Part> {
207205
None
208206
}
209207
}

tests/test_low_producers.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ impl<Part: Partitioner + Send + Sync> ProducerContext<Part> for CollectingContex
9494
}
9595
}
9696

97-
fn get_custom_partitioner(&self) -> Option<&Box<Part>> {
97+
fn get_custom_partitioner(&self) -> Option<&Part> {
9898
match &self.partitioner {
9999
None => None,
100100
Some(p) => Some(&p),

0 commit comments

Comments
 (0)