Skip to content

Commit dc802ad

Browse files
author
ppom
committed
fix all async & clippy issues
- use hasmap instead of btreemap rust-lang/rust#64552 - use async iterators as few as possible - move first rotate_db into its own thread to be out of the runtime, so that we can use blocking_send - send flush to database
1 parent dfc8141 commit dc802ad

File tree

9 files changed

+98
-56
lines changed

9 files changed

+98
-56
lines changed

rust/src/concepts/filter.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use std::{
22
cmp::Ordering,
33
collections::{BTreeMap, BTreeSet},
44
fmt::Display,
5+
hash::Hash,
56
sync::Arc,
67
};
78

@@ -253,6 +254,12 @@ impl PartialOrd for Filter {
253254
Some(self.cmp(other))
254255
}
255256
}
257+
impl Hash for Filter {
258+
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
259+
self.stream_name.hash(state);
260+
self.name.hash(state);
261+
}
262+
}
256263

257264
#[allow(clippy::unwrap_used)]
258265
#[cfg(test)]

rust/src/concepts/stream.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::{cmp::Ordering, collections::BTreeMap};
1+
use std::{cmp::Ordering, collections::BTreeMap, hash::Hash};
22

33
use serde::Deserialize;
44

@@ -81,6 +81,11 @@ impl PartialOrd for Stream {
8181
Some(self.cmp(other))
8282
}
8383
}
84+
impl Hash for Stream {
85+
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
86+
self.name.hash(state);
87+
}
88+
}
8489

