Skip to content

Commit fc26722

Browse files
Merge pull request #191 from influxdata/crepererum/issue190
feat: implement topic deletion
2 parents 8678dfe + f3c52c7 commit fc26722

File tree

4 files changed

+267
-10
lines changed

4 files changed

+267
-10
lines changed

src/client/controller.rs

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ use crate::{
1414
messenger::RequestError,
1515
protocol::{
1616
error::Error as ProtocolError,
17-
messages::{CreateTopicRequest, CreateTopicsRequest},
18-
primitives::{Int16, Int32, String_},
17+
messages::{CreateTopicRequest, CreateTopicsRequest, DeleteTopicsRequest},
18+
primitives::{Array, Int16, Int32, String_},
1919
},
2020
throttle::maybe_throttle,
2121
validation::ExactlyOne,
@@ -103,6 +103,57 @@ impl ControllerClient {
103103
Ok(())
104104
}
105105

106+
/// Delete a topic
107+
pub async fn delete_topic(
108+
&self,
109+
name: impl Into<String> + Send,
110+
timeout_ms: i32,
111+
) -> Result<()> {
112+
let request = &DeleteTopicsRequest {
113+
topic_names: Array(Some(vec![String_(name.into())])),
114+
timeout_ms: Int32(timeout_ms),
115+
tagged_fields: None,
116+
};
117+
118+
maybe_retry(&self.backoff_config, self, "delete_topic", || async move {
119+
let (broker, gen) = self
120+
.get()
121+
.await
122+
.map_err(|e| ErrorOrThrottle::Error((e, None)))?;
123+
let response = broker
124+
.request(request)
125+
.await
126+
.map_err(|e| ErrorOrThrottle::Error((e.into(), Some(gen))))?;
127+
128+
maybe_throttle(response.throttle_time_ms)?;
129+
130+
let topic = response
131+
.responses
132+
.exactly_one()
133+
.map_err(|e| ErrorOrThrottle::Error((Error::exactly_one_topic(e), Some(gen))))?;
134+
135+
match topic.error {
136+
None => Ok(()),
137+
Some(protocol_error) => Err(ErrorOrThrottle::Error((
138+
Error::ServerError {
139+
protocol_error,
140+
error_message: topic.error_message.and_then(|s| s.0),
141+
request: RequestContext::Topic(topic.name.0),
142+
response: None,
143+
is_virtual: false,
144+
},
145+
Some(gen),
146+
))),
147+
}
148+
})
149+
.await?;
150+
151+
// Refresh the cache now there is definitely a new topic to observe.
152+
let _ = self.brokers.refresh_metadata().await;
153+
154+
Ok(())
155+
}
156+
106157
/// Retrieve the broker ID of the controller
107158
async fn get_controller_id(&self) -> Result<i32> {
108159
// Request an uncached, fresh copy of the metadata.
Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
use std::io::{Read, Write};
2+
3+
use crate::protocol::{
4+
api_key::ApiKey,
5+
api_version::{ApiVersion, ApiVersionRange},
6+
error::Error,
7+
messages::{
8+
read_compact_versioned_array, read_versioned_array, ReadVersionedError, ReadVersionedType,
9+
RequestBody, WriteVersionedError, WriteVersionedType,
10+
},
11+
primitives::{
12+
Array, CompactArrayRef, CompactNullableString, CompactString, CompactStringRef, Int16,
13+
Int32, String_, TaggedFields,
14+
},
15+
traits::{ReadType, WriteType},
16+
};
17+
18+
#[derive(Debug)]
19+
pub struct DeleteTopicsRequest {
20+
/// The names of the topics to delete.
21+
pub topic_names: Array<String_>,
22+
23+
/// The length of time in milliseconds to wait for the deletions to complete.
24+
pub timeout_ms: Int32,
25+
26+
/// The tagged fields.
27+
///
28+
/// Added in version 4.
29+
pub tagged_fields: Option<TaggedFields>,
30+
}
31+
32+
impl RequestBody for DeleteTopicsRequest {
33+
type ResponseBody = DeleteTopicsResponse;
34+
35+
const API_KEY: ApiKey = ApiKey::DeleteTopics;
36+
37+
/// Enough for now.
38+
const API_VERSION_RANGE: ApiVersionRange =
39+
ApiVersionRange::new(ApiVersion(Int16(0)), ApiVersion(Int16(5)));
40+
41+
const FIRST_TAGGED_FIELD_IN_REQUEST_VERSION: ApiVersion = ApiVersion(Int16(4));
42+
}
43+
44+
impl<W> WriteVersionedType<W> for DeleteTopicsRequest
45+
where
46+
W: Write,
47+
{
48+
fn write_versioned(
49+
&self,
50+
writer: &mut W,
51+
version: ApiVersion,
52+
) -> Result<(), WriteVersionedError> {
53+
let v = version.0 .0;
54+
assert!(v <= 5);
55+
56+
if v >= 4 {
57+
if let Some(topic_names) = self.topic_names.0.as_ref() {
58+
let topic_names: Vec<_> = topic_names
59+
.iter()
60+
.map(|name| CompactStringRef(name.0.as_str()))
61+
.collect();
62+
CompactArrayRef(Some(&topic_names)).write(writer)?;
63+
} else {
64+
CompactArrayRef::<CompactStringRef<'_>>(None).write(writer)?;
65+
}
66+
} else {
67+
self.topic_names.write(writer)?;
68+
};
69+
70+
self.timeout_ms.write(writer)?;
71+
72+
if v >= 4 {
73+
match self.tagged_fields.as_ref() {
74+
Some(tagged_fields) => {
75+
tagged_fields.write(writer)?;
76+
}
77+
None => {
78+
TaggedFields::default().write(writer)?;
79+
}
80+
}
81+
}
82+
83+
Ok(())
84+
}
85+
}
86+
87+
#[derive(Debug)]
88+
pub struct DeleteTopicsResponse {
89+
/// The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the
90+
/// request did not violate any quota.
91+
///
92+
/// Added in version 1.
93+
pub throttle_time_ms: Option<Int32>,
94+
95+
/// The results for each topic we tried to delete.
96+
pub responses: Vec<DeleteTopicsResponseTopic>,
97+
98+
/// The tagged fields.
99+
///
100+
/// Added in version 4.
101+
pub tagged_fields: Option<TaggedFields>,
102+
}
103+
104+
impl<R> ReadVersionedType<R> for DeleteTopicsResponse
105+
where
106+
R: Read,
107+
{
108+
fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
109+
let v = version.0 .0;
110+
assert!(v <= 5);
111+
112+
let throttle_time_ms = (v >= 1).then(|| Int32::read(reader)).transpose()?;
113+
let responses = if v >= 4 {
114+
read_compact_versioned_array(reader, version)?.unwrap_or_default()
115+
} else {
116+
read_versioned_array(reader, version)?.unwrap_or_default()
117+
};
118+
let tagged_fields = (v >= 4).then(|| TaggedFields::read(reader)).transpose()?;
119+
120+
Ok(Self {
121+
throttle_time_ms,
122+
responses,
123+
tagged_fields,
124+
})
125+
}
126+
}
127+
128+
#[derive(Debug)]
129+
pub struct DeleteTopicsResponseTopic {
130+
/// The topic name.
131+
pub name: String_,
132+
133+
/// The error code, or 0 if there was no error.
134+
pub error: Option<Error>,
135+
136+
/// The error message, or null if there was no error.
137+
///
138+
/// Added in version 5.
139+
pub error_message: Option<CompactNullableString>,
140+
141+
/// The tagged fields.
142+
///
143+
/// Added in version 4.
144+
pub tagged_fields: Option<TaggedFields>,
145+
}
146+
147+
impl<R> ReadVersionedType<R> for DeleteTopicsResponseTopic
148+
where
149+
R: Read,
150+
{
151+
fn read_versioned(reader: &mut R, version: ApiVersion) -> Result<Self, ReadVersionedError> {
152+
let v = version.0 .0;
153+
assert!(v <= 5);
154+
155+
let name = if v >= 4 {
156+
String_(CompactString::read(reader)?.0)
157+
} else {
158+
String_::read(reader)?
159+
};
160+
let error = Error::new(Int16::read(reader)?.0);
161+
let error_message = (v >= 5)
162+
.then(|| CompactNullableString::read(reader))
163+
.transpose()?;
164+
let tagged_fields = (v >= 4).then(|| TaggedFields::read(reader)).transpose()?;
165+
166+
Ok(Self {
167+
name,
168+
error,
169+
error_message,
170+
tagged_fields,
171+
})
172+
}
173+
}

src/protocol/messages/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ mod create_topics;
2424
pub use create_topics::*;
2525
mod delete_records;
2626
pub use delete_records::*;
27+
mod delete_topics;
28+
pub use delete_topics::*;
2729
mod fetch;
2830
pub use fetch::*;
2931
mod header;

tests/client.rs

Lines changed: 39 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -46,18 +46,26 @@ async fn test_topic_crud() {
4646
}
4747
}
4848
}
49-
let new_topic = format!("{}{}", prefix, max_id + 1);
50-
controller_client
51-
.create_topic(&new_topic, 2, 1, 5_000)
52-
.await
53-
.unwrap();
49+
50+
// create two topics
51+
let mut new_topics = vec![];
52+
for offset in 1..=2 {
53+
let new_topic = format!("{}{}", prefix, max_id + offset);
54+
controller_client
55+
.create_topic(&new_topic, 2, 1, 5_000)
56+
.await
57+
.unwrap();
58+
new_topics.push(new_topic);
59+
}
5460

