Skip to content

Commit 1c98077

Browse files
shaochengwangpmatyjasek-sumo
authored andcommitted
Direct EMF output to stdout to support AWS Lambda (#2720)
* Direct EMF output to stdout to support AWS Lambda * fix some old lib after merging from upstream * Use output_destination in customer config * use lowercase in output_destination config
1 parent 42e9b4d commit 1c98077

File tree

6 files changed

+103
-19
lines changed

6 files changed

+103
-19
lines changed

exporter/awsemfexporter/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ The following exporter configuration parameters are supported.
2525
| `max_retries` | Maximum number of retries before abandoning an attempt to post data. | 1 |
2626
| `dimension_rollup_option`| DimensionRollupOption is the option for metrics dimension rollup. Three options are available. |"ZeroAndSingleDimensionRollup" (Enable both zero dimension rollup and single dimension rollup)|
2727
| `resource_to_telemetry_conversion` | "resource_to_telemetry_conversion" is the option for converting resource attributes to telemetry attributes. It has only one config onption- `enabled`. For metrics, if `enabled=true`, all the resource attributes will be converted to metric labels by default. See `Resource Attributes to Metric Labels` section below for examples. | `enabled=false` |
28+
| `output_destination` | "output_destination" is an option to specify the EMFExporter output. Currently, two options are available. "cloudwatch" or "stdout" | `cloudwatch` |
2829
| `parse_json_encoded_attr_values` | List of attribute keys whose corresponding values are JSON-encoded strings and will be converted to JSON structures in emf logs. For example, the attribute string value "{\\"x\\":5,\\"y\\":6}" will be converted to a json object: ```{"x": 5, "y": 6}```| [ ] |
2930
| [`metric_declarations`](#metric_declaration) | List of rules for filtering exported metrics and their dimensions. | [ ] |
3031
| [`metric_descriptors`](#metric_descriptor) | List of rules for inserting or updating metric descriptors.| [ ]|

exporter/awsemfexporter/config.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,12 @@ type Config struct {
7171
// MetricDescriptors is the list of override metric descriptors that are sent to the CloudWatch
7272
MetricDescriptors []MetricDescriptor `mapstructure:"metric_descriptors"`
7373

74+
// OutputDestination is an option to specify the EMFExporter output. Default option is "cloudwatch"
75+
// "cloudwatch" - direct the exporter output to CloudWatch backend
76+
// "stdout" - direct the exporter output to stdout
77+
// TODO: we can support directing output to a file (in the future) while customer specifies a file path here.
78+
OutputDestination string `mapstructure:"output_destination"`
79+
7480
// ResourceToTelemetrySettings is the option for converting resource attrihutes to telemetry attributes.
7581
// "Enabled" - A boolean field to enable/disable this option. Default is `false`.
7682
// If enabled, all the resource attributes will be converted to metric labels by default.

exporter/awsemfexporter/config_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ func TestLoadConfig(t *testing.T) {
6060
Region: "us-west-2",
6161
RoleARN: "arn:aws:iam::123456789:role/monitoring-EKS-NodeInstanceRole",
6262
DimensionRollupOption: "ZeroAndSingleDimensionRollup",
63+
OutputDestination: "cloudwatch",
6364
ParseJSONEncodedAttributeValues: make([]string, 0),
6465
MetricDeclarations: []*MetricDeclaration{},
6566
MetricDescriptors: []MetricDescriptor{},
@@ -80,6 +81,7 @@ func TestLoadConfig(t *testing.T) {
8081
Region: "",
8182
RoleARN: "",
8283
DimensionRollupOption: "ZeroAndSingleDimensionRollup",
84+
OutputDestination: "cloudwatch",
8385
ResourceToTelemetrySettings: exporterhelper.ResourceToTelemetrySettings{Enabled: true},
8486
ParseJSONEncodedAttributeValues: make([]string, 0),
8587
MetricDeclarations: []*MetricDeclaration{},

exporter/awsemfexporter/emf_exporter.go

Lines changed: 33 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"context"
1919
"errors"
2020
"fmt"
21+
"strings"
2122
"sync"
2223

2324
"github.com/aws/aws-sdk-go/aws"
@@ -31,6 +32,12 @@ import (
3132
"go.uber.org/zap"
3233
)
3334

35+
const (
36+
// OutputDestination Options
37+
outputDestinationCloudWatch = "cloudwatch"
38+
outputDestinationStdout = "stdout"
39+
)
40+
3441
type emfExporter struct {
3542
//Each (log group, log stream) keeps a separate Pusher because of each (log group, log stream) requires separate stream token.
3643
groupStreamToPusherMap map[string]map[string]Pusher
@@ -119,6 +126,7 @@ func (emf *emfExporter) pushMetricsData(_ context.Context, md pdata.Metrics) err
119126
groupedMetrics := make(map[interface{}]*GroupedMetric)
120127
expConfig := emf.config.(*Config)
121128
defaultLogStream := fmt.Sprintf("otel-stream-%s", emf.collectorID)
129+
outputDestination := expConfig.OutputDestination
122130

123131
for i := 0; i < rms.Len(); i++ {
124132
rm := rms.At(i)
@@ -128,31 +136,37 @@ func (emf *emfExporter) pushMetricsData(_ context.Context, md pdata.Metrics) err
128136
for _, groupedMetric := range groupedMetrics {
129137
cWMetric := translateGroupedMetricToCWMetric(groupedMetric, expConfig)
130138
putLogEvent := translateCWMetricToEMF(cWMetric, expConfig)
139+
// Currently we only support two options for "OutputDestination".
140+
if strings.EqualFold(outputDestination, outputDestinationStdout) {
141+
fmt.Println(*putLogEvent.InputLogEvent.Message)
142+
} else if strings.EqualFold(outputDestination, outputDestinationCloudWatch) {
143+
logGroup := groupedMetric.Metadata.LogGroup
144+
logStream := groupedMetric.Metadata.LogStream
145+
if logStream == "" {
146+
logStream = defaultLogStream
147+
}
131148

132-
logGroup := groupedMetric.Metadata.LogGroup
133-
logStream := groupedMetric.Metadata.LogStream
134-
if logStream == "" {
135-
logStream = defaultLogStream
136-
}
137-
138-
pusher := emf.getPusher(logGroup, logStream)
139-
if pusher != nil {
140-
returnError := pusher.AddLogEntry(putLogEvent)
141-
if returnError != nil {
142-
return wrapErrorIfBadRequest(&returnError)
149+
pusher := emf.getPusher(logGroup, logStream)
150+
if pusher != nil {
151+
returnError := pusher.AddLogEntry(putLogEvent)
152+
if returnError != nil {
153+
return wrapErrorIfBadRequest(&returnError)
154+
}
143155
}
144156
}
145157
}
146158

147-
for _, pusher := range emf.listPushers() {
148-
returnError := pusher.ForceFlush()
149-
if returnError != nil {
150-
//TODO now we only have one pusher, so it's ok to return after first error occurred
151-
err := wrapErrorIfBadRequest(&returnError)
152-
if err != nil {
153-
emf.logger.Error("Error force flushing logs. Skipping to next pusher.", zap.Error(err))
159+
if strings.EqualFold(outputDestination, outputDestinationCloudWatch) {
160+
for _, pusher := range emf.listPushers() {
161+
returnError := pusher.ForceFlush()
162+
if returnError != nil {
163+
//TODO now we only have one pusher, so it's ok to return after first error occurred
164+
err := wrapErrorIfBadRequest(&returnError)
165+
if err != nil {
166+
emf.logger.Error("Error force flushing logs. Skipping to next pusher.", zap.Error(err))
167+
}
168+
return err
154169
}
155-
return err
156170
}
157171
}
158172

exporter/awsemfexporter/emf_exporter_test.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,66 @@ func TestConsumeMetrics(t *testing.T) {
122122
require.NoError(t, exp.Shutdown(ctx))
123123
}
124124

125+
func TestConsumeMetricsWithOutputDestination(t *testing.T) {
126+
ctx, cancel := context.WithCancel(context.Background())
127+
defer cancel()
128+
factory := NewFactory()
129+
expCfg := factory.CreateDefaultConfig().(*Config)
130+
expCfg.Region = "us-west-2"
131+
expCfg.MaxRetries = 0
132+
expCfg.OutputDestination = "stdout"
133+
exp, err := New(expCfg, component.ExporterCreateParams{Logger: zap.NewNop()})
134+
assert.Nil(t, err)
135+
assert.NotNil(t, exp)
136+
137+
mdata := internaldata.MetricsData{
138+
Node: &commonpb.Node{
139+
ServiceInfo: &commonpb.ServiceInfo{Name: "test-emf"},
140+
LibraryInfo: &commonpb.LibraryInfo{ExporterVersion: "SomeVersion"},
141+
},
142+
Resource: &resourcepb.Resource{
143+
Labels: map[string]string{
144+
"resource": "R1",
145+
},
146+
},
147+
Metrics: []*metricspb.Metric{
148+
{
149+
MetricDescriptor: &metricspb.MetricDescriptor{
150+
Name: "spanCounter",
151+
Description: "Counting all the spans",
152+
Unit: "Count",
153+
Type: metricspb.MetricDescriptor_CUMULATIVE_INT64,
154+
LabelKeys: []*metricspb.LabelKey{
155+
{Key: "spanName"},
156+
{Key: "isItAnError"},
157+
},
158+
},
159+
Timeseries: []*metricspb.TimeSeries{
160+
{
161+
LabelValues: []*metricspb.LabelValue{
162+
{Value: "testSpan", HasValue: true},
163+
{Value: "false", HasValue: true},
164+
},
165+
Points: []*metricspb.Point{
166+
{
167+
Timestamp: &timestamp.Timestamp{
168+
Seconds: 1234567890123,
169+
},
170+
Value: &metricspb.Point_Int64Value{
171+
Int64Value: 1,
172+
},
173+
},
174+
},
175+
},
176+
},
177+
},
178+
},
179+
}
180+
md := internaldata.OCToMetrics(mdata)
181+
require.NoError(t, exp.ConsumeMetrics(ctx, md))
182+
require.NoError(t, exp.Shutdown(ctx))
183+
}
184+
125185
func TestConsumeMetricsWithLogGroupStreamConfig(t *testing.T) {
126186
ctx, cancel := context.WithCancel(context.Background())
127187
defer cancel()

exporter/awsemfexporter/factory.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ func createDefaultConfig() configmodels.Exporter {
5555
DimensionRollupOption: "ZeroAndSingleDimensionRollup",
5656
ParseJSONEncodedAttributeValues: make([]string, 0),
5757
MetricDeclarations: make([]*MetricDeclaration, 0),
58+
OutputDestination: "cloudwatch",
5859
logger: nil,
5960
}
6061
}

0 commit comments

Comments
 (0)