Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,7 @@ arc-swap = { version = "1.7", default-features = false, optional = true }
async-compression = { version = "0.4.27", default-features = false, features = ["tokio", "gzip", "zstd"], optional = true }
apache-avro = { version = "0.16.0", default-features = false, optional = true }
arrow = { version = "56.2.0", default-features = false, features = ["ipc"], optional = true }
arrow-schema = { version = "56.2.0", default-features = false, optional = true }
axum = { version = "0.6.20", default-features = false }
base64 = { workspace = true, optional = true }
bloomy = { version = "1.2.0", default-features = false, optional = true }
Expand Down Expand Up @@ -406,7 +407,7 @@ redis = { version = "0.32.4", default-features = false, features = ["connection-
regex.workspace = true
roaring = { version = "0.11.2", default-features = false, features = ["std"], optional = true }
rumqttc = { version = "0.24.0", default-features = false, features = ["use-rustls"], optional = true }
rust_decimal = { workspace = true, optional = true }
rust_decimal = { version = "1.37.0", default-features = false, features = ["std"], optional = true }
Comment on lines -409 to +410
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like rust_decimal is already using this version in lib/codecs/Cargo.toml. We should update the workspace reference to use 1.37.0 and keep using workspace here. Additionally lib/codecs/Cargo.toml should also use the workspace version of rust_decimal

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changes made in e7b89c0

seahash = { version = "4.1.0", default-features = false }
smallvec = { version = "1", default-features = false, features = ["union", "serde"] }
snap = { version = "1.1.1", default-features = false }
Expand Down Expand Up @@ -583,7 +584,7 @@ enrichment-tables-mmdb = ["dep:maxminddb"]
enrichment-tables-memory = ["dep:evmap", "dep:evmap-derive", "dep:thread_local"]

# Codecs
codecs-arrow = ["vector-lib/arrow"]
codecs-arrow = ["dep:arrow", "dep:arrow-schema", "vector-lib/arrow"]
codecs-opentelemetry = ["vector-lib/opentelemetry"]
codecs-syslog = ["vector-lib/syslog"]

Expand Down Expand Up @@ -851,7 +852,7 @@ sinks-azure_blob = ["dep:azure_core", "dep:azure_identity", "dep:azure_storage",
sinks-azure_monitor_logs = []
sinks-blackhole = []
sinks-chronicle = []
sinks-clickhouse = []
sinks-clickhouse = ["dep:arrow", "dep:arrow-schema", "dep:rust_decimal", "codecs-arrow"]
sinks-console = []
sinks-databend = ["dep:databend-client"]
sinks-datadog_events = []
Expand Down
3 changes: 3 additions & 0 deletions changelog.d/24074_clickhouse_arrow_format.enhancement.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
The `clickhouse` sink now supports the `arrow_stream` format option, enabling high-performance binary data transfer using Apache Arrow IPC. This provides significantly better performance and smaller payload sizes compared to JSON-based formats.

authors: benjamin-awd
1 change: 1 addition & 0 deletions lib/codecs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ path = "tests/bin/generate-avro-fixtures.rs"
[dependencies]
apache-avro = { version = "0.20.0", default-features = false }
arrow = { version = "56.2.0", default-features = false, features = ["ipc"] }
async-trait.workspace = true
bytes.workspace = true
chrono.workspace = true
rust_decimal = { version = "1.37", default-features = false, features = ["std"] }
Expand Down
57 changes: 57 additions & 0 deletions lib/codecs/src/encoding/format/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use arrow::{
ipc::writer::StreamWriter,
record_batch::RecordBatch,
};
use async_trait::async_trait;
use bytes::{BufMut, Bytes, BytesMut};
use chrono::{DateTime, Utc};
use rust_decimal::Decimal;
Expand All @@ -25,6 +26,15 @@ use vector_config::configurable_component;

use vector_core::event::{Event, Value};

/// Provides an Arrow schema for encoding.
///
/// Sinks can implement this trait to supply custom schema-fetching logic
/// (e.g. asking the target system for its table layout at startup) instead
/// of configuring a static schema up front.
///
/// Implementors must be `Send + Sync` (the provider may be shared via
/// `Arc`) and `Debug` (the owning config derives a `Debug` representation).
#[async_trait]
pub trait SchemaProvider: Send + Sync + std::fmt::Debug {
    /// Get the Arrow schema used to encode events.
    ///
    /// # Errors
    ///
    /// Returns an `ArrowEncodingError` when the schema cannot be fetched;
    /// callers propagate this as a configuration/build failure.
    async fn get_schema(&self) -> Result<Arc<Schema>, ArrowEncodingError>;
}

/// Configuration for Arrow IPC stream serialization
#[configurable_component]
#[derive(Clone, Default)]
Expand All @@ -45,6 +55,10 @@ pub struct ArrowStreamSerializerConfig {
#[serde(default)]
#[configurable(metadata(docs::examples = true))]
pub allow_nullable_fields: bool,

/// Schema provider for lazy schema loading.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we anticipating runtime schema changes? If not, I would move schema fetching entirely into config build phase.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made changes in aa69c1c -- I think we only need this at runtime if the target database/table is dynamic and determined by a VRL remap transform

For now I think it makes sense to keep things simple and just keep schema fetching in the config as you mentioned 👍

#[serde(skip)]
schema_provider: Option<Arc<dyn SchemaProvider>>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is Arc necessary? I'd think this would work with Box<> too

Copy link
Contributor Author

@benjamin-awd benjamin-awd Dec 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the schema provider is currently only being used once at startup before being discarded I think it's fine to go with Box

Copy link
Contributor Author

@benjamin-awd benjamin-awd Dec 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be handled by 834ce89..aa69c1c

}

impl std::fmt::Debug for ArrowStreamSerializerConfig {
Expand All @@ -58,6 +72,10 @@ impl std::fmt::Debug for ArrowStreamSerializerConfig {
.map(|s| format!("{} fields", s.fields().len())),
)
.field("allow_nullable_fields", &self.allow_nullable_fields)
.field(
"schema_provider",
&self.schema_provider.as_ref().map(|_| "<provider>"),
)
.finish()
}
}
Expand All @@ -68,6 +86,38 @@ impl ArrowStreamSerializerConfig {
Self {
schema: Some(schema),
allow_nullable_fields: false,
schema_provider: None,
}
}

/// Create a new ArrowStreamSerializerConfig with a schema provider
///
/// The schema itself starts out unset; it is expected to be filled in
/// later (see `resolve`) by querying the given provider. Nullable-field
/// support defaults to off, matching `new`.
pub fn with_provider(provider: Arc<dyn SchemaProvider>) -> Self {
    Self {
        schema: None,
        allow_nullable_fields: false,
        schema_provider: Some(provider),
    }
}

/// Get the schema provider if one was configured
///
/// Returns `None` when this config was built with a static schema
/// (via `new`) rather than with `with_provider`.
pub fn provider(&self) -> Option<&Arc<dyn SchemaProvider>> {
    self.schema_provider.as_ref()
}

/// Resolve the schema from the provider if present.
///
/// A no-op when a schema is already set. Otherwise fetches the schema
/// from the configured provider and stores it.
///
/// # Errors
///
/// Returns `ArrowEncodingError::NoSchemaProvided` when neither a schema
/// nor a provider is configured, or propagates the provider's own error.
pub async fn resolve(&mut self) -> Result<(), ArrowEncodingError> {
    // An already-present schema takes precedence; nothing to fetch.
    if self.schema.is_some() {
        return Ok(());
    }

    // Without a provider there is no way to obtain a schema.
    let provider = self
        .schema_provider
        .as_ref()
        .ok_or(ArrowEncodingError::NoSchemaProvided)?;

    self.schema = Some(provider.get_schema().await?);
    Ok(())
}

Expand Down Expand Up @@ -154,6 +204,13 @@ pub enum ArrowEncodingError {
#[snafu(display("Schema must be provided before encoding"))]
NoSchemaProvided,

/// Failed to fetch schema from provider
#[snafu(display("Failed to fetch schema from provider: {}", message))]
SchemaFetchError {
/// Error message from the provider
message: String,
},

/// Unsupported Arrow data type for field
#[snafu(display(
"Unsupported Arrow data type for field '{}': {:?}",
Expand Down
4 changes: 3 additions & 1 deletion lib/codecs/src/encoding/format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ mod text;
use std::fmt::Debug;

#[cfg(feature = "arrow")]
pub use arrow::{ArrowEncodingError, ArrowStreamSerializer, ArrowStreamSerializerConfig};
pub use arrow::{
ArrowEncodingError, ArrowStreamSerializer, ArrowStreamSerializerConfig, SchemaProvider,
};
pub use avro::{AvroSerializer, AvroSerializerConfig, AvroSerializerOptions};
pub use cef::{CefSerializer, CefSerializerConfig};
use dyn_clone::DynClone;
Expand Down
4 changes: 3 additions & 1 deletion lib/codecs/src/encoding/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ pub mod framing;
pub mod serializer;
pub use chunking::{Chunker, Chunking, GelfChunker};
#[cfg(feature = "arrow")]
pub use format::{ArrowEncodingError, ArrowStreamSerializer, ArrowStreamSerializerConfig};
pub use format::{
ArrowEncodingError, ArrowStreamSerializer, ArrowStreamSerializerConfig, SchemaProvider,
};
pub use format::{
AvroSerializer, AvroSerializerConfig, AvroSerializerOptions, CefSerializer,
CefSerializerConfig, CsvSerializer, CsvSerializerConfig, GelfSerializer, GelfSerializerConfig,
Expand Down
Loading
Loading