Skip to content

Commit ffc31a9

Browse files
an-mmx, atoulme, MovieStoreGuy
authored and committed
fix(kafkaexporter): Mark Sarama's ConfigurationError as permanent to prevent retries (open-telemetry#38608)
#### Description
This fix unifies message send error handling for all types of telemetry. It identifies whether the error was caused by a ConfigurationError and, if so, reclassifies it as a permanent consumer error to prevent further retries.

#### Link to tracking issue
open-telemetry#38604

#### Testing
Unit test coverage added

#### Documentation
No changes

Co-authored-by: Antoine Toulme <[email protected]>
Co-authored-by: Sean Marciniak <[email protected]>
1 parent eea9dba commit ffc31a9

File tree

3 files changed

+174
-7
lines changed

3 files changed

+174
-7
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: bug_fix
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: kafkaexporter
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: "Mark Sarama's ConfigurationError as permanent to prevent retries"
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [38604]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

exporter/kafkaexporter/kafka_exporter.go

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -117,13 +117,7 @@ func (e *kafkaExporter[T]) exportData(ctx context.Context, data T) error {
117117
ctx, e.cfg.IncludeMetadataKeys,
118118
))
119119
if err := e.producer.SendMessages(allSaramaMessages); err != nil {
120-
var prodErr sarama.ProducerErrors
121-
if errors.As(err, &prodErr) {
122-
if len(prodErr) > 0 {
123-
return kafkaErrors{len(prodErr), prodErr[0].Err.Error()}
124-
}
125-
}
126-
return err
120+
return wrapKafkaProducerError(err)
127121
}
128122
return nil
129123
}
@@ -342,3 +336,24 @@ func metadataToHeaders(ctx context.Context, keys []string) []sarama.RecordHeader
342336
}
343337
return headers
344338
}
339+
340+
func wrapKafkaProducerError(err error) error {
341+
var prodErr sarama.ProducerErrors
342+
if !errors.As(err, &prodErr) || len(prodErr) == 0 {
343+
return err
344+
}
345+
346+
var areConfigErrs bool
347+
var confErr sarama.ConfigurationError
348+
for _, producerErr := range prodErr {
349+
if areConfigErrs = errors.As(producerErr.Err, &confErr); !areConfigErrs {
350+
break
351+
}
352+
}
353+
354+
if areConfigErrs {
355+
return consumererror.NewPermanent(confErr)
356+
}
357+
358+
return kafkaErrors{len(prodErr), prodErr[0].Err.Error()}
359+
}

exporter/kafkaexporter/kafka_exporter_test.go

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"go.opentelemetry.io/collector/client"
1717
"go.opentelemetry.io/collector/component"
1818
"go.opentelemetry.io/collector/component/componenttest"
19+
"go.opentelemetry.io/collector/consumer/consumererror"
1920
"go.opentelemetry.io/collector/exporter/exportertest"
2021
"go.opentelemetry.io/collector/pdata/pcommon"
2122
"go.opentelemetry.io/collector/pdata/plog"
@@ -123,6 +124,27 @@ func TestTracesPusher_err(t *testing.T) {
123124
assert.EqualError(t, err, expErr.Error())
124125
}
125126

