Enhance Kafka exporter to respect max message size #36982

Open
yurishkuro opened this issue Dec 29, 2024 · 26 comments
Labels: enhancement, exporter/kafka, good first issue, help wanted, Stale

Comments

@yurishkuro
Member

Component(s)

exporter/kafka

Is your feature request related to a problem? Please describe.

The exporter has a config option MaxMessageBytes, but the exporter itself does not respect it: it attempts to send the full serialized payload to the Kafka driver, which may reject the payload based on this setting.

Describe the solution you'd like

Most payloads can be safely split into chunks of a "safe" size that will be accepted by the Kafka driver. For example, the Jaeger integration tests include a test that writes a trace with 10k spans, which is 3 MB in size when serialized as JSON. The trace can be trivially split into multiple messages that each fit within the default 1 MB size limit.

Describe alternatives you've considered

No response

Additional context

jaegertracing/jaeger#6437 (comment)

@yurishkuro yurishkuro added enhancement New feature or request needs triage New item requiring triage help wanted Extra attention is needed good first issue Good for newcomers labels Dec 29, 2024
Contributor

Pinging code owners:

See Adding Labels via Comments if you do not have permissions to add labels yourself.

@RenuBhati

Hey @yurishkuro, I would like to contribute to this issue. Can you please assign me this?

@RenuBhati

Hey @yurishkuro, I’m tied up with another project and it’s taking longer than expected. I won't be able to pick this up. Thanks for your understanding.

@RenuBhati RenuBhati removed their assignment Dec 31, 2024
@LZiHaN
Member

LZiHaN commented Jan 1, 2025

Hi @yurishkuro, is this task still available? I'd like to give it a try. Please assign it to me.

@JaredTan95 JaredTan95 removed the needs triage New item requiring triage label Jan 3, 2025
@chahatsagarmain

@LZiHaN are you working on this?

@LZiHaN
Member

LZiHaN commented Jan 5, 2025

@LZiHaN are you working on this?

Yes, I'm working on it.

@LZiHaN
Member

LZiHaN commented Jan 8, 2025

Hi @yurishkuro ,

I’m working on implementing this feature and wanted to confirm if the approach I’m considering for message splitting and reassembling is feasible.
My plan is as follows:

  1. Splitting the Message: When a message exceeds MaxMessageBytes, I will split it into multiple chunks. Each chunk will include the following information in the message headers:
    • message_id (to identify the original message),
    • chunk_index (the position of this chunk in the message),
    • total_chunks (the total number of chunks for the message).
  2. Reassembling the Message: On the consumer side, I will use the message_id in the header to group the chunks. Each chunk will have its chunk_index and total_chunks in the header to help with ordering. Once all chunks are received, I will concatenate the Value fields from each chunk to reassemble the original message.

Is this approach viable, and would it work seamlessly with the Kafka producer/consumer setup? Or are there any potential issues with storing this information in the headers and reassembling the message on the consumer side? Looking forward to your feedback.
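A minimal Go sketch of the header-based proposal above, for illustration only. The `Chunk` struct is a hypothetical stand-in for a Kafka message carrying the three proposed headers; it is not a sarama type, and `SplitBytes` and `Reassemble` are invented names. Note that a consumer that does not know this header convention would only ever see fragments of a serialized payload.

```go
package main

import "fmt"

// Chunk is a hypothetical stand-in for one Kafka message: the Value plus
// the three headers from the proposal (message_id, chunk_index, total_chunks).
type Chunk struct {
	MessageID   string
	ChunkIndex  int
	TotalChunks int
	Value       []byte
}

// SplitBytes cuts payload into chunks whose Value is at most maxBytes long.
func SplitBytes(messageID string, payload []byte, maxBytes int) ([]Chunk, error) {
	if maxBytes <= 0 {
		return nil, fmt.Errorf("maxBytes must be positive, got %d", maxBytes)
	}
	total := (len(payload) + maxBytes - 1) / maxBytes
	if total == 0 {
		total = 1 // an empty payload still yields one (empty) chunk
	}
	chunks := make([]Chunk, 0, total)
	for i := 0; i < total; i++ {
		start := i * maxBytes
		end := start + maxBytes
		if end > len(payload) {
			end = len(payload)
		}
		chunks = append(chunks, Chunk{
			MessageID:   messageID,
			ChunkIndex:  i,
			TotalChunks: total,
			Value:       payload[start:end],
		})
	}
	return chunks, nil
}

// Reassemble concatenates the chunks of one message, in any arrival order.
func Reassemble(chunks []Chunk) ([]byte, error) {
	if len(chunks) == 0 {
		return nil, fmt.Errorf("no chunks")
	}
	total := chunks[0].TotalChunks
	if len(chunks) != total {
		return nil, fmt.Errorf("have %d chunks, want %d", len(chunks), total)
	}
	ordered := make([][]byte, total)
	seen := make([]bool, total)
	for _, c := range chunks {
		if c.MessageID != chunks[0].MessageID || c.TotalChunks != total ||
			c.ChunkIndex < 0 || c.ChunkIndex >= total || seen[c.ChunkIndex] {
			return nil, fmt.Errorf("inconsistent chunk with index %d", c.ChunkIndex)
		}
		ordered[c.ChunkIndex] = c.Value
		seen[c.ChunkIndex] = true
	}
	var out []byte
	for _, v := range ordered {
		out = append(out, v...)
	}
	return out, nil
}

func main() {
	chunks, _ := SplitBytes("msg-1", []byte("0123456789"), 4)
	restored, err := Reassemble(chunks)
	fmt.Println(len(chunks), string(restored), err)
}
```

The sketch ignores the size of the headers themselves and the question of how long a consumer should buffer incomplete messages, both of which a real implementation would have to address.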

@yurishkuro
Member Author

@LZiHaN this is a possible approach, but it would be a breaking change, since a consumer that does not understand this chunking may not be able to reassemble the message. My idea was that we instead split the spans from the payload into multiple payloads, such that each payload fits within MaxMessageBytes when serialized. It's not quite simple to implement, because the payload may contain one huge span, but if we can split it this way then it's a fully backwards-compatible solution.

@shivanshuraj1333
Member

@yurishkuro this seems to introduce some performance tradeoffs:

This marshaller creates a message, which is then exported to Kafka using the sarama client. Irrespective of the Kafka broker configuration, this export fails for messages whose size exceeds the client (exporter) configuration.

Now, one solution could be to implement chunking traces received here based on the configured size. (as you suggested)

However, it looks like we need to calculate the size and then do the chunking based on the resulting size.
The tradeoff I see with this approach:

  1. We need to calculate the size multiple times
    • Receive the trace
    • Calculate the size
      • If the size is greater than the configuration, chunk and try again with the smaller chunks
  2. How to chunk?
    • Any preferred algorithm? (maybe binary search the spans, based on number of spans, and split accordingly)

Another approach could be to append the spans of a given trace one at a time, calculating the size after each append; once the limit is reached, build a new resource packet and start appending to it. For this, a new size-aware SplitTraces can be created.

Or maybe some other approach, WDYT would be a better implementation to it?

The size check is done here in the sarama client, and this ByteSize function can be used in kafka exporter to calculate the size.
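The "append and measure" idea from this comment could look roughly like the sketch below. It is an illustration under simplifying assumptions: `Span` and `Batch` are hypothetical stand-ins for the real trace types, JSON stands in for the configured encoding, and size is measured by re-marshaling the candidate batch, which is exactly the repeated size calculation the comment flags as a tradeoff. It also ignores any per-message overhead that the Kafka client adds on top of the payload.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Span and Batch are hypothetical stand-ins for the real trace data model.
type Span struct {
	Name string `json:"name"`
}

type Batch struct {
	Resource string `json:"resource"`
	Spans    []Span `json:"spans"`
}

// marshaledSize returns the serialized size of b. json.Marshal cannot fail
// for these plain struct types, so the error is ignored in this sketch.
func marshaledSize(b Batch) int {
	data, _ := json.Marshal(b)
	return len(data)
}

// PackGreedy appends spans one at a time and starts a new batch whenever the
// next span would push the serialized batch over maxBytes. Spans that do not
// fit even in a batch of their own are returned in dropped.
func PackGreedy(resource string, spans []Span, maxBytes int) (batches []Batch, dropped []Span) {
	cur := Batch{Resource: resource}
	for _, s := range spans {
		cur.Spans = append(cur.Spans, s)
		if marshaledSize(cur) <= maxBytes {
			continue
		}
		// s pushed the batch over the limit: take it back out and flush.
		cur.Spans = cur.Spans[:len(cur.Spans)-1]
		if len(cur.Spans) > 0 {
			batches = append(batches, cur)
		}
		// Start a fresh batch (with the same resource) holding only s.
		cur = Batch{Resource: resource, Spans: []Span{s}}
		if marshaledSize(cur) > maxBytes {
			dropped = append(dropped, s)
			cur = Batch{Resource: resource}
		}
	}
	if len(cur.Spans) > 0 {
		batches = append(batches, cur)
	}
	return batches, dropped
}

func main() {
	spans := []Span{{Name: "a"}, {Name: "b"}, {Name: "c"}, {Name: "d"}, {Name: "e"}}
	batches, dropped := PackGreedy("svc", spans, 60)
	for _, b := range batches {
		fmt.Println(len(b.Spans), "spans,", marshaledSize(b), "bytes")
	}
	fmt.Println("dropped:", len(dropped))
}
```

Because every append re-marshals the whole candidate batch, the cost grows quadratically with the number of spans per batch; an incremental size estimate would avoid that, at the price of more code.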

@yurishkuro
Member Author

To avoid perf issues the marshaling can be optimistic: try to marshal the whole thing first, and only if the result is larger than the max message size try to chunk the spans. As for the chunking algorithm, you don't need to be stuck in analysis paralysis; just write something that works correctly. However you implement it, it will be an improvement over the current state, since right now the message just gets dropped.

@yurishkuro
Member Author

Btw binary search sounds like a reasonable approach - keep dividing spans in half until acceptable size is produced. Other methods would be hard since marshalers do not allow concatenation of serialized parts.
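The optimistic-marshal-then-halve idea from the two comments above can be sketched as follows. This is an illustration under stated assumptions, not the code in the attached PR: `Span` is a hypothetical stand-in type, JSON stands in for the configured marshaler, and `MarshalWithinLimit` and `ErrSpanTooLarge` are invented names.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Span is a hypothetical stand-in for a real span type.
type Span struct {
	Name string `json:"name"`
}

// ErrSpanTooLarge reports a single span that cannot fit in one message.
var ErrSpanTooLarge = errors.New("single span exceeds max message size")

// MarshalWithinLimit first tries to marshal all spans as one message. Only
// when the result is larger than maxBytes does it split the spans in half
// and recurse, re-marshaling each half from scratch because serialized
// parts cannot simply be concatenated.
func MarshalWithinLimit(spans []Span, maxBytes int) ([][]byte, error) {
	if len(spans) == 0 {
		return nil, nil
	}
	data, err := json.Marshal(spans)
	if err != nil {
		return nil, err
	}
	if len(data) <= maxBytes {
		return [][]byte{data}, nil // common case: a single marshal call
	}
	if len(spans) == 1 {
		return nil, fmt.Errorf("%w: %d > %d bytes", ErrSpanTooLarge, len(data), maxBytes)
	}
	mid := len(spans) / 2
	left, err := MarshalWithinLimit(spans[:mid], maxBytes)
	if err != nil {
		return nil, err
	}
	right, err := MarshalWithinLimit(spans[mid:], maxBytes)
	if err != nil {
		return nil, err
	}
	return append(left, right...), nil
}

func main() {
	spans := make([]Span, 8)
	for i := range spans {
		spans[i] = Span{Name: "a"}
	}
	msgs, err := MarshalWithinLimit(spans, 30)
	fmt.Println(len(msgs), err)
}
```

This sketch fails the whole call when one span is oversized; an exporter could instead drop or report only that span and still send the rest.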

@awangc

awangc commented Feb 17, 2025

We'd definitely benefit from having this issue addressed (with the PR above merged)! Had a question on this issue, though, so currently there would be no way to address this if a single span has size greater than max message size other than drop it (like in the PR above)?

@jrauschenbusch

@yurishkuro What is the status of this ticket? Indeed it's an issue when combined with the batch processor (which just caps by the number of span items, not the size of them). So currently the only chance is to lower the number and configure a good enough solution, which in turn will not lead to an optimal message size and therefore overhead on the Kafka brokers. If the Kafka exporter would allow a split this would be much more efficient.

@yurishkuro
Member Author

There is a PR attached, but I think it's more complex than it needs to be.

@jrauschenbusch

I'll take a look at it and would also like to contribute a good-enough solution. You already mentioned a feasible, good-enough approach where messages that are too large just get split in half. In the end, one needs to find a proper batch configuration (maxBatchSize) so that these cases are reduced as much as possible (by checking the exposed metrics). So trying to "get the overall max message size utilization as high as possible" is scientifically nice, but it's neither pragmatic nor good from a performance point of view.

@shivanshuraj1333
Member

hey @jrauschenbusch there's already a PR open where I am doing the chunking #37176

I couldn't work on it because I got occupied at work, but I can continue working on it.

@jrauschenbusch

Hey @shivanshuraj1333. I've seen the attached PR, but wonder about the statement here:

There is a PR attached, but I think it's more complex than it needs to be

Imho this change should absolutely not break any existing consumers nor the OTLP specification. So an OTLP message should be consumable as is w/o any further modifications at the consumers.

My main purpose is to split OTLP Traces via otlp_proto. I know that there are other formats available which must be considered as well.

Splitting an ExportTraceServiceRequest requires that the Resources -> InstrumentationScopes -> Spans relationships are kept as is. Hence the strategy would be to split the Spans but keep their relationships. Otherwise the context of the Traces is modified.

I cannot say anything about metrics or logs, but I guess the structural nesting is nearly the same in the OTLP format.

Jaeger might be different. But here, too, it should not break existing consumers.
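To illustrate the constraint described in this comment, the sketch below splits a nested structure at a span count while every span keeps its original resource and scope. The three struct types are hypothetical, simplified stand-ins for the OTLP Resource -> InstrumentationScope -> Spans nesting; they are not the collector's pdata types, and `SplitAt` is an invented helper.

```go
package main

import "fmt"

// Hypothetical, simplified stand-ins for the OTLP trace nesting.
type Span struct{ Name string }

type ScopeSpans struct {
	Scope string
	Spans []Span
}

type ResourceSpans struct {
	Resource string
	Scopes   []ScopeSpans
}

// SpanCount returns the total number of spans across all resources and scopes.
func SpanCount(rs []ResourceSpans) int {
	n := 0
	for _, r := range rs {
		for _, s := range r.Scopes {
			n += len(s.Spans)
		}
	}
	return n
}

// SplitAt returns the first n spans (in walk order) as head and the rest as
// tail. Each half repeats the resource and scope of the spans it carries, so
// either half can be consumed on its own without losing context.
func SplitAt(in []ResourceSpans, n int) (head, tail []ResourceSpans) {
	if n < 0 {
		n = 0
	}
	for _, r := range in {
		hr := ResourceSpans{Resource: r.Resource}
		tr := ResourceSpans{Resource: r.Resource}
		for _, s := range r.Scopes {
			k := n // how many spans of this scope still go into head
			if k > len(s.Spans) {
				k = len(s.Spans)
			}
			if k > 0 {
				hr.Scopes = append(hr.Scopes, ScopeSpans{Scope: s.Scope, Spans: s.Spans[:k]})
			}
			if k < len(s.Spans) {
				tr.Scopes = append(tr.Scopes, ScopeSpans{Scope: s.Scope, Spans: s.Spans[k:]})
			}
			n -= k
		}
		if len(hr.Scopes) > 0 {
			head = append(head, hr)
		}
		if len(tr.Scopes) > 0 {
			tail = append(tail, tr)
		}
	}
	return head, tail
}

func main() {
	in := []ResourceSpans{
		{Resource: "A", Scopes: []ScopeSpans{
			{Scope: "x", Spans: []Span{{"1"}, {"2"}, {"3"}}},
			{Scope: "y", Spans: []Span{{"4"}}},
		}},
		{Resource: "B", Scopes: []ScopeSpans{{Scope: "x", Spans: []Span{{"5"}, {"6"}}}}},
	}
	head, tail := SplitAt(in, SpanCount(in)/2)
	fmt.Println(SpanCount(head), SpanCount(tail))
}
```

A helper like this could supply the halves for a halving strategy; the cost is that resource and scope data are duplicated in every resulting message.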

@shivanshuraj1333
Member

  1. the change proposed in the PR is not a breaking change
  2. export protocol is not modified, the marshaling is happening as it is supposed to happen

Tricky part is:

  1. how to calculate size https://github.com/IBM/sarama/blob/main/async_producer.go#L454
  2. how to split the batch when the size is greater than the configured value, I'm using binary search for that, couldn't think of an easier chunking algo

I shall revisit my implementation, if you have any comments on the logic, please feel free to add.

Also, if you can come up with an easier implementation, please feel free to raise a PR for it.

@jrauschenbusch

Short update: By having a short look into your PR this is already the design you've chosen which is totally fine for me.

  1. Naive approach by trying to first marshal the whole Trace into a single message and if it fits send it immediately.
  2. If it's too large, then split by binary search, adding as many spans as fit into a message, while still keeping the Resource->InstrumentationScope->Spans relationship.

I would say: Good enough 😅 As I've mentioned before, the "normal" situation should not be to have too-large batches all the time. Batches should be configured so that most of the time message utilization is good (not perfect), with the occasional oversized message covered by this feature so that no data is lost. At least this is my opinion.

Just a short question: If this PR would be finished, is it just an enhancement for Trace data? For me this would be fine, but the exporter seems to also cover metrics and logs. Issue should be the same, right?

@shivanshuraj1333
Member

  1. a user wouldn't know which batch size is good
  2. the current maxbatchsize is not at all used to evaluate size during export
  3. Yes other signals would have same problem but I haven't checked the code yet.

@jrauschenbusch

You’re right.

  1. Optimizing the batch size would require a Prometheus Counter metric measuring how often a message was split, plus a debug log recording the size at which it was split. This would make it possible to improve throughput by not defining batches that are too large.
  2. Right, but not a problem as one could optimize by the metric mentioned before.

@yurishkuro
Member Author

I mean at the end one needs to find a proper batch configuration (maxBatchSize) that these cases are reduced as much as possible (by checking the exposed metrics)

There is a difference between lacking optimization and causing data loss. The only time data loss is unavoidable is if a single span is too large for the message size. Everything else the algorithm is supposed to handle correctly (if not very efficiently).

@jrauschenbusch

Ok. So what is the status now? Will this improvement be continued? I'm very interested because it's a major drawback now to not have it as it requires non-optimal batch sizes to be configured and still it's not a real stable setup as there can be too large batches.

@shivanshuraj1333
Member

I'm restricted by bandwidth, will try to get the PR merged.

@jrauschenbusch

Bandwidth? You mean you have a bad network connection, or what? 😅 But i rather guess you are currently stuck with other projects, right?

Contributor

This issue has been inactive for 60 days. It will be closed in 60 days if there is no activity. To ping code owners by adding a component label, see Adding Labels via Comments, or if you are unsure of which component this issue relates to, please ping @open-telemetry/collector-contrib-triagers. If this issue is still relevant, please ping the code owners or leave a comment explaining why it is still relevant. Otherwise, please close it.


@github-actions github-actions bot added the Stale label May 26, 2025
8 participants