Skip to content

Commit 3f48cae

Browse files
enhancement(codecs): allow configurable null handling in Arrow encoder (#24288)
* enhancement(codecs): allow configurable null handling in Arrow encoder * chore: update changelog * Remove whitespace from changelog * make fmt * Remove Arc::clone when value can be fully moved --------- Co-authored-by: Thomas <[email protected]> Co-authored-by: Thomas <[email protected]>
1 parent a053a2e commit 3f48cae

File tree

2 files changed

+217
-3
lines changed

2 files changed

+217
-3
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
The Arrow encoder now supports configurable null handling through the `allow_nullable_fields`
2+
option. When enabled, fields in the Arrow schema are treated as nullable, so missing or
3+
incompatible values are encoded as null instead of causing an encoding error. This improves
4+
compatibility with downstream systems that fill in nulls through defaults or computed columns.
5+
6+
authors: benjamin-awd

lib/codecs/src/encoding/format/arrow.rs

Lines changed: 211 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,18 @@ pub struct ArrowStreamSerializerConfig {
3333
#[serde(skip)]
3434
#[configurable(derived)]
3535
pub schema: Option<Arc<arrow::datatypes::Schema>>,
36+
37+
/// Allow null values for non-nullable fields in the schema.
38+
///
39+
/// When enabled, missing or incompatible values will be encoded as null even for fields
40+
/// marked as non-nullable in the Arrow schema. This is useful when working with downstream
41+
/// systems that can handle null values through defaults, computed columns, or other mechanisms.
42+
///
43+
/// When disabled (default), missing values for non-nullable fields will cause encoding errors,
44+
/// ensuring all required data is present before sending to the sink.
45+
#[serde(default)]
46+
#[configurable(metadata(docs::examples = true))]
47+
pub allow_nullable_fields: bool,
3648
}
3749

3850
impl std::fmt::Debug for ArrowStreamSerializerConfig {
@@ -45,6 +57,7 @@ impl std::fmt::Debug for ArrowStreamSerializerConfig {
4557
.as_ref()
4658
.map(|s| format!("{} fields", s.fields().len())),
4759
)
60+
.field("allow_nullable_fields", &self.allow_nullable_fields)
4861
.finish()
4962
}
5063
}
@@ -54,6 +67,7 @@ impl ArrowStreamSerializerConfig {
5467
pub fn new(schema: Arc<arrow::datatypes::Schema>) -> Self {
5568
Self {
5669
schema: Some(schema),
70+
allow_nullable_fields: false,
5771
}
5872
}
5973

@@ -77,12 +91,25 @@ pub struct ArrowStreamSerializer {
7791
impl ArrowStreamSerializer {
7892
/// Create a new ArrowStreamSerializer with the given configuration
7993
pub fn new(config: ArrowStreamSerializerConfig) -> Result<Self, vector_common::Error> {
80-
let schema = config.schema.ok_or_else(|| {
94+
let mut schema = config.schema.ok_or_else(|| {
8195
vector_common::Error::from(
8296
"Arrow serializer requires a schema. Pass a schema or fetch from provider before creating serializer."
8397
)
8498
})?;
8599

100+
// If allow_nullable_fields is enabled, transform the schema once here
101+
// instead of on every batch encoding
102+
if config.allow_nullable_fields {
103+
schema = Arc::new(Schema::new_with_metadata(
104+
schema
105+
.fields()
106+
.iter()
107+
.map(|f| Arc::new(make_field_nullable(f)))
108+
.collect::<Vec<_>>(),
109+
schema.metadata().clone(),
110+
));
111+
}
112+
86113
Ok(Self { schema })
87114
}
88115
}
@@ -172,18 +199,38 @@ pub fn encode_events_to_arrow_ipc_stream(
172199

173200
let schema_ref = schema.ok_or(ArrowEncodingError::NoSchemaProvided)?;
174201

175-
let record_batch = build_record_batch(schema_ref.clone(), events)?;
202+
let record_batch = build_record_batch(schema_ref, events)?;
176203

177204
let ipc_err = |source| ArrowEncodingError::IpcWrite { source };
178205

179206
let mut buffer = BytesMut::new().writer();
180-
let mut writer = StreamWriter::try_new(&mut buffer, &schema_ref).map_err(ipc_err)?;
207+
let mut writer =
208+
StreamWriter::try_new(&mut buffer, record_batch.schema_ref()).map_err(ipc_err)?;
181209
writer.write(&record_batch).map_err(ipc_err)?;
182210
writer.finish().map_err(ipc_err)?;
183211

184212
Ok(buffer.into_inner().freeze())
185213
}
186214

215+
/// Recursively makes a Field and all its nested fields nullable
216+
fn make_field_nullable(field: &arrow::datatypes::Field) -> arrow::datatypes::Field {
217+
let new_data_type = match field.data_type() {
218+
DataType::List(inner_field) => DataType::List(Arc::new(make_field_nullable(inner_field))),
219+
DataType::Struct(fields) => {
220+
DataType::Struct(fields.iter().map(|f| make_field_nullable(f)).collect())
221+
}
222+
DataType::Map(inner_field, sorted) => {
223+
DataType::Map(Arc::new(make_field_nullable(inner_field)), *sorted)
224+
}
225+
other => other.clone(),
226+
};
227+
228+
field
229+
.clone()
230+
.with_data_type(new_data_type)
231+
.with_nullable(true)
232+
}
233+
187234
/// Builds an Arrow RecordBatch from events
188235
fn build_record_batch(
189236
schema: Arc<Schema>,
@@ -1442,4 +1489,165 @@ mod tests {
14421489
assert!(!id_array.is_null(1));
14431490
assert!(!id_array.is_null(2));
14441491
}
1492+
1493+
#[test]
fn test_config_allow_nullable_fields_overrides_schema() {
    use tokio_util::codec::Encoder;

    // The schema declares a single non-nullable Int64 column.
    let strict_schema = Arc::new(Schema::new(vec![Field::new(
        "strict_field",
        DataType::Int64,
        false,
    )]));

    // First event carries the column, second event omits it entirely.
    let mut populated = LogEvent::default();
    populated.insert("strict_field", 42);
    let empty = LogEvent::default();
    let events = vec![Event::Log(populated), Event::Log(empty)];

    let mut config = ArrowStreamSerializerConfig::new(strict_schema);
    config.allow_nullable_fields = true;
    let mut serializer =
        ArrowStreamSerializer::new(config).expect("Failed to create serializer");

    let mut encoded = BytesMut::new();
    serializer
        .encode(events, &mut encoded)
        .expect("Encoding should succeed when allow_nullable_fields is true");

    // Decode the IPC stream back to inspect what was actually written.
    let mut reader =
        StreamReader::try_new(Cursor::new(encoded), None).expect("Failed to create reader");
    let batch = reader.next().unwrap().expect("Failed to read batch");
    assert_eq!(batch.num_rows(), 2);

    let written_schema = batch.schema();
    assert!(
        written_schema.field(0).is_nullable(),
        "The output schema field should have been transformed to nullable=true"
    );

    let column = batch.column(0);
    let values = column.as_any().downcast_ref::<Int64Array>().unwrap();

    assert_eq!(values.value(0), 42);
    assert!(!values.is_null(0));
    assert!(
        values.is_null(1),
        "The missing value should be encoded as null"
    );
}
1546+
1547+
#[test]
fn test_make_field_nullable_with_nested_types() {
    // Shape under test, every level declared non-nullable:
    //   root: struct { inner_list: [ struct { nested_field: Int64 } ] }
    let leaf = Field::new("nested_field", DataType::Int64, false);
    let item_type = DataType::Struct(arrow::datatypes::Fields::from(vec![leaf]));
    let item = Field::new("item", item_type, false);
    let inner_list = Field::new("inner_list", DataType::List(Arc::new(item)), false);
    let root_type = DataType::Struct(arrow::datatypes::Fields::from(vec![inner_list]));
    let root = Field::new("root", root_type, false);

    let relaxed_root = make_field_nullable(&root);

    // Level 0: the root field itself.
    assert!(relaxed_root.is_nullable(), "Root field should be nullable");

    // Level 1: the list field inside the root struct.
    let root_children = match relaxed_root.data_type() {
        DataType::Struct(children) => children,
        _ => panic!("Expected Struct type for root field"),
    };
    let relaxed_list = &root_children[0];
    assert!(
        relaxed_list.is_nullable(),
        "inner_list field should be nullable"
    );

    // Level 2: the list's element field.
    let relaxed_item = match relaxed_list.data_type() {
        DataType::List(element) => element,
        _ => panic!("Expected List type for inner_list"),
    };
    assert!(
        relaxed_item.is_nullable(),
        "List item field should be nullable"
    );

    // Level 3: the field inside the element struct.
    let item_children = match relaxed_item.data_type() {
        DataType::Struct(children) => children,
        _ => panic!("Expected Struct type for list items"),
    };
    let relaxed_leaf = &item_children[0];
    assert!(
        relaxed_leaf.is_nullable(),
        "nested_field should be nullable"
    );
}
1604+
1605+
#[test]
fn test_make_field_nullable_with_map_type() {
    // Map<Utf8, Int64>, built as Map(entries: Struct { key: Utf8, value: Int64 }),
    // with every level declared non-nullable.
    //
    // NOTE(review): this pins the current behaviour, in which the entries
    // field and the key become nullable too. Arrow's Map layout appears to
    // require those two to stay non-nullable — confirm before treating these
    // assertions as the desired contract.
    let entry_children = vec![
        Field::new("key", DataType::Utf8, false),
        Field::new("value", DataType::Int64, false),
    ];
    let entries_type = DataType::Struct(arrow::datatypes::Fields::from(entry_children));
    let entries = Field::new("entries", entries_type, false);
    let map_field = Field::new("my_map", DataType::Map(Arc::new(entries), false), false);

    let relaxed_map = make_field_nullable(&map_field);

    // The map field itself.
    assert!(
        relaxed_map.is_nullable(),
        "Root map field should be nullable"
    );

    // The entries field wrapped by the map.
    let relaxed_entries = match relaxed_map.data_type() {
        DataType::Map(inner, _sorted) => inner,
        _ => panic!("Expected Map type for my_map field"),
    };
    assert!(
        relaxed_entries.is_nullable(),
        "Map entries field should be nullable"
    );

    // The key and value fields inside the entries struct.
    let relaxed_children = match relaxed_entries.data_type() {
        DataType::Struct(children) => children,
        _ => panic!("Expected Struct type for map entries"),
    };
    let relaxed_key = &relaxed_children[0];
    let relaxed_value = &relaxed_children[1];
    assert!(relaxed_key.is_nullable(), "Map key field should be nullable");
    assert!(
        relaxed_value.is_nullable(),
        "Map value field should be nullable"
    );
}
14451653
}

0 commit comments

Comments
 (0)