Skip to content

Commit 704560b

Browse files
committed
handle backoff stop, add documentation
1 parent ca00ad8 commit 704560b

File tree

2 files changed

+22
-4
lines changed

2 files changed

+22
-4
lines changed

receiver/kafkareceiver/README.md

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,13 @@ The following settings can be optionally configured:
9797
- `extract_headers` (default = false): Allows user to attach header fields to resource attributes in otel piepline
9898
- `headers` (default = []): List of headers they'd like to extract from kafka record.
9999
**Note: Matching pattern will be `exact`. Regexes are not supported as of now.**
100-
100+
- `error_backoff`:
101+
- `enabled`: (default = false) Whether to enable backoff when next consumers return errors
102+
- `initial_interval`: The time to wait after the first error before consuming the next message
103+
- `max_interval`: The upper bound on backoff interval between consecutive message consumption
104+
- `multiplier`: The value multiplied by the backoff interval bounds
105+
- `randomization_factor`: A random factor used to calculate next backoff. Randomized interval = RetryInterval * (1 ± RandomizationFactor)
106+
- `max_elapsed_time`: The maximum time trying to backoff before giving up. If set to 0, the backoff is never stopped.
101107
Example:
102108

103109
```yaml

receiver/kafkareceiver/kafka_receiver.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -594,10 +594,14 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe
594594
session.MarkMessage(message, "")
595595
}
596596
if errorRequiresBackoff(err) && c.backOff != nil {
597+
backOffDelay := c.backOff.NextBackOff()
598+
if backOffDelay == backoff.Stop {
599+
return err
600+
}
597601
select {
598602
case <-session.Context().Done():
599603
return nil
600-
case <-time.After(c.backOff.NextBackOff()):
604+
case <-time.After(backOffDelay):
601605
}
602606
}
603607
return err
@@ -685,10 +689,14 @@ func (c *metricsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupS
685689
session.MarkMessage(message, "")
686690
}
687691
if errorRequiresBackoff(err) && c.backOff != nil {
692+
backOffDelay := c.backOff.NextBackOff()
693+
if backOffDelay == backoff.Stop {
694+
return err
695+
}
688696
select {
689697
case <-session.Context().Done():
690698
return nil
691-
case <-time.After(c.backOff.NextBackOff()):
699+
case <-time.After(backOffDelay):
692700
}
693701
}
694702
return err
@@ -771,10 +779,14 @@ func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSess
771779
session.MarkMessage(message, "")
772780
}
773781
if errorRequiresBackoff(err) && c.backOff != nil {
782+
backOffDelay := c.backOff.NextBackOff()
783+
if backOffDelay == backoff.Stop {
784+
return err
785+
}
774786
select {
775787
case <-session.Context().Done():
776788
return nil
777-
case <-time.After(c.backOff.NextBackOff()):
789+
case <-time.After(backOffDelay):
778790
}
779791
}
780792
return err

0 commit comments

Comments
 (0)