From e030aa3115b1a586dbd358c9da22cca2e6daf045 Mon Sep 17 00:00:00 2001 From: Shaocheng Wang Date: Thu, 25 Feb 2021 16:03:11 -0800 Subject: [PATCH 1/4] Direct EMF output to stdout to support AWS Lambda --- exporter/awsemfexporter/README.md | 1 + exporter/awsemfexporter/config.go | 3 + exporter/awsemfexporter/emf_exporter.go | 38 +++++++------ exporter/awsemfexporter/emf_exporter_test.go | 60 ++++++++++++++++++++ exporter/awsemfexporter/factory.go | 1 + 5 files changed, 87 insertions(+), 16 deletions(-) diff --git a/exporter/awsemfexporter/README.md b/exporter/awsemfexporter/README.md index fa282a29df46c..cec55c3f7fc06 100644 --- a/exporter/awsemfexporter/README.md +++ b/exporter/awsemfexporter/README.md @@ -25,6 +25,7 @@ The following exporter configuration parameters are supported. | `max_retries` | Maximum number of retries before abandoning an attempt to post data. | 1 | | `dimension_rollup_option`| DimensionRollupOption is the option for metrics dimension rollup. Three options are available. |"ZeroAndSingleDimensionRollup" (Enable both zero dimension rollup and single dimension rollup)| | `resource_to_telemetry_conversion` | "resource_to_telemetry_conversion" is the option for converting resource attributes to telemetry attributes. It has only one config onption- `enabled`. For metrics, if `enabled=true`, all the resource attributes will be converted to metric labels by default. See `Resource Attributes to Metric Labels` section below for examples. | `enabled=false` | +| `run_in_lambda` | "run_in_lambda" is the option to support AWS Lambda by directing EMF exporter output to stdout, which will then be sent to CWLogs by Lambda. | `false` | | [`metric_declarations`](#metric_declaration) | List of rules for filtering exported metrics and their dimensions. | [ ] | | [`metric_descriptors`](#metric_descriptor) | List of rules for inserting or updating metric descriptors.| [ ]| diff --git a/exporter/awsemfexporter/config.go b/exporter/awsemfexporter/config.go index 31362ea87b45b..3b254b94e7be0 100644 --- a/exporter/awsemfexporter/config.go +++ b/exporter/awsemfexporter/config.go @@ -67,6 +67,9 @@ type Config struct { // MetricDescriptors is the list of override metric descriptors that are sent to the CloudWatch MetricDescriptors []MetricDescriptor `mapstructure:"metric_descriptors"` + // RunInLambda is an option to support AWS Lambda. (In Lambda, the EMF output will be directed to stdout) + RunInLambda bool `mapstructure:"run_in_lambda"` + // ResourceToTelemetrySettings is the option for converting resource attrihutes to telemetry attributes. // "Enabled" - A boolean field to enable/disable this option. Default is `false`. // If enabled, all the resource attributes will be converted to metric labels by default. diff --git a/exporter/awsemfexporter/emf_exporter.go b/exporter/awsemfexporter/emf_exporter.go index 7dc028736d940..c81c6be42b036 100644 --- a/exporter/awsemfexporter/emf_exporter.go +++ b/exporter/awsemfexporter/emf_exporter.go @@ -108,6 +108,7 @@ func (emf *emfExporter) pushMetricsData(_ context.Context, md pdata.Metrics) (dr groupedMetrics := make(map[interface{}]*GroupedMetric) expConfig := emf.config.(*Config) defaultLogStream := fmt.Sprintf("otel-stream-%s", emf.collectorID) + runInLambda := expConfig.RunInLambda rms := md.ResourceMetrics() @@ -120,23 +121,28 @@ func (emf *emfExporter) pushMetricsData(_ context.Context, md pdata.Metrics) (dr cWMetric := translateGroupedMetricToCWMetric(groupedMetric, expConfig) putLogEvent := translateCWMetricToEMF(cWMetric) - logGroup := groupedMetric.Metadata.LogGroup - logStream := groupedMetric.Metadata.LogStream - if logStream == "" { - logStream = defaultLogStream - } - - pusher := emf.getPusher(logGroup, logStream) - if pusher != nil { - returnError := pusher.AddLogEntry(putLogEvent) - if returnError != nil { - err = wrapErrorIfBadRequest(&returnError) - return + // If RUN_IN_LAMBDA, send EMF log into stdout, which will then be redirected to CWLogs by Lambda + if runInLambda { + fmt.Println(*putLogEvent.InputLogEvent.Message) + } else { + logGroup := groupedMetric.Metadata.LogGroup + logStream := groupedMetric.Metadata.LogStream + if logStream == "" { + logStream = defaultLogStream } - returnError = pusher.ForceFlush() - if returnError != nil { - err = wrapErrorIfBadRequest(&returnError) - return + + pusher := emf.getPusher(logGroup, logStream) + if pusher != nil { + returnError := pusher.AddLogEntry(putLogEvent) + if returnError != nil { + err = wrapErrorIfBadRequest(&returnError) + return + } + returnError = pusher.ForceFlush() + if returnError != nil { + err = wrapErrorIfBadRequest(&returnError) + return + } } } } diff --git a/exporter/awsemfexporter/emf_exporter_test.go b/exporter/awsemfexporter/emf_exporter_test.go index d0664324f8da0..2d72138269ae0 100644 --- a/exporter/awsemfexporter/emf_exporter_test.go +++ b/exporter/awsemfexporter/emf_exporter_test.go @@ -123,6 +123,66 @@ func TestConsumeMetrics(t *testing.T) { require.NoError(t, exp.Shutdown(ctx)) } +func TestConsumeMetricsWithRunInLambda(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + factory := NewFactory() + expCfg := factory.CreateDefaultConfig().(*Config) + expCfg.Region = "us-west-2" + expCfg.MaxRetries = 0 + expCfg.RunInLambda = true + exp, err := New(expCfg, component.ExporterCreateParams{Logger: zap.NewNop()}) + assert.Nil(t, err) + assert.NotNil(t, exp) + + mdata := consumerdata.MetricsData{ + Node: &commonpb.Node{ + ServiceInfo: &commonpb.ServiceInfo{Name: "test-emf"}, + LibraryInfo: &commonpb.LibraryInfo{ExporterVersion: "SomeVersion"}, + }, + Resource: &resourcepb.Resource{ + Labels: map[string]string{ + "resource": "R1", + }, + }, + Metrics: []*metricspb.Metric{ + { + MetricDescriptor: &metricspb.MetricDescriptor{ + Name: "spanCounter", + Description: "Counting all the spans", + Unit: "Count", + Type: metricspb.MetricDescriptor_CUMULATIVE_INT64, + LabelKeys: []*metricspb.LabelKey{ + {Key: "spanName"}, + {Key: "isItAnError"}, + }, + }, + Timeseries: []*metricspb.TimeSeries{ + { + LabelValues: []*metricspb.LabelValue{ + {Value: "testSpan", HasValue: true}, + {Value: "false", HasValue: true}, + }, + Points: []*metricspb.Point{ + { + Timestamp: ×tamp.Timestamp{ + Seconds: 1234567890123, + }, + Value: &metricspb.Point_Int64Value{ + Int64Value: 1, + }, + }, + }, + }, + }, + }, + }, + } + md := internaldata.OCToMetrics(mdata) + require.NoError(t, exp.ConsumeMetrics(ctx, md)) + require.NoError(t, exp.Shutdown(ctx)) +} + func TestConsumeMetricsWithLogGroupStreamConfig(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/exporter/awsemfexporter/factory.go b/exporter/awsemfexporter/factory.go index 6f60508441364..677d9deedfd76 100644 --- a/exporter/awsemfexporter/factory.go +++ b/exporter/awsemfexporter/factory.go @@ -54,6 +54,7 @@ func createDefaultConfig() configmodels.Exporter { RoleARN: "", DimensionRollupOption: "ZeroAndSingleDimensionRollup", MetricDeclarations: make([]*MetricDeclaration, 0), + RunInLambda: false, logger: nil, } } From 013e12ff676d149becbc4566ad21618b8be326b6 Mon Sep 17 00:00:00 2001 From: Shaocheng Wang Date: Tue, 16 Mar 2021 16:50:48 -0700 Subject: [PATCH 2/4] fix some old lib after merging from upstream --- exporter/awsemfexporter/emf_exporter_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exporter/awsemfexporter/emf_exporter_test.go b/exporter/awsemfexporter/emf_exporter_test.go index 7b770f47656f6..a657da20d3074 100644 --- a/exporter/awsemfexporter/emf_exporter_test.go +++ b/exporter/awsemfexporter/emf_exporter_test.go @@ -134,7 +134,7 @@ func TestConsumeMetricsWithRunInLambda(t *testing.T) { assert.Nil(t, err) assert.NotNil(t, exp) - mdata := consumerdata.MetricsData{ + mdata := internaldata.MetricsData{ Node: &commonpb.Node{ ServiceInfo: &commonpb.ServiceInfo{Name: "test-emf"}, LibraryInfo: &commonpb.LibraryInfo{ExporterVersion: "SomeVersion"}, From a9fd9cac259f39c4e3cb67218670454db0bf380a Mon Sep 17 00:00:00 2001 From: Shaocheng Wang Date: Wed, 24 Mar 2021 15:35:26 -0700 Subject: [PATCH 3/4] Use output_destination in customer config --- exporter/awsemfexporter/README.md | 2 +- exporter/awsemfexporter/config.go | 7 +++++-- exporter/awsemfexporter/config_test.go | 2 ++ exporter/awsemfexporter/emf_exporter.go | 10 +++++----- exporter/awsemfexporter/emf_exporter_test.go | 4 ++-- exporter/awsemfexporter/factory.go | 2 +- 6 files changed, 16 insertions(+), 11 deletions(-) diff --git a/exporter/awsemfexporter/README.md b/exporter/awsemfexporter/README.md index cec55c3f7fc06..e2ed1fa243b2c 100644 --- a/exporter/awsemfexporter/README.md +++ b/exporter/awsemfexporter/README.md @@ -25,7 +25,7 @@ The following exporter configuration parameters are supported. | `max_retries` | Maximum number of retries before abandoning an attempt to post data. | 1 | | `dimension_rollup_option`| DimensionRollupOption is the option for metrics dimension rollup. Three options are available. |"ZeroAndSingleDimensionRollup" (Enable both zero dimension rollup and single dimension rollup)| | `resource_to_telemetry_conversion` | "resource_to_telemetry_conversion" is the option for converting resource attributes to telemetry attributes. It has only one config onption- `enabled`. For metrics, if `enabled=true`, all the resource attributes will be converted to metric labels by default. See `Resource Attributes to Metric Labels` section below for examples. | `enabled=false` | -| `run_in_lambda` | "run_in_lambda" is the option to support AWS Lambda by directing EMF exporter output to stdout, which will then be sent to CWLogs by Lambda. | `false` | +| `output_destination` | "output_destination" is an option to specify the EMFExporter output. Currently, two options are available. "CloudWatch" or "stdout" | `CloudWatch` | | [`metric_declarations`](#metric_declaration) | List of rules for filtering exported metrics and their dimensions. | [ ] | | [`metric_descriptors`](#metric_descriptor) | List of rules for inserting or updating metric descriptors.| [ ]| diff --git a/exporter/awsemfexporter/config.go b/exporter/awsemfexporter/config.go index 3b254b94e7be0..f417e50798d5a 100644 --- a/exporter/awsemfexporter/config.go +++ b/exporter/awsemfexporter/config.go @@ -67,8 +67,11 @@ type Config struct { // MetricDescriptors is the list of override metric descriptors that are sent to the CloudWatch MetricDescriptors []MetricDescriptor `mapstructure:"metric_descriptors"` - // RunInLambda is an option to support AWS Lambda. (In Lambda, the EMF output will be directed to stdout) - RunInLambda bool `mapstructure:"run_in_lambda"` + // OutputDestination is an option to specify the EMFExporter output. Default option is "CloudWatch" + // "CloudWatch" - direct the exporter output to CloudWatch backend + // "stdout" - direct the exporter output to stdout + // TODO: we can support directing output to a file (in the future) while customer specifies a file path here. + OutputDestination string `mapstructure:"output_destination"` // ResourceToTelemetrySettings is the option for converting resource attrihutes to telemetry attributes. // "Enabled" - A boolean field to enable/disable this option. Default is `false`. diff --git a/exporter/awsemfexporter/config_test.go b/exporter/awsemfexporter/config_test.go index d49897c1b66d8..78a835cd16246 100644 --- a/exporter/awsemfexporter/config_test.go +++ b/exporter/awsemfexporter/config_test.go @@ -60,6 +60,7 @@ func TestLoadConfig(t *testing.T) { Region: "us-west-2", RoleARN: "arn:aws:iam::123456789:role/monitoring-EKS-NodeInstanceRole", DimensionRollupOption: "ZeroAndSingleDimensionRollup", + OutputDestination: "CloudWatch", MetricDeclarations: []*MetricDeclaration{}, MetricDescriptors: []MetricDescriptor{}, }, r1) @@ -79,6 +80,7 @@ func TestLoadConfig(t *testing.T) { Region: "", RoleARN: "", DimensionRollupOption: "ZeroAndSingleDimensionRollup", + OutputDestination: "CloudWatch", ResourceToTelemetrySettings: exporterhelper.ResourceToTelemetrySettings{Enabled: true}, MetricDeclarations: []*MetricDeclaration{}, MetricDescriptors: []MetricDescriptor{}, diff --git a/exporter/awsemfexporter/emf_exporter.go b/exporter/awsemfexporter/emf_exporter.go index fa34973c2dd45..78d388972d733 100644 --- a/exporter/awsemfexporter/emf_exporter.go +++ b/exporter/awsemfexporter/emf_exporter.go @@ -119,7 +119,7 @@ func (emf *emfExporter) pushMetricsData(_ context.Context, md pdata.Metrics) err groupedMetrics := make(map[interface{}]*GroupedMetric) expConfig := emf.config.(*Config) defaultLogStream := fmt.Sprintf("otel-stream-%s", emf.collectorID) - runInLambda := expConfig.RunInLambda + outputDestination := expConfig.OutputDestination for i := 0; i < rms.Len(); i++ { rm := rms.At(i) @@ -129,10 +129,10 @@ func (emf *emfExporter) pushMetricsData(_ context.Context, md pdata.Metrics) err for _, groupedMetric := range groupedMetrics { cWMetric := translateGroupedMetricToCWMetric(groupedMetric, expConfig) putLogEvent := translateCWMetricToEMF(cWMetric) - // If RUN_IN_LAMBDA, send EMF log into stdout, which will then be redirected to CWLogs by Lambda - if runInLambda { + // Currently we only support two options for "OutputDestination". + if outputDestination == "stdout" { fmt.Println(*putLogEvent.InputLogEvent.Message) - } else { + } else if outputDestination == "CloudWatch" { logGroup := groupedMetric.Metadata.LogGroup logStream := groupedMetric.Metadata.LogStream if logStream == "" { @@ -149,7 +149,7 @@ func (emf *emfExporter) pushMetricsData(_ context.Context, md pdata.Metrics) err } } - if !runInLambda { + if outputDestination == "CloudWatch" { for _, pusher := range emf.listPushers() { returnError := pusher.ForceFlush() if returnError != nil { diff --git a/exporter/awsemfexporter/emf_exporter_test.go b/exporter/awsemfexporter/emf_exporter_test.go index a657da20d3074..9edd6cc1a6dd3 100644 --- a/exporter/awsemfexporter/emf_exporter_test.go +++ b/exporter/awsemfexporter/emf_exporter_test.go @@ -122,14 +122,14 @@ func TestConsumeMetrics(t *testing.T) { require.NoError(t, exp.Shutdown(ctx)) } -func TestConsumeMetricsWithRunInLambda(t *testing.T) { +func TestConsumeMetricsWithOutputDestination(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() factory := NewFactory() expCfg := factory.CreateDefaultConfig().(*Config) expCfg.Region = "us-west-2" expCfg.MaxRetries = 0 - expCfg.RunInLambda = true + expCfg.OutputDestination = "stdout" exp, err := New(expCfg, component.ExporterCreateParams{Logger: zap.NewNop()}) assert.Nil(t, err) assert.NotNil(t, exp) diff --git a/exporter/awsemfexporter/factory.go b/exporter/awsemfexporter/factory.go index 677d9deedfd76..5620285bf9643 100644 --- a/exporter/awsemfexporter/factory.go +++ b/exporter/awsemfexporter/factory.go @@ -54,7 +54,7 @@ func createDefaultConfig() configmodels.Exporter { RoleARN: "", DimensionRollupOption: "ZeroAndSingleDimensionRollup", MetricDeclarations: make([]*MetricDeclaration, 0), - RunInLambda: false, + OutputDestination: "CloudWatch", logger: nil, } } From c73914641fa0f2f1dc10190c240ef59599b30d5d Mon Sep 17 00:00:00 2001 From: Shaocheng Wang Date: Thu, 25 Mar 2021 09:31:56 -0700 Subject: [PATCH 4/4] use lowercase in output_destination config --- exporter/awsemfexporter/README.md | 2 +- exporter/awsemfexporter/config.go | 4 ++-- exporter/awsemfexporter/config_test.go | 4 ++-- exporter/awsemfexporter/emf_exporter.go | 13 ++++++++++--- exporter/awsemfexporter/factory.go | 2 +- 5 files changed, 16 insertions(+), 9 deletions(-) diff --git a/exporter/awsemfexporter/README.md b/exporter/awsemfexporter/README.md index e2ed1fa243b2c..de19946045624 100644 --- a/exporter/awsemfexporter/README.md +++ b/exporter/awsemfexporter/README.md @@ -25,7 +25,7 @@ The following exporter configuration parameters are supported. | `max_retries` | Maximum number of retries before abandoning an attempt to post data. | 1 | | `dimension_rollup_option`| DimensionRollupOption is the option for metrics dimension rollup. Three options are available. |"ZeroAndSingleDimensionRollup" (Enable both zero dimension rollup and single dimension rollup)| | `resource_to_telemetry_conversion` | "resource_to_telemetry_conversion" is the option for converting resource attributes to telemetry attributes. It has only one config onption- `enabled`. For metrics, if `enabled=true`, all the resource attributes will be converted to metric labels by default. See `Resource Attributes to Metric Labels` section below for examples. | `enabled=false` | -| `output_destination` | "output_destination" is an option to specify the EMFExporter output. Currently, two options are available. "CloudWatch" or "stdout" | `CloudWatch` | +| `output_destination` | "output_destination" is an option to specify the EMFExporter output. Currently, two options are available. "cloudwatch" or "stdout" | `cloudwatch` | | [`metric_declarations`](#metric_declaration) | List of rules for filtering exported metrics and their dimensions. | [ ] | | [`metric_descriptors`](#metric_descriptor) | List of rules for inserting or updating metric descriptors.| [ ]| diff --git a/exporter/awsemfexporter/config.go b/exporter/awsemfexporter/config.go index f417e50798d5a..f5d57b4922622 100644 --- a/exporter/awsemfexporter/config.go +++ b/exporter/awsemfexporter/config.go @@ -67,8 +67,8 @@ type Config struct { // MetricDescriptors is the list of override metric descriptors that are sent to the CloudWatch MetricDescriptors []MetricDescriptor `mapstructure:"metric_descriptors"` - // OutputDestination is an option to specify the EMFExporter output. Default option is "CloudWatch" - // "CloudWatch" - direct the exporter output to CloudWatch backend + // OutputDestination is an option to specify the EMFExporter output. Default option is "cloudwatch" + // "cloudwatch" - direct the exporter output to CloudWatch backend // "stdout" - direct the exporter output to stdout // TODO: we can support directing output to a file (in the future) while customer specifies a file path here. OutputDestination string `mapstructure:"output_destination"` diff --git a/exporter/awsemfexporter/config_test.go b/exporter/awsemfexporter/config_test.go index 78a835cd16246..35da8043c962a 100644 --- a/exporter/awsemfexporter/config_test.go +++ b/exporter/awsemfexporter/config_test.go @@ -60,7 +60,7 @@ func TestLoadConfig(t *testing.T) { Region: "us-west-2", RoleARN: "arn:aws:iam::123456789:role/monitoring-EKS-NodeInstanceRole", DimensionRollupOption: "ZeroAndSingleDimensionRollup", - OutputDestination: "CloudWatch", + OutputDestination: "cloudwatch", MetricDeclarations: []*MetricDeclaration{}, MetricDescriptors: []MetricDescriptor{}, }, r1) @@ -80,7 +80,7 @@ func TestLoadConfig(t *testing.T) { Region: "", RoleARN: "", DimensionRollupOption: "ZeroAndSingleDimensionRollup", - OutputDestination: "CloudWatch", + OutputDestination: "cloudwatch", ResourceToTelemetrySettings: exporterhelper.ResourceToTelemetrySettings{Enabled: true}, MetricDeclarations: []*MetricDeclaration{}, MetricDescriptors: []MetricDescriptor{}, diff --git a/exporter/awsemfexporter/emf_exporter.go b/exporter/awsemfexporter/emf_exporter.go index 78d388972d733..987a5510a0f33 100644 --- a/exporter/awsemfexporter/emf_exporter.go +++ b/exporter/awsemfexporter/emf_exporter.go @@ -18,6 +18,7 @@ import ( "context" "errors" "fmt" + "strings" "sync" "github.com/aws/aws-sdk-go/aws" @@ -31,6 +32,12 @@ import ( "go.uber.org/zap" ) +const ( + // OutputDestination Options + outputDestinationCloudWatch = "cloudwatch" + outputDestinationStdout = "stdout" +) + type emfExporter struct { //Each (log group, log stream) keeps a separate Pusher because of each (log group, log stream) requires separate stream token. groupStreamToPusherMap map[string]map[string]Pusher @@ -130,9 +137,9 @@ func (emf *emfExporter) pushMetricsData(_ context.Context, md pdata.Metrics) err cWMetric := translateGroupedMetricToCWMetric(groupedMetric, expConfig) putLogEvent := translateCWMetricToEMF(cWMetric) // Currently we only support two options for "OutputDestination". - if outputDestination == "stdout" { + if strings.EqualFold(outputDestination, outputDestinationStdout) { fmt.Println(*putLogEvent.InputLogEvent.Message) - } else if outputDestination == "CloudWatch" { + } else if strings.EqualFold(outputDestination, outputDestinationCloudWatch) { logGroup := groupedMetric.Metadata.LogGroup logStream := groupedMetric.Metadata.LogStream if logStream == "" { @@ -149,7 +156,7 @@ func (emf *emfExporter) pushMetricsData(_ context.Context, md pdata.Metrics) err } } - if outputDestination == "CloudWatch" { + if strings.EqualFold(outputDestination, outputDestinationCloudWatch) { for _, pusher := range emf.listPushers() { returnError := pusher.ForceFlush() if returnError != nil { diff --git a/exporter/awsemfexporter/factory.go b/exporter/awsemfexporter/factory.go index 5620285bf9643..082f34f3699b1 100644 --- a/exporter/awsemfexporter/factory.go +++ b/exporter/awsemfexporter/factory.go @@ -54,7 +54,7 @@ func createDefaultConfig() configmodels.Exporter { RoleARN: "", DimensionRollupOption: "ZeroAndSingleDimensionRollup", MetricDeclarations: make([]*MetricDeclaration, 0), - OutputDestination: "CloudWatch", + OutputDestination: "cloudwatch", logger: nil, } }