diff --git a/.chloggen/awss3receiver_metrics_logs.yaml b/.chloggen/awss3receiver_metrics_logs.yaml new file mode 100644 index 0000000000000..6e12a8957d0df --- /dev/null +++ b/.chloggen/awss3receiver_metrics_logs.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: 'enhancement' + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: awss3receiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add support for retrieving logs and metrics to the AWS S3 Receiver. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [30750] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/receiver/awss3receiver/README.md b/receiver/awss3receiver/README.md index ed0236b4110cd..a7ec61a84b47f 100644 --- a/receiver/awss3receiver/README.md +++ b/receiver/awss3receiver/README.md @@ -2,7 +2,7 @@ | Status | | | ------------- |-----------| -| Stability | [development]: traces | +| Stability | [development]: traces, metrics, logs | | Distributions | [] | | Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Areceiver%2Fawss3%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Areceiver%2Fawss3) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Areceiver%2Fawss3%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Areceiver%2Fawss3) | | [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@atoulme](https://www.github.com/atoulme), [@adcharre](https://www.github.com/adcharre) | diff --git a/receiver/awss3receiver/factory.go b/receiver/awss3receiver/factory.go index 330ecfb43b491..bc4966814578d 100644 --- a/receiver/awss3receiver/factory.go +++ b/receiver/awss3receiver/factory.go @@ -18,9 +18,19 @@ func NewFactory() receiver.Factory { metadata.Type, createDefaultConfig, receiver.WithTraces(createTracesReceiver, metadata.TracesStability), + receiver.WithMetrics(createMetricsReceiver, metadata.MetricsStability), + receiver.WithLogs(createLogsReceiver, metadata.LogsStability), ) } func createTracesReceiver(ctx context.Context, settings receiver.Settings, cc component.Config, consumer consumer.Traces) (receiver.Traces, error) { return newAWSS3TraceReceiver(ctx, cc.(*Config), consumer, settings) } + +func createMetricsReceiver(ctx context.Context, settings receiver.Settings, cc component.Config, consumer consumer.Metrics) (receiver.Metrics, error) { + return newAWSS3MetricsReceiver(ctx, cc.(*Config), consumer, settings) +} + +func createLogsReceiver(ctx context.Context, settings receiver.Settings, cc component.Config, consumer consumer.Logs) (receiver.Logs, error) { + return newAWSS3LogsReceiver(ctx, cc.(*Config), consumer, settings) +} diff --git a/receiver/awss3receiver/generated_component_test.go b/receiver/awss3receiver/generated_component_test.go index 74985c9fe1199..be928a90cc0c3 100644 --- a/receiver/awss3receiver/generated_component_test.go +++ b/receiver/awss3receiver/generated_component_test.go @@ -31,6 +31,20 @@ func TestComponentLifecycle(t *testing.T) { createFn func(ctx context.Context, set receiver.Settings, cfg component.Config) (component.Component, error) }{ + { + name: "logs", + createFn: func(ctx context.Context, set receiver.Settings, cfg component.Config) (component.Component, error) { + return factory.CreateLogsReceiver(ctx, set, cfg, consumertest.NewNop()) + }, + }, + + { + name: "metrics", + createFn: func(ctx context.Context, set receiver.Settings, cfg component.Config) (component.Component, error) { + return factory.CreateMetricsReceiver(ctx, set, cfg, consumertest.NewNop()) + }, + }, + { name: "traces", createFn: func(ctx context.Context, set receiver.Settings, cfg component.Config) (component.Component, error) { diff --git a/receiver/awss3receiver/go.mod b/receiver/awss3receiver/go.mod index d6a989abe3412..4909c3b317def 100644 --- a/receiver/awss3receiver/go.mod +++ b/receiver/awss3receiver/go.mod @@ -39,7 +39,7 @@ require ( github.com/aws/smithy-go v1.20.2 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect - github.com/davecgh/go-spew v1.1.1 // indirect + github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/go-logr/logr v1.4.1 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-viper/mapstructure/v2 v2.0.0-alpha.1 // indirect @@ -55,7 +55,7 @@ require ( github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect - github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/prometheus/client_golang v1.19.1 // indirect github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/common v0.54.0 // indirect diff --git a/receiver/awss3receiver/go.sum b/receiver/awss3receiver/go.sum index c81a09e4d8272..a235a8b76e15f 100644 --- a/receiver/awss3receiver/go.sum +++ b/receiver/awss3receiver/go.sum @@ -41,8 +41,9 @@ github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6r github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= @@ -86,8 +87,9 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= -github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_golang v1.19.1 h1:wZWJDwK+NameRJuPGDhlnFgx8e8HN3XHQeLaYJFJBOE= github.com/prometheus/client_golang v1.19.1/go.mod h1:mP78NwGzrVks5S2H6ab8+ZZGJLZUq1hoULYBAYBw1Ho= github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= diff --git a/receiver/awss3receiver/internal/metadata/generated_status.go b/receiver/awss3receiver/internal/metadata/generated_status.go index 1b91b6f78ef17..ad73c06af7186 100644 --- a/receiver/awss3receiver/internal/metadata/generated_status.go +++ b/receiver/awss3receiver/internal/metadata/generated_status.go @@ -11,5 +11,7 @@ var ( ) const ( - TracesStability = component.StabilityLevelDevelopment + TracesStability = component.StabilityLevelDevelopment + MetricsStability = component.StabilityLevelDevelopment + LogsStability = component.StabilityLevelDevelopment ) diff --git a/receiver/awss3receiver/metadata.yaml b/receiver/awss3receiver/metadata.yaml index 5bfa60a318b5a..1885c6a7861bf 100644 --- a/receiver/awss3receiver/metadata.yaml +++ b/receiver/awss3receiver/metadata.yaml @@ -3,7 +3,7 @@ type: awss3 status: class: receiver stability: - development: [traces] + development: [traces,metrics,logs] distributions: [] codeowners: active: [atoulme, adcharre] diff --git a/receiver/awss3receiver/receiver.go b/receiver/awss3receiver/receiver.go index b752c5fc6ad58..f2a99c2936237 100644 --- a/receiver/awss3receiver/receiver.go +++ b/receiver/awss3receiver/receiver.go @@ -13,6 +13,8 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/receiver/receiverhelper" @@ -20,23 +22,28 @@ import ( ) type encodingExtension struct { - extension ptrace.Unmarshaler + extension component.Component suffix string } type encodingExtensions []encodingExtension -type awss3TraceReceiver struct { +type receiverProcessor interface { + processReceivedData(ctx context.Context, receiver *awss3Receiver, key string, data []byte) error +} + +type awss3Receiver struct { s3Reader *s3Reader - consumer consumer.Traces logger *zap.Logger cancel context.CancelFunc obsrecv *receiverhelper.ObsReport encodingsConfig []Encoding + telemetryType string + dataProcessor receiverProcessor extensions encodingExtensions } -func newAWSS3TraceReceiver(ctx context.Context, cfg *Config, traces consumer.Traces, settings receiver.Settings) (*awss3TraceReceiver, error) { +func newAWSS3Receiver(ctx context.Context, cfg *Config, telemetryType string, settings receiver.Settings, processor receiverProcessor) (*awss3Receiver, error) { reader, err := newS3Reader(ctx, cfg) if err != nil { return nil, err @@ -50,17 +57,18 @@ func newAWSS3TraceReceiver(ctx context.Context, cfg *Config, traces consumer.Tra return nil, err } - return &awss3TraceReceiver{ + return &awss3Receiver{ s3Reader: reader, - consumer: traces, + telemetryType: telemetryType, logger: settings.Logger, cancel: nil, obsrecv: obsrecv, + dataProcessor: processor, encodingsConfig: cfg.Encodings, }, nil } -func (r *awss3TraceReceiver) Start(_ context.Context, host component.Host) error { +func (r *awss3Receiver) Start(_ context.Context, host component.Host) error { var err error r.extensions, err = newEncodingExtensions(r.encodingsConfig, host) if err != nil { @@ -70,23 +78,22 @@ func (r *awss3TraceReceiver) Start(_ context.Context, host component.Host) error var ctx context.Context ctx, r.cancel = context.WithCancel(context.Background()) go func() { - _ = r.s3Reader.readAll(ctx, "traces", r.receiveBytes) + _ = r.s3Reader.readAll(ctx, r.telemetryType, r.receiveBytes) }() return nil } -func (r *awss3TraceReceiver) Shutdown(_ context.Context) error { +func (r *awss3Receiver) Shutdown(_ context.Context) error { if r.cancel != nil { r.cancel() } return nil } -func (r *awss3TraceReceiver) receiveBytes(ctx context.Context, key string, data []byte) error { +func (r *awss3Receiver) receiveBytes(ctx context.Context, key string, data []byte) error { if data == nil { return nil } - if strings.HasSuffix(key, ".gz") { reader, err := gzip.NewReader(bytes.NewReader(data)) if err != nil { @@ -98,8 +105,26 @@ func (r *awss3TraceReceiver) receiveBytes(ctx context.Context, key string, data return err } } + return r.dataProcessor.processReceivedData(ctx, r, key, data) +} + +type traceReceiver struct { + consumer consumer.Traces +} + +func newAWSS3TraceReceiver(ctx context.Context, cfg *Config, traces consumer.Traces, settings receiver.Settings) (*awss3Receiver, error) { + return newAWSS3Receiver(ctx, cfg, "traces", settings, &traceReceiver{consumer: traces}) +} + +func (r *traceReceiver) processReceivedData(ctx context.Context, rcvr *awss3Receiver, key string, data []byte) error { + var unmarshaler ptrace.Unmarshaler + var format string + + if extension, f := rcvr.extensions.findExtension(key); extension != nil { + unmarshaler, _ = extension.(ptrace.Unmarshaler) + format = f + } - unmarshaler, format := r.extensions.findExtension(key) if unmarshaler == nil { if strings.HasSuffix(key, ".json") { unmarshaler = &ptrace.JSONUnmarshaler{} @@ -111,38 +136,118 @@ func (r *awss3TraceReceiver) receiveBytes(ctx context.Context, key string, data } } if unmarshaler == nil { - r.logger.Warn("Unsupported file format", zap.String("key", key)) + rcvr.logger.Warn("Unsupported file format", zap.String("key", key)) return nil } traces, err := unmarshaler.UnmarshalTraces(data) if err != nil { return err } - obsCtx := r.obsrecv.StartTracesOp(ctx) + obsCtx := rcvr.obsrecv.StartTracesOp(ctx) err = r.consumer.ConsumeTraces(ctx, traces) - r.obsrecv.EndTracesOp(obsCtx, format, traces.SpanCount(), err) + rcvr.obsrecv.EndTracesOp(obsCtx, format, traces.SpanCount(), err) + return err +} + +type metricsReceiver struct { + consumer consumer.Metrics +} + +func newAWSS3MetricsReceiver(ctx context.Context, cfg *Config, metrics consumer.Metrics, settings receiver.Settings) (*awss3Receiver, error) { + return newAWSS3Receiver(ctx, cfg, "metrics", settings, &metricsReceiver{consumer: metrics}) +} + +func (r *metricsReceiver) processReceivedData(ctx context.Context, rcvr *awss3Receiver, key string, data []byte) error { + var unmarshaler pmetric.Unmarshaler + var format string + + if extension, f := rcvr.extensions.findExtension(key); extension != nil { + unmarshaler, _ = extension.(pmetric.Unmarshaler) + format = f + } + + if unmarshaler == nil { + if strings.HasSuffix(key, ".json") { + unmarshaler = &pmetric.JSONUnmarshaler{} + format = "otlp_json" + } + if strings.HasSuffix(key, ".binpb") { + unmarshaler = &pmetric.ProtoUnmarshaler{} + format = "otlp_proto" + } + } + if unmarshaler == nil { + rcvr.logger.Warn("Unsupported file format", zap.String("key", key)) + return nil + } + metrics, err := unmarshaler.UnmarshalMetrics(data) + if err != nil { + return err + } + obsCtx := rcvr.obsrecv.StartMetricsOp(ctx) + err = r.consumer.ConsumeMetrics(ctx, metrics) + rcvr.obsrecv.EndMetricsOp(obsCtx, format, metrics.MetricCount(), err) + return err +} + +type logsReceiver struct { + consumer consumer.Logs +} + +func newAWSS3LogsReceiver(ctx context.Context, cfg *Config, logs consumer.Logs, settings receiver.Settings) (*awss3Receiver, error) { + return newAWSS3Receiver(ctx, cfg, "logs", settings, &logsReceiver{consumer: logs}) +} + +func (r *logsReceiver) processReceivedData(ctx context.Context, rcvr *awss3Receiver, key string, data []byte) error { + var unmarshaler plog.Unmarshaler + var format string + + if extension, f := rcvr.extensions.findExtension(key); extension != nil { + unmarshaler, _ = extension.(plog.Unmarshaler) + format = f + } + + if unmarshaler == nil { + if strings.HasSuffix(key, ".json") { + unmarshaler = &plog.JSONUnmarshaler{} + format = "otlp_json" + } + if strings.HasSuffix(key, ".binpb") { + unmarshaler = &plog.ProtoUnmarshaler{} + format = "otlp_proto" + } + } + if unmarshaler == nil { + rcvr.logger.Warn("Unsupported file format", zap.String("key", key)) + return nil + } + logs, err := unmarshaler.UnmarshalLogs(data) + if err != nil { + return err + } + obsCtx := rcvr.obsrecv.StartLogsOp(ctx) + err = r.consumer.ConsumeLogs(ctx, logs) + rcvr.obsrecv.EndLogsOp(obsCtx, format, logs.LogRecordCount(), err) return err } func newEncodingExtensions(encodingsConfig []Encoding, host component.Host) (encodingExtensions, error) { encodings := make(encodingExtensions, 0) extensions := host.GetExtensions() - for _, encoding := range encodingsConfig { - if e, ok := extensions[encoding.Extension]; ok { - if u, ok := e.(ptrace.Unmarshaler); ok { - encodings = append(encodings, encodingExtension{extension: u, suffix: encoding.Suffix}) - } + for _, configItem := range encodingsConfig { + if e, ok := extensions[configItem.Extension]; ok { + encodings = append(encodings, encodingExtension{extension: e, suffix: configItem.Suffix}) } else { - return nil, fmt.Errorf("extension %q not found", encoding.Extension) + return nil, fmt.Errorf("extension %q not found", configItem.Extension) } } return encodings, nil } -func (encodings encodingExtensions) findExtension(key string) (ptrace.Unmarshaler, string) { - for _, encoding := range encodings { - if strings.HasSuffix(key, encoding.suffix) { - return encoding.extension, encoding.suffix +func (encodings encodingExtensions) findExtension(key string) (component.Component, string) { + for _, e := range encodings { + if strings.HasSuffix(key, e.suffix) { + return e.extension, e.suffix } } return nil, "" diff --git a/receiver/awss3receiver/receiver_test.go b/receiver/awss3receiver/receiver_test.go index ad2e4940061c8..e4fdd435d892e 100644 --- a/receiver/awss3receiver/receiver_test.go +++ b/receiver/awss3receiver/receiver_test.go @@ -14,6 +14,8 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/receiver/receiverhelper" "go.opentelemetry.io/collector/receiver/receivertest" @@ -33,6 +35,26 @@ func generateTraceData() ptrace.Traces { return td } +func generateMetricData() pmetric.Metrics { + md := pmetric.NewMetrics() + rs := md.ResourceMetrics().AppendEmpty() + metric := rs.ScopeMetrics().AppendEmpty().Metrics() + dp := metric.AppendEmpty().SetEmptyGauge().DataPoints().AppendEmpty() + dp.SetDoubleValue(1.0) + dp.SetStartTimestamp(1581452772000000000) + dp.SetTimestamp(1581452772000000000) + return md +} + +func generateLogData() plog.Logs { + ld := plog.NewLogs() + rs := ld.ResourceLogs().AppendEmpty() + log := rs.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty() + log.SetObservedTimestamp(1581452772000000000) + log.Body().SetStr("test") + return ld +} + func gzipCompress(data []byte) []byte { var buf bytes.Buffer gz := gzip.NewWriter(&buf) @@ -76,7 +98,9 @@ func (e nonEncodingExtension) Shutdown(_ context.Context) error { } type unmarshalExtension struct { - trace ptrace.Traces + trace ptrace.Traces + metric pmetric.Metrics + log plog.Logs } func (e unmarshalExtension) Start(_ context.Context, _ component.Host) error { @@ -91,7 +115,15 @@ func (e unmarshalExtension) UnmarshalTraces(_ []byte) (ptrace.Traces, error) { return e.trace, nil } -func Test_receiveBytes(t *testing.T) { +func (e unmarshalExtension) UnmarshalMetrics(_ []byte) (pmetric.Metrics, error) { + return e.metric, nil +} + +func (e unmarshalExtension) UnmarshalLogs(_ []byte) (plog.Logs, error) { + return e.log, nil +} + +func Test_receiveBytes_traces(t *testing.T) { testTrace := generateTraceData() jsonTrace, err := (&ptrace.JSONMarshaler{}).MarshalTraces(testTrace) @@ -205,16 +237,286 @@ func Test_receiveBytes(t *testing.T) { }) obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopSettings()}) require.NoError(t, err) - r := &awss3TraceReceiver{ - consumer: tracesConsumer, - logger: zap.NewNop(), - obsrecv: obsrecv, + r := &awss3Receiver{ + logger: zap.NewNop(), + obsrecv: obsrecv, extensions: encodingExtensions{ { extension: &unmarshalExtension{trace: testTrace}, suffix: ".test", }, }, + dataProcessor: &traceReceiver{ + consumer: tracesConsumer, + }, + } + if err := r.receiveBytes(context.Background(), tt.args.key, tt.args.data); (err != nil) != tt.wantErr { + t.Errorf("receiveBytes() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + +func Test_receiveBytes_metrics(t *testing.T) { + testMetric := generateMetricData() + + jsonMetric, err := (&pmetric.JSONMarshaler{}).MarshalMetrics(testMetric) + require.NoError(t, err) + protobufMetric, err := (&pmetric.ProtoMarshaler{}).MarshalMetrics(testMetric) + require.NoError(t, err) + + type args struct { + key string + data []byte + } + tests := []struct { + name string + args args + wantErr bool + wantMetric bool + }{ + { + name: "nil data", + args: args{ + key: "test.json", + data: nil, + }, + wantErr: false, + wantMetric: false, + }, + { + name: ".json", + args: args{ + key: "test.json", + data: jsonMetric, + }, + wantErr: false, + wantMetric: true, + }, + { + name: ".binpb", + args: args{ + key: "test.binpb", + data: protobufMetric, + }, + wantErr: false, + wantMetric: true, + }, + { + name: ".unknown", + args: args{ + key: "test.unknown", + data: []byte("unknown"), + }, + wantErr: false, + wantMetric: false, + }, + { + name: ".json.gz", + args: args{ + key: "test.json.gz", + data: gzipCompress(jsonMetric), + }, + wantErr: false, + wantMetric: true, + }, + { + name: ".binpb.gz", + args: args{ + key: "test.binpb.gz", + data: gzipCompress(protobufMetric), + }, + wantErr: false, + wantMetric: true, + }, + { + name: "encoding extension", + args: args{ + key: "test.test", + data: []byte("test"), + }, + wantErr: false, + wantMetric: true, + }, + { + name: "encoding extension .gz", + args: args{ + key: "test.test", + data: gzipCompress([]byte("test")), + }, + wantErr: false, + wantMetric: true, + }, + { + name: "invalid gzip", + args: args{ + key: "test.json.gz", + data: []byte("invalid gzip"), + }, + wantErr: true, + wantMetric: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tracesConsumer, _ := consumer.NewMetrics(func(_ context.Context, md pmetric.Metrics) error { + t.Helper() + if !tt.wantMetric { + t.Errorf("receiveBytes() received unexpected trace") + } else { + require.Equal(t, testMetric, md) + } + return nil + }) + obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopSettings()}) + require.NoError(t, err) + r := &awss3Receiver{ + logger: zap.NewNop(), + obsrecv: obsrecv, + extensions: encodingExtensions{ + { + extension: &unmarshalExtension{metric: testMetric}, + suffix: ".test", + }, + }, + dataProcessor: &metricsReceiver{ + consumer: tracesConsumer, + }, + } + if err := r.receiveBytes(context.Background(), tt.args.key, tt.args.data); (err != nil) != tt.wantErr { + t.Errorf("receiveBytes() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + +func Test_receiveBytes_logs(t *testing.T) { + testLog := generateLogData() + + jsonLog, err := (&plog.JSONMarshaler{}).MarshalLogs(testLog) + require.NoError(t, err) + protobufLog, err := (&plog.ProtoMarshaler{}).MarshalLogs(testLog) + require.NoError(t, err) + + type args struct { + key string + data []byte + } + tests := []struct { + name string + args args + wantErr bool + wantMetric bool + }{ + { + name: "nil data", + args: args{ + key: "test.json", + data: nil, + }, + wantErr: false, + wantMetric: false, + }, + { + name: ".json", + args: args{ + key: "test.json", + data: jsonLog, + }, + wantErr: false, + wantMetric: true, + }, + { + name: ".binpb", + args: args{ + key: "test.binpb", + data: protobufLog, + }, + wantErr: false, + wantMetric: true, + }, + { + name: ".unknown", + args: args{ + key: "test.unknown", + data: []byte("unknown"), + }, + wantErr: false, + wantMetric: false, + }, + { + name: ".json.gz", + args: args{ + key: "test.json.gz", + data: gzipCompress(jsonLog), + }, + wantErr: false, + wantMetric: true, + }, + { + name: ".binpb.gz", + args: args{ + key: "test.binpb.gz", + data: gzipCompress(protobufLog), + }, + wantErr: false, + wantMetric: true, + }, + { + name: "encoding extension", + args: args{ + key: "test.test", + data: []byte("test"), + }, + wantErr: false, + wantMetric: true, + }, + { + name: "encoding extension .gz", + args: args{ + key: "test.test", + data: gzipCompress([]byte("test")), + }, + wantErr: false, + wantMetric: true, + }, + { + name: "invalid gzip", + args: args{ + key: "test.json.gz", + data: []byte("invalid gzip"), + }, + wantErr: true, + wantMetric: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tracesConsumer, _ := consumer.NewLogs(func(_ context.Context, ld plog.Logs) error { + t.Helper() + if !tt.wantMetric { + t.Errorf("receiveBytes() received unexpected trace") + } else { + require.Equal(t, testLog, ld) + } + return nil + }) + obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopSettings()}) + require.NoError(t, err) + r := &awss3Receiver{ + logger: zap.NewNop(), + obsrecv: obsrecv, + extensions: encodingExtensions{ + { + extension: &unmarshalExtension{log: testLog}, + suffix: ".test", + }, + }, + dataProcessor: &logsReceiver{ + consumer: tracesConsumer, + }, } if err := r.receiveBytes(context.Background(), tt.args.key, tt.args.data); (err != nil) != tt.wantErr { t.Errorf("receiveBytes() error = %v, wantErr %v", err, tt.wantErr) @@ -261,15 +563,10 @@ func Test_newEncodingExtensions(t *testing.T) { wantErr: assert.NoError, }, { - name: "non-encoding", + name: "empty", args: args{ - encodingsConfig: []Encoding{ - { - Extension: component.MustNewID("nonencoding"), - Suffix: ".non-encoding", - }, - }, - host: host, + encodingsConfig: []Encoding{}, + host: host, }, want: encodingExtensions{}, wantErr: assert.NoError,