Skip to content

Commit cc8a813

Browse files
committed
Merge branch 'sorted-batches'
2 parents 4156040 + 31fdaa3 commit cc8a813

File tree

6 files changed

+417
-266
lines changed

6 files changed

+417
-266
lines changed

src/flow/domain/local.rs

Lines changed: 28 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -200,66 +200,40 @@ enum KeyedState<T: Eq + Hash> {
200200
Sex(FnvHashMap<(T, T, T, T, T, T), Vec<Row<Vec<T>>>>),
201201
}
202202

203-
impl<'a, T: 'static + Eq + Hash + Clone> From<&'a [T]> for KeyType<'a, T> {
204-
fn from(other: &'a [T]) -> Self {
205-
match other.len() {
206-
0 => unreachable!(),
207-
1 => KeyType::Single(&other[0]),
208-
2 => KeyType::Double((other[0].clone(), other[1].clone())),
209-
3 => KeyType::Tri((other[0].clone(), other[1].clone(), other[2].clone())),
210-
4 => KeyType::Quad((
211-
other[0].clone(),
212-
other[1].clone(),
213-
other[2].clone(),
214-
other[3].clone(),
215-
)),
216-
5 => KeyType::Quin((
217-
other[0].clone(),
218-
other[1].clone(),
219-
other[2].clone(),
220-
other[3].clone(),
221-
other[4].clone(),
222-
)),
223-
6 => KeyType::Sex((
224-
other[0].clone(),
225-
other[1].clone(),
226-
other[2].clone(),
227-
other[3].clone(),
228-
other[4].clone(),
229-
other[5].clone(),
230-
)),
231-
_ => unimplemented!(),
232-
}
233-
}
234-
}
235-
236-
impl<'a, T: 'static + Eq + Hash + Clone> From<&'a [&'a T]> for KeyType<'a, T> {
237-
fn from(other: &'a [&'a T]) -> Self {
238-
match other.len() {
203+
impl<'a, T: 'static + Eq + Hash + Clone> KeyType<'a, T> {
204+
pub fn from<I>(other: I) -> Self
205+
where
206+
I: IntoIterator<Item = &'a T>,
207+
<I as IntoIterator>::IntoIter: ExactSizeIterator,
208+
{
209+
let mut other = other.into_iter();
210+
let len = other.len();
211+
let mut more = move || other.next().unwrap();
212+
match len {
239213
0 => unreachable!(),
240-
1 => KeyType::Single(other[0]),
241-
2 => KeyType::Double((other[0].clone(), other[1].clone())),
242-
3 => KeyType::Tri((other[0].clone(), other[1].clone(), other[2].clone())),
214+
1 => KeyType::Single(more()),
215+
2 => KeyType::Double((more().clone(), more().clone())),
216+
3 => KeyType::Tri((more().clone(), more().clone(), more().clone())),
243217
4 => KeyType::Quad((
244-
other[0].clone(),
245-
other[1].clone(),
246-
other[2].clone(),
247-
other[3].clone(),
218+
more().clone(),
219+
more().clone(),
220+
more().clone(),
221+
more().clone(),
248222
)),
249223
5 => KeyType::Quin((
250-
other[0].clone(),
251-
other[1].clone(),
252-
other[2].clone(),
253-
other[3].clone(),
254-
other[4].clone(),
224+
more().clone(),
225+
more().clone(),
226+
more().clone(),
227+
more().clone(),
228+
more().clone(),
255229
)),
256230
6 => KeyType::Sex((
257-
other[0].clone(),
258-
other[1].clone(),
259-
other[2].clone(),
260-
other[3].clone(),
261-
other[4].clone(),
262-
other[5].clone(),
231+
more().clone(),
232+
more().clone(),
233+
more().clone(),
234+
more().clone(),
235+
more().clone(),
236+
more().clone(),
263237
)),
264238
_ => unimplemented!(),
265239
}

src/ops/grouped/aggregate.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,11 @@ impl GroupedOperation for Aggregator {
9292
}
9393
}
9494

95-
fn apply(&self, current: Option<&DataType>, diffs: Vec<Self::Diff>) -> DataType {
95+
fn apply(
96+
&self,
97+
current: Option<&DataType>,
98+
diffs: &mut Iterator<Item = Self::Diff>,
99+
) -> DataType {
96100
let n = match current {
97101
Some(&DataType::Int(n)) => n as i64,
98102
Some(&DataType::BigInt(n)) => n,

src/ops/grouped/concat.rs

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,11 @@ impl GroupedOperation for GroupConcat {
149149
}
150150
}
151151

152-
fn apply(&self, current: Option<&DataType>, diffs: Vec<Self::Diff>) -> DataType {
152+
fn apply(
153+
&self,
154+
current: Option<&DataType>,
155+
diffs: &mut Iterator<Item = Self::Diff>,
156+
) -> DataType {
153157
use std::collections::BTreeSet;
154158
use std::iter::FromIterator;
155159

@@ -168,14 +172,18 @@ impl GroupedOperation for GroupConcat {
168172
let clen = current.len();
169173

170174
// TODO this is not particularly robust, and requires a non-empty separator
171-
let mut current = BTreeSet::from_iter(current.split_terminator(&self.separator));
172-
for diff in &diffs {
173-
match *diff {
174-
Modify::Add(ref s) => {
175-
current.insert(s);
175+
let mut current = BTreeSet::from_iter(
176+
current
177+
.split_terminator(&self.separator)
178+
.map(|s| Cow::Borrowed(s)),
179+
);
180+
for diff in diffs {
181+
match diff {
182+
Modify::Add(s) => {
183+
current.insert(Cow::Owned(s));
176184
}
177-
Modify::Remove(ref s) => {
178-
current.remove(&**s);
185+
Modify::Remove(s) => {
186+
current.remove(&*s);
179187
}
180188
}
181189
}
@@ -184,7 +192,7 @@ impl GroupedOperation for GroupConcat {
184192
let mut new = current
185193
.into_iter()
186194
.fold(String::with_capacity(2 * clen), |mut acc, s| {
187-
acc.push_str(s);
195+
acc.push_str(&*s);
188196
acc.push_str(&self.separator);
189197
acc
190198
});

src/ops/grouped/extremum.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,11 @@ impl GroupedOperation for ExtremumOperator {
102102
}
103103
}
104104

105-
fn apply(&self, current: Option<&DataType>, diffs: Vec<Self::Diff>) -> DataType {
105+
fn apply(
106+
&self,
107+
current: Option<&DataType>,
108+
diffs: &mut Iterator<Item = Self::Diff>,
109+
) -> DataType {
106110
// Extreme values are those that are at least as extreme as the current min/max (if any).
107111
// let mut is_extreme_value : Box<Fn(i64) -> bool> = Box::new(|_|true);
108112
let mut extreme_values: Vec<i64> = vec![];

src/ops/grouped/mod.rs

Lines changed: 114 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::fmt;
22
use std::collections::HashMap;
3+
use std::cmp::Ordering;
34

45
use flow::prelude::*;
56

@@ -50,7 +51,11 @@ pub trait GroupedOperation: fmt::Debug + Clone {
5051

5152
/// Given the `current` value, and a number of changes for a group (`diffs`), compute the
5253
/// updated group value.
53-
fn apply(&self, current: Option<&DataType>, diffs: Vec<Self::Diff>) -> DataType;
54+
fn apply(
55+
&self,
56+
current: Option<&DataType>,
57+
diffs: &mut Iterator<Item = Self::Diff>,
58+
) -> DataType;
5459

5560
fn description(&self) -> String;
5661
}
@@ -153,81 +158,123 @@ where
153158
};
154159
}
155160

161+
let group_by = &self.group_by;
162+
let cmp = |a: &Record, b: &Record| {
163+
group_by
164+
.iter()
165+
.map(|&col| &a[col])
166+
.cmp(group_by.iter().map(|&col| &b[col]))
167+
};
168+
156169
// First, we want to be smart about multiple added/removed rows with same group.
157170
// For example, if we get a -, then a +, for the same group, we don't want to
158-
// execute two queries.
159-
let mut consolidate = HashMap::new();
160-
for rec in rs {
161-
let val = self.inner.to_diff(&rec[..], rec.is_positive());
162-
163-
let mut group = rec.extract().0;
164-
for (i, &col) in self.group_by.iter().enumerate() {
165-
group[i] = group[col].clone();
166-
}
167-
group.resize(self.group_by.len(), DataType::None);
168-
consolidate.entry(group).or_insert_with(Vec::new).push(val);
169-
}
171+
// execute two queries. We'll do this by sorting the batch by our group by.
172+
let mut rs: Vec<_> = rs.into();
173+
rs.sort_by(&cmp);
170174

175+
// find the current value for this group
171176
let us = self.us.unwrap();
172-
let mut misses = Vec::new();
173-
let mut out = Vec::with_capacity(2 * consolidate.len());
174-
for (group, diffs) in consolidate {
175-
// find the current value for this group
176-
let db = state
177-
.get(&*us)
178-
.expect("grouped operators must have their own state materialized");
179-
180-
let old = match db.lookup(&self.out_key[..], &KeyType::from(&group[..])) {
181-
LookupResult::Some(rs) => {
182-
debug_assert!(rs.len() <= 1, "a group had more than 1 result");
183-
rs.get(0)
184-
}
185-
LookupResult::Missing => {
186-
misses.push(Miss {
187-
node: *us,
188-
columns: self.out_key.clone(),
189-
replay_key: replay_key_col.map(|col| {
190-
// since group columns go first in our output, and the replay key must
191-
// be on our group by column (partial can't go through generated
192-
// columns), this column should be < group.len()
193-
debug_assert!(col < group.len());
194-
vec![group[col].clone()]
195-
}),
196-
key: group,
197-
});
198-
continue;
199-
}
200-
};
201-
202-
let (current, new) = {
203-
use std::borrow::Cow;
177+
let db = state
178+
.get(&*us)
179+
.expect("grouped operators must have their own state materialized");
204180

205-
// current value is in the last output column
206-
// or "" if there is no current group
207-
let current = old.map(|r| Cow::Borrowed(&r[r.len() - 1]));
208-
209-
// new is the result of applying all diffs for the group to the current value
210-
let new = self.inner.apply(current.as_ref().map(|v| &**v), diffs);
211-
(current, new)
212-
};
213-
214-
match current {
215-
Some(ref current) if new == **current => {
216-
// no change
217-
}
218-
_ => {
219-
if let Some(old) = old {
220-
// revoke old value
221-
debug_assert!(current.is_some());
222-
out.push(Record::Negative((**old).clone()));
181+
let mut misses = Vec::new();
182+
let mut out = Vec::new();
183+
{
184+
let out_key = &self.out_key;
185+
let mut handle_group =
186+
|inner: &mut T, group_r: Record, mut diffs: ::std::vec::Drain<_>| {
187+
let (group_r, _) = group_r.extract();
188+
let mut group_by_i = 0;
189+
let mut group = Vec::with_capacity(group_by.len() + 1);
190+
for (col, v) in group_r.into_iter().enumerate() {
191+
if col == group_by[group_by_i] {
192+
group.push(v);
193+
group_by_i += 1;
194+
if group_by_i == group_by.len() {
195+
break;
196+
}
197+
}
223198
}
224199

225-
// emit positive, which is group + new.
226-
let mut rec = group;
227-
rec.push(new);
228-
out.push(Record::Positive(rec));
200+
let old = {
201+
match db.lookup(&out_key[..], &KeyType::from(&group[..])) {
202+
LookupResult::Some(rs) => {
203+
debug_assert!(rs.len() <= 1, "a group had more than 1 result");
204+
rs.get(0)
205+
}
206+
LookupResult::Missing => {
207+
misses.push(Miss {
208+
node: *us,
209+
columns: out_key.clone(),
210+
replay_key: replay_key_col.map(|col| {
211+
// since group columns go first in our output, and the replay
212+
// key must be on our group by column (partial can't go through
213+
// generated columns), this column should be < group.len()
214+
debug_assert!(col < group.len());
215+
vec![group[col].clone()]
216+
}),
217+
key: group,
218+
});
219+
return;
220+
}
221+
}
222+
};
223+
224+
let (current, new) = {
225+
use std::borrow::Cow;
226+
227+
// current value is in the last output column
228+
// or "" if there is no current group
229+
let current = old.map(|r| Cow::Borrowed(&r[r.len() - 1]));
230+
231+
// new is the result of applying all diffs for the group to the current value
232+
let new = inner.apply(current.as_ref().map(|v| &**v), &mut diffs as &mut _);
233+
(current, new)
234+
};
235+
236+
match current {
237+
Some(ref current) if new == **current => {
238+
// no change
239+
}
240+
_ => {
241+
if let Some(old) = old {
242+
// revoke old value
243+
debug_assert!(current.is_some());
244+
out.push(Record::Negative((**old).clone()));
245+
}
246+
247+
// emit positive, which is group + new.
248+
let mut rec = group;
249+
rec.push(new);
250+
out.push(Record::Positive(rec));
251+
}
252+
}
253+
};
254+
255+
let mut prev_group_r = None;
256+
let mut diffs = Vec::new();
257+
for r in rs {
258+
if prev_group_r.is_some()
259+
&& cmp(prev_group_r.as_ref().unwrap(), &r) != Ordering::Equal
260+
{
261+
handle_group(
262+
&mut self.inner,
263+
prev_group_r.take().unwrap(),
264+
diffs.drain(..),
265+
);
266+
}
267+
diffs.push(self.inner.to_diff(&r[..], r.is_positive()));
268+
if prev_group_r.is_none() {
269+
prev_group_r = Some(r);
229270
}
230271
}
272+
assert!(!diffs.is_empty());
273+
handle_group(
274+
&mut self.inner,
275+
prev_group_r.take().unwrap(),
276+
diffs.drain(..),
277+
);
231278
}
232279

233280
ProcessingResult {

0 commit comments

Comments
 (0)