Skip to content

Commit 2583e49

Browse files
albertlockett, jmacd, drewrelmas
authored
fix some incompatibilities between Go and Rust schemas (#390)
Adds capability to the Rust implementation to handle some of the Arrow schema incompatibilities identified in #353.

The incompatibilities identified were of two types:
- The Go implementation using dictionary encoding, and Rust not expecting this Arrow type.
- The Go implementation using one binary type (FixedSizeBinary) for some fields (exemplar trace/span id) and Rust expecting a different one (Binary).

To flexibly handle the different types, this PR introduces two wrappers around Arrow arrays:
- `MaybeDictArrayAccessor` is an enum that can contain either the native array type, or a dictionary whose value is this type. (Note: we originally had `StringArrayAccessor` that did this for strings, and this PR just makes the implementation generic so it also covers primitive arrays, FixedSizeBinary and Binary types.)
- `ByteArrayAccessor` can contain either a `FixedSizeBinary` or a `Binary` array type (or dictionaries containing these types) and be transparently treated as a `NullableArrayAccessor` that returns `Vec<u8>`.

Note: tested using a protobuf message that was created by the Go implementation and consumed using Rust:

```rs
use otel_arrow_rust::opentelemetry::BatchArrowRecords;
use otel_arrow_rust::Consumer;
use prost::Message;
use std::fs::File;
use std::io::Read;

fn main() {
    // otap_metrics.pb contains a BatchArrowRecords message serialized as protobuf.
    // this was generated from golang including fixes to field nullability from this PR
    // #374
    let mut file = File::open("otap_metrics.pb").unwrap();
    let mut contents = vec![];
    file.read_to_end(&mut contents).unwrap();
    let mut bar = BatchArrowRecords::decode(contents.as_ref()).unwrap();
    let mut consumer = Consumer::default();
    let result = consumer.consume_metrics_batches(&mut bar);
    println!("Result is OK {}", result.is_ok()) // prints "Result is OK true"
}
```

---------

Co-authored-by: Joshua MacDonald <[email protected]>
Co-authored-by: Drew Relmas <[email protected]>
1 parent 38926ac commit 2583e49

File tree

8 files changed

+289
-136
lines changed

8 files changed

+289
-136
lines changed

rust/otel-arrow-rust/src/arrays.rs

Lines changed: 193 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,12 @@
1010
// See the License for the specific language governing permissions and
1111
// limitations under the License.
1212

13-
use crate::error;
13+
use crate::error::{self, InvalidListArraySnafu};
1414
use arrow::array::{
15-
Array, ArrayRef, ArrowPrimitiveType, BinaryArray, BooleanArray, DictionaryArray, Float32Array,
16-
Float64Array, Int8Array, Int16Array, Int32Array, Int64Array, PrimitiveArray, RecordBatch,
17-
StringArray, TimestampNanosecondArray, UInt8Array, UInt16Array, UInt32Array, UInt64Array,
15+
Array, ArrayRef, ArrowPrimitiveType, BinaryArray, BooleanArray, DictionaryArray,
16+
FixedSizeBinaryArray, Float32Array, Float64Array, Int8Array, Int16Array, Int32Array,
17+
Int64Array, PrimitiveArray, RecordBatch, StringArray, TimestampNanosecondArray, UInt8Array,
18+
UInt16Array, UInt32Array, UInt64Array,
1819
};
1920
use arrow::datatypes::{ArrowDictionaryKeyType, TimeUnit};
2021
use arrow::datatypes::{ArrowNativeType, DataType, UInt8Type, UInt16Type};
@@ -95,6 +96,18 @@ impl NullableArrayAccessor for BinaryArray {
9596
}
9697
}
9798

