Skip to content

Commit 18cec46

Browse files
authored
[extension/ecsobserver] Add exporter to convert task to target (#3333)
* ext: ecsobserver Add exporter to convert task to target * ext: ecsobserver Don't export TaskExporter * ext: ecsobserver Add printing typed error to log fields This allow searching for log based on field value. * ext: ecsobserver Add errctx package This should aovid use defining getter and setters * ext: ecssd Use errctx and remove getter and setter * ext: ecssd Fix linter errors Its shadow warning seems to be a bit buggy * ext: ecsobserver Run go mod tidy sum file is updated, might caused by merging
1 parent 7c3c4c4 commit 18cec46

File tree

10 files changed

+800
-3
lines changed

10 files changed

+800
-3
lines changed
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
// Copyright OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package ecsobserver
16+
17+
import (
18+
"errors"
19+
20+
"github.com/aws/aws-sdk-go/aws"
21+
"go.uber.org/multierr"
22+
"go.uber.org/zap"
23+
24+
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/ecsobserver/internal/errctx"
25+
)
26+
27+
// error.go defines common error interfaces and util methods for generating reports
28+
// for log and metrics that can be used for debugging.
29+
30+
const (
31+
errKeyTask = "task"
32+
errKeyTarget = "target"
33+
)
34+
35+
type errWithAttributes interface {
36+
// message does not include attributes like task arn etc.
37+
// and expect the caller extract them using getters.
38+
message() string
39+
// zapFields will be logged as json attribute and allows searching and filter backend like cloudwatch.
40+
// For example { $.ErrScope == "Target" } list all the error whose scope is a (scrape) target.
41+
zapFields() []zap.Field
42+
}
43+
44+
func printErrors(logger *zap.Logger, err error) {
45+
merr := multierr.Errors(err)
46+
if merr == nil {
47+
return
48+
}
49+
50+
for _, err := range merr {
51+
m := err.Error()
52+
// Use the short message, this makes searching the code via error message easier
53+
// as additional info are flushed as fields.
54+
var errAttr errWithAttributes
55+
if errors.As(err, &errAttr) {
56+
m = errAttr.message()
57+
}
58+
fields, scope := extractErrorFields(err)
59+
fields = append(fields, zap.String("ErrScope", scope))
60+
logger.Error(m, fields...)
61+
}
62+
}
63+
64+
func extractErrorFields(err error) ([]zap.Field, string) {
65+
var fields []zap.Field
66+
scope := "Unknown"
67+
var errAttr errWithAttributes
68+
// Stop early because we are only attaching value for our internal errors.
69+
if !errors.As(err, &errAttr) {
70+
return fields, scope
71+
}
72+
fields = errAttr.zapFields()
73+
v, ok := errctx.ValueFrom(err, errKeyTask)
74+
if ok {
75+
// Rename ok to tok because linter says it shadows outer ok.
76+
// Though the linter seems to allow the similar block to shadow...
77+
if task, tok := v.(*Task); tok {
78+
fields = append(fields, zap.String("TaskArn", aws.StringValue(task.Task.TaskArn)))
79+
scope = "Task"
80+
}
81+
}
82+
v, ok = errctx.ValueFrom(err, errKeyTarget)
83+
if ok {
84+
if target, ok := v.(MatchedTarget); ok {
85+
// TODO: change to string once another PR for matcher got merged
86+
// https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/3386 defines Stringer
87+
fields = append(fields, zap.Int("MatcherType", int(target.MatcherType)))
88+
scope = "Target"
89+
}
90+
}
91+
return fields, scope
92+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
// Copyright OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package ecsobserver
16+
17+
import (
18+
"testing"
19+
20+
"go.uber.org/zap"
21+
)
22+
23+
func TestSetInvalidError(t *testing.T) {
24+
printErrors(zap.NewExample(), nil) // you know, for coverage
25+
// The actual test cen be found in the following locations:
26+
//
27+
// exporter_test.go where we filter logs by error scope
28+
}

extension/observer/ecsobserver/exporter.go

Lines changed: 116 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,19 @@
1414

1515
package ecsobserver
1616

17-
import "fmt"
17+
import (
18+
"fmt"
19+
20+
"github.com/aws/aws-sdk-go/aws"
21+
"go.uber.org/multierr"
22+
"go.uber.org/zap"
23+
24+
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/ecsobserver/internal/errctx"
25+
)
26+
27+
const (
28+
defaultMetricsPath = "/metrics"
29+
)
1830

1931
// CommonExporterConfig should be embedded into filter config.
2032
// They set labels like job, metrics_path etc. that can override prometheus default.
@@ -45,3 +57,106 @@ type commonExportSetting struct {
4557
func (s *commonExportSetting) hasContainerPort(containerPort int) bool {
4658
return s.metricsPorts[containerPort]
4759
}
60+
61+
// taskExporter converts annotated Task into PrometheusECSTarget.
62+
type taskExporter struct {
63+
logger *zap.Logger
64+
cluster string
65+
}
66+
67+
func newTaskExporter(logger *zap.Logger, cluster string) *taskExporter {
68+
return &taskExporter{
69+
logger: logger,
70+
cluster: cluster,
71+
}
72+
}
73+
74+
// exportTasks loops a list of tasks and export prometheus scrape targets.
75+
// It keeps track of error but does NOT stop when error occurs.
76+
// The returned targets are valid, invalid targets are saved in a multi error.
77+
// Caller can ignore the error because the only source is failing to get ip and port.
78+
// The error(s) can generates debug log or metrics.
79+
// To print the error with its task as context, use printExporterErrors.
80+
func (e *taskExporter) exportTasks(tasks []*Task) ([]PrometheusECSTarget, error) {
81+
var merr error
82+
var allTargets []PrometheusECSTarget
83+
for _, t := range tasks {
84+
targets, err := e.exportTask(t)
85+
multierr.AppendInto(&merr, err) // if err == nil, AppendInto does nothing
86+
// Even if there are error, returned targets are still valid.
87+
allTargets = append(allTargets, targets...)
88+
}
89+
return allTargets, merr
90+
}
91+
92+
// exportTask exports all the matched container within a single task.
93+
// One task can contain multiple containers. One container can have more than one target
94+
// if there are multiple ports in `metrics_port`.
95+
func (e *taskExporter) exportTask(task *Task) ([]PrometheusECSTarget, error) {
96+
// All targets in one task shares same IP.
97+
privateIP, err := task.PrivateIP()
98+
if err != nil {
99+
return nil, errctx.WithValue(err, errKeyTask, task)
100+
}
101+
102+
// Base for all the containers in this task, most attributes are same.
103+
baseTarget := PrometheusECSTarget{
104+
Source: aws.StringValue(task.Task.TaskArn),
105+
MetricsPath: defaultMetricsPath,
106+
ClusterName: e.cluster,
107+
TaskDefinitionFamily: aws.StringValue(task.Definition.Family),
108+
TaskDefinitionRevision: int(aws.Int64Value(task.Definition.Revision)),
109+
TaskStartedBy: aws.StringValue(task.Task.StartedBy),
110+
TaskLaunchType: aws.StringValue(task.Task.LaunchType),
111+
TaskGroup: aws.StringValue(task.Task.Group),
112+
TaskTags: task.TaskTags(),
113+
HealthStatus: aws.StringValue(task.Task.HealthStatus),
114+
}
115+
if task.Service != nil {
116+
baseTarget.ServiceName = aws.StringValue(task.Service.ServiceName)
117+
}
118+
if task.EC2 != nil {
119+
ec2 := task.EC2
120+
baseTarget.EC2InstanceID = aws.StringValue(ec2.InstanceId)
121+
baseTarget.EC2InstanceType = aws.StringValue(ec2.InstanceType)
122+
baseTarget.EC2Tags = task.EC2Tags()
123+
baseTarget.EC2VpcID = aws.StringValue(ec2.VpcId)
124+
baseTarget.EC2SubnetID = aws.StringValue(ec2.SubnetId)
125+
baseTarget.EC2PrivateIP = privateIP
126+
baseTarget.EC2PublicIP = aws.StringValue(ec2.PublicIpAddress)
127+
}
128+
129+
var targetsInTask []PrometheusECSTarget
130+
var merr error
131+
for _, m := range task.Matched {
132+
container := task.Definition.ContainerDefinitions[m.ContainerIndex]
133+
// Shallow copy task level attributes
134+
containerTarget := baseTarget
135+
// Add container specific info
136+
containerTarget.ContainerName = aws.StringValue(container.Name)
137+
containerTarget.ContainerLabels = task.ContainerLabels(m.ContainerIndex)
138+
// Multiple targets for a single container
139+
for _, matchedTarget := range m.Targets {
140+
// Shallow copy from container
141+
target := containerTarget
142+
mappedPort, err := task.MappedPort(container, int64(matchedTarget.Port))
143+
if err != nil {
144+
err = errctx.WithValues(err, map[string]interface{}{
145+
errKeyTarget: matchedTarget,
146+
errKeyTask: task,
147+
})
148+
}
149+
// Skip this target and keep track of port error, does not abort.
150+
if multierr.AppendInto(&merr, err) {
151+
continue
152+
}
153+
target.Address = fmt.Sprintf("%s:%d", privateIP, mappedPort)
154+
if matchedTarget.MetricsPath != "" {
155+
target.MetricsPath = matchedTarget.MetricsPath
156+
}
157+
target.Job = matchedTarget.Job
158+
targetsInTask = append(targetsInTask, target)
159+
}
160+
}
161+
return targetsInTask, merr
162+
}

0 commit comments

Comments
 (0)