Skip to content

Commit ca6f246

Browse files
committed
kafkareceiver: propagate Kafka headers as metadata
Similar to open-telemetry#39132, but for the Kafka receiver: updates the receiver to propagate Kafka headers as client.Info (metadata), allowing downstream processors and exporters to access the values via the enriched context. Closes open-telemetry#39129 Signed-off-by: Marc Lopez Rubio <[email protected]>
1 parent 33b352a commit ca6f246

File tree

6 files changed

+233
-21
lines changed

6 files changed

+233
-21
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: kafkareceiver
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Propagate Kafka headers as metadata
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [39129]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: Allows the Kafka receiver to propagate Kafka headers as client.Info (metadata), allowing downstream processors and exporters to access the values via the enriched context.
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

receiver/kafkareceiver/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ Kafka receiver receives traces, metrics, and logs from Kafka. Message payload en
1717

1818
Note that metrics and logs only support OTLP.
1919

20+
If used in conjunction with the `kafkaexporter` configured with `include_metadata_keys`, the Kafka receiver will also propagate the Kafka headers to the downstream pipeline, giving the rest of the pipeline access to arbitrary metadata keys and values.
21+
2022
## Getting Started
2123

2224
There are no required settings.

receiver/kafkareceiver/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ require (
1616
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin v0.123.0
1717
github.com/openzipkin/zipkin-go v0.4.3
1818
github.com/stretchr/testify v1.10.0
19+
go.opentelemetry.io/collector/client v1.29.1-0.20250402200755-cb5c3f4fb9dc
1920
go.opentelemetry.io/collector/component v1.29.1-0.20250402200755-cb5c3f4fb9dc
2021
go.opentelemetry.io/collector/component/componenttest v0.123.1-0.20250402200755-cb5c3f4fb9dc
2122
go.opentelemetry.io/collector/config/configretry v1.29.1-0.20250402200755-cb5c3f4fb9dc

receiver/kafkareceiver/go.sum

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

receiver/kafkareceiver/kafka_receiver.go

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313

1414
"github.com/IBM/sarama"
1515
"github.com/cenkalti/backoff/v4"
16+
"go.opentelemetry.io/collector/client"
1617
"go.opentelemetry.io/collector/component"
1718
"go.opentelemetry.io/collector/config/configretry"
1819
"go.opentelemetry.io/collector/consumer"
@@ -539,7 +540,10 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe
539540
session.MarkMessage(message, "")
540541
}
541542

