Skip to content

Commit 9198dba

Browse files
committed
Add missing mock APIs
1 parent 536c0af commit 9198dba

File tree

2 files changed

+127
-0
lines changed

2 files changed

+127
-0
lines changed

rdkafka-sys/src/types.rs

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ use std::error::Error;
55
use std::ffi::CStr;
66
use std::fmt;
77

8+
use num_enum::IntoPrimitive;
9+
810
use crate::bindings;
911
use crate::helpers;
1012

@@ -477,6 +479,72 @@ impl fmt::Display for RDKafkaErrorCode {
477479

478480
impl Error for RDKafkaErrorCode {}
479481

482+
/// Native rdkafka ApiKeys / protocol requests
483+
#[derive(Debug, Clone, Copy, PartialEq, Eq, IntoPrimitive)]
484+
#[repr(i16)]
485+
#[non_exhaustive]
486+
pub enum RDKafkaApiKey {
487+
Produce = 0,
488+
Fetch = 1,
489+
ListOffsets = 2,
490+
Metadata = 3,
491+
LeaderAndIsr = 4,
492+
StopReplica = 5,
493+
UpdateMetadata = 6,
494+
ControlledShutdown = 7,
495+
OffsetCommit = 8,
496+
OffsetFetch = 9,
497+
FindCoordinator = 10,
498+
JoinGroup = 11,
499+
Heartbeat = 12,
500+
LeaveGroup = 13,
501+
SyncGroup = 14,
502+
DescribeGroups = 15,
503+
ListGroups = 16,
504+
SaslHandshake = 17,
505+
ApiVersion = 18,
506+
CreateTopics = 19,
507+
DeleteTopics = 20,
508+
DeleteRecords = 21,
509+
InitProducerId = 22,
510+
OffsetForLeaderEpoch = 23,
511+
AddPartitionsToTxn = 24,
512+
AddOffsetsToTxn = 25,
513+
EndTxn = 26,
514+
WriteTxnMarkers = 27,
515+
TxnOffsetCommit = 28,
516+
DescribeAcls = 29,
517+
CreateAcls = 30,
518+
DeleteAcls = 31,
519+
DescribeConfigs = 32,
520+
AlterConfigs = 33,
521+
AlterReplicaLogDirs = 34,
522+
DescribeLogDirs = 35,
523+
SaslAuthenticate = 36,
524+
CreatePartitions = 37,
525+
CreateDelegationToken = 38,
526+
RenewDelegationToken = 39,
527+
ExpireDelegationToken = 40,
528+
DescribeDelegationToken = 41,
529+
DeleteGroups = 42,
530+
ElectLeaders = 43,
531+
IncrementalAlterConfigs = 44,
532+
AlterPartitionReassignments = 45,
533+
ListPartitionReassignments = 46,
534+
OffsetDelete = 47,
535+
DescribeClientQuotas = 48,
536+
AlterClientQuotas = 49,
537+
DescribeUserScramCredentials = 50,
538+
AlterUserScramCredentials = 51,
539+
Vote = 52,
540+
BeginQuorumEpoch = 53,
541+
EndQuorumEpoch = 54,
542+
DescribeQuorum = 55,
543+
AlterIsr = 56,
544+
UpdateFeatures = 57,
545+
Envelope = 58,
546+
}
547+
480548
#[cfg(test)]
481549
mod tests {
482550
use super::*;

src/mocking.rs

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,40 @@ where
140140
bootstrap.to_string_lossy().to_string()
141141
}
142142

143+
/// Clear the cluster's error state for the given ApiKey.
144+
pub fn clear_request_errors(&self, api_key: RDKafkaApiKey) {
145+
unsafe { rdsys::rd_kafka_mock_clear_request_errors(self.mock_cluster, api_key.into()) }
146+
}
147+
148+
/// Push errors onto the cluster's error stack for the given ApiKey.
149+
///
150+
/// The protocol requests matching the given ApiKey will fail with the
151+
/// provided error code and removed from the stack, starting with
152+
/// the first error code, then the second, etc.
153+
///
154+
/// Passing RD_KAFKA_RESP_ERR__TRANSPORT will make the mock broker
155+
/// disconnect the client which can be useful to trigger a disconnect
156+
/// on certain requests.
157+
pub fn request_errors(&self, api_key: RDKafkaApiKey, errors: &[RDKafkaRespErr]) {
158+
unsafe {
159+
rdsys::rd_kafka_mock_push_request_errors_array(
160+
self.mock_cluster,
161+
api_key.into(),
162+
errors.len(),
163+
errors.as_ptr(),
164+
)
165+
}
166+
}
167+
168+
/// Set the topic error to return in protocol requests.
169+
///
170+
/// Currently only used for TopicMetadataRequest and AddPartitionsToTxnRequest.
171+
pub fn topic_error(&self, topic: &str, error: RDKafkaRespErr) -> KafkaResult<()> {
172+
let topic_c = CString::new(topic)?;
173+
unsafe { rdsys::rd_kafka_mock_topic_set_error(self.mock_cluster, topic_c.as_ptr(), error) }
174+
Ok(())
175+
}
176+
143177
/// Create a topic
144178
///
145179
/// This is an alternative to automatic topic creation as performed by the client itself.
@@ -320,6 +354,31 @@ where
320354
}
321355
}
322356
}
357+
358+
/// Set the allowed ApiVersion range for the given ApiKey.
359+
///
360+
/// Set min_version and max_version to `None` to disable the API completely.
361+
/// max_version MUST not exceed the maximum implemented value.
362+
pub fn apiversion(
363+
&self,
364+
api_key: RDKafkaApiKey,
365+
min_version: Option<i16>,
366+
max_version: Option<i16>,
367+
) -> KafkaResult<()> {
368+
let min_version = min_version.unwrap_or(-1);
369+
let max_version = max_version.unwrap_or(-1);
370+
371+
return_mock_op! {
372+
unsafe {
373+
rdsys::rd_kafka_mock_set_apiversion(
374+
self.mock_cluster,
375+
api_key.into(),
376+
min_version,
377+
max_version,
378+
)
379+
}
380+
}
381+
}
323382
}
324383

325384
impl<'c, C> Drop for MockCluster<'c, C>

0 commit comments

Comments
 (0)