99+
impl NullableArrayAccessor for FixedSizeBinaryArray {
100+
type Native = Vec<u8>;
101+
102+
fn value_at(&self, idx: usize) -> Option<Self::Native> {
103+
if self.is_valid(idx) {
104+
Some(self.value(idx).to_vec())
105+
} else {
106+
None
107+
}
108+
}
109+
}
110+
98111
impl NullableArrayAccessor for StringArray {
99112
type Native = String;
100113

@@ -221,76 +234,190 @@ where
221234
}
222235
}
223236

224-
pub type DictionaryStringArrayAccessor<'a, K> = DictionaryArrayAccessor<'a, K, StringArray>;
237+
/// Wrapper around various arrays that may return a byte slice. Note that
238+
/// this delegates to the underlying NullableArrayAccessor implementation
239+
/// for the Arrow array which copies the bytes when value_at is called
240+
pub enum ByteArrayAccessor<'a> {
241+
Binary(MaybeDictArrayAccessor<'a, BinaryArray>),
242+
FixedSizeBinary(MaybeDictArrayAccessor<'a, FixedSizeBinaryArray>),
243+
}
225244

226-
/// [StringArrayAccessor] allows to access string values from [StringArray]s and [DictionaryArray]s.
227-
pub enum StringArrayAccessor<'a> {
228-
/// Plain StringArray
229-
String(&'a StringArray),
230-
/// DictionaryArray with UInt8 keys and String values.
231-
Dictionary8(DictionaryStringArrayAccessor<'a, UInt8Type>),
232-
/// DictionaryArray with UInt16 keys and String values.
233-
Dictionary16(DictionaryStringArrayAccessor<'a, UInt16Type>),
245+
impl<'a> ByteArrayAccessor<'a> {
246+
pub fn try_new(arr: &'a ArrayRef) -> error::Result<Self> {
247+
match arr.data_type() {
248+
DataType::Binary => {
249+
MaybeDictArrayAccessor::<BinaryArray>::try_new(arr).map(Self::Binary)
250+
}
251+
DataType::FixedSizeBinary(dims) => {
252+
MaybeDictArrayAccessor::<FixedSizeBinaryArray>::try_new(arr, *dims)
253+
.map(Self::FixedSizeBinary)
254+
}
255+
DataType::Dictionary(_, val) => match **val {
256+
DataType::Binary => {
257+
MaybeDictArrayAccessor::<BinaryArray>::try_new(arr).map(Self::Binary)
258+
}
259+
DataType::FixedSizeBinary(dims) => {
260+
MaybeDictArrayAccessor::<FixedSizeBinaryArray>::try_new(arr, dims)
261+
.map(Self::FixedSizeBinary)
262+
}
263+
_ => error::UnsupportedDictionaryValueTypeSnafu {
264+
expect_oneof: vec![DataType::Binary, DataType::FixedSizeBinary(-1)],
265+
actual: (**val).clone(),
266+
}
267+
.fail(),
268+
},
269+
_ => InvalidListArraySnafu {
270+
expect_oneof: vec![
271+
DataType::Binary,
272+
DataType::FixedSizeBinary(-1),
273+
DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Binary)),
274+
DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Binary)),
275+
DataType::Dictionary(
276+
Box::new(DataType::UInt8),
277+
Box::new(DataType::FixedSizeBinary(-1)),
278+
),
279+
DataType::Dictionary(
280+
Box::new(DataType::UInt16),
281+
Box::new(DataType::FixedSizeBinary(-1)),
282+
),
283+
],
284+
actual: arr.data_type().clone(),
285+
}
286+
.fail(),
287+
}
288+
}
234289
}
235290

236-
impl NullableArrayAccessor for StringArrayAccessor<'_> {
237-
type Native = String;
291+
impl NullableArrayAccessor for ByteArrayAccessor<'_> {
292+
type Native = Vec<u8>;
238293

239294
fn value_at(&self, idx: usize) -> Option<Self::Native> {
240295
match self {
241-
StringArrayAccessor::String(s) => s.value_at(idx),
242-
StringArrayAccessor::Dictionary8(d) => d.value_at(idx),
243-
StringArrayAccessor::Dictionary16(d) => d.value_at(idx),
296+
Self::Binary(b) => b.value_at(idx),
297+
Self::FixedSizeBinary(b) => b.value_at(idx),
244298
}
245299
}
246300
}
247301

