[receiver/kafka] backoff in case of next consumer error #37009

Merged: 11 commits, Feb 24, 2025

Conversation

Contributor

@yiquanzhou yiquanzhou commented Jan 2, 2025

Description

Currently, if the next consumer returns an error, the Kafka receiver simply returns the error and goes on to consume the next message without applying any backpressure. This is not optimal for some errors. For example, the memory limiter can return a "data refused" error when the collector's memory usage is too high. Continuing to consume messages and send them to the memory limiter can increase memory usage further and cause an out-of-memory error in the collector.

This PR adds an optional error backoff config that makes the receiver wait before consuming the next message when an error requires backoff.
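The idea can be illustrated with a minimal Go sketch. It is not the code from this PR: `errRefused`, `consumeAll`, and the fixed `delay` are hypothetical names chosen for illustration, and the real receiver works with sarama sessions rather than a slice of strings.

```go
package main

import (
	"errors"
	"time"
)

// errRefused is a hypothetical sentinel standing in for a "data refused"
// error such as the one the memory limiter can return.
var errRefused = errors.New("data refused due to high memory usage")

// consumeAll hands each message to next. When next fails with an error that
// calls for backoff, it sleeps for delay before moving on, which throttles
// consumption instead of pulling the next message immediately.
// It returns the number of times it backed off.
func consumeAll(msgs []string, next func(string) error, delay time.Duration) int {
	backoffs := 0
	for _, m := range msgs {
		err := next(m)
		if err == nil || !errors.Is(err, errRefused) {
			continue // success, or an error that does not call for backoff
		}
		backoffs++
		time.Sleep(delay)
	}
	return backoffs
}
```

A caller would pass the downstream consumer as `next`; only errors matching `errRefused` slow the loop down, so ordinary failures keep the pre-existing behavior.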

Testing

  • Added a test case to TestXXXConsumerGroupHandler_error_nextConsumer tests with an error that requires backoff.

Documentation

  • Added the configuration for error_backoff


linux-foundation-easycla bot commented Jan 2, 2025

CLA Signed

The committers listed above are authorized under a signed CLA.

@yiquanzhou yiquanzhou changed the title Kafka receiver backoff [receiver/kafka] backoff in case of next consumer error Jan 2, 2025
@yiquanzhou yiquanzhou changed the title [receiver/kafka] backoff in case of next consumer error [receiver/kafka] backoff in case of next consumer error (WIP) Jan 2, 2025
@yiquanzhou yiquanzhou force-pushed the kafka-receiver-backoff branch from 79afbda to 78a77fe Compare January 3, 2025 08:40
@yiquanzhou yiquanzhou changed the title [receiver/kafka] backoff in case of next consumer error (WIP) [receiver/kafka] backoff in case of next consumer error Jan 3, 2025
@yiquanzhou yiquanzhou marked this pull request as ready for review January 3, 2025 13:50
@yiquanzhou yiquanzhou requested a review from a team as a code owner January 3, 2025 13:50
@@ -582,8 +593,22 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe
if c.messageMarking.After && c.messageMarking.OnError {
session.MarkMessage(message, "")
Contributor

Should messages be marked if you're going to retry them?

Contributor Author

Currently if the next consumer returns an error, the kafka receiver simply drops the message and returns the error. At the beginning I intended to keep this behavior and only implement backoff delays without retrying the message. But then I went through the code again and I think we could implement the retry without introducing too much complexity.

I've updated the code and added some comments to explain. Let me know if that makes sense.

I've updated the unit test, but I wonder how I could test this change in an integration or e2e test, in particular whether the retry logic correctly resets the offset and the failed message is consumed again in the next loop. Do you have any suggestions?

Contributor Author

@yiquanzhou yiquanzhou Jan 13, 2025

Hi @atoulme, could you please take another look at the changes I added for the retry logic? I'd also appreciate some guidance on the testing.

Contributor Author

@atoulme could you please check out the changes and my comments? I'd like to move this PR forward

Contributor Author

@yiquanzhou yiquanzhou Feb 5, 2025

I've tested the change by running the collector locally with the following config:

  kafka:
    encoding: text_utf-8 # payloads are decoded as text and inserted as the body of a log record
    error_backoff:
      enabled: true
      initial_interval: 5s
      max_interval: 1m
      multiplier: 1.5
      randomization_factor: 0
      max_elapsed_time: 1m
...
  memory_limiter:
    check_interval: 1s
    limit_mib: 30 # set a low memory limit to trigger the "data refused due to high memory usage" error
    spike_limit_mib: 0

...
  pipelines:
    logs:
      receivers:
      - otlp
      - kafka
      processors:
      - memory_limiter
      - batch
      exporters:
      - debug

I also ran a Kafka broker locally and used the kafka-producer-perf-test tool to send messages to the otlp_logs topic:

$ ./kafka-producer-perf-test \
  --topic otlp_logs \
  --throughput 100000 \
  --num-records 1000000 \
  --record-size 1024 \
  --producer-props bootstrap.servers=localhost:9092

By adding some temporary logging, I validated that when the memory limiter processor returns the "data refused due to high memory usage" error, the backoff is triggered and the same message is retried after the backoff interval. Once max_elapsed_time is exceeded, the backoff and retry are skipped and the receiver continues consuming the next messages.
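To make the observed schedule concrete, here is a rough Go sketch of how the waits grow under the config above. `backoffPolicy` and `delays` are hypothetical names, not the receiver's code, and the sketch assumes elapsed time is simply the sum of the waits; a real backoff library may measure wall-clock time instead and so stop at a slightly different point.

```go
package main

import "time"

// backoffPolicy is a hypothetical, simplified stand-in for the
// error_backoff settings (randomization_factor is taken as 0).
type backoffPolicy struct {
	InitialInterval time.Duration
	MaxInterval     time.Duration
	Multiplier      float64
	MaxElapsedTime  time.Duration
}

// localTestPolicy copies the values from the error_backoff config above.
var localTestPolicy = backoffPolicy{
	InitialInterval: 5 * time.Second,
	MaxInterval:     time.Minute,
	Multiplier:      1.5,
	MaxElapsedTime:  time.Minute,
}

// delays returns the successive waits, stopping once the cumulative wait
// would exceed MaxElapsedTime (assumption: elapsed time = sum of waits).
func (p backoffPolicy) delays() []time.Duration {
	var out []time.Duration
	var elapsed time.Duration
	next := p.InitialInterval
	for next > 0 && elapsed+next <= p.MaxElapsedTime {
		out = append(out, next)
		elapsed += next
		next = time.Duration(float64(next) * p.Multiplier)
		if next > p.MaxInterval {
			next = p.MaxInterval // cap growth at the configured maximum
		}
	}
	return out
}
```

Under that assumption, `localTestPolicy.delays()` yields waits of 5s, 7.5s, 11.25s, and 16.875s; the next wait of about 25.3s would push the total past one minute, which is the point where retrying stops and consumption moves on.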

Contributor Author

I also added an entry to the change log.
@atoulme @MovieStoreGuy @pavolloffay Could any of you please give a final review of the PR?

Contributor

github-actions bot commented Feb 5, 2025

This PR was marked stale due to lack of activity. It will be closed in 14 days.

@github-actions github-actions bot added the Stale label Feb 5, 2025
@yiquanzhou yiquanzhou force-pushed the kafka-receiver-backoff branch 2 times, most recently from e3eb720 to 7204f7a Compare February 5, 2025 10:23
@yiquanzhou yiquanzhou force-pushed the kafka-receiver-backoff branch from 7204f7a to 47a699c Compare February 5, 2025 10:28
@github-actions github-actions bot removed the Stale label Feb 6, 2025
@yiquanzhou yiquanzhou requested a review from atoulme February 10, 2025 08:12
case <-time.After(backOffDelay):
if !c.messageMarking.After {
// Unmark the message so it can be retried
session.ResetOffset(claim.Topic(), claim.Partition(), message.Offset, "")
Contributor

Does this method return any form of error, and what is the purpose of the empty string here?

Contributor Author

No, this method does not return an error.
https://github.com/IBM/sarama/blob/main/consumer_group.go#L788

The empty string is the metadata parameter. We are also passing an empty string as metadata when calling session.MarkMessage at
https://github.com/dash0hq/opentelemetry-collector-contrib/blob/47a699cb7f71d6bd1f1ea50e237596e4673e92a1/receiver/kafkareceiver/kafka_receiver.go#L609
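For readers following the thread, the wait-then-rewind step can be sketched in isolation. This is an illustrative sketch, not the PR's code: `offsetSession`, `recordingSession`, and `waitThenRewind` are hypothetical names, and the interface only mirrors the parameter list of the `ResetOffset` call quoted in the diff so the example runs without a Kafka client.

```go
package main

import "time"

// offsetSession is a hypothetical one-method subset of a consumer group
// session; its parameters mirror the ResetOffset call in the diff above.
type offsetSession interface {
	ResetOffset(topic string, partition int32, offset int64, metadata string)
}

// recordingSession is a test double that remembers the last reset it saw.
type recordingSession struct {
	resets   int
	offset   int64
	metadata string
}

func (r *recordingSession) ResetOffset(_ string, _ int32, offset int64, metadata string) {
	r.resets++
	r.offset = offset
	r.metadata = metadata
}

// waitThenRewind waits for delay and then rewinds the partition to offset so
// the failed message is delivered again. It returns false, without
// rewinding, if done is closed first (for example on shutdown).
func waitThenRewind(done <-chan struct{}, s offsetSession, topic string, partition int32, offset int64, delay time.Duration) bool {
	select {
	case <-time.After(delay):
		// Empty metadata: nothing extra is stored alongside the offset.
		s.ResetOffset(topic, partition, offset, "")
		return true
	case <-done:
		return false
	}
}
```

Because the reset has no error to report, the sketch signals only whether the wait completed; a caller that gets `false` should stop consuming rather than retry.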

@yiquanzhou yiquanzhou force-pushed the kafka-receiver-backoff branch from 0b633b4 to 4019f7f Compare February 19, 2025 09:54
@MovieStoreGuy MovieStoreGuy added the ready to merge Code review completed; ready to merge by maintainers label Feb 24, 2025
@mx-psi mx-psi merged commit 0dc57b6 into open-telemetry:main Feb 24, 2025
162 checks passed
@github-actions github-actions bot added this to the next release milestone Feb 24, 2025
Labels
ready to merge Code review completed; ready to merge by maintainers receiver/kafka
4 participants