
Commit 0dc57b6

[receiver/kafka] backoff in case of next consumer error (#37009)
#### Description

Currently, if the next consumer returns an error, the Kafka receiver simply returns the error and consumes the next message without applying any backpressure. This behavior is not optimal for some errors. For example, the memory limiter returns a data-refused error when the collector's memory usage is too high; continuing to consume and send messages to the memory limiter can drive memory usage even higher and cause an OutOfMemory error in the collector. This PR adds an optional error backoff config that lets the receiver wait before consuming the next message when an error requires backoff.

#### Testing

- Added a case with an error that requires backoff to the `TestXXXConsumerGroupHandler_error_nextConsumer` tests.

#### Documentation

- Added the configuration for `error_backoff`.
1 parent b98968d commit 0dc57b6
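
For context, a minimal collector configuration sketch that turns the new option on. The `error_backoff` keys and values mirror the README and `config_test.go` changes below; the broker address is illustrative only:

```yaml
receivers:
  kafka:
    brokers: ["localhost:9092"]  # illustrative broker address
    error_backoff:
      enabled: true
      initial_interval: 1s
      max_interval: 10s
      multiplier: 1.5
      max_elapsed_time: 1m
```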

File tree

8 files changed: +318 −79 lines

.chloggen/kafka-receiver-backoff.yaml

Lines changed: 27 additions & 0 deletions
```diff
@@ -0,0 +1,27 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: kafkareceiver
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Add error backoff configuration to kafka receiver which allows to wait and retry a failed message when the next consumer returns some errors.
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [37009]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [user]
```

receiver/kafkareceiver/README.md

Lines changed: 7 additions & 0 deletions
```diff
@@ -97,6 +97,13 @@ The following settings can be optionally configured:
 - `extract_headers` (default = false): Allows user to attach header fields to resource attributes in otel pipeline
 - `headers` (default = []): List of headers they'd like to extract from kafka record.
   **Note: Matching pattern will be `exact`. Regexes are not supported as of now.**
+- `error_backoff`: [BackOff](https://github.com/open-telemetry/opentelemetry-collector/blob/v0.116.0/config/configretry/backoff.go#L27-L43) configuration in case of errors
+  - `enabled`: (default = false) Whether to enable backoff when next consumers return errors
+  - `initial_interval`: The time to wait after the first error before retrying
+  - `max_interval`: The upper bound on backoff interval between consecutive retries
+  - `multiplier`: The value multiplied by the backoff interval bounds
+  - `randomization_factor`: A random factor used to calculate next backoff. Randomized interval = RetryInterval * (1 ± RandomizationFactor)
+  - `max_elapsed_time`: The maximum amount of time trying to backoff before giving up. If set to 0, the retries are never stopped.
 
 Example:
 
```
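
To make these settings concrete: with `initial_interval = 1s`, `multiplier = 1.5`, and `max_interval = 10s`, successive waits grow roughly as 1s, 1.5s, 2.25s, 3.375s, ... until they are capped at 10s. With `randomization_factor = 0.5`, each wait is then drawn from `[0.5 × interval, 1.5 × interval]`, and once the total time spent retrying exceeds `max_elapsed_time`, the policy reports `backoff.Stop` and the receiver stops waiting.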

receiver/kafkareceiver/config.go

Lines changed: 3 additions & 0 deletions
```diff
@@ -7,6 +7,7 @@ import (
 	"time"
 
 	"go.opentelemetry.io/collector/component"
+	"go.opentelemetry.io/collector/config/configretry"
 
 	"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka"
@@ -85,6 +86,8 @@ type Config struct {
 	DefaultFetchSize int32 `mapstructure:"default_fetch_size"`
 	// The maximum bytes per fetch from Kafka (default "0", no limit)
 	MaxFetchSize int32 `mapstructure:"max_fetch_size"`
+	// In case of some errors returned by the next consumer, the receiver will wait and retry the failed message
+	ErrorBackOff configretry.BackOffConfig `mapstructure:"error_backoff"`
 }
 
 const (
```
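
As a sketch of how the new `mapstructure:"error_backoff"` tag gets populated — assuming the collector's standard confmap decoding, which converts duration strings such as `"1s"` — the following uses a hypothetical `miniConfig` as a pared-down stand-in for the receiver's `Config`:

```go
package main

import (
	"fmt"

	"go.opentelemetry.io/collector/config/configretry"
	"go.opentelemetry.io/collector/confmap"
)

// miniConfig is a pared-down stand-in for the receiver's Config struct,
// keeping only the field added by this commit.
type miniConfig struct {
	ErrorBackOff configretry.BackOffConfig `mapstructure:"error_backoff"`
}

func main() {
	// The same keys a user would place under the receiver entry in YAML.
	conf := confmap.NewFromStringMap(map[string]any{
		"error_backoff": map[string]any{
			"enabled":          true,
			"initial_interval": "1s",
			"max_interval":     "10s",
			"multiplier":       1.5,
			"max_elapsed_time": "1m",
		},
	})
	var cfg miniConfig
	if err := conf.Unmarshal(&cfg); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", cfg.ErrorBackOff)
}
```

This is the same decoded shape the `TestLoadConfig` expectations below assert against.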

receiver/kafkareceiver/config_test.go

Lines changed: 11 additions & 0 deletions
```diff
@@ -11,6 +11,7 @@ import (
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"go.opentelemetry.io/collector/component"
+	"go.opentelemetry.io/collector/config/configretry"
 	"go.opentelemetry.io/collector/config/configtls"
 	"go.opentelemetry.io/collector/confmap/confmaptest"
 	"go.opentelemetry.io/collector/confmap/xconfmap"
@@ -66,6 +67,9 @@ func TestLoadConfig(t *testing.T) {
 				MinFetchSize:     1,
 				DefaultFetchSize: 1048576,
 				MaxFetchSize:     0,
+				ErrorBackOff: configretry.BackOffConfig{
+					Enabled: false,
+				},
 			},
 		},
 		{
@@ -102,6 +106,13 @@
 				MinFetchSize:     1,
 				DefaultFetchSize: 1048576,
 				MaxFetchSize:     0,
+				ErrorBackOff: configretry.BackOffConfig{
+					Enabled:         true,
+					InitialInterval: 1 * time.Second,
+					MaxInterval:     10 * time.Second,
+					MaxElapsedTime:  1 * time.Minute,
+					Multiplier:      1.5,
+				},
 			},
 		},
 	}
```

receiver/kafkareceiver/go.mod

Lines changed: 2 additions & 2 deletions
```diff
@@ -5,6 +5,7 @@ go 1.23.0
 require (
 	github.com/IBM/sarama v1.45.0
 	github.com/apache/thrift v0.21.0
+	github.com/cenkalti/backoff/v4 v4.3.0
 	github.com/gogo/protobuf v1.3.2
 	github.com/jaegertracing/jaeger-idl v0.5.0
 	github.com/json-iterator/go v1.1.12
@@ -18,6 +19,7 @@ require (
 	github.com/stretchr/testify v1.10.0
 	go.opentelemetry.io/collector/component v0.120.1-0.20250224010654-18e18b21da7a
 	go.opentelemetry.io/collector/component/componenttest v0.120.1-0.20250224010654-18e18b21da7a
+	go.opentelemetry.io/collector/config/configretry v1.26.1-0.20250224010654-18e18b21da7a
 	go.opentelemetry.io/collector/config/configtls v1.26.1-0.20250224010654-18e18b21da7a
 	go.opentelemetry.io/collector/confmap v1.26.1-0.20250224010654-18e18b21da7a
 	go.opentelemetry.io/collector/confmap/xconfmap v0.120.1-0.20250224010654-18e18b21da7a
@@ -53,7 +55,6 @@ require (
 	github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.4 // indirect
 	github.com/aws/aws-sdk-go-v2/service/sts v1.32.4 // indirect
 	github.com/aws/smithy-go v1.22.0 // indirect
-	github.com/cenkalti/backoff/v4 v4.3.0 // indirect
 	github.com/cespare/xxhash/v2 v2.3.0 // indirect
 	github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
 	github.com/eapache/go-resiliency v1.7.0 // indirect
@@ -96,7 +97,6 @@ require (
 	github.com/xdg-go/stringprep v1.0.4 // indirect
 	go.opentelemetry.io/auto/sdk v1.1.0 // indirect
 	go.opentelemetry.io/collector/config/configopaque v1.26.1-0.20250224010654-18e18b21da7a // indirect
-	go.opentelemetry.io/collector/config/configretry v1.26.1-0.20250224010654-18e18b21da7a // indirect
 	go.opentelemetry.io/collector/consumer/consumererror v0.120.1-0.20250224010654-18e18b21da7a // indirect
 	go.opentelemetry.io/collector/consumer/xconsumer v0.120.1-0.20250224010654-18e18b21da7a // indirect
 	go.opentelemetry.io/collector/exporter v0.120.1-0.20250224010654-18e18b21da7a // indirect
```
receiver/kafkareceiver/kafka_receiver.go

Lines changed: 83 additions & 0 deletions
```diff
@@ -9,9 +9,12 @@ import (
 	"fmt"
 	"strconv"
 	"sync"
+	"time"
 
 	"github.com/IBM/sarama"
+	"github.com/cenkalti/backoff/v4"
 	"go.opentelemetry.io/collector/component"
+	"go.opentelemetry.io/collector/config/configretry"
 	"go.opentelemetry.io/collector/consumer"
 	"go.opentelemetry.io/collector/pdata/plog"
 	"go.opentelemetry.io/collector/pdata/pmetric"
@@ -35,6 +38,8 @@ const (
 
 var errInvalidInitialOffset = errors.New("invalid initial offset")
 
+var errMemoryLimiterDataRefused = errors.New("data refused due to high memory usage")
+
 // kafkaTracesConsumer uses sarama to consume and handle messages from kafka.
 type kafkaTracesConsumer struct {
 	config Config
@@ -205,6 +210,7 @@ func (c *kafkaTracesConsumer) Start(_ context.Context, host component.Host) error {
 		messageMarking:   c.messageMarking,
 		headerExtractor:  &nopHeaderExtractor{},
 		telemetryBuilder: c.telemetryBuilder,
+		backOff:          newExponentialBackOff(c.config.ErrorBackOff),
 	}
 	if c.headerExtraction {
 		consumerGroup.headerExtractor = &headerExtractor{
@@ -313,6 +319,7 @@ func (c *kafkaMetricsConsumer) Start(_ context.Context, host component.Host) error {
 		messageMarking:   c.messageMarking,
 		headerExtractor:  &nopHeaderExtractor{},
 		telemetryBuilder: c.telemetryBuilder,
+		backOff:          newExponentialBackOff(c.config.ErrorBackOff),
 	}
 	if c.headerExtraction {
 		metricsConsumerGroup.headerExtractor = &headerExtractor{
@@ -424,6 +431,7 @@ func (c *kafkaLogsConsumer) Start(_ context.Context, host component.Host) error {
 		messageMarking:   c.messageMarking,
 		headerExtractor:  &nopHeaderExtractor{},
 		telemetryBuilder: c.telemetryBuilder,
+		backOff:          newExponentialBackOff(c.config.ErrorBackOff),
 	}
 	if c.headerExtraction {
 		logsConsumerGroup.headerExtractor = &headerExtractor{
@@ -481,6 +489,7 @@ type tracesConsumerGroupHandler struct {
 	autocommitEnabled bool
 	messageMarking    MessageMarking
 	headerExtractor   HeaderExtractor
+	backOff           *backoff.ExponentialBackOff
 }
 
 type metricsConsumerGroupHandler struct {
@@ -498,6 +507,7 @@ type metricsConsumerGroupHandler struct {
 	autocommitEnabled bool
 	messageMarking    MessageMarking
 	headerExtractor   HeaderExtractor
+	backOff           *backoff.ExponentialBackOff
 }
 
 type logsConsumerGroupHandler struct {
@@ -515,6 +525,7 @@ type logsConsumerGroupHandler struct {
 	autocommitEnabled bool
 	messageMarking    MessageMarking
 	headerExtractor   HeaderExtractor
+	backOff           *backoff.ExponentialBackOff
 }
 
 var (
@@ -579,11 +590,29 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
 			err = c.nextConsumer.ConsumeTraces(session.Context(), traces)
 			c.obsrecv.EndTracesOp(ctx, c.unmarshaler.Encoding(), spanCount, err)
 			if err != nil {
+				if errorRequiresBackoff(err) && c.backOff != nil {
+					backOffDelay := c.backOff.NextBackOff()
+					if backOffDelay != backoff.Stop {
+						select {
+						case <-session.Context().Done():
+							return nil
+						case <-time.After(backOffDelay):
+							if !c.messageMarking.After {
+								// Unmark the message so it can be retried
+								session.ResetOffset(claim.Topic(), claim.Partition(), message.Offset, "")
+							}
+							return err
+						}
+					}
+				}
 				if c.messageMarking.After && c.messageMarking.OnError {
 					session.MarkMessage(message, "")
 				}
 				return err
 			}
+			if c.backOff != nil {
+				c.backOff.Reset()
+			}
 			if c.messageMarking.After {
 				session.MarkMessage(message, "")
 			}
@@ -656,11 +685,29 @@ func (c *metricsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
 			err = c.nextConsumer.ConsumeMetrics(session.Context(), metrics)
 			c.obsrecv.EndMetricsOp(ctx, c.unmarshaler.Encoding(), dataPointCount, err)
 			if err != nil {
+				if errorRequiresBackoff(err) && c.backOff != nil {
+					backOffDelay := c.backOff.NextBackOff()
+					if backOffDelay != backoff.Stop {
+						select {
+						case <-session.Context().Done():
+							return nil
+						case <-time.After(backOffDelay):
+							if !c.messageMarking.After {
+								// Unmark the message so it can be retried
+								session.ResetOffset(claim.Topic(), claim.Partition(), message.Offset, "")
+							}
+							return err
+						}
+					}
+				}
 				if c.messageMarking.After && c.messageMarking.OnError {
 					session.MarkMessage(message, "")
 				}
 				return err
 			}
+			if c.backOff != nil {
+				c.backOff.Reset()
+			}
 			if c.messageMarking.After {
 				session.MarkMessage(message, "")
 			}
@@ -732,11 +779,29 @@ func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
 			err = c.nextConsumer.ConsumeLogs(session.Context(), logs)
 			c.obsrecv.EndLogsOp(ctx, c.unmarshaler.Encoding(), logRecordCount, err)
 			if err != nil {
+				if errorRequiresBackoff(err) && c.backOff != nil {
+					backOffDelay := c.backOff.NextBackOff()
+					if backOffDelay != backoff.Stop {
+						select {
+						case <-session.Context().Done():
+							return nil
+						case <-time.After(backOffDelay):
+							if !c.messageMarking.After {
+								// Unmark the message so it can be retried
+								session.ResetOffset(claim.Topic(), claim.Partition(), message.Offset, "")
+							}
+							return err
+						}
+					}
+				}
 				if c.messageMarking.After && c.messageMarking.OnError {
 					session.MarkMessage(message, "")
 				}
 				return err
 			}
+			if c.backOff != nil {
+				c.backOff.Reset()
+			}
 			if c.messageMarking.After {
 				session.MarkMessage(message, "")
 			}
@@ -753,6 +818,20 @@ func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
 	}
 }
 
+func newExponentialBackOff(config configretry.BackOffConfig) *backoff.ExponentialBackOff {
+	if !config.Enabled {
+		return nil
+	}
+	backOff := backoff.NewExponentialBackOff()
+	backOff.InitialInterval = config.InitialInterval
+	backOff.RandomizationFactor = config.RandomizationFactor
+	backOff.Multiplier = config.Multiplier
+	backOff.MaxInterval = config.MaxInterval
+	backOff.MaxElapsedTime = config.MaxElapsedTime
+	backOff.Reset()
+	return backOff
+}
+
 func toSaramaInitialOffset(initialOffset string) (int64, error) {
 	switch initialOffset {
 	case offsetEarliest:
@@ -792,3 +871,7 @@ func encodingToComponentID(encoding string) (*component.ID, error) {
 	id := component.NewID(componentType)
 	return &id, nil
 }
+
+func errorRequiresBackoff(err error) bool {
+	return err.Error() == errMemoryLimiterDataRefused.Error()
+}
```
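
All three `ConsumeClaim` changes above apply the same pattern: when the next consumer refuses data, take the next delay from the handler's `*backoff.ExponentialBackOff`, wait unless the session ends or the policy returns `backoff.Stop`, rewind the offset so the message is redelivered, and reset the policy after the next success. Below is a standalone sketch of that pattern under simplified assumptions: `consumeOnce` is a hypothetical stand-in for handing one message to the next consumer, and the sketch retries in a loop, whereas the receiver instead returns the error to sarama so the consumer group redelivers the message:

```go
package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/cenkalti/backoff/v4"
)

// consumeOnce is a hypothetical stand-in for handing one Kafka message to the
// next consumer; it refuses the first two attempts the way a saturated
// memory limiter would.
func consumeOnce(attempt int) error {
	if attempt < 3 {
		return errors.New("data refused due to high memory usage")
	}
	return nil
}

func main() {
	// Mirrors newExponentialBackOff in the diff: build the policy once and
	// reuse it across messages, resetting after each success.
	expBackOff := backoff.NewExponentialBackOff()
	expBackOff.InitialInterval = 100 * time.Millisecond
	expBackOff.Multiplier = 1.5
	expBackOff.MaxInterval = time.Second
	expBackOff.MaxElapsedTime = 10 * time.Second
	expBackOff.Reset()

	for attempt := 1; ; attempt++ {
		err := consumeOnce(attempt)
		if err == nil {
			// Success: reset so the next failure starts from InitialInterval.
			expBackOff.Reset()
			fmt.Printf("consumed after %d attempts\n", attempt)
			return
		}
		delay := expBackOff.NextBackOff()
		if delay == backoff.Stop {
			// MaxElapsedTime exhausted: stop backing off.
			fmt.Println("giving up:", err)
			return
		}
		fmt.Printf("attempt %d failed, retrying in %v\n", attempt, delay)
		// The receiver waits with time.After inside a select on the session
		// context; this sketch simply sleeps and retries in place.
		time.Sleep(delay)
	}
}
```

Note that `errorRequiresBackoff` compares error strings against `errMemoryLimiterDataRefused`, so only the memory limiter's "data refused due to high memory usage" error triggers this path; all other consumer errors keep the previous fail-fast behavior.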