5561
// might take a while to converge
5662
tokio::time::timeout(TEST_TIMEOUT, async {
5763
loop {
5864
let topics = client.list_topics().await.unwrap();
59-
let topic = topics.iter().find(|t| t.name == new_topic);
60-
if topic.is_some() {
65+
if new_topics
66+
.iter()
67+
.all(|new_topic| topics.iter().any(|t| &t.name == new_topic))
68+
{
6169
return;
6270
}
6371

@@ -67,8 +75,9 @@ async fn test_topic_crud() {
6775
.await
6876
.unwrap();
6977

78+
// topic already exists
7079
let err = controller_client
71-
.create_topic(&new_topic, 1, 1, 5_000)
80+
.create_topic(&new_topics[0], 1, 1, 5_000)
7281
.await
7382
.unwrap_err();
7483
match err {
@@ -78,6 +87,28 @@ async fn test_topic_crud() {
7887
} => {}
7988
_ => panic!("Unexpected error: {}", err),
8089
}
90+
91+
// delete one topic
92+
controller_client
93+
.delete_topic(&new_topics[0], 5_000)
94+
.await
95+
.unwrap();
96+
97+
// might take a while to converge
98+
tokio::time::timeout(TEST_TIMEOUT, async {
99+
loop {
100+
let topics = client.list_topics().await.unwrap();
101+
if topics.iter().all(|t| t.name != new_topics[0])
102+
&& topics.iter().any(|t| t.name == new_topics[1])
103+
{
104+
return;
105+
}
106+
107+
tokio::time::sleep(Duration::from_millis(10)).await;
108+
}
109+
})
110+
.await
111+
.unwrap();
81112
}
82113

83114
#[tokio::test]

0 commit comments

Comments
 (0)