feat: add function to materialize transport optimized parent ids #455

Conversation

@albertlockett albertlockett commented May 16, 2025

Adds a function in otlp::attributes::decoder called materialize_parent_id that decodes the "transport optimized" parent IDs into the actual parent IDs.

Internally, this method tries to make use of the eq compute kernel for all the type, key & value comparisons when deciding which rows were delta encoded. The motivation is that these kernels can leverage SIMD if compiled with the correct rustflags (see module comments).

Part of #449
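A minimal sketch of that idea (not the PR's exact code; the function name and the `main` demo are assumptions, and the `cmp::eq` path assumes a recent arrow crate): comparing a column against itself shifted by one row produces, in a single vectorized kernel call, a boolean array marking which rows equal their neighbor.

```
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, BooleanArray, StringArray};
use arrow::compute::kernels::cmp::eq;
use arrow::error::ArrowError;

/// Compare each element with its successor in one kernel call;
/// result[i] is true when col[i] == col[i + 1].
fn next_element_equality(col: &ArrayRef) -> Result<BooleanArray, ArrowError> {
    if col.len() < 2 {
        return Ok(BooleanArray::from(Vec::<bool>::new()));
    }
    let left = col.slice(0, col.len() - 1);
    let right = col.slice(1, col.len() - 1);
    eq(&left, &right)
}

fn main() -> Result<(), ArrowError> {
    let keys: ArrayRef = Arc::new(StringArray::from(vec!["a1", "a1", "a1", "b2"]));
    // Prints [true, true, false]: rows 0-2 share a key, row 3 starts a new group.
    println!("{:?}", next_element_equality(&keys)?);
    Ok(())
}
```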

@albertlockett albertlockett requested a review from a team as a code owner May 16, 2025 21:35
@albertlockett
Member Author

For reference -- the logic that creates the encoding we're decoding in this code can be found here:

func (s *Attrs16ByTypeKeyValueParentId) Sort(attrs []Attr16) []string {
    sort.Slice(attrs, func(i, j int) bool {
        attrsI := attrs[i]
        attrsJ := attrs[j]
        if attrsI.Value.Type() == attrsJ.Value.Type() {
            if attrsI.Key == attrsJ.Key {
                cmp := Compare(attrsI.Value, attrsJ.Value)
                if cmp == 0 {
                    return attrsI.ParentID < attrsJ.ParentID
                } else {
                    return cmp < 0
                }
            } else {
                return attrsI.Key < attrsJ.Key
            }
        } else {
            return attrsI.Value.Type() < attrsJ.Value.Type()
        }
    })
    return []string{constants.AttributeType, constants.AttributeKey, constants.Value, constants.ParentID}
}

func (s *Attrs16ByTypeKeyValueParentId) Encode(parentID uint16, key string, value *pcommon.Value) uint16 {
    if s.prevValue == nil {
        s.prevKey = key
        s.prevValue = value
        s.prevParentID = parentID
        return parentID
    }

    if s.IsSameGroup(key, value) {
        delta := parentID - s.prevParentID
        s.prevParentID = parentID
        return delta
    } else {
        s.prevKey = key
        s.prevValue = value
        s.prevParentID = parentID
        return parentID
    }
}

And here's what we consider "equal" values for purposes of choosing whether or not to delta encode some row:
func Equal(a, b *pcommon.Value) bool {
    if a == nil || b == nil {
        return false
    }
    switch a.Type() {
    case pcommon.ValueTypeInt:
        if b.Type() == pcommon.ValueTypeInt {
            return a.Int() == b.Int()
        } else {
            return false
        }
    case pcommon.ValueTypeDouble:
        if b.Type() == pcommon.ValueTypeDouble {
            return a.Double() == b.Double()
        } else {
            return false
        }
    case pcommon.ValueTypeBool:
        if b.Type() == pcommon.ValueTypeBool {
            return a.Bool() == b.Bool()
        } else {
            return false
        }
    case pcommon.ValueTypeStr:
        if b.Type() == pcommon.ValueTypeStr {
            return a.Str() == b.Str()
        } else {
            return false
        }
    case pcommon.ValueTypeBytes:
        if a.Type() == pcommon.ValueTypeBytes && b.Type() == pcommon.ValueTypeBytes {
            return bytes.Equal(a.Bytes().AsRaw(), b.Bytes().AsRaw())
        } else {
            return false
        }
    case pcommon.ValueTypeMap:
        return false
    case pcommon.ValueTypeSlice:
        return false
    case pcommon.ValueTypeEmpty:
        return false
    default:
        return false
    }
}

@albertlockett albertlockett force-pushed the improve-attributes-materialization-perf branch from d361ec2 to 5fb82b7 May 16, 2025 21:35

codecov bot commented May 16, 2025

Codecov Report

Attention: Patch coverage is 97.52475% with 10 lines in your changes missing coverage. Please review.

Project coverage is 61.18%. Comparing base (52d7cb0) to head (435f75e).
Report is 1 commit behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##             main     #455      +/-   ##
==========================================
+ Coverage   60.65%   61.18%   +0.52%     
==========================================
  Files         185      185              
  Lines       26379    26783     +404     
==========================================
+ Hits        16001    16386     +385     
- Misses       9842     9856      +14     
- Partials      536      541       +5     
Components            Coverage Δ
otap-dataflow         68.29% <ø> (ø)
beaubourg             67.19% <ø> (ø)
otel-arrow-rust       64.30% <98.25%> (+3.10%) ⬆️
query_abstraction     81.42% <ø> (ø)
syslog_cef_receivers  99.17% <ø> (ø)
otel-arrow-go         52.80% <0.00%> (-0.13%) ⬇️

Contributor

@lquerel lquerel left a comment

Overall LGTM. I just think it would be interesting to run a criterion benchmark on this logic to test a few optimization ideas.

For example, in the AttrsParentIdDecoder::decode method, can't we avoid the allocations (to_string() and clone()) by specifying a lifetime on the AttrsParentIdDecoder struct and on the 2 parameters of the method?
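For illustration, a minimal sketch of the lifetime idea, with hypothetical field names and keyed on the key column only (the real decoder also compares the type and value columns): the struct borrows the previous key for `'a` instead of owning a `String`, so `decode` does no per-row allocation.

```
/// Hypothetical borrowing decoder: prev_key is a &'a str, not a String.
struct AttrsParentIdDecoder<'a> {
    prev_key: Option<&'a str>,
    prev_parent_id: u16,
}

impl<'a> AttrsParentIdDecoder<'a> {
    fn decode(&mut self, delta_or_id: u16, key: &'a str) -> u16 {
        let parent_id = if self.prev_key == Some(key) {
            // Same group as the previous row: the stored value is a delta.
            self.prev_parent_id.wrapping_add(delta_or_id)
        } else {
            // New group: the stored value is the absolute parent id.
            delta_or_id
        };
        self.prev_key = Some(key);
        self.prev_parent_id = parent_id;
        parent_id
    }
}
```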

Also instead of computing create_next_element_equality_array each time we hit a type/key boundary, can't we precompute the “next element” boolean arrays once for the full key and type columns, then just index into those bitmaps?

/// This returns a new RecordBatch with the parent_id column replaced with the materialized id.
///
#[allow(unused)] // TODO -- remove allow(unused) when we use this to optimize decoding OTAP
pub fn materialize_parent_id<T>(record_batch: &RecordBatch) -> Result<RecordBatch>
Contributor

I think we should declare this function as async so that we can yield asynchronously every n iterations to hand back control in the case of very large batches. For now, we can simply have the async signature in place and add the yields later. I seem to remember that DataFusion does something similar for potentially long-running operations. Can you confirm?
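For concreteness, a sketch of the suggested shape (hypothetical name and threshold, assuming a tokio-style runtime):

```
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

/// Hypothetical async variant: same work as the sync function, but it
/// hands control back to the runtime every YIELD_EVERY rows.
async fn materialize_parent_id_async(batch: &RecordBatch) -> Result<RecordBatch, ArrowError> {
    const YIELD_EVERY: usize = 4096; // assumed tuning knob
    for row in 0..batch.num_rows() {
        if row > 0 && row % YIELD_EVERY == 0 {
            tokio::task::yield_now().await;
        }
        // ... per-row parent-id decoding would go here ...
    }
    Ok(batch.clone())
}
```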

Contributor

(Cool!)

Member Author

I'm not sure if DataFusion makes heavy use of this. The only place I could find in their code where they do this is in RepartitionExec, in the case that the input stream is endless:
https://github.com/apache/datafusion/blob/8d9c0f6b87d9f3a52e4d3dc642535d09cf86049f/datafusion/physical-plan/src/repartition/mod.rs#L923

That said, they do suggest that implementers of ExecutionPlan (i.e. a step in DF's physical plan) yield in special circumstances (for example, if the time complexity is non-linear).

But in this case, the performance of this function should be linear. If this method running too long is a concern, I wonder whether, instead of marking it async and yielding, we could in the future (if/when we need it) wrap this in an implementation of futures::Stream that processes the record batch in chunks?

e.g. something like

use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use arrow::array::ArrayRef;
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;

pub struct ParentIdMaterializerStream {
    schema: SchemaRef,
    input: Arc<RecordBatch>,
    chunk_size: usize,
    offset: usize,
}

impl futures::Stream for ParentIdMaterializerStream {
    type Item = RecordBatch;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if self.offset >= self.input.num_rows() {
            return Poll::Ready(None);
        }

        // Slice the next chunk_size rows out of every column.
        let remaining = self.input.num_rows() - self.offset;
        let to_process = std::cmp::min(self.chunk_size, remaining);
        let columns: Vec<ArrayRef> = self
            .input
            .columns()
            .iter()
            .map(|col| col.slice(self.offset, to_process))
            .collect();

        // Slicing preserves the schema, so this cannot fail.
        let batch_slice = RecordBatch::try_new(self.schema.clone(), columns)
            .expect("sliced columns match schema");

        // TODO pass initial parent ID & last row or something to give context of previous row
        let result = materialize_parent_id::<u16>(&batch_slice).unwrap();

        self.offset += to_process;
        Poll::Ready(Some(result))
    }
}
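A hypothetical caller would then drive it with `futures::StreamExt`:

```
use futures::StreamExt;

// Hypothetical usage: pull materialized chunks as the stream produces them.
async fn consume(mut stream: ParentIdMaterializerStream) {
    while let Some(chunk) = stream.next().await {
        println!("materialized chunk with {} rows", chunk.num_rows());
    }
}
```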

WDYT?

Member Author

Link to the DataFusion docs for more context about ExecutionPlan and when to yield:
https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html#cancellation--aborting-execution

Contributor

@albertlockett I think you can track this point as a GH issue, with the goal of comparing the stream-based approach versus a simple await/yield point, both in terms of performance and memory allocation. The fact that we are currently trying to use a thread-per-core approach means we need to pay a bit more attention to regularly yielding control, more so than we would with a work-stealing approach. This will need to be addressed in future PRs, including benchmarking and behavior in case of cancellation.

Member Author

Created an issue to track this: #479

/// | "a1" | str("a") | 0 | <-- parent id = 0
/// | "a1" | str("a") | 0 | <-- key & val == previous row -> delta encoded -> parent id = prev id + 0 = 0
/// | "a1" | str("a") | 1 | <-- key & val == previous row -> delta encoded -> parent id = prev id + 1 = 1
/// | "a1" | str("a") | 2 | <-- key & val == previous row -> delta encoded -> parent id = prev id + 2 = 3
Contributor

@utpilla utpilla May 20, 2025

Not super familiar with arrow's delta encoding, but I was wondering if there should be a 2 here instead of 3

Suggested change
/// | "a1" | str("a") | 2 | <-- key & val == previous row -> delta encoded -> parent id = prev id + 2 = 3
/// | "a1" | str("a") | 2 | <-- key & val == previous row -> delta encoded -> parent id = prev id + 2 = 2

Member Author

In this comment, I was trying to explain that the "prev id" was the ID from line 97, which is 1, and that the delta is 2.

1 (prev id) + 2 (delta) = 3
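A minimal standalone sketch of the rule being described (hypothetical helper over plain slices, not the PR's arrow-based code):

```
/// Decode transport-optimized parent ids: within a run of rows that equal
/// the previous row on (type, key, value), each stored id is a delta from
/// the previously decoded parent id; otherwise it is the absolute id.
fn decode_parent_ids(encoded: &[u16], same_as_prev: &[bool]) -> Vec<u16> {
    let mut decoded = Vec::with_capacity(encoded.len());
    let mut prev: u16 = 0;
    for (i, &v) in encoded.iter().enumerate() {
        let id = if i > 0 && same_as_prev[i] { prev + v } else { v };
        decoded.push(id);
        prev = id;
    }
    decoded
}
```

For the doc-comment table above: encoded `[0, 0, 1, 2]` with every row equal to its predecessor decodes to `[0, 0, 1, 3]` -- the last row is 1 (prev id) + 2 (delta) = 3.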

{
    // if the batch is empty, just skip all this logic and return a batch
    if record_batch.num_rows() == 0 {
        return Ok(record_batch.clone());
Contributor

It looks like the arrow crate offers a method to create an empty RecordBatch. It might be good to check if using that would be more performant. https://arrow.apache.org/rust/arrow/array/struct.RecordBatch.html#method.new_empty

Suggested change
return Ok(record_batch.clone());
return Ok(RecordBatch::new_empty(record_batch.schema()));

Member Author

I'm not sure if new_empty is more optimized than clone.

It looks like RecordBatch::new_empty creates a new vec of empty arrays for each field in the schema, and in doing so creates a new Arc for each column. By contrast, RecordBatch::clone will clone the Arc for the schema, then create a new vec and fill it with a cloned Arc for each existing empty column.

I thought cloning an Arc (Arc::clone) was generally faster than Arc::new, so I'd expect RecordBatch::clone to be faster than new_empty.

I'm planning on writing a benchmark for this tomorrow, as suggested here, so I can test this out to confirm either way.

Member Author

@utpilla using RecordBatch::new_empty here is quite a bit slower:

Current code path calling record_batch.clone():

$ RUST_BACKTRACE=full  cargo bench --bench materialize_parent_id
# ...
materialize_parent_ids/materialize_parent_ids/0
                        time:   [21.644 ns 21.675 ns 21.707 ns]

Code path using RecordBatch::new_empty:

$ RUST_BACKTRACE=full  cargo bench --bench materialize_parent_id
# ...
materialize_parent_ids/materialize_parent_ids/0
                        time:   [1.4948 µs 1.4995 µs 1.5049 µs]
                        change: [+6776.3% +6803.6% +6828.3%] (p = 0.00 < 0.05)
                        Performance has regressed.

Contributor

It seems off if the only major difference between the two is Arc::new vs Arc::clone. From ~20 ns to ~1.5 µs. That's almost 75 times more!

Contributor

I think the difference can be explained by the fact that a clone only duplicates the Arcs, whereas the empty call not only creates the Arcs but also all the data structures pointed to by those Arcs.
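A small illustration of the two code paths under discussion (standard arrow RecordBatch API; the wrapper function is just for exposition):

```
use arrow::record_batch::RecordBatch;

fn empty_result(record_batch: &RecordBatch) -> RecordBatch {
    // Path 1: bumps the refcount on the schema Arc and on each (already
    // empty) column Arc; no new buffers are built.
    let cloned = record_batch.clone();

    // Path 2: builds a fresh empty array -- and the structures behind its
    // Arc -- for every field in the schema.
    let _rebuilt = RecordBatch::new_empty(record_batch.schema());

    cloned
}
```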

@albertlockett albertlockett force-pushed the improve-attributes-materialization-perf branch from 313f3cd to 8944f11 May 20, 2025 13:52
@albertlockett
Member Author

Thanks for the review @lquerel

> Overall LGTM. I just think it would be interesting to run a criterion benchmark on this logic to test a few optimization ideas.

> For example, in the AttrsParentIdDecoder::decode method, can't we avoid the allocations (to_string() and clone()) by specifying a lifetime on the AttrsParentIdDecoder struct and on the 2 parameters of the method?

Yeah, I agree. But I'm not sure it's worth fixing in this PR, because in a followup PR I think I can replace AttributeParentIdDecoder with this method where it's called in AttributeStore.

> Also instead of computing create_next_element_equality_array each time we hit a type/key boundary, can't we precompute the “next element” boolean arrays once for the full key and type columns, then just index into those bitmaps?

I added a benchmark for this, and you're right: computing the create_next_element_equality_array array for the entire key column at once is a bit faster (and it makes the code simpler). In fact, that's how I had originally written it, so it was straightforward to change it back. We do allocate a larger array than is technically needed, but it's probably faster because we only do one allocation.

materialize_parent_ids/materialize_parent_ids/128
                        time:   [3.5858 µs 3.5949 µs 3.6041 µs]
                        change: [-9.5912% -9.2245% -8.8545%] (p = 0.00 < 0.05)
                        Performance has improved.

Contributor

@lquerel lquerel left a comment

Thanks for the improvements

@lquerel lquerel merged commit 735dc4e into open-telemetry:main May 20, 2025
41 checks passed
jmacd pushed a commit that referenced this pull request May 23, 2025
…#468)

Part of #449 

This PR adds:
- A new structure called `OtapBatch` that encapsulates all the record
batches associated with a `BatchArrowRecord`. It provides efficient
methods for setting & getting the record batches by payload type. This
has been integrated into the decoder code.
- A new `otap::transform` module with methods for sorting record batches by
parent ID and removing delta encoding.
- Helper methods for updating schema/field metadata, which are invoked to
update the metadata with the current state of the record batches (state
including things like the encoding of the parent ID column and the column
sort order).

_The scope of this PR has somewhat changed. I'll leave the original
description below, for posterity_.

---
**Original Description:**

Restores some of the code from
#447 , but with a
slightly different implementation of the "Version 2" attribute store.
- This new version assumes that the record batch it receives is sorted
by the parent ID.
- To do this sort, we first materialize the parent IDs using the code in
this PR (#455) and then sort the record batch (see the implementation of
`sort_by_parent_id`).
- This new store also lazily materializes the KeyValues, by returning an
iterator from its `attribute_by_delta_id` method (similar to what we did
in #447).
- Note that this new attribute store implementation _does not_ support
random access. It expects `attribute_by_delta_id` to be called with the
delta ID of each parent ID.

The benchmark results are interesting. The v2 store gets slightly worse
performance for small batch sizes (128), but much better performance for
larger batch sizes (8092), which is encouraging!
```
materialize_parent_ids/v1_attr_store/128
                        time:   [7.6742 µs 7.6967 µs 7.7190 µs]

materialize_parent_ids/v2_attr_store/128
                        time:   [10.495 µs 10.515 µs 10.534 µs]

materialize_parent_ids/v1_attr_store/1536
                        time:   [118.21 µs 118.76 µs 119.32 µs]

materialize_parent_ids/v2_attr_store/1536
                        time:   [89.421 µs 89.582 µs 89.765 µs]

materialize_parent_ids/v1_attr_store/8092
                        time:   [1.2816 ms 1.2888 ms 1.2966 ms]

materialize_parent_ids/v2_attr_store/8092
                        time:   [464.49 µs 465.22 µs 466.00 µs]
```

Still very much WIP. One thing that needs to be sorted out is where/when we
apply this transform (materialize parent IDs and sort the record batch), and
how we track the state of the batch. This PR is currently doing it in
`decode::decoder::Consumer::consume_bar`, which may not be the right place.

We might consider not merging all the code in this PR straight away, as some
of this work is exploratory/proof of concept.