21 changes: 21 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

6 changes: 6 additions & 0 deletions Cargo.toml
@@ -22,8 +22,14 @@ rand = "0.8.5"
walkdir = "2.3.2"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
# Performance opts
jemallocator = "0.5.0"


[dev-dependencies]
assert_cmd = "2.0.4"
predicates = "2.1.1"
tempfile = "3.3.0"

[profile.release]
lto = true
111 changes: 110 additions & 1 deletion src/commands/merge.rs
@@ -3,6 +3,12 @@ use crate::errors::PQRSError::{FileExists, FileNotFound};
use crate::utils::{check_path_present, get_row_batches, open_file, write_parquet};
use clap::Parser;
use log::debug;
use parquet::basic::{BrotliLevel, Compression, Encoding, GzipLevel, ZstdLevel};
use parquet::file::properties::{WriterProperties, WriterVersion};
use parquet::schema::types::ColumnPath;
use serde::Deserialize;
use std::collections::HashMap;
use std::fs;
use std::ops::Add;
use std::path::PathBuf;

@@ -16,12 +22,115 @@ pub struct MergeCommandArgs {
/// Parquet file to write
#[arg(short, long)]
output: PathBuf,

/// Path to a json config file specifying WriterProperties::builder() properties.
#[arg(short, long)]
config: Option<PathBuf>,
}

#[derive(Debug, Clone, Deserialize)]
pub struct MergeConfig {
pub set_dictionary_enabled: Option<bool>,
/// The encodings for this are just the text values of the enum parquet::basic::Encoding
pub column_encodings: Option<HashMap<String, String>>,
pub column_dictionary_enabled: Option<HashMap<String, bool>>,
pub compression: Option<String>,
pub compression_level: Option<u32>,
}
Comment on lines +31 to +39

Owner: why do we want to use a config file instead of command line options for these?

Author: When using CLI opts, your command blows up to be huge when you have a large number of columns. Using a JSON file is super convenient and easy to read :)


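For reference, a hypothetical example of what such a config file could look like, matching the MergeConfig fields above. The column names are made up; the encoding strings are the parquet::basic::Encoding variant names accepted below, and compression accepts "brotli", "gzip", or "zstd":

```json
{
  "set_dictionary_enabled": true,
  "column_encodings": {
    "user_id": "DELTA_BINARY_PACKED",
    "payload": "PLAIN"
  },
  "column_dictionary_enabled": {
    "payload": false
  },
  "compression": "zstd",
  "compression_level": 3
}
```

A file like this would be passed via the new --config flag and parsed by build_props_from_json_config below.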
fn build_encoding_mappings() -> HashMap<&'static str, Encoding> {
HashMap::from([
("PLAIN", Encoding::PLAIN),
("PLAIN_DICTIONARY", Encoding::PLAIN_DICTIONARY),
("RLE", Encoding::RLE),
("BIT_PACKED", Encoding::BIT_PACKED),
("DELTA_BINARY_PACKED", Encoding::DELTA_BINARY_PACKED),
("DELTA_LENGTH_BYTE_ARRAY", Encoding::DELTA_LENGTH_BYTE_ARRAY),
("DELTA_BYTE_ARRAY", Encoding::DELTA_BYTE_ARRAY),
("RLE_DICTIONARY", Encoding::RLE_DICTIONARY),
("BYTE_STREAM_SPLIT", Encoding::BYTE_STREAM_SPLIT),
])
}

fn build_props_from_json_config(
config_path: PathBuf,
) -> Result<WriterProperties, PQRSError> {
let data = fs::read_to_string(config_path)?;
let merge_config: MergeConfig = serde_json::from_str(&data)?;
let mut props =
WriterProperties::builder().set_writer_version(WriterVersion::PARQUET_2_0);

if let Some(de) = merge_config.set_dictionary_enabled {
props = props.set_dictionary_enabled(de);
}

if let Some(column_encodings) = merge_config.column_encodings {
let encoding_mappings = build_encoding_mappings();
for (column_name, encoding_str) in column_encodings {
if !encoding_mappings.contains_key(encoding_str.as_str()) {
return Err(PQRSError::IllegalEncodingType());
}

let encoding = *encoding_mappings
.get(encoding_str.clone().as_str())
.unwrap();
props = props.set_column_encoding(ColumnPath::from(column_name), encoding)
Comment on lines +70 to +77

Owner: can this be made idiomatic using pattern matching?

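For illustration, a minimal sketch of the match-based lookup being suggested, reusing the encoding names and the IllegalEncodingType error from this diff (a hypothetical helper, not the PR's actual code):

```rust
// Hypothetical helper: map the config string straight to an Encoding with `match`,
// replacing the HashMap lookup plus unwrap(). Assumes the same imports as
// src/commands/merge.rs (parquet::basic::Encoding, crate::errors::PQRSError).
fn parse_encoding(s: &str) -> Result<Encoding, PQRSError> {
    Ok(match s {
        "PLAIN" => Encoding::PLAIN,
        "PLAIN_DICTIONARY" => Encoding::PLAIN_DICTIONARY,
        "RLE" => Encoding::RLE,
        "BIT_PACKED" => Encoding::BIT_PACKED,
        "DELTA_BINARY_PACKED" => Encoding::DELTA_BINARY_PACKED,
        "DELTA_LENGTH_BYTE_ARRAY" => Encoding::DELTA_LENGTH_BYTE_ARRAY,
        "DELTA_BYTE_ARRAY" => Encoding::DELTA_BYTE_ARRAY,
        "RLE_DICTIONARY" => Encoding::RLE_DICTIONARY,
        "BYTE_STREAM_SPLIT" => Encoding::BYTE_STREAM_SPLIT,
        _ => return Err(PQRSError::IllegalEncodingType()),
    })
}

// The loop body above would then collapse to something like:
// props = props.set_column_encoding(ColumnPath::from(column_name), parse_encoding(&encoding_str)?);
```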
}
}

if let Some(column_de) = merge_config.column_dictionary_enabled {
for (column_name, de) in column_de {
println!("{column_name}");
Owner: we probably don't want println!()s in the release

Author: Oops! Sorry.

props =
props.set_column_dictionary_enabled(ColumnPath::from(column_name), de);
}
}

if let Some(compression_algo) = merge_config.compression {
if compression_algo.to_lowercase() == "brotli" {
Owner: same for compression_algo, I think we can use pattern matching here?

Author: Good callout!

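For illustration, a minimal sketch of the match-based version being suggested, reusing the types and error messages from this diff (not the PR's actual code; it assumes compression_level is always provided whenever compression is set):

```rust
// Hypothetical rewrite of the if/else chain below using `match`.
// Assumes the same imports as src/commands/merge.rs
// (Compression, BrotliLevel, GzipLevel, ZstdLevel).
let level = merge_config
    .compression_level
    .expect("Compression level was not set!");
props = match compression_algo.to_lowercase().as_str() {
    "brotli" => props.set_compression(Compression::BROTLI(
        BrotliLevel::try_new(level).expect("Invalid Brotli level!"),
    )),
    "gzip" => props.set_compression(Compression::GZIP(
        GzipLevel::try_new(level).expect("Invalid GZIP level!"),
    )),
    "zstd" => props.set_compression(Compression::ZSTD(
        ZstdLevel::try_new(level as i32).expect("Invalid ZSTD level!"),
    )),
    // Unknown algorithm: leave the builder unchanged, as the current code does.
    _ => props,
};
```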
props = props.set_compression(Compression::BROTLI(
BrotliLevel::try_new(
merge_config
.compression_level
.expect("Compression level was not set!"),
)
.expect("Invalid Brotli level!"),
))
} else if compression_algo.to_lowercase() == "gzip" {
props = props.set_compression(Compression::GZIP(
GzipLevel::try_new(
merge_config
.compression_level
.expect("Compression level was not set!"),
)
.expect("Invalid GZIP level!"),
))
} else if compression_algo.to_lowercase() == "zstd" {
props = props.set_compression(Compression::ZSTD(
ZstdLevel::try_new(
merge_config
.compression_level
.expect("Compression level was not set!")
as i32,
)
.expect("Invalid ZSTD level!"),
))
}
}

Ok(props.build())
}

pub(crate) fn execute(opts: MergeCommandArgs) -> Result<(), PQRSError> {
debug!("The file names to read are: {:?}", opts.input);
debug!("The file name to write to: {}", opts.output.display());

let merge_config = if opts.config.is_some() {
Some(build_props_from_json_config(opts.config.unwrap())?)
} else {
None
};

// make sure output does not exist already before any reads
if check_path_present(&opts.output) {
return Err(FileExists(opts.output));
@@ -43,7 +152,7 @@ pub(crate) fn execute(opts: MergeCommandArgs) -> Result<(), PQRSError> {
}
// debug!("The combined data looks like this: {:#?}", combined);
// debug!("This is the input schema: {:#?}", combined.schema);
write_parquet(combined, &opts.output)?;
write_parquet(combined, &opts.output, merge_config)?;

Ok(())
}
2 changes: 2 additions & 0 deletions src/errors.rs
@@ -33,4 +33,6 @@ pub enum PQRSError {
UTF8ConvertError(#[from] FromUtf8Error),
#[error("Could not read/write to buffer")]
BufferWriteError(#[from] IntoInnerError<BufWriter<Vec<u8>>>),
#[error("Illegal encoding type")]
IllegalEncodingType(),
}
5 changes: 5 additions & 0 deletions src/main.rs
@@ -6,6 +6,11 @@ mod commands
mod errors;
mod utils;

// use jemalloc for release builds
extern crate jemallocator;
#[global_allocator]
static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;

Comment on lines +9 to +13

Owner: i'm unfamiliar with jemallocator. can you give a tl;dr on these performance optimizations? what do they do?

Author: Is there a way to only use my first commit in the PR? The second commit is performance-optimized stuff that only belongs on my fork.

TL;DR: this can be removed for the main branch.

#[derive(Subcommand, Debug)]
enum Commands {
Cat(commands::cat::CatCommandArgs),
5 changes: 4 additions & 1 deletion src/utils.rs
@@ -3,6 +3,7 @@ use crate::errors::PQRSError::CouldNotOpenFile;
use arrow::{datatypes::Schema, record_batch::RecordBatch};
use log::debug;
use parquet::arrow::{arrow_reader::ArrowReaderBuilder, ArrowWriter};
use parquet::file::properties::WriterProperties;
use parquet::file::reader::{FileReader, SerializedFileReader};
use parquet::record::Row;
use rand::seq::SliceRandom;
@@ -259,14 +260,16 @@ pub fn get_row_batches(file: File) -> Result<ParquetData, PQRSError> {
pub fn write_parquet<P: AsRef<Path>>(
data: ParquetData,
output: P,
props: Option<WriterProperties>,
) -> Result<(), PQRSError> {
let file = File::create(output)?;
let fields = data.schema.fields().to_vec();
// the schema from the record batch might not contain the file specific metadata
// drop the schema to make sure that we don't fail in that case
let schema_without_metadata = Schema::new(fields);

let mut writer = ArrowWriter::try_new(file, Arc::new(schema_without_metadata), None)?;
let mut writer =
ArrowWriter::try_new(file, Arc::new(schema_without_metadata), props)?;

// write record batches one at a time
// record batches are not combined