Skip to content

Commit 6009470

Browse files
committed
retry the failed message
1 parent 704560b commit 6009470

File tree

4 files changed

+80
-39
lines changed

4 files changed

+80
-39
lines changed

receiver/kafkareceiver/README.md

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -97,13 +97,14 @@ The following settings can be optionally configured:
9797
- `extract_headers` (default = false): Allows user to attach header fields to resource attributes in otel pipeline
9898
- `headers` (default = []): List of headers to extract from the Kafka record.
9999
**Note: Matching pattern will be `exact`. Regexes are not supported as of now.**
100-
- `error_backoff`:
100+
- `error_backoff`: [BackOff](https://github.com/open-telemetry/opentelemetry-collector/blob/v0.116.0/config/configretry/backoff.go#L27-L43) configuration in case of errors
101101
- `enabled`: (default = false) Whether to enable backoff when next consumers return errors
102-
- `initial_interval`: The time to wait after the first error before consuming the next message
103-
- `max_interval`: The upper bound on backoff interval between consecutive message consumption
102+
- `initial_interval`: The time to wait after the first error before retrying
103+
- `max_interval`: The upper bound on backoff interval between consecutive retries
104104
- `multiplier`: The value multiplied by the backoff interval bounds
105105
- `randomization_factor`: A random factor used to calculate next backoff. Randomized interval = RetryInterval * (1 ± RandomizationFactor)
106-
- `max_elapsed_time`: The maximum time trying to backoff before giving up. If set to 0, the backoff is never stopped.
106+
- `max_elapsed_time`: The maximum amount of time to spend trying to back off before giving up. If set to 0, the retries are never stopped.
107+
107108
Example:
108109

109110
```yaml

receiver/kafkareceiver/config.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,6 @@ type Config struct {
8686
DefaultFetchSize int32 `mapstructure:"default_fetch_size"`
8787
// The maximum bytes per fetch from Kafka (default "0", no limit)
8888
MaxFetchSize int32 `mapstructure:"max_fetch_size"`
89-
9089
// In case of some errors returned by the next consumer, the receiver will wait before consuming the next message
9190
ErrorBackOff configretry.BackOffConfig `mapstructure:"error_backoff"`
9291
}

receiver/kafkareceiver/kafka_receiver.go

Lines changed: 45 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -552,6 +552,9 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe
552552
if !c.autocommitEnabled {
553553
defer session.Commit()
554554
}
555+
if c.backOff != nil {
556+
c.backOff.Reset()
557+
}
555558
for {
556559
select {
557560
case message, ok := <-claim.Messages():
@@ -590,24 +593,29 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe
590593
err = c.nextConsumer.ConsumeTraces(session.Context(), traces)
591594
c.obsrecv.EndTracesOp(ctx, c.unmarshaler.Encoding(), spanCount, err)
592595
if err != nil {
593-
if c.messageMarking.After && c.messageMarking.OnError {
594-
session.MarkMessage(message, "")
595-
}
596596
if errorRequiresBackoff(err) && c.backOff != nil {
597597
backOffDelay := c.backOff.NextBackOff()
598598
if backOffDelay == backoff.Stop {
599+
if c.messageMarking.After && c.messageMarking.OnError {
600+
session.MarkMessage(message, "")
601+
}
599602
return err
600603
}
601604
select {
602605
case <-session.Context().Done():
603606
return nil
604607
case <-time.After(backOffDelay):
608+
if !c.messageMarking.After {
609+
// Unmark the message so it can be retried
610+
session.ResetOffset(claim.Topic(), claim.Partition(), message.Offset, "")
611+
}
612+
}
613+
} else {
614+
if c.messageMarking.After && c.messageMarking.OnError {
615+
session.MarkMessage(message, "")
605616
}
617+
return err
606618
}
607-
return err
608-
}
609-
if c.backOff != nil {
610-
c.backOff.Reset()
611619
}
612620
if c.messageMarking.After {
613621
session.MarkMessage(message, "")
@@ -647,6 +655,9 @@ func (c *metricsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupS
647655
if !c.autocommitEnabled {
648656
defer session.Commit()
649657
}
658+
if c.backOff != nil {
659+
c.backOff.Reset()
660+
}
650661
for {
651662
select {
652663
case message, ok := <-claim.Messages():
@@ -685,24 +696,29 @@ func (c *metricsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupS
685696
err = c.nextConsumer.ConsumeMetrics(session.Context(), metrics)
686697
c.obsrecv.EndMetricsOp(ctx, c.unmarshaler.Encoding(), dataPointCount, err)
687698
if err != nil {
688-
if c.messageMarking.After && c.messageMarking.OnError {
689-
session.MarkMessage(message, "")
690-
}
691699
if errorRequiresBackoff(err) && c.backOff != nil {
692700
backOffDelay := c.backOff.NextBackOff()
693701
if backOffDelay == backoff.Stop {
702+
if c.messageMarking.After && c.messageMarking.OnError {
703+
session.MarkMessage(message, "")
704+
}
694705
return err
695706
}
696707
select {
697708
case <-session.Context().Done():
698709
return nil
699710
case <-time.After(backOffDelay):
711+
if !c.messageMarking.After {
712+
// Unmark the message so it can be retried
713+
session.ResetOffset(claim.Topic(), claim.Partition(), message.Offset, "")
714+
}
700715
}
716+
} else {
717+
if c.messageMarking.After && c.messageMarking.OnError {
718+
session.MarkMessage(message, "")
719+
}
720+
return err
701721
}
702-
return err
703-
}
704-
if c.backOff != nil {
705-
c.backOff.Reset()
706722
}
707723
if c.messageMarking.After {
708724
session.MarkMessage(message, "")
@@ -738,6 +754,9 @@ func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSess
738754
if !c.autocommitEnabled {
739755
defer session.Commit()
740756
}
757+
if c.backOff != nil {
758+
c.backOff.Reset()
759+
}
741760
for {
742761
select {
743762
case message, ok := <-claim.Messages():
@@ -775,24 +794,29 @@ func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSess
775794
err = c.nextConsumer.ConsumeLogs(session.Context(), logs)
776795
c.obsrecv.EndLogsOp(ctx, c.unmarshaler.Encoding(), logRecordCount, err)
777796
if err != nil {
778-
if c.messageMarking.After && c.messageMarking.OnError {
779-
session.MarkMessage(message, "")
780-
}
781797
if errorRequiresBackoff(err) && c.backOff != nil {
782798
backOffDelay := c.backOff.NextBackOff()
783799
if backOffDelay == backoff.Stop {
800+
if c.messageMarking.After && c.messageMarking.OnError {
801+
session.MarkMessage(message, "")
802+
}
784803
return err
785804
}
786805
select {
787806
case <-session.Context().Done():
788807
return nil
789808
case <-time.After(backOffDelay):
809+
if !c.messageMarking.After {
810+
// Unmark the message so it can be retried
811+
session.ResetOffset(claim.Topic(), claim.Partition(), message.Offset, "")
812+
}
813+
}
814+
} else {
815+
if c.messageMarking.After && c.messageMarking.OnError {
816+
session.MarkMessage(message, "")
790817
}
818+
return err
791819
}
792-
return err
793-
}
794-
if c.backOff != nil {
795-
c.backOff.Reset()
796820
}
797821
if c.messageMarking.After {
798822
session.MarkMessage(message, "")

receiver/kafkareceiver/kafka_receiver_test.go

Lines changed: 30 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -343,18 +343,20 @@ func TestTracesConsumerGroupHandler_error_nextConsumer(t *testing.T) {
343343
require.NoError(t, err)
344344

345345
tests := []struct {
346-
name string
347-
err error
348-
expectedBackoff time.Duration
346+
name string
347+
err, expectedError error
348+
expectedBackoff time.Duration
349349
}{
350350
{
351351
name: "memory limiter data refused error",
352352
err: errMemoryLimiterDataRefused,
353+
expectedError: nil,
353354
expectedBackoff: backoff.DefaultInitialInterval,
354355
},
355356
{
356357
name: "consumer error that does not require backoff",
357358
err: consumerError,
359+
expectedError: consumerError,
358360
expectedBackoff: 0,
359361
},
360362
}
@@ -383,7 +385,11 @@ func TestTracesConsumerGroupHandler_error_nextConsumer(t *testing.T) {
383385
start := time.Now()
384386
e := c.ConsumeClaim(testConsumerGroupSession{ctx: context.Background()}, groupClaim)
385387
end := time.Now()
386-
assert.EqualError(t, e, tt.err.Error())
388+
if tt.expectedError != nil {
389+
assert.EqualError(t, e, tt.expectedError.Error())
390+
} else {
391+
assert.NoError(t, e)
392+
}
387393
assert.WithinDuration(t, start.Add(tt.expectedBackoff), end, 100*time.Millisecond)
388394
wg.Done()
389395
}()
@@ -715,18 +721,20 @@ func TestMetricsConsumerGroupHandler_error_nextConsumer(t *testing.T) {
715721
require.NoError(t, err)
716722

717723
tests := []struct {
718-
name string
719-
err error
720-
expectedBackoff time.Duration
724+
name string
725+
err, expectedError error
726+
expectedBackoff time.Duration
721727
}{
722728
{
723729
name: "memory limiter data refused error",
724730
err: errMemoryLimiterDataRefused,
731+
expectedError: nil,
725732
expectedBackoff: backoff.DefaultInitialInterval,
726733
},
727734
{
728735
name: "consumer error that does not require backoff",
729736
err: consumerError,
737+
expectedError: consumerError,
730738
expectedBackoff: 0,
731739
},
732740
}
@@ -755,7 +763,11 @@ func TestMetricsConsumerGroupHandler_error_nextConsumer(t *testing.T) {
755763
start := time.Now()
756764
e := c.ConsumeClaim(testConsumerGroupSession{ctx: context.Background()}, groupClaim)
757765
end := time.Now()
758-
assert.EqualError(t, e, tt.err.Error())
766+
if tt.expectedError != nil {
767+
assert.EqualError(t, e, tt.expectedError.Error())
768+
} else {
769+
assert.NoError(t, e)
770+
}
759771
assert.WithinDuration(t, start.Add(tt.expectedBackoff), end, 100*time.Millisecond)
760772
wg.Done()
761773
}()
@@ -1102,18 +1114,20 @@ func TestLogsConsumerGroupHandler_error_nextConsumer(t *testing.T) {
11021114
require.NoError(t, err)
11031115

11041116
tests := []struct {
1105-
name string
1106-
err error
1107-
expectedBackoff time.Duration
1117+
name string
1118+
err, expectedError error
1119+
expectedBackoff time.Duration
11081120
}{
11091121
{
11101122
name: "memory limiter data refused error",
11111123
err: errMemoryLimiterDataRefused,
1124+
expectedError: nil,
11121125
expectedBackoff: backoff.DefaultInitialInterval,
11131126
},
11141127
{
11151128
name: "consumer error that does not require backoff",
11161129
err: consumerError,
1130+
expectedError: consumerError,
11171131
expectedBackoff: 0,
11181132
},
11191133
}
@@ -1142,7 +1156,11 @@ func TestLogsConsumerGroupHandler_error_nextConsumer(t *testing.T) {
11421156
start := time.Now()
11431157
e := c.ConsumeClaim(testConsumerGroupSession{ctx: context.Background()}, groupClaim)
11441158
end := time.Now()
1145-
assert.EqualError(t, e, tt.err.Error())
1159+
if tt.expectedError != nil {
1160+
assert.EqualError(t, e, tt.expectedError.Error())
1161+
} else {
1162+
assert.NoError(t, e)
1163+
}
11461164
assert.WithinDuration(t, start.Add(tt.expectedBackoff), end, 100*time.Millisecond)
11471165
wg.Done()
11481166
}()
@@ -1390,7 +1408,6 @@ func (t testConsumerGroupSession) MarkOffset(string, int32, int64, string) {
13901408
}
13911409

13921410
func (t testConsumerGroupSession) ResetOffset(string, int32, int64, string) {
1393-
panic("implement me")
13941411
}
13951412

13961413
func (t testConsumerGroupSession) MarkMessage(*sarama.ConsumerMessage, string) {}

0 commit comments

Comments
 (0)