Skip to content

Commit c9b964e

Browse files
authored
feat: support nexmark q13 on cloud functions (#405)
* feat: support nexmark q13 on cloud functions * fix: we cannot infer schema for side input This is because inferred schema's field type could be different. For example, Int32 and Int64 are different. * fix: add side_input_schema() * fix: unused variable: `ctx` * fix: use base64 to encode schema
1 parent 4d8d980 commit c9b964e

File tree

15 files changed

+304
-112
lines changed

15 files changed

+304
-112
lines changed

benchmarks/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ publish = false
99

1010
[dependencies]
1111
arrow = { git = "https://github.com/flock-lab/arrow-rs", branch = "flock", features = [ "simd" ] }
12+
base64 = "0.13.0"
1213
chrono = "0.4.19"
1314
datafusion = { git = "https://github.com/flock-lab/arrow-datafusion", branch = "flock" }
1415
env_logger = "^0.9"
@@ -18,6 +19,7 @@ humantime = "2.1.0"
1819
itertools = "0.10.0"
1920
lazy_static = "1.4"
2021
log = "0.4.14"
22+
reqwest = "0.11.7"
2123
rusoto_core = "0.47.0"
2224
rusoto_lambda = "0.47.0"
2325
rusoto_logs = "0.47.0"

benchmarks/src/nexmark/main.rs

Lines changed: 95 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -24,17 +24,24 @@ use flock::prelude::*;
2424
use humantime::parse_duration;
2525
use lazy_static::lazy_static;
2626
use log::info;
27-
use nexmark::event::{Auction, Bid, Person};
27+
use nexmark::event::{side_input_schema, Auction, Bid, Person};
2828
use nexmark::NEXMarkSource;
2929
use rainbow::{rainbow_println, rainbow_string};
30-
use rusoto_core::{ByteStream, Region};
30+
use rusoto_core::Region;
3131
use rusoto_lambda::InvocationResponse;
32-
use rusoto_s3::{ListObjectsV2Request, PutObjectRequest, S3Client, S3};
32+
use rusoto_s3::S3Client;
3333
use std::collections::HashMap;
3434
use std::sync::Arc;
3535
use structopt::StructOpt;
3636
use tokio::task::JoinHandle;
3737

38+
static SIDE_INPUT_DOWNLOAD_URL: &str = concat!(
39+
"https://gist.githubusercontent.com/gangliao/",
40+
"de6f544b8a93f26081036e0a7f8c1715/raw/",
41+
"586c88ad6f89d12c9f1753622eddf4788f6f0f9d/",
42+
"nexmark_q13_side_input.csv"
43+
);
44+
3845
lazy_static! {
3946
pub static ref FLOCK_S3_KEY: String = FLOCK_CONF["flock"]["s3_key"].to_string();
4047
pub static ref FLOCK_S3_BUCKET: String = FLOCK_CONF["flock"]["s3_bucket"].to_string();
@@ -51,6 +58,7 @@ lazy_static! {
5158
pub static ref NEXMARK_SOURCE_LOG_GROUP: String = "/aws/lambda/flock_datasource".to_string();
5259
pub static ref NEXMARK_Q4_S3_KEY: String = FLOCK_CONF["nexmark"]["q4_s3_key"].to_string();
5360
pub static ref NEXMARK_Q6_S3_KEY: String = FLOCK_CONF["nexmark"]["q6_s3_key"].to_string();
61+
pub static ref NEXMARK_Q13_S3_SIDE_INPUT_KEY: String = FLOCK_CONF["nexmark"]["q13_s3_side_input_key"].to_string();
5462
}
5563

5664
#[derive(Default, Clone, Debug, StructOpt)]
@@ -115,10 +123,18 @@ pub async fn register_nexmark_tables() -> Result<DataFusionExecutionContext> {
115123
)?;
116124
ctx.register_table("bid", Arc::new(bid_table))?;
117125

126+
// For NEXMark Q13
127+
let side_input_schema = Arc::new(side_input_schema());
128+
let side_input_table = MemTable::try_new(
129+
side_input_schema.clone(),
130+
vec![vec![RecordBatch::new_empty(side_input_schema)]],
131+
)?;
132+
ctx.register_table("side_input", Arc::new(side_input_table))?;
133+
118134
Ok(ctx)
119135
}
120136

121-
pub fn create_nexmark_source(opt: &mut NexmarkBenchmarkOpt) -> NEXMarkSource {
137+
pub async fn create_nexmark_source(opt: &mut NexmarkBenchmarkOpt) -> Result<NEXMarkSource> {
122138
let window = match opt.query_number {
123139
0..=4 | 6 | 9 | 10 | 13 => StreamWindow::ElementWise,
124140
5 => StreamWindow::HoppingWindow((10, 5)),
@@ -131,7 +147,28 @@ pub fn create_nexmark_source(opt: &mut NexmarkBenchmarkOpt) -> NEXMarkSource {
131147
if opt.query_number == 10 {
132148
opt.data_sink_type = "s3".to_string();
133149
}
134-
NEXMarkSource::new(opt.seconds, opt.generators, opt.events_per_second, window)
150+
151+
if opt.query_number == 13 {
152+
let data = reqwest::get(SIDE_INPUT_DOWNLOAD_URL)
153+
.await
154+
.map_err(|_| "Failed to download side input data")?
155+
.text_with_charset("utf-8")
156+
.await
157+
.map_err(|_| "Failed to read side input data")?;
158+
put_object_to_s3_if_missing(
159+
FLOCK_S3_BUCKET.clone(),
160+
NEXMARK_Q13_S3_SIDE_INPUT_KEY.clone(),
161+
data.as_bytes().to_vec(),
162+
)
163+
.await?;
164+
}
165+
166+
Ok(NEXMarkSource::new(
167+
opt.seconds,
168+
opt.generators,
169+
opt.events_per_second,
170+
window,
171+
))
135172
}
136173

137174
pub async fn plan_placement(
@@ -145,27 +182,12 @@ pub async fn plan_placement(
145182
6 => (FLOCK_S3_BUCKET.clone(), NEXMARK_Q6_S3_KEY.clone()),
146183
_ => unreachable!(),
147184
};
148-
if let Some(0) = FLOCK_S3_CLIENT
149-
.list_objects_v2(ListObjectsV2Request {
150-
bucket: s3_bucket.clone(),
151-
prefix: Some(s3_key.clone()),
152-
max_keys: Some(1),
153-
..Default::default()
154-
})
155-
.await
156-
.map_err(|e| FlockError::Internal(e.to_string()))?
157-
.key_count
158-
{
159-
FLOCK_S3_CLIENT
160-
.put_object(PutObjectRequest {
161-
bucket: s3_bucket.clone(),
162-
key: s3_key.clone(),
163-
body: Some(ByteStream::from(serde_json::to_vec(&physcial_plan)?)),
164-
..Default::default()
165-
})
166-
.await
167-
.map_err(|e| FlockError::Internal(e.to_string()))?;
168-
}
185+
put_object_to_s3_if_missing(
186+
s3_bucket.clone(),
187+
s3_key.clone(),
188+
serde_json::to_vec(&physcial_plan)?,
189+
)
190+
.await?;
169191
Ok((FLOCK_EMPTY_PLAN.clone(), Some((s3_bucket, s3_key))))
170192
}
171193
_ => Ok((physcial_plan, None)),
@@ -318,6 +340,48 @@ pub async fn create_physical_plans(
318340
Ok(plans)
319341
}
320342

343+
pub async fn add_extra_metadata(
344+
opt: &NexmarkBenchmarkOpt,
345+
metadata: &mut HashMap<String, String>,
346+
) -> Result<()> {
347+
metadata.insert(
348+
"invocation_type".to_string(),
349+
if opt.async_type {
350+
"async".to_string()
351+
} else {
352+
"sync".to_string()
353+
},
354+
);
355+
356+
if opt.query_number == 12 {
357+
metadata.insert(
358+
"add_process_time_query".to_string(),
359+
nexmark_query(opt.query_number)[0].clone(),
360+
);
361+
}
362+
363+
if opt.query_number == 11 || opt.query_number == 12 {
364+
metadata.insert("session_key".to_string(), "bidder".to_string());
365+
metadata.insert("session_name".to_string(), "bid".to_string());
366+
}
367+
368+
if opt.query_number == 13 {
369+
metadata.insert(
370+
"side_input_s3_key".to_string(),
371+
NEXMARK_Q13_S3_SIDE_INPUT_KEY.clone(),
372+
);
373+
metadata.insert("side_input_format".to_string(), "csv".to_string());
374+
375+
let side_input_schema = Arc::new(side_input_schema());
376+
metadata.insert(
377+
"side_input_schema".to_string(),
378+
base64::encode(schema_to_bytes(side_input_schema)),
379+
);
380+
}
381+
382+
Ok(())
383+
}
384+
321385
pub async fn nexmark_benchmark(opt: &mut NexmarkBenchmarkOpt) -> Result<()> {
322386
rainbow_println("================================================================");
323387
rainbow_println(" Running the benchmark ");
@@ -326,7 +390,7 @@ pub async fn nexmark_benchmark(opt: &mut NexmarkBenchmarkOpt) -> Result<()> {
326390
rainbow_println(format!("{:#?}\n", opt));
327391

328392
let query_number = opt.query_number;
329-
let nexmark_conf = create_nexmark_source(opt);
393+
let nexmark_conf = create_nexmark_source(opt).await?;
330394

331395
let mut ctx = register_nexmark_tables().await?;
332396
let plans = create_physical_plans(&mut ctx, query_number).await?;
@@ -343,26 +407,7 @@ pub async fn nexmark_benchmark(opt: &mut NexmarkBenchmarkOpt) -> Result<()> {
343407
// *delete* and **recreate** the source function every time we change the query.
344408
let mut metadata = HashMap::new();
345409
metadata.insert("workers".to_string(), serde_json::to_string(&worker)?);
346-
metadata.insert(
347-
"invocation_type".to_string(),
348-
if opt.async_type {
349-
"async".to_string()
350-
} else {
351-
"sync".to_string()
352-
},
353-
);
354-
355-
if query_number == 12 {
356-
metadata.insert(
357-
"add_process_time_query".to_string(),
358-
nexmark_query(query_number)[0].clone(),
359-
);
360-
}
361-
362-
if query_number == 11 || query_number == 12 {
363-
metadata.insert("session_key".to_string(), "bidder".to_string());
364-
metadata.insert("session_name".to_string(), "bid".to_string());
365-
}
410+
add_extra_metadata(opt, &mut metadata).await?;
366411

367412
let tasks = (0..opt.generators)
368413
.into_iter()
@@ -432,6 +477,7 @@ pub fn nexmark_query(query_number: usize) -> Vec<String> {
432477
10 => vec![include_str!("query/q10.sql")],
433478
11 => vec![include_str!("query/q11.sql")],
434479
12 => include_str!("query/q12.sql").split(';').collect(),
480+
13 => vec![include_str!("query/q13.sql")],
435481
_ => unreachable!(),
436482
}
437483
.into_iter()
@@ -453,7 +499,7 @@ mod tests {
453499
events_per_second: 1000,
454500
..Default::default()
455501
};
456-
let conf = create_nexmark_source(&mut opt);
502+
let conf = create_nexmark_source(&mut opt).await?;
457503
let (event, _) = Arc::new(conf.generate_data()?)
458504
.select(1, 0)
459505
.expect("Failed to select event.");
@@ -470,6 +516,7 @@ mod tests {
470516
nexmark_query(10),
471517
nexmark_query(11),
472518
nexmark_query(12),
519+
nexmark_query(13),
473520
];
474521
let ctx = register_nexmark_tables().await?;
475522
for sql in sqls {

benchmarks/src/nexmark/query/q13.sql

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
SELECT auction,
2+
bidder,
3+
price,
4+
b_date_time,
5+
value
6+
FROM bid
7+
JOIN side_input
8+
ON auction = key;

benchmarks/src/s3/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ async fn benchmark(opt: &mut NexmarkBenchmarkOpt) -> Result<()> {
4949
"Running the NEXMark benchmark [S3] with the following options: {:?}",
5050
opt
5151
);
52-
let nexmark_conf = create_nexmark_source(opt);
52+
let nexmark_conf = create_nexmark_source(opt).await?;
5353
let query_number = opt.query_number;
5454

5555
let mut ctx = register_nexmark_tables().await?;

flock-cli/src/nexmark.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,11 @@ fn run_args() -> App<'static> {
5050
Arg::new("query number")
5151
.short('q')
5252
.long("query")
53-
.help("Sets the NEXMark benchmark query number [0-12]")
53+
.help("Sets the NEXMark benchmark query number")
5454
.takes_value(true)
55+
.possible_values(&[
56+
"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13",
57+
])
5558
.default_value("3"),
5659
)
5760
.arg(

flock-function/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ snmalloc = [ "snmalloc-rs" ]
1313
[dependencies]
1414
arrow = { git = "https://github.com/flock-lab/arrow-rs", branch = "flock", features = [ "simd" ] }
1515
aws_lambda_events = "0.5"
16+
base64 = "0.13.0"
1617
bytes = "1.1.0"
1718
chrono = "0.4.19"
1819
datafusion = { git = "https://github.com/flock-lab/arrow-datafusion", branch = "flock" }

0 commit comments

Comments
 (0)