From db1bfd0b62d2bb5223184dc0a73e735eecd292dc Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Tue, 21 Jan 2025 11:47:06 +0800 Subject: [PATCH 1/8] receiver/awsfirehose: add benchmarks Benchmark the entire HTTP request handling to avoid fixating on irrelevant details. --- .../awsfirehosereceiver/benchmark_test.go | 167 ++++++++++++++++++ receiver/awsfirehosereceiver/receiver_test.go | 9 + 2 files changed, 176 insertions(+) create mode 100644 receiver/awsfirehosereceiver/benchmark_test.go diff --git a/receiver/awsfirehosereceiver/benchmark_test.go b/receiver/awsfirehosereceiver/benchmark_test.go new file mode 100644 index 0000000000000..cb20d265c79f2 --- /dev/null +++ b/receiver/awsfirehosereceiver/benchmark_test.go @@ -0,0 +1,167 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package awsfirehosereceiver + +import ( + "bytes" + "compress/gzip" + "context" + "encoding/base64" + "encoding/json" + "fmt" + "math/rand/v2" + "net/http" + "net/http/httptest" + "testing" + + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/receiver/receivertest" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func BenchmarkLogsConsumer_cwlogs(b *testing.B) { + // numLogGroups is the maximum number of unique log groups + // to use across the generated logs, using a random generator + // with fixed seeds for repeatability. + const numLogGroups = 10 + rng := rand.New(rand.NewPCG(1, 2)) + + // numRecords is the number of records in the Firehose envelope. + for _, numRecords := range []int{10, 100} { + // numLogs is the number of CoudWatch log records within a Firehose record. 
+ for _, numLogs := range []int{1, 10} { + b.Run(fmt.Sprintf("%dresources_%drecords_%dlogs", numLogGroups, numRecords, numLogs), func(b *testing.B) { + + config := createDefaultConfig().(*Config) + config.Endpoint = "localhost:0" + r, err := createLogsReceiver( + context.Background(), + receivertest.NewNopSettings(), + config, + consumertest.NewNop(), + ) + require.NoError(b, err) + + err = r.Start(context.Background(), componenttest.NewNopHost()) + require.NoError(b, err) + b.Cleanup(func() { + err = r.Shutdown(context.Background()) + assert.NoError(b, err) + }) + + records := make([]firehoseRecord, numRecords) + for i := range records { + records[i] = firehoseRecord{ + Data: base64.StdEncoding.EncodeToString( + makeCloudWatchLogRecord(rng, numLogs, numLogGroups), + ), + } + } + fr := testFirehoseRequest(testFirehoseRequestID, records) + body, err := json.Marshal(fr) + require.NoError(b, err) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + req := newTestRequest(body) + recorder := httptest.NewRecorder() + r.(http.Handler).ServeHTTP(recorder, req) + if recorder.Code != http.StatusOK { + b.Fatalf("expected status code 200, got %d", recorder.Code) + } + } + }) + } + } +} + +func BenchmarkMetricsConsumer_cwmetrics(b *testing.B) { + // numStreams is the maximum number of unique metric streams + // to use across the generated metrics, using a random generator + // with fixed seeds for repeatability. + const numStreams = 10 + rng := rand.New(rand.NewPCG(1, 2)) + + // numRecords is the number of records in the Firehose envelope. + for _, numRecords := range []int{10, 100} { + // numMetrics is the number of CoudWatch metrics within a Firehose record. 
+ for _, numMetrics := range []int{1, 10} { + b.Run(fmt.Sprintf("%dresources_%drecords_%dmetrics", numStreams, numRecords, numMetrics), func(b *testing.B) { + + config := createDefaultConfig().(*Config) + config.Endpoint = "localhost:0" + r, err := createMetricsReceiver( + context.Background(), + receivertest.NewNopSettings(), + config, + consumertest.NewNop(), + ) + require.NoError(b, err) + + err = r.Start(context.Background(), componenttest.NewNopHost()) + require.NoError(b, err) + b.Cleanup(func() { + err = r.Shutdown(context.Background()) + assert.NoError(b, err) + }) + + records := make([]firehoseRecord, numRecords) + for i := range records { + records[i] = firehoseRecord{ + Data: base64.StdEncoding.EncodeToString( + makeCloudWatchMetricRecord(rng, numMetrics, numStreams), + ), + } + } + + fr := testFirehoseRequest(testFirehoseRequestID, records) + body, err := json.Marshal(fr) + require.NoError(b, err) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + req := newTestRequest(body) + recorder := httptest.NewRecorder() + r.(http.Handler).ServeHTTP(recorder, req) + if recorder.Code != http.StatusOK { + b.Fatalf("expected status code 200, got %d", recorder.Code) + } + } + }) + } + } +} + +func makeCloudWatchLogRecord(rng *rand.Rand, numLogs, numLogGroups int) []byte { + var buf bytes.Buffer + w := gzip.NewWriter(&buf) + for i := 0; i < numLogs; i++ { + group := rng.IntN(numLogGroups) + fmt.Fprintf(w, + `{"messageType":"DATA_MESSAGE","owner":"123","logGroup":"group_%d","logStream":"stream","logEvents":[{"id":"the_id","timestamp":1725594035523,"message":"message %d"}]}`, + group, i, + ) + fmt.Fprintln(w) + } + if err := w.Close(); err != nil { + panic(err) + } + return buf.Bytes() +} + +func makeCloudWatchMetricRecord(rng *rand.Rand, numMetrics, numStreams int) []byte { + var buf bytes.Buffer + for i := 0; i < numMetrics; i++ { + stream := rng.IntN(numStreams) + fmt.Fprintf(&buf, + 
`{"metric_stream_name":"stream_%d","account_id":"1234567890","region":"us-east-1","namespace":"AWS/NATGateway","metric_name":"metric_%d","dimensions":{"NatGatewayId":"nat-01a4160dfb995b990"},"timestamp":1643916720000,"value":{"max":0.0,"min":0.0,"sum":0.0,"count":2.0},"unit":"Count"}`, + stream, i, + ) + fmt.Fprintln(&buf) + } + return buf.Bytes() +} diff --git a/receiver/awsfirehosereceiver/receiver_test.go b/receiver/awsfirehosereceiver/receiver_test.go index 2a70846462f55..5cf7d5e43638a 100644 --- a/receiver/awsfirehosereceiver/receiver_test.go +++ b/receiver/awsfirehosereceiver/receiver_test.go @@ -238,6 +238,15 @@ func testFirehoseReceiver(config *Config, consumer firehoseConsumer) *firehoseRe } } +func newTestRequest(requestBody []byte) *http.Request { + request := httptest.NewRequest(http.MethodPost, "/", bytes.NewReader(requestBody)) + request.Header.Set(headerContentType, "application/json") + request.Header.Set(headerContentLength, strconv.Itoa(len(requestBody))) + request.Header.Set(headerFirehoseRequestID, testFirehoseRequestID) + request.Header.Set(headerFirehoseAccessKey, testFirehoseAccessKey) + return request +} + func testFirehoseRequest(requestID string, records []firehoseRecord) firehoseRequest { return firehoseRequest{ RequestID: requestID, From 941cbeff3388b4c28a52197e5eb144b3539acb93 Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Thu, 9 Jan 2025 12:16:14 +0800 Subject: [PATCH 2/8] Refactor unmarshalers to fit into encoding framework The internal unmarshalers now implement plog.Unmarshaler and pmetric.Unmarshaler. This will enable extracting them later as encoding extensions; for now they remain embedded within the receiver. As a result of the interface change, the unmarshalers now unmarshal a single record at a time, which means we cannot merge resources/metrics as we go, but only after each record. 
This also fixes a bug in the cwmetrics unmarshaller where the unit of a metric was not considered part of its identity, and so two metrics that differed only by unit would be merged. Optimise all the things: - Use json-iterator for decoding JSON - Use klauspost/compress for decompressing gzip - Pool gzip readers - Remove pointer type from cwMetricValue to avoid allocation - Don't read the whole request body into memory - Reuse buffer for decoding base64; decode as we go - Implement more efficient metrics merging --- .chloggen/firehose-unmarshal-record.yaml | 27 +++ .../awsfirehosereceiver/benchmark_test.go | 7 +- receiver/awsfirehosereceiver/factory.go | 11 +- receiver/awsfirehosereceiver/go.mod | 13 +- receiver/awsfirehosereceiver/go.sum | 2 + .../cwlog/compression/compression.go | 48 ---- .../internal/unmarshaler/cwlog/logsbuilder.go | 54 ----- .../internal/unmarshaler/cwlog/unmarshaler.go | 143 +++++++----- .../unmarshaler/cwlog/unmarshaler_test.go | 27 ++- .../unmarshaler/cwmetricstream/cwmetric.go | 17 +- .../cwmetricstream/metricsbuilder.go | 160 ------------- .../cwmetricstream/metricsbuilder_test.go | 212 ------------------ .../unmarshaler/cwmetricstream/unmarshaler.go | 185 ++++++++++----- .../cwmetricstream/unmarshaler_test.go | 122 +++++++++- .../otlpmetricstream/unmarshaler.go | 40 ++-- .../otlpmetricstream/unmarshaler_test.go | 36 ++- .../internal/unmarshaler/unmarshaler.go | 27 --- .../unmarshalertest/nop_logs_unmarshaler.go | 10 +- .../nop_logs_unmarshaler_test.go | 6 +- .../nop_metrics_unmarshaler.go | 10 +- .../nop_metrics_unmarshaler_test.go | 6 +- receiver/awsfirehosereceiver/logs_receiver.go | 62 ++--- .../awsfirehosereceiver/logs_receiver_test.go | 118 ++++++++-- .../awsfirehosereceiver/metrics_receiver.go | 54 +++-- .../metrics_receiver_test.go | 143 +++++++++--- receiver/awsfirehosereceiver/receiver.go | 69 +++--- receiver/awsfirehosereceiver/receiver_test.go | 60 +++-- 27 files changed, 809 insertions(+), 860 deletions(-) create mode 100644 
.chloggen/firehose-unmarshal-record.yaml delete mode 100644 receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/compression/compression.go delete mode 100644 receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/logsbuilder.go delete mode 100644 receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/metricsbuilder.go delete mode 100644 receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/metricsbuilder_test.go delete mode 100644 receiver/awsfirehosereceiver/internal/unmarshaler/unmarshaler.go diff --git a/.chloggen/firehose-unmarshal-record.yaml b/.chloggen/firehose-unmarshal-record.yaml new file mode 100644 index 0000000000000..4351ba66ce734 --- /dev/null +++ b/.chloggen/firehose-unmarshal-record.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: awsfirehosereceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Refactor unmarshallers to implement pdata unmarshaler interfaces + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [37361] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. 
+# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/receiver/awsfirehosereceiver/benchmark_test.go b/receiver/awsfirehosereceiver/benchmark_test.go index cb20d265c79f2..32c485a07cf53 100644 --- a/receiver/awsfirehosereceiver/benchmark_test.go +++ b/receiver/awsfirehosereceiver/benchmark_test.go @@ -15,12 +15,11 @@ import ( "net/http/httptest" "testing" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/receiver/receivertest" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) func BenchmarkLogsConsumer_cwlogs(b *testing.B) { @@ -35,7 +34,6 @@ func BenchmarkLogsConsumer_cwlogs(b *testing.B) { // numLogs is the number of CoudWatch log records within a Firehose record. for _, numLogs := range []int{1, 10} { b.Run(fmt.Sprintf("%dresources_%drecords_%dlogs", numLogGroups, numRecords, numLogs), func(b *testing.B) { - config := createDefaultConfig().(*Config) config.Endpoint = "localhost:0" r, err := createLogsReceiver( @@ -91,7 +89,6 @@ func BenchmarkMetricsConsumer_cwmetrics(b *testing.B) { // numMetrics is the number of CoudWatch metrics within a Firehose record. 
for _, numMetrics := range []int{1, 10} { b.Run(fmt.Sprintf("%dresources_%drecords_%dmetrics", numStreams, numRecords, numMetrics), func(b *testing.B) { - config := createDefaultConfig().(*Config) config.Endpoint = "localhost:0" r, err := createMetricsReceiver( diff --git a/receiver/awsfirehosereceiver/factory.go b/receiver/awsfirehosereceiver/factory.go index 1e6b2347276fe..12801e364fde0 100644 --- a/receiver/awsfirehosereceiver/factory.go +++ b/receiver/awsfirehosereceiver/factory.go @@ -10,11 +10,12 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/receiver" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/metadata" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/otlpmetricstream" @@ -54,19 +55,19 @@ func validateRecordType(recordType string) error { // defaultMetricsUnmarshalers creates a map of the available metrics // unmarshalers. 
-func defaultMetricsUnmarshalers(logger *zap.Logger) map[string]unmarshaler.MetricsUnmarshaler { +func defaultMetricsUnmarshalers(logger *zap.Logger) map[string]pmetric.Unmarshaler { cwmsu := cwmetricstream.NewUnmarshaler(logger) otlpv1msu := otlpmetricstream.NewUnmarshaler(logger) - return map[string]unmarshaler.MetricsUnmarshaler{ + return map[string]pmetric.Unmarshaler{ cwmsu.Type(): cwmsu, otlpv1msu.Type(): otlpv1msu, } } // defaultLogsUnmarshalers creates a map of the available logs unmarshalers. -func defaultLogsUnmarshalers(logger *zap.Logger) map[string]unmarshaler.LogsUnmarshaler { +func defaultLogsUnmarshalers(logger *zap.Logger) map[string]plog.Unmarshaler { u := cwlog.NewUnmarshaler(logger) - return map[string]unmarshaler.LogsUnmarshaler{ + return map[string]plog.Unmarshaler{ u.Type(): u, } } diff --git a/receiver/awsfirehosereceiver/go.mod b/receiver/awsfirehosereceiver/go.mod index cd0540c6556b9..44bcd84b5c28a 100644 --- a/receiver/awsfirehosereceiver/go.mod +++ b/receiver/awsfirehosereceiver/go.mod @@ -4,6 +4,9 @@ go 1.22.0 require ( github.com/gogo/protobuf v1.3.2 + github.com/json-iterator/go v1.1.12 + github.com/klauspost/compress v1.17.11 + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.117.1-0.20250117002813-e970f8bb1258 github.com/stretchr/testify v1.10.0 go.opentelemetry.io/collector/component v0.117.1-0.20250119231113-f07ebc3afb51 go.opentelemetry.io/collector/component/componentstatus v0.117.1-0.20250119231113-f07ebc3afb51 @@ -24,6 +27,7 @@ require ( ) require ( + github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect github.com/fsnotify/fsnotify v1.8.0 // indirect @@ -32,8 +36,6 @@ require ( github.com/go-viper/mapstructure/v2 v2.2.1 // indirect 
github.com/golang/snappy v0.0.4 // indirect github.com/google/uuid v1.6.0 // indirect - github.com/json-iterator/go v1.1.12 // indirect - github.com/klauspost/compress v1.17.11 // indirect github.com/knadh/koanf/maps v0.1.1 // indirect github.com/knadh/koanf/providers/confmap v0.1.0 // indirect github.com/knadh/koanf/v2 v2.1.2 // indirect @@ -41,6 +43,7 @@ require ( github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.117.1-0.20250114172347-71aae791d7f8 // indirect github.com/pierrec/lz4/v4 v4.1.22 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/rs/cors v1.11.1 // indirect @@ -75,3 +78,9 @@ retract ( v0.76.1 v0.65.0 ) + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../pkg/pdatautil + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden diff --git a/receiver/awsfirehosereceiver/go.sum b/receiver/awsfirehosereceiver/go.sum index 364dbc6143e2f..6b4c71c032cad 100644 --- a/receiver/awsfirehosereceiver/go.sum +++ b/receiver/awsfirehosereceiver/go.sum @@ -1,3 +1,5 @@ +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= 
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/compression/compression.go b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/compression/compression.go deleted file mode 100644 index 2ebca77861dd2..0000000000000 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/compression/compression.go +++ /dev/null @@ -1,48 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package compression // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/compression" - -import ( - "bytes" - "compress/gzip" -) - -// Zip returns a gzip-compressed representation of the input bytes. -func Zip(data []byte) ([]byte, error) { - var b bytes.Buffer - w := gzip.NewWriter(&b) - - _, err := w.Write(data) - if err != nil { - return nil, err - } - - if err = w.Flush(); err != nil { - return nil, err - } - - if err = w.Close(); err != nil { - return nil, err - } - - return b.Bytes(), nil -} - -// Unzip expects gzip-compressed input bytes and returns their uncompressed form. 
-func Unzip(data []byte) ([]byte, error) { - b := bytes.NewBuffer(data) - - r, err := gzip.NewReader(b) - if err != nil { - return nil, err - } - - var rv bytes.Buffer - _, err = rv.ReadFrom(r) - if err != nil { - return nil, err - } - - return rv.Bytes(), nil -} diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/logsbuilder.go b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/logsbuilder.go deleted file mode 100644 index 5dc7a3db59f85..0000000000000 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/logsbuilder.go +++ /dev/null @@ -1,54 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package cwlog // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog" - -import ( - "time" - - "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/collector/pdata/plog" - conventions "go.opentelemetry.io/collector/semconv/v1.6.1" -) - -const ( - attributeAWSCloudWatchLogGroupName = "aws.cloudwatch.log_group_name" - attributeAWSCloudWatchLogStreamName = "aws.cloudwatch.log_stream_name" -) - -// resourceAttributes are the CloudWatch log attributes that define a unique resource. -type resourceAttributes struct { - owner, logGroup, logStream string -} - -// resourceLogsBuilder provides convenient access to the a Resource's LogRecordSlice. -type resourceLogsBuilder struct { - rls plog.LogRecordSlice -} - -// setAttributes applies the resourceAttributes to the provided Resource. -func (ra *resourceAttributes) setAttributes(resource pcommon.Resource) { - attrs := resource.Attributes() - attrs.PutStr(conventions.AttributeCloudAccountID, ra.owner) - attrs.PutStr(attributeAWSCloudWatchLogGroupName, ra.logGroup) - attrs.PutStr(attributeAWSCloudWatchLogStreamName, ra.logStream) -} - -// newResourceLogsBuilder to capture logs for the Resource defined by the provided attributes. 
-func newResourceLogsBuilder(logs plog.Logs, attrs resourceAttributes) *resourceLogsBuilder { - rls := logs.ResourceLogs().AppendEmpty() - attrs.setAttributes(rls.Resource()) - return &resourceLogsBuilder{rls.ScopeLogs().AppendEmpty().LogRecords()} -} - -// AddLog events to the LogRecordSlice. Resource attributes are captured when creating -// the resourceLogsBuilder, so we only need to consider the LogEvents themselves. -func (rlb *resourceLogsBuilder) AddLog(log cWLog) { - for _, event := range log.LogEvents { - logLine := rlb.rls.AppendEmpty() - // pcommon.Timestamp is a time specified as UNIX Epoch time in nanoseconds - // but timestamp in cloudwatch logs are in milliseconds. - logLine.SetTimestamp(pcommon.Timestamp(event.Timestamp * int64(time.Millisecond))) - logLine.Body().SetStr(event.Message) - } -} diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler.go b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler.go index b3fa132166b53..182f809b7ef59 100644 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler.go +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler.go @@ -4,101 +4,130 @@ package cwlog // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog" import ( + "bufio" "bytes" - "encoding/json" "errors" + "fmt" + "sync" + "time" + jsoniter "github.com/json-iterator/go" + "github.com/klauspost/compress/gzip" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" + conventions "go.opentelemetry.io/collector/semconv/v1.6.1" "go.uber.org/zap" - - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/compression" ) const ( - 
TypeStr = "cwlogs" - recordDelimiter = "\n" + TypeStr = "cwlogs" + + attributeAWSCloudWatchLogGroupName = "aws.cloudwatch.log_group_name" + attributeAWSCloudWatchLogStreamName = "aws.cloudwatch.log_stream_name" ) var errInvalidRecords = errors.New("record format invalid") // Unmarshaler for the CloudWatch Log JSON record format. type Unmarshaler struct { - logger *zap.Logger + logger *zap.Logger + gzipPool sync.Pool } -var _ unmarshaler.LogsUnmarshaler = (*Unmarshaler)(nil) +var _ plog.Unmarshaler = (*Unmarshaler)(nil) // NewUnmarshaler creates a new instance of the Unmarshaler. func NewUnmarshaler(logger *zap.Logger) *Unmarshaler { - return &Unmarshaler{logger} + return &Unmarshaler{logger: logger} } // Unmarshal deserializes the records into cWLogs and uses the // resourceLogsBuilder to group them into a single plog.Logs. // Skips invalid cWLogs received in the record and -func (u Unmarshaler) Unmarshal(records [][]byte) (plog.Logs, error) { - md := plog.NewLogs() - builders := make(map[resourceAttributes]*resourceLogsBuilder) - for recordIndex, compressedRecord := range records { - record, err := compression.Unzip(compressedRecord) - if err != nil { - u.logger.Error("Failed to unzip record", +func (u *Unmarshaler) UnmarshalLogs(compressedRecord []byte) (plog.Logs, error) { + var err error + r, _ := u.gzipPool.Get().(*gzip.Reader) + if r == nil { + r, err = gzip.NewReader(bytes.NewReader(compressedRecord)) + } else { + err = r.Reset(bytes.NewReader(compressedRecord)) + } + if err != nil { + return plog.Logs{}, fmt.Errorf("failed to decompress record: %w", err) + } + defer u.gzipPool.Put(r) + + type resourceKey struct { + owner string + logGroup string + logStream string + } + byResource := make(map[resourceKey]plog.LogRecordSlice) + + // Multiple logs in each record separated by newline character + scanner := bufio.NewScanner(r) + for datumIndex := 0; scanner.Scan(); datumIndex++ { + var log cWLog + if err := jsoniter.ConfigFastest.Unmarshal(scanner.Bytes(), 
&log); err != nil { + u.logger.Error( + "Unable to unmarshal input", zap.Error(err), - zap.Int("record_index", recordIndex), + zap.Int("datum_index", datumIndex), ) continue } - // Multiple logs in each record separated by newline character - for datumIndex, datum := range bytes.Split(record, []byte(recordDelimiter)) { - if len(datum) > 0 { - var log cWLog - err := json.Unmarshal(datum, &log) - if err != nil { - u.logger.Error( - "Unable to unmarshal input", - zap.Error(err), - zap.Int("datum_index", datumIndex), - zap.Int("record_index", recordIndex), - ) - continue - } - if !u.isValid(log) { - u.logger.Error( - "Invalid log", - zap.Int("datum_index", datumIndex), - zap.Int("record_index", recordIndex), - ) - continue - } - attrs := resourceAttributes{ - owner: log.Owner, - logGroup: log.LogGroup, - logStream: log.LogStream, - } - lb, ok := builders[attrs] - if !ok { - lb = newResourceLogsBuilder(md, attrs) - builders[attrs] = lb - } - lb.AddLog(log) - } + if !isValid(log) { + u.logger.Error( + "Invalid log", + zap.Int("datum_index", datumIndex), + ) + continue + } + + key := resourceKey{ + owner: log.Owner, + logGroup: log.LogGroup, + logStream: log.LogStream, + } + logRecords, ok := byResource[key] + if !ok { + logRecords = plog.NewLogRecordSlice() + byResource[key] = logRecords } - } - if len(builders) == 0 { - return plog.NewLogs(), errInvalidRecords + for _, event := range log.LogEvents { + logRecord := logRecords.AppendEmpty() + // pcommon.Timestamp is a time specified as UNIX Epoch time in nanoseconds + // but timestamp in cloudwatch logs are in milliseconds. 
+ logRecord.SetTimestamp(pcommon.Timestamp(event.Timestamp * int64(time.Millisecond))) + logRecord.Body().SetStr(event.Message) + } + } + if err := scanner.Err(); err != nil { + return plog.Logs{}, err + } + if len(byResource) == 0 { + return plog.Logs{}, errInvalidRecords } - return md, nil + logs := plog.NewLogs() + for resourceKey, logRecords := range byResource { + rl := logs.ResourceLogs().AppendEmpty() + resourceAttrs := rl.Resource().Attributes() + resourceAttrs.PutStr(conventions.AttributeCloudAccountID, resourceKey.owner) + resourceAttrs.PutStr(attributeAWSCloudWatchLogGroupName, resourceKey.logGroup) + resourceAttrs.PutStr(attributeAWSCloudWatchLogStreamName, resourceKey.logStream) + logRecords.MoveAndAppendTo(rl.ScopeLogs().AppendEmpty().LogRecords()) + } + return logs, nil } // isValid validates that the cWLog has been unmarshalled correctly. -func (u Unmarshaler) isValid(log cWLog) bool { +func isValid(log cWLog) bool { return log.Owner != "" && log.LogGroup != "" && log.LogStream != "" } // Type of the serialized messages. 
-func (u Unmarshaler) Type() string { +func (u *Unmarshaler) Type() string { return TypeStr } diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler_test.go b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler_test.go index 71b49295df609..e1e329440327d 100644 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler_test.go +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler_test.go @@ -4,14 +4,14 @@ package cwlog import ( + "bytes" + "compress/gzip" "os" "path/filepath" "testing" "github.com/stretchr/testify/require" "go.uber.org/zap" - - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/compression" ) func TestType(t *testing.T) { @@ -57,11 +57,10 @@ func TestUnmarshal(t *testing.T) { record, err := os.ReadFile(filepath.Join(".", "testdata", testCase.filename)) require.NoError(t, err) - compressedRecord, err := compression.Zip(record) + compressedRecord, err := gzipData(record) require.NoError(t, err) - records := [][]byte{compressedRecord} - got, err := unmarshaler.Unmarshal(records) + got, err := unmarshaler.UnmarshalLogs(compressedRecord) if testCase.wantErr != nil { require.Error(t, err) require.Equal(t, testCase.wantErr, err) @@ -87,11 +86,10 @@ func TestLogTimestamp(t *testing.T) { record, err := os.ReadFile(filepath.Join(".", "testdata", "single_record")) require.NoError(t, err) - compressedRecord, err := compression.Zip(record) + compressedRecord, err := gzipData(record) require.NoError(t, err) - records := [][]byte{compressedRecord} - got, err := unmarshaler.Unmarshal(records) + got, err := unmarshaler.UnmarshalLogs(compressedRecord) require.NoError(t, err) require.NotNil(t, got) require.Equal(t, 1, got.ResourceLogs().Len()) @@ -103,3 +101,16 @@ func TestLogTimestamp(t *testing.T) { expectedTimestamp := "2024-09-05 13:47:15.523 +0000 UTC" require.Equal(t, expectedTimestamp, 
ilm.LogRecords().At(0).Timestamp().String()) } + +func gzipData(data []byte) ([]byte, error) { + var b bytes.Buffer + w := gzip.NewWriter(&b) + + if _, err := w.Write(data); err != nil { + return nil, err + } + if err := w.Close(); err != nil { + return nil, err + } + return b.Bytes(), nil +} diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/cwmetric.go b/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/cwmetric.go index 1896fd0c869ee..ae7c546022a43 100644 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/cwmetric.go +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/cwmetric.go @@ -3,6 +3,10 @@ package cwmetricstream // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream" +import ( + jsoniter "github.com/json-iterator/go" +) + // The cWMetric is the format for the CloudWatch metric stream records. // // More details can be found at: @@ -26,7 +30,7 @@ type cWMetric struct { Timestamp int64 `json:"timestamp"` // Value is the cWMetricValue, which has the min, max, // sum, and count. - Value *cWMetricValue `json:"value"` + Value cWMetricValue `json:"value"` // Unit is the unit for the metric. // // More details can be found at: @@ -36,6 +40,8 @@ type cWMetric struct { // The cWMetricValue is the actual values of the CloudWatch metric. type cWMetricValue struct { + isSet bool + // Max is the highest value observed. Max float64 `json:"max"` // Min is the lowest value observed. @@ -45,3 +51,12 @@ type cWMetricValue struct { // Count is the number of data points. 
Count float64 `json:"count"` } + +func (v *cWMetricValue) UnmarshalJSON(data []byte) error { + type valueType cWMetricValue + if err := jsoniter.ConfigFastest.Unmarshal(data, (*valueType)(v)); err != nil { + return err + } + v.isSet = true + return nil +} diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/metricsbuilder.go b/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/metricsbuilder.go deleted file mode 100644 index c39a2c716ee97..0000000000000 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/metricsbuilder.go +++ /dev/null @@ -1,160 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package cwmetricstream // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream" - -import ( - "fmt" - "strings" - "time" - - "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/collector/pdata/pmetric" - conventions "go.opentelemetry.io/collector/semconv/v1.27.0" -) - -const ( - attributeAWSCloudWatchMetricStreamName = "aws.cloudwatch.metric_stream_name" - dimensionInstanceID = "InstanceId" - namespaceDelimiter = "/" -) - -// resourceAttributes are the CloudWatch metric stream attributes that define a -// unique resource. -type resourceAttributes struct { - // metricStreamName is the metric stream name. - metricStreamName string - // accountID is the AWS account ID. - accountID string - // region is the AWS region. - region string - // namespace is the CloudWatch metric namespace. - namespace string -} - -// The resourceMetricsBuilder is used to aggregate metrics for the -// same resourceAttributes. -type resourceMetricsBuilder struct { - rms pmetric.MetricSlice - // metricBuilders is the map of metrics within the same - // resource group. 
- metricBuilders map[string]*metricBuilder -} - -// newResourceMetricsBuilder creates a resourceMetricsBuilder with the -// resourceAttributes. -func newResourceMetricsBuilder(md pmetric.Metrics, attrs resourceAttributes) *resourceMetricsBuilder { - rms := md.ResourceMetrics().AppendEmpty() - attrs.setAttributes(rms.Resource()) - return &resourceMetricsBuilder{ - rms: rms.ScopeMetrics().AppendEmpty().Metrics(), - metricBuilders: make(map[string]*metricBuilder), - } -} - -// AddMetric adds a metric to one of the metric builders based on -// the key generated for each. -func (rmb *resourceMetricsBuilder) AddMetric(metric cWMetric) { - mb, ok := rmb.metricBuilders[metric.MetricName] - if !ok { - mb = newMetricBuilder(rmb.rms, metric.MetricName, metric.Unit) - rmb.metricBuilders[metric.MetricName] = mb - } - mb.AddDataPoint(metric) -} - -// setAttributes creates a pcommon.Resource from the fields in the resourceMetricsBuilder. -func (rmb *resourceAttributes) setAttributes(resource pcommon.Resource) { - attributes := resource.Attributes() - attributes.PutStr(conventions.AttributeCloudProvider, conventions.AttributeCloudProviderAWS) - attributes.PutStr(conventions.AttributeCloudAccountID, rmb.accountID) - attributes.PutStr(conventions.AttributeCloudRegion, rmb.region) - serviceNamespace, serviceName := toServiceAttributes(rmb.namespace) - if serviceNamespace != "" { - attributes.PutStr(conventions.AttributeServiceNamespace, serviceNamespace) - } - attributes.PutStr(conventions.AttributeServiceName, serviceName) - attributes.PutStr(attributeAWSCloudWatchMetricStreamName, rmb.metricStreamName) -} - -// toServiceAttributes splits the CloudWatch namespace into service namespace/name -// if prepended by AWS/. 
Otherwise, it returns the CloudWatch namespace as the -// service name with an empty service namespace -func toServiceAttributes(namespace string) (serviceNamespace, serviceName string) { - index := strings.Index(namespace, namespaceDelimiter) - if index != -1 && strings.EqualFold(namespace[:index], conventions.AttributeCloudProviderAWS) { - return namespace[:index], namespace[index+1:] - } - return "", namespace -} - -// dataPointKey combines the dimensions and timestamps to create a key -// used to prevent duplicate metrics. -type dataPointKey struct { - // timestamp is the milliseconds since epoch - timestamp int64 - // dimensions is the string representation of the metric dimensions. - // fmt guarantees key-sorted order when printing a map. - dimensions string -} - -// The metricBuilder aggregates metrics of the same name and unit -// into data points. -type metricBuilder struct { - metric pmetric.Metric - // seen is the set of added data point keys. - seen map[dataPointKey]bool -} - -// newMetricBuilder creates a metricBuilder with the name and unit. -func newMetricBuilder(rms pmetric.MetricSlice, name, unit string) *metricBuilder { - m := rms.AppendEmpty() - m.SetName(name) - m.SetUnit(unit) - m.SetEmptySummary() - return &metricBuilder{ - metric: m, - seen: make(map[dataPointKey]bool), - } -} - -// AddDataPoint adds the metric as a datapoint if a metric for that timestamp -// hasn't already been added. -func (mb *metricBuilder) AddDataPoint(metric cWMetric) { - key := dataPointKey{ - timestamp: metric.Timestamp, - dimensions: fmt.Sprint(metric.Dimensions), - } - if _, ok := mb.seen[key]; !ok { - mb.toDataPoint(mb.metric.Summary().DataPoints().AppendEmpty(), metric) - mb.seen[key] = true - } -} - -// toDataPoint converts a cWMetric into a pdata datapoint and attaches the -// dimensions as attributes. 
-func (mb *metricBuilder) toDataPoint(dp pmetric.SummaryDataPoint, metric cWMetric) { - dp.SetCount(uint64(metric.Value.Count)) - dp.SetSum(metric.Value.Sum) - qv := dp.QuantileValues() - minQ := qv.AppendEmpty() - minQ.SetQuantile(0) - minQ.SetValue(metric.Value.Min) - maxQ := qv.AppendEmpty() - maxQ.SetQuantile(1) - maxQ.SetValue(metric.Value.Max) - dp.SetTimestamp(pcommon.NewTimestampFromTime(time.UnixMilli(metric.Timestamp))) - for k, v := range metric.Dimensions { - dp.Attributes().PutStr(ToSemConvAttributeKey(k), v) - } -} - -// ToSemConvAttributeKey maps some common keys to semantic convention attributes. -func ToSemConvAttributeKey(key string) string { - switch key { - case dimensionInstanceID: - return conventions.AttributeServiceInstanceID - default: - return key - } -} diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/metricsbuilder_test.go b/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/metricsbuilder_test.go deleted file mode 100644 index 114554648f7d2..0000000000000 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/metricsbuilder_test.go +++ /dev/null @@ -1,212 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package cwmetricstream - -import ( - "math/rand" - "testing" - "time" - - "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/pdata/pmetric" - conventions "go.opentelemetry.io/collector/semconv/v1.27.0" -) - -const ( - testRegion = "us-east-1" - testAccountID = "1234567890" - testStreamName = "MyMetricStream" - testInstanceID = "i-1234567890abcdef0" -) - -func TestToSemConvAttributeKey(t *testing.T) { - testCases := map[string]struct { - key string - want string - }{ - "WithValidKey": { - key: "InstanceId", - want: conventions.AttributeServiceInstanceID, - }, - "WithInvalidKey": { - key: "CustomDimension", - want: "CustomDimension", - }, - } - - for name, testCase := range testCases { - t.Run(name, 
func(t *testing.T) { - got := ToSemConvAttributeKey(testCase.key) - require.Equal(t, testCase.want, got) - }) - } -} - -func TestMetricBuilder(t *testing.T) { - t.Run("WithSingleMetric", func(t *testing.T) { - metric := cWMetric{ - MetricName: "name", - Unit: "unit", - Timestamp: time.Now().UnixMilli(), - Value: testCWMetricValue(), - Dimensions: map[string]string{"CustomDimension": "test"}, - } - gots := pmetric.NewMetricSlice() - mb := newMetricBuilder(gots, metric.MetricName, metric.Unit) - mb.AddDataPoint(metric) - require.Equal(t, 1, gots.Len()) - got := gots.At(0) - require.Equal(t, metric.MetricName, got.Name()) - require.Equal(t, metric.Unit, got.Unit()) - require.Equal(t, pmetric.MetricTypeSummary, got.Type()) - gotDps := got.Summary().DataPoints() - require.Equal(t, 1, gotDps.Len()) - gotDp := gotDps.At(0) - require.Equal(t, uint64(metric.Value.Count), gotDp.Count()) - require.Equal(t, metric.Value.Sum, gotDp.Sum()) - gotQv := gotDp.QuantileValues() - require.Equal(t, 2, gotQv.Len()) - require.Equal(t, []float64{metric.Value.Min, metric.Value.Max}, []float64{gotQv.At(0).Value(), gotQv.At(1).Value()}) - require.Equal(t, 1, gotDp.Attributes().Len()) - }) - t.Run("WithTimestampCollision", func(t *testing.T) { - timestamp := time.Now().UnixMilli() - metrics := []cWMetric{ - { - Timestamp: timestamp, - Value: testCWMetricValue(), - Dimensions: map[string]string{ - "AccountId": testAccountID, - "Region": testRegion, - "InstanceId": testInstanceID, - }, - }, - { - Timestamp: timestamp, - Value: testCWMetricValue(), - Dimensions: map[string]string{ - "InstanceId": testInstanceID, - "AccountId": testAccountID, - "Region": testRegion, - }, - }, - } - gots := pmetric.NewMetricSlice() - mb := newMetricBuilder(gots, "name", "unit") - for _, metric := range metrics { - mb.AddDataPoint(metric) - } - require.Equal(t, 1, gots.Len()) - got := gots.At(0) - gotDps := got.Summary().DataPoints() - require.Equal(t, 1, gotDps.Len()) - gotDp := gotDps.At(0) - require.Equal(t, 
uint64(metrics[0].Value.Count), gotDp.Count()) - require.Equal(t, metrics[0].Value.Sum, gotDp.Sum()) - require.Equal(t, 3, gotDp.Attributes().Len()) - }) -} - -func TestResourceMetricsBuilder(t *testing.T) { - testCases := map[string]struct { - namespace string - wantAttributes map[string]string - }{ - "WithAwsNamespace": { - namespace: "AWS/EC2", - wantAttributes: map[string]string{ - attributeAWSCloudWatchMetricStreamName: testStreamName, - conventions.AttributeCloudAccountID: testAccountID, - conventions.AttributeCloudRegion: testRegion, - conventions.AttributeServiceName: "EC2", - conventions.AttributeServiceNamespace: "AWS", - }, - }, - "WithCustomNamespace": { - namespace: "CustomNamespace", - wantAttributes: map[string]string{ - attributeAWSCloudWatchMetricStreamName: testStreamName, - conventions.AttributeCloudAccountID: testAccountID, - conventions.AttributeCloudRegion: testRegion, - conventions.AttributeServiceName: "CustomNamespace", - conventions.AttributeServiceNamespace: "", - }, - }, - } - for name, testCase := range testCases { - t.Run(name, func(t *testing.T) { - metric := cWMetric{ - MetricName: "name", - Unit: "unit", - Timestamp: time.Now().UnixMilli(), - Value: testCWMetricValue(), - Dimensions: map[string]string{}, - } - attrs := resourceAttributes{ - metricStreamName: testStreamName, - accountID: testAccountID, - region: testRegion, - namespace: testCase.namespace, - } - gots := pmetric.NewMetrics() - rmb := newResourceMetricsBuilder(gots, attrs) - rmb.AddMetric(metric) - require.Equal(t, 1, gots.ResourceMetrics().Len()) - got := gots.ResourceMetrics().At(0) - gotAttrs := got.Resource().Attributes() - for wantKey, wantValue := range testCase.wantAttributes { - gotValue, ok := gotAttrs.Get(wantKey) - if wantValue != "" { - require.True(t, ok) - require.Equal(t, wantValue, gotValue.AsString()) - } else { - require.False(t, ok) - } - } - }) - } - t.Run("WithSameMetricDifferentDimensions", func(t *testing.T) { - metrics := []cWMetric{ - { - 
MetricName: "name", - Unit: "unit", - Timestamp: time.Now().UnixMilli(), - Value: testCWMetricValue(), - Dimensions: map[string]string{}, - }, - { - MetricName: "name", - Unit: "unit", - Timestamp: time.Now().Add(time.Second * 3).UnixMilli(), - Value: testCWMetricValue(), - Dimensions: map[string]string{ - "CustomDimension": "value", - }, - }, - } - attrs := resourceAttributes{ - metricStreamName: testStreamName, - accountID: testAccountID, - region: testRegion, - namespace: "AWS/EC2", - } - gots := pmetric.NewMetrics() - rmb := newResourceMetricsBuilder(gots, attrs) - for _, metric := range metrics { - rmb.AddMetric(metric) - } - require.Equal(t, 1, gots.ResourceMetrics().Len()) - got := gots.ResourceMetrics().At(0) - require.Equal(t, 1, got.ScopeMetrics().Len()) - gotMetrics := got.ScopeMetrics().At(0).Metrics() - require.Equal(t, 1, gotMetrics.Len()) - gotDps := gotMetrics.At(0).Summary().DataPoints() - require.Equal(t, 2, gotDps.Len()) - }) -} - -// testCWMetricValue is a convenience function for creating a test cWMetricValue -func testCWMetricValue() *cWMetricValue { - return &cWMetricValue{100, 0, float64(rand.Int63n(100)), float64(rand.Int63n(4))} -} diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/unmarshaler.go b/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/unmarshaler.go index 110ef4afc0aa0..17fae59fd5354 100644 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/unmarshaler.go +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/unmarshaler.go @@ -4,19 +4,25 @@ package cwmetricstream // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream" import ( + "bufio" "bytes" - "encoding/json" "errors" + "strings" + "time" + jsoniter "github.com/json-iterator/go" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" + conventions 
"go.opentelemetry.io/collector/semconv/v1.27.0" "go.uber.org/zap" - - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler" ) const ( - TypeStr = "cwmetrics" - recordDelimiter = "\n" + TypeStr = "cwmetrics" + + attributeAWSCloudWatchMetricStreamName = "aws.cloudwatch.metric_stream_name" + dimensionInstanceID = "InstanceId" + namespaceDelimiter = "/" ) var errInvalidRecords = errors.New("record format invalid") @@ -29,7 +35,7 @@ type Unmarshaler struct { logger *zap.Logger } -var _ unmarshaler.MetricsUnmarshaler = (*Unmarshaler)(nil) +var _ pmetric.Unmarshaler = (*Unmarshaler)(nil) // NewUnmarshaler creates a new instance of the Unmarshaler. func NewUnmarshaler(logger *zap.Logger) *Unmarshaler { @@ -39,61 +45,140 @@ func NewUnmarshaler(logger *zap.Logger) *Unmarshaler { // Unmarshal deserializes the records into cWMetrics and uses the // resourceMetricsBuilder to group them into a single pmetric.Metrics. // Skips invalid cWMetrics received in the record and -func (u Unmarshaler) Unmarshal(records [][]byte) (pmetric.Metrics, error) { - md := pmetric.NewMetrics() - builders := make(map[resourceAttributes]*resourceMetricsBuilder) - for recordIndex, record := range records { - // Multiple metrics in each record separated by newline character - for datumIndex, datum := range bytes.Split(record, []byte(recordDelimiter)) { - if len(datum) > 0 { - var metric cWMetric - err := json.Unmarshal(datum, &metric) - if err != nil { - u.logger.Error( - "Unable to unmarshal input", - zap.Error(err), - zap.Int("datum_index", datumIndex), - zap.Int("record_index", recordIndex), - ) - continue - } - if !u.isValid(metric) { - u.logger.Error( - "Invalid metric", - zap.Int("datum_index", datumIndex), - zap.Int("record_index", recordIndex), - ) - continue - } - attrs := resourceAttributes{ - metricStreamName: metric.MetricStreamName, - namespace: metric.Namespace, - accountID: metric.AccountID, - region: metric.Region, 
- } - mb, ok := builders[attrs] - if !ok { - mb = newResourceMetricsBuilder(md, attrs) - builders[attrs] = mb - } - mb.AddMetric(metric) - } - } +func (u Unmarshaler) UnmarshalMetrics(record []byte) (pmetric.Metrics, error) { + type metricKey struct { + name string + unit string } + byResource := make(map[resourceKey]map[metricKey]pmetric.Metric) + + // Multiple metrics in each record separated by newline character + scanner := bufio.NewScanner(bytes.NewReader(record)) + for datumIndex := 0; scanner.Scan(); datumIndex++ { + var cwMetric cWMetric + if err := jsoniter.ConfigFastest.Unmarshal(scanner.Bytes(), &cwMetric); err != nil { + u.logger.Error( + "Unable to unmarshal input", + zap.Error(err), + zap.Int("datum_index", datumIndex), + ) + continue + } + if !u.isValid(cwMetric) { + u.logger.Error( + "Invalid metric", + zap.Int("datum_index", datumIndex), + ) + continue + } + + rkey := resourceKey{ + metricStreamName: cwMetric.MetricStreamName, + namespace: cwMetric.Namespace, + accountID: cwMetric.AccountID, + region: cwMetric.Region, + } + metrics, ok := byResource[rkey] + if !ok { + metrics = make(map[metricKey]pmetric.Metric) + byResource[rkey] = metrics + } - if len(builders) == 0 { - return pmetric.NewMetrics(), errInvalidRecords + mkey := metricKey{ + name: cwMetric.MetricName, + unit: cwMetric.Unit, + } + metric, ok := metrics[mkey] + if !ok { + metric = pmetric.NewMetric() + metric.SetName(mkey.name) + metric.SetUnit(mkey.unit) + metric.SetEmptySummary() + metrics[mkey] = metric + } + + dp := metric.Summary().DataPoints().AppendEmpty() + dp.SetTimestamp(pcommon.NewTimestampFromTime(time.UnixMilli(cwMetric.Timestamp))) + setDataPointAttributes(cwMetric, dp) + dp.SetCount(uint64(cwMetric.Value.Count)) + dp.SetSum(cwMetric.Value.Sum) + minQ := dp.QuantileValues().AppendEmpty() + minQ.SetQuantile(0) + minQ.SetValue(cwMetric.Value.Min) + maxQ := dp.QuantileValues().AppendEmpty() + maxQ.SetQuantile(1) + maxQ.SetValue(cwMetric.Value.Max) + } + if err := 
scanner.Err(); err != nil { + return pmetric.Metrics{}, err + } + if len(byResource) == 0 { + return pmetric.Metrics{}, errInvalidRecords } - return md, nil + metrics := pmetric.NewMetrics() + for resourceKey, metricsMap := range byResource { + rm := metrics.ResourceMetrics().AppendEmpty() + setResourceAttributes(resourceKey, rm.Resource()) + scopeMetrics := rm.ScopeMetrics().AppendEmpty() + for _, metric := range metricsMap { + metric.MoveTo(scopeMetrics.Metrics().AppendEmpty()) + } + } + return metrics, nil } // isValid validates that the cWMetric has been unmarshalled correctly. func (u Unmarshaler) isValid(metric cWMetric) bool { - return metric.MetricName != "" && metric.Namespace != "" && metric.Unit != "" && metric.Value != nil + return metric.MetricName != "" && metric.Namespace != "" && metric.Unit != "" && metric.Value.isSet } // Type of the serialized messages. func (u Unmarshaler) Type() string { return TypeStr } + +type resourceKey struct { + metricStreamName string + namespace string + accountID string + region string +} + +// setResourceAttributes sets attributes on a pcommon.Resource from a cwMetric. +func setResourceAttributes(key resourceKey, resource pcommon.Resource) { + attributes := resource.Attributes() + attributes.PutStr(conventions.AttributeCloudProvider, conventions.AttributeCloudProviderAWS) + attributes.PutStr(conventions.AttributeCloudAccountID, key.accountID) + attributes.PutStr(conventions.AttributeCloudRegion, key.region) + serviceNamespace, serviceName := toServiceAttributes(key.namespace) + if serviceNamespace != "" { + attributes.PutStr(conventions.AttributeServiceNamespace, serviceNamespace) + } + attributes.PutStr(conventions.AttributeServiceName, serviceName) + attributes.PutStr(attributeAWSCloudWatchMetricStreamName, key.metricStreamName) +} + +// toServiceAttributes splits the CloudWatch namespace into service namespace/name +// if prepended by AWS/. 
Otherwise, it returns the CloudWatch namespace as the +// service name with an empty service namespace +func toServiceAttributes(namespace string) (serviceNamespace, serviceName string) { + index := strings.Index(namespace, namespaceDelimiter) + if index != -1 && strings.EqualFold(namespace[:index], conventions.AttributeCloudProviderAWS) { + return namespace[:index], namespace[index+1:] + } + return "", namespace +} + +// setDataPointAttributes sets attributes on a metric data point from a cWMetric. +func setDataPointAttributes(m cWMetric, dp pmetric.SummaryDataPoint) { + attrs := dp.Attributes() + for k, v := range m.Dimensions { + switch k { + case dimensionInstanceID: + attrs.PutStr(conventions.AttributeServiceInstanceID, v) + default: + attrs.PutStr(k, v) + } + } +} diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/unmarshaler_test.go b/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/unmarshaler_test.go index 16a517db17566..37d850de7afa3 100644 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/unmarshaler_test.go +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/unmarshaler_test.go @@ -8,10 +8,22 @@ import ( "path/filepath" "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + conventions "go.opentelemetry.io/collector/semconv/v1.27.0" + semconv "go.opentelemetry.io/collector/semconv/v1.5.0" "go.uber.org/zap" ) +const ( + testRegion = "us-east-1" + testAccountID = "1234567890" + testStreamName = "MyMetricStream" + testInstanceID = "i-1234567890abcdef0" +) + func TestType(t *testing.T) { unmarshaler := NewUnmarshaler(zap.NewNop()) require.Equal(t, TypeStr, unmarshaler.Type()) @@ -45,18 +57,16 @@ func TestUnmarshal(t *testing.T) { "WithSomeInvalidRecords": { filename: "some_invalid_records", wantResourceCount: 5, - 
wantMetricCount: 35, + wantMetricCount: 36, wantDatapointCount: 88, }, } for name, testCase := range testCases { t.Run(name, func(t *testing.T) { - record, err := os.ReadFile(filepath.Join(".", "testdata", testCase.filename)) + record, err := os.ReadFile(filepath.Join("testdata", testCase.filename)) require.NoError(t, err) - records := [][]byte{record} - - got, err := unmarshaler.Unmarshal(records) + got, err := unmarshaler.UnmarshalMetrics(record) if testCase.wantErr != nil { require.Error(t, err) require.Equal(t, testCase.wantErr, err) @@ -82,3 +92,105 @@ func TestUnmarshal(t *testing.T) { }) } } + +func TestUnmarshal_SingleRecord(t *testing.T) { + unmarshaler := NewUnmarshaler(zap.NewNop()) + + record, err := os.ReadFile(filepath.Join("testdata", "single_record")) + require.NoError(t, err) + metrics, err := unmarshaler.UnmarshalMetrics(record) + require.NoError(t, err) + + rms := metrics.ResourceMetrics() + require.Equal(t, 1, rms.Len()) + rm := rms.At(0) + + // Check one resource attribute to check things are wired up. + // Remaining resource attributes are checked in TestSetResourceAttributes. 
+ res := rm.Resource() + cloudProvider, ok := res.Attributes().Get(semconv.AttributeCloudProvider) + require.True(t, ok) + assert.Equal(t, semconv.AttributeCloudProviderAWS, cloudProvider.Str()) + require.Equal(t, 1, rm.ScopeMetrics().Len()) + sm := rm.ScopeMetrics().At(0) + + require.Equal(t, 1, sm.Metrics().Len()) + metric := sm.Metrics().At(0) + assert.Equal(t, "DiskWriteOps", metric.Name()) + assert.Equal(t, "Seconds", metric.Unit()) + require.Equal(t, pmetric.MetricTypeSummary, metric.Type()) + summary := metric.Summary() + require.Equal(t, 1, summary.DataPoints().Len()) + dp := summary.DataPoints().At(0) + assert.Equal(t, map[string]any{"service.instance.id": "i-123456789012"}, dp.Attributes().AsRaw()) + assert.Equal(t, pcommon.Timestamp(1611929698000000000), dp.Timestamp()) + assert.Equal(t, uint64(3), dp.Count()) + assert.Equal(t, 20.0, dp.Sum()) + require.Equal(t, 2, dp.QuantileValues().Len()) + q0 := dp.QuantileValues().At(0) + q1 := dp.QuantileValues().At(1) + assert.Equal(t, 0.0, q0.Quantile()) // min + assert.Equal(t, 0.0, q0.Value()) + assert.Equal(t, 1.0, q1.Quantile()) // max + assert.Equal(t, 18.0, q1.Value()) +} + +func TestSetDataPointAttributes(t *testing.T) { + metric := cWMetric{ + Dimensions: map[string]string{ + "InstanceId": testInstanceID, + "CustomDimension": "whatever", + }, + } + want := map[string]any{ + conventions.AttributeServiceInstanceID: testInstanceID, + "CustomDimension": "whatever", + } + + dp := pmetric.NewSummaryDataPoint() + setDataPointAttributes(metric, dp) + require.Equal(t, want, dp.Attributes().AsRaw()) +} + +func TestSetResourceAttributes(t *testing.T) { + testCases := map[string]struct { + namespace string + want map[string]any + }{ + "WithAWSNamespace": { + namespace: "AWS/EC2", + want: map[string]any{ + attributeAWSCloudWatchMetricStreamName: testStreamName, + conventions.AttributeCloudAccountID: testAccountID, + conventions.AttributeCloudRegion: testRegion, + conventions.AttributeCloudProvider: 
semconv.AttributeCloudProviderAWS, + conventions.AttributeServiceName: "EC2", + conventions.AttributeServiceNamespace: "AWS", + }, + }, + "WithCustomNamespace": { + namespace: "CustomNamespace", + want: map[string]any{ + attributeAWSCloudWatchMetricStreamName: testStreamName, + conventions.AttributeCloudAccountID: testAccountID, + conventions.AttributeCloudRegion: testRegion, + conventions.AttributeCloudProvider: semconv.AttributeCloudProviderAWS, + conventions.AttributeServiceName: "CustomNamespace", + }, + }, + } + for name, testCase := range testCases { + t.Run(name, func(t *testing.T) { + key := resourceKey{ + accountID: testAccountID, + region: testRegion, + metricStreamName: testStreamName, + namespace: testCase.namespace, + } + + resource := pcommon.NewResource() + setResourceAttributes(key, resource) + require.Equal(t, testCase.want, resource.Attributes().AsRaw()) + }) + } +} diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/otlpmetricstream/unmarshaler.go b/receiver/awsfirehosereceiver/internal/unmarshaler/otlpmetricstream/unmarshaler.go index c3dde9699e905..cc1c131a31b0f 100644 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/otlpmetricstream/unmarshaler.go +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/otlpmetricstream/unmarshaler.go @@ -5,13 +5,12 @@ package otlpmetricstream // import "github.com/open-telemetry/opentelemetry-coll import ( "errors" + "fmt" "github.com/gogo/protobuf/proto" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" "go.uber.org/zap" - - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler" ) const ( @@ -29,7 +28,7 @@ type Unmarshaler struct { logger *zap.Logger } -var _ unmarshaler.MetricsUnmarshaler = (*Unmarshaler)(nil) +var _ pmetric.Unmarshaler = (*Unmarshaler)(nil) // NewUnmarshaler creates a new instance of the Unmarshaler. 
func NewUnmarshaler(logger *zap.Logger) *Unmarshaler { @@ -37,29 +36,22 @@ func NewUnmarshaler(logger *zap.Logger) *Unmarshaler { } // Unmarshal deserializes the records into pmetric.Metrics -func (u Unmarshaler) Unmarshal(records [][]byte) (pmetric.Metrics, error) { +func (u Unmarshaler) UnmarshalMetrics(record []byte) (pmetric.Metrics, error) { md := pmetric.NewMetrics() - for recordIndex, record := range records { - dataLen, pos := len(record), 0 - for pos < dataLen { - n, nLen := proto.DecodeVarint(record) - if nLen == 0 && n == 0 { - return md, errInvalidOTLPFormatStart - } - req := pmetricotlp.NewExportRequest() - pos += nLen - err := req.UnmarshalProto(record[pos : pos+int(n)]) - pos += int(n) - if err != nil { - u.logger.Error( - "Unable to unmarshal input", - zap.Error(err), - zap.Int("record_index", recordIndex), - ) - continue - } - req.Metrics().ResourceMetrics().MoveAndAppendTo(md.ResourceMetrics()) + dataLen, pos := len(record), 0 + for pos < dataLen { + n, nLen := proto.DecodeVarint(record) + if nLen == 0 && n == 0 { + return md, errInvalidOTLPFormatStart + } + req := pmetricotlp.NewExportRequest() + pos += nLen + err := req.UnmarshalProto(record[pos : pos+int(n)]) + pos += int(n) + if err != nil { + return pmetric.Metrics{}, fmt.Errorf("unable to unmarshal input: %w", err) } + req.Metrics().ResourceMetrics().MoveAndAppendTo(md.ResourceMetrics()) } return md, nil diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/otlpmetricstream/unmarshaler_test.go b/receiver/awsfirehosereceiver/internal/unmarshaler/otlpmetricstream/unmarshaler_test.go index bc11270099978..9e781953b9d4e 100644 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/otlpmetricstream/unmarshaler_test.go +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/otlpmetricstream/unmarshaler_test.go @@ -4,6 +4,7 @@ package otlpmetricstream import ( + "slices" "testing" "time" @@ -45,62 +46,51 @@ func createMetricRecord() []byte { func TestUnmarshal(t *testing.T) { unmarshaler 
:= NewUnmarshaler(zap.NewNop()) testCases := map[string]struct { - records [][]byte + record []byte wantResourceCount int wantMetricCount int wantDatapointCount int - wantErr error + wantErr string }{ "WithSingleRecord": { - records: [][]byte{ - createMetricRecord(), - }, + record: createMetricRecord(), wantResourceCount: 1, wantMetricCount: 1, wantDatapointCount: 1, }, "WithMultipleRecords": { - records: [][]byte{ + record: slices.Concat( createMetricRecord(), createMetricRecord(), createMetricRecord(), createMetricRecord(), createMetricRecord(), createMetricRecord(), - }, + ), wantResourceCount: 6, wantMetricCount: 6, wantDatapointCount: 6, }, "WithEmptyRecord": { - records: make([][]byte, 0), + record: []byte{}, wantResourceCount: 0, wantMetricCount: 0, wantDatapointCount: 0, }, - "WithInvalidRecords": { - records: [][]byte{{1, 2}}, + "WithInvalidRecord": { + record: []byte{1, 2}, wantResourceCount: 0, wantMetricCount: 0, wantDatapointCount: 0, - }, - "WithSomeInvalidRecords": { - records: [][]byte{ - createMetricRecord(), - {1, 2}, - createMetricRecord(), - }, - wantResourceCount: 2, - wantMetricCount: 2, - wantDatapointCount: 2, + wantErr: "unable to unmarshal input: proto: ExportMetricsServiceRequest: illegal tag 0 (wire type 2)", }, } for name, testCase := range testCases { t.Run(name, func(t *testing.T) { - got, err := unmarshaler.Unmarshal(testCase.records) - if testCase.wantErr != nil { + got, err := unmarshaler.UnmarshalMetrics(testCase.record) + if testCase.wantErr != "" { require.Error(t, err) - require.Equal(t, testCase.wantErr, err) + require.EqualError(t, err, testCase.wantErr) } else { require.NoError(t, err) require.NotNil(t, got) diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshaler.go b/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshaler.go deleted file mode 100644 index 0ffb4b0a80e8b..0000000000000 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshaler.go +++ /dev/null @@ -1,27 +0,0 @@ -// Copyright 
The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package unmarshaler // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler" - -import ( - "go.opentelemetry.io/collector/pdata/plog" - "go.opentelemetry.io/collector/pdata/pmetric" -) - -// MetricsUnmarshaler deserializes the message body -type MetricsUnmarshaler interface { - // Unmarshal deserializes the records into metrics. - Unmarshal(records [][]byte) (pmetric.Metrics, error) - - // Type of the serialized messages. - Type() string -} - -// LogsUnmarshaler deserializes the message body -type LogsUnmarshaler interface { - // Unmarshal deserializes the records into logs. - Unmarshal(records [][]byte) (plog.Logs, error) - - // Type of the serialized messages. - Type() string -} diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/nop_logs_unmarshaler.go b/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/nop_logs_unmarshaler.go index 79f29caecfdbf..9bf64900ac77f 100644 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/nop_logs_unmarshaler.go +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/nop_logs_unmarshaler.go @@ -5,8 +5,6 @@ package unmarshalertest // import "github.com/open-telemetry/opentelemetry-colle import ( "go.opentelemetry.io/collector/pdata/plog" - - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler" ) // NopLogsUnmarshaler is a LogsUnmarshaler that doesn't do anything @@ -16,12 +14,12 @@ type NopLogsUnmarshaler struct { err error } -var _ unmarshaler.LogsUnmarshaler = (*NopLogsUnmarshaler)(nil) +var _ plog.Unmarshaler = (*NopLogsUnmarshaler)(nil) // NewNopLogs provides a nop logs unmarshaler with the default // plog.Logs and no error. 
func NewNopLogs() *NopLogsUnmarshaler { - return &NopLogsUnmarshaler{} + return &NopLogsUnmarshaler{logs: plog.NewLogs()} } // NewWithLogs provides a nop logs unmarshaler with the passed @@ -33,11 +31,11 @@ func NewWithLogs(logs plog.Logs) *NopLogsUnmarshaler { // NewErrLogs provides a nop logs unmarshaler with the passed // in error as the Unmarshal error. func NewErrLogs(err error) *NopLogsUnmarshaler { - return &NopLogsUnmarshaler{err: err} + return &NopLogsUnmarshaler{logs: plog.NewLogs(), err: err} } // Unmarshal deserializes the records into logs. -func (u *NopLogsUnmarshaler) Unmarshal([][]byte) (plog.Logs, error) { +func (u *NopLogsUnmarshaler) UnmarshalLogs([]byte) (plog.Logs, error) { return u.logs, u.err } diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/nop_logs_unmarshaler_test.go b/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/nop_logs_unmarshaler_test.go index ce90c351cfbd5..38aefcbed6da8 100644 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/nop_logs_unmarshaler_test.go +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/nop_logs_unmarshaler_test.go @@ -13,7 +13,7 @@ import ( func TestNewNopLogs(t *testing.T) { unmarshaler := NewNopLogs() - got, err := unmarshaler.Unmarshal(nil) + got, err := unmarshaler.UnmarshalLogs(nil) require.NoError(t, err) require.NotNil(t, got) require.Equal(t, typeStr, unmarshaler.Type()) @@ -23,7 +23,7 @@ func TestNewWithLogs(t *testing.T) { logs := plog.NewLogs() logs.ResourceLogs().AppendEmpty() unmarshaler := NewWithLogs(logs) - got, err := unmarshaler.Unmarshal(nil) + got, err := unmarshaler.UnmarshalLogs(nil) require.NoError(t, err) require.NotNil(t, got) require.Equal(t, logs, got) @@ -33,7 +33,7 @@ func TestNewWithLogs(t *testing.T) { func TestNewErrLogs(t *testing.T) { wantErr := errors.New("test error") unmarshaler := NewErrLogs(wantErr) - got, err := unmarshaler.Unmarshal(nil) + got, err := unmarshaler.UnmarshalLogs(nil) 
require.Error(t, err) require.Equal(t, wantErr, err) require.NotNil(t, got) diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/nop_metrics_unmarshaler.go b/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/nop_metrics_unmarshaler.go index a8f5c36319e37..0ad0559ac64ac 100644 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/nop_metrics_unmarshaler.go +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/nop_metrics_unmarshaler.go @@ -5,8 +5,6 @@ package unmarshalertest // import "github.com/open-telemetry/opentelemetry-colle import ( "go.opentelemetry.io/collector/pdata/pmetric" - - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler" ) const typeStr = "nop" @@ -18,12 +16,12 @@ type NopMetricsUnmarshaler struct { err error } -var _ unmarshaler.MetricsUnmarshaler = (*NopMetricsUnmarshaler)(nil) +var _ pmetric.Unmarshaler = (*NopMetricsUnmarshaler)(nil) // NewNopMetrics provides a nop metrics unmarshaler with the default // pmetric.Metrics and no error. func NewNopMetrics() *NopMetricsUnmarshaler { - return &NopMetricsUnmarshaler{} + return &NopMetricsUnmarshaler{metrics: pmetric.NewMetrics()} } // NewWithMetrics provides a nop metrics unmarshaler with the passed @@ -35,11 +33,11 @@ func NewWithMetrics(metrics pmetric.Metrics) *NopMetricsUnmarshaler { // NewErrMetrics provides a nop metrics unmarshaler with the passed // in error as the Unmarshal error. func NewErrMetrics(err error) *NopMetricsUnmarshaler { - return &NopMetricsUnmarshaler{err: err} + return &NopMetricsUnmarshaler{metrics: pmetric.NewMetrics(), err: err} } // Unmarshal deserializes the records into metrics. 
-func (u *NopMetricsUnmarshaler) Unmarshal([][]byte) (pmetric.Metrics, error) { +func (u *NopMetricsUnmarshaler) UnmarshalMetrics([]byte) (pmetric.Metrics, error) { return u.metrics, u.err } diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/nop_metrics_unmarshaler_test.go b/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/nop_metrics_unmarshaler_test.go index 572c39bc475c1..05739108ac18f 100644 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/nop_metrics_unmarshaler_test.go +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/nop_metrics_unmarshaler_test.go @@ -13,7 +13,7 @@ import ( func TestNewNopMetrics(t *testing.T) { unmarshaler := NewNopMetrics() - got, err := unmarshaler.Unmarshal(nil) + got, err := unmarshaler.UnmarshalMetrics(nil) require.NoError(t, err) require.NotNil(t, got) require.Equal(t, typeStr, unmarshaler.Type()) @@ -23,7 +23,7 @@ func TestNewWithMetrics(t *testing.T) { metrics := pmetric.NewMetrics() metrics.ResourceMetrics().AppendEmpty() unmarshaler := NewWithMetrics(metrics) - got, err := unmarshaler.Unmarshal(nil) + got, err := unmarshaler.UnmarshalMetrics(nil) require.NoError(t, err) require.NotNil(t, got) require.Equal(t, metrics, got) @@ -33,7 +33,7 @@ func TestNewWithMetrics(t *testing.T) { func TestNewErrMetrics(t *testing.T) { wantErr := errors.New("test error") unmarshaler := NewErrMetrics(wantErr) - got, err := unmarshaler.Unmarshal(nil) + got, err := unmarshaler.UnmarshalMetrics(nil) require.Error(t, err) require.Equal(t, wantErr, err) require.NotNil(t, got) diff --git a/receiver/awsfirehosereceiver/logs_receiver.go b/receiver/awsfirehosereceiver/logs_receiver.go index 570e6cf1e7459..98e5c49f9bcc2 100644 --- a/receiver/awsfirehosereceiver/logs_receiver.go +++ b/receiver/awsfirehosereceiver/logs_receiver.go @@ -5,13 +5,16 @@ package awsfirehosereceiver // import "github.com/open-telemetry/opentelemetry-c import ( "context" + 
"errors" + "fmt" + "io" "net/http" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" + "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/receiver" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog" ) @@ -23,9 +26,9 @@ type logsConsumer struct { // consumer passes the translated logs on to the // next consumer. consumer consumer.Logs - // unmarshaler is the configured LogsUnmarshaler + // unmarshaler is the configured plog.Unmarshaler // to use when processing the records. - unmarshaler unmarshaler.LogsUnmarshaler + unmarshaler plog.Unmarshaler } var _ firehoseConsumer = (*logsConsumer)(nil) @@ -35,7 +38,7 @@ var _ firehoseConsumer = (*logsConsumer)(nil) func newLogsReceiver( config *Config, set receiver.Settings, - unmarshalers map[string]unmarshaler.LogsUnmarshaler, + unmarshalers map[string]plog.Unmarshaler, nextConsumer consumer.Logs, ) (receiver.Logs, error) { recordType := config.RecordType @@ -44,47 +47,50 @@ func newLogsReceiver( } configuredUnmarshaler := unmarshalers[recordType] if configuredUnmarshaler == nil { - return nil, errUnrecognizedRecordType + return nil, fmt.Errorf("%w: recordType = %s", errUnrecognizedRecordType, recordType) } - mc := &logsConsumer{ + c := &logsConsumer{ consumer: nextConsumer, unmarshaler: configuredUnmarshaler, } - return &firehoseReceiver{ settings: set, config: config, - consumer: mc, + consumer: c, }, nil } -// Consume uses the configured unmarshaler to deserialize the records into a -// single plog.Logs. It will send the final result -// to the next consumer. 
-func (mc *logsConsumer) Consume(ctx context.Context, records [][]byte, commonAttributes map[string]string) (int, error) { - md, err := mc.unmarshaler.Unmarshal(records) - if err != nil { - return http.StatusBadRequest, err - } +// Consume uses the configured unmarshaler to unmarshal each record +// into a plog.Logs and pass it to the next consumer, one record at a time. +func (c *logsConsumer) Consume(ctx context.Context, nextRecord nextRecordFunc, commonAttributes map[string]string) (int, error) { + for { + record, err := nextRecord() + if errors.Is(err, io.EOF) { + break + } + logs, err := c.unmarshaler.UnmarshalLogs(record) + if err != nil { + return http.StatusBadRequest, err + } - if commonAttributes != nil { - for i := 0; i < md.ResourceLogs().Len(); i++ { - rm := md.ResourceLogs().At(i) - for k, v := range commonAttributes { - if _, found := rm.Resource().Attributes().Get(k); !found { - rm.Resource().Attributes().PutStr(k, v) + if commonAttributes != nil { + for i := 0; i < logs.ResourceLogs().Len(); i++ { + rm := logs.ResourceLogs().At(i) + for k, v := range commonAttributes { + if _, found := rm.Resource().Attributes().Get(k); !found { + rm.Resource().Attributes().PutStr(k, v) + } } } } - } - err = mc.consumer.ConsumeLogs(ctx, md) - if err != nil { - if consumererror.IsPermanent(err) { - return http.StatusBadRequest, err + if err := c.consumer.ConsumeLogs(ctx, logs); err != nil { + if consumererror.IsPermanent(err) { + return http.StatusBadRequest, err + } + return http.StatusServiceUnavailable, err } - return http.StatusServiceUnavailable, err } return http.StatusOK, nil } diff --git a/receiver/awsfirehosereceiver/logs_receiver_test.go b/receiver/awsfirehosereceiver/logs_receiver_test.go index 6739f81379292..3fe642794982a 100644 --- a/receiver/awsfirehosereceiver/logs_receiver_test.go +++ b/receiver/awsfirehosereceiver/logs_receiver_test.go @@ -9,25 +9,28 @@ import ( "net/http" "testing" + "github.com/stretchr/testify/assert" 
"github.com/stretchr/testify/require" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/receiver/receivertest" - "go.uber.org/zap" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/plogtest" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest" ) type logsRecordConsumer struct { - result plog.Logs + results []plog.Logs } var _ consumer.Logs = (*logsRecordConsumer)(nil) func (rc *logsRecordConsumer) ConsumeLogs(_ context.Context, logs plog.Logs) error { - rc.result = logs + rc.results = append(rc.results, logs) return nil } @@ -35,16 +38,27 @@ func (rc *logsRecordConsumer) Capabilities() consumer.Capabilities { return consumer.Capabilities{MutatesData: false} } -func TestNewLogsReceiver(t *testing.T) { +func TestLogsReceiver_Start(t *testing.T) { + unmarshalers := map[string]plog.Unmarshaler{ + "cwlogs": &cwlog.Unmarshaler{}, + "otlp_logs": &plog.ProtoUnmarshaler{}, + } + testCases := map[string]struct { - consumer consumer.Logs - recordType string - wantErr error + recordType string + wantUnmarshalerType plog.Unmarshaler + wantErr string }{ - "WithInvalidRecordType": { - consumer: consumertest.NewNop(), - recordType: "test", - wantErr: errUnrecognizedRecordType, + "WithDefaultRecordType": { + wantUnmarshalerType: &cwlog.Unmarshaler{}, + }, + "WithSpecifiedRecordType": { + recordType: "otlp_logs", + wantUnmarshalerType: &plog.ProtoUnmarshaler{}, + }, + "WithUnknownRecordType": { + recordType: "invalid", + wantErr: errUnrecognizedRecordType.Error() + ": recordType = invalid", 
}, } for name, testCase := range testCases { @@ -54,20 +68,22 @@ func TestNewLogsReceiver(t *testing.T) { got, err := newLogsReceiver( cfg, receivertest.NewNopSettings(), - defaultLogsUnmarshalers(zap.NewNop()), - testCase.consumer, + unmarshalers, + consumertest.NewNop(), ) - require.Equal(t, testCase.wantErr, err) - if testCase.wantErr == nil { - require.NotNil(t, got) - } else { + if testCase.wantErr != "" { + require.EqualError(t, err, testCase.wantErr) require.Nil(t, got) + } else { + require.NoError(t, err) + require.NotNil(t, got) + require.IsType(t, &firehoseReceiver{}, got) } }) } } -func TestLogsConsumer(t *testing.T) { +func TestLogsConsumer_Errors(t *testing.T) { testErr := errors.New("test error") testCases := map[string]struct { unmarshalerErr error @@ -96,32 +112,88 @@ func TestLogsConsumer(t *testing.T) { } for name, testCase := range testCases { t.Run(name, func(t *testing.T) { - mc := &logsConsumer{ + lc := &logsConsumer{ unmarshaler: unmarshalertest.NewErrLogs(testCase.unmarshalerErr), consumer: consumertest.NewErr(testCase.consumerErr), } - gotStatus, gotErr := mc.Consume(context.TODO(), nil, nil) + gotStatus, gotErr := lc.Consume(context.TODO(), newNextRecordFunc([][]byte{{}}), nil) require.Equal(t, testCase.wantStatus, gotStatus) require.Equal(t, testCase.wantErr, gotErr) }) } +} +func TestLogsConsumer(t *testing.T) { t.Run("WithCommonAttributes", func(t *testing.T) { base := plog.NewLogs() base.ResourceLogs().AppendEmpty() rc := logsRecordConsumer{} - mc := &logsConsumer{ + lc := &logsConsumer{ unmarshaler: unmarshalertest.NewWithLogs(base), consumer: &rc, } - gotStatus, gotErr := mc.Consume(context.TODO(), nil, map[string]string{ + gotStatus, gotErr := lc.Consume(context.TODO(), newNextRecordFunc([][]byte{{}}), map[string]string{ "CommonAttributes": "Test", }) require.Equal(t, http.StatusOK, gotStatus) require.NoError(t, gotErr) - gotRms := rc.result.ResourceLogs() + require.Len(t, rc.results, 1) + gotRms := rc.results[0].ResourceLogs() 
require.Equal(t, 1, gotRms.Len()) gotRm := gotRms.At(0) require.Equal(t, 1, gotRm.Resource().Attributes().Len()) }) + t.Run("WithMultipleRecords", func(t *testing.T) { + logs0, logRecords0 := newLogs("service0", "scope0") + logRecords0.AppendEmpty().Body().SetStr("record0") + logRecords0.AppendEmpty().Body().SetStr("record1") + + logs1, logRecords1 := newLogs("service0", "scope0") + logRecords1.AppendEmpty().Body().SetStr("record2") + logRecords1.AppendEmpty().Body().SetStr("record3") + + logsRemaining := []plog.Logs{logs0, logs1} + var unmarshaler unmarshalLogsFunc = func([]byte) (plog.Logs, error) { + logs := logsRemaining[0] + logsRemaining = logsRemaining[1:] + return logs, nil + } + + rc := logsRecordConsumer{} + lc := &logsConsumer{unmarshaler: unmarshaler, consumer: &rc} + nextRecord := newNextRecordFunc(make([][]byte, len(logsRemaining))) + gotStatus, gotErr := lc.Consume(context.Background(), nextRecord, nil) + require.Equal(t, http.StatusOK, gotStatus) + require.NoError(t, gotErr) + require.Len(t, rc.results, 2) + assert.NoError(t, plogtest.CompareLogs(logs0, rc.results[0])) + assert.NoError(t, plogtest.CompareLogs(logs1, rc.results[1])) + }) +} + +func newLogs(serviceName, scopeName string) (plog.Logs, plog.LogRecordSlice) { + logs := plog.NewLogs() + resourceLogs := logs.ResourceLogs().AppendEmpty() + scopeLogs := resourceLogs.ScopeLogs().AppendEmpty() + newResource(serviceName).MoveTo(resourceLogs.Resource()) + newScope(scopeName).MoveTo(scopeLogs.Scope()) + return logs, scopeLogs.LogRecords() +} + +func newResource(serviceName string) pcommon.Resource { + r := pcommon.NewResource() + r.Attributes().PutStr("service.name", serviceName) + return r +} + +func newScope(scopeName string) pcommon.InstrumentationScope { + s := pcommon.NewInstrumentationScope() + s.SetName(scopeName) + return s +} + +type unmarshalLogsFunc func([]byte) (plog.Logs, error) + +func (f unmarshalLogsFunc) UnmarshalLogs(data []byte) (plog.Logs, error) { + return f(data) } diff --git 
a/receiver/awsfirehosereceiver/metrics_receiver.go b/receiver/awsfirehosereceiver/metrics_receiver.go index 4a5128583ac06..05e356c4873e5 100644 --- a/receiver/awsfirehosereceiver/metrics_receiver.go +++ b/receiver/awsfirehosereceiver/metrics_receiver.go @@ -5,14 +5,16 @@ package awsfirehosereceiver // import "github.com/open-telemetry/opentelemetry-c import ( "context" + "errors" "fmt" + "io" "net/http" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" + "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/receiver" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream" ) @@ -24,9 +26,9 @@ type metricsConsumer struct { // consumer passes the translated metrics on to the // next consumer. consumer consumer.Metrics - // unmarshaler is the configured MetricsUnmarshaler + // unmarshaler is the configured pmetric.Unmarshaler // to use when processing the records. 
- unmarshaler unmarshaler.MetricsUnmarshaler + unmarshaler pmetric.Unmarshaler } var _ firehoseConsumer = (*metricsConsumer)(nil) @@ -36,7 +38,7 @@ var _ firehoseConsumer = (*metricsConsumer)(nil) func newMetricsReceiver( config *Config, set receiver.Settings, - unmarshalers map[string]unmarshaler.MetricsUnmarshaler, + unmarshalers map[string]pmetric.Unmarshaler, nextConsumer consumer.Metrics, ) (receiver.Metrics, error) { recordType := config.RecordType @@ -48,15 +50,14 @@ func newMetricsReceiver( return nil, fmt.Errorf("%w: recordType = %s", errUnrecognizedRecordType, recordType) } - mc := &metricsConsumer{ + c := &metricsConsumer{ consumer: nextConsumer, unmarshaler: configuredUnmarshaler, } - return &firehoseReceiver{ settings: set, config: config, - consumer: mc, + consumer: c, }, nil } @@ -64,29 +65,34 @@ func newMetricsReceiver( // single pmetric.Metrics. If there are common attributes available, then it will // attach those to each of the pcommon.Resources. It will send the final result // to the next consumer. 
-func (mc *metricsConsumer) Consume(ctx context.Context, records [][]byte, commonAttributes map[string]string) (int, error) { - md, err := mc.unmarshaler.Unmarshal(records) - if err != nil { - return http.StatusBadRequest, err - } +func (c *metricsConsumer) Consume(ctx context.Context, nextRecord nextRecordFunc, commonAttributes map[string]string) (int, error) { + for { + record, err := nextRecord() + if errors.Is(err, io.EOF) { + break + } + metrics, err := c.unmarshaler.UnmarshalMetrics(record) + if err != nil { + return http.StatusBadRequest, err + } - if commonAttributes != nil { - for i := 0; i < md.ResourceMetrics().Len(); i++ { - rm := md.ResourceMetrics().At(i) - for k, v := range commonAttributes { - if _, found := rm.Resource().Attributes().Get(k); !found { - rm.Resource().Attributes().PutStr(k, v) + if commonAttributes != nil { + for i := 0; i < metrics.ResourceMetrics().Len(); i++ { + rm := metrics.ResourceMetrics().At(i) + for k, v := range commonAttributes { + if _, found := rm.Resource().Attributes().Get(k); !found { + rm.Resource().Attributes().PutStr(k, v) + } } } } - } - err = mc.consumer.ConsumeMetrics(ctx, md) - if err != nil { - if consumererror.IsPermanent(err) { - return http.StatusBadRequest, err + if err := c.consumer.ConsumeMetrics(ctx, metrics); err != nil { + if consumererror.IsPermanent(err) { + return http.StatusBadRequest, err + } + return http.StatusServiceUnavailable, err } - return http.StatusServiceUnavailable, err } return http.StatusOK, nil } diff --git a/receiver/awsfirehosereceiver/metrics_receiver_test.go b/receiver/awsfirehosereceiver/metrics_receiver_test.go index d32ec4efc8a58..8402bf346e831 100644 --- a/receiver/awsfirehosereceiver/metrics_receiver_test.go +++ b/receiver/awsfirehosereceiver/metrics_receiver_test.go @@ -6,46 +6,58 @@ package awsfirehosereceiver import ( "context" "errors" - "fmt" "net/http" "testing" + "github.com/stretchr/testify/assert" 
"github.com/stretchr/testify/require" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/receiver/receivertest" - "go.uber.org/zap" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest" ) -type recordConsumer struct { - result pmetric.Metrics +type metricsRecordConsumer struct { + results []pmetric.Metrics } -var _ consumer.Metrics = (*recordConsumer)(nil) +var _ consumer.Metrics = (*metricsRecordConsumer)(nil) -func (rc *recordConsumer) ConsumeMetrics(_ context.Context, metrics pmetric.Metrics) error { - rc.result = metrics +func (rc *metricsRecordConsumer) ConsumeMetrics(_ context.Context, metrics pmetric.Metrics) error { + rc.results = append(rc.results, metrics) return nil } -func (rc *recordConsumer) Capabilities() consumer.Capabilities { +func (rc *metricsRecordConsumer) Capabilities() consumer.Capabilities { return consumer.Capabilities{MutatesData: false} } -func TestNewMetricsReceiver(t *testing.T) { +func TestMetricsReceiver_Start(t *testing.T) { + unmarshalers := map[string]pmetric.Unmarshaler{ + "cwmetrics": &cwmetricstream.Unmarshaler{}, + "otlp_metrics": &pmetric.ProtoUnmarshaler{}, + } + testCases := map[string]struct { - consumer consumer.Metrics - recordType string - wantErr error + recordType string + wantUnmarshalerType pmetric.Unmarshaler + wantErr string }{ - "WithInvalidRecordType": { - consumer: consumertest.NewNop(), - recordType: "test", - wantErr: fmt.Errorf("%w: recordType = %s", errUnrecognizedRecordType, "test"), + 
"WithDefaultRecordType": { + wantUnmarshalerType: &cwmetricstream.Unmarshaler{}, + }, + "WithSpecifiedRecordType": { + recordType: "otlp_metrics", + wantUnmarshalerType: &pmetric.ProtoUnmarshaler{}, + }, + "WithUnknownRecordType": { + recordType: "invalid", + wantErr: errUnrecognizedRecordType.Error() + ": recordType = invalid", }, } for name, testCase := range testCases { @@ -55,20 +67,21 @@ func TestNewMetricsReceiver(t *testing.T) { got, err := newMetricsReceiver( cfg, receivertest.NewNopSettings(), - defaultMetricsUnmarshalers(zap.NewNop()), - testCase.consumer, + unmarshalers, + consumertest.NewNop(), ) - require.Equal(t, testCase.wantErr, err) - if testCase.wantErr == nil { - require.NotNil(t, got) - } else { + if testCase.wantErr != "" { + require.EqualError(t, err, testCase.wantErr) require.Nil(t, got) + } else { + require.NoError(t, err) + require.NotNil(t, got) } }) } } -func TestMetricsConsumer(t *testing.T) { +func TestMetricsConsumer_Errors(t *testing.T) { testErr := errors.New("test error") testCases := map[string]struct { unmarshalerErr error @@ -101,28 +114,100 @@ func TestMetricsConsumer(t *testing.T) { unmarshaler: unmarshalertest.NewErrMetrics(testCase.unmarshalerErr), consumer: consumertest.NewErr(testCase.consumerErr), } - gotStatus, gotErr := mc.Consume(context.TODO(), nil, nil) + gotStatus, gotErr := mc.Consume( + context.Background(), + newNextRecordFunc([][]byte{{}}), + nil, + ) require.Equal(t, testCase.wantStatus, gotStatus) require.Equal(t, testCase.wantErr, gotErr) }) } +} +func TestMetricsConsumer(t *testing.T) { t.Run("WithCommonAttributes", func(t *testing.T) { base := pmetric.NewMetrics() base.ResourceMetrics().AppendEmpty() - rc := recordConsumer{} + rc := metricsRecordConsumer{} mc := &metricsConsumer{ unmarshaler: unmarshalertest.NewWithMetrics(base), consumer: &rc, } - gotStatus, gotErr := mc.Consume(context.TODO(), nil, map[string]string{ - "CommonAttributes": "Test", - }) + gotStatus, gotErr := mc.Consume( + 
context.Background(), + newNextRecordFunc([][]byte{{}}), + map[string]string{ + "CommonAttributes": "Test", + }, + ) require.Equal(t, http.StatusOK, gotStatus) require.NoError(t, gotErr) - gotRms := rc.result.ResourceMetrics() + require.Len(t, rc.results, 1) + gotRms := rc.results[0].ResourceMetrics() require.Equal(t, 1, gotRms.Len()) gotRm := gotRms.At(0) require.Equal(t, 1, gotRm.Resource().Attributes().Len()) }) + t.Run("WithMultipleRecords", func(t *testing.T) { + metrics0, metricSlice0 := newMetrics("service0", "scope0") + sum0unit0 := addDeltaSumMetric(metricSlice0, "sum0", "unit0") + addSumDataPoint(sum0unit0, 1) + + metrics1, metricSlice1 := newMetrics("service0", "scope0") + sum0unit0 = addDeltaSumMetric(metricSlice1, "sum0", "unit0") + addSumDataPoint(sum0unit0, 2) + sum0unit1 := addDeltaSumMetric(metricSlice1, "sum0", "unit1") + addSumDataPoint(sum0unit1, 3) + scopeMetrics1 := metrics1.ResourceMetrics().At(0).ScopeMetrics().AppendEmpty() + newScope("scope1").MoveTo(scopeMetrics1.Scope()) + sum0unit0 = addDeltaSumMetric(scopeMetrics1.Metrics(), "sum1", "unit0") + addSumDataPoint(sum0unit0, 4) + + metricsRemaining := []pmetric.Metrics{metrics0, metrics1} + var unmarshaler unmarshalMetricsFunc = func([]byte) (pmetric.Metrics, error) { + metrics := metricsRemaining[0] + metricsRemaining = metricsRemaining[1:] + return metrics, nil + } + + rc := metricsRecordConsumer{} + lc := &metricsConsumer{unmarshaler: unmarshaler, consumer: &rc} + nextRecord := newNextRecordFunc(make([][]byte, len(metricsRemaining))) + gotStatus, gotErr := lc.Consume(context.Background(), nextRecord, nil) + require.Equal(t, http.StatusOK, gotStatus) + require.NoError(t, gotErr) + require.Len(t, rc.results, 2) + assert.NoError(t, pmetrictest.CompareMetrics(metrics0, rc.results[0])) + assert.NoError(t, pmetrictest.CompareMetrics(metrics1, rc.results[1])) + }) +} + +func newMetrics(serviceName, scopeName string) (pmetric.Metrics, pmetric.MetricSlice) { + metrics := pmetric.NewMetrics() + 
resourceMetrics := metrics.ResourceMetrics().AppendEmpty() + scopeMetrics := resourceMetrics.ScopeMetrics().AppendEmpty() + newResource(serviceName).MoveTo(resourceMetrics.Resource()) + newScope(scopeName).MoveTo(scopeMetrics.Scope()) + return metrics, scopeMetrics.Metrics() +} + +func addDeltaSumMetric(metrics pmetric.MetricSlice, name, unit string) pmetric.Sum { + m := metrics.AppendEmpty() + m.SetName(name) + m.SetUnit(unit) + sum := m.SetEmptySum() + sum.SetAggregationTemporality(pmetric.AggregationTemporalityDelta) + return sum +} + +func addSumDataPoint(sum pmetric.Sum, value int64) { + dp := sum.DataPoints().AppendEmpty() + dp.SetIntValue(value) +} + +type unmarshalMetricsFunc func([]byte) (pmetric.Metrics, error) + +func (f unmarshalMetricsFunc) UnmarshalMetrics(data []byte) (pmetric.Metrics, error) { + return f(data) } diff --git a/receiver/awsfirehosereceiver/receiver.go b/receiver/awsfirehosereceiver/receiver.go index baa9750b61628..c2002dece3037 100644 --- a/receiver/awsfirehosereceiver/receiver.go +++ b/receiver/awsfirehosereceiver/receiver.go @@ -16,6 +16,7 @@ import ( "sync" "time" + jsoniter "github.com/json-iterator/go" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componentstatus" "go.opentelemetry.io/collector/receiver" @@ -40,10 +41,15 @@ var ( // The firehoseConsumer is responsible for using the unmarshaler and the consumer. type firehoseConsumer interface { - // Consume unmarshalls and consumes the records. - Consume(ctx context.Context, records [][]byte, commonAttributes map[string]string) (int, error) + // Consume unmarshals and consumes the records returned by f. + Consume(ctx context.Context, f nextRecordFunc, commonAttributes map[string]string) (int, error) } +// nextRecordFunc is a function provided to consumers for obtaining the +// next record to consume. The function returns (nil, io.EOF) when there +// are no more records. 
+type nextRecordFunc func() ([]byte, error) + // firehoseReceiver type firehoseReceiver struct { // settings is the base receiver settings. @@ -174,14 +180,8 @@ func (fmr *firehoseReceiver) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - body, err := fmr.getBody(r) - if err != nil { - fmr.sendResponse(w, requestID, http.StatusBadRequest, err) - return - } - var fr firehoseRequest - if err = json.Unmarshal(body, &fr); err != nil { + if err := jsoniter.ConfigFastest.NewDecoder(r.Body).Decode(&fr); err != nil { fmr.sendResponse(w, requestID, http.StatusBadRequest, err) return } @@ -194,24 +194,6 @@ func (fmr *firehoseReceiver) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - records := make([][]byte, 0, len(fr.Records)) - for index, record := range fr.Records { - if record.Data != "" { - var decoded []byte - decoded, err = base64.StdEncoding.DecodeString(record.Data) - if err != nil { - fmr.sendResponse( - w, - requestID, - http.StatusBadRequest, - fmt.Errorf("unable to base64 decode the record at index %d: %w", index, err), - ) - return - } - records = append(records, decoded) - } - } - commonAttributes, err := fmr.getCommonAttributes(r) if err != nil { fmr.settings.Logger.Error( @@ -220,7 +202,24 @@ func (fmr *firehoseReceiver) ServeHTTP(w http.ResponseWriter, r *http.Request) { ) } - statusCode, err := fmr.consumer.Consume(ctx, records, commonAttributes) + var recordIndex int + var recordBuf []byte + nextRecord := func() ([]byte, error) { + if recordIndex == len(fr.Records) { + return nil, io.EOF + } + record := fr.Records[recordIndex] + recordIndex++ + + var decodeErr error + recordBuf, decodeErr = base64.StdEncoding.AppendDecode(recordBuf[:0], []byte(record.Data)) + if decodeErr != nil { + return nil, fmt.Errorf("unable to base64 decode the record at index %d: %w", recordIndex-1, decodeErr) + } + return recordBuf, nil + } + + statusCode, err := fmr.consumer.Consume(ctx, nextRecord, commonAttributes) if err != nil { 
fmr.settings.Logger.Error( "Unable to consume records", @@ -229,7 +228,6 @@ func (fmr *firehoseReceiver) ServeHTTP(w http.ResponseWriter, r *http.Request) { fmr.sendResponse(w, requestID, statusCode, err) return } - fmr.sendResponse(w, requestID, http.StatusOK, nil) } @@ -246,19 +244,6 @@ func (fmr *firehoseReceiver) validate(r *http.Request) (int, error) { return http.StatusUnauthorized, errInvalidAccessKey } -// getBody reads the body from the request as a slice of bytes. -func (fmr *firehoseReceiver) getBody(r *http.Request) ([]byte, error) { - body, err := io.ReadAll(r.Body) - if err != nil { - return nil, err - } - err = r.Body.Close() - if err != nil { - return nil, err - } - return body, nil -} - // getCommonAttributes unmarshalls the common attributes from the request header func (fmr *firehoseReceiver) getCommonAttributes(r *http.Request) (map[string]string, error) { attributes := make(map[string]string) diff --git a/receiver/awsfirehosereceiver/receiver_test.go b/receiver/awsfirehosereceiver/receiver_test.go index 5cf7d5e43638a..b6845dc325654 100644 --- a/receiver/awsfirehosereceiver/receiver_test.go +++ b/receiver/awsfirehosereceiver/receiver_test.go @@ -10,6 +10,7 @@ import ( "encoding/json" "errors" "fmt" + "io" "net" "net/http" "net/http/httptest" @@ -40,10 +41,16 @@ func newNopFirehoseConsumer(statusCode int, err error) *nopFirehoseConsumer { return &nopFirehoseConsumer{statusCode, err} } -func (nfc *nopFirehoseConsumer) Consume(context.Context, [][]byte, map[string]string) (int, error) { +func (nfc *nopFirehoseConsumer) Consume(context.Context, nextRecordFunc, map[string]string) (int, error) { return nfc.statusCode, nfc.err } +type firehoseConsumerFunc func(context.Context, nextRecordFunc, map[string]string) (int, error) + +func (f firehoseConsumerFunc) Consume(ctx context.Context, next nextRecordFunc, common map[string]string) (int, error) { + return f(ctx, next, common) +} + func TestStart(t *testing.T) { testCases := map[string]struct { host 
component.Host @@ -60,7 +67,7 @@ func TestStart(t *testing.T) { t.Run(name, func(t *testing.T) { cfg := &Config{} ctx := context.TODO() - r := testFirehoseReceiver(cfg, nil) + r := testFirehoseReceiver(cfg, &nopFirehoseConsumer{}) got := r.Start(ctx, testCase.host) require.Equal(t, testCase.wantErr, got) if r.server != nil { @@ -80,7 +87,7 @@ func TestStart(t *testing.T) { }, } ctx := context.TODO() - r := testFirehoseReceiver(cfg, nil) + r := testFirehoseReceiver(cfg, &nopFirehoseConsumer{}) got := r.Start(ctx, componenttest.NewNopHost()) require.Error(t, got) if r.server != nil { @@ -144,11 +151,6 @@ func TestFirehoseRequest(t *testing.T) { wantStatusCode: http.StatusBadRequest, wantErr: errInBodyDiffRequestID, }, - "WithInvalidBody": { - body: "{ test: ", - wantStatusCode: http.StatusBadRequest, - wantErr: errors.New("json: cannot unmarshal string into Go value of type awsfirehosereceiver.firehoseRequest"), - }, "WithNoRecords": { body: testFirehoseRequest(testFirehoseRequestID, noRecords), wantStatusCode: http.StatusOK, @@ -163,6 +165,12 @@ func TestFirehoseRequest(t *testing.T) { body: testFirehoseRequest(testFirehoseRequestID, []firehoseRecord{ {Data: "XXXXXaGVsbG8="}, }), + consumer: firehoseConsumerFunc(func(_ context.Context, next nextRecordFunc, _ map[string]string) (int, error) { + if _, err := next(); err != nil { + return http.StatusBadRequest, err + } + return http.StatusOK, nil + }), wantStatusCode: http.StatusBadRequest, wantErr: fmt.Errorf("unable to base64 decode the record at index 0: %w", base64.CorruptInputError(12)), }, @@ -187,13 +195,7 @@ func TestFirehoseRequest(t *testing.T) { body, err := json.Marshal(testCase.body) require.NoError(t, err) - requestBody := bytes.NewBuffer(body) - - request := httptest.NewRequest(http.MethodPost, "/", requestBody) - request.Header.Set(headerContentType, "application/json") - request.Header.Set(headerContentLength, strconv.Itoa(requestBody.Len())) - request.Header.Set(headerFirehoseRequestID, 
testFirehoseRequestID) - request.Header.Set(headerFirehoseAccessKey, testFirehoseAccessKey) + request := newTestRequest(body) if testCase.headers != nil { for k, v := range testCase.headers { request.Header.Set(k, v) @@ -229,6 +231,23 @@ func TestFirehoseRequest(t *testing.T) { } } +// TestFirehoseRequestInvalidJSON is tested separately from TestFirehoseRequest +// because the error message is highly dependent on the JSON decoding library used, +// so we don't do an exact match. +func TestFirehoseRequestInvalidJSON(t *testing.T) { + consumer := newNopFirehoseConsumer(http.StatusOK, nil) + r := testFirehoseReceiver(&Config{}, consumer) + + got := httptest.NewRecorder() + r.ServeHTTP(got, newTestRequest([]byte("{ test: "))) + require.Equal(t, http.StatusBadRequest, got.Code) + + var gotResponse firehoseResponse + require.NoError(t, json.Unmarshal(got.Body.Bytes(), &gotResponse)) + require.Equal(t, testFirehoseRequestID, gotResponse.RequestID) + require.Regexp(t, `awsfirehosereceiver\.firehoseRequest\.ReadStringAsSlice: expects .*`, gotResponse.ErrorMessage) +} + // testFirehoseReceiver is a convenience function for creating a test firehoseReceiver func testFirehoseReceiver(config *Config, consumer firehoseConsumer) *firehoseReceiver { return &firehoseReceiver{ @@ -259,3 +278,14 @@ func testFirehoseRecord(data string) firehoseRecord { encoded := base64.StdEncoding.EncodeToString([]byte(data)) return firehoseRecord{Data: encoded} } + +func newNextRecordFunc(records [][]byte) nextRecordFunc { + return func() ([]byte, error) { + if len(records) == 0 { + return nil, io.EOF + } + next := records[0] + records = records[1:] + return next, nil + } +} From 2b0a955f4b8feebe5d2fd367edcb3bcb5580cc39 Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Wed, 5 Feb 2025 09:23:49 +0800 Subject: [PATCH 3/8] Apply suggestions from code review Co-authored-by: Anthony Mirabella --- .../internal/unmarshaler/cwlog/unmarshaler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff 
--git a/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler.go b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler.go index 182f809b7ef59..922d8af6a4d8d 100644 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler.go +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler.go @@ -15,7 +15,7 @@ import ( "github.com/klauspost/compress/gzip" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" - conventions "go.opentelemetry.io/collector/semconv/v1.6.1" + conventions "go.opentelemetry.io/collector/semconv/v1.27.0" "go.uber.org/zap" ) From 142af97bfeaedfade885cf8bde1f2bfe7113bd3d Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Wed, 5 Feb 2025 09:25:19 +0800 Subject: [PATCH 4/8] Use newer semconv version consistently --- .../unmarshaler/cwmetricstream/unmarshaler_test.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/unmarshaler_test.go b/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/unmarshaler_test.go index 37d850de7afa3..49e35e33ae951 100644 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/unmarshaler_test.go +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/unmarshaler_test.go @@ -13,7 +13,6 @@ import ( "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" conventions "go.opentelemetry.io/collector/semconv/v1.27.0" - semconv "go.opentelemetry.io/collector/semconv/v1.5.0" "go.uber.org/zap" ) @@ -108,9 +107,9 @@ func TestUnmarshal_SingleRecord(t *testing.T) { // Check one resource attribute to check things are wired up. // Remaining resource attributes are checked in TestSetResourceAttributes. 
res := rm.Resource() - cloudProvider, ok := res.Attributes().Get(semconv.AttributeCloudProvider) + cloudProvider, ok := res.Attributes().Get(conventions.AttributeCloudProvider) require.True(t, ok) - assert.Equal(t, semconv.AttributeCloudProviderAWS, cloudProvider.Str()) + assert.Equal(t, conventions.AttributeCloudProviderAWS, cloudProvider.Str()) require.Equal(t, 1, rm.ScopeMetrics().Len()) sm := rm.ScopeMetrics().At(0) @@ -163,7 +162,7 @@ func TestSetResourceAttributes(t *testing.T) { attributeAWSCloudWatchMetricStreamName: testStreamName, conventions.AttributeCloudAccountID: testAccountID, conventions.AttributeCloudRegion: testRegion, - conventions.AttributeCloudProvider: semconv.AttributeCloudProviderAWS, + conventions.AttributeCloudProvider: conventions.AttributeCloudProviderAWS, conventions.AttributeServiceName: "EC2", conventions.AttributeServiceNamespace: "AWS", }, @@ -174,7 +173,7 @@ func TestSetResourceAttributes(t *testing.T) { attributeAWSCloudWatchMetricStreamName: testStreamName, conventions.AttributeCloudAccountID: testAccountID, conventions.AttributeCloudRegion: testRegion, - conventions.AttributeCloudProvider: semconv.AttributeCloudProviderAWS, + conventions.AttributeCloudProvider: conventions.AttributeCloudProviderAWS, conventions.AttributeServiceName: "CustomNamespace", }, }, From ffd533db1cd54ebbb72b443ca72057f7193ac2a8 Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Wed, 5 Feb 2025 09:40:28 +0800 Subject: [PATCH 5/8] Fix UnmarshalFoo doc comments --- .../internal/unmarshaler/cwlog/unmarshaler.go | 7 ++++--- .../internal/unmarshaler/cwmetricstream/unmarshaler.go | 6 +++--- .../internal/unmarshaler/otlpmetricstream/unmarshaler.go | 3 ++- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler.go b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler.go index 922d8af6a4d8d..5547a3c872d9f 100644 --- 
a/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler.go +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler.go @@ -41,9 +41,10 @@ func NewUnmarshaler(logger *zap.Logger) *Unmarshaler { return &Unmarshaler{logger: logger} } -// Unmarshal deserializes the records into cWLogs and uses the -// resourceLogsBuilder to group them into a single plog.Logs. -// Skips invalid cWLogs received in the record and +// UnmarshalLogs deserializes the given record as CloudWatch Logs events +// into a plog.Logs, grouping logs by owner (account ID), log group, and +// log stream. Logs are assumed to be gzip-compressed as specified at +// https://docs.aws.amazon.com/firehose/latest/dev/writing-with-cloudwatch-logs.html. func (u *Unmarshaler) UnmarshalLogs(compressedRecord []byte) (plog.Logs, error) { var err error r, _ := u.gzipPool.Get().(*gzip.Reader) diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/unmarshaler.go b/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/unmarshaler.go index 17fae59fd5354..ef4a88eebef31 100644 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/unmarshaler.go +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/unmarshaler.go @@ -42,9 +42,9 @@ func NewUnmarshaler(logger *zap.Logger) *Unmarshaler { return &Unmarshaler{logger} } -// Unmarshal deserializes the records into cWMetrics and uses the -// resourceMetricsBuilder to group them into a single pmetric.Metrics. -// Skips invalid cWMetrics received in the record and +// UnmarshalMetrics deserializes the record in CloudWatch Metric Stream JSON +// format into a pmetric.Metrics, grouping metrics by resource and metric +// name and unit. 
func (u Unmarshaler) UnmarshalMetrics(record []byte) (pmetric.Metrics, error) { type metricKey struct { name string diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/otlpmetricstream/unmarshaler.go b/receiver/awsfirehosereceiver/internal/unmarshaler/otlpmetricstream/unmarshaler.go index cc1c131a31b0f..b7d8f45a5eac5 100644 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/otlpmetricstream/unmarshaler.go +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/otlpmetricstream/unmarshaler.go @@ -35,7 +35,8 @@ func NewUnmarshaler(logger *zap.Logger) *Unmarshaler { return &Unmarshaler{logger} } -// Unmarshal deserializes the records into pmetric.Metrics +// UnmarshalMetrics deserializes the records as a length-delimited sequence of +// OTLP metrics into pmetric.Metrics. func (u Unmarshaler) UnmarshalMetrics(record []byte) (pmetric.Metrics, error) { md := pmetric.NewMetrics() dataLen, pos := len(record), 0 From 36ebd3a1264db2218d2ef342f351b8a308497782 Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Wed, 5 Feb 2025 09:43:56 +0800 Subject: [PATCH 6/8] Update go.mod --- receiver/awsfirehosereceiver/go.mod | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/receiver/awsfirehosereceiver/go.mod b/receiver/awsfirehosereceiver/go.mod index 072fc9f710bb3..4693fdce7ecdc 100644 --- a/receiver/awsfirehosereceiver/go.mod +++ b/receiver/awsfirehosereceiver/go.mod @@ -6,7 +6,7 @@ require ( github.com/gogo/protobuf v1.3.2 github.com/json-iterator/go v1.1.12 github.com/klauspost/compress v1.17.11 - github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.117.1-0.20250117002813-e970f8bb1258 + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.119.0 github.com/stretchr/testify v1.10.0 go.opentelemetry.io/collector/component v0.119.0 go.opentelemetry.io/collector/component/componentstatus v0.119.0 @@ -43,7 +43,7 @@ 
require ( github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect - github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.117.1-0.20250114172347-71aae791d7f8 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.119.0 // indirect github.com/pierrec/lz4/v4 v4.1.22 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/rs/cors v1.11.1 // indirect From 32bb584bc007e3d0dc7f02007a9a6375a98b9890 Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Wed, 5 Feb 2025 09:53:10 +0800 Subject: [PATCH 7/8] Swallow scanner errors This would imply corruption at the gzip level, since the gzip reader is working off an in-memory buffer. Unlikely, but swallow the error to keep it in line with existing code. --- .../internal/unmarshaler/cwlog/unmarshaler.go | 3 ++- .../internal/unmarshaler/cwmetricstream/unmarshaler.go | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler.go b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler.go index 5547a3c872d9f..133237cccab55 100644 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler.go +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler.go @@ -105,7 +105,8 @@ func (u *Unmarshaler) UnmarshalLogs(compressedRecord []byte) (plog.Logs, error) } } if err := scanner.Err(); err != nil { - return plog.Logs{}, err + // Treat this as a non-fatal error, and handle the data below. 
+ u.logger.Error("Error scanning for newline-delimited JSON", zap.Error(err)) } if len(byResource) == 0 { return plog.Logs{}, errInvalidRecords diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/unmarshaler.go b/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/unmarshaler.go index ef4a88eebef31..5992ecdf322ec 100644 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/unmarshaler.go +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/unmarshaler.go @@ -110,7 +110,8 @@ func (u Unmarshaler) UnmarshalMetrics(record []byte) (pmetric.Metrics, error) { maxQ.SetValue(cwMetric.Value.Max) } if err := scanner.Err(); err != nil { - return pmetric.Metrics{}, err + // Treat this as a non-fatal error, and handle the data below. + u.logger.Error("Error scanning for newline-delimited JSON", zap.Error(err)) } if len(byResource) == 0 { return pmetric.Metrics{}, errInvalidRecords From d813c7159904f8c5ef3404be32a57c909bc4d006 Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Thu, 6 Feb 2025 07:31:41 +0800 Subject: [PATCH 8/8] Log error if pooled object is not a gzip.Reader --- .../internal/unmarshaler/cwlog/unmarshaler.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler.go b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler.go index 133237cccab55..8a4d78c3716fc 100644 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler.go +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler.go @@ -47,7 +47,11 @@ func NewUnmarshaler(logger *zap.Logger) *Unmarshaler { // https://docs.aws.amazon.com/firehose/latest/dev/writing-with-cloudwatch-logs.html. 
func (u *Unmarshaler) UnmarshalLogs(compressedRecord []byte) (plog.Logs, error) { var err error - r, _ := u.gzipPool.Get().(*gzip.Reader) + r, ok := u.gzipPool.Get().(*gzip.Reader) + if !ok { + u.logger.Error(fmt.Sprintf("Expected *gzip.Reader, got %T", r)) + // Fall through and create a new *gzip.Reader (r == nil) + } if r == nil { r, err = gzip.NewReader(bytes.NewReader(compressedRecord)) } else {