From 7bc2f19b239c16f81a32cf83ae379d7f8bb4c640 Mon Sep 17 00:00:00 2001
From: Vihas Splunk
Date: Tue, 18 Jul 2023 14:36:22 +0530
Subject: [PATCH 01/12] chore: add header extractor

---
 receiver/kafkareceiver/factory.go           |  1 +
 receiver/kafkareceiver/header_extraction.go | 77 +++++++++++++++++++++
 receiver/kafkareceiver/kafka_receiver.go    | 43 +++++++++++-
 3 files changed, 120 insertions(+), 1 deletion(-)
 create mode 100644 receiver/kafkareceiver/header_extraction.go

diff --git a/receiver/kafkareceiver/factory.go b/receiver/kafkareceiver/factory.go
index 00ff43e4fa987..b3278eb9a1871 100644
--- a/receiver/kafkareceiver/factory.go
+++ b/receiver/kafkareceiver/factory.go
@@ -111,6 +111,7 @@ func createDefaultConfig() component.Config {
 			After:   false,
 			OnError: false,
 		},
+		HeaderExtraction: false,
 	}
 }

diff --git a/receiver/kafkareceiver/header_extraction.go b/receiver/kafkareceiver/header_extraction.go
new file mode 100644
index 0000000000000..8d5f35fbdb35c
--- /dev/null
+++ b/receiver/kafkareceiver/header_extraction.go
@@ -0,0 +1,77 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package kafkareceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver"
+
+import (
+	"fmt"
+
+	"github.com/Shopify/sarama"
+	"go.opentelemetry.io/collector/pdata/plog"
+	"go.opentelemetry.io/collector/pdata/pmetric"
+	"go.opentelemetry.io/collector/pdata/ptrace"
+	"go.uber.org/zap"
+)
+
+var getAttribute = func(key string) string {
+	return fmt.Sprintf("kafka.header.%s", key)
+}
+
+type headerExtractor struct {
+	logger  *zap.Logger
+	headers []string
+}
+
+func (ex *headerExtractor) extractHeadersTraces(traces ptrace.Traces, message *sarama.ConsumerMessage) {
+	for _, header := range ex.headers {
+		value, ok := getHeaderValue(message.Headers, header)
+		if !ok {
+			ex.logger.Warn("Header key not found in the trace: ", zap.String("key", header))
+			continue
+		}
+		for i := 0; i < traces.ResourceSpans().Len(); i++ {
+			rs := traces.ResourceSpans().At(i)
+			rs.Resource().Attributes().PutStr(getAttribute(header), value)
+		}
+	}
+}
+
+func (ex *headerExtractor) extractHeadersLogs(logs plog.Logs, message *sarama.ConsumerMessage) {
+	for _, header := range ex.headers {
+		value, ok := getHeaderValue(message.Headers, header)
+		if !ok {
+			ex.logger.Warn("Header key not found in the logger: ", zap.String("key", header))
+			continue
+		}
+		for i := 0; i < logs.ResourceLogs().Len(); i++ {
+			rl := logs.ResourceLogs().At(i)
+			rl.Resource().Attributes().PutStr(getAttribute(header), value)
+		}
+	}
+}
+
+func (ex *headerExtractor) extractHeadersMetrics(metrics pmetric.Metrics, message *sarama.ConsumerMessage) {
+	for _, header := range ex.headers {
+		value, ok := getHeaderValue(message.Headers, header)
+		if !ok {
+			ex.logger.Warn("Header key not found in the metric: ", zap.String("key", header))
+			continue
+		}
+		for i := 0; i < metrics.ResourceMetrics().Len(); i++ {
+			rm := metrics.ResourceMetrics().At(i)
+			rm.Resource().Attributes().PutStr(getAttribute(header), value)
+		}
+	}
+}
+
+func getHeaderValue(headers []*sarama.RecordHeader, header string) (string, bool) {
+	for _, kafkaHeader := range headers {
+		headerKey := string(kafkaHeader.Key)
+		if headerKey == header {
+			// matching header found
+			return string(kafkaHeader.Value), true
+		}
+	}
+	// no header found matching the key, report to the user
+	return "", false
+}

diff --git a/receiver/kafkareceiver/kafka_receiver.go b/receiver/kafkareceiver/kafka_receiver.go
index 5f76e5295a5f3..4bf762461cfc5 100644
--- a/receiver/kafkareceiver/kafka_receiver.go
+++ b/receiver/kafkareceiver/kafka_receiver.go
@@ -40,6 +40,8 @@ type kafkaTracesConsumer struct {
 
 	autocommitEnabled bool
 	messageMarking    MessageMarking
+	headerExtraction  bool
+	headers           []string
 }
 
 // kafkaMetricsConsumer uses sarama to consume and handle messages from kafka.
@@ -54,6 +56,8 @@ type kafkaMetricsConsumer struct {
 
 	autocommitEnabled bool
 	messageMarking    MessageMarking
+	headerExtraction  bool
+	headers           []string
 }
 
 // kafkaLogsConsumer uses sarama to consume and handle messages from kafka.
@@ -68,6 +72,8 @@ type kafkaLogsConsumer struct {
 
 	autocommitEnabled bool
 	messageMarking    MessageMarking
+	headerExtraction  bool
+	headers           []string
 }
 
 var _ receiver.Traces = (*kafkaTracesConsumer)(nil)
@@ -114,6 +120,8 @@ func newTracesReceiver(config Config, set receiver.CreateSettings, unmarshalers
 		settings:          set,
 		autocommitEnabled: config.AutoCommit.Enable,
 		messageMarking:    config.MessageMarking,
+		headerExtraction:  config.HeaderExtraction,
+		headers:           config.Headers,
 	}, nil
 }
 
@@ -137,6 +145,12 @@ func (c *kafkaTracesConsumer) Start(_ context.Context, host component.Host) erro
 		autocommitEnabled: c.autocommitEnabled,
 		messageMarking:    c.messageMarking,
 	}
+	if c.headerExtraction {
+		consumerGroup.headerExtractor = &headerExtractor{
+			logger:  c.settings.Logger,
+			headers: c.headers,
+		}
+	}
 	go func() {
 		if err := c.consumeLoop(ctx, consumerGroup); err != nil {
 			host.ReportFatalError(err)
@@ -207,6 +221,8 @@ func newMetricsReceiver(config Config, set receiver.CreateSettings, unmarshalers
 		settings:          set,
 		autocommitEnabled: config.AutoCommit.Enable,
 		messageMarking:    config.MessageMarking,
+		headerExtraction:  config.HeaderExtraction,
+		headers:           config.Headers,
 	}, nil
 }
 
@@ -230,6 +246,12 @@ func (c *kafkaMetricsConsumer) Start(_ context.Context, host component.Host) err
 		autocommitEnabled: c.autocommitEnabled,
 		messageMarking:    c.messageMarking,
 	}
+	if c.headerExtraction {
+		metricsConsumerGroup.headerExtractor = &headerExtractor{
+			logger:  c.settings.Logger,
+			headers: c.headers,
+		}
+	}
 	go func() {
 		if err := c.consumeLoop(ctx, metricsConsumerGroup); err != nil {
 			host.ReportFatalError(err)
@@ -300,6 +322,8 @@ func newLogsReceiver(config Config, set receiver.CreateSettings, unmarshalers ma
 		settings:          set,
 		autocommitEnabled: config.AutoCommit.Enable,
 		messageMarking:    config.MessageMarking,
+		headers:           config.Headers,
+		headerExtraction:  config.HeaderExtraction,
 	}, nil
 }
 
@@ -351,6 +375,12 @@ func (c *kafkaLogsConsumer) Start(_ context.Context, host component.Host) error
 		autocommitEnabled: c.autocommitEnabled,
 		messageMarking:    c.messageMarking,
 	}
+	if c.headerExtraction {
+		logsConsumerGroup.headerExtractor = &headerExtractor{
+			logger:  c.settings.Logger,
+			headers: c.headers,
+		}
+	}
 	go func() {
 		if err := c.consumeLoop(ctx, logsConsumerGroup); err != nil {
 			host.ReportFatalError(err)
@@ -394,6 +424,7 @@ type tracesConsumerGroupHandler struct {
 
 	autocommitEnabled bool
 	messageMarking    MessageMarking
+	headerExtractor   *headerExtractor
 }
 
 type metricsConsumerGroupHandler struct {
@@ -409,6 +440,7 @@ type metricsConsumerGroupHandler struct {
 
 	autocommitEnabled bool
 	messageMarking    MessageMarking
+	headerExtractor   *headerExtractor
 }
 
 type logsConsumerGroupHandler struct {
@@ -424,6 +456,7 @@ type logsConsumerGroupHandler struct {
 
 	autocommitEnabled bool
 	messageMarking    MessageMarking
+	headerExtractor   *headerExtractor
 }
 
 var _ sarama.ConsumerGroupHandler = (*tracesConsumerGroupHandler)(nil)
@@ -480,6 +513,9 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe
 			return err
 		}
 
+		if c.headerExtractor != nil {
+			c.headerExtractor.extractHeadersTraces(traces, message)
+		}
 		spanCount := traces.SpanCount()
 		err = c.nextConsumer.ConsumeTraces(session.Context(), traces)
 		c.obsrecv.EndTracesOp(ctx, c.unmarshaler.Encoding(), spanCount, err)
@@ -554,6 +590,9 @@ func (c *metricsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupS
 			}
 			return err
 		}
+		if c.headerExtractor != nil {
+			c.headerExtractor.extractHeadersMetrics(metrics, message)
+		}
 		dataPointCount := metrics.DataPointCount()
 		err = c.nextConsumer.ConsumeMetrics(session.Context(), metrics)
 
@@ -634,7 +673,9 @@ func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSess
 			}
 			return err
 		}
-
+		if c.headerExtractor != nil {
+			c.headerExtractor.extractHeadersLogs(logs, message)
+		}
 		err = c.nextConsumer.ConsumeLogs(session.Context(), logs)
 		// TODO
 		c.obsrecv.EndLogsOp(ctx, c.unmarshaler.Encoding(), logs.LogRecordCount(), err)
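Taken together, patch 01 gives each consumer-group handler an optional extractor: when a configured key matches a record header, every resource in the decoded batch gains a `kafka.header.<key>` attribute, while keys with no matching header are only logged. A minimal sketch of that behavior using the functions introduced above (the header key and value here are invented for illustration):

```go
// Illustrative driver for the patch-01 extractor; "tenant"/"acme" are
// made-up values, everything else comes from header_extraction.go above.
msg := &sarama.ConsumerMessage{
	Headers: []*sarama.RecordHeader{
		{Key: []byte("tenant"), Value: []byte("acme")},
	},
}
ex := &headerExtractor{
	logger:  zap.NewNop(),
	headers: []string{"tenant", "absent-key"},
}

traces := ptrace.NewTraces()
traces.ResourceSpans().AppendEmpty()
ex.extractHeadersTraces(traces, msg)
// Every ResourceSpans now carries kafka.header.tenant = "acme";
// "absent-key" only produced a warning log entry.
```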
From b50f28e6317fdf6d84bc650be51bb8a84891fdec Mon Sep 17 00:00:00 2001
From: Vihas Splunk
Date: Tue, 18 Jul 2023 14:36:51 +0530
Subject: [PATCH 02/12] chore: add test cases

---
 receiver/kafkareceiver/config.go                |   4 +
 .../kafkareceiver/header_extraction_test.go     | 211 ++++++++++++++++++
 2 files changed, 215 insertions(+)
 create mode 100644 receiver/kafkareceiver/header_extraction_test.go

diff --git a/receiver/kafkareceiver/config.go b/receiver/kafkareceiver/config.go
index 1ed844b2ad890..99395ec9951f1 100644
--- a/receiver/kafkareceiver/config.go
+++ b/receiver/kafkareceiver/config.go
@@ -60,6 +60,10 @@ type Config struct {
 
 	// Controls the way the messages are marked as consumed
 	MessageMarking MessageMarking `mapstructure:"message_marking"`
+
+	// Extract headers
+	HeaderExtraction bool     `mapstructure:"header_extraction"`
+	Headers          []string `mapstructure:"headers"`
 }
 
 const (
diff --git a/receiver/kafkareceiver/header_extraction_test.go b/receiver/kafkareceiver/header_extraction_test.go
new file mode 100644
index 0000000000000..ded15e0c5e37e
--- /dev/null
+++ b/receiver/kafkareceiver/header_extraction_test.go
@@ -0,0 +1,211 @@
+package kafkareceiver
+
+import (
+	"context"
+	"github.com/Shopify/sarama"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"go.opentelemetry.io/collector/obsreport"
+	"go.opentelemetry.io/collector/receiver/receivertest"
+
+	"bytes"
+	"fmt"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/testdata"
+	"go.opentelemetry.io/collector/consumer"
+	"go.opentelemetry.io/collector/consumer/consumertest"
+	"go.opentelemetry.io/collector/pdata/pcommon"
+	"go.opentelemetry.io/collector/pdata/plog"
+	"go.opentelemetry.io/collector/pdata/pmetric"
+	"go.opentelemetry.io/collector/pdata/ptrace"
+	"go.uber.org/zap"
+	"math"
+	"strings"
+	"sync"
+	"testing"
+)
+
+func TestHeaderExtractionTraces(t *testing.T) {
+	obsrecv, err := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
+	nextConsumer := (new(consumertest.TracesSink))
+	c := tracesConsumerGroupHandler{
+		unmarshaler:  newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding),
+		logger:       zap.NewNop(),
+		ready:        make(chan bool),
+		nextConsumer: nextConsumer,
+		// nextConsumer: &mockConsumer{},
+		obsrecv: obsrecv,
+	}
+	headers := []string{"headerKey1", "headerKey2"}
+	c.headerExtractor = &headerExtractor{
+		logger:  zap.NewNop(),
+		headers: headers,
+	}
+	groupClaim := &testConsumerGroupClaim{
+		messageChan: make(chan *sarama.ConsumerMessage),
+	}
+	ctx, cancelFunc := context.WithCancel(context.Background())
+	defer close(groupClaim.messageChan)
+	testSession := testConsumerGroupSession{ctx: ctx}
+	require.NoError(t, c.Setup(testSession))
+	wg := sync.WaitGroup{}
+	wg.Add(1)
+	go func() {
+		err = c.ConsumeClaim(testSession, groupClaim)
+		for _, trace := range nextConsumer.AllTraces() {
+			for i := 0; i < trace.ResourceSpans().Len(); i++ {
+				rs := trace.ResourceSpans().At(i)
+				val, ok := rs.Resource().Attributes().Get("kafka.header.headerKey1")
+				assert.Equal(t, ok, true)
+				assert.Equal(t, val.Str(), "headerValue1")
+				val, ok = rs.Resource().Attributes().Get("kafka.header.headerKey2")
+				assert.Equal(t, ok, true)
+				assert.Equal(t, val.Str(), "headerValue2")
+			}
+		}
+		assert.NoError(t, err)
+		wg.Done()
+	}()
+	td := ptrace.NewTraces()
+	td.ResourceSpans().AppendEmpty().Resource()
+	td.ResourceSpans().At(0).ScopeSpans().AppendEmpty().Spans().AppendEmpty()
+	unmarshaler := &ptrace.ProtoMarshaler{}
+	bts, err := unmarshaler.MarshalTraces(td)
+	groupClaim.messageChan <- &sarama.ConsumerMessage{
+		Headers: []*sarama.RecordHeader{
+			{
+				Key:   []byte("headerKey1"),
+				Value: []byte("headerValue1"),
+			},
+			{
+				Key:   []byte("headerKey2"),
+				Value: []byte("headerValue2"),
+			},
+		},
+		Value: bts,
+	}
+	cancelFunc()
+	wg.Wait()
+
+}
+
+func TestHeaderExtractionLogs(t *testing.T) {
+	obsrecv, err := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
+	nextConsumer := new(consumertest.LogsSink)
+	unmarshaler := newTextLogsUnmarshaler()
+	unmarshaler, err = unmarshaler.WithEnc("utf-8")
+	c := logsConsumerGroupHandler{
+		unmarshaler:  unmarshaler,
+		logger:       zap.NewNop(),
+		ready:        make(chan bool),
+		nextConsumer: nextConsumer,
+		obsrecv:      obsrecv,
+	}
+	headers := []string{"headerKey1", "headerKey2"}
+	c.headerExtractor = &headerExtractor{
+		logger:  zap.NewNop(),
+		headers: headers,
+	}
+	groupClaim := &testConsumerGroupClaim{
+		messageChan: make(chan *sarama.ConsumerMessage),
+	}
+	ctx, cancelFunc := context.WithCancel(context.Background())
+	defer close(groupClaim.messageChan)
+	testSession := testConsumerGroupSession{ctx: ctx}
+	require.NoError(t, c.Setup(testSession))
+	wg := sync.WaitGroup{}
+	wg.Add(1)
+	go func() {
+		err = c.ConsumeClaim(testSession, groupClaim)
+		for _, logs := range nextConsumer.AllLogs() {
+			for i := 0; i < logs.ResourceLogs().Len(); i++ {
+				rs := logs.ResourceLogs().At(i)
+				val, ok := rs.Resource().Attributes().Get("kafka.header.headerKey1")
+				assert.Equal(t, ok, true)
+				assert.Equal(t, val.Str(), "headerValue1")
+				val, ok = rs.Resource().Attributes().Get("kafka.header.headerKey2")
+				assert.Equal(t, ok, true)
+				assert.Equal(t, val.Str(), "headerValue2")
+			}
+		}
+		assert.NoError(t, err)
+		wg.Done()
+	}()
+	groupClaim.messageChan <- &sarama.ConsumerMessage{
+		Headers: []*sarama.RecordHeader{
+			{
+				Key:   []byte("headerKey1"),
+				Value: []byte("headerValue1"),
+			},
+			{
+				Key:   []byte("headerKey2"),
+				Value: []byte("headerValue2"),
+			},
+		},
+		Value: []byte("Message"),
+	}
+	cancelFunc()
+	wg.Wait()
+
+}
+
+func TestHeaderExtractionMetrics(t *testing.T) {
+	obsrecv, err := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
+	nextConsumer := new(consumertest.MetricsSink)
+	c := metricsConsumerGroupHandler{
+		unmarshaler:  newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding),
+		logger:       zap.NewNop(),
+		ready:        make(chan bool),
+		nextConsumer: nextConsumer,
+		obsrecv:      obsrecv,
+	}
+	headers := []string{"headerKey1", "headerKey2"}
+	c.headerExtractor = &headerExtractor{
+		logger:  zap.NewNop(),
+		headers: headers,
+	}
+	groupClaim := &testConsumerGroupClaim{
+		messageChan: make(chan *sarama.ConsumerMessage),
+	}
+	ctx, cancelFunc := context.WithCancel(context.Background())
+	defer close(groupClaim.messageChan)
+	testSession := testConsumerGroupSession{ctx: ctx}
+	require.NoError(t, c.Setup(testSession))
+	wg := sync.WaitGroup{}
+	wg.Add(1)
+	go func() {
+		err = c.ConsumeClaim(testSession, groupClaim)
+		for _, metric := range nextConsumer.AllMetrics() {
+			for i := 0; i < metric.ResourceMetrics().Len(); i++ {
+				rs := metric.ResourceMetrics().At(i)
+				val, ok := rs.Resource().Attributes().Get("kafka.header.headerKey1")
+				assert.Equal(t, ok, true)
+				assert.Equal(t, val.Str(), "headerValue1")
+				val, ok = rs.Resource().Attributes().Get("kafka.header.headerKey2")
+				assert.Equal(t, ok, true)
+				assert.Equal(t, val.Str(), "headerValue2")
+			}
+		}
+		assert.NoError(t, err)
+		wg.Done()
+	}()
+	ld := testdata.GenerateMetricsOneMetric()
+	unmarshaler := &pmetric.ProtoMarshaler{}
+	bts, err := unmarshaler.MarshalMetrics(ld)
+	groupClaim.messageChan <- &sarama.ConsumerMessage{
+		Headers: []*sarama.RecordHeader{
+			{
+				Key:   []byte("headerKey1"),
+				Value: []byte("headerValue1"),
+			},
+			{
+				Key:   []byte("headerKey2"),
+				Value: []byte("headerValue2"),
+			},
+		},
+		Value: bts,
+	}
+	// groupClaim.messageChan <- &sarama.ConsumerMessage{}
+	cancelFunc()
+	wg.Wait()
+
+}
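All three tests in patch 02 share one harness shape: marshal a payload, feed a single message through a channel-backed claim, cancel the session, then assert on what the sink received. Condensed to its skeleton (using the existing `testConsumerGroupClaim` and `testConsumerGroupSession` fixtures from kafka_receiver_test.go; details elided):

```go
// Skeleton of the header-extraction tests above.
groupClaim := &testConsumerGroupClaim{messageChan: make(chan *sarama.ConsumerMessage)}
ctx, cancel := context.WithCancel(context.Background())
session := testConsumerGroupSession{ctx: ctx}
require.NoError(t, c.Setup(session))

wg := sync.WaitGroup{}
wg.Add(1)
go func() {
	defer wg.Done()
	// Returns once the session context is cancelled; the sink's
	// collected batches are then inspected for kafka.header.* attributes.
	assert.NoError(t, c.ConsumeClaim(session, groupClaim))
}()

groupClaim.messageChan <- &sarama.ConsumerMessage{ /* headers + marshaled payload */ }
cancel()
wg.Wait()
```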
From 998494088f3e637fec92fbbf3482f5479afbf63d Mon Sep 17 00:00:00 2001
From: Vihas Splunk
Date: Tue, 8 Aug 2023 18:00:36 +0530
Subject: [PATCH 03/12] chore: address review comments, fix testcases and
 improve readability

---
 receiver/kafkareceiver/config.go                | 10 ++-
 receiver/kafkareceiver/factory.go               |  4 +-
 receiver/kafkareceiver/header_extraction.go     | 20 +++---
 .../kafkareceiver/header_extraction_test.go     | 66 ++++++++----------
 receiver/kafkareceiver/kafka_receiver.go        | 12 ++--
 5 files changed, 53 insertions(+), 59 deletions(-)

diff --git a/receiver/kafkareceiver/config.go b/receiver/kafkareceiver/config.go
index 99395ec9951f1..7487e06b0be6f 100644
--- a/receiver/kafkareceiver/config.go
+++ b/receiver/kafkareceiver/config.go
@@ -31,6 +31,11 @@ type MessageMarking struct {
 	OnError bool `mapstructure:"on_error"`
 }
 
+type HeaderExtraction struct {
+	ExtractHeaders bool     `mapstructure:"extract_headers"`
+	Headers        []string `mapstructure:"headers"`
+}
+
 // Config defines configuration for Kafka receiver.
 type Config struct {
 	// The list of kafka brokers (default localhost:9092)
@@ -61,9 +66,8 @@ type Config struct {
 	// Controls the way the messages are marked as consumed
 	MessageMarking MessageMarking `mapstructure:"message_marking"`
 
-	// Extract headers
-	HeaderExtraction bool     `mapstructure:"header_extraction"`
-	Headers          []string `mapstructure:"headers"`
+	// Extract headers from kafka records
+	HeaderExtraction HeaderExtraction `mapstructure:"header_extraction"`
 }
 
 const (
diff --git a/receiver/kafkareceiver/factory.go b/receiver/kafkareceiver/factory.go
index b3278eb9a1871..cbcadf3233a88 100644
--- a/receiver/kafkareceiver/factory.go
+++ b/receiver/kafkareceiver/factory.go
@@ -111,7 +111,9 @@ func createDefaultConfig() component.Config {
 			After:   false,
 			OnError: false,
 		},
-		HeaderExtraction: false,
+		HeaderExtraction: HeaderExtraction{
+			ExtractHeaders: false,
+		},
 	}
 }
 
diff --git a/receiver/kafkareceiver/header_extraction.go b/receiver/kafkareceiver/header_extraction.go
index 8d5f35fbdb35c..90387bc77bc62 100644
--- a/receiver/kafkareceiver/header_extraction.go
+++ b/receiver/kafkareceiver/header_extraction.go
@@ -13,7 +13,7 @@ import (
 	"go.uber.org/zap"
 )
 
-var getAttribute = func(key string) string {
+func getAttribute(key string) string {
 	return fmt.Sprintf("kafka.header.%s", key)
 }
 
@@ -22,11 +22,11 @@ type headerExtractor struct {
 	headers []string
 }
 
-func (ex *headerExtractor) extractHeadersTraces(traces ptrace.Traces, message *sarama.ConsumerMessage) {
-	for _, header := range ex.headers {
+func (he *headerExtractor) extractHeadersTraces(traces ptrace.Traces, message *sarama.ConsumerMessage) {
+	for _, header := range he.headers {
 		value, ok := getHeaderValue(message.Headers, header)
 		if !ok {
-			ex.logger.Warn("Header key not found in the trace: ", zap.String("key", header))
+			he.logger.Debug("Header key not found in the trace: ", zap.String("key", header))
 			continue
 		}
 		for i := 0; i < traces.ResourceSpans().Len(); i++ {
@@ -36,11 +36,11 @@ func (ex *headerExtractor) extractHeadersTraces(traces ptrace.Traces, message *s
 	}
 }
 
-func (ex *headerExtractor) extractHeadersLogs(logs plog.Logs, message *sarama.ConsumerMessage) {
-	for _, header := range ex.headers {
+func (he *headerExtractor) extractHeadersLogs(logs plog.Logs, message *sarama.ConsumerMessage) {
+	for _, header := range he.headers {
 		value, ok := getHeaderValue(message.Headers, header)
 		if !ok {
-			ex.logger.Warn("Header key not found in the logger: ", zap.String("key", header))
+			he.logger.Debug("Header key not found in the logger: ", zap.String("key", header))
 			continue
 		}
 		for i := 0; i < logs.ResourceLogs().Len(); i++ {
@@ -50,11 +50,11 @@ func (ex *headerExtractor) extractHeadersLogs(logs plog.Logs, message *sarama.Co
 	}
 }
 
-func (ex *headerExtractor) extractHeadersMetrics(metrics pmetric.Metrics, message *sarama.ConsumerMessage) {
-	for _, header := range ex.headers {
+func (he *headerExtractor) extractHeadersMetrics(metrics pmetric.Metrics, message *sarama.ConsumerMessage) {
+	for _, header := range he.headers {
 		value, ok := getHeaderValue(message.Headers, header)
 		if !ok {
-			ex.logger.Warn("Header key not found in the metric: ", zap.String("key", header))
+			he.logger.Debug("Header key not found in the metric: ", zap.String("key", header))
 			continue
 		}
 		for i := 0; i < metrics.ResourceMetrics().Len(); i++ {
diff --git a/receiver/kafkareceiver/header_extraction_test.go b/receiver/kafkareceiver/header_extraction_test.go
index ded15e0c5e37e..4eaf435cc9856 100644
--- a/receiver/kafkareceiver/header_extraction_test.go
+++ b/receiver/kafkareceiver/header_extraction_test.go
@@ -2,38 +2,32 @@ package kafkareceiver
 
 import (
 	"context"
+	"sync"
+	"testing"
+
 	"github.com/Shopify/sarama"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
-	"go.opentelemetry.io/collector/obsreport"
-	"go.opentelemetry.io/collector/receiver/receivertest"
-
-	"bytes"
-	"fmt"
-	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/testdata"
-	"go.opentelemetry.io/collector/consumer"
 	"go.opentelemetry.io/collector/consumer/consumertest"
+	"go.opentelemetry.io/collector/obsreport"
 	"go.opentelemetry.io/collector/pdata/pcommon"
-	"go.opentelemetry.io/collector/pdata/plog"
 	"go.opentelemetry.io/collector/pdata/pmetric"
 	"go.opentelemetry.io/collector/pdata/ptrace"
+	"go.opentelemetry.io/collector/receiver/receivertest"
 	"go.uber.org/zap"
-	"math"
-	"strings"
-	"sync"
-	"testing"
+
+	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/testdata"
 )
 
 func TestHeaderExtractionTraces(t *testing.T) {
 	obsrecv, err := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
-	nextConsumer := (new(consumertest.TracesSink))
+	nextConsumer := &consumertest.TracesSink{}
 	c := tracesConsumerGroupHandler{
 		unmarshaler:  newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding),
 		logger:       zap.NewNop(),
 		ready:        make(chan bool),
 		nextConsumer: nextConsumer,
-		// nextConsumer: &mockConsumer{},
-		obsrecv: obsrecv,
+		obsrecv:      obsrecv,
 	}
 	headers := []string{"headerKey1", "headerKey2"}
 	c.headerExtractor = &headerExtractor{
@@ -54,12 +48,8 @@ func TestHeaderExtractionTraces(t *testing.T) {
 		for _, trace := range nextConsumer.AllTraces() {
 			for i := 0; i < trace.ResourceSpans().Len(); i++ {
 				rs := trace.ResourceSpans().At(i)
-				val, ok := rs.Resource().Attributes().Get("kafka.header.headerKey1")
-				assert.Equal(t, ok, true)
-				assert.Equal(t, val.Str(), "headerValue1")
-				val, ok = rs.Resource().Attributes().Get("kafka.header.headerKey2")
-				assert.Equal(t, ok, true)
-				assert.Equal(t, val.Str(), "headerValue2")
+				validateHeader(t, rs.Resource(), "kafka.header.headerKey1", "headerValue1")
+				validateHeader(t, rs.Resource(), "kafka.header.headerKey2", "headerValue2")
 			}
 		}
 		assert.NoError(t, err)
@@ -90,7 +80,7 @@ func TestHeaderExtractionTraces(t *testing.T) {
 
 func TestHeaderExtractionLogs(t *testing.T) {
 	obsrecv, err := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
-	nextConsumer := new(consumertest.LogsSink)
+	nextConsumer := &consumertest.LogsSink{}
 	unmarshaler := newTextLogsUnmarshaler()
 	unmarshaler, err = unmarshaler.WithEnc("utf-8")
 	c := logsConsumerGroupHandler{
@@ -119,12 +109,8 @@ func TestHeaderExtractionLogs(t *testing.T) {
 		for _, logs := range nextConsumer.AllLogs() {
 			for i := 0; i < logs.ResourceLogs().Len(); i++ {
 				rs := logs.ResourceLogs().At(i)
-				val, ok := rs.Resource().Attributes().Get("kafka.header.headerKey1")
-				assert.Equal(t, ok, true)
-				assert.Equal(t, val.Str(), "headerValue1")
-				val, ok = rs.Resource().Attributes().Get("kafka.header.headerKey2")
-				assert.Equal(t, ok, true)
-				assert.Equal(t, val.Str(), "headerValue2")
+				validateHeader(t, rs.Resource(), "kafka.header.headerKey1", "headerValueLog1")
+				validateHeader(t, rs.Resource(), "kafka.header.headerKey2", "headerValueLog2")
 			}
 		}
 		assert.NoError(t, err)
@@ -134,11 +120,11 @@ func TestHeaderExtractionLogs(t *testing.T) {
 		Headers: []*sarama.RecordHeader{
 			{
 				Key:   []byte("headerKey1"),
-				Value: []byte("headerValue1"),
+				Value: []byte("headerValueLog1"),
 			},
 			{
 				Key:   []byte("headerKey2"),
-				Value: []byte("headerValue2"),
+				Value: []byte("headerValueLog2"),
 			},
 		},
 		Value: []byte("Message"),
@@ -150,7 +136,7 @@ func TestHeaderExtractionLogs(t *testing.T) {
 
 func TestHeaderExtractionMetrics(t *testing.T) {
 	obsrecv, err := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
-	nextConsumer := new(consumertest.MetricsSink)
+	nextConsumer := &consumertest.MetricsSink{}
 	c := metricsConsumerGroupHandler{
 		unmarshaler:  newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding),
 		logger:       zap.NewNop(),
 		ready:        make(chan bool),
 		nextConsumer: nextConsumer,
 		obsrecv:      obsrecv,
 	}
 	headers := []string{"headerKey1", "headerKey2"}
 	c.headerExtractor = &headerExtractor{
@@ -177,12 +163,8 @@ func TestHeaderExtractionMetrics(t *testing.T) {
 		for _, metric := range nextConsumer.AllMetrics() {
 			for i := 0; i < metric.ResourceMetrics().Len(); i++ {
 				rs := metric.ResourceMetrics().At(i)
-				val, ok := rs.Resource().Attributes().Get("kafka.header.headerKey1")
-				assert.Equal(t, ok, true)
-				assert.Equal(t, val.Str(), "headerValue1")
-				val, ok = rs.Resource().Attributes().Get("kafka.header.headerKey2")
-				assert.Equal(t, ok, true)
-				assert.Equal(t, val.Str(), "headerValue2")
+				validateHeader(t, rs.Resource(), "kafka.header.headerKey1", "headerValueMetric1")
+				validateHeader(t, rs.Resource(), "kafka.header.headerKey2", "headerValueMetric2")
 			}
 		}
 		assert.NoError(t, err)
@@ -195,11 +177,11 @@ func TestHeaderExtractionMetrics(t *testing.T) {
 		Headers: []*sarama.RecordHeader{
 			{
 				Key:   []byte("headerKey1"),
-				Value: []byte("headerValue1"),
+				Value: []byte("headerValueMetric1"),
 			},
 			{
 				Key:   []byte("headerKey2"),
-				Value: []byte("headerValue2"),
+				Value: []byte("headerValueMetric2"),
 			},
 		},
 		Value: bts,
@@ -209,3 +191,9 @@ func TestHeaderExtractionMetrics(t *testing.T) {
 	wg.Wait()
 
 }
+
+func validateHeader(t *testing.T, rs pcommon.Resource, headerKey string, headerValue string) {
+	val, ok := rs.Attributes().Get(headerKey)
+	assert.Equal(t, ok, true)
+	assert.Equal(t, val.Str(), headerValue)
+}
diff --git a/receiver/kafkareceiver/kafka_receiver.go b/receiver/kafkareceiver/kafka_receiver.go
index 4bf762461cfc5..626e35c64846a 100644
--- a/receiver/kafkareceiver/kafka_receiver.go
+++ b/receiver/kafkareceiver/kafka_receiver.go
@@ -120,8 +120,8 @@ func newTracesReceiver(config Config, set receiver.CreateSettings, unmarshalers
 		settings:          set,
 		autocommitEnabled: config.AutoCommit.Enable,
 		messageMarking:    config.MessageMarking,
-		headerExtraction:  config.HeaderExtraction,
-		headers:           config.Headers,
+		headerExtraction:  config.HeaderExtraction.ExtractHeaders,
+		headers:           config.HeaderExtraction.Headers,
 	}, nil
 }
 
@@ -221,8 +221,8 @@ func newMetricsReceiver(config Config, set receiver.CreateSettings, unmarshalers
 		settings:          set,
 		autocommitEnabled: config.AutoCommit.Enable,
 		messageMarking:    config.MessageMarking,
-		headerExtraction:  config.HeaderExtraction,
-		headers:           config.Headers,
+		headerExtraction:  config.HeaderExtraction.ExtractHeaders,
+		headers:           config.HeaderExtraction.Headers,
 	}, nil
 }
 
@@ -322,8 +322,8 @@ func newLogsReceiver(config Config, set receiver.CreateSettings, unmarshalers ma
 		settings:          set,
 		autocommitEnabled: config.AutoCommit.Enable,
 		messageMarking:    config.MessageMarking,
-		headers:           config.Headers,
-		headerExtraction:  config.HeaderExtraction,
+		headerExtraction:  config.HeaderExtraction.ExtractHeaders,
+		headers:           config.HeaderExtraction.Headers,
 	}, nil
 }
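Patch 03's user-visible effect is a schema change: the two flat fields from patch 02 are folded into one `header_extraction` block, matching the new `HeaderExtraction` struct and its mapstructure tags. Illustrative YAML (only the second shape is accepted after this commit):

```yaml
# Shape before patch 03: two flat receiver fields.
receivers:
  kafka:
    header_extraction: true
    headers: ["header1"]

# Shape after patch 03: one nested block.
receivers:
  kafka:
    header_extraction:
      extract_headers: true
      headers: ["header1"]
```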
From b8769b62250ce2a6ab342588d75bd7b8f925d086 Mon Sep 17 00:00:00 2001
From: Vihas Splunk
Date: Tue, 8 Aug 2023 18:06:15 +0530
Subject: [PATCH 04/12] chore: add doc

---
 receiver/kafkareceiver/README.md | 41 +++++++++++++++++++++++++++++++-
 1 file changed, 40 insertions(+), 1 deletion(-)

diff --git a/receiver/kafkareceiver/README.md b/receiver/kafkareceiver/README.md
index 37d808c4b8261..480ce1600557e 100644
--- a/receiver/kafkareceiver/README.md
+++ b/receiver/kafkareceiver/README.md
@@ -85,7 +85,10 @@ The following settings can be optionally configured:
   - `after`: (default = false) If true, the messages are marked after the pipeline execution
   - `on_error`: (default = false) If false, only the successfully processed messages are marked
     **Note: this can block the entire partition in case a message processing returns a permanent error**
-
+- `header_extraction`:
+  - `extract_headers` (default = false): Allows user to attach header fields to resourse attributes in otel pipeline
+  - `headers` (default = []): List of headers to extract from the kafka record.
+    **Note: Matching pattern will be `exact`. Regexes are not supported as of now.**
 Example:
 
 ```yaml
@@ -93,3 +96,39 @@ receivers:
   kafka:
     protocol_version: 2.0.0
 ```
+
+Example of header extraction:
+
+```yaml
+receivers:
+  kafka:
+    topic: test
+    header_extraction:
+      extract_headers: true
+      headers: ["header1"]
+```
+
+- If we feed the following kafka record to `test` topic and use above configs:
+```yaml
+{
+  event: Hello,
+  headers: {
+    header1: value1,
+    header2: value2,
+  }
+}
+```
+we will get a log record in the collector similar to:
+```yaml{
+  ...
+  body: Hello,
+  resource: {
+    kafka.header.header1: value1,
+    kafka.header.header2: value2,
+  },
+  ...
+}
+```
+
+- Here you can see the kafka record header `header1` being added to resource attribute.
+- Every kafka header key is prefixed with `kafka.header` string and attached to resource attributes.
\ No newline at end of file

From 019d5dc3ceda98084aed49c9ac1357245b922cc2 Mon Sep 17 00:00:00 2001
From: Vihas Splunk
Date: Tue, 8 Aug 2023 18:10:44 +0530
Subject: [PATCH 05/12] chore: add changelog

---
 .chloggen/header-extraction-kafka.yaml | 20 ++++++++++++++++++++
 1 file changed, 20 insertions(+)
 create mode 100644 .chloggen/header-extraction-kafka.yaml

diff --git a/.chloggen/header-extraction-kafka.yaml b/.chloggen/header-extraction-kafka.yaml
new file mode 100644
index 0000000000000..1501652f34645
--- /dev/null
+++ b/.chloggen/header-extraction-kafka.yaml
@@ -0,0 +1,20 @@
+# Use this changelog template to create an entry for release notes.
+# If your change doesn't affect end users, such as a test fix or a tooling change,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: 'enhancement'
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: kafkareceiver
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Allow users to attach kafka header metadata to the log/metric/trace record in the pipeline. Introduce a new config param, 'header_extraction' and some examples.
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: []
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
From 2c283ee67044bbe7aa71a037631d997ef9f61e0c Mon Sep 17 00:00:00 2001
From: Vihas Splunk
Date: Tue, 8 Aug 2023 18:15:34 +0530
Subject: [PATCH 06/12] fix: use test logger

---
 receiver/kafkareceiver/header_extraction_test.go | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/receiver/kafkareceiver/header_extraction_test.go b/receiver/kafkareceiver/header_extraction_test.go
index 4eaf435cc9856..1309607311d4f 100644
--- a/receiver/kafkareceiver/header_extraction_test.go
+++ b/receiver/kafkareceiver/header_extraction_test.go
@@ -14,7 +14,7 @@ import (
 	"go.opentelemetry.io/collector/pdata/pmetric"
 	"go.opentelemetry.io/collector/pdata/ptrace"
 	"go.opentelemetry.io/collector/receiver/receivertest"
-	"go.uber.org/zap"
+	"go.uber.org/zap/zaptest"
 
 	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/testdata"
 )
@@ -24,14 +24,14 @@ func TestHeaderExtractionTraces(t *testing.T) {
 	nextConsumer := &consumertest.TracesSink{}
 	c := tracesConsumerGroupHandler{
 		unmarshaler:  newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding),
-		logger:       zap.NewNop(),
+		logger:       zaptest.NewLogger(t),
 		ready:        make(chan bool),
 		nextConsumer: nextConsumer,
 		obsrecv:      obsrecv,
 	}
 	headers := []string{"headerKey1", "headerKey2"}
 	c.headerExtractor = &headerExtractor{
-		logger:  zap.NewNop(),
+		logger:  zaptest.NewLogger(t),
 		headers: headers,
 	}
 	groupClaim := &testConsumerGroupClaim{
@@ -85,14 +85,14 @@ func TestHeaderExtractionLogs(t *testing.T) {
 	unmarshaler, err = unmarshaler.WithEnc("utf-8")
 	c := logsConsumerGroupHandler{
 		unmarshaler:  unmarshaler,
-		logger:       zap.NewNop(),
+		logger:       zaptest.NewLogger(t),
 		ready:        make(chan bool),
 		nextConsumer: nextConsumer,
 		obsrecv:      obsrecv,
 	}
 	headers := []string{"headerKey1", "headerKey2"}
 	c.headerExtractor = &headerExtractor{
-		logger:  zap.NewNop(),
+		logger:  zaptest.NewLogger(t),
 		headers: headers,
 	}
 	groupClaim := &testConsumerGroupClaim{
@@ -139,14 +139,14 @@ func TestHeaderExtractionMetrics(t *testing.T) {
 	nextConsumer := &consumertest.MetricsSink{}
 	c := metricsConsumerGroupHandler{
 		unmarshaler:  newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding),
-		logger:       zap.NewNop(),
+		logger:       zaptest.NewLogger(t),
 		ready:        make(chan bool),
 		nextConsumer: nextConsumer,
 		obsrecv:      obsrecv,
 	}
 	headers := []string{"headerKey1", "headerKey2"}
 	c.headerExtractor = &headerExtractor{
-		logger:  zap.NewNop(),
+		logger:  zaptest.NewLogger(t),
 		headers: headers,
 	}
 	groupClaim := &testConsumerGroupClaim{

From d7e367f4891bda781eb304b61851d7331c74228c Mon Sep 17 00:00:00 2001
From: Vihas Splunk
Date: Tue, 8 Aug 2023 18:43:45 +0530
Subject: [PATCH 07/12] chore: use nop header extractor

---
 receiver/kafkareceiver/go.mod                 |   1 +
 receiver/kafkareceiver/go.sum                 |   1 +
 receiver/kafkareceiver/header_extraction.go   |  19 ++-
 .../kafkareceiver/header_extraction_test.go   |   1 -
 receiver/kafkareceiver/kafka_receiver.go      |  21 ++-
 receiver/kafkareceiver/kafka_receiver_test.go | 143 ++++++++++--------
 6 files changed, 107 insertions(+), 79 deletions(-)

diff --git a/receiver/kafkareceiver/go.mod b/receiver/kafkareceiver/go.mod
index 2efaef5f37f86..b32b08530e0cb 100644
--- a/receiver/kafkareceiver/go.mod
+++ b/receiver/kafkareceiver/go.mod
@@ -28,6 +28,7 @@ require (
 	github.com/aws/aws-sdk-go v1.44.299 // indirect
+	github.com/benbjohnson/clock v1.3.0 // indirect
 	github.com/cenkalti/backoff/v4 v4.2.1 // indirect
 	github.com/davecgh/go-spew v1.1.1 // indirect
 	github.com/eapache/go-resiliency v1.3.0 // indirect
diff --git a/receiver/kafkareceiver/go.sum b/receiver/kafkareceiver/go.sum
index d2241f845ace0..ee151c0a28427 100644
--- a/receiver/kafkareceiver/go.sum
+++ b/receiver/kafkareceiver/go.sum
@@ -31,6 +31,7 @@ github.com/aws/aws-sdk-go-v2/service/sso v1.4.2/go.mod h1:NBvT9R1MEF+Ud6ApJKM0G+
 github.com/aws/aws-sdk-go-v2/service/sts v1.7.2/go.mod h1:8EzeIqfWt2wWT4rJVu3f21TfrhJ8AEMzVybRNSb/b4g=
 github.com/aws/smithy-go v1.8.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E=
 github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A=
+github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
 github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
 github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
 github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
diff --git a/receiver/kafkareceiver/header_extraction.go b/receiver/kafkareceiver/header_extraction.go
index 90387bc77bc62..6fed6119fcf93 100644
--- a/receiver/kafkareceiver/header_extraction.go
+++ b/receiver/kafkareceiver/header_extraction.go
@@ -17,6 +17,12 @@ func getAttribute(key string) string {
 	return fmt.Sprintf("kafka.header.%s", key)
 }
 
+type HeaderExtractor interface {
+	extractHeadersTraces(ptrace.Traces, *sarama.ConsumerMessage)
+	extractHeadersMetrics(pmetric.Metrics, *sarama.ConsumerMessage)
+	extractHeadersLogs(plog.Logs, *sarama.ConsumerMessage)
+}
+
 type headerExtractor struct {
 	logger  *zap.Logger
 	headers []string
@@ -40,7 +46,7 @@ func (he *headerExtractor) extractHeadersLogs(logs plog.Logs, message *sarama.Co
 	for _, header := range he.headers {
 		value, ok := getHeaderValue(message.Headers, header)
 		if !ok {
-			he.logger.Debug("Header key not found in the logger: ", zap.String("key", header))
+			he.logger.Debug("Header key not found in the log: ", zap.String("key", header))
 			continue
 		}
 		for i := 0; i < logs.ResourceLogs().Len(); i++ {
@@ -75,3 +81,14 @@ func getHeaderValue(headers []*sarama.RecordHeader, header string) (string, bool
 	// no header found matching the key, report to the user
 	return "", false
 }
+
+type nopHeaderExtractor struct{}
+
+func (he *nopHeaderExtractor) extractHeadersTraces(traces ptrace.Traces, message *sarama.ConsumerMessage) {
+}
+
+func (he *nopHeaderExtractor) extractHeadersLogs(logs plog.Logs, message *sarama.ConsumerMessage) {
+}
+
+func (he *nopHeaderExtractor) extractHeadersMetrics(metrics pmetric.Metrics, message *sarama.ConsumerMessage) {
+}
diff --git a/receiver/kafkareceiver/header_extraction_test.go b/receiver/kafkareceiver/header_extraction_test.go
index 1309607311d4f..4b48b0f0f5e9f 100644
--- a/receiver/kafkareceiver/header_extraction_test.go
+++ b/receiver/kafkareceiver/header_extraction_test.go
@@ -186,7 +186,6 @@ func TestHeaderExtractionMetrics(t *testing.T) {
 		},
 		Value: bts,
 	}
-	// groupClaim.messageChan <- &sarama.ConsumerMessage{}
 	cancelFunc()
 	wg.Wait()
 
diff --git a/receiver/kafkareceiver/kafka_receiver.go b/receiver/kafkareceiver/kafka_receiver.go
index 626e35c64846a..83ac64ba2c8b6 100644
--- a/receiver/kafkareceiver/kafka_receiver.go
+++ b/receiver/kafkareceiver/kafka_receiver.go
@@ -144,6 +144,7 @@ func (c *kafkaTracesConsumer) Start(_ context.Context, host component.Host) erro
 		obsrecv:           obsrecv,
 		autocommitEnabled: c.autocommitEnabled,
 		messageMarking:    c.messageMarking,
+		headerExtractor:   &nopHeaderExtractor{},
 	}
 	if c.headerExtraction {
 		consumerGroup.headerExtractor = &headerExtractor{
@@ -245,6 +246,7 @@ func (c *kafkaMetricsConsumer) Start(_ context.Context, host component.Host) err
 		obsrecv:           obsrecv,
 		autocommitEnabled: c.autocommitEnabled,
 		messageMarking:    c.messageMarking,
+		headerExtractor:   &nopHeaderExtractor{},
 	}
 	if c.headerExtraction {
 		metricsConsumerGroup.headerExtractor = &headerExtractor{
@@ -374,6 +376,7 @@ func (c *kafkaLogsConsumer) Start(_ context.Context, host component.Host) error
 		obsrecv:           obsrecv,
 		autocommitEnabled: c.autocommitEnabled,
 		messageMarking:    c.messageMarking,
+		headerExtractor:   &nopHeaderExtractor{},
 	}
 	if c.headerExtraction {
 		logsConsumerGroup.headerExtractor = &headerExtractor{
@@ -424,7 +427,7 @@ type tracesConsumerGroupHandler struct {
 
 	autocommitEnabled bool
 	messageMarking    MessageMarking
-	headerExtractor   *headerExtractor
+	headerExtractor   HeaderExtractor
 }
 
 type metricsConsumerGroupHandler struct {
@@ -440,7 +443,7 @@ type metricsConsumerGroupHandler struct {
 
 	autocommitEnabled bool
 	messageMarking    MessageMarking
-	headerExtractor   *headerExtractor
+	headerExtractor   HeaderExtractor
 }
 
 type logsConsumerGroupHandler struct {
@@ -456,7 +459,7 @@ type logsConsumerGroupHandler struct {
 
 	autocommitEnabled bool
 	messageMarking    MessageMarking
-	headerExtractor   *headerExtractor
+	headerExtractor   HeaderExtractor
 }
 
 var _ sarama.ConsumerGroupHandler = (*tracesConsumerGroupHandler)(nil)
@@ -513,9 +516,7 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe
 			return err
 		}
 
-		if c.headerExtractor != nil {
-			c.headerExtractor.extractHeadersTraces(traces, message)
-		}
+		c.headerExtractor.extractHeadersTraces(traces, message)
 		spanCount := traces.SpanCount()
 		err = c.nextConsumer.ConsumeTraces(session.Context(), traces)
 		c.obsrecv.EndTracesOp(ctx, c.unmarshaler.Encoding(), spanCount, err)
@@ -590,9 +591,7 @@ func (c *metricsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupS
 			}
 			return err
 		}
-		if c.headerExtractor != nil {
-			c.headerExtractor.extractHeadersMetrics(metrics, message)
-		}
+		c.headerExtractor.extractHeadersMetrics(metrics, message)
 		dataPointCount := metrics.DataPointCount()
 		err = c.nextConsumer.ConsumeMetrics(session.Context(), metrics)
 
@@ -673,9 +672,7 @@ func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSess
 			}
 			return err
 		}
-		if c.headerExtractor != nil {
-			c.headerExtractor.extractHeadersLogs(logs, message)
-		}
+		c.headerExtractor.extractHeadersLogs(logs, message)
 		err = c.nextConsumer.ConsumeLogs(session.Context(), logs)
 		// TODO
 		c.obsrecv.EndLogsOp(ctx, c.unmarshaler.Encoding(), logs.LogRecordCount(), err)
diff --git a/receiver/kafkareceiver/kafka_receiver_test.go b/receiver/kafkareceiver/kafka_receiver_test.go
index 2497c436faf74..0c7cc523dcb2f 100644
--- a/receiver/kafkareceiver/kafka_receiver_test.go
+++ b/receiver/kafkareceiver/kafka_receiver_test.go
@@ -139,11 +139,12 @@ func TestTracesConsumerGroupHandler(t *testing.T) {
 	obsrecv, err := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
 	require.NoError(t, err)
 	c := tracesConsumerGroupHandler{
-		unmarshaler:  newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding),
-		logger:       zap.NewNop(),
-		ready:        make(chan bool),
-		nextConsumer: consumertest.NewNop(),
-		obsrecv:      obsrecv,
+		unmarshaler:     newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding),
+		logger:          zap.NewNop(),
+		ready:           make(chan bool),
+		nextConsumer:    consumertest.NewNop(),
+		obsrecv:         obsrecv,
+		headerExtractor: &nopHeaderExtractor{},
 	}
 
 	testSession := testConsumerGroupSession{ctx: context.Background()}
@@ -188,11 +189,12 @@ func TestTracesConsumerGroupHandler_session_done(t *testing.T) {
 	obsrecv, err := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
 	require.NoError(t, err)
 	c := tracesConsumerGroupHandler{
-		unmarshaler:  newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding),
-		logger:       zap.NewNop(),
-		ready:        make(chan bool),
-		nextConsumer: consumertest.NewNop(),
-		obsrecv:      obsrecv,
+		unmarshaler:     newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding),
+		logger:          zap.NewNop(),
+		ready:           make(chan bool),
+		nextConsumer:    consumertest.NewNop(),
+		obsrecv:         obsrecv,
+		headerExtractor: &nopHeaderExtractor{},
 	}
 
 	ctx, cancelFunc := context.WithCancel(context.Background())
@@ -234,11 +236,12 @@ func TestTracesConsumerGroupHandler_error_unmarshal(t *testing.T) {
 	obsrecv, err := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
 	require.NoError(t, err)
 	c := tracesConsumerGroupHandler{
-		unmarshaler:  newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding),
-		logger:       zap.NewNop(),
-		ready:        make(chan bool),
-		nextConsumer: consumertest.NewNop(),
-		obsrecv:      obsrecv,
+		unmarshaler:     newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding),
+		logger:          zap.NewNop(),
+		ready:           make(chan bool),
+		nextConsumer:    consumertest.NewNop(),
+		obsrecv:         obsrecv,
+		headerExtractor: &nopHeaderExtractor{},
 	}
 
 	wg := sync.WaitGroup{}
@@ -261,11 +264,12 @@ func TestTracesConsumerGroupHandler_error_nextConsumer(t *testing.T) {
 	obsrecv, err := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
 	require.NoError(t, err)
 	c := tracesConsumerGroupHandler{
-		unmarshaler:  newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding),
-		logger:       zap.NewNop(),
-		ready:        make(chan bool),
-		nextConsumer: consumertest.NewErr(consumerError),
-		obsrecv:      obsrecv,
+		unmarshaler:     newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding),
+		logger:          zap.NewNop(),
+		ready:           make(chan bool),
+		nextConsumer:    consumertest.NewErr(consumerError),
+		obsrecv:         obsrecv,
+		headerExtractor: &nopHeaderExtractor{},
 	}
 
 	wg := sync.WaitGroup{}
@@ -396,11 +400,12 @@ func TestMetricsConsumerGroupHandler(t *testing.T) {
 	obsrecv, err := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
 	require.NoError(t, err)
 	c := metricsConsumerGroupHandler{
-		unmarshaler:  newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding),
-		logger:       zap.NewNop(),
-		ready:        make(chan bool),
-		nextConsumer: consumertest.NewNop(),
-		obsrecv:      obsrecv,
+		unmarshaler:     newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding),
+		logger:          zap.NewNop(),
+		ready:           make(chan bool),
+		nextConsumer:    consumertest.NewNop(),
+		obsrecv:         obsrecv,
+		headerExtractor: &nopHeaderExtractor{},
 	}
 
 	testSession := testConsumerGroupSession{ctx: context.Background()}
@@ -445,11 +450,12 @@ func TestMetricsConsumerGroupHandler_session_done(t *testing.T) {
 	obsrecv, err := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
 	require.NoError(t, err)
 	c := metricsConsumerGroupHandler{
-		unmarshaler:  newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding),
-		logger:       zap.NewNop(),
-		ready:        make(chan bool),
-		nextConsumer: consumertest.NewNop(),
-		obsrecv:      obsrecv,
+		unmarshaler:     newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding),
+		logger:          zap.NewNop(),
+		ready:           make(chan bool),
+		nextConsumer:    consumertest.NewNop(),
+		obsrecv:         obsrecv,
+		headerExtractor: &nopHeaderExtractor{},
 	}
 
 	ctx, cancelFunc := context.WithCancel(context.Background())
@@ -490,11 +496,12 @@ func TestMetricsConsumerGroupHandler_error_unmarshal(t *testing.T) {
 	obsrecv, err := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
 	require.NoError(t, err)
 	c := metricsConsumerGroupHandler{
-		unmarshaler:  newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding),
-		logger:       zap.NewNop(),
-		ready:        make(chan bool),
-		nextConsumer: consumertest.NewNop(),
-		obsrecv:      obsrecv,
+		unmarshaler:     newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding),
+		logger:          zap.NewNop(),
+		ready:           make(chan bool),
+		nextConsumer:    consumertest.NewNop(),
+		obsrecv:         obsrecv,
+		headerExtractor: &nopHeaderExtractor{},
 	}
 
 	wg := sync.WaitGroup{}
@@ -517,11 +524,12 @@ func TestMetricsConsumerGroupHandler_error_nextConsumer(t *testing.T) {
 	obsrecv, err := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
 	require.NoError(t, err)
 	c := metricsConsumerGroupHandler{
-		unmarshaler:  newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding),
-		logger:       zap.NewNop(),
-		ready:        make(chan bool),
-		nextConsumer: consumertest.NewErr(consumerError),
-		obsrecv:      obsrecv,
+		unmarshaler:     newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding),
+		logger:          zap.NewNop(),
+		ready:           make(chan bool),
+		nextConsumer:    consumertest.NewErr(consumerError),
+		obsrecv:         obsrecv,
+		headerExtractor: &nopHeaderExtractor{},
 	}
 
 	wg := sync.WaitGroup{}
@@ -651,11 +659,12 @@ func TestLogsConsumerGroupHandler(t *testing.T) {
 	obsrecv, err := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
 	require.NoError(t, err)
 	c := logsConsumerGroupHandler{
-		unmarshaler:  newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding),
-		logger:       zap.NewNop(),
-		ready:        make(chan bool),
-		nextConsumer: consumertest.NewNop(),
-		obsrecv:      obsrecv,
+		unmarshaler:     newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding),
+		logger:          zap.NewNop(),
+		ready:           make(chan bool),
+		nextConsumer:    consumertest.NewNop(),
+		obsrecv:         obsrecv,
+		headerExtractor: &nopHeaderExtractor{},
 	}
 
 	testSession := testConsumerGroupSession{ctx: context.Background()}
@@ -700,11 +709,12 @@ func TestLogsConsumerGroupHandler_session_done(t *testing.T) {
 	obsrecv, err := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
 	require.NoError(t, err)
 	c := logsConsumerGroupHandler{
-		unmarshaler:  newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding),
-		logger:       zap.NewNop(),
-		ready:        make(chan bool),
-		nextConsumer: consumertest.NewNop(),
-		obsrecv:      obsrecv,
+		unmarshaler:     newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding),
+		logger:          zap.NewNop(),
+		ready:           make(chan bool),
+		nextConsumer:    consumertest.NewNop(),
+		obsrecv:         obsrecv,
+		headerExtractor: &nopHeaderExtractor{},
 	}
 
 	ctx, cancelFunc := context.WithCancel(context.Background())
@@ -745,11 +755,12 @@ func TestLogsConsumerGroupHandler_error_unmarshal(t *testing.T) {
 	obsrecv, err := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
 	require.NoError(t, err)
 	c := logsConsumerGroupHandler{
-		unmarshaler:  newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding),
-		logger:       zap.NewNop(),
-		ready:        make(chan bool),
-		nextConsumer: consumertest.NewNop(),
-		obsrecv:      obsrecv,
+		unmarshaler:     newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding),
+		logger:          zap.NewNop(),
+		ready:           make(chan bool),
+		nextConsumer:    consumertest.NewNop(),
+		obsrecv:         obsrecv,
+		headerExtractor: &nopHeaderExtractor{},
 	}
 
 	wg := sync.WaitGroup{}
@@ -772,11 +783,12 @@ func TestLogsConsumerGroupHandler_error_nextConsumer(t *testing.T) {
 	obsrecv, err := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
 	require.NoError(t, err)
 	c := logsConsumerGroupHandler{
-		unmarshaler:  newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding),
-		logger:       zap.NewNop(),
-		ready:        make(chan bool),
-		nextConsumer: consumertest.NewErr(consumerError),
-		obsrecv:      obsrecv,
+		unmarshaler:     newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding),
+		logger:          zap.NewNop(),
+		ready:           make(chan bool),
+		nextConsumer:    consumertest.NewErr(consumerError),
+		obsrecv:         obsrecv,
+		headerExtractor: &nopHeaderExtractor{},
 	}
 
 	wg := sync.WaitGroup{}
@@ -842,11 +854,12 @@ func TestLogsConsumerGroupHandler_unmarshal_text(t *testing.T) {
 	require.NoError(t, err)
 	sink := &consumertest.LogsSink{}
 	c := logsConsumerGroupHandler{
-		unmarshaler:  unmarshaler,
-		logger:       zap.NewNop(),
-		ready:        make(chan bool),
-		nextConsumer: sink,
-		obsrecv:      obsrecv,
+		unmarshaler:     unmarshaler,
+		logger:          zap.NewNop(),
+		ready:           make(chan bool),
+		nextConsumer:    sink,
+		obsrecv:         obsrecv,
+		headerExtractor: &nopHeaderExtractor{},
 	}
 
 	wg := sync.WaitGroup{}
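Beyond the rename, the substantive change in patch 07 is a null-object refactor: handlers now hold a `HeaderExtractor` interface that defaults to `nopHeaderExtractor`, so the per-message path in `ConsumeClaim` drops its nil checks. A sketch of the resulting wiring (types taken from the diffs above):

```go
// Default to the no-op implementation so the handler never holds nil.
var extractor HeaderExtractor = &nopHeaderExtractor{}
if c.headerExtraction {
	extractor = &headerExtractor{
		logger:  c.settings.Logger,
		headers: c.headers,
	}
}
// ConsumeClaim can now call the extractor unconditionally:
extractor.extractHeadersTraces(traces, message)
```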
From 1b129268971cc6e5ae8ef3eb8fee8f8b1c70e035 Mon Sep 17 00:00:00 2001
From: Vihas Splunk
Date: Tue, 8 Aug 2023 18:58:47 +0530
Subject: [PATCH 08/12] update changelog

---
 .chloggen/header-extraction-kafka.yaml |  2 +-
 receiver/kafkareceiver/README.md       |  9 +++++----
 2 files changed, 6 insertions(+), 5 deletions(-)

diff --git a/.chloggen/header-extraction-kafka.yaml b/.chloggen/header-extraction-kafka.yaml
index 1501652f34645..196a5f362a060 100644
--- a/.chloggen/header-extraction-kafka.yaml
+++ b/.chloggen/header-extraction-kafka.yaml
@@ -12,7 +12,7 @@ component: kafkareceiver
 note: Allow users to attach kafka header metadata to the log/metric/trace record in the pipeline. Introduce a new config param, 'header_extraction' and some examples.
 
 # Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
-issues: []
+issues: [24367]
 
 # (Optional) One or more lines of additional information to render under the primary note.
 # These lines will be padded with 2 spaces and then inserted directly into the document.
diff --git a/receiver/kafkareceiver/README.md b/receiver/kafkareceiver/README.md
index ad99e43401d67..8e2e08446c3f9 100644
--- a/receiver/kafkareceiver/README.md
+++ b/receiver/kafkareceiver/README.md
@@ -107,7 +107,7 @@ receivers:
     topic: test
     header_extraction:
       extract_headers: true
-      headers: ["header1"]
+      headers: ["header1", "header2"]
 ```
 
 - If we feed the following kafka record to `test` topic and use above configs:
@@ -121,7 +121,8 @@ receivers:
   }
 }
 ```
 we will get a log record in the collector similar to:
-```yaml{
+```yaml
+{
   ...
   body: Hello,
   resource: {
@@ -132,5 +133,5 @@ we will get a log record in the collector similar to:
 }
 ```
 
-- Here you can see the kafka record header `header1` being added to resource attribute.
-- Every kafka header key is prefixed with `kafka.header` string and attached to resource attributes.
\ No newline at end of file
+- Here you can see the kafka record headers `header1` and `header2` being added to resource attributes.
+- Every **matching** kafka header key is prefixed with `kafka.header` string and attached to resource attributes.
\ No newline at end of file

From 07e672040b764ee55ea249991683c56ae27c8e1f Mon Sep 17 00:00:00 2001
From: Vihas Splunk
Date: Wed, 23 Aug 2023 15:06:14 +0530
Subject: [PATCH 09/12] fix: Shopify -> IBM

---
 receiver/kafkareceiver/header_extraction.go      | 2 +-
 receiver/kafkareceiver/header_extraction_test.go | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/receiver/kafkareceiver/header_extraction.go b/receiver/kafkareceiver/header_extraction.go
index 6fed6119fcf93..dbbe581fb85b8 100644
--- a/receiver/kafkareceiver/header_extraction.go
+++ b/receiver/kafkareceiver/header_extraction.go
@@ -6,7 +6,7 @@ package kafkareceiver // import "github.com/open-telemetry/opentelemetry-collect
 import (
 	"fmt"
 
-	"github.com/Shopify/sarama"
+	"github.com/IBM/sarama"
 	"go.opentelemetry.io/collector/pdata/plog"
 	"go.opentelemetry.io/collector/pdata/pmetric"
 	"go.opentelemetry.io/collector/pdata/ptrace"
 	"go.uber.org/zap"
diff --git a/receiver/kafkareceiver/header_extraction_test.go b/receiver/kafkareceiver/header_extraction_test.go
index 4b48b0f0f5e9f..a4ad701a99c73 100644
--- a/receiver/kafkareceiver/header_extraction_test.go
+++ b/receiver/kafkareceiver/header_extraction_test.go
@@ -5,7 +5,7 @@ import (
 	"sync"
 	"testing"
 
-	"github.com/Shopify/sarama"
+	"github.com/IBM/sarama"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"go.opentelemetry.io/collector/consumer/consumertest"

From 7286eeed8208f2d8d293e3a15b39ad9107db084d Mon Sep 17 00:00:00 2001
From: Vihas Splunk
Date: Wed, 23 Aug 2023 15:36:59 +0530
Subject: [PATCH 10/12] fix: linting

---
 receiver/kafkareceiver/README.md                 | 2 +-
 receiver/kafkareceiver/header_extraction.go      | 6 +++---
 receiver/kafkareceiver/header_extraction_test.go | 6 ++++++
 3 files changed, 10 insertions(+), 4 deletions(-)

diff --git a/receiver/kafkareceiver/README.md b/receiver/kafkareceiver/README.md
index 8e2e08446c3f9..1f366ac729cdd 100644
--- a/receiver/kafkareceiver/README.md
+++ b/receiver/kafkareceiver/README.md
@@ -88,7 +88,7 @@ The following settings can be optionally configured:
   - `on_error`: (default = false) If false, only the successfully processed messages are marked
     **Note: this can block the entire partition in case a message processing returns a permanent error**
 - `header_extraction`:
-  - `extract_headers` (default = false): Allows user to attach header fields to resourse attributes in otel pipeline
+  - `extract_headers` (default = false): Allows user to attach header fields to resource attributes in otel pipeline
   - `headers` (default = []): List of headers to extract from the kafka record.
     **Note: Matching pattern will be `exact`. Regexes are not supported as of now.**
 Example:
 
diff --git a/receiver/kafkareceiver/header_extraction.go b/receiver/kafkareceiver/header_extraction.go
index dbbe581fb85b8..265c84fb33db8 100644
--- a/receiver/kafkareceiver/header_extraction.go
+++ b/receiver/kafkareceiver/header_extraction.go
@@ -84,11 +84,11 @@ func getHeaderValue(headers []*sarama.RecordHeader, header string) (string, bool
 
 type nopHeaderExtractor struct{}
 
-func (he *nopHeaderExtractor) extractHeadersTraces(traces ptrace.Traces, message *sarama.ConsumerMessage) {
+func (he *nopHeaderExtractor) extractHeadersTraces(_ ptrace.Traces, _ *sarama.ConsumerMessage) {
 }
 
-func (he *nopHeaderExtractor) extractHeadersLogs(logs plog.Logs, message *sarama.ConsumerMessage) {
+func (he *nopHeaderExtractor) extractHeadersLogs(_ plog.Logs, _ *sarama.ConsumerMessage) {
 }
 
-func (he *nopHeaderExtractor) extractHeadersMetrics(metrics pmetric.Metrics, message *sarama.ConsumerMessage) {
+func (he *nopHeaderExtractor) extractHeadersMetrics(_ pmetric.Metrics, _ *sarama.ConsumerMessage) {
 }
diff --git a/receiver/kafkareceiver/header_extraction_test.go b/receiver/kafkareceiver/header_extraction_test.go
index a4ad701a99c73..a9e26bff6be05 100644
--- a/receiver/kafkareceiver/header_extraction_test.go
+++ b/receiver/kafkareceiver/header_extraction_test.go
@@ -1,3 +1,6 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
 package kafkareceiver
 
 import (
@@ -21,6 +24,7 @@ import (
 
 func TestHeaderExtractionTraces(t *testing.T) {
 	obsrecv, err := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
+	require.NoError(t, err)
 	nextConsumer := &consumertest.TracesSink{}
 	c := tracesConsumerGroupHandler{
 		unmarshaler: newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding),
@@ -84,6 +88,7 @@ func TestHeaderExtractionLogs(t *testing.T) {
 	obsrecv, err := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
+	require.NoError(t, err)
 	nextConsumer := &consumertest.LogsSink{}
 	unmarshaler := newTextLogsUnmarshaler()
 	unmarshaler, err = unmarshaler.WithEnc("utf-8")
@@ -136,6 +141,7 @@ func TestHeaderExtractionMetrics(t *testing.T) {
 	obsrecv, err := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
+	require.NoError(t, err)
 	nextConsumer := &consumertest.MetricsSink{}
 	c := metricsConsumerGroupHandler{
 		unmarshaler: newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding),

From 8c81cfea8f6ea68d23f523c3fc476b59806b85a6 Mon Sep 17 00:00:00 2001
From: Vihas Splunk
Date: Fri, 22 Sep 2023 02:57:14 +0530
Subject: [PATCH 11/12] fix: run go mod tidy

---
 receiver/kafkareceiver/go.mod | 1 -
 receiver/kafkareceiver/go.sum | 2 --
 2 files changed, 3 deletions(-)

diff --git a/receiver/kafkareceiver/go.mod b/receiver/kafkareceiver/go.mod
index c6f1405ed4635..77712fe594161 100644
--- a/receiver/kafkareceiver/go.mod
+++ b/receiver/kafkareceiver/go.mod
@@ -27,7 +27,6 @@ require (
 	github.com/aws/aws-sdk-go v1.45.12 // indirect
-	github.com/benbjohnson/clock v1.3.0 // indirect
 	github.com/cenkalti/backoff/v4 v4.2.1 // indirect
 	github.com/davecgh/go-spew v1.1.1 // indirect
 	github.com/eapache/go-resiliency v1.4.0 // indirect
From 8c81cfea8f6ea68d23f523c3fc476b59806b85a6 Mon Sep 17 00:00:00 2001
From: Vihas Splunk
Date: Fri, 22 Sep 2023 02:57:14 +0530
Subject: [PATCH 11/12] fix: run go mod tidy

---
 receiver/kafkareceiver/go.mod | 1 -
 receiver/kafkareceiver/go.sum | 2 --
 2 files changed, 3 deletions(-)

diff --git a/receiver/kafkareceiver/go.mod b/receiver/kafkareceiver/go.mod
index c6f1405ed4635..77712fe594161 100644
--- a/receiver/kafkareceiver/go.mod
+++ b/receiver/kafkareceiver/go.mod
@@ -27,7 +27,6 @@ require (
 
 require (
 	github.com/aws/aws-sdk-go v1.45.12 // indirect
-	github.com/benbjohnson/clock v1.3.0 // indirect
 	github.com/cenkalti/backoff/v4 v4.2.1 // indirect
 	github.com/davecgh/go-spew v1.1.1 // indirect
 	github.com/eapache/go-resiliency v1.4.0 // indirect
diff --git a/receiver/kafkareceiver/go.sum b/receiver/kafkareceiver/go.sum
index 7694cc9204c62..2ce409f71dc14 100644
--- a/receiver/kafkareceiver/go.sum
+++ b/receiver/kafkareceiver/go.sum
@@ -29,8 +29,6 @@ github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.3.2/go.mod h1:72H
 github.com/aws/aws-sdk-go-v2/service/sso v1.4.2/go.mod h1:NBvT9R1MEF+Ud6ApJKM0G+IkPchKS7p7c2YPKwHmBOk=
 github.com/aws/aws-sdk-go-v2/service/sts v1.7.2/go.mod h1:8EzeIqfWt2wWT4rJVu3f21TfrhJ8AEMzVybRNSb/b4g=
 github.com/aws/smithy-go v1.8.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E=
-github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A=
-github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
 github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
 github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
 github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
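The final patch below swaps the deprecated `obsreport.NewReceiver` for `receiverhelper.NewObsReport`. The mechanical shape of the migration, pulled out into a sketch; the helper function name is invented for illustration, while the API calls and the settings struct are the ones visible in the test diff:

```go
package kafkareceiver

import (
	"go.opentelemetry.io/collector/receiver/receiverhelper"
	"go.opentelemetry.io/collector/receiver/receivertest"
)

// newTestObsReport shows the post-migration construction: the settings
// struct moves from obsreport.ReceiverSettings to
// receiverhelper.ObsReportSettings, and the constructor still returns
// an error, which the tests now assert with require.NoError.
func newTestObsReport() (*receiverhelper.ObsReport, error) {
	return receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
		ReceiverCreateSettings: receivertest.NewNopCreateSettings(),
	})
}
```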
From bc01580e1f8ec9b887499f10cf08689aad2684e6 Mon Sep 17 00:00:00 2001
From: Vihas Splunk
Date: Fri, 22 Sep 2023 03:32:29 +0530
Subject: [PATCH 12/12] remove deprecated obsreport.NewReceiver

---
 receiver/kafkareceiver/go.mod                    |  3 +--
 receiver/kafkareceiver/go.sum                    |  1 -
 receiver/kafkareceiver/header_extraction_test.go | 14 ++++++++++----
 3 files changed, 11 insertions(+), 7 deletions(-)

diff --git a/receiver/kafkareceiver/go.mod b/receiver/kafkareceiver/go.mod
index 9b00e8a5ae8e0..77712fe594161 100644
--- a/receiver/kafkareceiver/go.mod
+++ b/receiver/kafkareceiver/go.mod
@@ -15,7 +15,6 @@ require (
 	github.com/openzipkin/zipkin-go v0.4.2
 	github.com/stretchr/testify v1.8.4
 	go.opencensus.io v0.24.0
-	go.opentelemetry.io/collector v0.85.1-0.20230921012510-68dd7d763b59
 	go.opentelemetry.io/collector/component v0.85.1-0.20230921012510-68dd7d763b59
 	go.opentelemetry.io/collector/config/configtls v0.85.1-0.20230921012510-68dd7d763b59
 	go.opentelemetry.io/collector/confmap v0.85.1-0.20230921012510-68dd7d763b59
@@ -64,12 +63,12 @@ require (
 	github.com/xdg-go/pbkdf2 v1.0.0 // indirect
 	github.com/xdg-go/scram v1.1.2 // indirect
 	github.com/xdg-go/stringprep v1.0.4 // indirect
+	go.opentelemetry.io/collector v0.85.1-0.20230921012510-68dd7d763b59 // indirect
 	go.opentelemetry.io/collector/config/configopaque v0.85.1-0.20230921012510-68dd7d763b59 // indirect
 	go.opentelemetry.io/collector/config/configtelemetry v0.85.1-0.20230921012510-68dd7d763b59 // indirect
 	go.opentelemetry.io/collector/exporter v0.85.1-0.20230921012510-68dd7d763b59 // indirect
 	go.opentelemetry.io/collector/extension v0.85.1-0.20230921012510-68dd7d763b59 // indirect
 	go.opentelemetry.io/collector/featuregate v1.0.0-rcv0014.0.20230921012510-68dd7d763b59 // indirect
-	go.opentelemetry.io/collector/processor v0.85.1-0.20230921012510-68dd7d763b59 // indirect
 	go.opentelemetry.io/otel v1.18.0 // indirect
 	go.opentelemetry.io/otel/metric v1.18.0 // indirect
 	go.opentelemetry.io/otel/sdk v1.18.0 // indirect
diff --git a/receiver/kafkareceiver/go.sum b/receiver/kafkareceiver/go.sum
index a2f578ffbc2f9..2ce409f71dc14 100644
--- a/receiver/kafkareceiver/go.sum
+++ b/receiver/kafkareceiver/go.sum
@@ -361,7 +361,6 @@ go.opentelemetry.io/collector/featuregate v1.0.0-rcv0014.0.20230921012510-68dd7d
 go.opentelemetry.io/collector/pdata v1.0.0-rcv0014.0.20230921012510-68dd7d763b59 h1:PyJRh5f3tIo9aJlT/SjJQGARhGPwKvDQvEns9hDWlzg=
 go.opentelemetry.io/collector/pdata v1.0.0-rcv0014.0.20230921012510-68dd7d763b59/go.mod h1:I1PqyHJlsXjANC73tp43nDId7/jiv82NoZZ6uS0xdwM=
 go.opentelemetry.io/collector/processor v0.85.1-0.20230921012510-68dd7d763b59 h1:Xe2EAhB4fsvKCxo+ih8Qemecg2KBHsHxcezUCoUKAGg=
-go.opentelemetry.io/collector/processor v0.85.1-0.20230921012510-68dd7d763b59/go.mod h1:QliJdrjNuRklEmVbybxCBgmBc/FCQM0OeHUkGr467sE=
 go.opentelemetry.io/collector/receiver v0.85.1-0.20230921012510-68dd7d763b59 h1:WN84hQgGR46w3hxdbNb5bD9gOcLZA0nfQHKvpOVN9vM=
 go.opentelemetry.io/collector/receiver v0.85.1-0.20230921012510-68dd7d763b59/go.mod h1:+mPZilZtjM3SZJRyzV+A/QcJeBaEH0SeDsvH+/Tls7Y=
 go.opentelemetry.io/collector/semconv v0.85.1-0.20230921012510-68dd7d763b59 h1:S6ZiU/Ol6+rf1B+xBvogCYs2Ac82I3vYhxi0aInBth4=
diff --git a/receiver/kafkareceiver/header_extraction_test.go b/receiver/kafkareceiver/header_extraction_test.go
index a9e26bff6be05..c5219c219b55e 100644
--- a/receiver/kafkareceiver/header_extraction_test.go
+++ b/receiver/kafkareceiver/header_extraction_test.go
@@ -12,10 +12,10 @@ import (
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"go.opentelemetry.io/collector/consumer/consumertest"
-	"go.opentelemetry.io/collector/obsreport"
 	"go.opentelemetry.io/collector/pdata/pcommon"
 	"go.opentelemetry.io/collector/pdata/pmetric"
 	"go.opentelemetry.io/collector/pdata/ptrace"
+	"go.opentelemetry.io/collector/receiver/receiverhelper"
 	"go.opentelemetry.io/collector/receiver/receivertest"
 	"go.uber.org/zap/zaptest"
 
@@ -23,7 +23,9 @@ import (
 )
 
 func TestHeaderExtractionTraces(t *testing.T) {
-	obsrecv, err := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
+	obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
+		ReceiverCreateSettings: receivertest.NewNopCreateSettings(),
+	})
 	require.NoError(t, err)
 	nextConsumer := &consumertest.TracesSink{}
 	c := tracesConsumerGroupHandler{
@@ -83,7 +85,9 @@ func TestHeaderExtractionTraces(t *testing.T) {
 }
 
 func TestHeaderExtractionLogs(t *testing.T) {
-	obsrecv, err := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
+	obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
+		ReceiverCreateSettings: receivertest.NewNopCreateSettings(),
+	})
 	require.NoError(t, err)
 	nextConsumer := &consumertest.LogsSink{}
 	unmarshaler := newTextLogsUnmarshaler()
@@ -140,7 +144,9 @@ func TestHeaderExtractionLogs(t *testing.T) {
 }
 
 func TestHeaderExtractionMetrics(t *testing.T) {
-	obsrecv, err := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
+	obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
+		ReceiverCreateSettings: receivertest.NewNopCreateSettings(),
+	})
 	require.NoError(t, err)
 	nextConsumer := &consumertest.MetricsSink{}
 	c := metricsConsumerGroupHandler{