
Exporter batcher dynamic sharding of the partitions #12473


Open

bogdandrutu opened this issue Feb 25, 2025 · 6 comments

@bogdandrutu
Member

bogdandrutu commented Feb 25, 2025

Component(s)

exporter/exporterhelper

Describe the issue you're reporting

Current Status

Currently the exporter batcher does not support "metadata" batching (which the batch processor supports), and it has only one active batch at any moment.

The current implementation, with its single active batch, has throughput issues: the collector cannot scale linearly, because batching a request requires an exclusive lock, so only one goroutine can "batch" at any point.

To solve the linear-scaling problem, we need to be able to consume multiple requests from the "queue" in parallel while constructing multiple batches at the same time. This happens naturally when using metadata batching, but even that will sometimes suffer because hot partitions do not allow multiple updates at the same time.

In the current design, where we always have a queue (even for sync requests that wait for the response, there is still a logical queue), all implementations suffer from this problem, since batching is "single-threaded".
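
To make the bottleneck concrete, here is a minimal Go sketch of a single-active-batch design. All types are hypothetical (this is not the collector's actual code): no matter how many consumer goroutines the queue runs, they all serialize on one lock.

```go
package batchsketch

import "sync"

// request stands in for an exporter request; items is a placeholder size.
type request struct{ items int }

// singleBatcher models the current design: one active batch guarded by
// one mutex.
type singleBatcher struct {
	mu      sync.Mutex
	active  []request
	maxSize int
}

// consume is called concurrently by queue consumers, but the single lock
// serializes them: only one goroutine makes batching progress at a time,
// regardless of how many consumers exist.
func (b *singleBatcher) consume(req request, flush func([]request)) {
	b.mu.Lock()
	b.active = append(b.active, req)
	var full []request
	if len(b.active) >= b.maxSize {
		full, b.active = b.active, nil
	}
	b.mu.Unlock()
	if full != nil {
		flush(full) // export happens outside the lock
	}
}
```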

Proposal

In this proposal, we will use the following terminology:

  1. A partition represents the logical separation of the data based on the input (e.g. a key in the context or an entry in the resource attributes). Even when no partitioning is configured (the current state), we can consider that we always have one partition.
  2. A partition may be split into multiple shards, and multiple batches may be produced for the same partition when the incoming load is larger than a single goroutine can process (see the sketch below).
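
Continuing the hypothetical `batchsketch` package from the sketch above (assuming `sync` and `sync/atomic` are imported), the two terms might map onto data structures like this; the `next` cursor is used later for round-robin shard selection:

```go
// shard accumulates its own batch, so shards of the same partition can
// be filled by different goroutines in parallel.
type shard struct {
	mu     sync.Mutex
	active []request
}

// partition is the logical grouping derived from the input, e.g. a key
// in the context metadata or an entry in the resource attributes. With
// no partitioning configured, there is exactly one partition.
type partition struct {
	key    string
	next   atomic.Uint64 // round-robin cursor, used when picking a shard
	shards []*shard      // starts at one shard; resized by the sharder task
}
```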

Implementation details

In order to implement the sharding capability of a partition, the queue batching needs to keep some statistics:

  1. Number of "blocked in the queue" requests.
  2. Number of requests processed per partition (tracked in the sketch below).
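
A hedged sketch of the bookkeeping this implies, continuing the hypothetical package above; field names are illustrative, not proposed:

```go
// batchStats holds the two statistics listed above: requests currently
// blocked in the queue, and requests processed per partition in the
// current observation window.
type batchStats struct {
	mu             sync.Mutex
	blockedInQueue int64            // requests waiting because batching can't keep up
	perPartition   map[string]int64 // processed count per partition key this window
}

// recordProcessed is called by consumers after handling a request.
func (s *batchStats) recordProcessed(partitionKey string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.perPartition[partitionKey]++
}
```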

There will be a "sharder" task that will support dynamic sharding using two actions:

  1. Split a partition into multiple shards.
  2. Merge shards from a partition into fewer shards.

The sharder task will be executed periodically (e.g. every minute) and, based on the number of "blocked" requests and each partition's traffic pattern over the last N minutes, will trigger split and/or merge actions on different partitions.
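
A sketch of such a sharder loop under the same assumptions (and assuming `time` is imported); the thresholds, interval, and decision rule are placeholders, not proposed defaults:

```go
// Placeholder thresholds; real values would come from measurement.
const (
	splitThreshold = 10000 // window count above which a partition gains a shard
	mergeThreshold = 100   // window count below which a partition loses a shard
	maxShards      = 8
)

// runSharder periodically inspects the stats, splits hot partitions,
// merges cold ones, and resets the per-window counters. A real
// implementation would also consult the blocked-in-queue count, flush a
// removed shard's pending batch, and synchronize shard-slice updates
// with the consumer goroutines.
func runSharder(stats *batchStats, parts map[string]*partition, interval time.Duration, stop <-chan struct{}) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-stop:
			return
		case <-ticker.C:
			stats.mu.Lock()
			for key, n := range stats.perPartition {
				p := parts[key]
				switch {
				case n > splitThreshold && len(p.shards) < maxShards:
					p.shards = append(p.shards, &shard{}) // split: add capacity
				case n < mergeThreshold && len(p.shards) > 1:
					p.shards = p.shards[:len(p.shards)-1] // merge: shrink back
				}
				stats.perPartition[key] = 0 // start a new window
			}
			stats.mu.Unlock()
		}
	}
}
```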

The consumer goroutines that consume requests from the "queue" (as explained, we always have a queue, whether or not the caller waits for the response) will do the following (sketched in code below):

  1. Pick one request and determine the right partition for that request based on the configuration (using metadata from context, or any other supported mechanism).
  2. If the determined partition has multiple shards, use a round-robin mechanism to select the shard to which the incoming request goes.
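
The two steps above might look like this in the running sketch; `maxBatch` and `keyOf` are hypothetical stand-ins for the configured batch size and partition-key extraction:

```go
const maxBatch = 512 // hypothetical flush size

// pickShard implements step 2: an atomic round-robin cursor spreads
// concurrent consumers across the partition's shards so they rarely
// contend on the same lock.
func (p *partition) pickShard() *shard {
	i := p.next.Add(1) % uint64(len(p.shards))
	return p.shards[i]
}

// consumeOne handles one request pulled from the queue. keyOf stands in
// for the configured partition-key extraction (context metadata, or any
// other supported mechanism).
func consumeOne(req request, keyOf func(request) string, parts map[string]*partition, flush func([]request)) {
	p := parts[keyOf(req)] // step 1: determine the partition
	s := p.pickShard()     // step 2: round-robin over its shards
	s.mu.Lock()
	s.active = append(s.active, req)
	var full []request
	if len(s.active) >= maxBatch {
		full, s.active = s.active, nil
	}
	s.mu.Unlock()
	if full != nil {
		flush(full)
	}
}
```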

@bogdandrutu bogdandrutu changed the title Exporter batcher sharding and partitioning Exporter batcher dynamic sharding and partitioning Feb 25, 2025
@axw
Contributor

axw commented Feb 25, 2025

@bogdandrutu lack of partitioning is holding us back from using the batcher, and I was anticipating the single-threaded issue, so I'd certainly welcome these capabilities.

Do you have in mind that the batcher itself does the partitioning (#10825), or should that responsibility be further up the pipeline before queuing? i.e. the alternative would be that each partition has its own (logical) queue for isolation, and behind that is a partition-specific instance of the batcher; each batcher would have its own sharder task.
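
Roughly, the alternative described here could be shaped like this (hypothetical types, reusing the earlier sketch): partitioning happens before queuing, and each partition owns an isolated queue plus its own batcher and sharder, instead of one shared queue feeding a partition-aware batcher.

```go
// partitionPipeline is one isolated unit: its own logical queue and its
// own batcher; a per-partition sharder task (not shown) would manage
// that batcher's shards.
type partitionPipeline struct {
	queue   chan request
	batcher *singleBatcher
}

// partitionedExporter routes each request to its partition's pipeline
// before queuing, so partitions cannot interfere with one another.
type partitionedExporter struct {
	pipelines map[string]*partitionPipeline
}
```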

Related:

@bogdandrutu
Member Author

bogdandrutu commented Feb 25, 2025

@axw thanks for the comment. For this specific issue I was mostly focused on the dynamic sharding, and I will try to keep it that way as much as possible.

I do understand that there are 2 use cases for partitioning: 1. The downstream service is the same, so all partitions are part of the same failure domain; 2. The downstream service is different, so every partition is part of a different failure domain.

How to handle these 2 cases will be discussed separately in the partitioning issue.

@bogdandrutu bogdandrutu changed the title Exporter batcher dynamic sharding and partitioning Exporter batcher dynamic sharding of the partitions Feb 25, 2025
@dmitryax
Member

Thanks for putting together the proposal, @bogdandrutu. It makes sense. I believe the performance bottleneck can be addressed separately using dynamic sharding only, given that the partitioning can potentially be done in a different place (as mentioned in case 2 above).

@adriangb

I'm very excited about this development. I would like to test using the OTel Collector as a batching mechanism for ingestion in a multi-tenant system. If it can do per-tenant batching and persist to disk so data is not lost because of upstream failures, it would be a perfect out-of-the-box system for this use case, which a lot of telemetry systems have 😄. I think that is why these features are being developed, but since I wasn't 100% sure, I wanted to share the use case.

@Erog38

Erog38 commented Apr 4, 2025

I'm super excited about this proposal. Thank you!

github-merge-queue bot pushed a commit that referenced this issue Jun 5, 2025
#### Description

This PR introduces two new components:
* `Partitioner` - an interface for fetching the batch key. A partitioner type should implement the function `GetKey()`, which returns the batching key. The `Partitioner` should be provided to the `queue_batcher` along with the `sizer` in `queue_batch::Settings`.
* `multi_batcher` - supports key-based batching by routing requests to a corresponding `shard_batcher`. Each `shard_batcher` corresponds to a shard described in #12473.
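
A hedged reconstruction of the interface this PR text describes, in the hypothetical sketch package from earlier in the thread (assuming `context` is imported); the collector's actual signatures may differ, and the metadata-based example is purely illustrative:

```go
// Partitioner fetches the batching key for a request; multi_batcher
// uses the key to route the request to the matching shard_batcher.
type Partitioner interface {
	GetKey(ctx context.Context, req request) string
}

// tenantKey is a hypothetical context key type for the example below.
type tenantKey struct{}

// metadataPartitioner is a purely illustrative implementation that keys
// batches on a tenant ID carried in the request context.
type metadataPartitioner struct{}

func (metadataPartitioner) GetKey(ctx context.Context, _ request) string {
	v, _ := ctx.Value(tenantKey{}).(string)
	return v
}
```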

#### Link to tracking issue
#12795


---------

Co-authored-by: Dmitry Anoshin <[email protected]>