diff --git a/.chloggen/kafka-receiver-backoff.yaml b/.chloggen/kafka-receiver-backoff.yaml new file mode 100644 index 0000000000000..1f3314948518d --- /dev/null +++ b/.chloggen/kafka-receiver-backoff.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: kafkareceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add error backoff configuration to the Kafka receiver, which allows the receiver to wait and retry a failed message when the next consumer returns certain errors. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [37009] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/receiver/kafkareceiver/README.md b/receiver/kafkareceiver/README.md index 98549bcabe5b3..90118540da314 100644 --- a/receiver/kafkareceiver/README.md +++ b/receiver/kafkareceiver/README.md @@ -97,6 +97,13 @@ The following settings can be optionally configured: - `extract_headers` (default = false): Allows user to attach header fields to resource attributes in otel pipeline - `headers` (default = []): List of headers they'd like to extract from kafka record. **Note: Matching pattern will be `exact`. Regexes are not supported as of now.** +- `error_backoff`: [BackOff](https://github.com/open-telemetry/opentelemetry-collector/blob/v0.116.0/config/configretry/backoff.go#L27-L43) configuration to use when the next consumer returns an error + - `enabled`: (default = false) Whether to enable backoff when the next consumer returns an error + - `initial_interval`: The time to wait after the first error before retrying + - `max_interval`: The upper bound on the backoff interval between consecutive retries + - `multiplier`: The value the backoff interval is multiplied by after each retry + - `randomization_factor`: A random factor used to calculate the next backoff interval. Randomized interval = RetryInterval * (1 ± RandomizationFactor) + - `max_elapsed_time`: The maximum amount of time to spend retrying before giving up. If set to 0, retries never stop. 
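As a reader's aid for the README hunk above, here is a minimal illustrative snippet of the new option in a collector configuration. The values are examples mirroring the testdata at the end of this diff, not defaults:

```yaml
receivers:
  kafka:
    error_backoff:
      enabled: true          # off by default
      initial_interval: 1s   # wait 1s after the first refused message
      max_interval: 10s      # cap the delay between consecutive retries at 10s
      multiplier: 1.5        # grow the delay 1.5x on each consecutive error
      max_elapsed_time: 1m   # stop retrying after ~1 minute of backoff
```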
Example: diff --git a/receiver/kafkareceiver/config.go b/receiver/kafkareceiver/config.go index 5b25c7c7c5f30..50796c0656932 100644 --- a/receiver/kafkareceiver/config.go +++ b/receiver/kafkareceiver/config.go @@ -7,6 +7,7 @@ import ( "time" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configretry" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka" @@ -85,6 +86,8 @@ type Config struct { DefaultFetchSize int32 `mapstructure:"default_fetch_size"` // The maximum bytes per fetch from Kafka (default "0", no limit) MaxFetchSize int32 `mapstructure:"max_fetch_size"` + // In case of some errors returned by the next consumer, the receiver will wait and retry the failed message + ErrorBackOff configretry.BackOffConfig `mapstructure:"error_backoff"` } const ( diff --git a/receiver/kafkareceiver/config_test.go b/receiver/kafkareceiver/config_test.go index 063d165c35094..c00220c805782 100644 --- a/receiver/kafkareceiver/config_test.go +++ b/receiver/kafkareceiver/config_test.go @@ -11,6 +11,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/config/configtls" "go.opentelemetry.io/collector/confmap/confmaptest" "go.opentelemetry.io/collector/confmap/xconfmap" @@ -66,6 +67,9 @@ func TestLoadConfig(t *testing.T) { MinFetchSize: 1, DefaultFetchSize: 1048576, MaxFetchSize: 0, + ErrorBackOff: configretry.BackOffConfig{ + Enabled: false, + }, }, }, { @@ -102,6 +106,13 @@ func TestLoadConfig(t *testing.T) { MinFetchSize: 1, DefaultFetchSize: 1048576, MaxFetchSize: 0, + ErrorBackOff: configretry.BackOffConfig{ + Enabled: true, + InitialInterval: 1 * time.Second, + MaxInterval: 10 * time.Second, + MaxElapsedTime: 1 * time.Minute, + Multiplier: 1.5, + }, }, }, } diff --git a/receiver/kafkareceiver/go.mod b/receiver/kafkareceiver/go.mod index d7680ed899d12..77aa4908eddb8 100644 --- a/receiver/kafkareceiver/go.mod +++ b/receiver/kafkareceiver/go.mod @@ -5,6 +5,7 @@ go 1.23.0 require ( github.com/IBM/sarama v1.45.0 github.com/apache/thrift v0.21.0 + github.com/cenkalti/backoff/v4 v4.3.0 github.com/gogo/protobuf v1.3.2 github.com/jaegertracing/jaeger-idl v0.5.0 github.com/json-iterator/go v1.1.12 @@ -18,6 +19,7 @@ require ( github.com/stretchr/testify v1.10.0 go.opentelemetry.io/collector/component v0.120.1-0.20250224010654-18e18b21da7a go.opentelemetry.io/collector/component/componenttest v0.120.1-0.20250224010654-18e18b21da7a + go.opentelemetry.io/collector/config/configretry v1.26.1-0.20250224010654-18e18b21da7a go.opentelemetry.io/collector/config/configtls v1.26.1-0.20250224010654-18e18b21da7a go.opentelemetry.io/collector/confmap v1.26.1-0.20250224010654-18e18b21da7a go.opentelemetry.io/collector/confmap/xconfmap v0.120.1-0.20250224010654-18e18b21da7a @@ -53,7 +55,6 @@ require ( github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.4 // indirect github.com/aws/aws-sdk-go-v2/service/sts v1.32.4 // indirect github.com/aws/smithy-go v1.22.0 // indirect - github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect 
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/eapache/go-resiliency v1.7.0 // indirect @@ -96,7 +97,6 @@ require ( github.com/xdg-go/stringprep v1.0.4 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect go.opentelemetry.io/collector/config/configopaque v1.26.1-0.20250224010654-18e18b21da7a // indirect - go.opentelemetry.io/collector/config/configretry v1.26.1-0.20250224010654-18e18b21da7a // indirect go.opentelemetry.io/collector/consumer/consumererror v0.120.1-0.20250224010654-18e18b21da7a // indirect go.opentelemetry.io/collector/consumer/xconsumer v0.120.1-0.20250224010654-18e18b21da7a // indirect go.opentelemetry.io/collector/exporter v0.120.1-0.20250224010654-18e18b21da7a // indirect diff --git a/receiver/kafkareceiver/kafka_receiver.go b/receiver/kafkareceiver/kafka_receiver.go index 1ec2d5aca6e9e..3bc09797bd610 100644 --- a/receiver/kafkareceiver/kafka_receiver.go +++ b/receiver/kafkareceiver/kafka_receiver.go @@ -9,9 +9,12 @@ import ( "fmt" "strconv" "sync" + "time" "github.com/IBM/sarama" + "github.com/cenkalti/backoff/v4" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" @@ -35,6 +38,8 @@ const ( var errInvalidInitialOffset = errors.New("invalid initial offset") +var errMemoryLimiterDataRefused = errors.New("data refused due to high memory usage") + // kafkaTracesConsumer uses sarama to consume and handle messages from kafka. type kafkaTracesConsumer struct { config Config @@ -205,6 +210,7 @@ func (c *kafkaTracesConsumer) Start(_ context.Context, host component.Host) erro messageMarking: c.messageMarking, headerExtractor: &nopHeaderExtractor{}, telemetryBuilder: c.telemetryBuilder, + backOff: newExponentialBackOff(c.config.ErrorBackOff), } if c.headerExtraction { consumerGroup.headerExtractor = &headerExtractor{ @@ -313,6 +319,7 @@ func (c *kafkaMetricsConsumer) Start(_ context.Context, host component.Host) err messageMarking: c.messageMarking, headerExtractor: &nopHeaderExtractor{}, telemetryBuilder: c.telemetryBuilder, + backOff: newExponentialBackOff(c.config.ErrorBackOff), } if c.headerExtraction { metricsConsumerGroup.headerExtractor = &headerExtractor{ @@ -424,6 +431,7 @@ func (c *kafkaLogsConsumer) Start(_ context.Context, host component.Host) error messageMarking: c.messageMarking, headerExtractor: &nopHeaderExtractor{}, telemetryBuilder: c.telemetryBuilder, + backOff: newExponentialBackOff(c.config.ErrorBackOff), } if c.headerExtraction { logsConsumerGroup.headerExtractor = &headerExtractor{ @@ -481,6 +489,7 @@ type tracesConsumerGroupHandler struct { autocommitEnabled bool messageMarking MessageMarking headerExtractor HeaderExtractor + backOff *backoff.ExponentialBackOff } type metricsConsumerGroupHandler struct { @@ -498,6 +507,7 @@ type metricsConsumerGroupHandler struct { autocommitEnabled bool messageMarking MessageMarking headerExtractor HeaderExtractor + backOff *backoff.ExponentialBackOff } type logsConsumerGroupHandler struct { @@ -515,6 +525,7 @@ type logsConsumerGroupHandler struct { autocommitEnabled bool messageMarking MessageMarking headerExtractor HeaderExtractor + backOff *backoff.ExponentialBackOff } var ( @@ -579,11 +590,29 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe err = 
c.nextConsumer.ConsumeTraces(session.Context(), traces) c.obsrecv.EndTracesOp(ctx, c.unmarshaler.Encoding(), spanCount, err) if err != nil { + if errorRequiresBackoff(err) && c.backOff != nil { + backOffDelay := c.backOff.NextBackOff() + if backOffDelay != backoff.Stop { + select { + case <-session.Context().Done(): + return nil + case <-time.After(backOffDelay): + if !c.messageMarking.After { + // Unmark the message so it can be retried + session.ResetOffset(claim.Topic(), claim.Partition(), message.Offset, "") + } + return err + } + } + } if c.messageMarking.After && c.messageMarking.OnError { session.MarkMessage(message, "") } return err } + if c.backOff != nil { + c.backOff.Reset() + } if c.messageMarking.After { session.MarkMessage(message, "") } @@ -656,11 +685,29 @@ func (c *metricsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupS err = c.nextConsumer.ConsumeMetrics(session.Context(), metrics) c.obsrecv.EndMetricsOp(ctx, c.unmarshaler.Encoding(), dataPointCount, err) if err != nil { + if errorRequiresBackoff(err) && c.backOff != nil { + backOffDelay := c.backOff.NextBackOff() + if backOffDelay != backoff.Stop { + select { + case <-session.Context().Done(): + return nil + case <-time.After(backOffDelay): + if !c.messageMarking.After { + // Unmark the message so it can be retried + session.ResetOffset(claim.Topic(), claim.Partition(), message.Offset, "") + } + return err + } + } + } if c.messageMarking.After && c.messageMarking.OnError { session.MarkMessage(message, "") } return err } + if c.backOff != nil { + c.backOff.Reset() + } if c.messageMarking.After { session.MarkMessage(message, "") } @@ -732,11 +779,29 @@ func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSess err = c.nextConsumer.ConsumeLogs(session.Context(), logs) c.obsrecv.EndLogsOp(ctx, c.unmarshaler.Encoding(), logRecordCount, err) if err != nil { + if errorRequiresBackoff(err) && c.backOff != nil { + backOffDelay := c.backOff.NextBackOff() + if backOffDelay != backoff.Stop { + select { + case <-session.Context().Done(): + return nil + case <-time.After(backOffDelay): + if !c.messageMarking.After { + // Unmark the message so it can be retried + session.ResetOffset(claim.Topic(), claim.Partition(), message.Offset, "") + } + return err + } + } + } if c.messageMarking.After && c.messageMarking.OnError { session.MarkMessage(message, "") } return err } + if c.backOff != nil { + c.backOff.Reset() + } if c.messageMarking.After { session.MarkMessage(message, "") } @@ -753,6 +818,20 @@ func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSess } } +func newExponentialBackOff(config configretry.BackOffConfig) *backoff.ExponentialBackOff { + if !config.Enabled { + return nil + } + backOff := backoff.NewExponentialBackOff() + backOff.InitialInterval = config.InitialInterval + backOff.RandomizationFactor = config.RandomizationFactor + backOff.Multiplier = config.Multiplier + backOff.MaxInterval = config.MaxInterval + backOff.MaxElapsedTime = config.MaxElapsedTime + backOff.Reset() + return backOff +} + func toSaramaInitialOffset(initialOffset string) (int64, error) { switch initialOffset { case offsetEarliest: @@ -792,3 +871,7 @@ func encodingToComponentID(encoding string) (*component.ID, error) { id := component.NewID(componentType) return &id, nil } + +func errorRequiresBackoff(err error) bool { + return err.Error() == errMemoryLimiterDataRefused.Error() +} diff --git a/receiver/kafkareceiver/kafka_receiver_test.go b/receiver/kafkareceiver/kafka_receiver_test.go index 
d248e89470a93..0abf2a9fb4998 100644 --- a/receiver/kafkareceiver/kafka_receiver_test.go +++ b/receiver/kafkareceiver/kafka_receiver_test.go @@ -11,6 +11,7 @@ import ( "time" "github.com/IBM/sarama" + "github.com/cenkalti/backoff/v4" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" @@ -308,35 +309,69 @@ func TestTracesConsumerGroupHandler_error_nextConsumer(t *testing.T) { consumerError := errors.New("failed to consume") obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopSettings(metadata.Type)}) require.NoError(t, err) - c := tracesConsumerGroupHandler{ - unmarshaler: newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding), - logger: zap.NewNop(), - ready: make(chan bool), - nextConsumer: consumertest.NewErr(consumerError), - obsrecv: obsrecv, - headerExtractor: &nopHeaderExtractor{}, - telemetryBuilder: nopTelemetryBuilder(t), - } - wg := sync.WaitGroup{} - wg.Add(1) - groupClaim := &testConsumerGroupClaim{ - messageChan: make(chan *sarama.ConsumerMessage), + tests := []struct { + name string + err, expectedError error + expectedBackoff time.Duration + }{ + { + name: "memory limiter data refused error", + err: errMemoryLimiterDataRefused, + expectedError: errMemoryLimiterDataRefused, + expectedBackoff: backoff.DefaultInitialInterval, + }, + { + name: "consumer error that does not require backoff", + err: consumerError, + expectedError: consumerError, + expectedBackoff: 0, + }, } - go func() { - e := c.ConsumeClaim(testConsumerGroupSession{ctx: context.Background()}, groupClaim) - assert.EqualError(t, e, consumerError.Error()) - wg.Done() - }() - td := ptrace.NewTraces() - td.ResourceSpans().AppendEmpty() - unmarshaler := &ptrace.ProtoMarshaler{} - bts, err := unmarshaler.MarshalTraces(td) - require.NoError(t, err) - groupClaim.messageChan <- &sarama.ConsumerMessage{Value: bts} - close(groupClaim.messageChan) - wg.Wait() + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + backOff := backoff.NewExponentialBackOff() + backOff.RandomizationFactor = 0 + c := tracesConsumerGroupHandler{ + unmarshaler: newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding), + logger: zap.NewNop(), + ready: make(chan bool), + nextConsumer: consumertest.NewErr(tt.err), + obsrecv: obsrecv, + headerExtractor: &nopHeaderExtractor{}, + telemetryBuilder: nopTelemetryBuilder(t), + backOff: backOff, + } + + wg := sync.WaitGroup{} + wg.Add(1) + groupClaim := &testConsumerGroupClaim{ + messageChan: make(chan *sarama.ConsumerMessage), + } + go func() { + start := time.Now() + e := c.ConsumeClaim(testConsumerGroupSession{ctx: context.Background()}, groupClaim) + end := time.Now() + if tt.expectedError != nil { + assert.EqualError(t, e, tt.expectedError.Error()) + } else { + assert.NoError(t, e) + } + assert.WithinDuration(t, start.Add(tt.expectedBackoff), end, 100*time.Millisecond) + wg.Done() + }() + + td := ptrace.NewTraces() + td.ResourceSpans().AppendEmpty() + unmarshaler := &ptrace.ProtoMarshaler{} + bts, err := unmarshaler.MarshalTraces(td) + require.NoError(t, err) + groupClaim.messageChan <- &sarama.ConsumerMessage{Value: bts} + close(groupClaim.messageChan) + wg.Wait() + }) + } } func TestTracesReceiver_encoding_extension(t *testing.T) { @@ -618,34 +653,68 @@ func TestMetricsConsumerGroupHandler_error_nextConsumer(t *testing.T) { consumerError := errors.New("failed to 
consume") obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopSettings(metadata.Type)}) require.NoError(t, err) - c := metricsConsumerGroupHandler{ - unmarshaler: newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding), - logger: zap.NewNop(), - ready: make(chan bool), - nextConsumer: consumertest.NewErr(consumerError), - obsrecv: obsrecv, - headerExtractor: &nopHeaderExtractor{}, - telemetryBuilder: nopTelemetryBuilder(t), - } - wg := sync.WaitGroup{} - wg.Add(1) - groupClaim := &testConsumerGroupClaim{ - messageChan: make(chan *sarama.ConsumerMessage), + tests := []struct { + name string + err, expectedError error + expectedBackoff time.Duration + }{ + { + name: "memory limiter data refused error", + err: errMemoryLimiterDataRefused, + expectedError: errMemoryLimiterDataRefused, + expectedBackoff: backoff.DefaultInitialInterval, + }, + { + name: "consumer error that does not require backoff", + err: consumerError, + expectedError: consumerError, + expectedBackoff: 0, + }, } - go func() { - e := c.ConsumeClaim(testConsumerGroupSession{ctx: context.Background()}, groupClaim) - assert.EqualError(t, e, consumerError.Error()) - wg.Done() - }() - ld := testdata.GenerateMetrics(1) - unmarshaler := &pmetric.ProtoMarshaler{} - bts, err := unmarshaler.MarshalMetrics(ld) - require.NoError(t, err) - groupClaim.messageChan <- &sarama.ConsumerMessage{Value: bts} - close(groupClaim.messageChan) - wg.Wait() + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + backOff := backoff.NewExponentialBackOff() + backOff.RandomizationFactor = 0 + c := metricsConsumerGroupHandler{ + unmarshaler: newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding), + logger: zap.NewNop(), + ready: make(chan bool), + nextConsumer: consumertest.NewErr(tt.err), + obsrecv: obsrecv, + headerExtractor: &nopHeaderExtractor{}, + telemetryBuilder: nopTelemetryBuilder(t), + backOff: backOff, + } + + wg := sync.WaitGroup{} + wg.Add(1) + groupClaim := &testConsumerGroupClaim{ + messageChan: make(chan *sarama.ConsumerMessage), + } + go func() { + start := time.Now() + e := c.ConsumeClaim(testConsumerGroupSession{ctx: context.Background()}, groupClaim) + end := time.Now() + if tt.expectedError != nil { + assert.EqualError(t, e, tt.expectedError.Error()) + } else { + assert.NoError(t, e) + } + assert.WithinDuration(t, start.Add(tt.expectedBackoff), end, 100*time.Millisecond) + wg.Done() + }() + + ld := testdata.GenerateMetrics(1) + unmarshaler := &pmetric.ProtoMarshaler{} + bts, err := unmarshaler.MarshalMetrics(ld) + require.NoError(t, err) + groupClaim.messageChan <- &sarama.ConsumerMessage{Value: bts} + close(groupClaim.messageChan) + wg.Wait() + }) + } } func TestMetricsReceiver_encoding_extension(t *testing.T) { @@ -945,34 +1014,68 @@ func TestLogsConsumerGroupHandler_error_nextConsumer(t *testing.T) { consumerError := errors.New("failed to consume") obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopSettings(metadata.Type)}) require.NoError(t, err) - c := logsConsumerGroupHandler{ - unmarshaler: newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding), - logger: zap.NewNop(), - ready: make(chan bool), - nextConsumer: consumertest.NewErr(consumerError), - obsrecv: obsrecv, - headerExtractor: &nopHeaderExtractor{}, - telemetryBuilder: nopTelemetryBuilder(t), - } - wg := sync.WaitGroup{} - wg.Add(1) - groupClaim := &testConsumerGroupClaim{ - 
messageChan: make(chan *sarama.ConsumerMessage), + tests := []struct { + name string + err, expectedError error + expectedBackoff time.Duration + }{ + { + name: "memory limiter data refused error", + err: errMemoryLimiterDataRefused, + expectedError: errMemoryLimiterDataRefused, + expectedBackoff: backoff.DefaultInitialInterval, + }, + { + name: "consumer error that does not require backoff", + err: consumerError, + expectedError: consumerError, + expectedBackoff: 0, + }, } - go func() { - e := c.ConsumeClaim(testConsumerGroupSession{ctx: context.Background()}, groupClaim) - assert.EqualError(t, e, consumerError.Error()) - wg.Done() - }() - ld := testdata.GenerateLogs(1) - unmarshaler := &plog.ProtoMarshaler{} - bts, err := unmarshaler.MarshalLogs(ld) - require.NoError(t, err) - groupClaim.messageChan <- &sarama.ConsumerMessage{Value: bts} - close(groupClaim.messageChan) - wg.Wait() + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + backOff := backoff.NewExponentialBackOff() + backOff.RandomizationFactor = 0 + c := logsConsumerGroupHandler{ + unmarshaler: newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding), + logger: zap.NewNop(), + ready: make(chan bool), + nextConsumer: consumertest.NewErr(tt.err), + obsrecv: obsrecv, + headerExtractor: &nopHeaderExtractor{}, + telemetryBuilder: nopTelemetryBuilder(t), + backOff: backOff, + } + + wg := sync.WaitGroup{} + wg.Add(1) + groupClaim := &testConsumerGroupClaim{ + messageChan: make(chan *sarama.ConsumerMessage), + } + go func() { + start := time.Now() + e := c.ConsumeClaim(testConsumerGroupSession{ctx: context.Background()}, groupClaim) + end := time.Now() + if tt.expectedError != nil { + assert.EqualError(t, e, tt.expectedError.Error()) + } else { + assert.NoError(t, e) + } + assert.WithinDuration(t, start.Add(tt.expectedBackoff), end, 100*time.Millisecond) + wg.Done() + }() + + ld := testdata.GenerateLogs(1) + unmarshaler := &plog.ProtoMarshaler{} + bts, err := unmarshaler.MarshalLogs(ld) + require.NoError(t, err) + groupClaim.messageChan <- &sarama.ConsumerMessage{Value: bts} + close(groupClaim.messageChan) + wg.Wait() + }) + } } // Test unmarshaler for different charsets and encodings. @@ -1205,7 +1308,6 @@ func (t testConsumerGroupSession) MarkOffset(string, int32, int64, string) { } func (t testConsumerGroupSession) ResetOffset(string, int32, int64, string) { - panic("implement me") } func (t testConsumerGroupSession) MarkMessage(*sarama.ConsumerMessage, string) {} diff --git a/receiver/kafkareceiver/testdata/config.yaml b/receiver/kafkareceiver/testdata/config.yaml index a0a744764602b..835b501931985 100644 --- a/receiver/kafkareceiver/testdata/config.yaml +++ b/receiver/kafkareceiver/testdata/config.yaml @@ -35,3 +35,9 @@ kafka/logs: retry: max: 10 backoff: 5s + error_backoff: + enabled: true + initial_interval: 1s + max_interval: 10s + max_elapsed_time: 1m + multiplier: 1.5 \ No newline at end of file
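A note on the runtime behavior the testdata above exercises: with `initial_interval: 1s`, `multiplier: 1.5`, `max_interval: 10s`, `max_elapsed_time: 1m`, and `randomization_factor` left unset (0, so the schedule is deterministic), a message that keeps being refused by the next consumer is retried after roughly 1s, 1.5s, 2.25s, 3.4s, 5.1s, 7.6s, and then 10s per attempt (capped by `max_interval`). Once about one minute has elapsed since the backoff was created or last reset, `NextBackOff()` returns `backoff.Stop` and the handler falls through to the ordinary error path, returning the error without any further delay. A successful delivery calls `Reset()`, which restarts both the interval sequence and the elapsed-time clock.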