Skip to content

Commit 33b352a

Browse files
authored
kafkaexporter: Propagate metadata keys as headers (#39132)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description Adds a new config option specifying a list of metadata keys that should be propagated as Kafka message headers. <!-- Issue number (e.g. #1234) or full URL to issue, if applicable. --> #### Link to tracking issue Fixes #39130 <!--Describe what testing was performed and which tests were added.--> #### Testing Unit tests added <!--Describe the documentation added.--> #### Documentation Updated the exporter's README. --------- Signed-off-by: Marc Lopez Rubio <[email protected]>
1 parent c5cc1c0 commit 33b352a

File tree

7 files changed

+226
-15
lines changed

7 files changed

+226
-15
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: kafkaexporter
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Propagate metadata keys as headers
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [39130]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: Adds a new config option specifying a list of metadata keys that should be propagated as Kafka message headers.
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

exporter/kafkaexporter/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ The following settings can be optionally configured:
2626
- `client_id` (default = "otel-collector"): The client ID to configure the Kafka client with. The client ID will be used for all produce requests.
2727
- `topic` (default = otlp_spans for traces, otlp_metrics for metrics, otlp_logs for logs): The name of the default Kafka topic to export to. See [Destination Topic](#destination-topic) below for more details.
2828
- `topic_from_attribute` (default = ""): Specify the resource attribute whose value should be used as the message's topic. See [Destination Topic](#destination-topic) below for more details.
29+
- `include_metadata_keys` (default = []): Specifies a list of metadata keys to propagate as Kafka message headers. If one or more keys aren't found in the metadata, they are ignored.
2930
- `encoding` (default = otlp_proto): The encoding of the traces sent to kafka. All available encodings:
3031
- `otlp_proto`: payload is Protobuf serialized from `ExportTraceServiceRequest` if set as a traces exporter or `ExportMetricsServiceRequest` for metrics or `ExportLogsServiceRequest` for logs.
3132
- `otlp_json`: payload is JSON serialized from `ExportTraceServiceRequest` if set as a traces exporter or `ExportMetricsServiceRequest` for metrics or `ExportLogsServiceRequest` for logs.

exporter/kafkaexporter/config.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ type Config struct {
2424
// The name of the kafka topic to export to (default otlp_spans for traces, otlp_metrics for metrics)
2525
Topic string `mapstructure:"topic"`
2626

27+
// IncludeMetadataKeys indicates the receiver's client metadata keys to propagate as Kafka message headers.
28+
IncludeMetadataKeys []string `mapstructure:"include_metadata_keys"`
29+
2730
// TopicFromAttribute is the name of the attribute to use as the topic name.
2831
TopicFromAttribute string `mapstructure:"topic_from_attribute"`
2932

exporter/kafkaexporter/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ require (
1515
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.123.0
1616
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin v0.123.0
1717
github.com/stretchr/testify v1.10.0
18+
go.opentelemetry.io/collector/client v1.29.1-0.20250402200755-cb5c3f4fb9dc
1819
go.opentelemetry.io/collector/component v1.29.1-0.20250402200755-cb5c3f4fb9dc
1920
go.opentelemetry.io/collector/component/componenttest v0.123.1-0.20250402200755-cb5c3f4fb9dc
2021
go.opentelemetry.io/collector/config/configretry v1.29.1-0.20250402200755-cb5c3f4fb9dc

exporter/kafkaexporter/go.sum

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

exporter/kafkaexporter/kafka_exporter.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"iter"
1111

1212
"github.com/IBM/sarama"
13+
"go.opentelemetry.io/collector/client"
1314
"go.opentelemetry.io/collector/component"
1415
"go.opentelemetry.io/collector/consumer/consumererror"
1516
"go.opentelemetry.io/collector/exporter"
@@ -112,6 +113,9 @@ func (e *kafkaExporter[T]) exportData(ctx context.Context, data T) error {
112113
saramaMessages := makeSaramaMessages(partitionMessages, e.messager.getTopic(ctx, data))
113114
allSaramaMessages = append(allSaramaMessages, saramaMessages...)
114115
}
116+
messagesWithHeaders(allSaramaMessages, metadataToHeaders(
117+
ctx, e.cfg.IncludeMetadataKeys,
118+
))
115119
if err := e.producer.SendMessages(allSaramaMessages); err != nil {
116120
var prodErr sarama.ProducerErrors
117121
if errors.As(err, &prodErr) {
@@ -297,3 +301,34 @@ func makeSaramaMessages(messages []marshaler.Message, topic string) []*sarama.Pr
297301
}
298302
return saramaMessages
299303
}
304+
305+
func messagesWithHeaders(msg []*sarama.ProducerMessage, h []sarama.RecordHeader) {
306+
if len(h) == 0 || len(msg) == 0 {
307+
return
308+
}
309+
for i := range msg {
310+
if len(msg[i].Headers) == 0 {
311+
msg[i].Headers = h
312+
continue
313+
}
314+
msg[i].Headers = append(msg[i].Headers, h...)
315+
}
316+
}
317+
318+
func metadataToHeaders(ctx context.Context, keys []string) []sarama.RecordHeader {
319+
if len(keys) == 0 {
320+
return nil
321+
}
322+
info := client.FromContext(ctx)
323+
headers := make([]sarama.RecordHeader, 0, len(keys))
324+
for _, key := range keys {
325+
valueSlice := info.Metadata.Get(key)
326+
for _, v := range valueSlice {
327+
headers = append(headers, sarama.RecordHeader{
328+
Key: []byte(key),
329+
Value: []byte(v),
330+
})
331+
}
332+
}
333+
return headers
334+
}

exporter/kafkaexporter/kafka_exporter_test.go

Lines changed: 157 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/IBM/sarama/mocks"
1414
"github.com/stretchr/testify/assert"
1515
"github.com/stretchr/testify/require"
16+
"go.opentelemetry.io/collector/client"
1617
"go.opentelemetry.io/collector/component"
1718
"go.opentelemetry.io/collector/component/componenttest"
1819
"go.opentelemetry.io/collector/exporter/exportertest"
@@ -49,12 +50,59 @@ func TestTracesPusher_attr(t *testing.T) {
4950
}
5051

5152
func TestTracesPusher_ctx(t *testing.T) {
52-
config := createDefaultConfig().(*Config)
53-
exp, producer := newMockTracesExporter(t, *config, componenttest.NewNopHost())
54-
producer.ExpectSendMessageAndSucceed()
53+
t.Run("WithTopic", func(t *testing.T) {
54+
config := createDefaultConfig().(*Config)
55+
exp, producer := newMockTracesExporter(t, *config, componenttest.NewNopHost())
56+
producer.ExpectSendMessageAndSucceed()
5557

56-
err := exp.exportData(topic.WithTopic(context.Background(), "my_topic"), testdata.GenerateTraces(2))
57-
require.NoError(t, err)
58+
err := exp.exportData(topic.WithTopic(context.Background(), "my_topic"), testdata.GenerateTraces(2))
59+
require.NoError(t, err)
60+
})
61+
t.Run("WithMetadata", func(t *testing.T) {
62+
config := createDefaultConfig().(*Config)
63+
config.IncludeMetadataKeys = []string{"x-tenant-id", "x-request-ids"}
64+
exp, producer := newMockTracesExporter(t, *config, componenttest.NewNopHost())
65+
producer.ExpectSendMessageWithMessageCheckerFunctionAndSucceed(func(pm *sarama.ProducerMessage) error {
66+
assert.Equal(t, []sarama.RecordHeader{
67+
{Key: []byte("x-tenant-id"), Value: []byte("my_tenant_id")},
68+
{Key: []byte("x-request-ids"), Value: []byte("987654321")},
69+
{Key: []byte("x-request-ids"), Value: []byte("0187262")},
70+
}, pm.Headers)
71+
return nil
72+
})
73+
t.Cleanup(func() {
74+
require.NoError(t, exp.Close(context.Background()))
75+
})
76+
ctx := client.NewContext(context.Background(), client.Info{
77+
Metadata: client.NewMetadata(map[string][]string{
78+
"x-tenant-id": {"my_tenant_id"},
79+
"x-request-ids": {"987654321", "0187262"},
80+
"discarded-meta": {"my-meta"}, // This will be ignored.
81+
}),
82+
})
83+
err := exp.exportData(ctx, testdata.GenerateTraces(10))
84+
require.NoError(t, err)
85+
})
86+
t.Run("WithMetadataDisabled", func(t *testing.T) {
87+
config := createDefaultConfig().(*Config)
88+
exp, producer := newMockTracesExporter(t, *config, componenttest.NewNopHost())
89+
producer.ExpectSendMessageWithMessageCheckerFunctionAndSucceed(func(pm *sarama.ProducerMessage) error {
90+
assert.Nil(t, pm.Headers)
91+
return nil
92+
})
93+
t.Cleanup(func() {
94+
require.NoError(t, exp.Close(context.Background()))
95+
})
96+
ctx := client.NewContext(context.Background(), client.Info{
97+
Metadata: client.NewMetadata(map[string][]string{
98+
"x-tenant-id": {"my_tenant_id"},
99+
"x-request-ids": {"123456789", "0187262"},
100+
"discarded-meta": {"my-meta"},
101+
}),
102+
})
103+
err := exp.exportData(ctx, testdata.GenerateTraces(5))
104+
require.NoError(t, err)
105+
})
58106
}
59107

60108
func TestTracesPusher_err(t *testing.T) {
@@ -214,12 +262,59 @@ func TestMetricsDataPusher_attr(t *testing.T) {
214262
}
215263

216264
func TestMetricsDataPusher_ctx(t *testing.T) {
217-
config := createDefaultConfig().(*Config)
218-
exp, producer := newMockMetricsExporter(t, *config, componenttest.NewNopHost())
219-
producer.ExpectSendMessageAndSucceed()
265+
t.Run("WithTopic", func(t *testing.T) {
266+
config := createDefaultConfig().(*Config)
267+
exp, producer := newMockMetricsExporter(t, *config, componenttest.NewNopHost())
268+
producer.ExpectSendMessageAndSucceed()
220269

221-
err := exp.exportData(topic.WithTopic(context.Background(), "my_topic"), testdata.GenerateMetrics(2))
222-
require.NoError(t, err)
270+
err := exp.exportData(topic.WithTopic(context.Background(), "my_topic"), testdata.GenerateMetrics(2))
271+
require.NoError(t, err)
272+
})
273+
t.Run("WithMetadata", func(t *testing.T) {
274+
config := createDefaultConfig().(*Config)
275+
config.IncludeMetadataKeys = []string{"x-tenant-id", "x-request-ids"}
276+
exp, producer := newMockMetricsExporter(t, *config, componenttest.NewNopHost())
277+
producer.ExpectSendMessageWithMessageCheckerFunctionAndSucceed(func(pm *sarama.ProducerMessage) error {
278+
assert.Equal(t, []sarama.RecordHeader{
279+
{Key: []byte("x-tenant-id"), Value: []byte("my_tenant_id")},
280+
{Key: []byte("x-request-ids"), Value: []byte("123456789")},
281+
{Key: []byte("x-request-ids"), Value: []byte("123141")},
282+
}, pm.Headers)
283+
return nil
284+
})
285+
t.Cleanup(func() {
286+
require.NoError(t, exp.Close(context.Background()))
287+
})
288+
ctx := client.NewContext(context.Background(), client.Info{
289+
Metadata: client.NewMetadata(map[string][]string{
290+
"x-tenant-id": {"my_tenant_id"},
291+
"x-request-ids": {"123456789", "123141"},
292+
"discarded-meta": {"my-meta"}, // This will be ignored.
293+
}),
294+
})
295+
err := exp.exportData(ctx, testdata.GenerateMetrics(10))
296+
require.NoError(t, err)
297+
})
298+
t.Run("WithMetadataDisabled", func(t *testing.T) {
299+
config := createDefaultConfig().(*Config)
300+
exp, producer := newMockMetricsExporter(t, *config, componenttest.NewNopHost())
301+
producer.ExpectSendMessageWithMessageCheckerFunctionAndSucceed(func(pm *sarama.ProducerMessage) error {
302+
assert.Nil(t, pm.Headers)
303+
return nil
304+
})
305+
t.Cleanup(func() {
306+
require.NoError(t, exp.Close(context.Background()))
307+
})
308+
ctx := client.NewContext(context.Background(), client.Info{
309+
Metadata: client.NewMetadata(map[string][]string{
310+
"x-tenant-id": {"my_tenant_id"},
311+
"x-request-ids": {"123456789", "123141"},
312+
"discarded-meta": {"my-meta"},
313+
}),
314+
})
315+
err := exp.exportData(ctx, testdata.GenerateMetrics(5))
316+
require.NoError(t, err)
317+
})
223318
}
224319

225320
func TestMetricsPusher_err(t *testing.T) {
@@ -325,12 +420,59 @@ func TestLogsDataPusher_attr(t *testing.T) {
325420
}
326421

327422
func TestLogsDataPusher_ctx(t *testing.T) {
328-
config := createDefaultConfig().(*Config)
329-
exp, producer := newMockLogsExporter(t, *config, componenttest.NewNopHost())
330-
producer.ExpectSendMessageAndSucceed()
423+
t.Run("WithTopic", func(t *testing.T) {
424+
config := createDefaultConfig().(*Config)
425+
exp, producer := newMockLogsExporter(t, *config, componenttest.NewNopHost())
426+
producer.ExpectSendMessageAndSucceed()
331427

332-
err := exp.exportData(topic.WithTopic(context.Background(), "my_topic"), testdata.GenerateLogs(2))
333-
require.NoError(t, err)
428+
err := exp.exportData(topic.WithTopic(context.Background(), "my_topic"), testdata.GenerateLogs(2))
429+
require.NoError(t, err)
430+
})
431+
t.Run("WithMetadata", func(t *testing.T) {
432+
config := createDefaultConfig().(*Config)
433+
config.IncludeMetadataKeys = []string{"x-tenant-id", "x-request-ids"}
434+
exp, producer := newMockLogsExporter(t, *config, componenttest.NewNopHost())
435+
producer.ExpectSendMessageWithMessageCheckerFunctionAndSucceed(func(pm *sarama.ProducerMessage) error {
436+
assert.Equal(t, []sarama.RecordHeader{
437+
{Key: []byte("x-tenant-id"), Value: []byte("my_tenant_id")},
438+
{Key: []byte("x-request-ids"), Value: []byte("123456789")},
439+
{Key: []byte("x-request-ids"), Value: []byte("123141")},
440+
}, pm.Headers)
441+
return nil
442+
})
443+
t.Cleanup(func() {
444+
require.NoError(t, exp.Close(context.Background()))
445+
})
446+
ctx := client.NewContext(context.Background(), client.Info{
447+
Metadata: client.NewMetadata(map[string][]string{
448+
"x-tenant-id": {"my_tenant_id"},
449+
"x-request-ids": {"123456789", "123141"},
450+
"discarded-meta": {"my-meta"}, // This will be ignored.
451+
}),
452+
})
453+
err := exp.exportData(ctx, testdata.GenerateLogs(10))
454+
require.NoError(t, err)
455+
})
456+
t.Run("WithMetadataDisabled", func(t *testing.T) {
457+
config := createDefaultConfig().(*Config)
458+
exp, producer := newMockLogsExporter(t, *config, componenttest.NewNopHost())
459+
producer.ExpectSendMessageWithMessageCheckerFunctionAndSucceed(func(pm *sarama.ProducerMessage) error {
460+
assert.Nil(t, pm.Headers)
461+
return nil
462+
})
463+
t.Cleanup(func() {
464+
require.NoError(t, exp.Close(context.Background()))
465+
})
466+
ctx := client.NewContext(context.Background(), client.Info{
467+
Metadata: client.NewMetadata(map[string][]string{
468+
"x-tenant-id": {"my_tenant_id"},
469+
"x-request-ids": {"123456789", "123141"},
470+
"discarded-meta": {"my-meta"},
471+
}),
472+
})
473+
err := exp.exportData(ctx, testdata.GenerateLogs(5))
474+
require.NoError(t, err)
475+
})
334476
}
335477

336478
func TestLogsPusher_err(t *testing.T) {

0 commit comments

Comments
 (0)