Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions exporter/awsemfexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ The following exporter configuration parameters are supported.
| `max_retries` | Maximum number of retries before abandoning an attempt to post data. | 1 |
| `dimension_rollup_option`| DimensionRollupOption is the option for metrics dimension rollup. Three options are available. |"ZeroAndSingleDimensionRollup" (Enable both zero dimension rollup and single dimension rollup)|
| `resource_to_telemetry_conversion` | "resource_to_telemetry_conversion" is the option for converting resource attributes to telemetry attributes. It has only one config onption- `enabled`. For metrics, if `enabled=true`, all the resource attributes will be converted to metric labels by default. See `Resource Attributes to Metric Labels` section below for examples. | `enabled=false` |
| `output_destination` | "output_destination" is an option to specify the EMFExporter output. Currently, two options are available. "CloudWatch" or "stdout" | `CloudWatch` |
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
| `output_destination` | "output_destination" is an option to specify the EMFExporter output. Currently, two options are available. "CloudWatch" or "stdout" | `CloudWatch` |
| `output_destination` | "output_destination" is an option to specify the EMFExporter output. Currently, two options are available. "cloudwatch" or "stdout" | `cloudwatch` |

All lowercase seems more consistent with the config keys

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. I make the change accordingly.

| [`metric_declarations`](#metric_declaration) | List of rules for filtering exported metrics and their dimensions. | [ ] |
| [`metric_descriptors`](#metric_descriptor) | List of rules for inserting or updating metric descriptors.| [ ]|

Expand Down
6 changes: 6 additions & 0 deletions exporter/awsemfexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ type Config struct {
// MetricDescriptors is the list of override metric descriptors that are sent to the CloudWatch
MetricDescriptors []MetricDescriptor `mapstructure:"metric_descriptors"`

// OutputDestination is an option to specify the EMFExporter output. Default option is "CloudWatch"
// "CloudWatch" - direct the exporter output to CloudWatch backend
// "stdout" - direct the exporter output to stdout
// TODO: we can support directing output to a file (in the future) while customer specifies a file path here.
OutputDestination string `mapstructure:"output_destination"`

// ResourceToTelemetrySettings is the option for converting resource attrihutes to telemetry attributes.
// "Enabled" - A boolean field to enable/disable this option. Default is `false`.
// If enabled, all the resource attributes will be converted to metric labels by default.
Expand Down
2 changes: 2 additions & 0 deletions exporter/awsemfexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func TestLoadConfig(t *testing.T) {
Region: "us-west-2",
RoleARN: "arn:aws:iam::123456789:role/monitoring-EKS-NodeInstanceRole",
DimensionRollupOption: "ZeroAndSingleDimensionRollup",
OutputDestination: "CloudWatch",
MetricDeclarations: []*MetricDeclaration{},
MetricDescriptors: []MetricDescriptor{},
}, r1)
Expand All @@ -79,6 +80,7 @@ func TestLoadConfig(t *testing.T) {
Region: "",
RoleARN: "",
DimensionRollupOption: "ZeroAndSingleDimensionRollup",
OutputDestination: "CloudWatch",
ResourceToTelemetrySettings: exporterhelper.ResourceToTelemetrySettings{Enabled: true},
MetricDeclarations: []*MetricDeclaration{},
MetricDescriptors: []MetricDescriptor{},
Expand Down
45 changes: 26 additions & 19 deletions exporter/awsemfexporter/emf_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ func (emf *emfExporter) pushMetricsData(_ context.Context, md pdata.Metrics) err
groupedMetrics := make(map[interface{}]*GroupedMetric)
expConfig := emf.config.(*Config)
defaultLogStream := fmt.Sprintf("otel-stream-%s", emf.collectorID)
outputDestination := expConfig.OutputDestination

for i := 0; i < rms.Len(); i++ {
rm := rms.At(i)
Expand All @@ -128,31 +129,37 @@ func (emf *emfExporter) pushMetricsData(_ context.Context, md pdata.Metrics) err
for _, groupedMetric := range groupedMetrics {
cWMetric := translateGroupedMetricToCWMetric(groupedMetric, expConfig)
putLogEvent := translateCWMetricToEMF(cWMetric)
// Currently we only support two options for "OutputDestination".
if outputDestination == "stdout" {
fmt.Println(*putLogEvent.InputLogEvent.Message)
} else if outputDestination == "CloudWatch" {
logGroup := groupedMetric.Metadata.LogGroup
logStream := groupedMetric.Metadata.LogStream
if logStream == "" {
logStream = defaultLogStream
}

logGroup := groupedMetric.Metadata.LogGroup
logStream := groupedMetric.Metadata.LogStream
if logStream == "" {
logStream = defaultLogStream
}

pusher := emf.getPusher(logGroup, logStream)
if pusher != nil {
returnError := pusher.AddLogEntry(putLogEvent)
if returnError != nil {
return wrapErrorIfBadRequest(&returnError)
pusher := emf.getPusher(logGroup, logStream)
if pusher != nil {
returnError := pusher.AddLogEntry(putLogEvent)
if returnError != nil {
return wrapErrorIfBadRequest(&returnError)
}
}
}
}

for _, pusher := range emf.listPushers() {
returnError := pusher.ForceFlush()
if returnError != nil {
//TODO now we only have one pusher, so it's ok to return after first error occurred
err := wrapErrorIfBadRequest(&returnError)
if err != nil {
emf.logger.Error("Error force flushing logs. Skipping to next pusher.", zap.Error(err))
if outputDestination == "CloudWatch" {
for _, pusher := range emf.listPushers() {
returnError := pusher.ForceFlush()
if returnError != nil {
//TODO now we only have one pusher, so it's ok to return after first error occurred
err := wrapErrorIfBadRequest(&returnError)
if err != nil {
emf.logger.Error("Error force flushing logs. Skipping to next pusher.", zap.Error(err))
}
return err
}
return err
}
}

Expand Down
60 changes: 60 additions & 0 deletions exporter/awsemfexporter/emf_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,66 @@ func TestConsumeMetrics(t *testing.T) {
require.NoError(t, exp.Shutdown(ctx))
}

func TestConsumeMetricsWithOutputDestination(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
factory := NewFactory()
expCfg := factory.CreateDefaultConfig().(*Config)
expCfg.Region = "us-west-2"
expCfg.MaxRetries = 0
expCfg.OutputDestination = "stdout"
exp, err := New(expCfg, component.ExporterCreateParams{Logger: zap.NewNop()})
assert.Nil(t, err)
assert.NotNil(t, exp)

mdata := internaldata.MetricsData{
Node: &commonpb.Node{
ServiceInfo: &commonpb.ServiceInfo{Name: "test-emf"},
LibraryInfo: &commonpb.LibraryInfo{ExporterVersion: "SomeVersion"},
},
Resource: &resourcepb.Resource{
Labels: map[string]string{
"resource": "R1",
},
},
Metrics: []*metricspb.Metric{
{
MetricDescriptor: &metricspb.MetricDescriptor{
Name: "spanCounter",
Description: "Counting all the spans",
Unit: "Count",
Type: metricspb.MetricDescriptor_CUMULATIVE_INT64,
LabelKeys: []*metricspb.LabelKey{
{Key: "spanName"},
{Key: "isItAnError"},
},
},
Timeseries: []*metricspb.TimeSeries{
{
LabelValues: []*metricspb.LabelValue{
{Value: "testSpan", HasValue: true},
{Value: "false", HasValue: true},
},
Points: []*metricspb.Point{
{
Timestamp: &timestamp.Timestamp{
Seconds: 1234567890123,
},
Value: &metricspb.Point_Int64Value{
Int64Value: 1,
},
},
},
},
},
},
},
}
md := internaldata.OCToMetrics(mdata)
require.NoError(t, exp.ConsumeMetrics(ctx, md))
require.NoError(t, exp.Shutdown(ctx))
}

func TestConsumeMetricsWithLogGroupStreamConfig(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down
1 change: 1 addition & 0 deletions exporter/awsemfexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func createDefaultConfig() configmodels.Exporter {
RoleARN: "",
DimensionRollupOption: "ZeroAndSingleDimensionRollup",
MetricDeclarations: make([]*MetricDeclaration, 0),
OutputDestination: "CloudWatch",
logger: nil,
}
}
Expand Down