93 changes: 92 additions & 1 deletion src/commands/merge.rs
@@ -3,6 +3,12 @@ use crate::errors::PQRSError::{FileExists, FileNotFound};
use crate::utils::{check_path_present, get_row_batches, open_file, write_parquet};
use clap::Parser;
use log::debug;
use parquet::basic::{BrotliLevel, Compression, Encoding, GzipLevel};
use parquet::file::properties::{WriterProperties, WriterVersion};
use parquet::schema::types::ColumnPath;
use serde::Deserialize;
use std::collections::HashMap;
use std::fs;
use std::ops::Add;
use std::path::PathBuf;

@@ -16,12 +22,97 @@ pub struct MergeCommandArgs {
/// Parquet file to write
#[arg(short, long)]
output: PathBuf,

/// Path to a JSON config file specifying WriterProperties::builder() properties.
#[arg(short, long)]
config: Option<PathBuf>,
}

#[derive(Debug, Clone, Deserialize)]
pub struct MergeConfig {
pub set_dictionary_enabled: Option<bool>,
/// The encodings for this are just the text values of the enum parquet::basic::Encoding
pub column_encodings: Option<HashMap<String, String>>,
pub column_dictionary_enabled: Option<HashMap<String, bool>>,
pub compression: Option<String>,
pub compression_level: Option<u32>,
}
Owner commented on lines +31 to +39:
why do we want to use a config file instead of command line options for these?

Author replied:
When using CLI opts your command blows up to be huge when you have a large number of columns. Using a JSON file is super convenient and easy to read :)
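
For illustration, a config file passed via --config could look like the following. This is a hypothetical example: the field names mirror the MergeConfig struct above, while the column names and values are placeholders.

    {
      "set_dictionary_enabled": true,
      "column_encodings": { "user_id": "DELTA_BINARY_PACKED" },
      "column_dictionary_enabled": { "payload": false },
      "compression": "gzip",
      "compression_level": 6
    }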


fn build_encoding_mappings() -> HashMap<&'static str, Encoding> {
HashMap::from([
("PLAIN", Encoding::PLAIN),
("PLAIN_DICTIONARY", Encoding::PLAIN_DICTIONARY),
("RLE", Encoding::RLE),
("BIT_PACKED", Encoding::BIT_PACKED),
("DELTA_BINARY_PACKED", Encoding::DELTA_BINARY_PACKED),
("DELTA_LENGTH_BYTE_ARRAY", Encoding::DELTA_LENGTH_BYTE_ARRAY),
("DELTA_BYTE_ARRAY", Encoding::DELTA_BYTE_ARRAY),
("RLE_DICTIONARY", Encoding::RLE_DICTIONARY),
("BYTE_STREAM_SPLIT", Encoding::BYTE_STREAM_SPLIT),
])
}

fn build_props_from_json_config(
config_path: PathBuf,
) -> Result<WriterProperties, PQRSError> {
let data = fs::read_to_string(config_path)?;
let merge_config: MergeConfig = serde_json::from_str(&data)?;
let mut props =
WriterProperties::builder().set_writer_version(WriterVersion::PARQUET_2_0);

if let Some(de) = merge_config.set_dictionary_enabled {
props = props.set_dictionary_enabled(de);
}

if let Some(column_encodings) = merge_config.column_encodings {
let encoding_mappings = build_encoding_mappings();
for (column_name, encoding_str) in column_encodings {
if !encoding_mappings.contains_key(encoding_str.as_str()) {
return Err(PQRSError::IllegalEncodingType());
}

let encoding = *encoding_mappings
.get(encoding_str.clone().as_str())
.unwrap();
props = props.set_column_encoding(ColumnPath::from(column_name), encoding)
Owner commented on lines +70 to +77:
can this be made idiomatic use pattern matching?

}
}
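
A sketch of the pattern-matching shape the reviewer asks about above; it is not part of this diff, keeps the same behavior, and assumes the same encoding_mappings map:

    for (column_name, encoding_str) in column_encodings {
        match encoding_mappings.get(encoding_str.as_str()) {
            // Encoding is Copy, so the value can be copied straight out of the map
            Some(&encoding) => {
                props = props.set_column_encoding(ColumnPath::from(column_name), encoding);
            }
            None => return Err(PQRSError::IllegalEncodingType()),
        }
    }

This shape also avoids the extra clone() of encoding_str, since the string is only borrowed for the lookup.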

if let Some(column_de) = merge_config.column_dictionary_enabled {
for (column_name, de) in column_de {
println!("{column_name}");
Owner commented:
we probably don't want println!()s in the release

Author replied:
Oops! Sorry.

props =
props.set_column_dictionary_enabled(ColumnPath::from(column_name), de);
}
}

if let Some(compression_algo) = merge_config.compression {
if compression_algo.to_lowercase() == "brotli" {
Owner commented:
same for compression_algo, I think we can use pattern matching here?

Author replied:
Good callout!

props = props.set_compression(Compression::BROTLI(
BrotliLevel::try_new(merge_config.compression_level.unwrap())
.expect("Invalid Brotli level!"),
))
} else if compression_algo.to_lowercase() == "gzip" {
props = props.set_compression(Compression::GZIP(
GzipLevel::try_new(merge_config.compression_level.unwrap())
.expect("Invalid GZIP level!"),
))
}
}
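
Picking up the reviewer's pattern-matching suggestion, the branch above could be written roughly as follows. This is a sketch under the same assumptions as the existing code: compression_level must be set when a compression algorithm is given, and unrecognized algorithms still fall through without changing the properties.

    match compression_algo.to_lowercase().as_str() {
        "brotli" => {
            props = props.set_compression(Compression::BROTLI(
                BrotliLevel::try_new(merge_config.compression_level.unwrap())
                    .expect("Invalid Brotli level!"),
            ));
        }
        "gzip" => {
            props = props.set_compression(Compression::GZIP(
                GzipLevel::try_new(merge_config.compression_level.unwrap())
                    .expect("Invalid GZIP level!"),
            ));
        }
        // any other value is silently ignored, matching the current if/else
        _ => {}
    }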

Ok(props.build())
}

pub(crate) fn execute(opts: MergeCommandArgs) -> Result<(), PQRSError> {
debug!("The file names to read are: {:?}", opts.input);
debug!("The file name to write to: {}", opts.output.display());

let merge_config = if opts.config.is_some() {
Some(build_props_from_json_config(opts.config.unwrap())?)
} else {
None
};

// make sure output does not exist already before any reads
if check_path_present(&opts.output) {
return Err(FileExists(opts.output));
Expand All @@ -43,7 +134,7 @@ pub(crate) fn execute(opts: MergeCommandArgs) -> Result<(), PQRSError> {
}
// debug!("The combined data looks like this: {:#?}", combined);
// debug!("This is the input schema: {:#?}", combined.schema);
write_parquet(combined, &opts.output)?;
write_parquet(combined, &opts.output, merge_config)?;

Ok(())
}
2 changes: 2 additions & 0 deletions src/errors.rs
@@ -33,4 +33,6 @@ pub enum PQRSError {
UTF8ConvertError(#[from] FromUtf8Error),
#[error("Could not read/write to buffer")]
BufferWriteError(#[from] IntoInnerError<BufWriter<Vec<u8>>>),
#[error("Illegal encoding type")]
IllegalEncodingType(),
}
5 changes: 4 additions & 1 deletion src/utils.rs
@@ -3,6 +3,7 @@ use crate::errors::PQRSError::CouldNotOpenFile;
use arrow::{datatypes::Schema, record_batch::RecordBatch};
use log::debug;
use parquet::arrow::{arrow_reader::ArrowReaderBuilder, ArrowWriter};
use parquet::file::properties::WriterProperties;
use parquet::file::reader::{FileReader, SerializedFileReader};
use parquet::record::Row;
use rand::seq::SliceRandom;
@@ -259,14 +260,16 @@ pub fn get_row_batches(file: File) -> Result<ParquetData, PQRSError> {
pub fn write_parquet<P: AsRef<Path>>(
data: ParquetData,
output: P,
props: Option<WriterProperties>,
) -> Result<(), PQRSError> {
let file = File::create(output)?;
let fields = data.schema.fields().to_vec();
// the schema from the record batch might not contain the file specific metadata
// drop the schema to make sure that we don't fail in that case
let schema_without_metadata = Schema::new(fields);

let mut writer = ArrowWriter::try_new(file, Arc::new(schema_without_metadata), None)?;
let mut writer =
ArrowWriter::try_new(file, Arc::new(schema_without_metadata), props)?;

// write record batches one at a time
// record batches are not combined