Skip to content

Commit c5ca24b

Browse files
committed
kafkaexporter: Propagate metadata keys as headers
Adds a new config option specifying a list of metadata keys that should be propagated as Kafka message headers. Closes open-telemetry#39130 Signed-off-by: Marc Lopez Rubio <[email protected]>
1 parent 4e84b33 commit c5ca24b

File tree

6 files changed

+182
-33
lines changed

6 files changed

+182
-33
lines changed

exporter/kafkaexporter/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ The following settings can be optionally configured:
2626
- `client_id` (default = "otel-collector"): The client ID to configure the Kafka client with. The client ID will be used for all produce requests.
2727
- `topic` (default = otlp_spans for traces, otlp_metrics for metrics, otlp_logs for logs): The name of the default kafka topic to export to. See [Destination Topic](#destination-topic) below for more details.
2828
- `topic_from_attribute` (default = ""): Specify the resource attribute whose value should be used as the message's topic. See [Destination Topic](#destination-topic) below for more details.
29+
- `include_metadata_keys` (default = []): Specifies a list of metadata keys to propagate as Kafka message headers. Keys that are not present in the client metadata are ignored. Additionally, any key whose metadata entry holds more than one value is not propagated.
2930
- `encoding` (default = otlp_proto): The encoding of the traces sent to kafka. All available encodings:
3031
- `otlp_proto`: payload is Protobuf serialized from `ExportTraceServiceRequest` if set as a traces exporter or `ExportMetricsServiceRequest` for metrics or `ExportLogsServiceRequest` for logs.
3132
- `otlp_json`: payload is JSON serialized from `ExportTraceServiceRequest` if set as a traces exporter or `ExportMetricsServiceRequest` for metrics or `ExportLogsServiceRequest` for logs.

exporter/kafkaexporter/config.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ type Config struct {
2424
// The name of the kafka topic to export to (default otlp_spans for traces, otlp_metrics for metrics)
2525
Topic string `mapstructure:"topic"`
2626

27+
// IncludeMetadataKeys indicates the receiver's client metadata keys to propagate as Kafka message headers.
28+
IncludeMetadataKeys []string `mapstructure:"include_metadata_keys"`
29+
2730
// TopicFromAttribute is the name of the attribute to use as the topic name.
2831
TopicFromAttribute string `mapstructure:"topic_from_attribute"`
2932

exporter/kafkaexporter/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ require (
1515
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin v0.123.0
1616
github.com/openzipkin/zipkin-go v0.4.3
1717
github.com/stretchr/testify v1.10.0
18+
go.opentelemetry.io/collector/client v1.29.1-0.20250402200755-cb5c3f4fb9dc
1819
go.opentelemetry.io/collector/component v1.29.1-0.20250402200755-cb5c3f4fb9dc
1920
go.opentelemetry.io/collector/component/componenttest v0.123.1-0.20250402200755-cb5c3f4fb9dc
2021
go.opentelemetry.io/collector/config/configretry v1.29.1-0.20250402200755-cb5c3f4fb9dc

exporter/kafkaexporter/go.sum

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

exporter/kafkaexporter/kafka_exporter.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"fmt"
1010

1111
"github.com/IBM/sarama"
12+
"go.opentelemetry.io/collector/client"
1213
"go.opentelemetry.io/collector/component"
1314
"go.opentelemetry.io/collector/consumer/consumererror"
1415
"go.opentelemetry.io/collector/exporter"
@@ -46,6 +47,9 @@ func (e *kafkaTracesProducer) tracesPusher(ctx context.Context, td ptrace.Traces
4647
if err != nil {
4748
return consumererror.NewPermanent(err)
4849
}
50+
messagesWithHeaders(messages,
51+
metadataToHeaders(ctx, e.cfg.IncludeMetadataKeys),
52+
)
4953
err = e.producer.SendMessages(messages)
5054
if err != nil {
5155
var prodErr sarama.ProducerErrors
@@ -104,6 +108,9 @@ func (e *kafkaMetricsProducer) metricsDataPusher(ctx context.Context, md pmetric
104108
if err != nil {
105109
return consumererror.NewPermanent(err)
106110
}
111+
messagesWithHeaders(messages,
112+
metadataToHeaders(ctx, e.cfg.IncludeMetadataKeys),
113+
)
107114
err = e.producer.SendMessages(messages)
108115
if err != nil {
109116
var prodErr sarama.ProducerErrors
@@ -162,6 +169,9 @@ func (e *kafkaLogsProducer) logsDataPusher(ctx context.Context, ld plog.Logs) er
162169
if err != nil {
163170
return consumererror.NewPermanent(err)
164171
}
172+
messagesWithHeaders(messages,
173+
metadataToHeaders(ctx, e.cfg.IncludeMetadataKeys),
174+
)
165175
err = e.producer.SendMessages(messages)
166176
if err != nil {
167177
var prodErr sarama.ProducerErrors
@@ -287,3 +297,35 @@ func encodingToComponentID(encoding string) (*component.ID, error) {
287297
id := component.NewID(componentType)
288298
return &id, nil
289299
}
300+
301+
func messagesWithHeaders(msg []*sarama.ProducerMessage, h []sarama.RecordHeader) {
302+
if len(h) == 0 || len(msg) == 0 {
303+
return
304+
}
305+
for i := range msg {
306+
if len(msg[i].Headers) == 0 {
307+
msg[i].Headers = h
308+
continue
309+
}
310+
msg[i].Headers = append(msg[i].Headers, h...)
311+
}
312+
}
313+
314+
func metadataToHeaders(ctx context.Context, keys []string) []sarama.RecordHeader {
315+
if len(keys) == 0 {
316+
return nil
317+
}
318+
info := client.FromContext(ctx)
319+
headers := make([]sarama.RecordHeader, 0, len(keys))
320+
for _, key := range keys {
321+
v := info.Metadata.Get(key)
322+
if len(v) != 1 {
323+
continue
324+
}
325+
headers = append(headers, sarama.RecordHeader{
326+
Key: []byte(key),
327+
Value: []byte(v[0]),
328+
})
329+
}
330+
return headers
331+
}

exporter/kafkaexporter/kafka_exporter_test.go

Lines changed: 133 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/IBM/sarama/mocks"
1313
"github.com/stretchr/testify/assert"
1414
"github.com/stretchr/testify/require"
15+
"go.opentelemetry.io/collector/client"
1516
"go.opentelemetry.io/collector/component"
1617
"go.opentelemetry.io/collector/component/componenttest"
1718
"go.opentelemetry.io/collector/exporter/exportertest"
@@ -134,19 +135,52 @@ func TestTracesPusher_attr(t *testing.T) {
134135
}
135136

136137
func TestTracesPusher_ctx(t *testing.T) {
137-
c := sarama.NewConfig()
138-
producer := mocks.NewSyncProducer(t, c)
139-
producer.ExpectSendMessageAndSucceed()
138+
t.Run("WithTopic", func(t *testing.T) {
139+
c := sarama.NewConfig()
140+
producer := mocks.NewSyncProducer(t, c)
141+
producer.ExpectSendMessageAndSucceed()
142+
143+
p := kafkaTracesProducer{
144+
producer: producer,
145+
marshaler: newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultEncoding, false),
146+
}
147+
t.Cleanup(func() {
148+
require.NoError(t, p.Close(context.Background()))
149+
})
150+
err := p.tracesPusher(topic.WithTopic(context.Background(), "my_topic"), testdata.GenerateTraces(2))
151+
require.NoError(t, err)
152+
})
153+
t.Run("WithMetadata", func(t *testing.T) {
154+
c := sarama.NewConfig()
155+
producer := mocks.NewSyncProducer(t, c)
156+
producer.ExpectSendMessageWithMessageCheckerFunctionAndSucceed(func(pm *sarama.ProducerMessage) error {
157+
assert.Equal(t, []sarama.RecordHeader{
158+
{Key: []byte("x-tenant-id"), Value: []byte("my_tenant_id")},
159+
{Key: []byte("x-request-id"), Value: []byte("123456789")},
160+
}, pm.Headers)
161+
return nil
162+
})
140163

141-
p := kafkaTracesProducer{
142-
producer: producer,
143-
marshaler: newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultEncoding, false),
144-
}
145-
t.Cleanup(func() {
146-
require.NoError(t, p.Close(context.Background()))
164+
p := kafkaTracesProducer{
165+
cfg: Config{
166+
IncludeMetadataKeys: []string{"x-tenant-id", "x-request-id"},
167+
},
168+
producer: producer,
169+
marshaler: newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultEncoding, false),
170+
}
171+
t.Cleanup(func() {
172+
require.NoError(t, p.Close(context.Background()))
173+
})
174+
ctx := client.NewContext(context.Background(), client.Info{
175+
Metadata: client.NewMetadata(map[string][]string{
176+
"x-tenant-id": {"my_tenant_id"},
177+
"x-request-id": {"123456789"},
178+
"discarded-meta": {"my-meta"}, // This will be ignored.
179+
}),
180+
})
181+
err := p.tracesPusher(ctx, testdata.GenerateTraces(10))
182+
require.NoError(t, err)
147183
})
148-
err := p.tracesPusher(topic.WithTopic(context.Background(), "my_topic"), testdata.GenerateTraces(2))
149-
require.NoError(t, err)
150184
}
151185

152186
func TestTracesPusher_err(t *testing.T) {
@@ -215,19 +249,52 @@ func TestMetricsDataPusher_attr(t *testing.T) {
215249
}
216250

217251
func TestMetricsDataPusher_ctx(t *testing.T) {
218-
c := sarama.NewConfig()
219-
producer := mocks.NewSyncProducer(t, c)
220-
producer.ExpectSendMessageAndSucceed()
252+
t.Run("WithTopic", func(t *testing.T) {
253+
c := sarama.NewConfig()
254+
producer := mocks.NewSyncProducer(t, c)
255+
producer.ExpectSendMessageAndSucceed()
256+
257+
p := kafkaMetricsProducer{
258+
producer: producer,
259+
marshaler: newPdataMetricsMarshaler(&pmetric.ProtoMarshaler{}, defaultEncoding, false),
260+
}
261+
t.Cleanup(func() {
262+
require.NoError(t, p.Close(context.Background()))
263+
})
264+
err := p.metricsDataPusher(topic.WithTopic(context.Background(), "my_topic"), testdata.GenerateMetrics(2))
265+
require.NoError(t, err)
266+
})
267+
t.Run("WithMetadata", func(t *testing.T) {
268+
c := sarama.NewConfig()
269+
producer := mocks.NewSyncProducer(t, c)
270+
producer.ExpectSendMessageWithMessageCheckerFunctionAndSucceed(func(pm *sarama.ProducerMessage) error {
271+
assert.Equal(t, []sarama.RecordHeader{
272+
{Key: []byte("x-tenant-id"), Value: []byte("anoter_tenant_id")},
273+
{Key: []byte("x-request-id"), Value: []byte("987654321")},
274+
}, pm.Headers)
275+
return nil
276+
})
221277

222-
p := kafkaMetricsProducer{
223-
producer: producer,
224-
marshaler: newPdataMetricsMarshaler(&pmetric.ProtoMarshaler{}, defaultEncoding, false),
225-
}
226-
t.Cleanup(func() {
227-
require.NoError(t, p.Close(context.Background()))
278+
p := kafkaMetricsProducer{
279+
cfg: Config{
280+
IncludeMetadataKeys: []string{"x-tenant-id", "x-request-id"},
281+
},
282+
producer: producer,
283+
marshaler: newPdataMetricsMarshaler(&pmetric.ProtoMarshaler{}, defaultEncoding, false),
284+
}
285+
t.Cleanup(func() {
286+
require.NoError(t, p.Close(context.Background()))
287+
})
288+
ctx := client.NewContext(context.Background(), client.Info{
289+
Metadata: client.NewMetadata(map[string][]string{
290+
"x-tenant-id": {"anoter_tenant_id"},
291+
"x-request-id": {"987654321"},
292+
"discarded-meta": {"my-meta"}, // This will be ignored.
293+
}),
294+
})
295+
err := p.metricsDataPusher(ctx, testdata.GenerateMetrics(5))
296+
require.NoError(t, err)
228297
})
229-
err := p.metricsDataPusher(topic.WithTopic(context.Background(), "my_topic"), testdata.GenerateMetrics(2))
230-
require.NoError(t, err)
231298
}
232299

233300
func TestMetricsDataPusher_err(t *testing.T) {
@@ -296,19 +363,52 @@ func TestLogsDataPusher_attr(t *testing.T) {
296363
}
297364

298365
func TestLogsDataPusher_ctx(t *testing.T) {
299-
c := sarama.NewConfig()
300-
producer := mocks.NewSyncProducer(t, c)
301-
producer.ExpectSendMessageAndSucceed()
366+
t.Run("WithTopic", func(t *testing.T) {
367+
c := sarama.NewConfig()
368+
producer := mocks.NewSyncProducer(t, c)
369+
producer.ExpectSendMessageAndSucceed()
370+
371+
p := kafkaLogsProducer{
372+
producer: producer,
373+
marshaler: newPdataLogsMarshaler(&plog.ProtoMarshaler{}, defaultEncoding, false),
374+
}
375+
t.Cleanup(func() {
376+
require.NoError(t, p.Close(context.Background()))
377+
})
378+
err := p.logsDataPusher(topic.WithTopic(context.Background(), "my_topic"), testdata.GenerateLogs(1))
379+
require.NoError(t, err)
380+
})
381+
t.Run("WithMetadata", func(t *testing.T) {
382+
c := sarama.NewConfig()
383+
producer := mocks.NewSyncProducer(t, c)
384+
producer.ExpectSendMessageWithMessageCheckerFunctionAndSucceed(func(pm *sarama.ProducerMessage) error {
385+
assert.Equal(t, []sarama.RecordHeader{
386+
{Key: []byte("x-tenant-id"), Value: []byte("yet_another_tenant_id")},
387+
{Key: []byte("x-request-id"), Value: []byte("01234")},
388+
}, pm.Headers)
389+
return nil
390+
})
302391

303-
p := kafkaLogsProducer{
304-
producer: producer,
305-
marshaler: newPdataLogsMarshaler(&plog.ProtoMarshaler{}, defaultEncoding, false),
306-
}
307-
t.Cleanup(func() {
308-
require.NoError(t, p.Close(context.Background()))
392+
p := kafkaLogsProducer{
393+
cfg: Config{
394+
IncludeMetadataKeys: []string{"x-tenant-id", "x-request-id"},
395+
},
396+
producer: producer,
397+
marshaler: newPdataLogsMarshaler(&plog.ProtoMarshaler{}, defaultEncoding, false),
398+
}
399+
t.Cleanup(func() {
400+
require.NoError(t, p.Close(context.Background()))
401+
})
402+
ctx := client.NewContext(context.Background(), client.Info{
403+
Metadata: client.NewMetadata(map[string][]string{
404+
"x-tenant-id": {"yet_another_tenant_id"},
405+
"x-request-id": {"01234"},
406+
"discarded-meta": {"my-meta"}, // This will be ignored.
407+
}),
408+
})
409+
err := p.logsDataPusher(ctx, testdata.GenerateLogs(15))
410+
require.NoError(t, err)
309411
})
310-
err := p.logsDataPusher(topic.WithTopic(context.Background(), "my_topic"), testdata.GenerateLogs(1))
311-
require.NoError(t, err)
312412
}
313413

314414
func TestLogsDataPusher_err(t *testing.T) {

0 commit comments

Comments
 (0)