Conversation

@benjamin-awd (Contributor)

Summary

This PR adds support for the ArrowStream format to the ClickHouse sink.

Vector configuration

clickhouse:
  type: clickhouse
  inputs: [normalize]
  database: mydatabase
  table: logs
  endpoint: ${CLICKHOUSE_HOST}
  format: arrow_stream
  batch_encoding:
    codec: arrow_stream
    allow_nullable_fields: true

How did you test this PR?

Ran against a local ClickHouse instance; ran the integration tests.

Change Type

  • Bug fix
  • New feature
  • Non-functional (chore, refactoring, docs)
  • Performance

Is this a breaking change?

  • Yes
  • No

Does this PR include user-facing changes?

  • Yes. Please add a changelog fragment based on our guidelines.
  • No. A maintainer will apply the no-changelog label to this PR.

References

Notes

  • Please read our Vector contributor resources.
  • Do not hesitate to use @vectordotdev/vector to reach out to us regarding this PR.
  • Some CI checks run only after we manually approve them.
    • We recommend adding a pre-push hook, please see this template.
    • Alternatively, we recommend running the following locally before pushing to the remote branch:
      • make fmt
      • make check-clippy (if there are failures it's possible some of them can be fixed with make clippy-fix)
      • make test
  • After a review is requested, please avoid force pushes to help us review incrementally.
    • Feel free to push as many commits as you want. They will be squashed into one before merging.
    • For example, you can run git merge origin/master and git push.
  • If this PR introduces changes to Vector's dependencies (modifies Cargo.lock), please
    run make build-licenses to regenerate the license inventory and commit the changes (if any). More details here.

@benjamin-awd requested a review from a team as a code owner, December 12, 2025 14:55
@github-actions bot added the domain: sinks label, Dec 12, 2025
@benjamin-awd requested a review from a team as a code owner, December 12, 2025 15:16
@github-actions bot added the domain: external docs label, Dec 12, 2025
@drichards-87 self-assigned this, Dec 12, 2025
@drichards-87 removed their assignment, Dec 12, 2025
@thomasqueirozb (Contributor) left a comment:

All my comments here are regarding improvements. This is very nice work and should work as is, thanks a lot!

Comment on lines -409 to +410
-rust_decimal = { workspace = true, optional = true }
+rust_decimal = { version = "1.37.0", default-features = false, features = ["std"], optional = true }
@thomasqueirozb (Contributor):

Looks like rust_decimal is already using this version in lib/codecs/Cargo.toml. We should update the workspace reference to 1.37.0 and keep using the workspace dependency here. Additionally, lib/codecs/Cargo.toml should also use the workspace version of rust_decimal.

@benjamin-awd (Contributor Author):

Changes made in e7b89c0

Comment on lines 288 to 293
match self.format {
    Format::JsonEachRow => "json_each_row",
    Format::JsonAsObject => "json_as_object",
    Format::JsonAsString => "json_as_string",
    Format::ArrowStream => "arrow_stream",
}
@thomasqueirozb (Contributor):

Isn't this just the serde representation of Format? Wondering if we should keep this as is or use serde here somehow to avoid drift

@benjamin-awd (Contributor Author):

Decided to simplify the validation a bit; let me know if it makes sense: 9e93926

If there are additional batch formats in the future, it might make sense to create a separate enum but I think this should be good enough for now
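One way to avoid the drift the reviewer mentions, without leaning on serde at all, is to keep the wire names in a single table that both directions read from. A sketch under that assumption (the variant names mirror the PR's Format enum, but this table-based approach is hypothetical, not the PR's code):

```rust
#[derive(Clone, Copy, PartialEq, Debug)]
enum Format {
    JsonEachRow,
    JsonAsObject,
    JsonAsString,
    ArrowStream,
}

impl Format {
    // Single source of truth: both as_str and parse read this table,
    // so the enum and its string names cannot drift apart.
    const NAMES: [(Format, &'static str); 4] = [
        (Format::JsonEachRow, "json_each_row"),
        (Format::JsonAsObject, "json_as_object"),
        (Format::JsonAsString, "json_as_string"),
        (Format::ArrowStream, "arrow_stream"),
    ];

    fn as_str(self) -> &'static str {
        Self::NAMES.iter().find(|(f, _)| *f == self).unwrap().1
    }

    fn parse(s: &str) -> Option<Format> {
        Self::NAMES.iter().find(|(_, n)| *n == s).map(|(f, _)| *f)
    }
}
```

Adding a variant then forces exactly one table entry, and a round-trip test over NAMES catches any mismatch.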

use std::sync::Arc;

use ::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
@thomasqueirozb (Contributor):

Suggested change
use ::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};

Don't think the :: prefix is needed


FROM system.columns WHERE database = '{}' AND table = '{}' \
ORDER BY position FORMAT JSONEachRow",
database, table
);
@thomasqueirozb (Contributor):

Out of an abundance of caution, I think we should have some kind of SQL-injection prevention here. Something as simple as limiting database and table to ASCII characters plus - and _ should be enough. I wouldn't be opposed to using some other crate here instead.

Note: I spent some time looking into whether the sqlx crate is viable for this, but unfortunately it doesn't look like it is: all query operations depend on a database connection.
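A minimal sketch of the allow-listing idea, assuming the check runs before the identifiers are spliced into the query string (hypothetical helper, not the PR's code):

```rust
/// Allow-list check for identifiers that get interpolated into a
/// query: ASCII alphanumerics plus '-' and '_', nothing else.
/// (Hypothetical helper illustrating the suggestion above.)
fn is_safe_identifier(name: &str) -> bool {
    !name.is_empty()
        && name
            .chars()
            .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_')
}
```

Rejecting anything outside the allow-list (quotes, semicolons, whitespace, backslashes) sidesteps the need for escaping entirely.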

@benjamin-awd (Contributor Author):

I think the ideal solution would be to use the official ClickHouse Rust client:

use clickhouse::sql::Identifier;

client.query("SELECT * FROM ?").bind(Identifier("table_name"));

but I think it only makes sense as part of a long-term refactor of the entire sink to move away from raw HTTP. This would also be nice, since it allows query-level settings to be passed directly via the client instead of being defined in the config:

    let client = Client::default()
        .with_url("http://localhost:8123")
        .with_option("async_insert", "1")
        .with_option("wait_for_async_insert", "0");

@benjamin-awd (Contributor Author):

Added more robust handling: 4e101ac

Comment on lines 98 to 99
/// For example: "Nullable(LowCardinality(String))" -> ("String", true)
fn unwrap_type_modifiers(ch_type: &str) -> (&str, bool) {
@thomasqueirozb (Contributor):

Is it possible to have LowCardinality(Nullable(String))? If so this logic fails

@benjamin-awd (Contributor Author):

Ah it should actually be LowCardinality(Nullable(String))

The inverse is not possible:

CREATE TABLE lc_test
(
    `id` UInt16,
    `col` Nullable(LowCardinality(String))
)
ENGINE = MergeTree
ORDER BY id

Received exception from server (version 25.4.5):
Code: 43. DB::Exception: Received from localhost:9000. DB::Exception: Nested type LowCardinality(String) cannot be inside Nullable type. (ILLEGAL_TYPE_OF_ARGUMENT)
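Since ClickHouse only allows LowCardinality on the outside, the unwrapping can be a small loop that strips modifiers in any order it encounters them. A sketch (hypothetical, not the refactor that actually landed):

```rust
/// Strips LowCardinality(...) and Nullable(...) wrappers, returning
/// the base type and whether any Nullable wrapper was present.
/// Handles the order ClickHouse allows: LowCardinality(Nullable(T)).
fn unwrap_type_modifiers(ch_type: &str) -> (&str, bool) {
    let mut ty = ch_type.trim();
    let mut nullable = false;
    loop {
        if let Some(inner) = ty
            .strip_prefix("LowCardinality(")
            .and_then(|s| s.strip_suffix(')'))
        {
            ty = inner;
        } else if let Some(inner) = ty
            .strip_prefix("Nullable(")
            .and_then(|s| s.strip_suffix(')'))
        {
            nullable = true;
            ty = inner;
        } else {
            return (ty, nullable);
        }
    }
}
```

Because the loop keeps stripping until neither prefix matches, it is insensitive to which wrapper comes first, even though only one ordering is legal today.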

@benjamin-awd (Contributor Author):

Decided to refactor the type handling: 4211ce3 (also fixed the ordering issue)

Comment on lines 157 to 166
// Complex types
_ if base_type.starts_with("Array") => {
    return Err(unsupported(ch_type, "Array"));
}
_ if base_type.starts_with("Tuple") => {
    return Err(unsupported(ch_type, "Tuple"));
}
_ if base_type.starts_with("Map") => {
    return Err(unsupported(ch_type, "Map"));
}
@thomasqueirozb (Contributor):

Out of scope: is this something you plan to add support for in the future? Is this something that would add much value?

@benjamin-awd (Contributor Author):

Yes, eventually -- but it's a little complex and will involve changes to the Arrow encoder logic as well

Decided to elide it from this PR to make it a bit easier to review

/// Parses comma-separated arguments from a parenthesized string.
/// Input: "(arg1, arg2, arg3)" -> Output: Ok(vec!["arg1".to_string(), "arg2".to_string(), "arg3".to_string()])
/// Returns an error if parentheses are malformed.
fn parse_args(input: &str) -> Result<Vec<String>, String> {
@thomasqueirozb (Contributor):

Not saying there is anything wrong with this implementation, but manually keeping track of nested quotes and paren depth is usually non-trivial and error-prone, so ideally we'd have robust tests for this. Another idea is to parse this using nom, which would get rid of the state machine, though the nom code might not end up much better.

LLM generated nom parser - not tested at all
/// Parses comma-separated arguments from a parenthesized string.
/// Input: "(arg1, arg2, arg3)" -> Output: Ok(vec!["arg1".to_string(), "arg2".to_string(), "arg3".to_string()])
/// Returns an error if parentheses are malformed.
fn parse_args(input: &str) -> Result<Vec<String>, String> {
    use nom::{
        IResult,
        branch::alt,
        bytes::complete::{tag, take_until},
        character::complete::{char, multispace0},
        combinator::{map, recognize},
        multi::separated_list0,
        sequence::{delimited, pair, preceded},
    };

    fn quoted_string(input: &str) -> IResult<&str, &str> {
        delimited(char('\''), take_until("'"), char('\''))(input)
    }

    fn nested_parens(input: &str) -> IResult<&str, &str> {
        // This handles nested structures like "Array(Int32)"
        recognize(pair(
            take_until("("),
            delimited(char('('), nested_arg, char(')')),
        ))(input)
    }

    fn nested_arg(input: &str) -> IResult<&str, &str> {
        recognize(alt((
            quoted_string,
            nested_parens,
            nom::bytes::complete::is_not(",)"),
        )))(input)
    }

    fn single_arg(input: &str) -> IResult<&str, &str> {
        preceded(
            multispace0,
            alt((
                quoted_string,
                nested_parens,
                nom::bytes::complete::is_not(",)"),
            )),
        )(input)
    }

    fn args_parser(input: &str) -> IResult<&str, Vec<&str>> {
        delimited(
            char('('),
            separated_list0(
                delimited(multispace0, char(','), multispace0),
                map(single_arg, str::trim),
            ),
            preceded(multispace0, char(')')),
        )(input)
    }

    let trimmed = input.trim();
    match args_parser(trimmed) {
        Ok(("", args)) => Ok(args.into_iter().map(String::from).collect()),
        Ok((remaining, _)) => Err(format!("Unexpected trailing input: '{}'", remaining)),
        Err(e) => Err(format!("Failed to parse arguments: {}", e)),
    }
}

I also think this would break if something like this came back: ('arg\'\)'). Since this data comes straight from ClickHouse I'm not sure if it's a problem, but if any user-controlled data is involved this could potentially happen.

Everything I said in here basically boils down to:

  1. Function could maybe be improved using nom.
  2. Needs tests.
  3. User controlled data can result in an Err. (Not sure if applicable, we can also just Err if something weird comes in)

@benjamin-awd (Contributor Author):

The nom idea is pretty cool, but it's quite verbose and I think the current approach is still sustainable for now. Regardless, we still need a looooot of custom logic for this

User controlled data can result in an Err. (Not sure if applicable, we can also just Err if something weird comes in)

We don't need to worry about user controlled data, since it should be validated by Clickhouse already.

Needs tests

There are some tests already for parse_args and parse_ch_type, but there are probably edge cases that might pop up -- I think it would be advisable to flag this feature as beta in the docs.

I was looking at clickhouse-arrow for inspiration but it is also fairly complex

})
.ok_or_else(|| {
format!(
"Could not parse Decimal type '{}'. Valid formats: Decimal(P,S), Decimal32(S), Decimal64(S), Decimal128(S), Decimal256(S)",
@thomasqueirozb (Contributor):

Suggested change
"Could not parse Decimal type '{}'. Valid formats: Decimal(P,S), Decimal32(S), Decimal64(S), Decimal128(S), Decimal256(S)",
"Could not parse Decimal type '{}'. Valid formats: Decimal(P, S), Decimal32(S), Decimal64(S), Decimal128(S), Decimal256(S)",

Either this or the docstring needs to be changed

@benjamin-awd (Contributor Author):

Updated the error message: 1826aa3
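For reference, the Decimal formats listed in that error message map to fixed precisions in ClickHouse (Decimal32 has P=9, Decimal64 P=18, Decimal128 P=38, Decimal256 P=76). A hedged sketch of a parser covering all five forms (hypothetical, not the PR's implementation):

```rust
/// Maps Decimal(P, S) and the sized aliases Decimal32/64/128/256(S)
/// to (precision, scale). Returns None on anything else.
fn parse_decimal(ch_type: &str) -> Option<(u8, i8)> {
    let (name, rest) = ch_type.split_at(ch_type.find('(')?);
    let args: Vec<&str> = rest
        .strip_prefix('(')?
        .strip_suffix(')')?
        .split(',')
        .map(str::trim)
        .collect();
    match (name, args.as_slice()) {
        ("Decimal", [p, s]) => Some((p.parse().ok()?, s.parse().ok()?)),
        // Sized aliases carry a fixed precision; only scale is given.
        ("Decimal32", [s]) => Some((9, s.parse().ok()?)),
        ("Decimal64", [s]) => Some((18, s.parse().ok()?)),
        ("Decimal128", [s]) => Some((38, s.parse().ok()?)),
        ("Decimal256", [s]) => Some((76, s.parse().ok()?)),
        _ => None,
    }
}
```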

// Date and time types (timezones not currently handled, defaults to UTC)
"Date" | "Date32" => DataType::Date32,
"DateTime" => DataType::Timestamp(TimeUnit::Second, None),
_ if base_type.starts_with("DateTime64") => parse_datetime64_precision(base_type)?,
@thomasqueirozb (Contributor):

Using starts_with here means someone writing DateTime64FooBarBaz would get back a DateTime64. Again, I'm worried about two things here: first, untrusted input, though I'm not sure that's applicable; second, if some other DateTime64 type is ever added, say DateTime64UTC, this becomes a real problem. You already implemented extract_identifier, so the simplest solution is to move all of these into a _ => { block and match on extract_identifier().0 instead of using str::starts_with.

This is of course relevant in the other matches that are using starts_with
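The suggestion can be sketched as follows; extract_identifier here is a hypothetical stand-in for the PR's helper of the same name:

```rust
/// Hypothetical stand-in for the PR's extract_identifier helper:
/// splits "DateTime64(3)" into ("DateTime64", "(3)") at the first
/// character that cannot be part of an identifier.
fn extract_identifier(ch_type: &str) -> (&str, &str) {
    let end = ch_type
        .find(|c: char| !c.is_ascii_alphanumeric() && c != '_')
        .unwrap_or(ch_type.len());
    ch_type.split_at(end)
}

/// Exact match on the identifier instead of a prefix check, so
/// "DateTime64FooBarBaz" is rejected rather than silently accepted.
fn is_datetime64(ch_type: &str) -> bool {
    extract_identifier(ch_type).0 == "DateTime64"
}
```

The same pattern applies to the Array/Tuple/Map arms: match on the extracted identifier, not a prefix.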

@benjamin-awd (Contributor Author):

Implemented by 1db0f07

Comment on lines 361 to 363
let mut arrow_config = ArrowStreamSerializerConfig::with_provider(provider);
// Preserve allow_nullable_fields setting from the user's config
arrow_config.allow_nullable_fields = base_config.allow_nullable_fields;
@thomasqueirozb (Contributor):

Having allow_nullable_fields as one "exception" that we pass along to the newly created arrow_config can be a source of bugs in the future if more fields are added to ArrowStreamSerializerConfig. I think we would be better off modifying the original base_config by removing the schema and inserting a schema provider instead, either directly (base_config.schema = None and base_config.schema_provider = Some(provider)) or via something like base_config.with_schema_provider(provider).
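The builder-style variant suggested here could look like the sketch below; the field types are stand-ins (the real config holds an Arrow schema and a schema-provider trait object), and the names are assumptions:

```rust
// Stand-in types: Strings keep the sketch self-contained; the real
// config would hold an Arrow schema and an Arc<dyn SchemaProvider>.
#[derive(Clone, Default)]
struct ArrowStreamSerializerConfig {
    allow_nullable_fields: bool,
    schema: Option<String>,
    schema_provider: Option<String>,
}

impl ArrowStreamSerializerConfig {
    /// Swap the static schema for a provider in place; every other
    /// field, including any added later, keeps its configured value.
    fn with_schema_provider(mut self, provider: String) -> Self {
        self.schema = None;
        self.schema_provider = Some(provider);
        self
    }
}
```

Because the method mutates the user's config rather than constructing a fresh one, there is no field-copying step to forget when the struct grows.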

@benjamin-awd (Contributor Author), Dec 13, 2025:

Good catch, changes made: e1b22a1d8


/// Schema provider for lazy schema loading.
#[serde(skip)]
schema_provider: Option<Arc<dyn SchemaProvider>>,
@thomasqueirozb (Contributor):

Is Arc necessary? I'd think this would work with Box<> too

@benjamin-awd (Contributor Author), Dec 13, 2025:

Since the schema provider is currently only used once at startup before being discarded, I think it's fine to go with Box.

@benjamin-awd (Contributor Author), Dec 14, 2025:

Should be handled by 834ce89..aa69c1c

#[configurable(metadata(docs::examples = true))]
pub allow_nullable_fields: bool,

/// Schema provider for lazy schema loading.
Member:

Are we anticipating runtime schema changes? If not, I would move schema fetching entirely into config build phase.

@benjamin-awd (Contributor Author):

Made changes in aa69c1c -- I think we only need this at runtime if the target database/table is dynamic and determined by a VRL remap transform

For now I think it makes sense to keep things simple and just keep schema fetching in the config as you mentioned 👍

@benjamin-awd force-pushed the ch-sink-arrow branch 2 times, most recently from e678a00 to db3297d, December 13, 2025 08:10

Development

Successfully merging this pull request may close these issues.

Add ArrowStream format to Clickhouse sink

4 participants