@@ -6,11 +6,12 @@ use crate::{
6
6
protocol:: {
7
7
error:: Error as ProtocolError ,
8
8
messages:: {
9
- FetchRequest , FetchRequestPartition , FetchRequestTopic , FetchResponse ,
10
- FetchResponsePartition , IsolationLevel , ListOffsetsRequest ,
11
- ListOffsetsRequestPartition , ListOffsetsRequestTopic , ListOffsetsResponse ,
12
- ListOffsetsResponsePartition , ProduceRequest , ProduceRequestPartitionData ,
13
- ProduceRequestTopicData , ProduceResponse , NORMAL_CONSUMER ,
9
+ DeletePartitionRequest , DeletePartitionResponse , DeleteRecordsRequest ,
10
+ DeleteRecordsResponse , DeleteTopicRequest , FetchRequest , FetchRequestPartition ,
11
+ FetchRequestTopic , FetchResponse , FetchResponsePartition , IsolationLevel ,
12
+ ListOffsetsRequest , ListOffsetsRequestPartition , ListOffsetsRequestTopic ,
13
+ ListOffsetsResponse , ListOffsetsResponsePartition , ProduceRequest ,
14
+ ProduceRequestPartitionData , ProduceRequestTopicData , ProduceResponse , NORMAL_CONSUMER ,
14
15
} ,
15
16
primitives:: * ,
16
17
record:: { Record as ProtocolRecord , * } ,
@@ -159,6 +160,25 @@ impl PartitionClient {
159
160
extract_high_watermark ( partition)
160
161
}
161
162
163
+ /// Delete records whose offset is smaller than the given offset.
164
+ pub async fn delete_records ( & self , offset : i64 , timeout_ms : i32 ) -> Result < ( ) > {
165
+ let request =
166
+ & build_delete_records_request ( offset, timeout_ms, & self . topic , self . partition ) ;
167
+
168
+ maybe_retry (
169
+ & self . backoff_config ,
170
+ self ,
171
+ "delete_records" ,
172
+ || async move {
173
+ let response = self . get ( ) . await ?. request ( & request) . await ?;
174
+ process_delete_records_response ( & self . topic , self . partition , response)
175
+ } ,
176
+ )
177
+ . await ?;
178
+
179
+ Ok ( ( ) )
180
+ }
181
+
162
182
/// Retrieve the broker ID of the partition leader
163
183
async fn get_leader ( & self , broker_override : Option < BrokerConnection > ) -> Result < i32 > {
164
184
let metadata = self
@@ -625,3 +645,59 @@ fn extract_high_watermark(partition: ListOffsetsResponsePartition) -> Result<i64
625
645
_ => unreachable ! ( ) ,
626
646
}
627
647
}
648
+
649
+ fn build_delete_records_request (
650
+ offset : i64 ,
651
+ timeout_ms : i32 ,
652
+ topic : & str ,
653
+ partition : i32 ,
654
+ ) -> DeleteRecordsRequest {
655
+ DeleteRecordsRequest {
656
+ topics : vec ! [ DeleteTopicRequest {
657
+ name: String_ ( topic. to_string( ) ) ,
658
+ partitions: vec![ DeletePartitionRequest {
659
+ partition_index: Int32 ( partition) ,
660
+ offset: Int64 ( offset) ,
661
+ tagged_fields: None ,
662
+ } ] ,
663
+ tagged_fields: None ,
664
+ } ] ,
665
+ timeout_ms : Int32 ( timeout_ms) ,
666
+ tagged_fields : None ,
667
+ }
668
+ }
669
+
670
+ fn process_delete_records_response (
671
+ topic : & str ,
672
+ partition : i32 ,
673
+ response : DeleteRecordsResponse ,
674
+ ) -> Result < DeletePartitionResponse > {
675
+ let response_topic = response
676
+ . topics
677
+ . exactly_one ( )
678
+ . map_err ( Error :: exactly_one_topic) ?;
679
+
680
+ if response_topic. name . 0 != topic {
681
+ return Err ( Error :: InvalidResponse ( format ! (
682
+ "Expected data for topic '{}' but got data for topic '{}'" ,
683
+ topic, response_topic. name. 0
684
+ ) ) ) ;
685
+ }
686
+
687
+ let response_partition = response_topic
688
+ . partitions
689
+ . exactly_one ( )
690
+ . map_err ( Error :: exactly_one_partition) ?;
691
+
692
+ if response_partition. partition_index . 0 != partition {
693
+ return Err ( Error :: InvalidResponse ( format ! (
694
+ "Expected data for partition {} but got data for partition {}" ,
695
+ partition, response_partition. partition_index. 0
696
+ ) ) ) ;
697
+ }
698
+
699
+ match response_partition. error {
700
+ Some ( err) => Err ( Error :: ServerError ( err, String :: new ( ) ) ) ,
701
+ None => Ok ( response_partition) ,
702
+ }
703
+ }
0 commit comments