Skip to content

Commit 241c3b4

Browse files
committed
revert retry on error
1 parent 6009470 commit 241c3b4

File tree

3 files changed

+41
-62
lines changed

3 files changed

+41
-62
lines changed

receiver/kafkareceiver/go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ go 1.22.0
55
require (
66
github.com/IBM/sarama v1.43.3
77
github.com/apache/thrift v0.21.0
8+
github.com/cenkalti/backoff/v4 v4.3.0
89
github.com/gogo/protobuf v1.3.2
910
github.com/jaegertracing/jaeger v1.62.0
1011
github.com/json-iterator/go v1.1.12
@@ -18,6 +19,7 @@ require (
1819
github.com/stretchr/testify v1.10.0
1920
go.opentelemetry.io/collector/component v0.116.1-0.20241220212031-7c2639723f67
2021
go.opentelemetry.io/collector/component/componenttest v0.116.1-0.20241220212031-7c2639723f67
22+
go.opentelemetry.io/collector/config/configretry v1.22.1-0.20241220212031-7c2639723f67
2123
go.opentelemetry.io/collector/config/configtelemetry v0.116.1-0.20241220212031-7c2639723f67
2224
go.opentelemetry.io/collector/config/configtls v1.22.1-0.20241220212031-7c2639723f67
2325
go.opentelemetry.io/collector/confmap v1.22.1-0.20241220212031-7c2639723f67
@@ -51,7 +53,6 @@ require (
5153
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.13 // indirect
5254
github.com/aws/aws-sdk-go-v2/service/sts v1.19.3 // indirect
5355
github.com/aws/smithy-go v1.13.5 // indirect
54-
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
5556
github.com/cespare/xxhash/v2 v2.3.0 // indirect
5657
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
5758
github.com/eapache/go-resiliency v1.7.0 // indirect
@@ -92,7 +93,6 @@ require (
9293
github.com/xdg-go/scram v1.1.2 // indirect
9394
github.com/xdg-go/stringprep v1.0.4 // indirect
9495
go.opentelemetry.io/collector/config/configopaque v1.22.1-0.20241220212031-7c2639723f67 // indirect
95-
go.opentelemetry.io/collector/config/configretry v1.22.1-0.20241220212031-7c2639723f67 // indirect
9696
go.opentelemetry.io/collector/consumer/consumererror v0.116.1-0.20241220212031-7c2639723f67 // indirect
9797
go.opentelemetry.io/collector/consumer/xconsumer v0.116.1-0.20241220212031-7c2639723f67 // indirect
9898
go.opentelemetry.io/collector/exporter v0.116.1-0.20241220212031-7c2639723f67 // indirect

receiver/kafkareceiver/kafka_receiver.go

Lines changed: 36 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -595,27 +595,21 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe
595595
if err != nil {
596596
if errorRequiresBackoff(err) && c.backOff != nil {
597597
backOffDelay := c.backOff.NextBackOff()
598-
if backOffDelay == backoff.Stop {
599-
if c.messageMarking.After && c.messageMarking.OnError {
600-
session.MarkMessage(message, "")
598+
if backOffDelay != backoff.Stop {
599+
select {
600+
case <-session.Context().Done():
601+
return nil
602+
case <-time.After(backOffDelay):
601603
}
602-
return err
603604
}
604-
select {
605-
case <-session.Context().Done():
606-
return nil
607-
case <-time.After(backOffDelay):
608-
if !c.messageMarking.After {
609-
// Unmark the message so it can be retried
610-
session.ResetOffset(claim.Topic(), claim.Partition(), message.Offset, "")
611-
}
612-
}
613-
} else {
614-
if c.messageMarking.After && c.messageMarking.OnError {
615-
session.MarkMessage(message, "")
616-
}
617-
return err
618605
}
606+
if c.messageMarking.After && c.messageMarking.OnError {
607+
session.MarkMessage(message, "")
608+
}
609+
return err
610+
}
611+
if c.backOff != nil {
612+
c.backOff.Reset()
619613
}
620614
if c.messageMarking.After {
621615
session.MarkMessage(message, "")
@@ -698,27 +692,21 @@ func (c *metricsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupS
698692
if err != nil {
699693
if errorRequiresBackoff(err) && c.backOff != nil {
700694
backOffDelay := c.backOff.NextBackOff()
701-
if backOffDelay == backoff.Stop {
702-
if c.messageMarking.After && c.messageMarking.OnError {
703-
session.MarkMessage(message, "")
704-
}
705-
return err
706-
}
707-
select {
708-
case <-session.Context().Done():
709-
return nil
710-
case <-time.After(backOffDelay):
711-
if !c.messageMarking.After {
712-
// Unmark the message so it can be retried
713-
session.ResetOffset(claim.Topic(), claim.Partition(), message.Offset, "")
695+
if backOffDelay != backoff.Stop {
696+
select {
697+
case <-session.Context().Done():
698+
return nil
699+
case <-time.After(backOffDelay):
714700
}
715701
}
716-
} else {
717-
if c.messageMarking.After && c.messageMarking.OnError {
718-
session.MarkMessage(message, "")
719-
}
720-
return err
721702
}
703+
if c.messageMarking.After && c.messageMarking.OnError {
704+
session.MarkMessage(message, "")
705+
}
706+
return err
707+
}
708+
if c.backOff != nil {
709+
c.backOff.Reset()
722710
}
723711
if c.messageMarking.After {
724712
session.MarkMessage(message, "")
@@ -754,9 +742,6 @@ func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSess
754742
if !c.autocommitEnabled {
755743
defer session.Commit()
756744
}
757-
if c.backOff != nil {
758-
c.backOff.Reset()
759-
}
760745
for {
761746
select {
762747
case message, ok := <-claim.Messages():
@@ -796,27 +781,21 @@ func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSess
796781
if err != nil {
797782
if errorRequiresBackoff(err) && c.backOff != nil {
798783
backOffDelay := c.backOff.NextBackOff()
799-
if backOffDelay == backoff.Stop {
800-
if c.messageMarking.After && c.messageMarking.OnError {
801-
session.MarkMessage(message, "")
802-
}
803-
return err
804-
}
805-
select {
806-
case <-session.Context().Done():
807-
return nil
808-
case <-time.After(backOffDelay):
809-
if !c.messageMarking.After {
810-
// Unmark the message so it can be retried
811-
session.ResetOffset(claim.Topic(), claim.Partition(), message.Offset, "")
784+
if backOffDelay != backoff.Stop {
785+
select {
786+
case <-session.Context().Done():
787+
return nil
788+
case <-time.After(backOffDelay):
812789
}
813790
}
814-
} else {
815-
if c.messageMarking.After && c.messageMarking.OnError {
816-
session.MarkMessage(message, "")
817-
}
818-
return err
819791
}
792+
if c.messageMarking.After && c.messageMarking.OnError {
793+
session.MarkMessage(message, "")
794+
}
795+
return err
796+
}
797+
if c.backOff != nil {
798+
c.backOff.Reset()
820799
}
821800
if c.messageMarking.After {
822801
session.MarkMessage(message, "")

receiver/kafkareceiver/kafka_receiver_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -350,7 +350,7 @@ func TestTracesConsumerGroupHandler_error_nextConsumer(t *testing.T) {
350350
{
351351
name: "memory limiter data refused error",
352352
err: errMemoryLimiterDataRefused,
353-
expectedError: nil,
353+
expectedError: errMemoryLimiterDataRefused,
354354
expectedBackoff: backoff.DefaultInitialInterval,
355355
},
356356
{
@@ -728,7 +728,7 @@ func TestMetricsConsumerGroupHandler_error_nextConsumer(t *testing.T) {
728728
{
729729
name: "memory limiter data refused error",
730730
err: errMemoryLimiterDataRefused,
731-
expectedError: nil,
731+
expectedError: errMemoryLimiterDataRefused,
732732
expectedBackoff: backoff.DefaultInitialInterval,
733733
},
734734
{
@@ -1121,7 +1121,7 @@ func TestLogsConsumerGroupHandler_error_nextConsumer(t *testing.T) {
11211121
{
11221122
name: "memory limiter data refused error",
11231123
err: errMemoryLimiterDataRefused,
1124-
expectedError: nil,
1124+
expectedError: errMemoryLimiterDataRefused,
11251125
expectedBackoff: backoff.DefaultInitialInterval,
11261126
},
11271127
{

0 commit comments

Comments
 (0)