Skip to content

Commit 7204f7a

Browse files
committed
retry error again
1 parent 4ade9fa commit 7204f7a

File tree

1 file changed

+16
-4
lines changed

1 file changed

+16
-4
lines changed

receiver/kafkareceiver/kafka_receiver.go

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -600,6 +600,11 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe
600600
case <-session.Context().Done():
601601
return nil
602602
case <-time.After(backOffDelay):
603+
if !c.messageMarking.After {
604+
// Unmark the message so it can be retried
605+
session.ResetOffset(claim.Topic(), claim.Partition(), message.Offset, "")
606+
}
607+
return err
603608
}
604609
}
605610
}
@@ -628,7 +633,7 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe
628633
}
629634

630635
func errorRequiresBackoff(err error) bool {
631-
return errors.Is(err, errMemoryLimiterDataRefused)
636+
return err.Error() == errMemoryLimiterDataRefused.Error()
632637
}
633638

634639
func (c *metricsConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {
@@ -649,9 +654,6 @@ func (c *metricsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupS
649654
if !c.autocommitEnabled {
650655
defer session.Commit()
651656
}
652-
if c.backOff != nil {
653-
c.backOff.Reset()
654-
}
655657
for {
656658
select {
657659
case message, ok := <-claim.Messages():
@@ -697,6 +699,11 @@ func (c *metricsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupS
697699
case <-session.Context().Done():
698700
return nil
699701
case <-time.After(backOffDelay):
702+
if !c.messageMarking.After {
703+
// Unmark the message so it can be retried
704+
session.ResetOffset(claim.Topic(), claim.Partition(), message.Offset, "")
705+
}
706+
return err
700707
}
701708
}
702709
}
@@ -786,6 +793,11 @@ func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSess
786793
case <-session.Context().Done():
787794
return nil
788795
case <-time.After(backOffDelay):
796+
if !c.messageMarking.After {
797+
// Unmark the message so it can be retried
798+
session.ResetOffset(claim.Topic(), claim.Partition(), message.Offset, "")
799+
}
800+
return err
789801
}
790802
}
791803
}

0 commit comments

Comments
 (0)