248-
impl<'a> StringArrayAccessor<'a> {
249-
pub fn new(a: &'a ArrayRef) -> error::Result<Self> {
250-
let result = match a.data_type() {
251-
DataType::Utf8 => {
252-
// safety: we've checked array data type
253-
Self::String(a.as_any().downcast_ref::<StringArray>().unwrap())
254-
}
255-
DataType::Dictionary(key, v) => {
256-
ensure!(
257-
**v == DataType::Utf8,
258-
error::UnsupportedStringColumnTypeSnafu {
259-
data_type: (**v).clone()
260-
}
261-
);
262-
match **key {
263-
DataType::UInt8 => Self::Dictionary8(DictionaryArrayAccessor::new(
264-
// safety: we've checked the key type
265-
a.as_any()
266-
.downcast_ref::<DictionaryArray<UInt8Type>>()
267-
.unwrap(),
268-
)),
269-
DataType::UInt16 => Self::Dictionary16(DictionaryArrayAccessor::new(
270-
// safety: we've checked the key type
271-
a.as_any()
272-
.downcast_ref::<DictionaryArray<UInt16Type>>()
273-
.unwrap(),
274-
)),
275-
_ => {
276-
return error::UnsupportedStringDictKeyTypeSnafu {
277-
data_type: a.data_type().clone(),
278-
}
279-
.fail();
280-
}
302+
/// Wrapper around an array that might be a dictionary or it might just be an unencoded
303+
/// array of the base type
304+
pub enum MaybeDictArrayAccessor<'a, V> {
305+
Native(&'a V),
306+
Dictionary8(DictionaryArrayAccessor<'a, UInt8Type, V>),
307+
Dictionary16(DictionaryArrayAccessor<'a, UInt16Type, V>),
308+
}
309+
310+
impl<'a, T> NullableArrayAccessor for MaybeDictArrayAccessor<'a, T>
311+
where
312+
T: Array + NullableArrayAccessor + 'static,
313+
{
314+
type Native = T::Native;
315+
316+
fn value_at(
317+
&self,
318+
idx: usize,
319+
) -> Option<<MaybeDictArrayAccessor<'a, T> as NullableArrayAccessor>::Native> {
320+
match self {
321+
Self::Native(s) => s.value_at(idx),
322+
Self::Dictionary8(d) => d.value_at(idx),
323+
Self::Dictionary16(d) => d.value_at(idx),
324+
}
325+
}
326+
}
327+
328+
impl<'a, T> MaybeDictArrayAccessor<'a, T>
329+
where
330+
T: Array + NullableArrayAccessor + 'static,
331+
{
332+
/// Inspects the given array to determine whether it can be treated as an array
333+
/// of the specified data type. The array must either:
334+
/// - Directly have the expected data type, or
335+
/// - Be a dictionary array whose value type matches the expected data type.
336+
///
337+
/// Returns a wrapped native array if the type matches.
338+
/// Returns an error if the array type can't be treated as this datatype
339+
fn try_new_with_datatype(data_type: DataType, arr: &'a ArrayRef) -> error::Result<Self> {
340+
// if the type isn't a dictionary, we treat it as an unencoded array
341+
if *arr.data_type() == data_type {
342+
return Ok(Self::Native(arr.as_any().downcast_ref::<T>().unwrap()));
343+
}
344+
345+
// determine if the type is a dictionary where the value is the desired datatype
346+
if let DataType::Dictionary(key, v) = arr.data_type() {
347+
ensure!(
348+
**v == data_type,
349+
error::UnsupportedDictionaryValueTypeSnafu {
350+
expect_oneof: vec![data_type],
351+
actual: (**v).clone()
281352
}
282-
}
283-
_ => {
284-
return error::UnsupportedStringColumnTypeSnafu {
285-
data_type: a.data_type().clone(),
353+
);
354+
355+
let result = match **key {
356+
DataType::UInt8 => Self::Dictionary8(DictionaryArrayAccessor::new(
357+
arr.as_any()
358+
.downcast_ref::<DictionaryArray<UInt8Type>>()
359+
.unwrap(),
360+
)),
361+
DataType::UInt16 => Self::Dictionary16(DictionaryArrayAccessor::new(
362+
arr.as_any()
363+
.downcast_ref::<DictionaryArray<UInt16Type>>()
364+
.unwrap(),
365+
)),
366+
_ => {
367+
return error::UnsupportedDictionaryKeyTypeSnafu {
368+
expect_oneof: vec![DataType::UInt8, DataType::UInt16],
369+
actual: (**key).clone(),
370+
}
371+
.fail();
286372
}
287-
.fail();
288-
}
289-
};
290-
Ok(result)
373+
};
374+
375+
return Ok(result);
376+
}
377+
378+
InvalidListArraySnafu {
379+
expect_oneof: vec![
380+
data_type.clone(),
381+
DataType::Dictionary(Box::new(DataType::UInt8), Box::new(data_type.clone())),
382+
DataType::Dictionary(Box::new(DataType::UInt16), Box::new(data_type.clone())),
383+
],
384+
actual: arr.data_type().clone(),
385+
}
386+
.fail()
291387
}
292388
}
293389

390+
impl<'a, V> MaybeDictArrayAccessor<'a, PrimitiveArray<V>>
391+
where
392+
V: ArrowPrimitiveType,
393+
{
394+
pub fn try_new(arr: &'a ArrayRef) -> error::Result<Self> {
395+
Self::try_new_with_datatype(V::DATA_TYPE, arr)
396+
}
397+
}
398+
399+
impl<'a> MaybeDictArrayAccessor<'a, BinaryArray> {
400+
pub fn try_new(arr: &'a ArrayRef) -> error::Result<Self> {
401+
Self::try_new_with_datatype(BinaryArray::DATA_TYPE, arr)
402+
}
403+
}
404+
405+
impl<'a> MaybeDictArrayAccessor<'a, FixedSizeBinaryArray> {
406+
pub fn try_new(arr: &'a ArrayRef, dims: i32) -> error::Result<Self> {
407+
Self::try_new_with_datatype(DataType::FixedSizeBinary(dims), arr)
408+
}
409+
}
410+
411+
impl<'a> MaybeDictArrayAccessor<'a, StringArray> {
412+
pub fn try_new(arr: &'a ArrayRef) -> error::Result<Self> {
413+
Self::try_new_with_datatype(StringArray::DATA_TYPE, arr)
414+
}
415+
}
416+
417+
pub type Int32ArrayAccessor<'a> = MaybeDictArrayAccessor<'a, Int32Array>;
418+
pub type Int64ArrayAccessor<'a> = MaybeDictArrayAccessor<'a, Int64Array>;
419+
pub type StringArrayAccessor<'a> = MaybeDictArrayAccessor<'a, StringArray>;
420+
294421
pub struct DictionaryArrayAccessor<'a, K, V>
295422
where
296423
K: ArrowDictionaryKeyType,
@@ -311,8 +438,12 @@ where
311438
}
312439

313440
pub fn value_at(&self, idx: usize) -> Option<V::Native> {
314-
let offset = self.inner.key(idx).unwrap();
315-
self.value.value_at(offset)
441+
if self.inner.is_valid(idx) {
442+
let offset = self.inner.key(idx).unwrap();
443+
self.value.value_at(offset)
444+
} else {
445+
None
446+
}
316447
}
317448
}
318449

@@ -327,7 +458,7 @@ mod tests {
327458
fn test_dictionary_accessor() {
328459
let expected: DictionaryArray<UInt16Type> = vec!["a", "a", "b", "c"].into_iter().collect();
329460
let dict = Arc::new(expected) as ArrayRef;
330-
let accessor = StringArrayAccessor::new(&dict).unwrap();
461+
let accessor = StringArrayAccessor::try_new(&dict).unwrap();
331462
assert_eq!("a", accessor.value_at(0).unwrap());
332463
assert_eq!("a", accessor.value_at(1).unwrap());
333464
assert_eq!("b", accessor.value_at(2).unwrap());

rust/otel-arrow-rust/src/error.rs

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,9 +96,13 @@ pub enum Error {
9696
location: Location,
9797
},
9898

99-
#[snafu(display("Invalid List array data type, expect {}, actual {}", expect, actual))]
99+
#[snafu(display(
100+
"Invalid List array data type, expect one of {:?}, actual {}",
101+
expect_oneof,
102+
actual
103+
))]
100104
InvalidListArray {
101-
expect: DataType,
105+
expect_oneof: Vec<DataType>,
102106
actual: DataType,
103107
#[snafu(implicit)]
104108
location: Location,
@@ -139,6 +143,30 @@ pub enum Error {
139143
location: Location,
140144
},
141145

146+
#[snafu(display(
147+
"Unsupported dictionary key type, expect one of {:?}, actual {}",
148+
expect_oneof,
149+
actual
150+
))]
151+
UnsupportedDictionaryKeyType {
152+
expect_oneof: Vec<DataType>,
153+
actual: DataType,
154+
#[snafu(implicit)]
155+
location: Location,
156+
},
157+
158+
#[snafu(display(
159+
"Unsupported dictionary value type. expect {:?}, actual {}",
160+
expect_oneof,
161+
actual
162+
))]
163+
UnsupportedDictionaryValueType {
164+
expect_oneof: Vec<DataType>,
165+
actual: DataType,
166+
#[snafu(implicit)]
167+
location: Location,
168+
},
169+
142170
#[snafu(display("Unsupported string column type, given: {}", data_type))]
143171
UnsupportedStringColumnType {
144172
data_type: DataType,

rust/otel-arrow-rust/src/otlp/attributes/parent_id.rs

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -10,42 +10,30 @@
1010
// See the License for the specific language governing permissions and
1111
// limitations under the License.
1212

13-
use crate::arrays::NullableArrayAccessor;
1413
use crate::otlp::attributes::decoder::{
1514
Attrs16ParentIdDecoder, Attrs32ParentIdDecoder, AttrsParentIdDecoder,
1615
};
17-
use arrow::array::{UInt16Array, UInt32Array};
18-
use arrow::datatypes::DataType;
16+
use arrow::datatypes::{UInt16Type, UInt32Type};
1917
use num_enum::TryFromPrimitive;
2018
use std::hash::Hash;
2119
use std::ops::{Add, AddAssign};
2220

2321
pub trait ParentId: Copy + Hash + Eq + Default + Add<Output = Self> + AddAssign {
24-
type Array: NullableArrayAccessor<Native = Self> + 'static;
25-
26-
fn arrow_data_type() -> DataType;
22+
type ArrayType;
2723

2824
fn new_decoder() -> AttrsParentIdDecoder<Self>;
2925
}
3026

3127
impl ParentId for u16 {
32-
type Array = UInt16Array;
33-
34-
fn arrow_data_type() -> DataType {
35-
DataType::UInt16
36-
}
28+
type ArrayType = UInt16Type;
3729

3830
fn new_decoder() -> AttrsParentIdDecoder<Self> {
3931
Attrs16ParentIdDecoder::default()
4032
}
4133
}
4234

4335
impl ParentId for u32 {
44-
type Array = UInt32Array;
45-
46-
fn arrow_data_type() -> DataType {
47-
DataType::UInt32
48-
}
36+
type ArrayType = UInt32Type;
4937

5038
fn new_decoder() -> AttrsParentIdDecoder<Self> {
5139
Attrs32ParentIdDecoder::default()

0 commit comments

Comments
 (0)