Skip to content

Commit 9b68993

Browse files
an-mmxaxwMovieStoreGuy
authored
fix(kafkareceiver): enforce a backoff mechanism on exporterhelper.ErrQueueIsFull error (#39581)
#### Description In the current implementation, the BackOff mechanism is trigerred only by [memory limiter error](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/receiver/kafkareceiver/kafka_receiver.go#L562-L564). However, the same behavior is expected for the `exporterhelper.ErrQueueIsFull` error, which occurs when there is a sending queue overflow (both [memory queue](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/internal/queuebatch/memory_queue.go#L109-L112) and [persistent queue](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/internal/queuebatch/persistent_queue.go#L244-L247)). #### Link to tracking issue #39580 #### Testing Unit coverage added #### Documentation No documentation updated --------- Co-authored-by: Andrew Wilkins <[email protected]> Co-authored-by: Sean Marciniak <[email protected]>
1 parent f2392bd commit 9b68993

File tree

5 files changed

+49
-14
lines changed

5 files changed

+49
-14
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: bug_fix
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: kafkareceiver
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: enforce a backoff mechanism on non-permanent errors, such as when the queue is full
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [39580]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

receiver/kafkareceiver/go.mod

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,9 @@ require (
2828
go.opentelemetry.io/collector/confmap v1.33.0
2929
go.opentelemetry.io/collector/confmap/xconfmap v0.127.0
3030
go.opentelemetry.io/collector/consumer v1.33.0
31+
go.opentelemetry.io/collector/consumer/consumererror v0.127.0
3132
go.opentelemetry.io/collector/consumer/consumertest v0.127.0
33+
go.opentelemetry.io/collector/exporter v0.127.0
3234
go.opentelemetry.io/collector/pdata v1.33.0
3335
go.opentelemetry.io/collector/pdata/testdata v0.127.0
3436
go.opentelemetry.io/collector/receiver v1.33.0
@@ -104,8 +106,9 @@ require (
104106
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
105107
go.opentelemetry.io/collector/config/configcompression v1.33.0 // indirect
106108
go.opentelemetry.io/collector/config/configopaque v1.33.0 // indirect
107-
go.opentelemetry.io/collector/consumer/consumererror v0.127.0 // indirect
108109
go.opentelemetry.io/collector/consumer/xconsumer v0.127.0 // indirect
110+
go.opentelemetry.io/collector/extension v1.33.0 // indirect
111+
go.opentelemetry.io/collector/extension/xextension v0.127.0 // indirect
109112
go.opentelemetry.io/collector/featuregate v1.33.0 // indirect
110113
go.opentelemetry.io/collector/internal/telemetry v0.127.0 // indirect
111114
go.opentelemetry.io/collector/pdata/pprofile v0.127.0 // indirect

receiver/kafkareceiver/go.sum

Lines changed: 12 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

receiver/kafkareceiver/kafka_receiver.go

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"go.opentelemetry.io/collector/component"
1818
"go.opentelemetry.io/collector/config/configretry"
1919
"go.opentelemetry.io/collector/consumer"
20+
"go.opentelemetry.io/collector/consumer/consumererror"
2021
"go.opentelemetry.io/collector/pdata/pcommon"
2122
"go.opentelemetry.io/collector/pdata/plog"
2223
"go.opentelemetry.io/collector/pdata/pmetric"
@@ -39,8 +40,6 @@ const (
3940
attrPartition = "partition"
4041
)
4142

42-
var errMemoryLimiterDataRefused = errors.New("data refused due to high memory usage")
43-
4443
type consumeMessageFunc func(ctx context.Context, message *sarama.ConsumerMessage) error
4544

4645
// messageHandler provides a generic interface for handling messages for a pdata type.
@@ -506,7 +505,7 @@ func (c *consumerGroupHandler) handleMessage(
506505
c.telemetryBuilder.KafkaReceiverOffsetLag.Record(ctx, claim.HighWaterMarkOffset()-message.Offset-1, metric.WithAttributeSet(attrs))
507506

508507
if err := c.consumeMessage(ctx, message); err != nil {
509-
if errorRequiresBackoff(err) && c.backOff != nil {
508+
if c.backOff != nil && !consumererror.IsPermanent(err) {
510509
backOffDelay := c.getNextBackoff()
511510
if backOffDelay != backoff.Stop {
512511
c.logger.Info("Backing off due to error from the next consumer.",
@@ -580,10 +579,6 @@ func newExponentialBackOff(config configretry.BackOffConfig) *backoff.Exponentia
580579
return backOff
581580
}
582581

583-
func errorRequiresBackoff(err error) bool {
584-
return err.Error() == errMemoryLimiterDataRefused.Error()
585-
}
586-
587582
func newContextWithHeaders(ctx context.Context,
588583
headers []*sarama.RecordHeader,
589584
) context.Context {

receiver/kafkareceiver/kafka_receiver_test.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@ import (
1919
"go.opentelemetry.io/collector/client"
2020
"go.opentelemetry.io/collector/component/componenttest"
2121
"go.opentelemetry.io/collector/consumer"
22+
"go.opentelemetry.io/collector/consumer/consumererror"
2223
"go.opentelemetry.io/collector/consumer/consumertest"
24+
"go.opentelemetry.io/collector/exporter/exporterhelper"
2325
"go.opentelemetry.io/collector/pdata/plog"
2426
"go.opentelemetry.io/collector/pdata/pmetric"
2527
"go.opentelemetry.io/collector/pdata/ptrace"
@@ -189,15 +191,11 @@ func TestReceiver_ConsumeError(t *testing.T) {
189191
shouldRetry bool
190192
}{
191193
"retryable error": {
192-
// FIXME the receiver checks a specific error message
193-
// from a different component, which is a bit brittle.
194-
// Let's revisit this in the future; we might want to check
195-
// for permanent vs. transient errors instead.
196-
err: errMemoryLimiterDataRefused,
194+
err: exporterhelper.ErrQueueIsFull,
197195
shouldRetry: true,
198196
},
199197
"permanent error": {
200-
err: errors.New("failed to consume"),
198+
err: consumererror.NewPermanent(errors.New("failed to consume")),
201199
},
202200
} {
203201
t.Run(name, func(t *testing.T) {

0 commit comments

Comments
 (0)