@@ -6,11 +6,12 @@ use crate::{
6
6
protocol:: {
7
7
error:: Error as ProtocolError ,
8
8
messages:: {
9
- FetchRequest , FetchRequestPartition , FetchRequestTopic , FetchResponse ,
10
- FetchResponsePartition , IsolationLevel , ListOffsetsRequest ,
11
- ListOffsetsRequestPartition , ListOffsetsRequestTopic , ListOffsetsResponse ,
12
- ListOffsetsResponsePartition , ProduceRequest , ProduceRequestPartitionData ,
13
- ProduceRequestTopicData , ProduceResponse , NORMAL_CONSUMER ,
9
+ DeletePartitionRequest , DeletePartitionResponse , DeleteRecordsRequest ,
10
+ DeleteRecordsResponse , DeleteTopicRequest , FetchRequest , FetchRequestPartition ,
11
+ FetchRequestTopic , FetchResponse , FetchResponsePartition , IsolationLevel ,
12
+ ListOffsetsRequest , ListOffsetsRequestPartition , ListOffsetsRequestTopic ,
13
+ ListOffsetsResponse , ListOffsetsResponsePartition , ProduceRequest ,
14
+ ProduceRequestPartitionData , ProduceRequestTopicData , ProduceResponse , NORMAL_CONSUMER ,
14
15
} ,
15
16
primitives:: * ,
16
17
record:: { Record as ProtocolRecord , * } ,
@@ -159,6 +160,29 @@ impl PartitionClient {
159
160
extract_high_watermark ( partition)
160
161
}
161
162
163
+ /// Delete records whose offset is smaller than the given offset.
164
+ ///
165
+ /// # Supported Brokers
166
+ /// Currently this is only supported by Apache Kafka but NOT by Redpanda, see
167
+ /// <https://github.com/redpanda-data/redpanda/issues/1016>.
168
+ pub async fn delete_records ( & self , offset : i64 , timeout_ms : i32 ) -> Result < ( ) > {
169
+ let request =
170
+ & build_delete_records_request ( offset, timeout_ms, & self . topic , self . partition ) ;
171
+
172
+ maybe_retry (
173
+ & self . backoff_config ,
174
+ self ,
175
+ "delete_records" ,
176
+ || async move {
177
+ let response = self . get ( ) . await ?. request ( & request) . await ?;
178
+ process_delete_records_response ( & self . topic , self . partition , response)
179
+ } ,
180
+ )
181
+ . await ?;
182
+
183
+ Ok ( ( ) )
184
+ }
185
+
162
186
/// Retrieve the broker ID of the partition leader
163
187
async fn get_leader ( & self , broker_override : Option < BrokerConnection > ) -> Result < i32 > {
164
188
let metadata = self
@@ -625,3 +649,59 @@ fn extract_high_watermark(partition: ListOffsetsResponsePartition) -> Result<i64
625
649
_ => unreachable ! ( ) ,
626
650
}
627
651
}
652
+
653
+ fn build_delete_records_request (
654
+ offset : i64 ,
655
+ timeout_ms : i32 ,
656
+ topic : & str ,
657
+ partition : i32 ,
658
+ ) -> DeleteRecordsRequest {
659
+ DeleteRecordsRequest {
660
+ topics : vec ! [ DeleteTopicRequest {
661
+ name: String_ ( topic. to_string( ) ) ,
662
+ partitions: vec![ DeletePartitionRequest {
663
+ partition_index: Int32 ( partition) ,
664
+ offset: Int64 ( offset) ,
665
+ tagged_fields: None ,
666
+ } ] ,
667
+ tagged_fields: None ,
668
+ } ] ,
669
+ timeout_ms : Int32 ( timeout_ms) ,
670
+ tagged_fields : None ,
671
+ }
672
+ }
673
+
674
+ fn process_delete_records_response (
675
+ topic : & str ,
676
+ partition : i32 ,
677
+ response : DeleteRecordsResponse ,
678
+ ) -> Result < DeletePartitionResponse > {
679
+ let response_topic = response
680
+ . topics
681
+ . exactly_one ( )
682
+ . map_err ( Error :: exactly_one_topic) ?;
683
+
684
+ if response_topic. name . 0 != topic {
685
+ return Err ( Error :: InvalidResponse ( format ! (
686
+ "Expected data for topic '{}' but got data for topic '{}'" ,
687
+ topic, response_topic. name. 0
688
+ ) ) ) ;
689
+ }
690
+
691
+ let response_partition = response_topic
692
+ . partitions
693
+ . exactly_one ( )
694
+ . map_err ( Error :: exactly_one_partition) ?;
695
+
696
+ if response_partition. partition_index . 0 != partition {
697
+ return Err ( Error :: InvalidResponse ( format ! (
698
+ "Expected data for partition {} but got data for partition {}" ,
699
+ partition, response_partition. partition_index. 0
700
+ ) ) ) ;
701
+ }
702
+
703
+ match response_partition. error {
704
+ Some ( err) => Err ( Error :: ServerError ( err, String :: new ( ) ) ) ,
705
+ None => Ok ( response_partition) ,
706
+ }
707
+ }
0 commit comments