
Commit 3bcd692

feat!: partition data to payloads (#201)
* feat!: partition data to payloads
* feat!: unit test
* feat!: unit test
1 parent 0e2963b · commit 3bcd692

8 files changed: +359 -91 lines changed
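
In short, the Distributed strategy no longer hits unimplemented!(): the previous stage coalesces its data into payload-sized record batches, tags each one with a UUID from UuidBuilder, and pushes it to a next-stage function via Lambda's asynchronous invoke (see invoke_async_functions in the lambda.rs diff below); the next stage decodes the payload, runs its sub-plan, and re-encodes the output as a payload. A minimal sketch of that receiving side, assembled from the payload_handler diff below, assuming runtime::prelude::* re-exports ExecutionContext, Payload, and Result as src/function/src/aws/lambda.rs does:

use runtime::prelude::*; // ExecutionContext, Payload, Result (as in lambda.rs)
use serde_json::Value;

// Next-stage entry point: payload in, payload out. Each payload now carries a
// Vec<RecordBatch> plus the UUID assigned by the previous stage.
async fn receive(ctx: &mut ExecutionContext, event: Value) -> Result<Value> {
    let (batches, uuid) = Payload::to_batch(event); // payload -> record batches
    ctx.feed_one_source(&vec![batches]);            // feed the sub-plan's source
    let output = ctx.execute().await?;              // run this stage's sub-plan
    Ok(Payload::to_value(&output, uuid))            // record batches -> payload
}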

.codecov.yml

Lines changed: 1 addition & 0 deletions

@@ -11,5 +11,6 @@ codecov:
 
 ignore:
   - "src/legacy"
+  - "src/function/src/aws/lambda.rs"
   - "src/driver/src/build"
   - "**/*.md"

src/driver/src/deploy/mod.rs

Lines changed: 1 addition & 1 deletion

@@ -74,7 +74,7 @@ impl ExecutionEnvironment {
         for (_, ctx) in flow.ctx.iter() {
            let _: Vec<_> = lambda::function_name(&ctx)
                .iter()
-                .map(move |name| async move {
+                .map(|name| async move {
                    client
                        .create_function(CreateFunctionRequest {
                            code: lambda::function_code(),

src/function/Cargo.toml

Lines changed: 5 additions & 1 deletion

@@ -14,10 +14,14 @@ snmalloc = [ "snmalloc-rs" ]
 arrow = { git = "https://github.com/DSLAM-UMD/arrow", branch = "scq", features = [ "simd" ] }
 aws_lambda_events = "0.4"
 datafusion = { git = "https://github.com/DSLAM-UMD/arrow", branch = "scq", features = [ "simd" ] }
+futures = "0.3.12"
 lambda_runtime = { git = "https://github.com/awslabs/aws-lambda-rust-runtime/", branch = "master" }
+log = "0.4.14"
 rayon = "1.5"
-
 runtime = { path = "../runtime" }
+rusoto_core = "0.46.0"
+
+rusoto_lambda = "0.46.0"
 serde_json = "1.0"
 snmalloc-rs = { version = "0.2", optional = true, features = [ "cache-friendly" ] }
 tokio = { version = "1.2", features = [ "macros", "io-util", "sync", "rt-multi-thread" ] }

src/function/src/aws/lambda.rs

Lines changed: 57 additions & 7 deletions

@@ -14,11 +14,16 @@
 
 //! The generic lambda function for sub-plan execution on AWS Lambda.
 
+use arrow::record_batch::RecordBatch;
 use aws_lambda_events::event::kafka::KafkaEvent;
 use aws_lambda_events::event::kinesis::KinesisEvent;
 use datafusion::physical_plan::Partitioning;
+use futures::executor::block_on;
 use lambda_runtime::{handler_fn, Context};
+use log::warn;
 use runtime::prelude::*;
+use rusoto_core::Region;
+use rusoto_lambda::{InvokeAsyncRequest, Lambda, LambdaClient};
 use serde_json::Value;
 use std::cell::Cell;
 use std::sync::Once;

@@ -78,6 +83,42 @@ async fn main() -> Result<()> {
     Ok(())
 }
 
+/// Invoke functions in the next stage of the data flow.
+fn invoke_async_functions(ctx: &ExecutionContext, batches: &mut Vec<RecordBatch>) -> Result<()> {
+    // retrieve the next lambda function name
+    let next_func = LambdaExecutor::next_function(&ctx)?;
+
+    // create uuid builder to assign an id to each payload
+    let mut uuid_builder = UuidBuilder::new(&ctx.name, batches.len());
+
+    let client = &LambdaClient::new(Region::default());
+    let nums = batches.len();
+    (0..nums).for_each(|_| {
+        let uuid = uuid_builder.next();
+        // call the lambda function asynchronously until it succeeds.
+        loop {
+            let request = InvokeAsyncRequest {
+                function_name: next_func.clone(),
+                invoke_args: Payload::to_bytes(&[batches.pop().unwrap()], uuid.clone()),
+            };
+
+            if let Ok(response) = block_on(client.invoke_async(request)) {
+                if let Some(code) = response.status {
+                    // A success response (202 Accepted) indicates that the request
+                    // is queued for invocation.
+                    if code == 202 {
+                        break;
+                    } else {
+                        warn!("Unknown invoke error: {}, retry ... ", code);
+                    }
+                }
+            }
+        }
+    });
+
+    Ok(())
+}
+
 async fn source_handler(ctx: &mut ExecutionContext, event: Value) -> Result<Value> {
     let batch = match &ctx.datasource {
         DataSource::KinesisEvent(_) => {

@@ -142,19 +183,28 @@ async fn source_handler(ctx: &mut ExecutionContext, event: Value) -> Result<Valu
             LambdaExecutor::event_sink(vec![batches]).await
         }
         ExecutionStrategy::Distributed => {
-            unimplemented!();
+            let mut batches = LambdaExecutor::coalesce_batches(
+                vec![batch],
+                globals["lambda"]["payload_batch_size"]
+                    .parse::<usize>()
+                    .unwrap(),
+            )
+            .await?;
+            assert_eq!(1, batches.len());
+
+            invoke_async_functions(&ctx, &mut batches[0])?;
+            Ok(serde_json::to_value(&ctx.name)?)
         }
     }
 }
 
 async fn payload_handler(ctx: &mut ExecutionContext, event: Value) -> Result<Value> {
-    let (batch, uuid) = Payload::to_batch(event);
-    let schema = batch.schema();
+    let (batches, uuid) = Payload::to_batch(event);
 
-    ctx.feed_one_source(&vec![vec![batch]]);
+    ctx.feed_one_source(&vec![batches]);
     let batches = ctx.execute().await?;
 
-    Ok(Payload::from(&batches[0], schema, uuid))
+    Ok(Payload::to_value(&batches, uuid))
 }
 
 async fn handler(event: Value, _: Context) -> Result<Value> {

@@ -259,12 +309,12 @@ mod tests {
        let res = handler(event, Context::default()).await?;
 
        // check the result of function execution
-        let (batch, _) = Payload::to_batch(res);
+        let (batches, _) = Payload::to_batch(res);
 
        if i == 0 {
            println!(
                "{}",
-                arrow::util::pretty::pretty_format_batches(&[batch]).unwrap(),
+                arrow::util::pretty::pretty_format_batches(&batches).unwrap(),
            );
        }
    }
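
The retry loop in invoke_async_functions treats only HTTP 202 (Accepted) as success, because invoke_async merely queues the event for execution. Below is a standalone sketch of a single such invocation against rusoto 0.46, assuming Payload::to_bytes produces the bytes::Bytes body that InvokeAsyncRequest.invoke_args expects (which would also explain the new bytes dependency in src/runtime/Cargo.toml); invoke_once is a hypothetical helper, not part of this commit.

use rusoto_core::{Region, RusotoError};
use rusoto_lambda::{InvokeAsyncError, InvokeAsyncRequest, Lambda, LambdaClient};

// One fire-and-forget invocation of a next-stage function; `invoke_args` stands
// in for the serialized payload (e.g. the output of Payload::to_bytes).
async fn invoke_once(
    function_name: String,
    invoke_args: bytes::Bytes,
) -> Result<bool, RusotoError<InvokeAsyncError>> {
    let client = LambdaClient::new(Region::default());
    let response = client
        .invoke_async(InvokeAsyncRequest {
            function_name,
            invoke_args,
        })
        .await?;
    // 202 Accepted means the event is queued for invocation; anything else
    // should be retried by the caller, as invoke_async_functions does above.
    Ok(response.status == Some(202))
}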

src/runtime/Cargo.toml

Lines changed: 2 additions & 0 deletions

@@ -14,11 +14,13 @@ async-trait = "0.1.42"
 aws_lambda_events = "0.4"
 base64 = "0.13.0"
 blake2 = "0.9"
+bytes = "1.0.1"
 datafusion = { git = "https://github.com/DSLAM-UMD/arrow", branch = "scq", features = [ "simd" ] }
 futures = "0.3.12"
 json = "0.12.4"
 lazy_static = "1.4"
 lz4 = "1.23.1"
+rand = "0.8.3"
 rayon = "1.5"
 rusoto_core = "0.46.0"
 rusoto_kafka = "0.46.0"

src/runtime/src/config/squirtle.toml

Lines changed: 4 additions & 1 deletion

@@ -9,9 +9,12 @@ production = false
 
 name = "execution_context"
 
-# default target batch size
+# default target batch size (16 KB)
 target_batch_size = 16384
 
+# default raw record batch size in the payload (512 KB)
+payload_batch_size = 524288
+
 # multi-thread parallelism inside the cloud function
 parallelism = 8
 
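
The lambda function reads the new key through the runtime's global config table, as the Distributed branch in lambda.rs above shows. A small sketch of that lookup follows; the [lambda] section placement is inferred from the globals["lambda"] index, and the return type of coalesce_batches is inferred from its call site, so treat both as assumptions.

use arrow::record_batch::RecordBatch;
use runtime::prelude::*; // `globals` (the config table) and LambdaExecutor, as used in lambda.rs

// Coalesce one partition of source data into payload-sized record batches,
// mirroring the Distributed branch of source_handler above.
async fn coalesce_to_payload_size(batch: Vec<RecordBatch>) -> Result<Vec<Vec<RecordBatch>>> {
    let payload_batch_size = globals["lambda"]["payload_batch_size"]
        .parse::<usize>()
        .unwrap(); // 524288 = 512 * 1024, i.e. 512 KB of raw records per payload
    LambdaExecutor::coalesce_batches(vec![batch], payload_batch_size).await
}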
src/runtime/src/executor/mod.rs

Lines changed: 89 additions & 6 deletions

@@ -25,8 +25,9 @@
 //! distributed dataflow model.
 
 use crate::config::GLOBALS as globals;
+use crate::context::CloudFunction;
 use crate::context::ExecutionContext;
-use crate::error::Result;
+use crate::error::{Result, SquirtleError};
 use crate::payload::{Payload, Uuid};
 use arrow::record_batch::RecordBatch;
 use async_trait::async_trait;

@@ -36,6 +37,7 @@ use datafusion::physical_plan::repartition::RepartitionExec;
 use datafusion::physical_plan::{ExecutionPlan, Partitioning};
 use futures::stream::StreamExt;
 use plan::*;
+use rand::Rng;
 use rayon::prelude::*;
 use serde_json::Value;
 use std::sync::Arc;

@@ -121,11 +123,7 @@ pub trait Executor {
        assert_eq!(1, output_partitions.len());
        assert_eq!(1, output_partitions[0].len());
 
-        Ok(Payload::from(
-            &output_partitions[0][0],
-            output_partitions[0][0].schema(),
-            Uuid::default(),
-        ))
+        Ok(Payload::to_value(&output_partitions[0], Uuid::default()))
    }
 }
 

@@ -178,14 +176,43 @@ impl LambdaExecutor {
            ExecutionStrategy::Distributed
        }
    }
+
+    /// Returns the next cloud function name for invocation.
+    pub fn next_function(ctx: &ExecutionContext) -> Result<String> {
+        let mut lambdas = match &ctx.next {
+            CloudFunction::None => vec![],
+            CloudFunction::Chorus((name, num)) => {
+                (0..*num).map(|i| format!("{}-{}", name, i)).collect()
+            }
+            CloudFunction::Solo(name) => vec![name.to_owned()],
+        };
+
+        if lambdas.is_empty() {
+            return Err(SquirtleError::Internal(
+                "No distributed execution plan".to_owned(),
+            ));
+        }
+
+        let mut function_name = lambdas[0].clone();
+        if lambdas.len() > 1 {
+            // pick one of the next-stage functions at random to spread the load.
+            let mut rng = rand::thread_rng();
+            function_name = lambdas.remove(rng.gen_range(0..lambdas.len()));
+        }
+
+        Ok(function_name)
+    }
 }
 
 #[cfg(test)]
 mod tests {
    use super::*;
+    use crate::datasource::{kinesis, DataSource};
    use crate::error::SquirtleError;
    use arrow::array::UInt32Array;
    use arrow::datatypes::{DataType, Field, Schema};
+    use aws_lambda_events::event::kinesis::KinesisEvent;
+    use datafusion::datasource::MemTable;
    use datafusion::physical_plan::expressions::Column;
    use tokio::task::JoinHandle;
 

@@ -335,6 +362,62 @@ mod tests {
        )
        .unwrap()
    }
+
+    #[tokio::test]
+    async fn next_function() -> Result<()> {
+        let input = include_str!("../../../test/data/example-kinesis-event-1.json");
+        let input: KinesisEvent = serde_json::from_str(input).unwrap();
+        let partitions = vec![kinesis::to_batch(input)];
+
+        let mut ctx = datafusion::execution::context::ExecutionContext::new();
+        let provider = MemTable::try_new(partitions[0][0].schema(), partitions.clone())?;
+
+        ctx.register_table("test", Arc::new(provider));
+
+        let sql = "SELECT MAX(c1), MIN(c2), c3 FROM test WHERE c2 < 99 GROUP BY c3";
+        let logical_plan = ctx.create_logical_plan(&sql)?;
+        let logical_plan = ctx.optimize(&logical_plan)?;
+        let physical_plan = ctx.create_physical_plan(&logical_plan)?;
+
+        // Serialize the physical plan and skip its record batches
+        let plan = serde_json::to_string(&physical_plan)?;
+
+        // Deserialize the physical plan that doesn't contain record batches
+        let plan: Arc<dyn ExecutionPlan> = serde_json::from_str(&plan)?;
+
+        // Feed record batches back to the plan
+        let mut ctx = ExecutionContext {
+            plan: plan.clone(),
+            name: "test".to_string(),
+            next: CloudFunction::None,
+            datasource: DataSource::UnknownEvent,
+        };
+        LambdaExecutor::next_function(&ctx).expect_err("No distributed execution plan");
+
+        ctx = ExecutionContext {
+            plan: plan.clone(),
+            name: "test".to_string(),
+            next: CloudFunction::Solo("solo".to_string()),
+            datasource: DataSource::UnknownEvent,
+        };
+        assert_eq!("solo", LambdaExecutor::next_function(&ctx)?);
+
+        ctx = ExecutionContext {
+            plan: plan.clone(),
+            name: "test".to_string(),
+            next: CloudFunction::Chorus(("chorus".to_string(), 24)),
+            datasource: DataSource::UnknownEvent,
+        };
+
+        let lambdas: Vec<String> = (0..100)
+            .map(|_| LambdaExecutor::next_function(&ctx).unwrap())
+            .collect();
+
+        assert_eq!(100, lambdas.len());
+        assert_ne!(lambdas.iter().min(), lambdas.iter().max());
+
+        Ok(())
+    }
 }
 
 pub mod plan;