542-
ctx := c.obsrecv.StartTracesOp(session.Context())
543+
// If the Kafka exporter has propagated headers in the message,
544+
// create a new context with client.Info in it.
545+
ctx := newContextWithHeaders(session.Context(), message.Headers)
546+
ctx = c.obsrecv.StartTracesOp(ctx)
543547
attrs := attribute.NewSet(
544548
attribute.String(attrInstanceName, c.id.String()),
545549
attribute.String(attrPartition, strconv.Itoa(int(claim.Partition()))),
@@ -560,7 +564,7 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe
560564

561565
c.headerExtractor.extractHeadersTraces(traces, message)
562566
spanCount := traces.SpanCount()
563-
err = c.nextConsumer.ConsumeTraces(session.Context(), traces)
567+
err = c.nextConsumer.ConsumeTraces(ctx, traces)
564568
c.obsrecv.EndTracesOp(ctx, c.unmarshaler.Encoding(), spanCount, err)
565569
if err != nil {
566570
if errorRequiresBackoff(err) && c.backOff != nil {
@@ -653,7 +657,10 @@ func (c *metricsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupS
653657
session.MarkMessage(message, "")
654658
}
655659

656-
ctx := c.obsrecv.StartMetricsOp(session.Context())
660+
// If the Kafka exporter has propagated headers in the message,
661+
// create a new context with client.Info in it.
662+
ctx := newContextWithHeaders(session.Context(), message.Headers)
663+
ctx = c.obsrecv.StartMetricsOp(ctx)
657664
attrs := attribute.NewSet(
658665
attribute.String(attrInstanceName, c.id.String()),
659666
attribute.String(attrPartition, strconv.Itoa(int(claim.Partition()))),
@@ -674,7 +681,7 @@ func (c *metricsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupS
674681
c.headerExtractor.extractHeadersMetrics(metrics, message)
675682

676683
dataPointCount := metrics.DataPointCount()
677-
err = c.nextConsumer.ConsumeMetrics(session.Context(), metrics)
684+
err = c.nextConsumer.ConsumeMetrics(ctx, metrics)
678685
c.obsrecv.EndMetricsOp(ctx, c.unmarshaler.Encoding(), dataPointCount, err)
679686
if err != nil {
680687
if errorRequiresBackoff(err) && c.backOff != nil {
@@ -767,7 +774,10 @@ func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSess
767774
session.MarkMessage(message, "")
768775
}
769776

770-
ctx := c.obsrecv.StartLogsOp(session.Context())
777+
// If the Kafka exporter has propagated headers in the message,
778+
// create a new context with client.Info in it.
779+
ctx := newContextWithHeaders(session.Context(), message.Headers)
780+
ctx = c.obsrecv.StartLogsOp(ctx)
771781
attrs := attribute.NewSet(
772782
attribute.String(attrInstanceName, c.id.String()),
773783
attribute.String(attrPartition, strconv.Itoa(int(claim.Partition()))),
@@ -787,7 +797,7 @@ func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSess
787797
}
788798
c.headerExtractor.extractHeadersLogs(logs, message)
789799
logRecordCount := logs.LogRecordCount()
790-
err = c.nextConsumer.ConsumeLogs(session.Context(), logs)
800+
err = c.nextConsumer.ConsumeLogs(ctx, logs)
791801
c.obsrecv.EndLogsOp(ctx, c.unmarshaler.Encoding(), logRecordCount, err)
792802
if err != nil {
793803
if errorRequiresBackoff(err) && c.backOff != nil {
@@ -892,3 +902,18 @@ func encodingToComponentID(encoding string) (*component.ID, error) {
892902
func errorRequiresBackoff(err error) bool {
893903
return err.Error() == errMemoryLimiterDataRefused.Error()
894904
}
905+
906+
func newContextWithHeaders(ctx context.Context,
907+
headers []*sarama.RecordHeader,
908+
) context.Context {
909+
if len(headers) == 0 {
910+
return ctx
911+
}
912+
m := make(map[string][]string, len(headers))
913+
for _, header := range headers {
914+
key := string(header.Key)
915+
value := string(header.Value)
916+
m[key] = append(m[key], value)
917+
}
918+
return client.NewContext(ctx, client.Info{Metadata: client.NewMetadata(m)})
919+
}

receiver/kafkareceiver/kafka_receiver_test.go

Lines changed: 170 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,18 @@ import (
77
"context"
88
"errors"
99
"sync"
10+
"sync/atomic"
1011
"testing"
1112
"time"
1213

1314
"github.com/IBM/sarama"
1415
"github.com/cenkalti/backoff/v4"
1516
"github.com/stretchr/testify/assert"
1617
"github.com/stretchr/testify/require"
18+
"go.opentelemetry.io/collector/client"
1719
"go.opentelemetry.io/collector/component"
1820
"go.opentelemetry.io/collector/component/componenttest"
21+
"go.opentelemetry.io/collector/consumer"
1922
"go.opentelemetry.io/collector/consumer/consumertest"
2023
"go.opentelemetry.io/collector/pdata/plog"
2124
"go.opentelemetry.io/collector/pdata/pmetric"
@@ -111,11 +114,22 @@ func TestTracesConsumerGroupHandler(t *testing.T) {
111114

112115
obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopSettings(metadata.Type)})
113116
require.NoError(t, err)
117+
var called atomic.Bool
114118
c := tracesConsumerGroupHandler{
115-
unmarshaler: newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding),
116-
logger: zap.NewNop(),
117-
ready: make(chan bool),
118-
nextConsumer: consumertest.NewNop(),
119+
unmarshaler: newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding),
120+
logger: zap.NewNop(),
121+
ready: make(chan bool),
122+
nextConsumer: func() consumer.Traces {
123+
c, err := consumer.NewTraces(func(ctx context.Context, _ ptrace.Traces) error {
124+
defer called.CompareAndSwap(false, true)
125+
info := client.FromContext(ctx)
126+
assert.Equal(t, []string{"abcdefg"}, info.Metadata.Get("x-tenant-id"))
127+
assert.Equal(t, []string{"1234", "5678"}, info.Metadata.Get("x-request-ids"))
128+
return nil
129+
})
130+
require.NoError(t, err)
131+
return c
132+
}(),
119133
obsrecv: obsrecv,
120134
headerExtractor: &nopHeaderExtractor{},
121135
telemetryBuilder: telemetryBuilder,
@@ -141,9 +155,25 @@ func TestTracesConsumerGroupHandler(t *testing.T) {
141155
wg.Done()
142156
}()
143157

144-
groupClaim.messageChan <- &sarama.ConsumerMessage{}
158+
groupClaim.messageChan <- &sarama.ConsumerMessage{
159+
Headers: []*sarama.RecordHeader{
160+
{
161+
Key: []byte("x-tenant-id"),
162+
Value: []byte("abcdefg"),
163+
},
164+
{
165+
Key: []byte("x-request-ids"),
166+
Value: []byte("1234"),
167+
},
168+
{
169+
Key: []byte("x-request-ids"),
170+
Value: []byte("5678"),
171+
},
172+
},
173+
}
145174
close(groupClaim.messageChan)
146175
wg.Wait()
176+
assert.True(t, called.Load()) // Ensure nextConsumer was called.
147177
}
148178

149179
func TestTracesConsumerGroupHandler_session_done(t *testing.T) {
@@ -410,11 +440,22 @@ func TestMetricsConsumerGroupHandler(t *testing.T) {
410440

411441
obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopSettings(metadata.Type)})
412442
require.NoError(t, err)
443+
var called atomic.Bool
413444
c := metricsConsumerGroupHandler{
414-
unmarshaler: newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding),
415-
logger: zap.NewNop(),
416-
ready: make(chan bool),
417-
nextConsumer: consumertest.NewNop(),
445+
unmarshaler: newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding),
446+
logger: zap.NewNop(),
447+
ready: make(chan bool),
448+
nextConsumer: func() consumer.Metrics {
449+
c, err := consumer.NewMetrics(func(ctx context.Context, _ pmetric.Metrics) error {
450+
defer called.CompareAndSwap(false, true)
451+
info := client.FromContext(ctx)
452+
assert.Equal(t, []string{"abcdefg"}, info.Metadata.Get("x-tenant-id"))
453+
assert.Equal(t, []string{"1234", "5678"}, info.Metadata.Get("x-request-ids"))
454+
return nil
455+
})
456+
require.NoError(t, err)
457+
return c
458+
}(),
418459
obsrecv: obsrecv,
419460
headerExtractor: &nopHeaderExtractor{},
420461
telemetryBuilder: telemetryBuilder,
@@ -440,9 +481,25 @@ func TestMetricsConsumerGroupHandler(t *testing.T) {
440481
wg.Done()
441482
}()
442483

443-
groupClaim.messageChan <- &sarama.ConsumerMessage{}
484+
groupClaim.messageChan <- &sarama.ConsumerMessage{
485+
Headers: []*sarama.RecordHeader{
486+
{
487+
Key: []byte("x-tenant-id"),
488+
Value: []byte("abcdefg"),
489+
},
490+
{
491+
Key: []byte("x-request-ids"),
492+
Value: []byte("1234"),
493+
},
494+
{
495+
Key: []byte("x-request-ids"),
496+
Value: []byte("5678"),
497+
},
498+
},
499+
}
444500
close(groupClaim.messageChan)
445501
wg.Wait()
502+
assert.True(t, called.Load()) // Ensure nextConsumer was called.
446503
}
447504

448505
func TestMetricsConsumerGroupHandler_session_done(t *testing.T) {
@@ -722,11 +779,22 @@ func TestLogsConsumerGroupHandler(t *testing.T) {
722779

723780
obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopSettings(metadata.Type)})
724781
require.NoError(t, err)
782+
var called atomic.Bool
725783
c := logsConsumerGroupHandler{
726-
unmarshaler: newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding),
727-
logger: zap.NewNop(),
728-
ready: make(chan bool),
729-
nextConsumer: consumertest.NewNop(),
784+
unmarshaler: newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding),
785+
logger: zap.NewNop(),
786+
ready: make(chan bool),
787+
nextConsumer: func() consumer.Logs {
788+
c, err := consumer.NewLogs(func(ctx context.Context, _ plog.Logs) error {
789+
defer called.CompareAndSwap(false, true)
790+
info := client.FromContext(ctx)
791+
assert.Equal(t, []string{"abcdefg"}, info.Metadata.Get("x-tenant-id"))
792+
assert.Equal(t, []string{"1234", "5678"}, info.Metadata.Get("x-request-ids"))
793+
return nil
794+
})
795+
require.NoError(t, err)
796+
return c
797+
}(),
730798
obsrecv: obsrecv,
731799
headerExtractor: &nopHeaderExtractor{},
732800
telemetryBuilder: telemetryBuilder,
@@ -752,9 +820,25 @@ func TestLogsConsumerGroupHandler(t *testing.T) {
752820
wg.Done()
753821
}()
754822

755-
groupClaim.messageChan <- &sarama.ConsumerMessage{}
823+
groupClaim.messageChan <- &sarama.ConsumerMessage{
824+
Headers: []*sarama.RecordHeader{
825+
{
826+
Key: []byte("x-tenant-id"),
827+
Value: []byte("abcdefg"),
828+
},
829+
{
830+
Key: []byte("x-request-ids"),
831+
Value: []byte("1234"),
832+
},
833+
{
834+
Key: []byte("x-request-ids"),
835+
Value: []byte("5678"),
836+
},
837+
},
838+
}
756839
close(groupClaim.messageChan)
757840
wg.Wait()
841+
assert.True(t, called.Load()) // Ensure nextConsumer was called.
758842
}
759843

760844
func TestLogsConsumerGroupHandler_session_done(t *testing.T) {
@@ -1268,3 +1352,74 @@ func (c *nopNoUnmarshalerComponent) Start(_ context.Context, _ component.Host) e
12681352
func (c *nopNoUnmarshalerComponent) Shutdown(_ context.Context) error {
12691353
return nil
12701354
}
1355+
1356+
func Test_newContextWithHeaders(t *testing.T) {
1357+
type args struct {
1358+
ctx context.Context
1359+
headers []*sarama.RecordHeader
1360+
}
1361+
tests := []struct {
1362+
name string
1363+
args args
1364+
want map[string][]string
1365+
}{
1366+
{
1367+
name: "no headers",
1368+
args: args{
1369+
ctx: context.Background(),
1370+
headers: []*sarama.RecordHeader{},
1371+
},
1372+
want: map[string][]string{},
1373+
},
1374+
{
1375+
name: "single header",
1376+
args: args{
1377+
ctx: context.Background(),
1378+
headers: []*sarama.RecordHeader{
1379+
{Key: []byte("key1"), Value: []byte("value1")},
1380+
},
1381+
},
1382+
want: map[string][]string{
1383+
"key1": {"value1"},
1384+
},
1385+
},
1386+
{
1387+
name: "multiple headers",
1388+
args: args{
1389+
ctx: context.Background(),
1390+
headers: []*sarama.RecordHeader{
1391+
{Key: []byte("key1"), Value: []byte("value1")},
1392+
{Key: []byte("key2"), Value: []byte("value2")},
1393+
},
1394+
},
1395+
want: map[string][]string{
1396+
"key1": {"value1"},
1397+
"key2": {"value2"},
1398+
},
1399+
},
1400+
{
1401+
name: "duplicate keys",
1402+
args: args{
1403+
ctx: context.Background(),
1404+
headers: []*sarama.RecordHeader{
1405+
{Key: []byte("key1"), Value: []byte("value1")},
1406+
{Key: []byte("key1"), Value: []byte("value2")},
1407+
},
1408+
},
1409+
want: map[string][]string{
1410+
"key1": {"value1", "value2"},
1411+
},
1412+
},
1413+
}
1414+
1415+
for _, tt := range tests {
1416+
t.Run(tt.name, func(t *testing.T) {
1417+
ctx := newContextWithHeaders(tt.args.ctx, tt.args.headers)
1418+
clientInfo := client.FromContext(ctx)
1419+
for k, wantVal := range tt.want {
1420+
val := clientInfo.Metadata.Get(k)
1421+
assert.Equal(t, wantVal, val)
1422+
}
1423+
})
1424+
}
1425+
}

0 commit comments

Comments
 (0)