127+
func TestTracesPusher_conf_err(t *testing.T) {
128+
t.Run("should return permanent err on config error", func(t *testing.T) {
129+
expErr := sarama.ConfigurationError("configuration error")
130+
prodErrs := sarama.ProducerErrors{
131+
&sarama.ProducerError{Err: expErr},
132+
}
133+
host := extensionsHost{
134+
component.MustNewID("trace_encoding"): ptraceMarshalerFuncExtension(func(ptrace.Traces) ([]byte, error) {
135+
return nil, prodErrs
136+
}),
137+
}
138+
config := createDefaultConfig().(*Config)
139+
config.Traces.Encoding = "trace_encoding"
140+
exp, _ := newMockTracesExporter(t, *config, host)
141+
142+
err := exp.exportData(context.Background(), testdata.GenerateTraces(2))
143+
144+
assert.True(t, consumererror.IsPermanent(err))
145+
})
146+
}
147+
126148
func TestTracesPusher_marshal_error(t *testing.T) {
127149
marshalErr := errors.New("failed to marshal")
128150
host := extensionsHost{
@@ -349,6 +371,27 @@ func TestMetricsPusher_err(t *testing.T) {
349371
assert.EqualError(t, err, expErr.Error())
350372
}
351373

374+
func TestMetricsPusher_conf_err(t *testing.T) {
375+
t.Run("should return permanent err on config error", func(t *testing.T) {
376+
expErr := sarama.ConfigurationError("configuration error")
377+
prodErrs := sarama.ProducerErrors{
378+
&sarama.ProducerError{Err: expErr},
379+
}
380+
host := extensionsHost{
381+
component.MustNewID("metric_encoding"): ptraceMarshalerFuncExtension(func(ptrace.Traces) ([]byte, error) {
382+
return nil, prodErrs
383+
}),
384+
}
385+
config := createDefaultConfig().(*Config)
386+
config.Traces.Encoding = "metric_encoding"
387+
exp, _ := newMockTracesExporter(t, *config, host)
388+
389+
err := exp.exportData(context.Background(), testdata.GenerateTraces(2))
390+
391+
assert.True(t, consumererror.IsPermanent(err))
392+
})
393+
}
394+
352395
func TestMetricsPusher_marshal_error(t *testing.T) {
353396
marshalErr := errors.New("failed to marshal")
354397
host := extensionsHost{
@@ -521,6 +564,27 @@ func TestLogsPusher_err(t *testing.T) {
521564
assert.EqualError(t, err, expErr.Error())
522565
}
523566

567+
func TestLogsPusher_conf_err(t *testing.T) {
568+
t.Run("should return permanent err on config error", func(t *testing.T) {
569+
expErr := sarama.ConfigurationError("configuration error")
570+
prodErrs := sarama.ProducerErrors{
571+
&sarama.ProducerError{Err: expErr},
572+
}
573+
host := extensionsHost{
574+
component.MustNewID("log_encoding"): ptraceMarshalerFuncExtension(func(ptrace.Traces) ([]byte, error) {
575+
return nil, prodErrs
576+
}),
577+
}
578+
config := createDefaultConfig().(*Config)
579+
config.Traces.Encoding = "log_encoding"
580+
exp, _ := newMockTracesExporter(t, *config, host)
581+
582+
err := exp.exportData(context.Background(), testdata.GenerateTraces(2))
583+
584+
assert.True(t, consumererror.IsPermanent(err))
585+
})
586+
}
587+
524588
func TestLogsPusher_marshal_error(t *testing.T) {
525589
marshalErr := errors.New("failed to marshal")
526590
host := extensionsHost{
@@ -808,3 +872,64 @@ func newMockLogsExporter(t *testing.T, cfg Config, host component.Host) (*kafkaE
808872
})
809873
return exp, producer
810874
}
875+
876+
func TestWrapKafkaProducerError(t *testing.T) {
877+
t.Run("should return permanent error on configuration error", func(t *testing.T) {
878+
err := sarama.ConfigurationError("configuration error")
879+
prodErrs := sarama.ProducerErrors{
880+
&sarama.ProducerError{Err: err},
881+
}
882+
883+
got := wrapKafkaProducerError(prodErrs)
884+
885+
assert.True(t, consumererror.IsPermanent(got))
886+
assert.Contains(t, got.Error(), err.Error())
887+
})
888+
889+
t.Run("should return permanent error whne multiple configuration error", func(t *testing.T) {
890+
err := sarama.ConfigurationError("configuration error")
891+
prodErrs := sarama.ProducerErrors{
892+
&sarama.ProducerError{Err: err},
893+
&sarama.ProducerError{Err: err},
894+
}
895+
896+
got := wrapKafkaProducerError(prodErrs)
897+
898+
assert.True(t, consumererror.IsPermanent(got))
899+
assert.Contains(t, got.Error(), err.Error())
900+
})
901+
902+
t.Run("should return not permanent error when at least one not configuration error", func(t *testing.T) {
903+
err := sarama.ConfigurationError("configuration error")
904+
prodErrs := sarama.ProducerErrors{
905+
&sarama.ProducerError{Err: err},
906+
&sarama.ProducerError{Err: errors.New("other producer error")},
907+
}
908+
909+
got := wrapKafkaProducerError(prodErrs)
910+
911+
assert.False(t, consumererror.IsPermanent(got))
912+
assert.Contains(t, got.Error(), err.Error())
913+
})
914+
915+
t.Run("should return not permanent error on other producer error", func(t *testing.T) {
916+
err := errors.New("other producer error")
917+
prodErrs := sarama.ProducerErrors{
918+
&sarama.ProducerError{Err: err},
919+
}
920+
921+
got := wrapKafkaProducerError(prodErrs)
922+
923+
assert.False(t, consumererror.IsPermanent(got))
924+
assert.Contains(t, got.Error(), err.Error())
925+
})
926+
927+
t.Run("should return not permanent error when other error", func(t *testing.T) {
928+
err := errors.New("other error")
929+
930+
got := wrapKafkaProducerError(err)
931+
932+
assert.False(t, consumererror.IsPermanent(got))
933+
assert.Contains(t, got.Error(), err.Error())
934+
})
935+
}

0 commit comments

Comments
 (0)