Skip to content

Commit 304eedd

Browse files
committed
feat: implement record deletion
1 parent 0f9cd4e commit 304eedd

File tree

7 files changed

+505
-10
lines changed

7 files changed

+505
-10
lines changed

.circleci/config.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,8 @@ jobs:
256256
RUST_LOG: "trace"
257257
# Run integration tests
258258
TEST_INTEGRATION: 1
259+
# Kafka support DeleteRecords
260+
TEST_DELETE_RECORDS: 1
259261
# Don't use the first node here since this is likely the controller and we want to ensure that we automatically
260262
# pick the controller for certain actions (e.g. topic creation) and don't just get lucky.
261263
KAFKA_CONNECT: "kafka-1:9093"

README.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,10 +127,11 @@ $ docker-compose -f docker-compose-kafka.yml up
127127
in one session, and then run:
128128

129129
```console
130-
$ TEST_INTEGRATION=1 KAFKA_CONNECT=localhost:9094 cargo test
130+
$ TEST_INTEGRATION=1 TEST_DELETE_RECORDS=1 KAFKA_CONNECT=localhost:9094 cargo test
131131
```
132132

133-
in another session.
133+
in another session. Note that Apache Kafka supports a different set of features then redpanda, so we pass other
134+
environment variables.
134135

135136
### Fuzzing
136137
RSKafka offers fuzz targets for certain protocol parsing steps. To build them make sure you have [cargo-fuzz] installed.

src/client/error.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use thiserror::Error;
22

3+
pub use crate::messenger::RequestError;
34
pub use crate::protocol::error::Error as ProtocolError;
45

56
#[derive(Error, Debug)]
@@ -8,7 +9,7 @@ pub enum Error {
89
Connection(#[from] crate::connection::Error),
910

1011
#[error("Request error: {0}")]
11-
Request(#[from] crate::messenger::RequestError),
12+
Request(#[from] RequestError),
1213

1314
#[error("Invalid response: {0}")]
1415
InvalidResponse(String),

src/client/partition.rs

Lines changed: 85 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,12 @@ use crate::{
66
protocol::{
77
error::Error as ProtocolError,
88
messages::{
9-
FetchRequest, FetchRequestPartition, FetchRequestTopic, FetchResponse,
10-
FetchResponsePartition, IsolationLevel, ListOffsetsRequest,
11-
ListOffsetsRequestPartition, ListOffsetsRequestTopic, ListOffsetsResponse,
12-
ListOffsetsResponsePartition, ProduceRequest, ProduceRequestPartitionData,
13-
ProduceRequestTopicData, ProduceResponse, NORMAL_CONSUMER,
9+
DeleteRecordsRequest, DeleteRecordsResponse, DeleteRequestPartition,
10+
DeleteRequestTopic, DeleteResponsePartition, FetchRequest, FetchRequestPartition,
11+
FetchRequestTopic, FetchResponse, FetchResponsePartition, IsolationLevel,
12+
ListOffsetsRequest, ListOffsetsRequestPartition, ListOffsetsRequestTopic,
13+
ListOffsetsResponse, ListOffsetsResponsePartition, ProduceRequest,
14+
ProduceRequestPartitionData, ProduceRequestTopicData, ProduceResponse, NORMAL_CONSUMER,
1415
},
1516
primitives::*,
1617
record::{Record as ProtocolRecord, *},
@@ -159,6 +160,29 @@ impl PartitionClient {
159160
extract_high_watermark(partition)
160161
}
161162

163+
/// Delete records whose offset is smaller than the given offset.
164+
///
165+
/// # Supported Brokers
166+
/// Currently this is only supported by Apache Kafka but NOT by Redpanda, see
167+
/// <https://github.com/redpanda-data/redpanda/issues/1016>.
168+
pub async fn delete_records(&self, offset: i64, timeout_ms: i32) -> Result<()> {
169+
let request =
170+
&build_delete_records_request(offset, timeout_ms, &self.topic, self.partition);
171+
172+
maybe_retry(
173+
&self.backoff_config,
174+
self,
175+
"delete_records",
176+
|| async move {
177+
let response = self.get().await?.request(&request).await?;
178+
process_delete_records_response(&self.topic, self.partition, response)
179+
},
180+
)
181+
.await?;
182+
183+
Ok(())
184+
}
185+
162186
/// Retrieve the broker ID of the partition leader
163187
async fn get_leader(&self, broker_override: Option<BrokerConnection>) -> Result<i32> {
164188
let metadata = self
@@ -625,3 +649,59 @@ fn extract_high_watermark(partition: ListOffsetsResponsePartition) -> Result<i64
625649
_ => unreachable!(),
626650
}
627651
}
652+
653+
fn build_delete_records_request(
654+
offset: i64,
655+
timeout_ms: i32,
656+
topic: &str,
657+
partition: i32,
658+
) -> DeleteRecordsRequest {
659+
DeleteRecordsRequest {
660+
topics: vec![DeleteRequestTopic {
661+
name: String_(topic.to_string()),
662+
partitions: vec![DeleteRequestPartition {
663+
partition_index: Int32(partition),
664+
offset: Int64(offset),
665+
tagged_fields: None,
666+
}],
667+
tagged_fields: None,
668+
}],
669+
timeout_ms: Int32(timeout_ms),
670+
tagged_fields: None,
671+
}
672+
}
673+
674+
fn process_delete_records_response(
675+
topic: &str,
676+
partition: i32,
677+
response: DeleteRecordsResponse,
678+
) -> Result<DeleteResponsePartition> {
679+
let response_topic = response
680+
.topics
681+
.exactly_one()
682+
.map_err(Error::exactly_one_topic)?;
683+
684+
if response_topic.name.0 != topic {
685+
return Err(Error::InvalidResponse(format!(
686+
"Expected data for topic '{}' but got data for topic '{}'",
687+
topic, response_topic.name.0
688+
)));
689+
}
690+
691+
let response_partition = response_topic
692+
.partitions
693+
.exactly_one()
694+
.map_err(Error::exactly_one_partition)?;
695+
696+
if response_partition.partition_index.0 != partition {
697+
return Err(Error::InvalidResponse(format!(
698+
"Expected data for partition {} but got data for partition {}",
699+
partition, response_partition.partition_index.0
700+
)));
701+
}
702+
703+
match response_partition.error {
704+
Some(err) => Err(Error::ServerError(err, String::new())),
705+
None => Ok(response_partition),
706+
}
707+
}

0 commit comments

Comments
 (0)