[receiver/kafka] backoff in case of next consumer error #37009

Merged · 11 commits · Feb 24, 2025
27 changes: 27 additions & 0 deletions .chloggen/kafka-receiver-backoff.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: kafkareceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add error backoff configuration to the Kafka receiver, which allows it to wait and retry a failed message when the next consumer returns certain errors.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [37009]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
7 changes: 7 additions & 0 deletions receiver/kafkareceiver/README.md
@@ -97,6 +97,13 @@ The following settings can be optionally configured:
- `extract_headers` (default = false): Allows the user to attach header fields to resource attributes in the OTel pipeline
  - `headers` (default = []): List of headers to extract from the Kafka record.
    **Note: Matching is `exact`; regexes are not supported as of now.**
- `error_backoff`: [BackOff](https://github.com/open-telemetry/opentelemetry-collector/blob/v0.116.0/config/configretry/backoff.go#L27-L43) configuration to apply when the next consumer returns an error (see the sketch after this list)
  - `enabled`: (default = false) Whether to enable backoff when the next consumer returns an error
  - `initial_interval`: The time to wait after the first error before retrying
  - `max_interval`: The upper bound on the backoff interval between consecutive retries
  - `multiplier`: The factor by which the backoff interval is multiplied after each retry
  - `randomization_factor`: A random factor used to calculate the next backoff interval. Randomized interval = RetryInterval * (1 ± RandomizationFactor)
  - `max_elapsed_time`: The maximum total time spent retrying before giving up. If set to 0, retries never stop.
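
A minimal sketch of an `error_backoff` block (the interval values are illustrative, not defaults; field names follow the settings listed above):

```yaml
receivers:
  kafka:
    error_backoff:
      # Wait and retry a message when the next consumer refuses it.
      enabled: true
      initial_interval: 1s
      max_interval: 10s
      multiplier: 1.5
      randomization_factor: 0.5
      max_elapsed_time: 1m
```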

Example:

3 changes: 3 additions & 0 deletions receiver/kafkareceiver/config.go
@@ -7,6 +7,7 @@ import (
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configretry"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka"
@@ -85,6 +86,8 @@ type Config struct {
DefaultFetchSize int32 `mapstructure:"default_fetch_size"`
// The maximum bytes per fetch from Kafka (default "0", no limit)
MaxFetchSize int32 `mapstructure:"max_fetch_size"`
// When the next consumer returns certain errors, the receiver will wait and retry the failed message
ErrorBackOff configretry.BackOffConfig `mapstructure:"error_backoff"`
}

const (
11 changes: 11 additions & 0 deletions receiver/kafkareceiver/config_test.go
@@ -11,6 +11,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/confmap/confmaptest"
"go.opentelemetry.io/collector/confmap/xconfmap"
@@ -66,6 +67,9 @@ func TestLoadConfig(t *testing.T) {
MinFetchSize: 1,
DefaultFetchSize: 1048576,
MaxFetchSize: 0,
ErrorBackOff: configretry.BackOffConfig{
Enabled: false,
},
},
},
{
@@ -102,6 +106,13 @@
MinFetchSize: 1,
DefaultFetchSize: 1048576,
MaxFetchSize: 0,
ErrorBackOff: configretry.BackOffConfig{
Enabled: true,
InitialInterval: 1 * time.Second,
MaxInterval: 10 * time.Second,
MaxElapsedTime: 1 * time.Minute,
Multiplier: 1.5,
},
},
},
}
4 changes: 2 additions & 2 deletions receiver/kafkareceiver/go.mod
@@ -5,6 +5,7 @@ go 1.23.0
require (
github.com/IBM/sarama v1.45.0
github.com/apache/thrift v0.21.0
github.com/cenkalti/backoff/v4 v4.3.0
github.com/gogo/protobuf v1.3.2
github.com/jaegertracing/jaeger-idl v0.5.0
github.com/json-iterator/go v1.1.12
@@ -18,6 +19,7 @@ require (
github.com/stretchr/testify v1.10.0
go.opentelemetry.io/collector/component v0.120.1-0.20250224010654-18e18b21da7a
go.opentelemetry.io/collector/component/componenttest v0.120.1-0.20250224010654-18e18b21da7a
go.opentelemetry.io/collector/config/configretry v1.26.1-0.20250224010654-18e18b21da7a
go.opentelemetry.io/collector/config/configtls v1.26.1-0.20250224010654-18e18b21da7a
go.opentelemetry.io/collector/confmap v1.26.1-0.20250224010654-18e18b21da7a
go.opentelemetry.io/collector/confmap/xconfmap v0.120.1-0.20250224010654-18e18b21da7a
@@ -53,7 +55,6 @@ require (
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.4 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.32.4 // indirect
github.com/aws/smithy-go v1.22.0 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/eapache/go-resiliency v1.7.0 // indirect
@@ -96,7 +97,6 @@
github.com/xdg-go/stringprep v1.0.4 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/collector/config/configopaque v1.26.1-0.20250224010654-18e18b21da7a // indirect
go.opentelemetry.io/collector/config/configretry v1.26.1-0.20250224010654-18e18b21da7a // indirect
go.opentelemetry.io/collector/consumer/consumererror v0.120.1-0.20250224010654-18e18b21da7a // indirect
go.opentelemetry.io/collector/consumer/xconsumer v0.120.1-0.20250224010654-18e18b21da7a // indirect
go.opentelemetry.io/collector/exporter v0.120.1-0.20250224010654-18e18b21da7a // indirect
83 changes: 83 additions & 0 deletions receiver/kafkareceiver/kafka_receiver.go
@@ -9,9 +9,12 @@ import (
"fmt"
"strconv"
"sync"
"time"

"github.com/IBM/sarama"
"github.com/cenkalti/backoff/v4"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
@@ -35,6 +38,8 @@ const (

var errInvalidInitialOffset = errors.New("invalid initial offset")

var errMemoryLimiterDataRefused = errors.New("data refused due to high memory usage")

// kafkaTracesConsumer uses sarama to consume and handle messages from kafka.
type kafkaTracesConsumer struct {
config Config
@@ -205,6 +210,7 @@ func (c *kafkaTracesConsumer) Start(_ context.Context, host component.Host) erro
messageMarking: c.messageMarking,
headerExtractor: &nopHeaderExtractor{},
telemetryBuilder: c.telemetryBuilder,
backOff: newExponentialBackOff(c.config.ErrorBackOff),
}
if c.headerExtraction {
consumerGroup.headerExtractor = &headerExtractor{
@@ -313,6 +319,7 @@ func (c *kafkaMetricsConsumer) Start(_ context.Context, host component.Host) err
messageMarking: c.messageMarking,
headerExtractor: &nopHeaderExtractor{},
telemetryBuilder: c.telemetryBuilder,
backOff: newExponentialBackOff(c.config.ErrorBackOff),
}
if c.headerExtraction {
metricsConsumerGroup.headerExtractor = &headerExtractor{
@@ -424,6 +431,7 @@ func (c *kafkaLogsConsumer) Start(_ context.Context, host component.Host) error
messageMarking: c.messageMarking,
headerExtractor: &nopHeaderExtractor{},
telemetryBuilder: c.telemetryBuilder,
backOff: newExponentialBackOff(c.config.ErrorBackOff),
}
if c.headerExtraction {
logsConsumerGroup.headerExtractor = &headerExtractor{
@@ -481,6 +489,7 @@ type tracesConsumerGroupHandler struct {
autocommitEnabled bool
messageMarking MessageMarking
headerExtractor HeaderExtractor
backOff *backoff.ExponentialBackOff
}

type metricsConsumerGroupHandler struct {
@@ -498,6 +507,7 @@ type metricsConsumerGroupHandler struct {
autocommitEnabled bool
messageMarking MessageMarking
headerExtractor HeaderExtractor
backOff *backoff.ExponentialBackOff
}

type logsConsumerGroupHandler struct {
@@ -515,6 +525,7 @@ type logsConsumerGroupHandler struct {
autocommitEnabled bool
messageMarking MessageMarking
headerExtractor HeaderExtractor
backOff *backoff.ExponentialBackOff
}

var (
@@ -579,11 +590,29 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe
err = c.nextConsumer.ConsumeTraces(session.Context(), traces)
c.obsrecv.EndTracesOp(ctx, c.unmarshaler.Encoding(), spanCount, err)
if err != nil {
if errorRequiresBackoff(err) && c.backOff != nil {
backOffDelay := c.backOff.NextBackOff()
if backOffDelay != backoff.Stop {
select {
case <-session.Context().Done():
return nil
case <-time.After(backOffDelay):
if !c.messageMarking.After {
// Unmark the message so it can be retried
session.ResetOffset(claim.Topic(), claim.Partition(), message.Offset, "")
Review comment (Contributor): Does this method return any form of error, and what is the purpose of the empty string here?

Reply (Contributor Author): No, this method does not return an error:
https://github.com/IBM/sarama/blob/main/consumer_group.go#L788
The empty string is the metadata parameter. We also pass an empty string as metadata when calling session.MarkMessage at
https://github.com/dash0hq/opentelemetry-collector-contrib/blob/47a699cb7f71d6bd1f1ea50e237596e4673e92a1/receiver/kafkareceiver/kafka_receiver.go#L609
}
return err
}
}
}
if c.messageMarking.After && c.messageMarking.OnError {
session.MarkMessage(message, "")
}
return err
}
if c.backOff != nil {
c.backOff.Reset()
}
if c.messageMarking.After {
session.MarkMessage(message, "")
}
@@ -656,11 +685,29 @@ func (c *metricsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupS
err = c.nextConsumer.ConsumeMetrics(session.Context(), metrics)
c.obsrecv.EndMetricsOp(ctx, c.unmarshaler.Encoding(), dataPointCount, err)
if err != nil {
if errorRequiresBackoff(err) && c.backOff != nil {
backOffDelay := c.backOff.NextBackOff()
if backOffDelay != backoff.Stop {
select {
case <-session.Context().Done():
return nil
case <-time.After(backOffDelay):
if !c.messageMarking.After {
// Unmark the message so it can be retried
session.ResetOffset(claim.Topic(), claim.Partition(), message.Offset, "")
}
return err
}
}
}
if c.messageMarking.After && c.messageMarking.OnError {
session.MarkMessage(message, "")
}
return err
}
if c.backOff != nil {
c.backOff.Reset()
}
if c.messageMarking.After {
session.MarkMessage(message, "")
}
@@ -732,11 +779,29 @@ func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSess
err = c.nextConsumer.ConsumeLogs(session.Context(), logs)
c.obsrecv.EndLogsOp(ctx, c.unmarshaler.Encoding(), logRecordCount, err)
if err != nil {
if errorRequiresBackoff(err) && c.backOff != nil {
backOffDelay := c.backOff.NextBackOff()
if backOffDelay != backoff.Stop {
select {
case <-session.Context().Done():
return nil
case <-time.After(backOffDelay):
if !c.messageMarking.After {
// Unmark the message so it can be retried
session.ResetOffset(claim.Topic(), claim.Partition(), message.Offset, "")
}
return err
}
}
}
if c.messageMarking.After && c.messageMarking.OnError {
session.MarkMessage(message, "")
}
return err
}
if c.backOff != nil {
c.backOff.Reset()
}
if c.messageMarking.After {
session.MarkMessage(message, "")
}
@@ -753,6 +818,20 @@ func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSess
}
}

func newExponentialBackOff(config configretry.BackOffConfig) *backoff.ExponentialBackOff {
if !config.Enabled {
return nil
}
backOff := backoff.NewExponentialBackOff()
backOff.InitialInterval = config.InitialInterval
backOff.RandomizationFactor = config.RandomizationFactor
backOff.Multiplier = config.Multiplier
backOff.MaxInterval = config.MaxInterval
backOff.MaxElapsedTime = config.MaxElapsedTime
backOff.Reset()
return backOff
}

func toSaramaInitialOffset(initialOffset string) (int64, error) {
switch initialOffset {
case offsetEarliest:
@@ -792,3 +871,7 @@ func encodingToComponentID(encoding string) (*component.ID, error) {
id := component.NewID(componentType)
return &id, nil
}

// errorRequiresBackoff reports whether the consumer error should trigger a
// wait-and-retry; the memory limiter's refusal is matched by message text
// via the locally defined errMemoryLimiterDataRefused.
func errorRequiresBackoff(err error) bool {
	return err.Error() == errMemoryLimiterDataRefused.Error()
}