diff --git a/.chloggen/kafkareceiver-generic.yaml b/.chloggen/kafkareceiver-generic.yaml new file mode 100644 index 0000000000000..60684750e1a07 --- /dev/null +++ b/.chloggen/kafkareceiver-generic.yaml @@ -0,0 +1,30 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: kafkareceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add `max_fetch_wait` config setting + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [39360] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + This setting allows you to specify the maximum time that the broker will wait for + min_fetch_size bytes of data to be available before sending a response to the client. + Defaults to 250ms. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/internal/kafka/client.go b/internal/kafka/client.go index f4bb842fd55a6..89ba4be94952c 100644 --- a/internal/kafka/client.go +++ b/internal/kafka/client.go @@ -59,6 +59,7 @@ func NewSaramaConsumerGroup( saramaConfig.Consumer.Fetch.Min = consumerConfig.MinFetchSize saramaConfig.Consumer.Fetch.Default = consumerConfig.DefaultFetchSize saramaConfig.Consumer.Fetch.Max = consumerConfig.MaxFetchSize + saramaConfig.Consumer.MaxWaitTime = consumerConfig.MaxFetchWait saramaConfig.Consumer.Offsets.AutoCommit.Enable = consumerConfig.AutoCommit.Enable saramaConfig.Consumer.Offsets.AutoCommit.Interval = consumerConfig.AutoCommit.Interval saramaConfig.Consumer.Offsets.Initial = saramaInitialOffsets[consumerConfig.InitialOffset] diff --git a/internal/kafka/configkafka/config.go b/internal/kafka/configkafka/config.go index aab21ab9155fb..927ea07cf5954 100644 --- a/internal/kafka/configkafka/config.go +++ b/internal/kafka/configkafka/config.go @@ -98,6 +98,10 @@ type ConsumerConfig struct { // The maximum bytes per fetch from Kafka (default "0", no limit) MaxFetchSize int32 `mapstructure:"max_fetch_size"` + + // The maximum amount of time to wait for MinFetchSize bytes to be + // available before the broker returns a response (default 250ms) + MaxFetchWait time.Duration `mapstructure:"max_fetch_wait"` } func NewDefaultConsumerConfig() ConsumerConfig { @@ -112,6 +116,7 @@ func NewDefaultConsumerConfig() ConsumerConfig { }, MinFetchSize: 1, MaxFetchSize: 0, + MaxFetchWait: 250 * time.Millisecond, DefaultFetchSize: 1048576, } } diff --git a/internal/kafka/configkafka/config_test.go b/internal/kafka/configkafka/config_test.go index 32c86341f8894..68bda2247e317 100644 --- a/internal/kafka/configkafka/config_test.go +++ b/internal/kafka/configkafka/config_test.go @@ -141,6 +141,7 @@ func TestConsumerConfig(t *testing.T) { 
MinFetchSize: 10, DefaultFetchSize: 1024, MaxFetchSize: 4096, + MaxFetchWait: 1 * time.Second, }, }, diff --git a/internal/kafka/configkafka/testdata/consumer_config.yaml b/internal/kafka/configkafka/testdata/consumer_config.yaml index ea585f9496243..61cc8f11fc4bd 100644 --- a/internal/kafka/configkafka/testdata/consumer_config.yaml +++ b/internal/kafka/configkafka/testdata/consumer_config.yaml @@ -10,6 +10,7 @@ kafka/full: min_fetch_size: 10 default_fetch_size: 1024 max_fetch_size: 4096 + max_fetch_wait: 1s # Invalid configurations kafka/invalid_initial_offset: diff --git a/internal/kafka/kafkatest/cluster.go b/internal/kafka/kafkatest/cluster.go index 2f6e9e8a6e49a..315a5c7a7141c 100644 --- a/internal/kafka/kafkatest/cluster.go +++ b/internal/kafka/kafkatest/cluster.go @@ -21,5 +21,7 @@ func NewCluster(tb testing.TB, opts ...kfake.Opt) (*kfake.Cluster, configkafka.C cfg := configkafka.NewDefaultClientConfig() cfg.Brokers = cluster.ListenAddrs() + // We need to set the protocol version to 2.3.0 to make Sarama happy. + cfg.ProtocolVersion = "2.3.0" return cluster, cfg } diff --git a/receiver/kafkareceiver/README.md b/receiver/kafkareceiver/README.md index ef8859267ed96..89612314c7ac2 100644 --- a/receiver/kafkareceiver/README.md +++ b/receiver/kafkareceiver/README.md @@ -47,6 +47,7 @@ The following settings can be optionally configured: - `min_fetch_size` (default = `1`): The minimum number of message bytes to fetch in a request, defaults to 1 byte. - `default_fetch_size` (default = `1048576`): The default number of message bytes to fetch in a request, defaults to 1MB. - `max_fetch_size` (default = `0`): The maximum number of message bytes to fetch in a request, defaults to unlimited. +- `max_fetch_wait` (default = `250ms`): The maximum amount of time the broker will wait for `min_fetch_size` bytes of data to be available before returning a response. - `tls`: see [TLS Configuration Settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configtls/README.md) for the full set of available options. - `auth` - `plain_text` (Deprecated in v0.123.0: use sasl with mechanism set to PLAIN instead.)
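For context on the new setting, here is a minimal, illustrative Go sketch (not part of this diff) of how the fetch-related receiver settings map onto Sarama's consumer options, mirroring the assignment added in `internal/kafka/client.go`; the literal values simply restate the documented defaults:

```go
package main

import (
	"fmt"
	"time"

	"github.com/IBM/sarama"
)

func main() {
	cfg := sarama.NewConfig()
	// The receiver's fetch settings translate directly to Sarama's consumer
	// configuration (see NewSaramaConsumerGroup in internal/kafka/client.go).
	cfg.Consumer.Fetch.Min = 1                        // min_fetch_size (default 1 byte)
	cfg.Consumer.Fetch.Default = 1048576              // default_fetch_size (default 1 MiB)
	cfg.Consumer.Fetch.Max = 0                        // max_fetch_size (default 0, unlimited)
	cfg.Consumer.MaxWaitTime = 250 * time.Millisecond // max_fetch_wait (default 250ms)

	fmt.Println(cfg.Consumer.MaxWaitTime) // 250ms
}
```

With the default `min_fetch_size` of 1 byte, `max_fetch_wait` mainly bounds how long the broker holds a fetch request open while a topic is idle; lowering it increases polling frequency rather than throughput.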
diff --git a/receiver/kafkareceiver/factory_test.go b/receiver/kafkareceiver/factory_test.go index 3dbf17c69e4da..c907afca29bc5 100644 --- a/receiver/kafkareceiver/factory_test.go +++ b/receiver/kafkareceiver/factory_test.go @@ -41,7 +41,7 @@ func TestWithTracesUnmarshalers(t *testing.T) { cfg := createDefaultConfig() cfg.Traces.Encoding = "custom" receiver, err := f.CreateTraces(context.Background(), receivertest.NewNopSettings(metadata.Type), cfg, nil) - tracesConsumer, ok := receiver.(*kafkaTracesConsumer) + tracesConsumer, ok := receiver.(*kafkaConsumer) require.True(t, ok) require.Equal(t, "custom", tracesConsumer.config.Traces.Encoding) require.NoError(t, err) @@ -50,7 +50,7 @@ func TestWithTracesUnmarshalers(t *testing.T) { t.Run("default_encoding", func(t *testing.T) { cfg := createDefaultConfig() receiver, err := f.CreateTraces(context.Background(), receivertest.NewNopSettings(metadata.Type), cfg, nil) - tracesConsumer, ok := receiver.(*kafkaTracesConsumer) + tracesConsumer, ok := receiver.(*kafkaConsumer) require.True(t, ok) require.Equal(t, defaultTracesEncoding, tracesConsumer.config.Traces.Encoding) require.NoError(t, err) @@ -75,7 +75,7 @@ func TestWithMetricsUnmarshalers(t *testing.T) { cfg := createDefaultConfig() cfg.Metrics.Encoding = "custom" receiver, err := f.CreateMetrics(context.Background(), receivertest.NewNopSettings(metadata.Type), cfg, nil) - metricsConsumer, ok := receiver.(*kafkaMetricsConsumer) + metricsConsumer, ok := receiver.(*kafkaConsumer) require.True(t, ok) require.Equal(t, "custom", metricsConsumer.config.Metrics.Encoding) require.NoError(t, err) @@ -84,7 +84,7 @@ func TestWithMetricsUnmarshalers(t *testing.T) { t.Run("default_encoding", func(t *testing.T) { cfg := createDefaultConfig() receiver, err := f.CreateMetrics(context.Background(), receivertest.NewNopSettings(metadata.Type), cfg, nil) - metricsConsumer, ok := receiver.(*kafkaMetricsConsumer) + metricsConsumer, ok := receiver.(*kafkaConsumer) require.True(t, ok) require.Equal(t, defaultMetricsEncoding, metricsConsumer.config.Metrics.Encoding) require.NoError(t, err) @@ -109,7 +109,7 @@ func TestWithLogsUnmarshalers(t *testing.T) { cfg := createDefaultConfig() cfg.Logs.Encoding = "custom" receiver, err := f.CreateLogs(context.Background(), receivertest.NewNopSettings(metadata.Type), cfg, nil) - logsConsumer, ok := receiver.(*kafkaLogsConsumer) + logsConsumer, ok := receiver.(*kafkaConsumer) require.True(t, ok) require.Equal(t, "custom", logsConsumer.config.Logs.Encoding) require.NoError(t, err) @@ -118,7 +118,7 @@ func TestWithLogsUnmarshalers(t *testing.T) { t.Run("default_encoding", func(t *testing.T) { cfg := createDefaultConfig() receiver, err := f.CreateLogs(context.Background(), receivertest.NewNopSettings(metadata.Type), cfg, nil) - logsConsumer, ok := receiver.(*kafkaLogsConsumer) + logsConsumer, ok := receiver.(*kafkaConsumer) require.True(t, ok) require.Equal(t, defaultLogsEncoding, logsConsumer.config.Logs.Encoding) require.NoError(t, err) diff --git a/receiver/kafkareceiver/go.mod b/receiver/kafkareceiver/go.mod index 4ca27626fd848..21ed39f5a624f 100644 --- a/receiver/kafkareceiver/go.mod +++ b/receiver/kafkareceiver/go.mod @@ -14,7 +14,11 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/azure v0.124.1 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.124.1 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin v0.124.1 + 
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 github.com/stretchr/testify v1.10.0 + github.com/twmb/franz-go v1.18.1 + github.com/twmb/franz-go/pkg/kadm v1.16.0 + github.com/twmb/franz-go/pkg/kfake v0.0.0-20250320172111-35ab5e5f5327 go.opentelemetry.io/collector/client v1.30.1-0.20250422165940-c47951a8bf71 go.opentelemetry.io/collector/component v1.30.1-0.20250422165940-c47951a8bf71 go.opentelemetry.io/collector/component/componenttest v0.124.1-0.20250422165940-c47951a8bf71 @@ -88,8 +92,8 @@ require ( github.com/openzipkin/zipkin-go v0.4.3 // indirect github.com/pierrec/lz4/v4 v4.1.22 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect - github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect github.com/relvacode/iso8601 v1.6.0 // indirect + github.com/twmb/franz-go/pkg/kmsg v1.9.0 // indirect github.com/xdg-go/pbkdf2 v1.0.0 // indirect github.com/xdg-go/scram v1.1.2 // indirect github.com/xdg-go/stringprep v1.0.4 // indirect diff --git a/receiver/kafkareceiver/go.sum b/receiver/kafkareceiver/go.sum index 0730909566536..64f3aea8e3d0c 100644 --- a/receiver/kafkareceiver/go.sum +++ b/receiver/kafkareceiver/go.sum @@ -145,6 +145,8 @@ github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOf github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/twmb/franz-go v1.18.1 h1:D75xxCDyvTqBSiImFx2lkPduE39jz1vaD7+FNc+vMkc= github.com/twmb/franz-go v1.18.1/go.mod h1:Uzo77TarcLTUZeLuGq+9lNpSkfZI+JErv7YJhlDjs9M= +github.com/twmb/franz-go/pkg/kadm v1.16.0 h1:STMs1t5lYR5mR974PSiwNzE5TvsosByTp+rKXLOhAjE= +github.com/twmb/franz-go/pkg/kadm v1.16.0/go.mod h1:MUdcUtnf9ph4SFBLLA/XxE29rvLhWYLM9Ygb8dfSCvw= github.com/twmb/franz-go/pkg/kfake v0.0.0-20250320172111-35ab5e5f5327 h1:E2rCVOpwEnB6F0cUpwPNyzfRYfHee0IfHbUVSB5rH6I= github.com/twmb/franz-go/pkg/kfake v0.0.0-20250320172111-35ab5e5f5327/go.mod h1:zCgWGv7Rg9B70WV6T+tUbifRJnx60gGTFU/U4xZpyUA= github.com/twmb/franz-go/pkg/kmsg v1.9.0 h1:JojYUph2TKAau6SBtErXpXGC7E3gg4vGZMv9xFU/B6M= diff --git a/receiver/kafkareceiver/header_extraction.go b/receiver/kafkareceiver/header_extraction.go deleted file mode 100644 index efae723c2011f..0000000000000 --- a/receiver/kafkareceiver/header_extraction.go +++ /dev/null @@ -1,92 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package kafkareceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver" - -import ( - "github.com/IBM/sarama" - "go.opentelemetry.io/collector/pdata/plog" - "go.opentelemetry.io/collector/pdata/pmetric" - "go.opentelemetry.io/collector/pdata/ptrace" - "go.uber.org/zap" -) - -func getAttribute(key string) string { - return "kafka.header." 
+ key -} - -type HeaderExtractor interface { - extractHeadersTraces(ptrace.Traces, *sarama.ConsumerMessage) - extractHeadersMetrics(pmetric.Metrics, *sarama.ConsumerMessage) - extractHeadersLogs(plog.Logs, *sarama.ConsumerMessage) -} - -type headerExtractor struct { - logger *zap.Logger - headers []string -} - -func (he *headerExtractor) extractHeadersTraces(traces ptrace.Traces, message *sarama.ConsumerMessage) { - for _, header := range he.headers { - value, ok := getHeaderValue(message.Headers, header) - if !ok { - he.logger.Debug("Header key not found in the trace: ", zap.String("key", header)) - continue - } - for i := 0; i < traces.ResourceSpans().Len(); i++ { - rs := traces.ResourceSpans().At(i) - rs.Resource().Attributes().PutStr(getAttribute(header), value) - } - } -} - -func (he *headerExtractor) extractHeadersLogs(logs plog.Logs, message *sarama.ConsumerMessage) { - for _, header := range he.headers { - value, ok := getHeaderValue(message.Headers, header) - if !ok { - he.logger.Debug("Header key not found in the log: ", zap.String("key", header)) - continue - } - for i := 0; i < logs.ResourceLogs().Len(); i++ { - rl := logs.ResourceLogs().At(i) - rl.Resource().Attributes().PutStr(getAttribute(header), value) - } - } -} - -func (he *headerExtractor) extractHeadersMetrics(metrics pmetric.Metrics, message *sarama.ConsumerMessage) { - for _, header := range he.headers { - value, ok := getHeaderValue(message.Headers, header) - if !ok { - he.logger.Debug("Header key not found in the metric: ", zap.String("key", header)) - continue - } - for i := 0; i < metrics.ResourceMetrics().Len(); i++ { - rm := metrics.ResourceMetrics().At(i) - rm.Resource().Attributes().PutStr(getAttribute(header), value) - } - } -} - -func getHeaderValue(headers []*sarama.RecordHeader, header string) (string, bool) { - for _, kafkaHeader := range headers { - headerKey := string(kafkaHeader.Key) - if headerKey == header { - // matching header found - return string(kafkaHeader.Value), true - } - } - // no header found matching the key, report to the user - return "", false -} - -type nopHeaderExtractor struct{} - -func (he *nopHeaderExtractor) extractHeadersTraces(_ ptrace.Traces, _ *sarama.ConsumerMessage) { -} - -func (he *nopHeaderExtractor) extractHeadersLogs(_ plog.Logs, _ *sarama.ConsumerMessage) { -} - -func (he *nopHeaderExtractor) extractHeadersMetrics(_ pmetric.Metrics, _ *sarama.ConsumerMessage) { -} diff --git a/receiver/kafkareceiver/header_extraction_test.go b/receiver/kafkareceiver/header_extraction_test.go deleted file mode 100644 index 7a05045ef6499..0000000000000 --- a/receiver/kafkareceiver/header_extraction_test.go +++ /dev/null @@ -1,218 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package kafkareceiver - -import ( - "context" - "sync" - "testing" - - "github.com/IBM/sarama" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/consumer/consumertest" - "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/collector/pdata/pmetric" - "go.opentelemetry.io/collector/pdata/ptrace" - "go.opentelemetry.io/collector/pdata/testdata" - "go.opentelemetry.io/collector/receiver/receiverhelper" - "go.opentelemetry.io/collector/receiver/receivertest" - "go.uber.org/zap/zaptest" - - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver/internal/metadata" - 
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver/internal/unmarshaler" -) - -func TestHeaderExtractionTraces(t *testing.T) { - obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ - ReceiverCreateSettings: receivertest.NewNopSettings(metadata.Type), - }) - require.NoError(t, err) - telemetryBuilder, err := metadata.NewTelemetryBuilder(receivertest.NewNopSettings(metadata.Type).TelemetrySettings) - require.NoError(t, err) - nextConsumer := &consumertest.TracesSink{} - c := tracesConsumerGroupHandler{ - unmarshaler: &ptrace.ProtoUnmarshaler{}, - logger: zaptest.NewLogger(t), - ready: make(chan bool), - nextConsumer: nextConsumer, - obsrecv: obsrecv, - telemetryBuilder: telemetryBuilder, - } - headers := []string{"headerKey1", "headerKey2"} - c.headerExtractor = &headerExtractor{ - logger: zaptest.NewLogger(t), - headers: headers, - } - groupClaim := &testConsumerGroupClaim{ - messageChan: make(chan *sarama.ConsumerMessage), - } - ctx, cancelFunc := context.WithCancel(context.Background()) - defer close(groupClaim.messageChan) - testSession := testConsumerGroupSession{ctx: ctx} - require.NoError(t, c.Setup(testSession)) - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - err = c.ConsumeClaim(testSession, groupClaim) - for _, trace := range nextConsumer.AllTraces() { - for i := 0; i < trace.ResourceSpans().Len(); i++ { - rs := trace.ResourceSpans().At(i) - validateHeader(t, rs.Resource(), "kafka.header.headerKey1", "headerValue1") - validateHeader(t, rs.Resource(), "kafka.header.headerKey2", "headerValue2") - } - } - assert.NoError(t, err) - wg.Done() - }() - td := ptrace.NewTraces() - td.ResourceSpans().AppendEmpty().Resource() - td.ResourceSpans().At(0).ScopeSpans().AppendEmpty().Spans().AppendEmpty() - unmarshaler := &ptrace.ProtoMarshaler{} - bts, err := unmarshaler.MarshalTraces(td) - groupClaim.messageChan <- &sarama.ConsumerMessage{ - Headers: []*sarama.RecordHeader{ - { - Key: []byte("headerKey1"), - Value: []byte("headerValue1"), - }, - { - Key: []byte("headerKey2"), - Value: []byte("headerValue2"), - }, - }, - Value: bts, - } - cancelFunc() - wg.Wait() -} - -func TestHeaderExtractionLogs(t *testing.T) { - obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ - ReceiverCreateSettings: receivertest.NewNopSettings(metadata.Type), - }) - require.NoError(t, err) - telemetryBuilder, err := metadata.NewTelemetryBuilder(receivertest.NewNopSettings(metadata.Type).TelemetrySettings) - require.NoError(t, err) - nextConsumer := &consumertest.LogsSink{} - unmarshaler, err := unmarshaler.NewTextLogsUnmarshaler("utf-8") - require.NoError(t, err) - c := logsConsumerGroupHandler{ - unmarshaler: unmarshaler, - logger: zaptest.NewLogger(t), - ready: make(chan bool), - nextConsumer: nextConsumer, - obsrecv: obsrecv, - telemetryBuilder: telemetryBuilder, - } - headers := []string{"headerKey1", "headerKey2"} - c.headerExtractor = &headerExtractor{ - logger: zaptest.NewLogger(t), - headers: headers, - } - groupClaim := &testConsumerGroupClaim{ - messageChan: make(chan *sarama.ConsumerMessage), - } - ctx, cancelFunc := context.WithCancel(context.Background()) - defer close(groupClaim.messageChan) - testSession := testConsumerGroupSession{ctx: ctx} - require.NoError(t, c.Setup(testSession)) - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - err = c.ConsumeClaim(testSession, groupClaim) - for _, logs := range nextConsumer.AllLogs() { - for i := 0; i < logs.ResourceLogs().Len(); i++ { - rs := 
logs.ResourceLogs().At(i) - validateHeader(t, rs.Resource(), "kafka.header.headerKey1", "headerValueLog1") - validateHeader(t, rs.Resource(), "kafka.header.headerKey2", "headerValueLog2") - } - } - assert.NoError(t, err) - wg.Done() - }() - groupClaim.messageChan <- &sarama.ConsumerMessage{ - Headers: []*sarama.RecordHeader{ - { - Key: []byte("headerKey1"), - Value: []byte("headerValueLog1"), - }, - { - Key: []byte("headerKey2"), - Value: []byte("headerValueLog2"), - }, - }, - Value: []byte("Message"), - } - cancelFunc() - wg.Wait() -} - -func TestHeaderExtractionMetrics(t *testing.T) { - obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ - ReceiverCreateSettings: receivertest.NewNopSettings(metadata.Type), - }) - require.NoError(t, err) - telemetryBuilder, err := metadata.NewTelemetryBuilder(receivertest.NewNopSettings(metadata.Type).TelemetrySettings) - require.NoError(t, err) - nextConsumer := &consumertest.MetricsSink{} - c := metricsConsumerGroupHandler{ - unmarshaler: &pmetric.ProtoUnmarshaler{}, - logger: zaptest.NewLogger(t), - ready: make(chan bool), - nextConsumer: nextConsumer, - obsrecv: obsrecv, - telemetryBuilder: telemetryBuilder, - } - headers := []string{"headerKey1", "headerKey2"} - c.headerExtractor = &headerExtractor{ - logger: zaptest.NewLogger(t), - headers: headers, - } - groupClaim := &testConsumerGroupClaim{ - messageChan: make(chan *sarama.ConsumerMessage), - } - ctx, cancelFunc := context.WithCancel(context.Background()) - defer close(groupClaim.messageChan) - testSession := testConsumerGroupSession{ctx: ctx} - require.NoError(t, c.Setup(testSession)) - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - err = c.ConsumeClaim(testSession, groupClaim) - for _, metric := range nextConsumer.AllMetrics() { - for i := 0; i < metric.ResourceMetrics().Len(); i++ { - rs := metric.ResourceMetrics().At(i) - validateHeader(t, rs.Resource(), "kafka.header.headerKey1", "headerValueMetric1") - validateHeader(t, rs.Resource(), "kafka.header.headerKey2", "headerValueMetric2") - } - } - assert.NoError(t, err) - wg.Done() - }() - ld := testdata.GenerateMetrics(1) - unmarshaler := &pmetric.ProtoMarshaler{} - bts, err := unmarshaler.MarshalMetrics(ld) - groupClaim.messageChan <- &sarama.ConsumerMessage{ - Headers: []*sarama.RecordHeader{ - { - Key: []byte("headerKey1"), - Value: []byte("headerValueMetric1"), - }, - { - Key: []byte("headerKey2"), - Value: []byte("headerValueMetric2"), - }, - }, - Value: bts, - } - cancelFunc() - wg.Wait() -} - -func validateHeader(t *testing.T, rs pcommon.Resource, headerKey string, headerValue string) { - val, ok := rs.Attributes().Get(headerKey) - assert.True(t, ok) - assert.Equal(t, val.Str(), headerValue) -} diff --git a/receiver/kafkareceiver/kafka_receiver.go b/receiver/kafkareceiver/kafka_receiver.go index 5f1c4303d272e..c06405759a29f 100644 --- a/receiver/kafkareceiver/kafka_receiver.go +++ b/receiver/kafkareceiver/kafka_receiver.go @@ -6,6 +6,7 @@ package kafkareceiver // import "github.com/open-telemetry/opentelemetry-collect import ( "context" "errors" + "iter" "strconv" "sync" "time" @@ -16,6 +17,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" @@ -38,312 +40,297 @@ const ( var errMemoryLimiterDataRefused = 
errors.New("data refused due to high memory usage") -// kafkaTracesConsumer uses sarama to consume and handle messages from kafka. -type kafkaTracesConsumer struct { - config *Config - consumerGroup sarama.ConsumerGroup - nextConsumer consumer.Traces - topics []string - cancelConsumeLoop context.CancelFunc - unmarshaler ptrace.Unmarshaler - consumeLoopWG *sync.WaitGroup +type consumeMessageFunc func(ctx context.Context, message *sarama.ConsumerMessage) error - settings receiver.Settings - telemetryBuilder *metadata.TelemetryBuilder +// messageHandler provides a generic interface for handling messages for a pdata type. +type messageHandler[T plog.Logs | pmetric.Metrics | ptrace.Traces] interface { + // unmarshalData unmarshals the message payload into a pdata type (plog.Logs, etc.) + // and returns the number of items (log records, metric data points, spans) within it. + unmarshalData(data []byte) (T, int, error) - autocommitEnabled bool - messageMarking MessageMarking - headerExtraction bool - headers []string - minFetchSize int32 - defaultFetchSize int32 - maxFetchSize int32 -} + // consumeData passes the unmarshaled data to the next consumer for the signal type. + // This simply calls the signal-specific Consume* method. + consumeData(ctx context.Context, data T) error -// kafkaMetricsConsumer uses sarama to consume and handle messages from kafka. -type kafkaMetricsConsumer struct { - config *Config - consumerGroup sarama.ConsumerGroup - nextConsumer consumer.Metrics - topics []string - cancelConsumeLoop context.CancelFunc - unmarshaler pmetric.Unmarshaler - consumeLoopWG *sync.WaitGroup + // getResources returns the resources associated with the unmarshaled data. + // This is used for header extraction for adding resource attributes. + getResources(T) iter.Seq[pcommon.Resource] - settings receiver.Settings - telemetryBuilder *metadata.TelemetryBuilder + // startObsReport starts an observation report for the unmarshaled data. + // + // This simply calls the the signal-specific receiverhelper.ObsReport.Start*Op method. + startObsReport(ctx context.Context) context.Context - autocommitEnabled bool - messageMarking MessageMarking - headerExtraction bool - headers []string - minFetchSize int32 - defaultFetchSize int32 - maxFetchSize int32 + // endObsReport ends the observation report for the unmarshaled data. + // + // This simply calls the signal-specific receiverherlper.ObsReport.End*Op method, + // passing the configured encoding and number of items returned by unmarshalData. + endObsReport(ctx context.Context, n int, err error) } -// kafkaLogsConsumer uses sarama to consume and handle messages from kafka. 
-type kafkaLogsConsumer struct { - config *Config - consumerGroup sarama.ConsumerGroup - nextConsumer consumer.Logs - topics []string - cancelConsumeLoop context.CancelFunc - unmarshaler plog.Unmarshaler - consumeLoopWG *sync.WaitGroup +func newLogsReceiver(config *Config, set receiver.Settings, nextConsumer consumer.Logs) (receiver.Logs, error) { + newConsumeMessageFunc := func(c *consumerGroupHandler, host component.Host) (consumeMessageFunc, error) { + unmarshaler, err := newLogsUnmarshaler(config.Logs.Encoding, set, host) + if err != nil { + return nil, err + } + return newMessageHandlerConsumeFunc( + config, set.Logger, + c.telemetryBuilder.KafkaReceiverUnmarshalFailedLogRecords, + []metric.AddOption{ + metric.WithAttributes(attribute.String(attrInstanceName, c.id.String())), + }, + &logsHandler{ + unmarshaler: unmarshaler, + obsrecv: c.obsrecv, + consumer: nextConsumer, + encoding: config.Logs.Encoding, + }, + ), nil + } + return newKafkaConsumer(config, set, []string{config.Logs.Topic}, newConsumeMessageFunc) +} + +func newMetricsReceiver(config *Config, set receiver.Settings, nextConsumer consumer.Metrics) (receiver.Metrics, error) { + newConsumeMessageFunc := func(c *consumerGroupHandler, host component.Host) (consumeMessageFunc, error) { + unmarshaler, err := newMetricsUnmarshaler(config.Metrics.Encoding, set, host) + if err != nil { + return nil, err + } + return newMessageHandlerConsumeFunc( + config, set.Logger, + c.telemetryBuilder.KafkaReceiverUnmarshalFailedMetricPoints, + []metric.AddOption{ + metric.WithAttributes(attribute.String(attrInstanceName, c.id.String())), + }, + &metricsHandler{ + unmarshaler: unmarshaler, + obsrecv: c.obsrecv, + consumer: nextConsumer, + encoding: config.Metrics.Encoding, + }, + ), nil + } + return newKafkaConsumer(config, set, []string{config.Metrics.Topic}, newConsumeMessageFunc) +} + +func newTracesReceiver(config *Config, set receiver.Settings, nextConsumer consumer.Traces) (receiver.Traces, error) { + newConsumeMessageFunc := func(c *consumerGroupHandler, host component.Host) (consumeMessageFunc, error) { + unmarshaler, err := newTracesUnmarshaler(config.Traces.Encoding, set, host) + if err != nil { + return nil, err + } + return newMessageHandlerConsumeFunc( + config, set.Logger, + c.telemetryBuilder.KafkaReceiverUnmarshalFailedSpans, + []metric.AddOption{ + metric.WithAttributes(attribute.String(attrInstanceName, c.id.String())), + }, + &tracesHandler{ + unmarshaler: unmarshaler, + obsrecv: c.obsrecv, + consumer: nextConsumer, + encoding: config.Traces.Encoding, + }, + ), nil + } + return newKafkaConsumer(config, set, []string{config.Traces.Topic}, newConsumeMessageFunc) +} + +func newMessageHandlerConsumeFunc[T plog.Logs | pmetric.Metrics | ptrace.Traces]( + config *Config, + logger *zap.Logger, + unmarshalFailedCounter metric.Int64Counter, + metricAddOpts []metric.AddOption, + h messageHandler[T], +) consumeMessageFunc { + return func(ctx context.Context, message *sarama.ConsumerMessage) (err error) { + ctx = h.startObsReport(ctx) + var data T + var n int + defer func() { + h.endObsReport(ctx, n, err) + }() + + data, n, err = h.unmarshalData(message.Value) + if err != nil { + logger.Error("failed to unmarshal message", zap.Error(err)) + unmarshalFailedCounter.Add(ctx, 1, metricAddOpts...) 
+ return err + } - settings receiver.Settings - telemetryBuilder *metadata.TelemetryBuilder + if config.HeaderExtraction.ExtractHeaders { + for key, value := range getMessageHeaderResourceAttributes( + message, config.HeaderExtraction.Headers, + ) { + for resource := range h.getResources(data) { + resource.Attributes().PutStr(key, value) + } + } + } - autocommitEnabled bool - messageMarking MessageMarking - headerExtraction bool - headers []string - minFetchSize int32 - defaultFetchSize int32 - maxFetchSize int32 + return h.consumeData(ctx, data) + } } -var ( - _ receiver.Traces = (*kafkaTracesConsumer)(nil) - _ receiver.Metrics = (*kafkaMetricsConsumer)(nil) - _ receiver.Logs = (*kafkaLogsConsumer)(nil) -) +type logsHandler struct { + unmarshaler plog.Unmarshaler + obsrecv *receiverhelper.ObsReport + consumer consumer.Logs + encoding string +} -func newTracesReceiver(config *Config, set receiver.Settings, nextConsumer consumer.Traces) (*kafkaTracesConsumer, error) { - telemetryBuilder, err := metadata.NewTelemetryBuilder(set.TelemetrySettings) +func (h *logsHandler) unmarshalData(data []byte) (plog.Logs, int, error) { + logs, err := h.unmarshaler.UnmarshalLogs(data) if err != nil { - return nil, err + return plog.Logs{}, 0, err } + return logs, logs.LogRecordCount(), nil +} - return &kafkaTracesConsumer{ - config: config, - topics: []string{config.Traces.Topic}, - nextConsumer: nextConsumer, - consumeLoopWG: &sync.WaitGroup{}, - settings: set, - autocommitEnabled: config.AutoCommit.Enable, - messageMarking: config.MessageMarking, - headerExtraction: config.HeaderExtraction.ExtractHeaders, - headers: config.HeaderExtraction.Headers, - telemetryBuilder: telemetryBuilder, - minFetchSize: config.MinFetchSize, - defaultFetchSize: config.DefaultFetchSize, - maxFetchSize: config.MaxFetchSize, - }, nil +func (h *logsHandler) consumeData(ctx context.Context, data plog.Logs) error { + return h.consumer.ConsumeLogs(ctx, data) } -func createKafkaClient(ctx context.Context, config *Config) (sarama.ConsumerGroup, error) { - return kafka.NewSaramaConsumerGroup(ctx, config.ClientConfig, config.ConsumerConfig) +func (h *logsHandler) startObsReport(ctx context.Context) context.Context { + return h.obsrecv.StartLogsOp(ctx) } -func (c *kafkaTracesConsumer) Start(_ context.Context, host component.Host) error { - ctx, cancel := context.WithCancel(context.Background()) - c.cancelConsumeLoop = cancel - obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ - ReceiverID: c.settings.ID, - Transport: transport, - ReceiverCreateSettings: c.settings, - }) - if err != nil { - return err +func (h *logsHandler) endObsReport(ctx context.Context, n int, err error) { + h.obsrecv.EndLogsOp(ctx, h.encoding, n, err) +} + +func (h *logsHandler) getResources(data plog.Logs) iter.Seq[pcommon.Resource] { + return func(yield func(pcommon.Resource) bool) { + for _, rm := range data.ResourceLogs().All() { + if !yield(rm.Resource()) { + return + } + } } +} - unmarshaler, err := newTracesUnmarshaler(c.config.Traces.Encoding, c.settings, host) +type metricsHandler struct { + unmarshaler pmetric.Unmarshaler + obsrecv *receiverhelper.ObsReport + consumer consumer.Metrics + encoding string +} + +func (h *metricsHandler) unmarshalData(data []byte) (pmetric.Metrics, int, error) { + metrics, err := h.unmarshaler.UnmarshalMetrics(data) if err != nil { - return err + return pmetric.Metrics{}, 0, err } - c.unmarshaler = unmarshaler + return metrics, metrics.DataPointCount(), nil +} - // consumerGroup may be set in tests 
to inject fake implementation. - if c.consumerGroup == nil { - if c.consumerGroup, err = createKafkaClient(ctx, c.config); err != nil { - return err - } - } - consumerGroup := &tracesConsumerGroupHandler{ - id: c.settings.ID, - logger: c.settings.Logger, - encoding: c.config.Traces.Encoding, - unmarshaler: c.unmarshaler, - nextConsumer: c.nextConsumer, - ready: make(chan bool), - obsrecv: obsrecv, - autocommitEnabled: c.autocommitEnabled, - messageMarking: c.messageMarking, - headerExtractor: &nopHeaderExtractor{}, - telemetryBuilder: c.telemetryBuilder, - backOff: newExponentialBackOff(c.config.ErrorBackOff), - } - if c.headerExtraction { - consumerGroup.headerExtractor = &headerExtractor{ - logger: c.settings.Logger, - headers: c.headers, - } - } - c.consumeLoopWG.Add(1) - go c.consumeLoop(ctx, consumerGroup) - <-consumerGroup.ready - return nil +func (h *metricsHandler) consumeData(ctx context.Context, data pmetric.Metrics) error { + return h.consumer.ConsumeMetrics(ctx, data) } -func (c *kafkaTracesConsumer) consumeLoop(ctx context.Context, handler sarama.ConsumerGroupHandler) { - defer c.consumeLoopWG.Done() - for { - // `Consume` should be called inside an infinite loop, when a - // server-side rebalance happens, the consumer session will need to be - // recreated to get the new claims - if err := c.consumerGroup.Consume(ctx, c.topics, handler); err != nil { - c.settings.Logger.Error("Error from consumer", zap.Error(err)) - } - // check if context was cancelled, signaling that the consumer should stop - if ctx.Err() != nil { - c.settings.Logger.Info("Consumer stopped", zap.Error(ctx.Err())) - return - } - } +func (h *metricsHandler) startObsReport(ctx context.Context) context.Context { + return h.obsrecv.StartMetricsOp(ctx) } -func (c *kafkaTracesConsumer) Shutdown(context.Context) error { - if c.cancelConsumeLoop == nil { - return nil - } - c.cancelConsumeLoop() - c.consumeLoopWG.Wait() - if c.consumerGroup == nil { - return nil - } - return c.consumerGroup.Close() +func (h *metricsHandler) endObsReport(ctx context.Context, n int, err error) { + h.obsrecv.EndMetricsOp(ctx, h.encoding, n, err) } -func newMetricsReceiver(config *Config, set receiver.Settings, nextConsumer consumer.Metrics) (*kafkaMetricsConsumer, error) { - telemetryBuilder, err := metadata.NewTelemetryBuilder(set.TelemetrySettings) - if err != nil { - return nil, err +func (h *metricsHandler) getResources(data pmetric.Metrics) iter.Seq[pcommon.Resource] { + return func(yield func(pcommon.Resource) bool) { + for _, rm := range data.ResourceMetrics().All() { + if !yield(rm.Resource()) { + return + } + } } +} - return &kafkaMetricsConsumer{ - config: config, - topics: []string{config.Metrics.Topic}, - nextConsumer: nextConsumer, - consumeLoopWG: &sync.WaitGroup{}, - settings: set, - autocommitEnabled: config.AutoCommit.Enable, - messageMarking: config.MessageMarking, - headerExtraction: config.HeaderExtraction.ExtractHeaders, - headers: config.HeaderExtraction.Headers, - telemetryBuilder: telemetryBuilder, - minFetchSize: config.MinFetchSize, - defaultFetchSize: config.DefaultFetchSize, - maxFetchSize: config.MaxFetchSize, - }, nil +type tracesHandler struct { + unmarshaler ptrace.Unmarshaler + obsrecv *receiverhelper.ObsReport + consumer consumer.Traces + encoding string } -func (c *kafkaMetricsConsumer) Start(_ context.Context, host component.Host) error { - ctx, cancel := context.WithCancel(context.Background()) - c.cancelConsumeLoop = cancel - obsrecv, err := 
receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ - ReceiverID: c.settings.ID, - Transport: transport, - ReceiverCreateSettings: c.settings, - }) +func (h *tracesHandler) unmarshalData(data []byte) (ptrace.Traces, int, error) { + traces, err := h.unmarshaler.UnmarshalTraces(data) if err != nil { - return err + return ptrace.Traces{}, 0, err } + return traces, traces.SpanCount(), nil +} - unmarshaler, err := newMetricsUnmarshaler(c.config.Metrics.Encoding, c.settings, host) - if err != nil { - return err - } - c.unmarshaler = unmarshaler +func (h *tracesHandler) consumeData(ctx context.Context, data ptrace.Traces) error { + return h.consumer.ConsumeTraces(ctx, data) +} - // consumerGroup may be set in tests to inject fake implementation. - if c.consumerGroup == nil { - if c.consumerGroup, err = createKafkaClient(ctx, c.config); err != nil { - return err - } - } - metricsConsumerGroup := &metricsConsumerGroupHandler{ - id: c.settings.ID, - logger: c.settings.Logger, - encoding: c.config.Metrics.Encoding, - unmarshaler: c.unmarshaler, - nextConsumer: c.nextConsumer, - ready: make(chan bool), - obsrecv: obsrecv, - autocommitEnabled: c.autocommitEnabled, - messageMarking: c.messageMarking, - headerExtractor: &nopHeaderExtractor{}, - telemetryBuilder: c.telemetryBuilder, - backOff: newExponentialBackOff(c.config.ErrorBackOff), - } - if c.headerExtraction { - metricsConsumerGroup.headerExtractor = &headerExtractor{ - logger: c.settings.Logger, - headers: c.headers, - } - } - c.consumeLoopWG.Add(1) - go c.consumeLoop(ctx, metricsConsumerGroup) - <-metricsConsumerGroup.ready - return nil +func (h *tracesHandler) startObsReport(ctx context.Context) context.Context { + return h.obsrecv.StartTracesOp(ctx) } -func (c *kafkaMetricsConsumer) consumeLoop(ctx context.Context, handler sarama.ConsumerGroupHandler) { - defer c.consumeLoopWG.Done() - for { - // `Consume` should be called inside an infinite loop, when a - // server-side rebalance happens, the consumer session will need to be - // recreated to get the new claims - if err := c.consumerGroup.Consume(ctx, c.topics, handler); err != nil { - c.settings.Logger.Error("Error from consumer", zap.Error(err)) - } - // check if context was cancelled, signaling that the consumer should stop - if ctx.Err() != nil { - c.settings.Logger.Info("Consumer stopped", zap.Error(ctx.Err())) - return - } - } +func (h *tracesHandler) endObsReport(ctx context.Context, n int, err error) { + h.obsrecv.EndTracesOp(ctx, h.encoding, n, err) } -func (c *kafkaMetricsConsumer) Shutdown(context.Context) error { - if c.cancelConsumeLoop == nil { - return nil - } - c.cancelConsumeLoop() - c.consumeLoopWG.Wait() - if c.consumerGroup == nil { - return nil +func (h *tracesHandler) getResources(data ptrace.Traces) iter.Seq[pcommon.Resource] { + return func(yield func(pcommon.Resource) bool) { + for _, rm := range data.ResourceSpans().All() { + if !yield(rm.Resource()) { + return + } + } } - return c.consumerGroup.Close() } -func newLogsReceiver(config *Config, set receiver.Settings, nextConsumer consumer.Logs) (*kafkaLogsConsumer, error) { +func newKafkaConsumer( + config *Config, + set receiver.Settings, + topics []string, + newConsumeMessageFunc func(*consumerGroupHandler, component.Host) (consumeMessageFunc, error), +) (*kafkaConsumer, error) { telemetryBuilder, err := metadata.NewTelemetryBuilder(set.TelemetrySettings) if err != nil { return nil, err } - return &kafkaLogsConsumer{ - config: config, - topics: []string{config.Logs.Topic}, - nextConsumer: nextConsumer, - 
consumeLoopWG: &sync.WaitGroup{}, - settings: set, - autocommitEnabled: config.AutoCommit.Enable, - messageMarking: config.MessageMarking, - headerExtraction: config.HeaderExtraction.ExtractHeaders, - headers: config.HeaderExtraction.Headers, - telemetryBuilder: telemetryBuilder, - minFetchSize: config.MinFetchSize, - defaultFetchSize: config.DefaultFetchSize, - maxFetchSize: config.MaxFetchSize, + return &kafkaConsumer{ + config: config, + topics: topics, + newConsumeMessageFunc: newConsumeMessageFunc, + settings: set, + telemetryBuilder: telemetryBuilder, }, nil } -func (c *kafkaLogsConsumer) Start(_ context.Context, host component.Host) error { - ctx, cancel := context.WithCancel(context.Background()) - c.cancelConsumeLoop = cancel +// kafkaConsumer consumes messages from a set of Kafka topics, +// decodes telemetry data using a given unmarshaler, and passes +// them to a consumer. +type kafkaConsumer struct { + config *Config + topics []string + settings receiver.Settings + telemetryBuilder *metadata.TelemetryBuilder + newConsumeMessageFunc func(*consumerGroupHandler, component.Host) (consumeMessageFunc, error) + + mu sync.Mutex + started bool + shutdown bool + consumeLoopClosed chan struct{} + consumerGroup sarama.ConsumerGroup +} + +func (c *kafkaConsumer) Start(_ context.Context, host component.Host) error { + c.mu.Lock() + defer c.mu.Unlock() + if c.shutdown { + return errors.New("kafka consumer already shut down") + } + if c.started { + return errors.New("kafka consumer already started") + } + obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ ReceiverID: c.settings.ID, Transport: transport, @@ -353,484 +340,206 @@ func (c *kafkaLogsConsumer) Start(_ context.Context, host component.Host) error return err } - unmarshaler, err := newLogsUnmarshaler(c.config.Logs.Encoding, c.settings, host) + consumerGroup, err := kafka.NewSaramaConsumerGroup( + context.Background(), + c.config.ClientConfig, + c.config.ConsumerConfig, + ) if err != nil { return err } - c.unmarshaler = unmarshaler + c.consumerGroup = consumerGroup - // consumerGroup may be set in tests to inject fake implementation. 
- if c.consumerGroup == nil { - if c.consumerGroup, err = createKafkaClient(ctx, c.config); err != nil { - return err - } - } - logsConsumerGroup := &logsConsumerGroupHandler{ + handler := &consumerGroupHandler{ id: c.settings.ID, logger: c.settings.Logger, - encoding: c.config.Logs.Encoding, - unmarshaler: c.unmarshaler, - nextConsumer: c.nextConsumer, ready: make(chan bool), obsrecv: obsrecv, - autocommitEnabled: c.autocommitEnabled, - messageMarking: c.messageMarking, - headerExtractor: &nopHeaderExtractor{}, + autocommitEnabled: c.config.AutoCommit.Enable, + messageMarking: c.config.MessageMarking, telemetryBuilder: c.telemetryBuilder, backOff: newExponentialBackOff(c.config.ErrorBackOff), } - if c.headerExtraction { - logsConsumerGroup.headerExtractor = &headerExtractor{ - logger: c.settings.Logger, - headers: c.headers, - } + consumeMessage, err := c.newConsumeMessageFunc(handler, host) + if err != nil { + return err } - c.consumeLoopWG.Add(1) - go c.consumeLoop(ctx, logsConsumerGroup) - <-logsConsumerGroup.ready + handler.consumeMessage = consumeMessage + + c.consumeLoopClosed = make(chan struct{}) + c.started = true + go c.consumeLoop(handler) return nil } -func (c *kafkaLogsConsumer) consumeLoop(ctx context.Context, handler sarama.ConsumerGroupHandler) { - defer c.consumeLoopWG.Done() +func (c *kafkaConsumer) consumeLoop(handler sarama.ConsumerGroupHandler) { + defer close(c.consumeLoopClosed) + + ctx := context.Background() for { // `Consume` should be called inside an infinite loop, when a // server-side rebalance happens, the consumer session will need to be // recreated to get the new claims if err := c.consumerGroup.Consume(ctx, c.topics, handler); err != nil { + if errors.Is(err, sarama.ErrClosedConsumerGroup) { + c.settings.Logger.Info("Consumer stopped", zap.Error(ctx.Err())) + return + } c.settings.Logger.Error("Error from consumer", zap.Error(err)) } - // check if context was cancelled, signaling that the consumer should stop - if ctx.Err() != nil { - c.settings.Logger.Info("Consumer stopped", zap.Error(ctx.Err())) - return - } } } -func (c *kafkaLogsConsumer) Shutdown(context.Context) error { - if c.cancelConsumeLoop == nil { +func (c *kafkaConsumer) Shutdown(ctx context.Context) error { + c.mu.Lock() + defer c.mu.Unlock() + if c.shutdown { return nil } - c.cancelConsumeLoop() - c.consumeLoopWG.Wait() - if c.consumerGroup == nil { + c.shutdown = true + if !c.started { return nil } - return c.consumerGroup.Close() -} - -type tracesConsumerGroupHandler struct { - id component.ID - encoding string - unmarshaler ptrace.Unmarshaler - nextConsumer consumer.Traces - ready chan bool - readyCloser sync.Once - logger *zap.Logger - - obsrecv *receiverhelper.ObsReport - telemetryBuilder *metadata.TelemetryBuilder - - autocommitEnabled bool - messageMarking MessageMarking - headerExtractor HeaderExtractor - backOff *backoff.ExponentialBackOff - backOffMutex sync.Mutex -} - -type metricsConsumerGroupHandler struct { - id component.ID - encoding string - unmarshaler pmetric.Unmarshaler - nextConsumer consumer.Metrics - ready chan bool - readyCloser sync.Once - - logger *zap.Logger - - obsrecv *receiverhelper.ObsReport - telemetryBuilder *metadata.TelemetryBuilder - - autocommitEnabled bool - messageMarking MessageMarking - headerExtractor HeaderExtractor - backOff *backoff.ExponentialBackOff - backOffMutex sync.Mutex + if err := c.consumerGroup.Close(); err != nil { + return err + } + select { + case <-ctx.Done(): + return ctx.Err() + case <-c.consumeLoopClosed: + } + return nil } 
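To illustrate the header-extraction path used by `newMessageHandlerConsumeFunc` above (and by the `getMessageHeaderResourceAttributes` helper defined later in this file), here is a small standalone Go sketch; it re-implements the matching logic inline rather than calling the receiver's unexported helpers, and the `tenant` header name is made up for the example. Every configured header found on a message becomes a `kafka.header.<name>` attribute on each resource of the decoded payload:

```go
package main

import (
	"fmt"

	"github.com/IBM/sarama"
	"go.opentelemetry.io/collector/pdata/plog"
)

func main() {
	msg := &sarama.ConsumerMessage{Headers: []*sarama.RecordHeader{
		{Key: []byte("tenant"), Value: []byte("acme")},
	}}
	configured := []string{"tenant", "missing"} // headers absent from the message are skipped

	logs := plog.NewLogs()
	logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()

	for _, name := range configured {
		for _, h := range msg.Headers {
			if string(h.Key) != name {
				continue
			}
			// Same naming scheme as the receiver: kafka.header.<header name>.
			for i := 0; i < logs.ResourceLogs().Len(); i++ {
				logs.ResourceLogs().At(i).Resource().Attributes().PutStr("kafka.header."+name, string(h.Value))
			}
		}
	}

	v, _ := logs.ResourceLogs().At(0).Resource().Attributes().Get("kafka.header.tenant")
	fmt.Println(v.Str()) // acme
}
```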
-type logsConsumerGroupHandler struct { - id component.ID - encoding string - unmarshaler plog.Unmarshaler - nextConsumer consumer.Logs - ready chan bool - readyCloser sync.Once - - logger *zap.Logger +type consumerGroupHandler struct { + id component.ID + consumeMessage consumeMessageFunc + ready chan bool + readyCloser sync.Once + logger *zap.Logger obsrecv *receiverhelper.ObsReport telemetryBuilder *metadata.TelemetryBuilder autocommitEnabled bool messageMarking MessageMarking - headerExtractor HeaderExtractor backOff *backoff.ExponentialBackOff backOffMutex sync.Mutex } -var ( - _ sarama.ConsumerGroupHandler = (*tracesConsumerGroupHandler)(nil) - _ sarama.ConsumerGroupHandler = (*metricsConsumerGroupHandler)(nil) - _ sarama.ConsumerGroupHandler = (*logsConsumerGroupHandler)(nil) -) - -func (c *tracesConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error { +func (c *consumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error { c.readyCloser.Do(func() { close(c.ready) }) - c.telemetryBuilder.KafkaReceiverPartitionStart.Add(session.Context(), 1, metric.WithAttributes(attribute.String(attrInstanceName, c.id.Name()))) + c.telemetryBuilder.KafkaReceiverPartitionStart.Add( + session.Context(), 1, metric.WithAttributes(attribute.String(attrInstanceName, c.id.Name())), + ) return nil } -func (c *tracesConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error { - c.telemetryBuilder.KafkaReceiverPartitionClose.Add(session.Context(), 1, metric.WithAttributes(attribute.String(attrInstanceName, c.id.Name()))) +func (c *consumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error { + c.telemetryBuilder.KafkaReceiverPartitionClose.Add( + session.Context(), 1, metric.WithAttributes(attribute.String(attrInstanceName, c.id.Name())), + ) return nil } -func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { +func (c *consumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { c.logger.Info("Starting consumer group", zap.Int32("partition", claim.Partition())) if !c.autocommitEnabled { defer session.Commit() } for { select { - case message, ok := <-claim.Messages(): - if !ok { - return nil - } - c.logger.Debug("Kafka message claimed", - zap.String("value", string(message.Value)), - zap.Time("timestamp", message.Timestamp), - zap.String("topic", message.Topic)) - if !c.messageMarking.After { - session.MarkMessage(message, "") - } - - // If the Kafka exporter has propagated headers in the message, - // create a new context with client.Info in it. 
- ctx := newContextWithHeaders(session.Context(), message.Headers) - ctx = c.obsrecv.StartTracesOp(ctx) - attrs := attribute.NewSet( - attribute.String(attrInstanceName, c.id.String()), - attribute.String(attrPartition, strconv.Itoa(int(claim.Partition()))), - ) - c.telemetryBuilder.KafkaReceiverMessages.Add(ctx, 1, metric.WithAttributeSet(attrs)) - c.telemetryBuilder.KafkaReceiverCurrentOffset.Record(ctx, message.Offset, metric.WithAttributeSet(attrs)) - c.telemetryBuilder.KafkaReceiverOffsetLag.Record(ctx, claim.HighWaterMarkOffset()-message.Offset-1, metric.WithAttributeSet(attrs)) - - traces, err := c.unmarshaler.UnmarshalTraces(message.Value) - if err != nil { - c.logger.Error("failed to unmarshal message", zap.Error(err)) - c.telemetryBuilder.KafkaReceiverUnmarshalFailedSpans.Add(session.Context(), 1, metric.WithAttributes(attribute.String(attrInstanceName, c.id.String()))) - if c.messageMarking.After && c.messageMarking.OnError { - session.MarkMessage(message, "") - } - return err - } - - c.headerExtractor.extractHeadersTraces(traces, message) - spanCount := traces.SpanCount() - err = c.nextConsumer.ConsumeTraces(ctx, traces) - c.obsrecv.EndTracesOp(ctx, c.encoding, spanCount, err) - if err != nil { - if errorRequiresBackoff(err) && c.backOff != nil { - backOffDelay := c.getNextBackoff() - if backOffDelay != backoff.Stop { - c.logger.Info("Backing off due to error from the next consumer.", - zap.Error(err), - zap.Duration("delay", backOffDelay), - zap.String("topic", message.Topic), - zap.Int32("partition", claim.Partition())) - select { - case <-session.Context().Done(): - return nil - case <-time.After(backOffDelay): - if !c.messageMarking.After { - // Unmark the message so it can be retried - session.ResetOffset(claim.Topic(), claim.Partition(), message.Offset, "") - } - return err - } - } - c.logger.Info("Stop error backoff because the configured max_elapsed_time is reached", - zap.Duration("max_elapsed_time", c.backOff.MaxElapsedTime)) - } - if c.messageMarking.After && c.messageMarking.OnError { - session.MarkMessage(message, "") - } - return err - } - if c.backOff != nil { - c.resetBackoff() - } - if c.messageMarking.After { - session.MarkMessage(message, "") - } - if !c.autocommitEnabled { - session.Commit() - } - - // Should return when `session.Context()` is done. - // If not, will raise `ErrRebalanceInProgress` or `read tcp :: i/o timeout` when kafka rebalance. see: - // https://github.com/IBM/sarama/issues/1192 case <-session.Context().Done(): + // Should return when the session's context is canceled. + // + // If we do not, will raise `ErrRebalanceInProgress` or `read tcp :: i/o timeout` + // when rebalancing. 
See: https://github.com/IBM/sarama/issues/1192 return nil - } - } -} - -func (c *tracesConsumerGroupHandler) getNextBackoff() time.Duration { - c.backOffMutex.Lock() - defer c.backOffMutex.Unlock() - return c.backOff.NextBackOff() -} - -func (c *tracesConsumerGroupHandler) resetBackoff() { - c.backOffMutex.Lock() - defer c.backOffMutex.Unlock() - c.backOff.Reset() -} - -func (c *metricsConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error { - c.readyCloser.Do(func() { - close(c.ready) - }) - c.telemetryBuilder.KafkaReceiverPartitionStart.Add(session.Context(), 1, metric.WithAttributes(attribute.String(attrInstanceName, c.id.Name()))) - return nil -} - -func (c *metricsConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error { - c.telemetryBuilder.KafkaReceiverPartitionClose.Add(session.Context(), 1, metric.WithAttributes(attribute.String(attrInstanceName, c.id.Name()))) - return nil -} - -func (c *metricsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { - c.logger.Info("Starting consumer group", zap.Int32("partition", claim.Partition())) - if !c.autocommitEnabled { - defer session.Commit() - } - for { - select { case message, ok := <-claim.Messages(): if !ok { return nil } - c.logger.Debug("Kafka message claimed", - zap.String("value", string(message.Value)), - zap.Time("timestamp", message.Timestamp), - zap.String("topic", message.Topic)) - if !c.messageMarking.After { - session.MarkMessage(message, "") - } - - // If the Kafka exporter has propagated headers in the message, - // create a new context with client.Info in it. - ctx := newContextWithHeaders(session.Context(), message.Headers) - ctx = c.obsrecv.StartMetricsOp(ctx) - attrs := attribute.NewSet( - attribute.String(attrInstanceName, c.id.String()), - attribute.String(attrPartition, strconv.Itoa(int(claim.Partition()))), - ) - c.telemetryBuilder.KafkaReceiverMessages.Add(ctx, 1, metric.WithAttributeSet(attrs)) - c.telemetryBuilder.KafkaReceiverCurrentOffset.Record(ctx, message.Offset, metric.WithAttributeSet(attrs)) - c.telemetryBuilder.KafkaReceiverOffsetLag.Record(ctx, claim.HighWaterMarkOffset()-message.Offset-1, metric.WithAttributeSet(attrs)) - - metrics, err := c.unmarshaler.UnmarshalMetrics(message.Value) - if err != nil { - c.logger.Error("failed to unmarshal message", zap.Error(err)) - c.telemetryBuilder.KafkaReceiverUnmarshalFailedMetricPoints.Add(session.Context(), 1, metric.WithAttributes(attribute.String(attrInstanceName, c.id.String()))) - if c.messageMarking.After && c.messageMarking.OnError { - session.MarkMessage(message, "") - } - return err - } - c.headerExtractor.extractHeadersMetrics(metrics, message) - - dataPointCount := metrics.DataPointCount() - err = c.nextConsumer.ConsumeMetrics(ctx, metrics) - c.obsrecv.EndMetricsOp(ctx, c.encoding, dataPointCount, err) - if err != nil { - if errorRequiresBackoff(err) && c.backOff != nil { - backOffDelay := c.getNextBackoff() - if backOffDelay != backoff.Stop { - c.logger.Info("Backing off due to error from the next consumer.", - zap.Error(err), - zap.Duration("delay", backOffDelay), - zap.String("topic", message.Topic), - zap.Int32("partition", claim.Partition())) - select { - case <-session.Context().Done(): - return nil - case <-time.After(backOffDelay): - if !c.messageMarking.After { - // Unmark the message so it can be retried - session.ResetOffset(claim.Topic(), claim.Partition(), message.Offset, "") - } - return err - } - } - c.logger.Info("Stop error 
backoff because the configured max_elapsed_time is reached", - zap.Duration("max_elapsed_time", c.backOff.MaxElapsedTime)) - } - if c.messageMarking.After && c.messageMarking.OnError { - session.MarkMessage(message, "") - } + if err := c.handleMessage(session, claim, message); err != nil { return err } - if c.backOff != nil { - c.resetBackoff() - } - if c.messageMarking.After { - session.MarkMessage(message, "") - } - if !c.autocommitEnabled { - session.Commit() - } - - // Should return when `session.Context()` is done. - // If not, will raise `ErrRebalanceInProgress` or `read tcp :: i/o timeout` when kafka rebalance. see: - // https://github.com/IBM/sarama/issues/1192 - case <-session.Context().Done(): - return nil } } } -func (c *metricsConsumerGroupHandler) getNextBackoff() time.Duration { - c.backOffMutex.Lock() - defer c.backOffMutex.Unlock() - return c.backOff.NextBackOff() -} - -func (c *metricsConsumerGroupHandler) resetBackoff() { - c.backOffMutex.Lock() - defer c.backOffMutex.Unlock() - c.backOff.Reset() -} - -func (c *logsConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error { - c.readyCloser.Do(func() { - close(c.ready) - }) - c.telemetryBuilder.KafkaReceiverPartitionStart.Add(session.Context(), 1, metric.WithAttributes(attribute.String(attrInstanceName, c.id.String()))) - return nil -} - -func (c *logsConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error { - c.telemetryBuilder.KafkaReceiverPartitionClose.Add(session.Context(), 1, metric.WithAttributes(attribute.String(attrInstanceName, c.id.String()))) - return nil -} - -func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { - c.logger.Info("Starting consumer group", zap.Int32("partition", claim.Partition())) - if !c.autocommitEnabled { - defer session.Commit() - } - for { - select { - case message, ok := <-claim.Messages(): - if !ok { - return nil - } - c.logger.Debug("Kafka message claimed", - zap.String("value", string(message.Value)), - zap.Time("timestamp", message.Timestamp), - zap.String("topic", message.Topic)) - if !c.messageMarking.After { - session.MarkMessage(message, "") - } - - // If the Kafka exporter has propagated headers in the message, - // create a new context with client.Info in it. 
- ctx := newContextWithHeaders(session.Context(), message.Headers) - ctx = c.obsrecv.StartLogsOp(ctx) - attrs := attribute.NewSet( - attribute.String(attrInstanceName, c.id.String()), - attribute.String(attrPartition, strconv.Itoa(int(claim.Partition()))), - ) - c.telemetryBuilder.KafkaReceiverMessages.Add(ctx, 1, metric.WithAttributeSet(attrs)) - c.telemetryBuilder.KafkaReceiverCurrentOffset.Record(ctx, message.Offset, metric.WithAttributeSet(attrs)) - c.telemetryBuilder.KafkaReceiverOffsetLag.Record(ctx, claim.HighWaterMarkOffset()-message.Offset-1, metric.WithAttributeSet(attrs)) - - logs, err := c.unmarshaler.UnmarshalLogs(message.Value) - if err != nil { - c.logger.Error("failed to unmarshal message", zap.Error(err)) - c.telemetryBuilder.KafkaReceiverUnmarshalFailedLogRecords.Add(ctx, 1, metric.WithAttributes(attribute.String(attrInstanceName, c.id.String()))) - if c.messageMarking.After && c.messageMarking.OnError { - session.MarkMessage(message, "") - } - return err - } - c.headerExtractor.extractHeadersLogs(logs, message) - logRecordCount := logs.LogRecordCount() - err = c.nextConsumer.ConsumeLogs(ctx, logs) - c.obsrecv.EndLogsOp(ctx, c.encoding, logRecordCount, err) - if err != nil { - if errorRequiresBackoff(err) && c.backOff != nil { - backOffDelay := c.getNextBackoff() - if backOffDelay != backoff.Stop { - c.logger.Info("Backing off due to error from the next consumer.", - zap.Error(err), - zap.Duration("delay", backOffDelay), - zap.String("topic", message.Topic), - zap.Int32("partition", claim.Partition())) - select { - case <-session.Context().Done(): - return nil - case <-time.After(backOffDelay): - if !c.messageMarking.After { - // Unmark the message so it can be retried - session.ResetOffset(claim.Topic(), claim.Partition(), message.Offset, "") - } - return err - } +func (c *consumerGroupHandler) handleMessage( + session sarama.ConsumerGroupSession, + claim sarama.ConsumerGroupClaim, + message *sarama.ConsumerMessage, +) error { + c.logger.Debug("Kafka message claimed", + zap.String("value", string(message.Value)), + zap.Time("timestamp", message.Timestamp), + zap.String("topic", message.Topic)) + if !c.messageMarking.After { + session.MarkMessage(message, "") + } + + // If the Kafka exporter has propagated headers in the message, + // create a new context with client.Info in it. 
+ ctx := newContextWithHeaders(session.Context(), message.Headers) + attrs := attribute.NewSet( + attribute.String(attrInstanceName, c.id.String()), + attribute.String(attrPartition, strconv.Itoa(int(claim.Partition()))), + ) + c.telemetryBuilder.KafkaReceiverMessages.Add(ctx, 1, metric.WithAttributeSet(attrs)) + c.telemetryBuilder.KafkaReceiverCurrentOffset.Record(ctx, message.Offset, metric.WithAttributeSet(attrs)) + c.telemetryBuilder.KafkaReceiverOffsetLag.Record(ctx, claim.HighWaterMarkOffset()-message.Offset-1, metric.WithAttributeSet(attrs)) + + if err := c.consumeMessage(ctx, message); err != nil { + if errorRequiresBackoff(err) && c.backOff != nil { + backOffDelay := c.getNextBackoff() + if backOffDelay != backoff.Stop { + c.logger.Info("Backing off due to error from the next consumer.", + zap.Error(err), + zap.Duration("delay", backOffDelay), + zap.String("topic", message.Topic), + zap.Int32("partition", claim.Partition())) + select { + case <-session.Context().Done(): + return nil + case <-time.After(backOffDelay): + if !c.messageMarking.After { + // Unmark the message so it can be retried + session.ResetOffset(claim.Topic(), claim.Partition(), message.Offset, "") } - c.logger.Info("Stop error backoff because the configured max_elapsed_time is reached", - zap.Duration("max_elapsed_time", c.backOff.MaxElapsedTime)) + return err } - if c.messageMarking.After && c.messageMarking.OnError { - session.MarkMessage(message, "") - } - return err - } - if c.backOff != nil { - c.resetBackoff() - } - if c.messageMarking.After { - session.MarkMessage(message, "") - } - if !c.autocommitEnabled { - session.Commit() } - - // Should return when `session.Context()` is done. - // If not, will raise `ErrRebalanceInProgress` or `read tcp :: i/o timeout` when kafka rebalance. see: - // https://github.com/IBM/sarama/issues/1192 - case <-session.Context().Done(): - return nil + c.logger.Info("Stop error backoff because the configured max_elapsed_time is reached", + zap.Duration("max_elapsed_time", c.backOff.MaxElapsedTime)) + } + if c.messageMarking.After && c.messageMarking.OnError { + session.MarkMessage(message, "") } + return err + } + if c.backOff != nil { + c.resetBackoff() + } + if c.messageMarking.After { + session.MarkMessage(message, "") + } + if !c.autocommitEnabled { + session.Commit() } + return nil } -func (c *logsConsumerGroupHandler) getNextBackoff() time.Duration { +func (c *consumerGroupHandler) getNextBackoff() time.Duration { c.backOffMutex.Lock() defer c.backOffMutex.Unlock() return c.backOff.NextBackOff() } -func (c *logsConsumerGroupHandler) resetBackoff() { +func (c *consumerGroupHandler) resetBackoff() { c.backOffMutex.Lock() defer c.backOffMutex.Unlock() c.backOff.Reset() @@ -868,3 +577,30 @@ func newContextWithHeaders(ctx context.Context, } return client.NewContext(ctx, client.Info{Metadata: client.NewMetadata(m)}) } + +// getMessageHeaderResourceAttributes returns key-value pairs to add +// to the resource attributes of decoded data. This is used by the +// "header extraction" feature of the receiver. 
+func getMessageHeaderResourceAttributes(message *sarama.ConsumerMessage, headers []string) iter.Seq2[string, string] { + return func(yield func(string, string) bool) { + for _, header := range headers { + value, ok := getHeaderValue(message.Headers, header) + if !ok { + continue + } + if !yield("kafka.header."+header, value) { + return + } + } + } +} + +func getHeaderValue(headers []*sarama.RecordHeader, header string) (string, bool) { + for _, kafkaHeader := range headers { + headerKey := string(kafkaHeader.Key) + if headerKey == header { + return string(kafkaHeader.Value), true + } + } + return "", false +} diff --git a/receiver/kafkareceiver/kafka_receiver_test.go b/receiver/kafkareceiver/kafka_receiver_test.go index 04cae8d668795..59f5e9cfc8fa4 100644 --- a/receiver/kafkareceiver/kafka_receiver_test.go +++ b/receiver/kafkareceiver/kafka_receiver_test.go @@ -6,15 +6,16 @@ package kafkareceiver import ( "context" "errors" - "sync" "sync/atomic" "testing" "time" - "github.com/IBM/sarama" - "github.com/cenkalti/backoff/v4" + "github.com/rcrowley/go-metrics" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/twmb/franz-go/pkg/kadm" + "github.com/twmb/franz-go/pkg/kfake" + "github.com/twmb/franz-go/pkg/kgo" "go.opentelemetry.io/collector/client" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer" @@ -23,7 +24,7 @@ import ( "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/pdata/testdata" - "go.opentelemetry.io/collector/receiver/receiverhelper" + "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/receiver/receivertest" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/sdk/metric/metricdata" @@ -32,1106 +33,491 @@ import ( "go.uber.org/zap/zapcore" "go.uber.org/zap/zaptest/observer" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka/kafkatest" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/ptracetest" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver/internal/metadata" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver/internal/metadatatest" ) -func TestTracesReceiverStart(t *testing.T) { - c := kafkaTracesConsumer{ - config: createDefaultConfig(), - nextConsumer: consumertest.NewNop(), - consumeLoopWG: &sync.WaitGroup{}, - settings: receivertest.NewNopSettings(metadata.Type), - consumerGroup: &testConsumerGroup{}, - telemetryBuilder: nopTelemetryBuilder(t), - } - - require.NoError(t, c.Start(context.Background(), componenttest.NewNopHost())) - require.NoError(t, c.Shutdown(context.Background())) +func init() { + // Disable the go-metrics registry, as there's a goroutine leak in the Sarama + // code that uses it. See this stale issue: https://github.com/IBM/sarama/issues/1321 + // + // Sarama docs suggest setting UseNilMetrics to true to disable metrics if they + // are not needed, which is the case here. We only disable in tests to avoid + // affecting other components that rely on go-metrics. 
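+	//
+	// Note that go-metrics consults UseNilMetrics when each metric is
+	// constructed, so it must be set before any Sarama client is created;
+	// doing it in init() guarantees that ordering for every test in this package.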
+ metrics.UseNilMetrics = true } -func TestTracesReceiverStartConsume(t *testing.T) { - telemetryBuilder, err := metadata.NewTelemetryBuilder(receivertest.NewNopSettings(metadata.Type).TelemetrySettings) +func TestReceiver(t *testing.T) { + t.Parallel() + kafkaClient, receiverConfig := mustNewFakeCluster(t, kfake.SeedTopics(1, "otlp_spans")) + + // Send some traces to the otlp_spans topic. + traces := testdata.GenerateTraces(5) + data, err := (&ptrace.ProtoMarshaler{}).MarshalTraces(traces) require.NoError(t, err) - c := kafkaTracesConsumer{ - nextConsumer: consumertest.NewNop(), - consumeLoopWG: &sync.WaitGroup{}, - settings: receivertest.NewNopSettings(metadata.Type), - consumerGroup: &testConsumerGroup{}, - telemetryBuilder: telemetryBuilder, - } - ctx, cancelFunc := context.WithCancel(context.Background()) - c.cancelConsumeLoop = cancelFunc - require.NoError(t, c.Shutdown(context.Background())) - c.consumeLoopWG.Add(1) - c.consumeLoop(ctx, &tracesConsumerGroupHandler{ - ready: make(chan bool), - telemetryBuilder: telemetryBuilder, + results := kafkaClient.ProduceSync(context.Background(), &kgo.Record{ + Topic: "otlp_spans", + Value: data, }) -} + require.NoError(t, results.FirstErr()) -func TestTracesReceiver_error(t *testing.T) { - zcore, logObserver := observer.New(zapcore.ErrorLevel) - logger := zap.New(zcore) - settings := receivertest.NewNopSettings(metadata.Type) - settings.Logger = logger - - expectedErr := errors.New("handler error") - c := kafkaTracesConsumer{ - config: createDefaultConfig(), - nextConsumer: consumertest.NewNop(), - consumeLoopWG: &sync.WaitGroup{}, - settings: settings, - consumerGroup: &testConsumerGroup{err: expectedErr}, - telemetryBuilder: nopTelemetryBuilder(t), - } - - require.NoError(t, c.Start(context.Background(), componenttest.NewNopHost())) - require.NoError(t, c.Shutdown(context.Background())) - assert.Eventually(t, func() bool { - return logObserver.FilterField(zap.Error(expectedErr)).Len() > 0 - }, 10*time.Second, time.Millisecond*100) + // Wait for message to be consumed. 
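+	// newChannelTracesConsumer (defined at the bottom of this file) forwards each
+	// ConsumeTraces call and its context onto the channel, so receiving from it
+	// blocks until the receiver has delivered the message to the next consumer.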
+ received := make(chan consumerArgs[ptrace.Traces], 1) + mustNewTracesReceiver(t, receiverConfig, newChannelTracesConsumer(received)) + args := <-received + assert.NoError(t, ptracetest.CompareTraces(traces, args.data)) } -func TestTracesConsumerGroupHandler(t *testing.T) { - tel := componenttest.NewTelemetry() - t.Cleanup(func() { require.NoError(t, tel.Shutdown(context.Background())) }) - telemetryBuilder, err := metadata.NewTelemetryBuilder(tel.NewTelemetrySettings()) - require.NoError(t, err) - - obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopSettings(metadata.Type)}) - require.NoError(t, err) - var called atomic.Bool - c := tracesConsumerGroupHandler{ - unmarshaler: &ptrace.ProtoUnmarshaler{}, - logger: zap.NewNop(), - ready: make(chan bool), - nextConsumer: func() consumer.Traces { - c, err := consumer.NewTraces(func(ctx context.Context, _ ptrace.Traces) error { - defer called.Store(true) - info := client.FromContext(ctx) - assert.Equal(t, []string{"abcdefg"}, info.Metadata.Get("x-tenant-id")) - assert.Equal(t, []string{"1234", "5678"}, info.Metadata.Get("x-request-ids")) - return nil - }) - require.NoError(t, err) - return c - }(), - obsrecv: obsrecv, - headerExtractor: &nopHeaderExtractor{}, - telemetryBuilder: telemetryBuilder, - } - - testSession := testConsumerGroupSession{ctx: context.Background()} - require.NoError(t, c.Setup(testSession)) - _, ok := <-c.ready - assert.False(t, ok) - assertInternalTelemetry(t, tel, 0) - - require.NoError(t, c.Cleanup(testSession)) - assertInternalTelemetry(t, tel, 1) - - groupClaim := testConsumerGroupClaim{ - messageChan: make(chan *sarama.ConsumerMessage), - } - - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - assert.NoError(t, c.ConsumeClaim(testSession, groupClaim)) - wg.Done() - }() - - groupClaim.messageChan <- &sarama.ConsumerMessage{ - Headers: []*sarama.RecordHeader{ - { - Key: []byte("x-tenant-id"), - Value: []byte("abcdefg"), - }, - { - Key: []byte("x-request-ids"), - Value: []byte("1234"), +func TestReceiver_Headers_Metadata(t *testing.T) { + t.Parallel() + for name, testcase := range map[string]struct { + headers []kgo.RecordHeader + expected map[string][]string + }{ + "no headers": {}, + "single header": { + headers: []kgo.RecordHeader{ + {Key: "key1", Value: []byte("value1")}, }, - { - Key: []byte("x-request-ids"), - Value: []byte("5678"), + expected: map[string][]string{ + "key1": {"value1"}, }, }, - } - close(groupClaim.messageChan) - wg.Wait() - assert.True(t, called.Load()) // Ensure nextConsumer was called. 
-} - -func TestTracesConsumerGroupHandler_session_done(t *testing.T) { - tel := componenttest.NewTelemetry() - t.Cleanup(func() { require.NoError(t, tel.Shutdown(context.Background())) }) - telemetryBuilder, err := metadata.NewTelemetryBuilder(tel.NewTelemetrySettings()) - require.NoError(t, err) - - obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopSettings(metadata.Type)}) - require.NoError(t, err) - c := tracesConsumerGroupHandler{ - unmarshaler: &ptrace.ProtoUnmarshaler{}, - logger: zap.NewNop(), - ready: make(chan bool), - nextConsumer: consumertest.NewNop(), - obsrecv: obsrecv, - headerExtractor: &nopHeaderExtractor{}, - telemetryBuilder: telemetryBuilder, - } - - ctx, cancelFunc := context.WithCancel(context.Background()) - testSession := testConsumerGroupSession{ctx: ctx} - require.NoError(t, c.Setup(testSession)) - _, ok := <-c.ready - assert.False(t, ok) - assertInternalTelemetry(t, tel, 0) - - require.NoError(t, c.Cleanup(testSession)) - assertInternalTelemetry(t, tel, 1) - - groupClaim := testConsumerGroupClaim{ - messageChan: make(chan *sarama.ConsumerMessage), - } - defer close(groupClaim.messageChan) - - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - assert.NoError(t, c.ConsumeClaim(testSession, groupClaim)) - wg.Done() - }() - - groupClaim.messageChan <- &sarama.ConsumerMessage{} - cancelFunc() - wg.Wait() -} - -func TestTracesConsumerGroupHandler_error_unmarshal(t *testing.T) { - obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopSettings(metadata.Type)}) - require.NoError(t, err) - tel := componenttest.NewTelemetry() - t.Cleanup(func() { require.NoError(t, tel.Shutdown(context.Background())) }) - telemetryBuilder, err := metadata.NewTelemetryBuilder(tel.NewTelemetrySettings()) - require.NoError(t, err) - c := tracesConsumerGroupHandler{ - unmarshaler: &ptrace.ProtoUnmarshaler{}, - logger: zap.NewNop(), - ready: make(chan bool), - nextConsumer: consumertest.NewNop(), - obsrecv: obsrecv, - headerExtractor: &nopHeaderExtractor{}, - telemetryBuilder: telemetryBuilder, - } - - wg := sync.WaitGroup{} - wg.Add(1) - groupClaim := &testConsumerGroupClaim{ - messageChan: make(chan *sarama.ConsumerMessage), - } - go func() { - err := c.ConsumeClaim(testConsumerGroupSession{ctx: context.Background()}, groupClaim) - assert.Error(t, err) - wg.Done() - }() - groupClaim.messageChan <- &sarama.ConsumerMessage{Value: []byte("!@#")} - close(groupClaim.messageChan) - wg.Wait() - metadatatest.AssertEqualKafkaReceiverOffsetLag(t, tel, []metricdata.DataPoint[int64]{ - { - Value: 3, - Attributes: attribute.NewSet( - attribute.String("name", ""), - attribute.String("partition", "5"), - ), - }, - }, metricdatatest.IgnoreTimestamp()) - metadatatest.AssertEqualKafkaReceiverCurrentOffset(t, tel, []metricdata.DataPoint[int64]{ - { - Value: 0, - Attributes: attribute.NewSet( - attribute.String("name", ""), - attribute.String("partition", "5"), - ), - }, - }, metricdatatest.IgnoreTimestamp()) - metadatatest.AssertEqualKafkaReceiverMessages(t, tel, []metricdata.DataPoint[int64]{ - { - Value: 1, - Attributes: attribute.NewSet( - attribute.String("name", ""), - attribute.String("partition", "5"), - ), - }, - }, metricdatatest.IgnoreTimestamp()) - metadatatest.AssertEqualKafkaReceiverUnmarshalFailedSpans(t, tel, []metricdata.DataPoint[int64]{ - { - Value: 1, - Attributes: attribute.NewSet(attribute.String("name", "")), - }, - }, metricdatatest.IgnoreTimestamp()) -} - 
-func TestTracesConsumerGroupHandler_error_nextConsumer(t *testing.T) { - consumerError := errors.New("failed to consume") - obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopSettings(metadata.Type)}) - require.NoError(t, err) - - tests := []struct { - name string - err, expectedError error - expectedBackoff time.Duration - }{ - { - name: "memory limiter data refused error", - err: errMemoryLimiterDataRefused, - expectedError: errMemoryLimiterDataRefused, - expectedBackoff: backoff.DefaultInitialInterval, + "multiple headers": { + headers: []kgo.RecordHeader{ + {Key: "key1", Value: []byte("value1")}, + {Key: "key2", Value: []byte("value2")}, + }, + expected: map[string][]string{ + "key1": {"value1"}, + "key2": {"value2"}, + }, }, - { - name: "consumer error that does not require backoff", - err: consumerError, - expectedError: consumerError, - expectedBackoff: 0, + "single header multiple values": { + headers: []kgo.RecordHeader{ + {Key: "key1", Value: []byte("value1")}, + {Key: "key1", Value: []byte("value2")}, + }, + expected: map[string][]string{ + "key1": {"value1", "value2"}, + }, }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - backOff := backoff.NewExponentialBackOff() - backOff.RandomizationFactor = 0 - c := tracesConsumerGroupHandler{ - unmarshaler: &ptrace.ProtoUnmarshaler{}, - logger: zap.NewNop(), - ready: make(chan bool), - nextConsumer: consumertest.NewErr(tt.err), - obsrecv: obsrecv, - headerExtractor: &nopHeaderExtractor{}, - telemetryBuilder: nopTelemetryBuilder(t), - backOff: backOff, - } - - wg := sync.WaitGroup{} - wg.Add(1) - groupClaim := &testConsumerGroupClaim{ - messageChan: make(chan *sarama.ConsumerMessage), - } - go func() { - start := time.Now() - e := c.ConsumeClaim(testConsumerGroupSession{ctx: context.Background()}, groupClaim) - end := time.Now() - if tt.expectedError != nil { - assert.EqualError(t, e, tt.expectedError.Error()) - } else { - assert.NoError(t, e) - } - assert.WithinDuration(t, start.Add(tt.expectedBackoff), end, 100*time.Millisecond) - wg.Done() - }() - - td := ptrace.NewTraces() - td.ResourceSpans().AppendEmpty() - unmarshaler := &ptrace.ProtoMarshaler{} - bts, err := unmarshaler.MarshalTraces(td) + } { + t.Run(name, func(t *testing.T) { + t.Parallel() + kafkaClient, receiverConfig := mustNewFakeCluster(t, kfake.SeedTopics(1, "otlp_spans")) + + // Send some traces to the otlp_spans topic, including headers. + traces := testdata.GenerateTraces(1) + data, err := (&ptrace.ProtoMarshaler{}).MarshalTraces(traces) require.NoError(t, err) - groupClaim.messageChan <- &sarama.ConsumerMessage{Value: bts} - close(groupClaim.messageChan) - wg.Wait() + results := kafkaClient.ProduceSync(context.Background(), &kgo.Record{ + Topic: "otlp_spans", + Value: data, + Headers: testcase.headers, + }) + require.NoError(t, results.FirstErr()) + + // Wait for message to be consumed. 
+ received := make(chan consumerArgs[ptrace.Traces], 1) + mustNewTracesReceiver(t, receiverConfig, newChannelTracesConsumer(received)) + args := <-received + info := client.FromContext(args.ctx) + for key, values := range testcase.expected { + assert.Equal(t, values, info.Metadata.Get(key)) + } }) } } -func TestMetricsReceiverStartConsume(t *testing.T) { - telemetryBuilder, err := metadata.NewTelemetryBuilder(receivertest.NewNopSettings(metadata.Type).TelemetrySettings) - require.NoError(t, err) - c := kafkaMetricsConsumer{ - nextConsumer: consumertest.NewNop(), - consumeLoopWG: &sync.WaitGroup{}, - settings: receivertest.NewNopSettings(metadata.Type), - consumerGroup: &testConsumerGroup{}, - telemetryBuilder: telemetryBuilder, - } - ctx, cancelFunc := context.WithCancel(context.Background()) - c.cancelConsumeLoop = cancelFunc - require.NoError(t, c.Shutdown(context.Background())) - c.consumeLoopWG.Add(1) - c.consumeLoop(ctx, &logsConsumerGroupHandler{ - ready: make(chan bool), - telemetryBuilder: telemetryBuilder, - }) -} +func TestReceiver_Headers_HeaderExtraction(t *testing.T) { + t.Parallel() + for _, enabled := range []bool{false, true} { + name := "enabled" + if !enabled { + name = "disabled" + } + t.Run(name, func(t *testing.T) { + t.Parallel() + kafkaClient, receiverConfig := mustNewFakeCluster(t, kfake.SeedTopics(1, "otlp_spans")) -func TestMetricsReceiver_error(t *testing.T) { - zcore, logObserver := observer.New(zapcore.ErrorLevel) - logger := zap.New(zcore) - settings := receivertest.NewNopSettings(metadata.Type) - settings.Logger = logger - - expectedErr := errors.New("handler error") - c := kafkaMetricsConsumer{ - config: createDefaultConfig(), - nextConsumer: consumertest.NewNop(), - consumeLoopWG: &sync.WaitGroup{}, - settings: settings, - consumerGroup: &testConsumerGroup{err: expectedErr}, - telemetryBuilder: nopTelemetryBuilder(t), - } - - require.NoError(t, c.Start(context.Background(), componenttest.NewNopHost())) - require.NoError(t, c.Shutdown(context.Background())) - assert.Eventually(t, func() bool { - return logObserver.FilterField(zap.Error(expectedErr)).Len() > 0 - }, 10*time.Second, time.Millisecond*100) -} - -func TestMetricsConsumerGroupHandler(t *testing.T) { - tel := componenttest.NewTelemetry() - t.Cleanup(func() { require.NoError(t, tel.Shutdown(context.Background())) }) - telemetryBuilder, err := metadata.NewTelemetryBuilder(tel.NewTelemetrySettings()) - require.NoError(t, err) - - obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopSettings(metadata.Type)}) - require.NoError(t, err) - var called atomic.Bool - c := metricsConsumerGroupHandler{ - unmarshaler: &pmetric.ProtoUnmarshaler{}, - logger: zap.NewNop(), - ready: make(chan bool), - nextConsumer: func() consumer.Metrics { - c, err := consumer.NewMetrics(func(ctx context.Context, _ pmetric.Metrics) error { - defer called.Store(true) - info := client.FromContext(ctx) - assert.Equal(t, []string{"abcdefg"}, info.Metadata.Get("x-tenant-id")) - assert.Equal(t, []string{"1234", "5678"}, info.Metadata.Get("x-request-ids")) - return nil - }) + // Send some traces to the otlp_spans topic, including headers. 
+ traces := testdata.GenerateTraces(1) + data, err := (&ptrace.ProtoMarshaler{}).MarshalTraces(traces) require.NoError(t, err) - return c - }(), - obsrecv: obsrecv, - headerExtractor: &nopHeaderExtractor{}, - telemetryBuilder: telemetryBuilder, - } - - testSession := testConsumerGroupSession{ctx: context.Background()} - require.NoError(t, c.Setup(testSession)) - _, ok := <-c.ready - assert.False(t, ok) - assertInternalTelemetry(t, tel, 0) - - require.NoError(t, c.Cleanup(testSession)) - assertInternalTelemetry(t, tel, 1) - - groupClaim := testConsumerGroupClaim{ - messageChan: make(chan *sarama.ConsumerMessage), - } - - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - assert.NoError(t, c.ConsumeClaim(testSession, groupClaim)) - wg.Done() - }() - - groupClaim.messageChan <- &sarama.ConsumerMessage{ - Headers: []*sarama.RecordHeader{ - { - Key: []byte("x-tenant-id"), - Value: []byte("abcdefg"), - }, - { - Key: []byte("x-request-ids"), - Value: []byte("1234"), - }, - { - Key: []byte("x-request-ids"), - Value: []byte("5678"), - }, - }, - } - close(groupClaim.messageChan) - wg.Wait() - assert.True(t, called.Load()) // Ensure nextConsumer was called. -} - -func TestMetricsConsumerGroupHandler_session_done(t *testing.T) { - tel := componenttest.NewTelemetry() - t.Cleanup(func() { require.NoError(t, tel.Shutdown(context.Background())) }) - telemetryBuilder, err := metadata.NewTelemetryBuilder(tel.NewTelemetrySettings()) - require.NoError(t, err) - - obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopSettings(metadata.Type)}) - require.NoError(t, err) - c := metricsConsumerGroupHandler{ - unmarshaler: &pmetric.ProtoUnmarshaler{}, - logger: zap.NewNop(), - ready: make(chan bool), - nextConsumer: consumertest.NewNop(), - obsrecv: obsrecv, - headerExtractor: &nopHeaderExtractor{}, - telemetryBuilder: telemetryBuilder, - } - - ctx, cancelFunc := context.WithCancel(context.Background()) - testSession := testConsumerGroupSession{ctx: ctx} - require.NoError(t, c.Setup(testSession)) - _, ok := <-c.ready - assert.False(t, ok) - assertInternalTelemetry(t, tel, 0) - - require.NoError(t, c.Cleanup(testSession)) - assertInternalTelemetry(t, tel, 1) - - groupClaim := testConsumerGroupClaim{ - messageChan: make(chan *sarama.ConsumerMessage), - } - defer close(groupClaim.messageChan) - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - assert.NoError(t, c.ConsumeClaim(testSession, groupClaim)) - wg.Done() - }() - - groupClaim.messageChan <- &sarama.ConsumerMessage{} - cancelFunc() - wg.Wait() -} - -func TestMetricsConsumerGroupHandler_error_unmarshal(t *testing.T) { - obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopSettings(metadata.Type)}) - require.NoError(t, err) - tel := componenttest.NewTelemetry() - t.Cleanup(func() { require.NoError(t, tel.Shutdown(context.Background())) }) - telemetryBuilder, err := metadata.NewTelemetryBuilder(tel.NewTelemetrySettings()) - require.NoError(t, err) - c := metricsConsumerGroupHandler{ - unmarshaler: &pmetric.ProtoUnmarshaler{}, - logger: zap.NewNop(), - ready: make(chan bool), - nextConsumer: consumertest.NewNop(), - obsrecv: obsrecv, - headerExtractor: &nopHeaderExtractor{}, - telemetryBuilder: telemetryBuilder, - } - - wg := sync.WaitGroup{} - wg.Add(1) - groupClaim := &testConsumerGroupClaim{ - messageChan: make(chan *sarama.ConsumerMessage), + results := kafkaClient.ProduceSync(context.Background(), &kgo.Record{ + Topic: 
"otlp_spans", + Value: data, + Headers: []kgo.RecordHeader{{ + Key: "extracted", + Value: []byte("value1"), + }, { + Key: "extracted", + Value: []byte("value2"), + }, { + Key: "not_extracted", + Value: []byte("value3"), + }}, + }) + require.NoError(t, results.FirstErr()) + + // Wait for message to be consumed. + received := make(chan consumerArgs[ptrace.Traces], 1) + receiverConfig.HeaderExtraction.ExtractHeaders = enabled + receiverConfig.HeaderExtraction.Headers = []string{"extracted"} + mustNewTracesReceiver(t, receiverConfig, newChannelTracesConsumer(received)) + args := <-received + + resource := args.data.ResourceSpans().At(0).Resource() + value, ok := resource.Attributes().Get("kafka.header.extracted") + if enabled { + require.True(t, ok) + assert.Equal(t, "value1", value.Str()) // only first value is extracted + } else { + require.False(t, ok) + } + }) } - go func() { - err := c.ConsumeClaim(testConsumerGroupSession{ctx: context.Background()}, groupClaim) - assert.Error(t, err) - wg.Done() - }() - groupClaim.messageChan <- &sarama.ConsumerMessage{Value: []byte("!@#")} - close(groupClaim.messageChan) - wg.Wait() - metadatatest.AssertEqualKafkaReceiverOffsetLag(t, tel, []metricdata.DataPoint[int64]{ - { - Value: 3, - Attributes: attribute.NewSet( - attribute.String("name", ""), - attribute.String("partition", "5"), - ), - }, - }, metricdatatest.IgnoreTimestamp()) - metadatatest.AssertEqualKafkaReceiverCurrentOffset(t, tel, []metricdata.DataPoint[int64]{ - { - Value: 0, - Attributes: attribute.NewSet( - attribute.String("name", ""), - attribute.String("partition", "5"), - ), - }, - }, metricdatatest.IgnoreTimestamp()) - metadatatest.AssertEqualKafkaReceiverMessages(t, tel, []metricdata.DataPoint[int64]{ - { - Value: 1, - Attributes: attribute.NewSet( - attribute.String("name", ""), - attribute.String("partition", "5"), - ), - }, - }, metricdatatest.IgnoreTimestamp()) - metadatatest.AssertEqualKafkaReceiverUnmarshalFailedMetricPoints(t, tel, []metricdata.DataPoint[int64]{ - { - Value: 1, - Attributes: attribute.NewSet(attribute.String("name", "")), - }, - }, metricdatatest.IgnoreTimestamp()) } -func TestMetricsConsumerGroupHandler_error_nextConsumer(t *testing.T) { - consumerError := errors.New("failed to consume") - obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopSettings(metadata.Type)}) - require.NoError(t, err) - - tests := []struct { - name string - err, expectedError error - expectedBackoff time.Duration +func TestReceiver_ConsumeError(t *testing.T) { + t.Parallel() + for name, testcase := range map[string]struct { + err error + shouldRetry bool }{ - { - name: "memory limiter data refused error", - err: errMemoryLimiterDataRefused, - expectedError: errMemoryLimiterDataRefused, - expectedBackoff: backoff.DefaultInitialInterval, + "retryable error": { + // FIXME the receiver checks a specific error message + // from a different component, which is a bit brittle. + // Let's revisit this in the future; we might want to check + // for permanent vs. transient errors instead. 
+ err: errMemoryLimiterDataRefused, + shouldRetry: true, }, - { - name: "consumer error that does not require backoff", - err: consumerError, - expectedError: consumerError, - expectedBackoff: 0, + "permanent error": { + err: errors.New("failed to consume"), }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - backOff := backoff.NewExponentialBackOff() - backOff.RandomizationFactor = 0 - c := metricsConsumerGroupHandler{ - unmarshaler: &pmetric.ProtoUnmarshaler{}, - logger: zap.NewNop(), - ready: make(chan bool), - nextConsumer: consumertest.NewErr(tt.err), - obsrecv: obsrecv, - headerExtractor: &nopHeaderExtractor{}, - telemetryBuilder: nopTelemetryBuilder(t), - backOff: backOff, - } + } { + t.Run(name, func(t *testing.T) { + t.Parallel() + kafkaClient, receiverConfig := mustNewFakeCluster(t, kfake.SeedTopics(1, "otlp_spans")) + + // Send some traces to the otlp_spans topic. + traces := testdata.GenerateTraces(1) + data, err := (&ptrace.ProtoMarshaler{}).MarshalTraces(traces) + require.NoError(t, err) + results := kafkaClient.ProduceSync(context.Background(), + &kgo.Record{Topic: "otlp_spans", Value: data}, + ) + require.NoError(t, results.FirstErr()) + + var calls atomic.Int64 + consumer := newTracesConsumer(func(context.Context, ptrace.Traces) error { + calls.Add(1) + return testcase.err + }) - wg := sync.WaitGroup{} - wg.Add(1) - groupClaim := &testConsumerGroupClaim{ - messageChan: make(chan *sarama.ConsumerMessage), + // Wait for messages to be consumed. + receiverConfig.ErrorBackOff.Enabled = true + receiverConfig.ErrorBackOff.InitialInterval = 10 * time.Millisecond + receiverConfig.ErrorBackOff.MaxInterval = 10 * time.Millisecond + receiverConfig.ErrorBackOff.MaxElapsedTime = 500 * time.Millisecond + mustNewTracesReceiver(t, receiverConfig, consumer) + + if testcase.shouldRetry { + assert.Eventually( + t, func() bool { return calls.Load() > 1 }, + 10*time.Second, 100*time.Millisecond, + ) + } else { + assert.Eventually( + t, func() bool { return calls.Load() == 1 }, + 10*time.Second, 100*time.Millisecond, + ) + // Verify that no retries have been attempted. 
+ time.Sleep(100 * time.Millisecond) + assert.Equal(t, int64(1), calls.Load()) } - go func() { - start := time.Now() - e := c.ConsumeClaim(testConsumerGroupSession{ctx: context.Background()}, groupClaim) - end := time.Now() - if tt.expectedError != nil { - assert.EqualError(t, e, tt.expectedError.Error()) - } else { - assert.NoError(t, e) - } - assert.WithinDuration(t, start.Add(tt.expectedBackoff), end, 100*time.Millisecond) - wg.Done() - }() - - ld := testdata.GenerateMetrics(1) - unmarshaler := &pmetric.ProtoMarshaler{} - bts, err := unmarshaler.MarshalMetrics(ld) - require.NoError(t, err) - groupClaim.messageChan <- &sarama.ConsumerMessage{Value: bts} - close(groupClaim.messageChan) - wg.Wait() }) } } -func TestLogsReceiverStart(t *testing.T) { - c := kafkaLogsConsumer{ - config: createDefaultConfig(), - nextConsumer: consumertest.NewNop(), - consumeLoopWG: &sync.WaitGroup{}, - settings: receivertest.NewNopSettings(metadata.Type), - consumerGroup: &testConsumerGroup{}, - telemetryBuilder: nopTelemetryBuilder(t), - } - - require.NoError(t, c.Start(context.Background(), componenttest.NewNopHost())) - require.NoError(t, c.Shutdown(context.Background())) -} +func TestReceiver_InternalTelemetry(t *testing.T) { + t.Parallel() + kafkaClient, receiverConfig := mustNewFakeCluster(t, kfake.SeedTopics(1, "otlp_spans")) -func TestLogsReceiverStartConsume(t *testing.T) { - telemetryBuilder, err := metadata.NewTelemetryBuilder(receivertest.NewNopSettings(metadata.Type).TelemetrySettings) + // Send some traces to the otlp_spans topic. + traces := testdata.GenerateTraces(1) + data, err := (&ptrace.ProtoMarshaler{}).MarshalTraces(traces) require.NoError(t, err) - c := kafkaLogsConsumer{ - nextConsumer: consumertest.NewNop(), - consumeLoopWG: &sync.WaitGroup{}, - settings: receivertest.NewNopSettings(metadata.Type), - consumerGroup: &testConsumerGroup{}, - telemetryBuilder: telemetryBuilder, - } - ctx, cancelFunc := context.WithCancel(context.Background()) - c.cancelConsumeLoop = cancelFunc - require.NoError(t, c.Shutdown(context.Background())) - c.consumeLoopWG.Add(1) - c.consumeLoop(ctx, &logsConsumerGroupHandler{ - ready: make(chan bool), - telemetryBuilder: telemetryBuilder, + results := kafkaClient.ProduceSync(context.Background(), + &kgo.Record{Topic: "otlp_spans", Value: data}, + &kgo.Record{Topic: "otlp_spans", Value: data}, + &kgo.Record{Topic: "otlp_spans", Value: data}, + &kgo.Record{Topic: "otlp_spans", Value: data}, + &kgo.Record{Topic: "otlp_spans", Value: []byte("junk")}, + ) + require.NoError(t, results.FirstErr()) + + // Wait for messages to be consumed. 
+	received := make(chan consumerArgs[ptrace.Traces], 1)
+	set, tel, observedLogs := mustNewSettings(t)
+	f := NewFactory()
+	r, err := f.CreateTraces(context.Background(), set, receiverConfig, newChannelTracesConsumer(received))
+	require.NoError(t, err)
+	require.NoError(t, r.Start(context.Background(), componenttest.NewNopHost()))
+	t.Cleanup(func() {
+		assert.NoError(t, r.Shutdown(context.Background()))
	})
-}
-
-func TestLogsReceiver_error(t *testing.T) {
-	zcore, logObserver := observer.New(zapcore.ErrorLevel)
-	logger := zap.New(zcore)
-	settings := receivertest.NewNopSettings(metadata.Type)
-	settings.Logger = logger
-
-	expectedErr := errors.New("handler error")
-	c := kafkaLogsConsumer{
-		nextConsumer:     consumertest.NewNop(),
-		consumeLoopWG:    &sync.WaitGroup{},
-		settings:         settings,
-		consumerGroup:    &testConsumerGroup{err: expectedErr},
-		config:           createDefaultConfig(),
-		telemetryBuilder: nopTelemetryBuilder(t),
+	for range 4 {
+		<-received
	}

-	require.NoError(t, c.Start(context.Background(), componenttest.NewNopHost()))
-	require.NoError(t, c.Shutdown(context.Background()))
+	// There should be one failed message due to the invalid payload of the final message.
+	// It may not be available immediately, as the receiver may not have processed it yet.
	assert.Eventually(t, func() bool {
-		return logObserver.FilterField(zap.Error(expectedErr)).Len() > 0
-	}, 10*time.Second, time.Millisecond*100)
-}
-
-func TestLogsConsumerGroupHandler(t *testing.T) {
-	tel := componenttest.NewTelemetry()
-	t.Cleanup(func() { require.NoError(t, tel.Shutdown(context.Background())) })
-	telemetryBuilder, err := metadata.NewTelemetryBuilder(tel.NewTelemetrySettings())
+		_, getMetricErr := tel.GetMetric("otelcol_kafka_receiver_unmarshal_failed_spans")
+		return getMetricErr == nil
+	}, 10*time.Second, 100*time.Millisecond)
+	metadatatest.AssertEqualKafkaReceiverUnmarshalFailedSpans(t, tel, []metricdata.DataPoint[int64]{{
+		Value: 1,
+		Attributes: attribute.NewSet(
+			attribute.String("name", set.ID.String()),
+		),
+	}}, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars())
+
+	// After receiving messages, the internal metrics should be updated.
+	metadatatest.AssertEqualKafkaReceiverPartitionStart(t, tel, []metricdata.DataPoint[int64]{{
+		// 2 because:
+		// - the initial open
+		// - the invalid message causes the consumer to restart, closing the partition
+		Value:      2,
+		Attributes: attribute.NewSet(attribute.String("name", set.ID.Name())),
+	}}, metricdatatest.IgnoreTimestamp())
+
+	metadatatest.AssertEqualKafkaReceiverMessages(t, tel, []metricdata.DataPoint[int64]{{
+		Value: 5,
+		Attributes: attribute.NewSet(
+			attribute.String("name", set.ID.String()),
+			attribute.String("partition", "0"),
+		),
+	}}, metricdatatest.IgnoreTimestamp())
+
+	// Shut down and check that the partition close metric is updated. 
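+	// Shutting down explicitly here, rather than relying on t.Cleanup, ensures the
+	// partition-close metric has been recorded before it is asserted below.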
+ err = r.Shutdown(context.Background()) require.NoError(t, err) - - obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopSettings(metadata.Type)}) + metadatatest.AssertEqualKafkaReceiverPartitionClose(t, tel, []metricdata.DataPoint[int64]{{ + // 2 because: + // - the invalid message causes the consumer to restart, closing the partition + // - it re-acquires the partition, but then shutting down closes the partition again + Value: 2, + Attributes: attribute.NewSet( + attribute.String("name", set.ID.Name()), + ), + }}, metricdatatest.IgnoreTimestamp()) + + observedErrorLogs := observedLogs.FilterLevelExact(zapcore.ErrorLevel) + logEntries := observedErrorLogs.All() + assert.Len(t, logEntries, 1) + assert.Equal(t, "failed to unmarshal message", logEntries[0].Message) + + metadatatest.AssertEqualKafkaReceiverCurrentOffset(t, tel, []metricdata.DataPoint[int64]{{ + Value: 4, // offset of the final message + Attributes: attribute.NewSet( + attribute.String("name", set.ID.String()), + attribute.String("partition", "0"), + ), + }}, metricdatatest.IgnoreTimestamp()) + + metadatatest.AssertEqualKafkaReceiverOffsetLag(t, tel, []metricdata.DataPoint[int64]{{ + Value: 0, + Attributes: attribute.NewSet( + attribute.String("name", set.ID.String()), + attribute.String("partition", "0"), + ), + }}, metricdatatest.IgnoreTimestamp()) +} + +func TestNewLogsReceiver(t *testing.T) { + t.Parallel() + kafkaClient, receiverConfig := mustNewFakeCluster(t, kfake.SeedTopics(1, "otlp_logs")) + + var sink consumertest.LogsSink + receiverConfig.HeaderExtraction.ExtractHeaders = true + receiverConfig.HeaderExtraction.Headers = []string{"key1"} + set, tel, _ := mustNewSettings(t) + r, err := newLogsReceiver(receiverConfig, set, &sink) require.NoError(t, err) - var called atomic.Bool - c := logsConsumerGroupHandler{ - unmarshaler: &plog.ProtoUnmarshaler{}, - logger: zap.NewNop(), - ready: make(chan bool), - nextConsumer: func() consumer.Logs { - c, err := consumer.NewLogs(func(ctx context.Context, _ plog.Logs) error { - defer called.Store(true) - info := client.FromContext(ctx) - assert.Equal(t, []string{"abcdefg"}, info.Metadata.Get("x-tenant-id")) - assert.Equal(t, []string{"1234", "5678"}, info.Metadata.Get("x-request-ids")) - return nil - }) - require.NoError(t, err) - return c - }(), - obsrecv: obsrecv, - headerExtractor: &nopHeaderExtractor{}, - telemetryBuilder: telemetryBuilder, - } - testSession := testConsumerGroupSession{ctx: context.Background()} - require.NoError(t, c.Setup(testSession)) - _, ok := <-c.ready - assert.False(t, ok) - assertInternalTelemetry(t, tel, 0) - - require.NoError(t, c.Cleanup(testSession)) - assertInternalTelemetry(t, tel, 1) - - groupClaim := testConsumerGroupClaim{ - messageChan: make(chan *sarama.ConsumerMessage), - } - - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - assert.NoError(t, c.ConsumeClaim(testSession, groupClaim)) - wg.Done() - }() - - groupClaim.messageChan <- &sarama.ConsumerMessage{ - Headers: []*sarama.RecordHeader{ - { - Key: []byte("x-tenant-id"), - Value: []byte("abcdefg"), - }, - { - Key: []byte("x-request-ids"), - Value: []byte("1234"), - }, - { - Key: []byte("x-request-ids"), - Value: []byte("5678"), + // Send some logs to the otlp_logs topic. 
+ logs := testdata.GenerateLogs(1) + data, err := (&plog.ProtoMarshaler{}).MarshalLogs(logs) + require.NoError(t, err) + results := kafkaClient.ProduceSync(context.Background(), + &kgo.Record{ + Topic: "otlp_logs", + Value: data, + Headers: []kgo.RecordHeader{ + {Key: "key1", Value: []byte("value1")}, }, }, - } - close(groupClaim.messageChan) - wg.Wait() - assert.True(t, called.Load()) // Ensure nextConsumer was called. -} + &kgo.Record{Topic: "otlp_logs", Value: []byte("junk")}, + ) + require.NoError(t, results.FirstErr()) -func TestLogsConsumerGroupHandler_session_done(t *testing.T) { - tel := componenttest.NewTelemetry() - t.Cleanup(func() { require.NoError(t, tel.Shutdown(context.Background())) }) - telemetryBuilder, err := metadata.NewTelemetryBuilder(tel.NewTelemetrySettings()) + err = r.Start(context.Background(), componenttest.NewNopHost()) require.NoError(t, err) + t.Cleanup(func() { + assert.NoError(t, r.Shutdown(context.Background())) + }) - obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopSettings(metadata.Type)}) + // There should be one failed message due to the invalid message payload. + // It may not be available immediately, as the receiver may not have processed it yet. + assert.Eventually(t, func() bool { + _, err := tel.GetMetric("otelcol_kafka_receiver_unmarshal_failed_log_records") + return err == nil + }, 10*time.Second, 100*time.Millisecond) + metadatatest.AssertEqualKafkaReceiverUnmarshalFailedLogRecords(t, tel, []metricdata.DataPoint[int64]{{ + Value: 1, + Attributes: attribute.NewSet( + attribute.String("name", set.ID.String()), + ), + }}, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) + + // There should be one successfully processed batch of logs. 
+ assert.Len(t, sink.AllLogs(), 1) + _, ok := sink.AllLogs()[0].ResourceLogs().At(0).Resource().Attributes().Get("kafka.header.key1") + require.True(t, ok) +} + +func TestNewMetricsReceiver(t *testing.T) { + t.Parallel() + kafkaClient, receiverConfig := mustNewFakeCluster(t, kfake.SeedTopics(1, "otlp_metrics")) + + var sink consumertest.MetricsSink + receiverConfig.HeaderExtraction.ExtractHeaders = true + receiverConfig.HeaderExtraction.Headers = []string{"key1"} + set, tel, _ := mustNewSettings(t) + r, err := newMetricsReceiver(receiverConfig, set, &sink) require.NoError(t, err) - c := logsConsumerGroupHandler{ - unmarshaler: &plog.ProtoUnmarshaler{}, - logger: zap.NewNop(), - ready: make(chan bool), - nextConsumer: consumertest.NewNop(), - obsrecv: obsrecv, - headerExtractor: &nopHeaderExtractor{}, - telemetryBuilder: telemetryBuilder, - } - - ctx, cancelFunc := context.WithCancel(context.Background()) - testSession := testConsumerGroupSession{ctx: ctx} - require.NoError(t, c.Setup(testSession)) - _, ok := <-c.ready - assert.False(t, ok) - assertInternalTelemetry(t, tel, 0) - - require.NoError(t, c.Cleanup(testSession)) - assertInternalTelemetry(t, tel, 1) - groupClaim := testConsumerGroupClaim{ - messageChan: make(chan *sarama.ConsumerMessage), - } - defer close(groupClaim.messageChan) - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - assert.NoError(t, c.ConsumeClaim(testSession, groupClaim)) - wg.Done() - }() - - groupClaim.messageChan <- &sarama.ConsumerMessage{} - cancelFunc() - wg.Wait() -} - -func TestLogsConsumerGroupHandler_error_unmarshal(t *testing.T) { - obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopSettings(metadata.Type)}) + // Send some metrics to the otlp_metrics topic. 
+ metrics := testdata.GenerateMetrics(1) + data, err := (&pmetric.ProtoMarshaler{}).MarshalMetrics(metrics) require.NoError(t, err) - tel := componenttest.NewTelemetry() - t.Cleanup(func() { require.NoError(t, tel.Shutdown(context.Background())) }) - telemetryBuilder, err := metadata.NewTelemetryBuilder(tel.NewTelemetrySettings()) - require.NoError(t, err) - c := logsConsumerGroupHandler{ - unmarshaler: &plog.ProtoUnmarshaler{}, - logger: zap.NewNop(), - ready: make(chan bool), - nextConsumer: consumertest.NewNop(), - obsrecv: obsrecv, - headerExtractor: &nopHeaderExtractor{}, - telemetryBuilder: telemetryBuilder, - } - - wg := sync.WaitGroup{} - wg.Add(1) - groupClaim := &testConsumerGroupClaim{ - messageChan: make(chan *sarama.ConsumerMessage), - } - go func() { - err := c.ConsumeClaim(testConsumerGroupSession{ctx: context.Background()}, groupClaim) - assert.Error(t, err) - wg.Done() - }() - groupClaim.messageChan <- &sarama.ConsumerMessage{Value: []byte("!@#")} - close(groupClaim.messageChan) - wg.Wait() - metadatatest.AssertEqualKafkaReceiverOffsetLag(t, tel, []metricdata.DataPoint[int64]{ - { - Value: 3, - Attributes: attribute.NewSet( - attribute.String("name", ""), - attribute.String("partition", "5"), - ), - }, - }, metricdatatest.IgnoreTimestamp()) - metadatatest.AssertEqualKafkaReceiverCurrentOffset(t, tel, []metricdata.DataPoint[int64]{ - { - Value: 0, - Attributes: attribute.NewSet( - attribute.String("name", ""), - attribute.String("partition", "5"), - ), - }, - }, metricdatatest.IgnoreTimestamp()) - metadatatest.AssertEqualKafkaReceiverMessages(t, tel, []metricdata.DataPoint[int64]{ - { - Value: 1, - Attributes: attribute.NewSet( - attribute.String("name", ""), - attribute.String("partition", "5"), - ), - }, - }, metricdatatest.IgnoreTimestamp()) - metadatatest.AssertEqualKafkaReceiverUnmarshalFailedLogRecords(t, tel, []metricdata.DataPoint[int64]{ - { - Value: 1, - Attributes: attribute.NewSet( - attribute.String("name", ""), - ), + results := kafkaClient.ProduceSync(context.Background(), + &kgo.Record{ + Topic: "otlp_metrics", + Value: data, + Headers: []kgo.RecordHeader{ + {Key: "key1", Value: []byte("value1")}, + }, }, - }, metricdatatest.IgnoreTimestamp()) -} + &kgo.Record{Topic: "otlp_metrics", Value: []byte("junk")}, + ) + require.NoError(t, results.FirstErr()) -func TestLogsConsumerGroupHandler_error_nextConsumer(t *testing.T) { - consumerError := errors.New("failed to consume") - obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopSettings(metadata.Type)}) + err = r.Start(context.Background(), componenttest.NewNopHost()) require.NoError(t, err) - - tests := []struct { - name string - err, expectedError error - expectedBackoff time.Duration - }{ - { - name: "memory limiter data refused error", - err: errMemoryLimiterDataRefused, - expectedError: errMemoryLimiterDataRefused, - expectedBackoff: backoff.DefaultInitialInterval, - }, - { - name: "consumer error that does not require backoff", - err: consumerError, - expectedError: consumerError, - expectedBackoff: 0, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - backOff := backoff.NewExponentialBackOff() - backOff.RandomizationFactor = 0 - c := logsConsumerGroupHandler{ - unmarshaler: &plog.ProtoUnmarshaler{}, - logger: zap.NewNop(), - ready: make(chan bool), - nextConsumer: consumertest.NewErr(tt.err), - obsrecv: obsrecv, - headerExtractor: &nopHeaderExtractor{}, - telemetryBuilder: nopTelemetryBuilder(t), - backOff: 
backOff, - } - - wg := sync.WaitGroup{} - wg.Add(1) - groupClaim := &testConsumerGroupClaim{ - messageChan: make(chan *sarama.ConsumerMessage), - } - go func() { - start := time.Now() - e := c.ConsumeClaim(testConsumerGroupSession{ctx: context.Background()}, groupClaim) - end := time.Now() - if tt.expectedError != nil { - assert.EqualError(t, e, tt.expectedError.Error()) - } else { - assert.NoError(t, e) - } - assert.WithinDuration(t, start.Add(tt.expectedBackoff), end, 100*time.Millisecond) - wg.Done() - }() - - ld := testdata.GenerateLogs(1) - unmarshaler := &plog.ProtoMarshaler{} - bts, err := unmarshaler.MarshalLogs(ld) - require.NoError(t, err) - groupClaim.messageChan <- &sarama.ConsumerMessage{Value: bts} - close(groupClaim.messageChan) - wg.Wait() - }) - } -} - -type testConsumerGroupClaim struct { - messageChan chan *sarama.ConsumerMessage -} - -var _ sarama.ConsumerGroupClaim = (*testConsumerGroupClaim)(nil) - -const ( - testTopic = "otlp_spans" - testPartition = 5 - testInitialOffset = 6 - testHighWatermarkOffset = 4 -) - -func (t testConsumerGroupClaim) Topic() string { - return testTopic -} - -func (t testConsumerGroupClaim) Partition() int32 { - return testPartition -} - -func (t testConsumerGroupClaim) InitialOffset() int64 { - return testInitialOffset -} - -func (t testConsumerGroupClaim) HighWaterMarkOffset() int64 { - return testHighWatermarkOffset -} - -func (t testConsumerGroupClaim) Messages() <-chan *sarama.ConsumerMessage { - return t.messageChan -} - -type testConsumerGroupSession struct { - ctx context.Context -} - -func (t testConsumerGroupSession) Commit() { -} - -var _ sarama.ConsumerGroupSession = (*testConsumerGroupSession)(nil) - -func (t testConsumerGroupSession) Claims() map[string][]int32 { - panic("implement me") -} - -func (t testConsumerGroupSession) MemberID() string { - panic("implement me") -} - -func (t testConsumerGroupSession) GenerationID() int32 { - panic("implement me") -} - -func (t testConsumerGroupSession) MarkOffset(string, int32, int64, string) { -} - -func (t testConsumerGroupSession) ResetOffset(string, int32, int64, string) { -} - -func (t testConsumerGroupSession) MarkMessage(*sarama.ConsumerMessage, string) {} - -func (t testConsumerGroupSession) Context() context.Context { - return t.ctx -} - -type testConsumerGroup struct { - once sync.Once - err error -} - -var _ sarama.ConsumerGroup = (*testConsumerGroup)(nil) - -func (t *testConsumerGroup) Consume(ctx context.Context, _ []string, handler sarama.ConsumerGroupHandler) error { - t.once.Do(func() { - _ = handler.Setup(testConsumerGroupSession{ctx: ctx}) + t.Cleanup(func() { + assert.NoError(t, r.Shutdown(context.Background())) }) - return t.err -} -func (t *testConsumerGroup) Errors() <-chan error { - panic("implement me") + // There should be one failed message due to the invalid message payload. + // It may not be available immediately, as the receiver may not have processed it yet. + assert.Eventually(t, func() bool { + _, err := tel.GetMetric("otelcol_kafka_receiver_unmarshal_failed_metric_points") + return err == nil + }, 10*time.Second, 100*time.Millisecond) + metadatatest.AssertEqualKafkaReceiverUnmarshalFailedMetricPoints(t, tel, []metricdata.DataPoint[int64]{{ + Value: 1, + Attributes: attribute.NewSet( + attribute.String("name", set.ID.String()), + ), + }}, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) + + // There should be one successfully processed batch of metrics. 
+ assert.Len(t, sink.AllMetrics(), 1) + _, ok := sink.AllMetrics()[0].ResourceMetrics().At(0).Resource().Attributes().Get("kafka.header.key1") + require.True(t, ok) +} + +func mustNewTracesReceiver(tb testing.TB, cfg *Config, nextConsumer consumer.Traces) { + tb.Helper() + + f := NewFactory() + r, err := f.CreateTraces(context.Background(), receivertest.NewNopSettings(metadata.Type), cfg, nextConsumer) + require.NoError(tb, err) + require.NoError(tb, r.Start(context.Background(), componenttest.NewNopHost())) + tb.Cleanup(func() { + assert.NoError(tb, r.Shutdown(context.Background())) + }) } -func (t *testConsumerGroup) Close() error { - return nil +func mustNewSettings(tb testing.TB) (receiver.Settings, *componenttest.Telemetry, *observer.ObservedLogs) { + zapCore, observedLogs := observer.New(zapcore.DebugLevel) + set := receivertest.NewNopSettings(metadata.Type) + tel := componenttest.NewTelemetry() + tb.Cleanup(func() { + assert.NoError(tb, tel.Shutdown(context.Background())) + }) + set.TelemetrySettings = tel.NewTelemetrySettings() + set.Logger = zap.New(zapCore) + return set, tel, observedLogs } -func (t *testConsumerGroup) Pause(_ map[string][]int32) { - panic("implement me") +// consumerArgs holds the context and data passed to the consumer function. +type consumerArgs[T any] struct { + ctx context.Context + data T } -func (t *testConsumerGroup) PauseAll() { - panic("implement me") +func newChannelTracesConsumer(ch chan<- consumerArgs[ptrace.Traces]) consumer.Traces { + return newTracesConsumer(func(ctx context.Context, data ptrace.Traces) error { + select { + case <-ctx.Done(): + return ctx.Err() + case ch <- consumerArgs[ptrace.Traces]{ctx: ctx, data: data}: + } + return nil + }) } -func (t *testConsumerGroup) Resume(_ map[string][]int32) { - panic("implement me") +func newTracesConsumer(f consumer.ConsumeTracesFunc) consumer.Traces { + consumer, _ := consumer.NewTraces(f) + return consumer } -func (t *testConsumerGroup) ResumeAll() { - panic("implement me") -} +// mustNewFakeCluster creates a new fake Kafka cluster with the given options, +// and returns a kgo.Client for operating on the cluster, and a receiver config. +func mustNewFakeCluster(tb testing.TB, opts ...kfake.Opt) (*kgo.Client, *Config) { + cluster, clientConfig := kafkatest.NewCluster(tb, opts...) 
+ kafkaClient := mustNewClient(tb, cluster) + tb.Cleanup(func() { deleteConsumerGroups(tb, kafkaClient) }) -func assertInternalTelemetry(t *testing.T, tel *componenttest.Telemetry, partitionClose int64) { - metadatatest.AssertEqualKafkaReceiverPartitionStart(t, tel, []metricdata.DataPoint[int64]{ - { - Value: 1, - Attributes: attribute.NewSet(attribute.String("name", "")), - }, - }, metricdatatest.IgnoreTimestamp()) - if partitionClose > 0 { - metadatatest.AssertEqualKafkaReceiverPartitionClose(t, tel, []metricdata.DataPoint[int64]{ - { - Value: partitionClose, - Attributes: attribute.NewSet(attribute.String("name", "")), - }, - }, metricdatatest.IgnoreTimestamp()) - } + cfg := createDefaultConfig() + cfg.ClientConfig = clientConfig + cfg.InitialOffset = "earliest" + cfg.MaxFetchWait = time.Millisecond + return kafkaClient, cfg } -func nopTelemetryBuilder(t *testing.T) *metadata.TelemetryBuilder { - telemetryBuilder, err := metadata.NewTelemetryBuilder(receivertest.NewNopSettings(metadata.Type).TelemetrySettings) - require.NoError(t, err) - return telemetryBuilder +func mustNewClient(tb testing.TB, cluster *kfake.Cluster) *kgo.Client { + client, err := kgo.NewClient(kgo.SeedBrokers(cluster.ListenAddrs()...)) + require.NoError(tb, err) + tb.Cleanup(client.Close) + return client } -func Test_newContextWithHeaders(t *testing.T) { - type args struct { - ctx context.Context - headers []*sarama.RecordHeader - } - tests := []struct { - name string - args args - want map[string][]string - }{ - { - name: "no headers", - args: args{ - ctx: context.Background(), - headers: []*sarama.RecordHeader{}, - }, - want: map[string][]string{}, - }, - { - name: "single header", - args: args{ - ctx: context.Background(), - headers: []*sarama.RecordHeader{ - {Key: []byte("key1"), Value: []byte("value1")}, - }, - }, - want: map[string][]string{ - "key1": {"value1"}, - }, - }, - { - name: "multiple headers", - args: args{ - ctx: context.Background(), - headers: []*sarama.RecordHeader{ - {Key: []byte("key1"), Value: []byte("value1")}, - {Key: []byte("key2"), Value: []byte("value2")}, - }, - }, - want: map[string][]string{ - "key1": {"value1"}, - "key2": {"value2"}, - }, - }, - { - name: "duplicate keys", - args: args{ - ctx: context.Background(), - headers: []*sarama.RecordHeader{ - {Key: []byte("key1"), Value: []byte("value1")}, - {Key: []byte("key1"), Value: []byte("value2")}, - }, - }, - want: map[string][]string{ - "key1": {"value1", "value2"}, - }, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - ctx := newContextWithHeaders(tt.args.ctx, tt.args.headers) - clientInfo := client.FromContext(ctx) - for k, wantVal := range tt.want { - val := clientInfo.Metadata.Get(k) - assert.Equal(t, wantVal, val) - } - }) - } +// deleteConsumerGroups deletes all consumer groups in the cluster. +// +// It is necessary to call this to exit the group goroutines in the kfake cluster. +func deleteConsumerGroups(tb testing.TB, client *kgo.Client) { + adminClient := kadm.NewClient(client) + groups, err := adminClient.ListGroups(context.Background()) + assert.NoError(tb, err) + _, err = adminClient.DeleteGroups(context.Background(), groups.Groups()...) + assert.NoError(tb, err) }