Skip to content

Commit 7c3c4c4

Browse files
authored
[extension/ecsobserver] Add task definition, ec2 and service fetcher (#3503)
* ext: ecsobserver Add TaskDefinition Fetcher * ext: ecsobserver Fetch ec2 instance info for ECS EC2 * ext: ecsobserver Add Service Fetcher and mock
1 parent 222b401 commit 7c3c4c4

File tree

6 files changed

+1026
-26
lines changed

6 files changed

+1026
-26
lines changed

extension/observer/ecsobserver/fetcher.go

Lines changed: 326 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,23 +17,52 @@ package ecsobserver
1717
import (
1818
"context"
1919
"fmt"
20+
"sort"
21+
"strings"
2022

2123
"github.com/aws/aws-sdk-go/aws"
2224
"github.com/aws/aws-sdk-go/aws/request"
25+
"github.com/aws/aws-sdk-go/service/ec2"
2326
"github.com/aws/aws-sdk-go/service/ecs"
27+
"github.com/hashicorp/golang-lru/simplelru"
2428
"go.uber.org/zap"
2529
)
2630

31+
const (
32+
// ECS Service Quota: https://docs.aws.amazon.com/AmazonECS/latest/developerguide/service-quotas.html
33+
taskDefCacheSize = 2000
34+
// Based on existing number from cloudwatch-agent
35+
ec2CacheSize = 2000
36+
describeContainerInstanceLimit = 100
37+
describeServiceLimit = 10
38+
// NOTE: these constants are not defined in go sdk, there are three values for deployment status.
39+
deploymentStatusActive = "ACTIVE"
40+
deploymentStatusPrimary = "PRIMARY"
41+
)
42+
2743
// ecsClient includes API required by taskFetcher.
2844
type ecsClient interface {
2945
ListTasksWithContext(ctx context.Context, input *ecs.ListTasksInput, opts ...request.Option) (*ecs.ListTasksOutput, error)
3046
DescribeTasksWithContext(ctx context.Context, input *ecs.DescribeTasksInput, opts ...request.Option) (*ecs.DescribeTasksOutput, error)
47+
DescribeTaskDefinitionWithContext(ctx context.Context, input *ecs.DescribeTaskDefinitionInput, opts ...request.Option) (*ecs.DescribeTaskDefinitionOutput, error)
48+
DescribeContainerInstancesWithContext(ctx context.Context, input *ecs.DescribeContainerInstancesInput, opts ...request.Option) (*ecs.DescribeContainerInstancesOutput, error)
49+
ListServicesWithContext(ctx context.Context, input *ecs.ListServicesInput, opts ...request.Option) (*ecs.ListServicesOutput, error)
50+
DescribeServicesWithContext(ctx context.Context, input *ecs.DescribeServicesInput, opts ...request.Option) (*ecs.DescribeServicesOutput, error)
51+
}
52+
53+
// ec2Client includes API required by TaskFetcher.
54+
type ec2Client interface {
55+
DescribeInstancesWithContext(ctx context.Context, input *ec2.DescribeInstancesInput, opts ...request.Option) (*ec2.DescribeInstancesOutput, error)
3156
}
3257

3358
type taskFetcher struct {
34-
logger *zap.Logger
35-
ecs ecsClient
36-
cluster string
59+
logger *zap.Logger
60+
ecs ecsClient
61+
ec2 ec2Client
62+
cluster string
63+
taskDefCache simplelru.LRUCache
64+
ec2Cache simplelru.LRUCache
65+
serviceNameFilter serviceNameFilter
3766
}
3867

3968
type taskFetcherOptions struct {
@@ -43,24 +72,68 @@ type taskFetcherOptions struct {
4372

4473
// test overrides
4574
ecsOverride ecsClient
75+
ec2Override ec2Client
4676
}
4777

4878
func newTaskFetcher(opts taskFetcherOptions) (*taskFetcher, error) {
79+
// Init cache
80+
taskDefCache, err := simplelru.NewLRU(taskDefCacheSize, nil)
81+
if err != nil {
82+
return nil, err
83+
}
84+
ec2Cache, err := simplelru.NewLRU(ec2CacheSize, nil)
85+
if err != nil {
86+
return nil, err
87+
}
88+
4989
fetcher := taskFetcher{
50-
logger: opts.Logger,
51-
ecs: opts.ecsOverride,
52-
cluster: opts.Cluster,
90+
logger: opts.Logger,
91+
ecs: opts.ecsOverride,
92+
ec2: opts.ec2Override,
93+
cluster: opts.Cluster,
94+
taskDefCache: taskDefCache,
95+
ec2Cache: ec2Cache,
96+
// TODO: after the service matcher PR is merged, use actual service name filter here.
97+
// For now, describe all the services
98+
serviceNameFilter: func(name string) bool {
99+
return true
100+
},
53101
}
54-
// Return early if clients are mocked
55-
if fetcher.ecs != nil {
102+
// Return early if any clients are mocked, caller should overrides all the clients when mocking.
103+
if fetcher.ecs != nil || fetcher.ec2 != nil {
56104
return &fetcher, nil
57105
}
58106
return nil, fmt.Errorf("actual aws init logic not implemented")
59107
}
60108

61-
// GetAllTasks get arns of all running tasks and describe those tasks.
109+
func (f *taskFetcher) fetchAndDecorate(ctx context.Context) ([]*Task, error) {
110+
// Task
111+
rawTasks, err := f.getAllTasks(ctx)
112+
if err != nil {
113+
return nil, fmt.Errorf("getAllTasks failed: %w", err)
114+
}
115+
tasks, err := f.attachTaskDefinition(ctx, rawTasks)
116+
if err != nil {
117+
return nil, fmt.Errorf("attachTaskDefinition failed: %w", err)
118+
}
119+
120+
// EC2
121+
if err = f.attachContainerInstance(ctx, tasks); err != nil {
122+
return nil, fmt.Errorf("attachContainerInstance failed: %w", err)
123+
}
124+
125+
// Services
126+
services, err := f.getAllServices(ctx)
127+
if err != nil {
128+
return nil, fmt.Errorf("getAllServices failed: %w", err)
129+
}
130+
f.attachService(tasks, services)
131+
return tasks, nil
132+
}
133+
134+
// getAllTasks get arns of all running tasks and describe those tasks.
62135
// There is no API to list task detail without arn so we need to call two APIs.
63-
func (f *taskFetcher) GetAllTasks(ctx context.Context) ([]*ecs.Task, error) {
136+
func (f *taskFetcher) getAllTasks(ctx context.Context) ([]*ecs.Task, error) {
64137
svc := f.ecs
65138
cluster := aws.String(f.cluster)
66139
req := ecs.ListTasksInput{Cluster: cluster}
@@ -86,3 +159,246 @@ func (f *taskFetcher) GetAllTasks(ctx context.Context) ([]*ecs.Task, error) {
86159
}
87160
return tasks, nil
88161
}
162+
163+
// attachTaskDefinition converts ecs.Task into a annotated Task to include its ecs.TaskDefinition.
164+
func (f *taskFetcher) attachTaskDefinition(ctx context.Context, tasks []*ecs.Task) ([]*Task, error) {
165+
svc := f.ecs
166+
// key is task definition arn
167+
arn2Def := make(map[string]*ecs.TaskDefinition)
168+
for _, t := range tasks {
169+
arn2Def[aws.StringValue(t.TaskDefinitionArn)] = nil
170+
}
171+
172+
for arn := range arn2Def {
173+
if arn == "" {
174+
continue
175+
}
176+
var def *ecs.TaskDefinition
177+
if cached, ok := f.taskDefCache.Get(arn); ok {
178+
def = cached.(*ecs.TaskDefinition)
179+
} else {
180+
res, err := svc.DescribeTaskDefinitionWithContext(ctx, &ecs.DescribeTaskDefinitionInput{
181+
TaskDefinition: aws.String(arn),
182+
})
183+
if err != nil {
184+
return nil, err
185+
}
186+
f.taskDefCache.Add(arn, res.TaskDefinition)
187+
def = res.TaskDefinition
188+
}
189+
arn2Def[arn] = def
190+
}
191+
192+
var tasksWithDef []*Task
193+
for _, t := range tasks {
194+
tasksWithDef = append(tasksWithDef, &Task{
195+
Task: t,
196+
Definition: arn2Def[aws.StringValue(t.TaskDefinitionArn)],
197+
})
198+
}
199+
return tasksWithDef, nil
200+
}
201+
202+
// attachContainerInstance fetches all the container instances' underlying EC2 vms
203+
// and attach EC2 info to tasks.
204+
func (f *taskFetcher) attachContainerInstance(ctx context.Context, tasks []*Task) error {
205+
// Map container instance to EC2, key is container instance id.
206+
ciToEC2 := make(map[string]*ec2.Instance)
207+
// Only EC2 instance type need to fetch EC2 info
208+
for _, t := range tasks {
209+
if aws.StringValue(t.Task.LaunchType) != ecs.LaunchTypeEc2 {
210+
continue
211+
}
212+
ciToEC2[aws.StringValue(t.Task.ContainerInstanceArn)] = nil
213+
}
214+
// All fargate, skip
215+
if len(ciToEC2) == 0 {
216+
return nil
217+
}
218+
219+
// Describe container instances that do not have cached EC2 info.
220+
var instanceList []*string
221+
for instanceArn := range ciToEC2 {
222+
cached, ok := f.ec2Cache.Get(instanceArn)
223+
if ok {
224+
ciToEC2[instanceArn] = cached.(*ec2.Instance) // use value from cache
225+
} else {
226+
instanceList = append(instanceList, aws.String(instanceArn))
227+
}
228+
}
229+
sortStringPointers(instanceList)
230+
231+
// DescribeContainerInstance size limit is 100, do it in batch.
232+
for i := 0; i < len(instanceList); i += describeContainerInstanceLimit {
233+
end := minInt(i+describeContainerInstanceLimit, len(instanceList))
234+
if err := f.describeContainerInstances(ctx, instanceList[i:end], ciToEC2); err != nil {
235+
return fmt.Errorf("describe container instanced failed offset=%d: %w", i, err)
236+
}
237+
}
238+
239+
// Assign the info back to task
240+
for _, t := range tasks {
241+
// NOTE: we need to skip fargate here because we are looping all tasks again.
242+
if aws.StringValue(t.Task.LaunchType) != ecs.LaunchTypeEc2 {
243+
continue
244+
}
245+
containerInstance := aws.StringValue(t.Task.ContainerInstanceArn)
246+
ec2Info, ok := ciToEC2[containerInstance]
247+
if !ok {
248+
return fmt.Errorf("container instance ec2 info not found containerInstnace=%q", containerInstance)
249+
}
250+
t.EC2 = ec2Info
251+
}
252+
253+
// Update the cache
254+
for ci, ec2Info := range ciToEC2 {
255+
f.ec2Cache.Add(ci, ec2Info)
256+
}
257+
return nil
258+
}
259+
260+
// Run ecs.DescribeContainerInstances and ec2.DescribeInstances for a batch (less than 100 container instances).
261+
func (f *taskFetcher) describeContainerInstances(ctx context.Context, instanceList []*string,
262+
ci2EC2 map[string]*ec2.Instance) error {
263+
// Get container instances
264+
res, err := f.ecs.DescribeContainerInstancesWithContext(ctx, &ecs.DescribeContainerInstancesInput{
265+
Cluster: aws.String(f.cluster),
266+
ContainerInstances: instanceList,
267+
})
268+
if err != nil {
269+
return fmt.Errorf("ecs.DescribeContainerInstance failed: %w", err)
270+
}
271+
272+
// Create the index to map ec2 id back to container instance id.
273+
var ec2Ids []*string
274+
ec2IdToCI := make(map[string]string)
275+
for _, containerInstance := range res.ContainerInstances {
276+
ec2Id := containerInstance.Ec2InstanceId
277+
ec2Ids = append(ec2Ids, ec2Id)
278+
ec2IdToCI[aws.StringValue(ec2Id)] = aws.StringValue(containerInstance.ContainerInstanceArn)
279+
}
280+
281+
// Fetch all ec2 instances and update mapping from container instance id to ec2 info.
282+
// NOTE: because the limit on ec2 is 1000, much larger than ecs container instance's 100,
283+
// we don't do paging logic here.
284+
req := ec2.DescribeInstancesInput{InstanceIds: ec2Ids}
285+
ec2Res, err := f.ec2.DescribeInstancesWithContext(ctx, &req)
286+
if err != nil {
287+
return fmt.Errorf("ec2.DescribeInstances failed: %w", err)
288+
}
289+
for _, reservation := range ec2Res.Reservations {
290+
for _, instance := range reservation.Instances {
291+
if instance.InstanceId == nil {
292+
continue
293+
}
294+
ec2Id := aws.StringValue(instance.InstanceId)
295+
ci, ok := ec2IdToCI[ec2Id]
296+
if !ok {
297+
return fmt.Errorf("mapping from ec2 to container instance not found ec2=%s", ec2Id)
298+
}
299+
ci2EC2[ci] = instance // update mapping
300+
}
301+
}
302+
return nil
303+
}
304+
305+
// serviceNameFilter decides if we should get detail info for a service, i.e. make the describe API call.
306+
type serviceNameFilter func(name string) bool
307+
308+
// getAllServices does not have cache like task definition or ec2 instances
309+
// because we need to get the deployment id to map service to task, which changes frequently.
310+
func (f *taskFetcher) getAllServices(ctx context.Context) ([]*ecs.Service, error) {
311+
svc := f.ecs
312+
cluster := aws.String(f.cluster)
313+
// List and filter out services we need to desribe.
314+
listReq := ecs.ListServicesInput{Cluster: cluster}
315+
var servicesToDescribe []*string
316+
for {
317+
res, err := svc.ListServicesWithContext(ctx, &listReq)
318+
if err != nil {
319+
return nil, err
320+
}
321+
for _, arn := range res.ServiceArns {
322+
segs := strings.Split(aws.StringValue(arn), "/")
323+
name := segs[len(segs)-1]
324+
if f.serviceNameFilter(name) {
325+
servicesToDescribe = append(servicesToDescribe, arn)
326+
}
327+
}
328+
if res.NextToken == nil {
329+
break
330+
}
331+
listReq.NextToken = res.NextToken
332+
}
333+
334+
// DescribeServices size limit is 10 so we need to do paging on client side.
335+
var services []*ecs.Service
336+
for i := 0; i < len(servicesToDescribe); i += describeServiceLimit {
337+
end := minInt(i+describeServiceLimit, len(servicesToDescribe))
338+
desc := &ecs.DescribeServicesInput{
339+
Cluster: cluster,
340+
Services: servicesToDescribe[i:end],
341+
}
342+
res, err := svc.DescribeServicesWithContext(ctx, desc)
343+
if err != nil {
344+
return nil, fmt.Errorf("ecs.DescribeServices failed %w", err)
345+
}
346+
services = append(services, res.Services...)
347+
}
348+
return services, nil
349+
}
350+
351+
// attachService map service to task using deployment id.
352+
// Each service can have multiple deployment and each task keep track of the deployment in task.StartedBy.
353+
func (f *taskFetcher) attachService(tasks []*Task, services []*ecs.Service) {
354+
// Map deployment ID to service name
355+
idToService := make(map[string]*ecs.Service)
356+
for _, svc := range services {
357+
for _, deployment := range svc.Deployments {
358+
status := aws.StringValue(deployment.Status)
359+
if status == deploymentStatusActive || status == deploymentStatusPrimary {
360+
idToService[aws.StringValue(deployment.Id)] = svc
361+
break
362+
}
363+
}
364+
}
365+
366+
// Attach service to task
367+
for _, t := range tasks {
368+
// Task is created using RunTask i.e. not manged by a service.
369+
if t.Task.StartedBy == nil {
370+
continue
371+
}
372+
deploymentID := aws.StringValue(t.Task.StartedBy)
373+
svc := idToService[deploymentID]
374+
// Service not found happen a lot because we only fetch services defined in ServiceConfig.
375+
// However, we fetch all the tasks, which could be started by other services no mentioned in config
376+
// or started using RunTasks API directly.
377+
if svc == nil {
378+
continue
379+
}
380+
t.Service = svc
381+
}
382+
}
383+
384+
// Util Start
385+
386+
func sortStringPointers(ps []*string) {
387+
var ss []string
388+
for _, p := range ps {
389+
ss = append(ss, aws.StringValue(p))
390+
}
391+
sort.Strings(ss)
392+
for i := range ss {
393+
ps[i] = aws.String(ss[i])
394+
}
395+
}
396+
397+
func minInt(a, b int) int {
398+
if a < b {
399+
return a
400+
}
401+
return b
402+
}
403+
404+
// Util End

0 commit comments

Comments
 (0)