Skip to content

Commit 6fa090d

Browse files
committed
put address of context inside opaque
1 parent 48a04c8 commit 6fa090d

File tree

1 file changed

+16
-10
lines changed

1 file changed

+16
-10
lines changed

src/producer/base_producer.rs

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ use std::ffi::{CStr, CString};
4545
use std::marker::PhantomData;
4646
use std::mem;
4747
use std::os::raw::c_void;
48-
use std::ptr::{self, null_mut};
48+
use std::ptr;
4949
use std::slice;
5050
use std::str;
5151
use std::sync::atomic::{AtomicBool, Ordering};
@@ -232,8 +232,14 @@ unsafe extern "C" fn partitioner_cb<Part: Partitioner, C: ProducerContext<Part>>
232232
Some(slice::from_raw_parts(keydata as *const u8, keylen))
233233
};
234234

235-
let producer_context: &mut Part = &mut *(rkt_opaque as *mut Part);
236-
producer_context.partition(topic_name, key, partition_cnt, is_partition_available)
235+
let producer_context = &mut *(rkt_opaque as *mut C);
236+
237+
match producer_context.get_custom_partitioner() {
238+
None => panic!("custom partitioner is not set"),
239+
Some(partitioner) => {
240+
partitioner.partition(topic_name, key, partition_cnt, is_partition_available)
241+
}
242+
}
237243
}
238244

239245
impl FromClientConfig for BaseProducer<DefaultProducerContext> {
@@ -260,15 +266,15 @@ where
260266
let native_config = config.create_native_config()?;
261267
let context = Arc::new(context);
262268

263-
let partitioner = match context.get_custom_partitioner() {
264-
None => null_mut(),
265-
Some(partitioner) => partitioner as *const Part as *mut c_void,
266-
};
267-
268-
if !partitioner.is_null() {
269+
if let Some(_) = context.get_custom_partitioner() {
269270
let default_topic_config =
270271
unsafe { rdsys::rd_kafka_conf_get_default_topic_conf(native_config.ptr()) };
271-
unsafe { rdsys::rd_kafka_topic_conf_set_opaque(default_topic_config, partitioner) };
272+
unsafe {
273+
rdsys::rd_kafka_topic_conf_set_opaque(
274+
default_topic_config,
275+
Arc::as_ptr(&context) as *const Part as *mut c_void,
276+
)
277+
};
272278
unsafe {
273279
rdsys::rd_kafka_topic_conf_set_partitioner_cb(
274280
default_topic_config,

0 commit comments

Comments
 (0)