[receiver/kafka] backoff in case of next consumer error #37009


Merged (11 commits) on Feb 24, 2025
8 changes: 7 additions & 1 deletion receiver/kafkareceiver/README.md
@@ -97,7 +97,13 @@ The following settings can be optionally configured:
- `extract_headers` (default = false): Allows the user to attach header fields to resource attributes in the OTel pipeline
- `headers` (default = []): List of headers to extract from the Kafka record.
**Note: The matching pattern is `exact`. Regexes are not supported at the moment.**

- `error_backoff`:
  - `enabled`: (default = false) Whether to enable backoff when the next consumer returns an error
  - `initial_interval`: The time to wait after the first error before consuming the next message
  - `max_interval`: The upper bound on the backoff interval between consecutive message consumptions
  - `multiplier`: The value multiplied by the backoff interval bounds
  - `randomization_factor`: A random factor used to calculate the next backoff: `randomized interval = retry_interval * (1 ± randomization_factor)`
  - `max_elapsed_time`: The maximum time spent backing off before giving up. If set to 0, the backoff never stops.
Example:

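A minimal sketch of the new `error_backoff` block, assuming the option names documented above (the values here are illustrative, not defaults):

```yaml
receivers:
  kafka:
    error_backoff:
      enabled: true
      initial_interval: 1s
      max_interval: 10s
      multiplier: 1.5
      randomization_factor: 0.5
      max_elapsed_time: 1m
```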
4 changes: 4 additions & 0 deletions receiver/kafkareceiver/config.go
@@ -7,6 +7,7 @@ import (
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configretry"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka"
@@ -85,6 +86,9 @@ type Config struct {
DefaultFetchSize int32 `mapstructure:"default_fetch_size"`
// The maximum bytes per fetch from Kafka (default "0", no limit)
MaxFetchSize int32 `mapstructure:"max_fetch_size"`

// In case of some errors returned by the next consumer, the receiver will wait before consuming the next message
ErrorBackOff configretry.BackOffConfig `mapstructure:"error_backoff"`
}

const (
11 changes: 11 additions & 0 deletions receiver/kafkareceiver/config_test.go
@@ -11,6 +11,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/confmap/confmaptest"

@@ -65,6 +66,9 @@ func TestLoadConfig(t *testing.T) {
MinFetchSize: 1,
DefaultFetchSize: 1048576,
MaxFetchSize: 0,
ErrorBackOff: configretry.BackOffConfig{
Enabled: false,
},
},
},
{
@@ -101,6 +105,13 @@ func TestLoadConfig(t *testing.T) {
MinFetchSize: 1,
DefaultFetchSize: 1048576,
MaxFetchSize: 0,
ErrorBackOff: configretry.BackOffConfig{
Enabled: true,
InitialInterval: 1 * time.Second,
MaxInterval: 10 * time.Second,
MaxElapsedTime: 1 * time.Minute,
Multiplier: 1.5,
},
},
},
}
71 changes: 71 additions & 0 deletions receiver/kafkareceiver/kafka_receiver.go
@@ -9,9 +9,12 @@ import (
"fmt"
"strconv"
"sync"
"time"

"github.com/IBM/sarama"
"github.com/cenkalti/backoff/v4"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
@@ -35,6 +38,8 @@ const (

var errInvalidInitialOffset = errors.New("invalid initial offset")

var errMemoryLimiterDataRefused = errors.New("data refused due to high memory usage")

// kafkaTracesConsumer uses sarama to consume and handle messages from kafka.
type kafkaTracesConsumer struct {
config Config
@@ -205,6 +210,7 @@ func (c *kafkaTracesConsumer) Start(_ context.Context, host component.Host) erro
messageMarking: c.messageMarking,
headerExtractor: &nopHeaderExtractor{},
telemetryBuilder: c.telemetryBuilder,
backOff: newExponentialBackOff(c.config.ErrorBackOff),
}
if c.headerExtraction {
consumerGroup.headerExtractor = &headerExtractor{
@@ -313,6 +319,7 @@ func (c *kafkaMetricsConsumer) Start(_ context.Context, host component.Host) err
messageMarking: c.messageMarking,
headerExtractor: &nopHeaderExtractor{},
telemetryBuilder: c.telemetryBuilder,
backOff: newExponentialBackOff(c.config.ErrorBackOff),
}
if c.headerExtraction {
metricsConsumerGroup.headerExtractor = &headerExtractor{
@@ -424,6 +431,7 @@ func (c *kafkaLogsConsumer) Start(_ context.Context, host component.Host) error
messageMarking: c.messageMarking,
headerExtractor: &nopHeaderExtractor{},
telemetryBuilder: c.telemetryBuilder,
backOff: newExponentialBackOff(c.config.ErrorBackOff),
}
if c.headerExtraction {
logsConsumerGroup.headerExtractor = &headerExtractor{
@@ -481,6 +489,7 @@ type tracesConsumerGroupHandler struct {
autocommitEnabled bool
messageMarking MessageMarking
headerExtractor HeaderExtractor
backOff *backoff.ExponentialBackOff
}

type metricsConsumerGroupHandler struct {
@@ -498,6 +507,7 @@ type metricsConsumerGroupHandler struct {
autocommitEnabled bool
messageMarking MessageMarking
headerExtractor HeaderExtractor
backOff *backoff.ExponentialBackOff
}

type logsConsumerGroupHandler struct {
@@ -515,6 +525,7 @@ type logsConsumerGroupHandler struct {
autocommitEnabled bool
messageMarking MessageMarking
headerExtractor HeaderExtractor
backOff *backoff.ExponentialBackOff
}

var (
@@ -582,8 +593,22 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe
if c.messageMarking.After && c.messageMarking.OnError {
session.MarkMessage(message, "")
Contributor:

should messages be marked if you're going to retry them?

Contributor Author:
Currently, if the next consumer returns an error, the Kafka receiver simply drops the message and returns the error. At first I intended to keep this behavior and only implement backoff delays without retrying the message, but after going through the code again I think we can implement the retry without introducing too much complexity.

I've updated the code and added some comments to explain. Let me know if that makes sense.

I've updated the unit test, but I wonder how I could test this change in an integration or e2e test, in particular the retry logic: whether the offset is correctly reset and the failed message is consumed again in the next loop. Do you have any suggestions?

Contributor Author (@yiquanzhou, Jan 13, 2025):

Hi @atoulme, could you please take another look at the changes I added for the retry logic? I'd also appreciate some guidance on the testing.

Contributor Author:

@atoulme could you please check out the changes and my comments? I'd like to move this PR forward.

Contributor Author (@yiquanzhou, Feb 5, 2025):

I've tested the change by running the collector locally with the following config:

```yaml
  kafka:
    encoding: text_utf-8  # the payload is decoded as text and inserted as the body of a log record
    error_backoff:
      enabled: true
      initial_interval: 5s
      max_interval: 1m
      multiplier: 1.5
      randomization_factor: 0
      max_elapsed_time: 1m
...
  memory_limiter:
    check_interval: 1s
    limit_mib: 30  # set a low memory limit to trigger the "data refused due to high memory usage" error
    spike_limit_mib: 0
...
  pipelines:
    logs:
      receivers:
      - otlp
      - kafka
      processors:
      - memory_limiter
      - batch
      exporters:
      - debug
```

I also ran a Kafka broker locally and used the kafka-producer-perf-test tool to send messages to the otlp_logs topic:

```shell
$ ./kafka-producer-perf-test \
  --topic otlp_logs \
  --throughput 100000 \
  --num-records 1000000 \
  --record-size 1024 \
  --producer-props bootstrap.servers=localhost:9092
```

By adding some temporary logging, I can validate that when the memory_limiter processor returns the "data refused due to high memory usage" error, the backoff is triggered and the same message is retried after the backoff interval. Once max_elapsed_time is exceeded, the backoff and retry are skipped and the receiver continues consuming the next messages.

Contributor Author:
I also added an entry to the changelog. @atoulme @MovieStoreGuy @pavolloffay, could any of you please give a final review of the PR?

}
if errorRequiresBackoff(err) && c.backOff != nil {
backOffDelay := c.backOff.NextBackOff()
if backOffDelay == backoff.Stop {
return err
}
select {
case <-session.Context().Done():
return nil
case <-time.After(backOffDelay):
}
}
return err
}
if c.backOff != nil {
c.backOff.Reset()
}
if c.messageMarking.After {
session.MarkMessage(message, "")
}
@@ -600,6 +625,10 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe
}
}

func errorRequiresBackoff(err error) bool {
return errors.Is(err, errMemoryLimiterDataRefused)
}

func (c *metricsConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {
c.readyCloser.Do(func() {
close(c.ready)
@@ -659,8 +688,22 @@ func (c *metricsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupS
if c.messageMarking.After && c.messageMarking.OnError {
session.MarkMessage(message, "")
}
if errorRequiresBackoff(err) && c.backOff != nil {
backOffDelay := c.backOff.NextBackOff()
if backOffDelay == backoff.Stop {
return err
}
select {
case <-session.Context().Done():
return nil
case <-time.After(backOffDelay):
}
}
return err
}
if c.backOff != nil {
c.backOff.Reset()
}
if c.messageMarking.After {
session.MarkMessage(message, "")
}
@@ -735,8 +778,22 @@ func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSess
if c.messageMarking.After && c.messageMarking.OnError {
session.MarkMessage(message, "")
}
if errorRequiresBackoff(err) && c.backOff != nil {
backOffDelay := c.backOff.NextBackOff()
if backOffDelay == backoff.Stop {
return err
}
select {
case <-session.Context().Done():
return nil
case <-time.After(backOffDelay):
}
}
return err
}
if c.backOff != nil {
c.backOff.Reset()
}
if c.messageMarking.After {
session.MarkMessage(message, "")
}
@@ -753,6 +810,20 @@ func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSess
}
}

func newExponentialBackOff(config configretry.BackOffConfig) *backoff.ExponentialBackOff {
if !config.Enabled {
return nil
}
backOff := backoff.NewExponentialBackOff()
backOff.InitialInterval = config.InitialInterval
backOff.RandomizationFactor = config.RandomizationFactor
backOff.Multiplier = config.Multiplier
backOff.MaxInterval = config.MaxInterval
backOff.MaxElapsedTime = config.MaxElapsedTime
backOff.Reset()
return backOff
}

func toSaramaInitialOffset(initialOffset string) (int64, error) {
switch initialOffset {
case offsetEarliest:
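For readers skimming the three near-identical handler changes above, here is a condensed, self-contained sketch of the backoff lifecycle they share: a retryable error drives `NextBackOff()`, a successful delivery calls `Reset()`, and `backoff.Stop` signals that the `max_elapsed_time` budget is exhausted. The consume loop and `process` function below are illustrative stand-ins, not the receiver's code; in the real handlers the error is returned to sarama rather than handled inline.

```go
package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/cenkalti/backoff/v4"
)

// Stand-in for errMemoryLimiterDataRefused from the diff.
var errRetryable = errors.New("data refused due to high memory usage")

var attempts int

// process simulates the next consumer: it refuses the first two attempts.
func process(msg string) error {
	attempts++
	if attempts <= 2 {
		return errRetryable
	}
	return nil
}

func main() {
	// Mirrors newExponentialBackOff from the diff, with illustrative settings.
	b := backoff.NewExponentialBackOff()
	b.InitialInterval = 100 * time.Millisecond
	b.RandomizationFactor = 0
	b.Multiplier = 1.5
	b.MaxInterval = time.Second
	b.MaxElapsedTime = 5 * time.Second
	b.Reset()

	messages := []string{"m1", "m2"}
	for i := 0; i < len(messages); {
		if err := process(messages[i]); err != nil {
			if errors.Is(err, errRetryable) {
				delay := b.NextBackOff()
				if delay == backoff.Stop {
					// max_elapsed_time budget exhausted; the real handler returns the error here.
					fmt.Println("giving up on", messages[i], "after backoff:", err)
					return
				}
				fmt.Printf("backing off %v before retrying %s\n", delay, messages[i])
				time.Sleep(delay) // the handler also watches session.Context().Done()
				continue          // retry the same message; the offset is not advanced
			}
			// Non-retryable error: the real handler returns it without backing off.
			fmt.Println("dropping", messages[i], ":", err)
		}
		b.Reset() // success resets the interval back to initial_interval
		i++
	}
	fmt.Println("done after", attempts, "consume attempts")
}
```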