8590
#[cfg(test)]
8691
pub mod tests {

rust/src/daemon/action.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ impl State {
3535
.first_key_value()
3636
.is_some_and(|(k, _)| *k + after < now)
3737
{
38+
#[allow(clippy::unwrap_used)] // we just checked in the condition that first is_some
3839
let (_, m) = self.ordered_times.pop_first().unwrap();
3940
self.pending.remove(&m);
4041
}
@@ -78,6 +79,7 @@ impl ActionManager {
7879
self.exec_now(m);
7980
} else {
8081
{
82+
#[allow(clippy::unwrap_used)] // propagating panics is ok
8183
let mut state = self.state.lock().unwrap();
8284
state.clear_past_times(t, self.action.after_duration());
8385
state.add_match(&m, exec_t);
@@ -86,6 +88,7 @@ impl ActionManager {
8688
tokio::spawn(async move {
8789
let dur = (exec_t - now).to_std().expect("Duration is bigger than what's supported. Did you put an enormous after duration?");
8890
tokio::time::sleep(dur).await;
91+
#[allow(clippy::unwrap_used)] // propagating panics is ok
8992
let mut state = this.state.lock().unwrap();
9093
if state.remove(m.clone(), t) {
9194
this.exec_now(m);
@@ -98,7 +101,8 @@ impl ActionManager {
98101
&self,
99102
order: Order,
100103
is_match: F,
101-
) -> BTreeMap<String, Vec<String>> {
104+
) -> BTreeMap<Vec<String>, Vec<String>> {
105+
#[allow(clippy::unwrap_used)] // propagating panics is ok
102106
let mut state = self.state.lock().unwrap();
103107
state
104108
.pending
@@ -118,7 +122,7 @@ impl ActionManager {
118122
time.to_rfc3339().chars().take(19).collect()
119123
})
120124
.collect();
121-
acc.insert(match_.join(" "), times);
125+
acc.insert(match_, times);
122126
acc
123127
})
124128
}
@@ -152,6 +156,7 @@ impl ActionManager {
152156

153157
pub fn quit(&mut self) {
154158
if self.action.on_exit() {
159+
#[allow(clippy::unwrap_used)] // propagating panics is ok
155160
let mut state = self.state.lock().unwrap();
156161
for (m, times) in &state.pending {
157162
for _ in times {

rust/src/daemon/database/mod.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -64,17 +64,17 @@ macro_rules! flush_or_die {
6464
pub fn database_manager(
6565
config: &'static Config,
6666
mut log_rx: mpsc::Receiver<DatabaseManagerInput>,
67-
matches_tx: BTreeMap<&Filter, mpsc::Sender<MFT>>,
67+
matches_tx: BTreeMap<&'static Filter, mpsc::Sender<MFT>>,
6868
) -> thread::JoinHandle<()> {
69-
let (mut log_db, mut flush_db) = match rotate_db(config, Some(matches_tx)) {
70-
Ok(dbs) => dbs,
71-
Err(err) => {
72-
error!("while rotating databases on start: {}", err);
73-
exit(1);
74-
}
75-
};
76-
7769
thread::spawn(move || {
70+
let (mut log_db, mut flush_db) = match rotate_db(config, Some(matches_tx)) {
71+
Ok(dbs) => dbs,
72+
Err(err) => {
73+
error!("while rotating databases on start: {}", err);
74+
exit(1);
75+
}
76+
};
77+
7878
let mut cpt = 0;
7979
while let Some(order) = log_rx.blocking_recv() {
8080
match order {
@@ -177,7 +177,7 @@ fn _rotate_db(
177177

178178
// Read flushes
179179
let mut flushes: BTreeMap<&'static Filter, BTreeMap<Match, Time>> = BTreeMap::new();
180-
while let Some(flush_entry) = flush_read_db.next() {
180+
for flush_entry in flush_read_db {
181181
match flush_entry {
182182
Ok(entry) => {
183183
let matches_map = flushes.entry(entry.f).or_default();
@@ -193,7 +193,7 @@ fn _rotate_db(
193193
let now = Local::now();
194194

195195
// Read matches
196-
while let Some(log_entry) = log_read_db.next() {
196+
for log_entry in log_read_db {
197197
match log_entry {
198198
Ok(mut entry) => {
199199
// Check if number of patterns is in sync

rust/src/daemon/filter.rs

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ impl FilterManager {
9595
.for_each(|manager| manager.quit());
9696
}
9797

98-
pub fn handle_order(
98+
pub async fn handle_order(
9999
&mut self,
100100
patterns: &BTreeMap<Arc<Pattern>, Regex>,
101101
order: Order,
@@ -110,18 +110,17 @@ impl FilterManager {
110110
.all(|(a_match, regex)| regex.is_match(a_match))
111111
};
112112

113-
let cs = self
114-
.matches
115-
.clone()
116-
.iter()
113+
let matches = self.matches.clone();
114+
let cs: BTreeMap<_, _> = matches
115+
.into_iter()
117116
// match filtering
118117
.filter(|(match_, _)| is_match(match_))
119118
.map(|(match_, times)| {
120119
if let Order::Flush = order {
121-
self.remove_match(match_);
120+
self.remove_match(&match_);
122121
}
123122
(
124-
match_.join(" "),
123+
match_,
125124
PatternStatus {
126125
matches: times.len(),
127126
..Default::default()
@@ -130,15 +129,31 @@ impl FilterManager {
130129
})
131130
.collect();
132131

133-
self.action_managers.iter().fold(cs, |mut acc, manager| {
132+
let cs = self.action_managers.iter().fold(cs, |mut acc, manager| {
134133
for (match_, times) in manager.handle_order(order, is_match) {
135134
let pattern_status = acc.entry(match_).or_default();
136135
pattern_status
137136
.actions
138137
.insert(manager.action().to_string(), times);
139138
}
140139
acc
141-
})
140+
});
141+
142+
let now = Local::now();
143+
for match_ in cs.keys() {
144+
#[allow(clippy::unwrap_used)] // propagating panics is ok
145+
self.log_tx
146+
.send(DatabaseManagerInput::Flush(LogEntry {
147+
exec: false,
148+
m: match_.to_vec(),
149+
f: self.filter,
150+
t: now,
151+
}))
152+
.await
153+
.unwrap()
154+
}
155+
156+
cs.into_iter().map(|(k, v)| (k.join(" "), v)).collect()
142157
}
143158

144159
fn add_match(&mut self, m: &Match, t: Time) {
@@ -162,6 +177,7 @@ impl FilterManager {
162177
.first_key_value()
163178
.is_some_and(|(k, _)| *k + retry_duration < now)
164179
{
180+
#[allow(clippy::unwrap_used)] // we just checked in the condition that first is_some
165181
let (_, m) = self.ordered_times.pop_first().unwrap();
166182
self.matches.remove(&m);
167183
}

rust/src/daemon/mod.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use std::{
2-
collections::BTreeMap,
2+
collections::{BTreeMap, HashMap},
33
error::Error,
44
path::PathBuf,
55
sync::{
@@ -8,7 +8,6 @@ use std::{
88
},
99
};
1010

11-
use socket::socket_manager;
1211
use tokio::{
1312
process::Child,
1413
select,
@@ -20,6 +19,7 @@ use tracing::info;
2019
use crate::concepts::{Config, Filter, Stream};
2120
use database::database_manager;
2221
use filter::FilterManager;
22+
use socket::socket_manager;
2323
use stream::stream_manager;
2424

2525
mod database;
@@ -31,7 +31,7 @@ mod stream;
3131

3232
// type SharedState = BTreeMap<&'static Stream, Arc<Mutex<BTreeMap<&'static Filter, FilterManager>>>>;
3333
struct SharedState {
34-
pub s: BTreeMap<&'static Stream, Arc<Mutex<BTreeMap<&'static Filter, FilterManager>>>>,
34+
pub s: HashMap<&'static Stream, Arc<Mutex<HashMap<&'static Filter, FilterManager>>>>,
3535
}
3636

3737
// #[allow(unsafe_code)]
@@ -66,10 +66,10 @@ pub async fn daemon(
6666
};
6767

6868
// Filter managers
69-
let mut stream_filter_managers_handlers = BTreeMap::new();
69+
let mut stream_filter_managers_handlers = HashMap::new();
7070
let mut log2filter_tx = BTreeMap::new();
7171
for stream in config.streams().values() {
72-
let mut filter_managers_handlers = BTreeMap::new();
72+
let mut filter_managers_handlers = HashMap::new();
7373
for filter in stream.filters().values() {
7474
let manager = FilterManager::new(
7575
filter,
@@ -92,10 +92,11 @@ pub async fn daemon(
9292
database_manager(config, log_rx, log2filter_tx)
9393
};
9494

95-
let mut stream_filter_managers = SharedState { s: BTreeMap::new() };
95+
let mut stream_filter_managers = SharedState { s: HashMap::new() };
9696
for (stream, filter_manager_handlers) in stream_filter_managers_handlers {
97-
let mut filter_managers = BTreeMap::new();
97+
let mut filter_managers = HashMap::new();
9898
for (filter, filter_manager_handler) in filter_manager_handlers {
99+
#[allow(clippy::unwrap_used)] // propagating panics is ok
99100
filter_managers.insert(filter, filter_manager_handler.await.unwrap());
100101
}
101102
stream_filter_managers
@@ -128,7 +129,6 @@ pub async fn daemon(
128129

129130
let socket_manager_task_handle = {
130131
let socket = socket.to_owned();
131-
let stream_filter_managers = stream_filter_managers.clone();
132132
tokio::spawn(async move {
133133
socket_manager(config, socket, stream_filter_managers, shutdown_rx).await
134134
})

rust/src/daemon/socket.rs

Lines changed: 29 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ fn open_socket(path: PathBuf) -> Result<UnixListener, String> {
5050

5151
async fn answer_order(
5252
config: &'static Config,
53-
shared_state: &SharedState,
53+
shared_state: &Arc<SharedState>,
5454
options: ClientRequest,
5555
) -> Result<ClientStatus, String> {
5656
// Compute options
@@ -82,17 +82,22 @@ async fn answer_order(
8282
})
8383
.collect::<Result<BTreeMap<Arc<Pattern>, Regex>, String>>()?;
8484

85-
let cs: ClientStatus = futures::stream::iter(shared_state.s.iter())
86-
// stream filtering
87-
.filter(|(stream, _)| async {
88-
stream_name.is_none()
89-
|| stream_name
90-
.clone()
91-
.is_some_and(|name| name == stream.name())
92-
})
93-
.fold(BTreeMap::new(), |mut acc, (stream, filter_manager)| async {
94-
let mut filter_manager = filter_manager.lock().await;
95-
let inner_map = filter_manager
85+
let cs: ClientStatus = futures::stream::iter(
86+
shared_state
87+
.s
88+
.iter()
89+
// stream filtering
90+
.filter(|(stream, _)| {
91+
stream_name.is_none()
92+
|| stream_name
93+
.clone()
94+
.is_some_and(|name| name == stream.name())
95+
}),
96+
)
97+
.fold(BTreeMap::new(), |mut acc, (stream, filter_manager)| async {
98+
let mut filter_manager = filter_manager.lock().await;
99+
let inner_map = futures::stream::iter(
100+
filter_manager
96101
.iter_mut()
97102
// filter filtering
98103
.filter(|(filter, _)| {
@@ -106,18 +111,20 @@ async fn answer_order(
106111
patterns
107112
.iter()
108113
.all(|(pattern, _)| filter.patterns().get(pattern).is_some())
109-
})
110-
.map(|(filter, manager)| {
111-
(
112-
filter.name().to_owned(),
113-
manager.handle_order(&patterns, options.order),
114-
)
115-
})
116-
.collect();
117-
acc.insert(stream.name().to_owned(), inner_map);
118-
acc
114+
}),
115+
)
116+
.then(|(filter, manager)| async {
117+
(
118+
filter.name().to_owned(),
119+
manager.handle_order(&patterns, options.order).await,
120+
)
119121
})
122+
.collect()
120123
.await;
124+
acc.insert(stream.name().to_owned(), inner_map);
125+
acc
126+
})
127+
.await;
121128

122129
Ok(cs)
123130
}

rust/src/daemon/stream.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::{collections::BTreeMap, process::Stdio, sync::Arc};
1+
use std::{collections::HashMap, process::Stdio, sync::Arc};
22

33
use tokio::{
44
io::{AsyncBufReadExt, BufReader},
@@ -15,7 +15,7 @@ use crate::{
1515
pub async fn stream_manager(
1616
stream: &'static Stream,
1717
child_tx: oneshot::Sender<Option<Child>>,
18-
filter_managers: Arc<Mutex<BTreeMap<&'static Filter, FilterManager>>>,
18+
filter_managers: Arc<Mutex<HashMap<&'static Filter, FilterManager>>>,
1919
) {
2020
info!("{}: start {:?}", stream.name(), stream.cmd());
2121
let mut child = match Command::new(&stream.cmd()[0])

rust/tests/simple.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,9 @@ async fn simple() {
146146

147147
file_with_contents(out_path, "");
148148

149-
assert!(daemon(config_path.into(), socket_path.into()).await.is_ok());
149+
assert!(daemon(config_path.into(), socket_path.into())
150+
.await
151+
.is_err());
150152

151153
// 36 from DB
152154
// 12 from DB

0 commit comments

Comments
 (0)