Skip to content
This repository was archived by the owner on Jun 5, 2024. It is now read-only.

Commit f3f20a0

Browse files
fix mocking layer when using multiple databases (#67)
1 parent de95b6c commit f3f20a0

File tree

7 files changed

+184
-87
lines changed

7 files changed

+184
-87
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "fred"
3-
version = "1.2.2"
3+
version = "1.2.3"
44
authors = ["Alec Embke <[email protected]>"]
55
edition = "2018"
66
description = "A Redis client for Rust built on Futures and Tokio."

examples/http.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,19 +46,19 @@ const INTERFACE: &'static str = "127.0.0.1";
4646
const PORT: u16 = 3000;
4747

4848
#[derive(Clone)]
49-
pub struct HttpInterface<'a> {
50-
client: &'a RedisClient
49+
pub struct HttpInterface {
50+
client: RedisClient
5151
}
5252

53-
impl<'a> HttpInterface<'a> {
53+
impl HttpInterface {
5454

55-
pub fn new(client: &'a RedisClient) -> HttpInterface<'a> {
55+
pub fn new(client: RedisClient) -> HttpInterface {
5656
HttpInterface { client }
5757
}
5858

5959
}
6060

61-
impl<'a> Service for HttpInterface<'a> {
61+
impl Service for HttpInterface {
6262
type Request = Request;
6363
type Response = Response;
6464
type Error = hyper::Error;
@@ -116,7 +116,7 @@ fn main() {
116116
// give the service its own clone of the client
117117
let http_client = client.clone();
118118
let server = Http::new().bind(&addr, move || {
119-
Ok(HttpInterface::new(&http_client))
119+
Ok(HttpInterface::new(http_client.clone()))
120120
});
121121

122122
let server = match server {

src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ extern crate tokio_timer_patched as tokio_timer;
1717
extern crate tokio_io;
1818
extern crate rand;
1919

20+
#[macro_use]
21+
extern crate lazy_static;
22+
2023
#[macro_use]
2124
extern crate log;
2225
extern crate pretty_env_logger;

src/mocks/commands.rs

Lines changed: 57 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ use std::rc::Rc;
1111
use std::collections::BTreeMap;
1212

1313
use crate::protocol::types::*;
14+
use std::ops::{DerefMut, Deref};
15+
use std::sync::Arc;
1416

1517

1618
pub fn log_unimplemented(command: &RedisCommand) -> Result<Frame, RedisError> {
@@ -97,7 +99,7 @@ pub fn set(data: &mut DataSet, mut args: Vec<RedisValue>) -> Result<Frame, Redis
9799
data.key_types.insert(key.clone(), KeyType::Data);
98100

99101
let now = Instant::now();
100-
let _ = data.expirations.borrow_mut().add(&key, ExpireLog {
102+
let _ = data.expirations.write().deref_mut().add(&key, ExpireLog {
101103
after: now + Duration::from_millis(count as u64),
102104
internal: Some((now, (key.clone())))
103105
})?;
@@ -134,7 +136,7 @@ pub fn set(data: &mut DataSet, mut args: Vec<RedisValue>) -> Result<Frame, Redis
134136
data.key_types.insert(key.clone(), KeyType::Data);
135137

136138
let now = Instant::now();
137-
let _ = data.expirations.borrow_mut().add(&key, ExpireLog {
139+
let _ = data.expirations.write().deref_mut().add(&key, ExpireLog {
138140
after: now + Duration::from_millis(count as u64),
139141
internal: Some((now, (key.clone())))
140142
})?;
@@ -201,7 +203,7 @@ pub fn get(data: &mut DataSet, mut args: Vec<RedisValue>) -> Result<Frame, Redis
201203
}
202204

203205
pub fn del(data: &mut DataSet, mut args: Vec<RedisValue>) -> Result<Frame, RedisError> {
204-
let keys: Vec<(KeyType, Rc<RedisKey>)> = args.into_iter().filter_map(|s| {
206+
let keys: Vec<(KeyType, Arc<RedisKey>)> = args.into_iter().filter_map(|s| {
205207
let k = match s {
206208
RedisValue::String(s) => s,
207209
RedisValue::Integer(i) => i.to_string(),
@@ -216,15 +218,15 @@ pub fn del(data: &mut DataSet, mut args: Vec<RedisValue>) -> Result<Frame, Redis
216218

217219
Some((kind, k))
218220
})
219-
.collect();
221+
.collect();
220222

221223
let mut deleted = 0;
222224
for (kind, key) in keys.into_iter() {
223225
if data.keys.remove(&key) {
224226
deleted += 1;
225227
}
226228
let _ = data.key_types.remove(&key);
227-
let _ = data.expirations.borrow_mut().del(&key);
229+
let _ = data.expirations.write().deref_mut().del(&key);
228230

229231
match kind {
230232
KeyType::Data => { data.data.remove(&key); },
@@ -260,7 +262,7 @@ pub fn expire(data: &mut DataSet, mut args: Vec<RedisValue>) -> Result<Frame, Re
260262
let now = Instant::now();
261263
let _key = key.clone();
262264

263-
let _ = data.expirations.borrow_mut().add(&_key, ExpireLog {
265+
let _ = data.expirations.write().deref_mut().add(&_key, ExpireLog {
264266
after: now + Duration::from_millis(ms as u64),
265267
internal: Some((now, (_key.clone())))
266268
})?;
@@ -281,7 +283,7 @@ pub fn persist(data: &mut DataSet, mut args: Vec<RedisValue>) -> Result<Frame, R
281283
};
282284
let key = utils::get_key(&*data, key);
283285

284-
let count = data.expirations.borrow_mut().del(&key)?;
286+
let count = data.expirations.write().deref_mut().del(&key)?;
285287
Ok(Frame::Integer(count as i64))
286288
}
287289

@@ -379,7 +381,7 @@ pub fn hset(data: &mut DataSet, mut args: Vec<RedisValue>) -> Result<Frame, Redi
379381
1
380382
};
381383

382-
let _ = inner.insert(Rc::new(field), value);
384+
let _ = inner.insert(Arc::new(field), value);
383385

384386
Ok(Frame::Integer(res))
385387
}
@@ -629,13 +631,53 @@ pub fn ping(data: &mut DataSet, mut args: Vec<RedisValue>) -> Result<Frame, Redi
629631
Ok(Frame::SimpleString("PONG".into()))
630632
}
631633

632-
pub fn flushall(data: &mut DataSet, mut args: Vec<RedisValue>) -> Result<Frame, RedisError> {
633-
data.data.clear();
634-
data.maps.clear();
635-
data.sets.clear();
636-
data.key_types.clear();
637-
data.keys.clear();
638-
utils::clear_expirations(&data.expirations);
634+
pub fn flushall(mut args: Vec<RedisValue>) -> Result<Frame, RedisError> {
635+
let global_data = utils::global_data_set();
636+
637+
for data_ref in global_data.read().deref().values() {
638+
let mut data_guard = data_ref.write();
639+
let mut data = data_guard.deref_mut();
640+
641+
data.data.clear();
642+
data.maps.clear();
643+
data.sets.clear();
644+
data.key_types.clear();
645+
data.keys.clear();
646+
utils::clear_expirations(&data.expirations);
647+
}
639648

640649
utils::ok()
650+
}
651+
652+
pub fn smembers(data: &mut DataSet, mut args: Vec<RedisValue>) -> Result<Frame, RedisError> {
653+
args.reverse();
654+
655+
let key = match args.pop() {
656+
Some(RedisValue::String(s)) => s,
657+
Some(RedisValue::Integer(i)) => i.to_string(),
658+
_ => return Err(RedisError::new(
659+
RedisErrorKind::InvalidArgument, "Invalid key."
660+
))
661+
};
662+
let key = utils::get_key(&*data, key);
663+
664+
let members = match data.sets.get(&key) {
665+
Some(m) => m,
666+
None => return Ok(Frame::Array(vec![]))
667+
};
668+
669+
let mut out = Vec::with_capacity(members.len());
670+
for member in members.iter() {
671+
out.push(Frame::BulkString(member.key.as_bytes().to_vec()));
672+
}
673+
674+
Ok(Frame::Array(out))
675+
}
676+
677+
pub fn publish(data: &mut DataSet, mut args: Vec<RedisValue>) -> Result<Frame, RedisError> {
678+
Ok(Frame::Integer(1))
679+
}
680+
681+
pub fn subscribe(data: &mut DataSet, mut args: Vec<RedisValue>) -> Result<Frame, RedisError> {
682+
Ok(Frame::Integer(1))
641683
}

src/mocks/mod.rs

Lines changed: 30 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -40,23 +40,30 @@ pub fn create_commands_ft(handle: Handle, inner: Arc<RedisClientInner>) -> Box<F
4040
let expire_tx = tx.clone();
4141
multiplexer_utils::set_command_tx(&inner, tx);
4242

43-
let data = DataSet::default();
44-
let expirations = data.expirations.clone();
43+
let data = utils::global_data_set();
4544

4645
let expire_ft = inner.timer.interval(Duration::from_secs(1)).map_err(|_| ()).for_each(move |_| {
4746
trace!("Starting to scan for expired keys.");
47+
let global_data = utils::global_data_set();
4848

49-
let expired = {
50-
let mut expiration_ref = expirations.borrow_mut();
49+
for data_ref in global_data.read().deref().values() {
50+
let data_guard = data_ref.read();
51+
let data = data_guard.deref();
52+
let expirations = data.expirations.clone();
5153

52-
let expired = expiration_ref.find_expired();
53-
expiration_ref.cleanup();
54+
let expired = {
55+
let mut expiration_guard = expirations.write();
56+
let mut expiration_ref = expiration_guard.deref_mut();
5457

55-
expired
56-
};
58+
let expired = expiration_ref.find_expired();
59+
expiration_ref.cleanup();
5760

58-
trace!("Cleaning up mock {} expired keys", expired.len());
59-
utils::cleanup_keys(&expire_tx, expired);
61+
expired
62+
};
63+
64+
trace!("Cleaning up mock {} expired keys", expired.len());
65+
utils::cleanup_keys(&expire_tx, expired);
66+
}
6067

6168
Ok::<(), ()>(())
6269
});
@@ -72,10 +79,20 @@ pub fn create_commands_ft(handle: Handle, inner: Arc<RedisClientInner>) -> Box<F
7279
Ok::<(), ()>(())
7380
});
7481

75-
Box::new(rx.from_err::<RedisError>().fold((handle, inner, data, None), |(handle, inner, mut data, err), mut command| {
82+
Box::new(rx.from_err::<RedisError>().fold((handle, inner, 0, None), |(handle, inner, mut db, err), mut command| {
7683
debug!("{} Handling redis command {:?}", n!(inner), command.kind);
7784
client_utils::decr_atomic(&inner.cmd_buffer_len);
7885

86+
if command.kind == RedisCommandKind::Select {
87+
db = command.args.first().map(|v| {
88+
match v {
89+
RedisValue::Integer(i) => *i as u8,
90+
_ => panic!("Invalid redis database in mock layer.")
91+
}
92+
}).unwrap_or(db);
93+
}
94+
let data = utils::global_data_set_db(db);
95+
7996
if command.kind.is_close() {
8097
debug!("{} Recv close command on the command stream.", n!(inner));
8198

@@ -97,12 +114,12 @@ pub fn create_commands_ft(handle: Handle, inner: Arc<RedisClientInner>) -> Box<F
97114
return Err(RedisError::new_canceled());
98115
}
99116

100-
let result = utils::handle_command(&inner, &mut data, command);
117+
let result = utils::handle_command(&inner, &data, command);
101118
if let Some(resp_tx) = resp_tx {
102119
let _ = resp_tx.send(result);
103120
}
104121

105-
Ok((handle, inner, data, err))
122+
Ok((handle, inner, db, err))
106123
}
107124
})
108125
.map(|(_, _, _, err)| err)

src/mocks/types.rs

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -44,18 +44,20 @@ use futures::{
4444
Stream
4545
};
4646
use std::time::Instant;
47+
use std::sync::Arc;
48+
use parking_lot::RwLock;
4749

4850
#[derive(Clone, Debug, Eq, PartialEq)]
4951
pub struct ExpireLog {
5052
/// Timestamp of when to clean up, in ms.
5153
pub after: Instant,
5254
/// Timestamp of set operation, reference to the key. This is set by the library.
53-
pub internal: Option<(Instant, Rc<RedisKey>)>
55+
pub internal: Option<(Instant, Arc<RedisKey>)>
5456
}
5557

5658
impl ExpireLog {
5759

58-
pub fn set_internal(&mut self, set: Instant, key: &Rc<RedisKey>) {
60+
pub fn set_internal(&mut self, set: Instant, key: &Arc<RedisKey>) {
5961
self.internal = Some((set, key.clone()));
6062
}
6163

@@ -70,7 +72,7 @@ impl ExpireLog {
7072
}
7173
}
7274

73-
pub fn get_key(&self) -> Option<&Rc<RedisKey>> {
75+
pub fn get_key(&self) -> Option<&Arc<RedisKey>> {
7476
match self.internal {
7577
Some((_, ref key)) => Some(key),
7678
None => None
@@ -121,9 +123,9 @@ impl fmt::Display for KeyType {
121123
/// Uses a map of "dirty" logs to batch up slower operations on the heap.
122124
#[derive(Debug, Clone)]
123125
pub struct Expirations {
124-
pub expirations: BTreeMap<Rc<RedisKey>, Rc<ExpireLog>>,
125-
pub sorted: BinaryHeap<Rc<ExpireLog>>,
126-
pub dirty: BTreeMap<Rc<RedisKey>, Rc<ExpireLog>>
126+
pub expirations: BTreeMap<Arc<RedisKey>, Arc<ExpireLog>>,
127+
pub sorted: BinaryHeap<Arc<ExpireLog>>,
128+
pub dirty: BTreeMap<Arc<RedisKey>, Arc<ExpireLog>>
127129
}
128130

129131
impl Expirations {
@@ -137,12 +139,12 @@ impl Expirations {
137139
}
138140

139141
/// Add or update an expire log in the data set.
140-
pub fn add(&mut self, key: &Rc<RedisKey>, mut expiration: ExpireLog) -> Result<(), RedisError> {
142+
pub fn add(&mut self, key: &Arc<RedisKey>, mut expiration: ExpireLog) -> Result<(), RedisError> {
141143
if !expiration.has_internal() {
142144
expiration.set_internal(Instant::now(), key);
143145
}
144146

145-
let expiration = Rc::new(expiration);
147+
let expiration = Arc::new(expiration);
146148

147149
if let Some(old) = self.expirations.insert(key.clone(), expiration.clone()) {
148150
// move old value to deleted set for lazy deletion later
@@ -160,7 +162,7 @@ impl Expirations {
160162
Ok(())
161163
}
162164

163-
pub fn del(&mut self, key: &Rc<RedisKey>) -> Result<usize, RedisError> {
165+
pub fn del(&mut self, key: &Arc<RedisKey>) -> Result<usize, RedisError> {
164166
let old = match self.expirations.remove(key) {
165167
Some(old) => old,
166168
None => return Ok(0)
@@ -174,9 +176,9 @@ impl Expirations {
174176
self.dirty.len()
175177
}
176178

177-
pub fn find_expired(&mut self) -> Vec<Rc<ExpireLog>> {
179+
pub fn find_expired(&mut self) -> Vec<Arc<ExpireLog>> {
178180
let now = Instant::now();
179-
let mut out: Vec<Rc<ExpireLog>> = Vec::new();
181+
let mut out: Vec<Arc<ExpireLog>> = Vec::new();
180182

181183
while self.sorted.len() > 0 {
182184
let youngest = match self.sorted.pop() {
@@ -227,7 +229,7 @@ impl Expirations {
227229

228230
// do a full pass over the binary heap to remove things from the `dirty` map
229231
pub fn cleanup(&mut self) {
230-
let mut new_sorted: BinaryHeap<Rc<ExpireLog>> = BinaryHeap::new();
232+
let mut new_sorted: BinaryHeap<Arc<ExpireLog>> = BinaryHeap::new();
231233

232234
for expire in self.sorted.drain() {
233235
let expire_key = match expire.get_key() {
@@ -260,13 +262,13 @@ impl Expirations {
260262
}
261263

262264
pub struct DataSet {
263-
pub keys: BTreeSet<Rc<RedisKey>>,
264-
pub key_types: BTreeMap<Rc<RedisKey>, KeyType>,
265-
pub data: BTreeMap<Rc<RedisKey>, RedisValue>,
266-
pub maps: BTreeMap<Rc<RedisKey>, BTreeMap<Rc<RedisKey>, RedisValue>>,
267-
pub sets: BTreeMap<Rc<RedisKey>, BTreeSet<RedisKey>>,
268-
pub lists: BTreeMap<Rc<RedisKey>, VecDeque<RedisValue>>,
269-
pub expirations: Rc<RefCell<Expirations>>,
265+
pub keys: BTreeSet<Arc<RedisKey>>,
266+
pub key_types: BTreeMap<Arc<RedisKey>, KeyType>,
267+
pub data: BTreeMap<Arc<RedisKey>, RedisValue>,
268+
pub maps: BTreeMap<Arc<RedisKey>, BTreeMap<Arc<RedisKey>, RedisValue>>,
269+
pub sets: BTreeMap<Arc<RedisKey>, BTreeSet<RedisKey>>,
270+
pub lists: BTreeMap<Arc<RedisKey>, VecDeque<RedisValue>>,
271+
pub expirations: Arc<RwLock<Expirations>>,
270272
}
271273

272274
impl Default for DataSet {
@@ -279,7 +281,7 @@ impl Default for DataSet {
279281
maps: BTreeMap::new(),
280282
sets: BTreeMap::new(),
281283
lists: BTreeMap::new(),
282-
expirations: Rc::new(RefCell::new(Expirations::new()))
284+
expirations: Arc::new(RwLock::new(Expirations::new()))
283285
}
284286
}
285287

0 commit comments

Comments
 (0)