From 90be16c03dbbb150134a8df47352fa969a9e8ca2 Mon Sep 17 00:00:00 2001 From: Kavindu Dodanduwa Date: Tue, 29 Oct 2024 13:14:23 -0700 Subject: [PATCH 1/6] add support to extract ec2 tags from IMDS endpoint Signed-off-by: Kavindu Dodanduwa --- .../add_cloud_metadata/provider_aws_ec2.go | 121 +++++++++-- .../provider_aws_ec2_test.go | 199 ++++++++++-------- .../add_cloud_metadata/providers.go | 2 +- 3 files changed, 214 insertions(+), 108 deletions(-) diff --git a/libbeat/processors/add_cloud_metadata/provider_aws_ec2.go b/libbeat/processors/add_cloud_metadata/provider_aws_ec2.go index ea945ce4bbad..735fccb235f3 100644 --- a/libbeat/processors/add_cloud_metadata/provider_aws_ec2.go +++ b/libbeat/processors/add_cloud_metadata/provider_aws_ec2.go @@ -20,12 +20,15 @@ package add_cloud_metadata import ( "context" "fmt" + "io" "net/http" + "strings" "github.com/elastic/elastic-agent-libs/logp" awssdk "github.com/aws/aws-sdk-go-v2/aws" awscfg "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/credentials/ec2rolecreds" "github.com/aws/aws-sdk-go-v2/feature/ec2/imds" "github.com/aws/aws-sdk-go-v2/service/ec2" "github.com/aws/aws-sdk-go-v2/service/ec2/types" @@ -35,7 +38,14 @@ import ( conf "github.com/elastic/elastic-agent-libs/config" ) +const ( + eksClusterNameTagKey = "eks:cluster-name" + tagsCategory = "tags/instance" + tagPrefix = "ec2.tag" +) + type IMDSClient interface { + ec2rolecreds.GetMetadataAPIClient GetInstanceIdentityDocument(ctx context.Context, params *imds.GetInstanceIdentityDocumentInput, optFns ...func(*imds.Options)) (*imds.GetInstanceIdentityDocumentOutput, error) } @@ -90,30 +100,17 @@ func fetchRawProviderMetadata( result.err = fmt.Errorf("failed loading AWS default configuration: %w", err) return } - awsClient := NewIMDSClient(awsConfig) - instanceIdentity, err := awsClient.GetInstanceIdentityDocument(context.TODO(), &imds.GetInstanceIdentityDocumentInput{}) + imdsClient := NewIMDSClient(awsConfig) + instanceIdentity, err := imdsClient.GetInstanceIdentityDocument(ctx, &imds.GetInstanceIdentityDocumentInput{}) if err != nil { result.err = fmt.Errorf("failed fetching EC2 Identity Document: %w", err) return } - // AWS Region must be set to be able to get EC2 Tags awsRegion := instanceIdentity.InstanceIdentityDocument.Region - awsConfig.Region = awsRegion accountID := instanceIdentity.InstanceIdentityDocument.AccountID - - clusterName, err := fetchEC2ClusterNameTag(awsConfig, instanceIdentity.InstanceIdentityDocument.InstanceID) - if err != nil { - logger.Warnf("error fetching cluster name metadata: %s.", err) - } else if clusterName != "" { - // for AWS cluster ID is used cluster ARN: arn:partition:service:region:account-id:resource-type/resource-id, example: - // arn:aws:eks:us-east-2:627286350134:cluster/cluster-name - clusterARN := fmt.Sprintf("arn:aws:eks:%s:%s:cluster/%v", awsRegion, accountID, clusterName) - - _, _ = result.metadata.Put("orchestrator.cluster.id", clusterARN) - _, _ = result.metadata.Put("orchestrator.cluster.name", clusterName) - } + instanceID := instanceIdentity.InstanceIdentityDocument.InstanceID _, _ = result.metadata.Put("cloud.instance.id", instanceIdentity.InstanceIdentityDocument.InstanceID) _, _ = result.metadata.Put("cloud.machine.type", instanceIdentity.InstanceIdentityDocument.InstanceType) @@ -122,10 +119,90 @@ func fetchRawProviderMetadata( _, _ = result.metadata.Put("cloud.account.id", accountID) _, _ = result.metadata.Put("cloud.image.id", instanceIdentity.InstanceIdentityDocument.ImageID) + // AWS Region must be set to be able to get EC2 Tags + awsConfig.Region = awsRegion + tags := getTags(ctx, imdsClient, NewEC2Client(awsConfig), instanceID, logger) + + if tags[eksClusterNameTagKey] != "" { + // for AWS cluster ID is used cluster ARN: arn:partition:service:region:account-id:resource-type/resource-id, example: + // arn:aws:eks:us-east-2:627286350134:cluster/cluster-name + clusterARN := fmt.Sprintf("arn:aws:eks:%s:%s:cluster/%v", awsRegion, accountID, tags[eksClusterNameTagKey]) + + _, _ = result.metadata.Put("orchestrator.cluster.id", clusterARN) + _, _ = result.metadata.Put("orchestrator.cluster.name", tags[eksClusterNameTagKey]) + } + + if len(tags) == 0 { + return + } + + logger.Infof("Adding retrieved tags with key: %s", tagPrefix) + for k, v := range tags { + _, _ = result.metadata.Put(fmt.Sprintf("%s.%s", tagPrefix, k), v) + } } -func fetchEC2ClusterNameTag(awsConfig awssdk.Config, instanceID string) (string, error) { - svc := NewEC2Client(awsConfig) +// getTags is a helper to extract EC2 tags. Internally it utilize multiple extraction methods. +func getTags(ctx context.Context, imdsClient IMDSClient, ec2Client EC2Client, instanceId string, logger *logp.Logger) map[string]string { + logger.Info("Extracting EC2 tags from IMDS endpoint") + tags, ok := getTagsFromIMDS(ctx, imdsClient, logger) + if ok { + return tags + } + + logger.Info("Tag extraction from IMDS failed, fallback to DescribeTags API to obtain EKS cluster name.") + clusterName, err := clusterNameFromDescribeTag(ctx, ec2Client, instanceId) + if err != nil { + logger.Warnf("error obtaining cluster name: %s.", err) + return tags + } + + if clusterName != "" { + tags[eksClusterNameTagKey] = clusterName + } + return tags +} + +// getTagsFromIMDS is a helper to extract EC2 tags using instance metadata service. +// Note that this call could get throttled and currently does not implement a retry mechanism. +// See - https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/instancedata-data-retrieval.html#instancedata-throttling +func getTagsFromIMDS(ctx context.Context, client IMDSClient, logger *logp.Logger) (tags map[string]string, ok bool) { + tags = make(map[string]string) + + metadata, err := client.GetMetadata(ctx, &imds.GetMetadataInput{Path: tagsCategory}) + if err != nil { + logger.Warnf("error from IMDS tags category request: %s", err) + return tags, false + } + + b, err := io.ReadAll(metadata.Content) + if err != nil { + logger.Warnf("error extracting tags category payload: %s", err) + return tags, false + } + + for _, tag := range strings.Split(string(b), "\n") { + tagPath := fmt.Sprintf("%s/%s", tagsCategory, tag) + metadata, err := client.GetMetadata(ctx, &imds.GetMetadataInput{Path: tagPath}) + if err != nil { + logger.Warnf("error from IMDS tag request: %s", err) + return tags, false + } + + b, err := io.ReadAll(metadata.Content) + if err != nil { + logger.Warnf("error extracting tag value payload: %s", err) + return tags, false + } + + tags[tag] = string(b) + } + + return tags, true +} + +// clusterNameFromDescribeTag is a helper to extract EKS cluster name using DescribeTag. +func clusterNameFromDescribeTag(ctx context.Context, ec2Client EC2Client, instanceID string) (string, error) { input := &ec2.DescribeTagsInput{ Filters: []types.Filter{ { @@ -135,15 +212,13 @@ func fetchEC2ClusterNameTag(awsConfig awssdk.Config, instanceID string) (string, }, }, { - Name: awssdk.String("key"), - Values: []string{ - "eks:cluster-name", - }, + Name: awssdk.String("key"), + Values: []string{eksClusterNameTagKey}, }, }, } - tagsResult, err := svc.DescribeTags(context.TODO(), input) + tagsResult, err := ec2Client.DescribeTags(ctx, input) if err != nil { return "", fmt.Errorf("error fetching EC2 Tags: %w", err) } diff --git a/libbeat/processors/add_cloud_metadata/provider_aws_ec2_test.go b/libbeat/processors/add_cloud_metadata/provider_aws_ec2_test.go index 76ddea084a78..92bf0c01d4d9 100644 --- a/libbeat/processors/add_cloud_metadata/provider_aws_ec2_test.go +++ b/libbeat/processors/add_cloud_metadata/provider_aws_ec2_test.go @@ -19,8 +19,11 @@ package add_cloud_metadata import ( "context" + "errors" "fmt" + "io" "os" + "strings" "testing" awssdk "github.com/aws/aws-sdk-go-v2/aws" @@ -43,8 +46,17 @@ func init() { os.Setenv("AWS_EC2_METADATA_DISABLED", "true") } +type getInstanceIDFunc func(ctx context.Context, params *imds.GetInstanceIdentityDocumentInput, optFns ...func(*imds.Options)) (*imds.GetInstanceIdentityDocumentOutput, error) +type getMetaFunc func(ctx context.Context, input *imds.GetMetadataInput, f ...func(*imds.Options)) (*imds.GetMetadataOutput, error) +type getTagFunc func(ctx context.Context, params *ec2.DescribeTagsInput, optFns ...func(*ec2.Options)) (*ec2.DescribeTagsOutput, error) + type MockIMDSClient struct { - GetInstanceIdentityDocumentFunc func(ctx context.Context, params *imds.GetInstanceIdentityDocumentInput, optFns ...func(*imds.Options)) (*imds.GetInstanceIdentityDocumentOutput, error) + GetInstanceIdentityDocumentFunc getInstanceIDFunc + GetMetadataFunc getMetaFunc +} + +func (m *MockIMDSClient) GetMetadata(ctx context.Context, input *imds.GetMetadataInput, f ...func(*imds.Options)) (*imds.GetMetadataOutput, error) { + return m.GetMetadataFunc(ctx, input, f...) } func (m *MockIMDSClient) GetInstanceIdentityDocument(ctx context.Context, params *imds.GetInstanceIdentityDocumentInput, optFns ...func(*imds.Options)) (*imds.GetInstanceIdentityDocumentOutput, error) { @@ -52,13 +64,12 @@ func (m *MockIMDSClient) GetInstanceIdentityDocument(ctx context.Context, params } type MockEC2Client struct { - DescribeTagsFunc func(ctx context.Context, params *ec2.DescribeTagsInput, optFns ...func(*ec2.Options)) (*ec2.DescribeTagsOutput, error) + DescribeTagsFunc getTagFunc } func (e *MockEC2Client) DescribeTags(ctx context.Context, params *ec2.DescribeTagsInput, optFns ...func(*ec2.Options)) (*ec2.DescribeTagsOutput, error) { return e.DescribeTagsFunc(ctx, params, optFns...) } - func TestMain(m *testing.M) { logp.TestingSetup() code := m.Run() @@ -76,33 +87,64 @@ func TestRetrieveAWSMetadataEC2(t *testing.T) { imageIDDoc1 = "ami-abcd1234" instanceTypeDoc1 = "t2.medium" instanceIDDoc2 = "i-22222222" - clusterNameKey = "eks:cluster-name" + clusterNameKey = eksClusterNameTagKey clusterNameValue = "test" instanceIDDoc1 = "i-11111111" + customTagKey = "organization" + customTagValue = "orgName" ) + // generic getTagFunc implementation with IMDS disabled error to avoid IMDS response + var disabledIMDS getMetaFunc = func(ctx context.Context, input *imds.GetMetadataInput, f ...func(*imds.Options)) (*imds.GetMetadataOutput, error) { + return nil, errors.New("IMDS disabled mock error") + } + + // set up a generic getTagFunc implementation with valid tags + var enabledIMDS getMetaFunc = func(ctx context.Context, input *imds.GetMetadataInput, f ...func(*imds.Options)) (*imds.GetMetadataOutput, error) { + if input.Path == tagsCategory { + // tag category request + return &imds.GetMetadataOutput{ + Content: io.NopCloser(strings.NewReader(customTagKey)), + }, nil + } + + if strings.HasSuffix(input.Path, customTagKey) { + // customTagKey request + return &imds.GetMetadataOutput{ + Content: io.NopCloser(strings.NewReader(customTagValue)), + }, nil + } + + return nil, errors.New("invalid request") + } + + // generic getInstanceIDFunc implementation with known response values and no error + var genericInstanceIDResponse getInstanceIDFunc = func(ctx context.Context, params *imds.GetInstanceIdentityDocumentInput, optFns ...func(*imds.Options)) (*imds.GetInstanceIdentityDocumentOutput, error) { + return &imds.GetInstanceIdentityDocumentOutput{ + InstanceIdentityDocument: imds.InstanceIdentityDocument{ + AvailabilityZone: availabilityZoneDoc1, + Region: regionDoc1, + InstanceID: instanceIDDoc1, + InstanceType: instanceTypeDoc1, + AccountID: accountIDDoc1, + ImageID: imageIDDoc1, + }, + }, nil + } + var tests = []struct { testName string - mockGetInstanceIdentity func(ctx context.Context, params *imds.GetInstanceIdentityDocumentInput, optFns ...func(*imds.Options)) (*imds.GetInstanceIdentityDocumentOutput, error) - mockEc2Tags func(ctx context.Context, params *ec2.DescribeTagsInput, optFns ...func(*ec2.Options)) (*ec2.DescribeTagsOutput, error) + mockGetInstanceIdentity getInstanceIDFunc + mockMetadata getMetaFunc + mockEc2Tags getTagFunc processorOverwrite bool previousEvent mapstr.M expectedEvent mapstr.M }{ { - testName: "valid instance identity document, no cluster tags", - mockGetInstanceIdentity: func(ctx context.Context, params *imds.GetInstanceIdentityDocumentInput, optFns ...func(*imds.Options)) (*imds.GetInstanceIdentityDocumentOutput, error) { - return &imds.GetInstanceIdentityDocumentOutput{ - InstanceIdentityDocument: imds.InstanceIdentityDocument{ - AvailabilityZone: availabilityZoneDoc1, - Region: regionDoc1, - InstanceID: instanceIDDoc1, - InstanceType: instanceTypeDoc1, - AccountID: accountIDDoc1, - ImageID: imageIDDoc1, - }, - }, nil - }, + testName: "valid instance identity document, no cluster tags", + mockGetInstanceIdentity: genericInstanceIDResponse, + mockMetadata: disabledIMDS, mockEc2Tags: func(ctx context.Context, params *ec2.DescribeTagsInput, optFns ...func(*ec2.Options)) (*ec2.DescribeTagsOutput, error) { return &ec2.DescribeTagsOutput{ Tags: []types.TagDescription{}, @@ -124,19 +166,9 @@ func TestRetrieveAWSMetadataEC2(t *testing.T) { }, }, { - testName: "all fields from processor", - mockGetInstanceIdentity: func(ctx context.Context, params *imds.GetInstanceIdentityDocumentInput, optFns ...func(*imds.Options)) (*imds.GetInstanceIdentityDocumentOutput, error) { - return &imds.GetInstanceIdentityDocumentOutput{ - InstanceIdentityDocument: imds.InstanceIdentityDocument{ - AvailabilityZone: availabilityZoneDoc1, - Region: regionDoc1, - InstanceID: instanceIDDoc1, - InstanceType: instanceTypeDoc1, - AccountID: accountIDDoc1, - ImageID: imageIDDoc1, - }, - }, nil - }, + testName: "all fields from processor", + mockGetInstanceIdentity: genericInstanceIDResponse, + mockMetadata: disabledIMDS, mockEc2Tags: func(ctx context.Context, params *ec2.DescribeTagsInput, optFns ...func(*ec2.Options)) (*ec2.DescribeTagsOutput, error) { return &ec2.DescribeTagsOutput{ Tags: []types.TagDescription{ @@ -168,22 +200,17 @@ func TestRetrieveAWSMetadataEC2(t *testing.T) { "id": fmt.Sprintf("arn:aws:eks:%s:%s:cluster/%s", regionDoc1, accountIDDoc1, clusterNameValue), }, }, + "ec2": mapstr.M{ + "tag": mapstr.M{ + eksClusterNameTagKey: clusterNameValue, + }, + }, }, }, { - testName: "instanceId pre-informed, no overwrite", - mockGetInstanceIdentity: func(ctx context.Context, params *imds.GetInstanceIdentityDocumentInput, optFns ...func(*imds.Options)) (*imds.GetInstanceIdentityDocumentOutput, error) { - return &imds.GetInstanceIdentityDocumentOutput{ - InstanceIdentityDocument: imds.InstanceIdentityDocument{ - AvailabilityZone: availabilityZoneDoc1, - Region: regionDoc1, - InstanceID: instanceIDDoc1, - InstanceType: instanceTypeDoc1, - AccountID: accountIDDoc1, - ImageID: imageIDDoc1, - }, - }, nil - }, + testName: "instanceId pre-informed, no overwrite", + mockGetInstanceIdentity: genericInstanceIDResponse, + mockMetadata: disabledIMDS, mockEc2Tags: func(ctx context.Context, params *ec2.DescribeTagsInput, optFns ...func(*ec2.Options)) (*ec2.DescribeTagsOutput, error) { return &ec2.DescribeTagsOutput{ Tags: []types.TagDescription{ @@ -212,25 +239,20 @@ func TestRetrieveAWSMetadataEC2(t *testing.T) { "id": fmt.Sprintf("arn:aws:eks:%s:%s:cluster/%s", regionDoc1, accountIDDoc1, clusterNameValue), }, }, + "ec2": mapstr.M{ + "tag": mapstr.M{ + eksClusterNameTagKey: clusterNameValue, + }, + }, }, }, { // NOTE: In this case, add_cloud_metadata will overwrite cloud fields because // it won't detect cloud.provider as a cloud field. This is not the behavior we // expect and will find a better solution later in issue 11697. - testName: "only cloud.provider pre-informed, no overwrite", - mockGetInstanceIdentity: func(ctx context.Context, params *imds.GetInstanceIdentityDocumentInput, optFns ...func(*imds.Options)) (*imds.GetInstanceIdentityDocumentOutput, error) { - return &imds.GetInstanceIdentityDocumentOutput{ - InstanceIdentityDocument: imds.InstanceIdentityDocument{ - AvailabilityZone: availabilityZoneDoc1, - Region: regionDoc1, - InstanceID: instanceIDDoc1, - InstanceType: instanceTypeDoc1, - AccountID: accountIDDoc1, - ImageID: imageIDDoc1, - }, - }, nil - }, + testName: "only cloud.provider pre-informed, no overwrite", + mockGetInstanceIdentity: genericInstanceIDResponse, + mockMetadata: disabledIMDS, mockEc2Tags: func(ctx context.Context, params *ec2.DescribeTagsInput, optFns ...func(*ec2.Options)) (*ec2.DescribeTagsOutput, error) { return &ec2.DescribeTagsOutput{ Tags: []types.TagDescription{ @@ -265,22 +287,17 @@ func TestRetrieveAWSMetadataEC2(t *testing.T) { "id": fmt.Sprintf("arn:aws:eks:%s:%s:cluster/%s", regionDoc1, accountIDDoc1, clusterNameValue), }, }, + "ec2": mapstr.M{ + "tag": mapstr.M{ + eksClusterNameTagKey: clusterNameValue, + }, + }, }, }, { - testName: "instanceId pre-informed, overwrite", - mockGetInstanceIdentity: func(ctx context.Context, params *imds.GetInstanceIdentityDocumentInput, optFns ...func(*imds.Options)) (*imds.GetInstanceIdentityDocumentOutput, error) { - return &imds.GetInstanceIdentityDocumentOutput{ - InstanceIdentityDocument: imds.InstanceIdentityDocument{ - AvailabilityZone: availabilityZoneDoc1, - Region: regionDoc1, - InstanceID: instanceIDDoc1, - InstanceType: instanceTypeDoc1, - AccountID: accountIDDoc1, - ImageID: imageIDDoc1, - }, - }, nil - }, + testName: "instanceId pre-informed, overwrite", + mockGetInstanceIdentity: genericInstanceIDResponse, + mockMetadata: disabledIMDS, mockEc2Tags: func(ctx context.Context, params *ec2.DescribeTagsInput, optFns ...func(*ec2.Options)) (*ec2.DescribeTagsOutput, error) { return &ec2.DescribeTagsOutput{ Tags: []types.TagDescription{}, @@ -306,19 +323,9 @@ func TestRetrieveAWSMetadataEC2(t *testing.T) { }, }, { - testName: "only cloud.provider pre-informed, overwrite", - mockGetInstanceIdentity: func(ctx context.Context, params *imds.GetInstanceIdentityDocumentInput, optFns ...func(*imds.Options)) (*imds.GetInstanceIdentityDocumentOutput, error) { - return &imds.GetInstanceIdentityDocumentOutput{ - InstanceIdentityDocument: imds.InstanceIdentityDocument{ - AvailabilityZone: availabilityZoneDoc1, - Region: regionDoc1, - InstanceID: instanceIDDoc1, - InstanceType: instanceTypeDoc1, - AccountID: accountIDDoc1, - ImageID: imageIDDoc1, - }, - }, nil - }, + testName: "only cloud.provider pre-informed, overwrite", + mockGetInstanceIdentity: genericInstanceIDResponse, + mockMetadata: disabledIMDS, mockEc2Tags: func(ctx context.Context, params *ec2.DescribeTagsInput, optFns ...func(*ec2.Options)) (*ec2.DescribeTagsOutput, error) { return &ec2.DescribeTagsOutput{ Tags: []types.TagDescription{}, @@ -342,6 +349,29 @@ func TestRetrieveAWSMetadataEC2(t *testing.T) { }, }, }, + { + testName: "if enabled, extract tags from IMDS endpoint", + mockGetInstanceIdentity: genericInstanceIDResponse, + mockMetadata: enabledIMDS, + mockEc2Tags: nil, // could be nil as IMDS response fulfills tag + expectedEvent: mapstr.M{ + "cloud": mapstr.M{ + "provider": "aws", + "account": mapstr.M{"id": accountIDDoc1}, + "instance": mapstr.M{"id": instanceIDDoc1}, + "machine": mapstr.M{"type": instanceTypeDoc1}, + "image": mapstr.M{"id": imageIDDoc1}, + "region": regionDoc1, + "availability_zone": availabilityZoneDoc1, + "service": mapstr.M{"name": "EC2"}, + }, + "ec2": mapstr.M{ + "tag": mapstr.M{ + customTagKey: customTagValue, + }, + }, + }, + }, } for _, tc := range tests { @@ -350,6 +380,7 @@ func TestRetrieveAWSMetadataEC2(t *testing.T) { NewIMDSClient = func(cfg awssdk.Config) IMDSClient { return &MockIMDSClient{ GetInstanceIdentityDocumentFunc: tc.mockGetInstanceIdentity, + GetMetadataFunc: tc.mockMetadata, } } defer func() { NewIMDSClient = func(cfg awssdk.Config) IMDSClient { return imds.NewFromConfig(cfg) } }() diff --git a/libbeat/processors/add_cloud_metadata/providers.go b/libbeat/processors/add_cloud_metadata/providers.go index a9978251cfd6..ea56a5e669b3 100644 --- a/libbeat/processors/add_cloud_metadata/providers.go +++ b/libbeat/processors/add_cloud_metadata/providers.go @@ -187,7 +187,7 @@ func (p *addCloudMetadata) fetchMetadata() *result { if result.err == nil && result.metadata != nil { return &result } else if result.err != nil { - p.logger.Errorf("add_cloud_metadata: received error %v", result.err) + p.logger.Errorf("add_cloud_metadata: received error for provider %s: %v", result.provider, result.err) } case <-ctx.Done(): p.logger.Debugf("add_cloud_metadata: timed-out waiting for all responses") From 48b582f7d1e0388569f64dc8038b4d1fca6817f4 Mon Sep 17 00:00:00 2001 From: Kavindu Dodanduwa Date: Tue, 29 Oct 2024 13:41:36 -0700 Subject: [PATCH 2/6] add dedicated tests for tag extractor Signed-off-by: Kavindu Dodanduwa --- .../provider_aws_ec2_test.go | 195 +++++++++++++----- 1 file changed, 141 insertions(+), 54 deletions(-) diff --git a/libbeat/processors/add_cloud_metadata/provider_aws_ec2_test.go b/libbeat/processors/add_cloud_metadata/provider_aws_ec2_test.go index 92bf0c01d4d9..101b8d4c25bc 100644 --- a/libbeat/processors/add_cloud_metadata/provider_aws_ec2_test.go +++ b/libbeat/processors/add_cloud_metadata/provider_aws_ec2_test.go @@ -70,68 +70,76 @@ type MockEC2Client struct { func (e *MockEC2Client) DescribeTags(ctx context.Context, params *ec2.DescribeTagsInput, optFns ...func(*ec2.Options)) (*ec2.DescribeTagsOutput, error) { return e.DescribeTagsFunc(ctx, params, optFns...) } -func TestMain(m *testing.M) { - logp.TestingSetup() - code := m.Run() - os.Exit(code) -} -func TestRetrieveAWSMetadataEC2(t *testing.T) { - var ( - // not the best way to use a response template - // but this should serve until we need to test - // documents containing very different values - accountIDDoc1 = "111111111111111" - regionDoc1 = "us-east-1" - availabilityZoneDoc1 = "us-east-1c" - imageIDDoc1 = "ami-abcd1234" - instanceTypeDoc1 = "t2.medium" - instanceIDDoc2 = "i-22222222" - clusterNameKey = eksClusterNameTagKey - clusterNameValue = "test" - instanceIDDoc1 = "i-11111111" - customTagKey = "organization" - customTagValue = "orgName" - ) +var ( + // not the best way to use a response template + // but this should serve until we need to test + // documents containing very different values + accountIDDoc1 = "111111111111111" + regionDoc1 = "us-east-1" + availabilityZoneDoc1 = "us-east-1c" + imageIDDoc1 = "ami-abcd1234" + instanceTypeDoc1 = "t2.medium" + instanceIDDoc2 = "i-22222222" + clusterNameKey = eksClusterNameTagKey + clusterNameValue = "test" + instanceIDDoc1 = "i-11111111" + customTagKey = "organization" + customTagValue = "orgName" +) - // generic getTagFunc implementation with IMDS disabled error to avoid IMDS response - var disabledIMDS getMetaFunc = func(ctx context.Context, input *imds.GetMetadataInput, f ...func(*imds.Options)) (*imds.GetMetadataOutput, error) { - return nil, errors.New("IMDS disabled mock error") - } +// generic getTagFunc implementation with IMDS disabled error to avoid IMDS response +var disabledIMDS getMetaFunc = func(ctx context.Context, input *imds.GetMetadataInput, f ...func(*imds.Options)) (*imds.GetMetadataOutput, error) { + return nil, errors.New("IMDS disabled mock error") +} - // set up a generic getTagFunc implementation with valid tags - var enabledIMDS getMetaFunc = func(ctx context.Context, input *imds.GetMetadataInput, f ...func(*imds.Options)) (*imds.GetMetadataOutput, error) { - if input.Path == tagsCategory { - // tag category request - return &imds.GetMetadataOutput{ - Content: io.NopCloser(strings.NewReader(customTagKey)), - }, nil - } +// set up a generic getTagFunc implementation with valid tags +var genericImdsGet getMetaFunc = func(ctx context.Context, input *imds.GetMetadataInput, f ...func(*imds.Options)) (*imds.GetMetadataOutput, error) { + tagKeys := fmt.Sprintf("%s\n%s", customTagKey, eksClusterNameTagKey) - if strings.HasSuffix(input.Path, customTagKey) { - // customTagKey request - return &imds.GetMetadataOutput{ - Content: io.NopCloser(strings.NewReader(customTagValue)), - }, nil - } + if input.Path == tagsCategory { + // tag category request + return &imds.GetMetadataOutput{ + Content: io.NopCloser(strings.NewReader(tagKeys)), + }, nil + } - return nil, errors.New("invalid request") + // tag request + if strings.HasSuffix(input.Path, customTagKey) { + return &imds.GetMetadataOutput{ + Content: io.NopCloser(strings.NewReader(customTagValue)), + }, nil } - // generic getInstanceIDFunc implementation with known response values and no error - var genericInstanceIDResponse getInstanceIDFunc = func(ctx context.Context, params *imds.GetInstanceIdentityDocumentInput, optFns ...func(*imds.Options)) (*imds.GetInstanceIdentityDocumentOutput, error) { - return &imds.GetInstanceIdentityDocumentOutput{ - InstanceIdentityDocument: imds.InstanceIdentityDocument{ - AvailabilityZone: availabilityZoneDoc1, - Region: regionDoc1, - InstanceID: instanceIDDoc1, - InstanceType: instanceTypeDoc1, - AccountID: accountIDDoc1, - ImageID: imageIDDoc1, - }, + if strings.HasSuffix(input.Path, eksClusterNameTagKey) { + return &imds.GetMetadataOutput{ + Content: io.NopCloser(strings.NewReader(clusterNameValue)), }, nil } + return nil, errors.New("invalid request") +} + +// generic getInstanceIDFunc implementation with known response values and no error +var genericInstanceIDResponse getInstanceIDFunc = func(ctx context.Context, params *imds.GetInstanceIdentityDocumentInput, optFns ...func(*imds.Options)) (*imds.GetInstanceIdentityDocumentOutput, error) { + return &imds.GetInstanceIdentityDocumentOutput{ + InstanceIdentityDocument: imds.InstanceIdentityDocument{ + AvailabilityZone: availabilityZoneDoc1, + Region: regionDoc1, + InstanceID: instanceIDDoc1, + InstanceType: instanceTypeDoc1, + AccountID: accountIDDoc1, + ImageID: imageIDDoc1, + }, + }, nil +} + +func TestMain(m *testing.M) { + logp.TestingSetup() + code := m.Run() + os.Exit(code) +} +func TestRetrieveAWSMetadataEC2(t *testing.T) { var tests = []struct { testName string mockGetInstanceIdentity getInstanceIDFunc @@ -352,8 +360,24 @@ func TestRetrieveAWSMetadataEC2(t *testing.T) { { testName: "if enabled, extract tags from IMDS endpoint", mockGetInstanceIdentity: genericInstanceIDResponse, - mockMetadata: enabledIMDS, - mockEc2Tags: nil, // could be nil as IMDS response fulfills tag + mockMetadata: func(ctx context.Context, input *imds.GetMetadataInput, f ...func(*imds.Options)) (*imds.GetMetadataOutput, error) { + if input.Path == tagsCategory { + // tag category request + return &imds.GetMetadataOutput{ + Content: io.NopCloser(strings.NewReader(customTagKey)), + }, nil + } + + if strings.HasSuffix(input.Path, customTagKey) { + // customTagKey request + return &imds.GetMetadataOutput{ + Content: io.NopCloser(strings.NewReader(customTagValue)), + }, nil + } + + return nil, errors.New("invalid request") + }, + mockEc2Tags: nil, // could be nil as IMDS response fulfills tag expectedEvent: mapstr.M{ "cloud": mapstr.M{ "provider": "aws", @@ -412,3 +436,66 @@ func TestRetrieveAWSMetadataEC2(t *testing.T) { }) } } + +func Test_getTags(t *testing.T) { + ctx := context.Background() + instanceId := "ami-abcd1234" + logger := logp.NewLogger("add_cloud_metadata test logger") + + tests := []struct { + name string + imdsClient IMDSClient + ec2Client EC2Client + want map[string]string + }{ + { + name: "tags extracted from IMDS if possible", + imdsClient: &MockIMDSClient{ + GetMetadataFunc: genericImdsGet, + }, + want: map[string]string{ + customTagKey: customTagValue, + eksClusterNameTagKey: clusterNameValue, + }, + }, + { + name: "tag extraction fallback to DescribeTag if IMDS fetch results in an error", + imdsClient: &MockIMDSClient{ + GetMetadataFunc: disabledIMDS, + }, + ec2Client: &MockEC2Client{ + DescribeTagsFunc: func(ctx context.Context, params *ec2.DescribeTagsInput, optFns ...func(*ec2.Options)) (*ec2.DescribeTagsOutput, error) { + return &ec2.DescribeTagsOutput{ + Tags: []types.TagDescription{ + { + Key: &clusterNameKey, + ResourceId: &instanceId, + ResourceType: "instance", + Value: &clusterNameValue, + }, + }, + }, nil + }}, + want: map[string]string{ + eksClusterNameTagKey: clusterNameValue, + }, + }, + { + name: "empty tags if all methods failed", + imdsClient: &MockIMDSClient{ + GetMetadataFunc: disabledIMDS, + }, + ec2Client: &MockEC2Client{ + DescribeTagsFunc: func(ctx context.Context, params *ec2.DescribeTagsInput, optFns ...func(*ec2.Options)) (*ec2.DescribeTagsOutput, error) { + return nil, errors.New("some error from DescribeTag") + }}, + want: map[string]string{}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tags := getTags(ctx, tt.imdsClient, tt.ec2Client, instanceId, logger) + assert.Equal(t, tags, tt.want) + }) + } +} From 2023e0f2fee6ce4d7329e56a82f70c0f43ba8ce8 Mon Sep 17 00:00:00 2001 From: Kavindu Dodanduwa Date: Tue, 29 Oct 2024 14:19:29 -0700 Subject: [PATCH 3/6] expand test case and add documentation Signed-off-by: Kavindu Dodanduwa --- .../docs/add_cloud_metadata.asciidoc | 18 ++++++++++++ .../provider_aws_ec2_test.go | 29 +++++++------------ 2 files changed, 28 insertions(+), 19 deletions(-) diff --git a/libbeat/processors/add_cloud_metadata/docs/add_cloud_metadata.asciidoc b/libbeat/processors/add_cloud_metadata/docs/add_cloud_metadata.asciidoc index a80cd7a8be4e..df52b9937c48 100644 --- a/libbeat/processors/add_cloud_metadata/docs/add_cloud_metadata.asciidoc +++ b/libbeat/processors/add_cloud_metadata/docs/add_cloud_metadata.asciidoc @@ -83,6 +83,8 @@ examples for each of the supported providers. _AWS_ +Metadata given below are extracted from https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/instance-identity-documents.html[instance identity document], + [source,json] ------------------------------------------------------------------------------- { @@ -98,6 +100,22 @@ _AWS_ } ------------------------------------------------------------------------------- +If the EC2 instance has IMDS enabled and if tags are allowed through IMDS endpoint, the processor will further append tags in metadata. +Please refer official documentation on https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html[IMDS endpoint] for further details. + +[source,json] +------------------------------------------------------------------------------- +{ + "ec2": { + "tag": { + "org" : "myOrg", + "owner": "userID" + } + } +} +------------------------------------------------------------------------------- + + _Digital Ocean_ [source,json] diff --git a/libbeat/processors/add_cloud_metadata/provider_aws_ec2_test.go b/libbeat/processors/add_cloud_metadata/provider_aws_ec2_test.go index 101b8d4c25bc..fefd0455c7b9 100644 --- a/libbeat/processors/add_cloud_metadata/provider_aws_ec2_test.go +++ b/libbeat/processors/add_cloud_metadata/provider_aws_ec2_test.go @@ -360,24 +360,8 @@ func TestRetrieveAWSMetadataEC2(t *testing.T) { { testName: "if enabled, extract tags from IMDS endpoint", mockGetInstanceIdentity: genericInstanceIDResponse, - mockMetadata: func(ctx context.Context, input *imds.GetMetadataInput, f ...func(*imds.Options)) (*imds.GetMetadataOutput, error) { - if input.Path == tagsCategory { - // tag category request - return &imds.GetMetadataOutput{ - Content: io.NopCloser(strings.NewReader(customTagKey)), - }, nil - } - - if strings.HasSuffix(input.Path, customTagKey) { - // customTagKey request - return &imds.GetMetadataOutput{ - Content: io.NopCloser(strings.NewReader(customTagValue)), - }, nil - } - - return nil, errors.New("invalid request") - }, - mockEc2Tags: nil, // could be nil as IMDS response fulfills tag + mockMetadata: genericImdsGet, + mockEc2Tags: nil, // could be nil as IMDS response fulfills tag expectedEvent: mapstr.M{ "cloud": mapstr.M{ "provider": "aws", @@ -389,9 +373,16 @@ func TestRetrieveAWSMetadataEC2(t *testing.T) { "availability_zone": availabilityZoneDoc1, "service": mapstr.M{"name": "EC2"}, }, + "orchestrator": mapstr.M{ + "cluster": mapstr.M{ + "name": clusterNameValue, + "id": fmt.Sprintf("arn:aws:eks:%s:%s:cluster/%s", regionDoc1, accountIDDoc1, clusterNameValue), + }, + }, "ec2": mapstr.M{ "tag": mapstr.M{ - customTagKey: customTagValue, + eksClusterNameTagKey: clusterNameValue, + customTagKey: customTagValue, }, }, }, From d7a41557446e5649d9714b1c43660e72217710df Mon Sep 17 00:00:00 2001 From: Kavindu Dodanduwa Date: Tue, 29 Oct 2024 14:37:49 -0700 Subject: [PATCH 4/6] add changelog entry Signed-off-by: Kavindu Dodanduwa --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 05345fb5ec03..3cf6a2a31815 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -337,6 +337,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] *Libbeat* +- enrich events with EC2 tags in add_cloud_metadata processor {pull}41477[41477] *Heartbeat* From 7e53ca08538d46dce81a71b33a9d5333251907a8 Mon Sep 17 00:00:00 2001 From: Kavindu Dodanduwa Date: Wed, 30 Oct 2024 09:10:44 -0700 Subject: [PATCH 5/6] handle empty tags, add tests and close underlying body Signed-off-by: Kavindu Dodanduwa --- .../add_cloud_metadata/provider_aws_ec2.go | 48 ++++++++++++------- .../provider_aws_ec2_test.go | 27 +++++++++++ 2 files changed, 59 insertions(+), 16 deletions(-) diff --git a/libbeat/processors/add_cloud_metadata/provider_aws_ec2.go b/libbeat/processors/add_cloud_metadata/provider_aws_ec2.go index 735fccb235f3..1e98fae2c215 100644 --- a/libbeat/processors/add_cloud_metadata/provider_aws_ec2.go +++ b/libbeat/processors/add_cloud_metadata/provider_aws_ec2.go @@ -153,7 +153,7 @@ func getTags(ctx context.Context, imdsClient IMDSClient, ec2Client EC2Client, in logger.Info("Tag extraction from IMDS failed, fallback to DescribeTags API to obtain EKS cluster name.") clusterName, err := clusterNameFromDescribeTag(ctx, ec2Client, instanceId) if err != nil { - logger.Warnf("error obtaining cluster name: %s.", err) + logger.Warnf("error obtaining cluster name: %v.", err) return tags } @@ -169,38 +169,54 @@ func getTags(ctx context.Context, imdsClient IMDSClient, ec2Client EC2Client, in func getTagsFromIMDS(ctx context.Context, client IMDSClient, logger *logp.Logger) (tags map[string]string, ok bool) { tags = make(map[string]string) - metadata, err := client.GetMetadata(ctx, &imds.GetMetadataInput{Path: tagsCategory}) + b, err := getMetadataHelper(ctx, client, tagsCategory, logger) if err != nil { - logger.Warnf("error from IMDS tags category request: %s", err) - return tags, false - } - - b, err := io.ReadAll(metadata.Content) - if err != nil { - logger.Warnf("error extracting tags category payload: %s", err) + logger.Warnf("error obtaining tags category: %v", err) return tags, false } for _, tag := range strings.Split(string(b), "\n") { tagPath := fmt.Sprintf("%s/%s", tagsCategory, tag) - metadata, err := client.GetMetadata(ctx, &imds.GetMetadataInput{Path: tagPath}) + b, err := getMetadataHelper(ctx, client, tagPath, logger) if err != nil { - logger.Warnf("error from IMDS tag request: %s", err) + logger.Warnf("error extracting tag value of %s: %v", tag, err) return tags, false } - b, err := io.ReadAll(metadata.Content) - if err != nil { - logger.Warnf("error extracting tag value payload: %s", err) - return tags, false + tagValue := string(b) + if tagValue == "" { + logger.Infof("Ignoring tag key %s as value is empty", tag) + continue } - tags[tag] = string(b) + tags[tag] = tagValue } return tags, true } +// getMetadataHelper performs the IMDS call for the given path and returns the response content after closing the underlying content reader. +func getMetadataHelper(ctx context.Context, client IMDSClient, path string, logger *logp.Logger) (content []byte, err error) { + metadata, err := client.GetMetadata(ctx, &imds.GetMetadataInput{Path: path}) + if err != nil { + return nil, fmt.Errorf("error from IMDS metadata request: %w", err) + } + + defer func(Content io.ReadCloser) { + err := Content.Close() + if err != nil { + logger.Warnf("error closing IMDS metadata response body: %v", err) + } + }(metadata.Content) + + content, err = io.ReadAll(metadata.Content) + if err != nil { + return nil, fmt.Errorf("error extracting metadata from the IMDS response: %w", err) + } + + return content, nil +} + // clusterNameFromDescribeTag is a helper to extract EKS cluster name using DescribeTag. func clusterNameFromDescribeTag(ctx context.Context, ec2Client EC2Client, instanceID string) (string, error) { input := &ec2.DescribeTagsInput{ diff --git a/libbeat/processors/add_cloud_metadata/provider_aws_ec2_test.go b/libbeat/processors/add_cloud_metadata/provider_aws_ec2_test.go index fefd0455c7b9..d4e8c14a4f6d 100644 --- a/libbeat/processors/add_cloud_metadata/provider_aws_ec2_test.go +++ b/libbeat/processors/add_cloud_metadata/provider_aws_ec2_test.go @@ -482,6 +482,33 @@ func Test_getTags(t *testing.T) { }}, want: map[string]string{}, }, + { + name: "Empty tags values are ignored", + imdsClient: &MockIMDSClient{ + GetMetadataFunc: func(ctx context.Context, input *imds.GetMetadataInput, f ...func(*imds.Options)) (*imds.GetMetadataOutput, error) { + if input.Path == tagsCategory { + // tag category request + return &imds.GetMetadataOutput{ + Content: io.NopCloser(strings.NewReader(customTagKey)), + }, nil + } + + // tag request + if strings.HasSuffix(input.Path, customTagKey) { + return &imds.GetMetadataOutput{ + Content: io.NopCloser(strings.NewReader("")), + }, nil + } + + return nil, errors.New("invalid request") + }, + }, + ec2Client: &MockEC2Client{ + DescribeTagsFunc: func(ctx context.Context, params *ec2.DescribeTagsInput, optFns ...func(*ec2.Options)) (*ec2.DescribeTagsOutput, error) { + return nil, errors.New("some error from DescribeTag") + }}, + want: map[string]string{}, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { From 091c5ca031430e1b492883f5691aececfe9aa2ea Mon Sep 17 00:00:00 2001 From: Kavindu Dodanduwa Date: Fri, 1 Nov 2024 14:49:31 -0700 Subject: [PATCH 6/6] review change - use aws.tags as tag prefix Signed-off-by: Kavindu Dodanduwa --- .../docs/add_cloud_metadata.asciidoc | 4 ++-- .../add_cloud_metadata/provider_aws_ec2.go | 2 +- .../add_cloud_metadata/provider_aws_ec2_test.go | 16 ++++++++-------- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/libbeat/processors/add_cloud_metadata/docs/add_cloud_metadata.asciidoc b/libbeat/processors/add_cloud_metadata/docs/add_cloud_metadata.asciidoc index df52b9937c48..1e05e1d2c24a 100644 --- a/libbeat/processors/add_cloud_metadata/docs/add_cloud_metadata.asciidoc +++ b/libbeat/processors/add_cloud_metadata/docs/add_cloud_metadata.asciidoc @@ -106,8 +106,8 @@ Please refer official documentation on https://docs.aws.amazon.com/AWSEC2/latest [source,json] ------------------------------------------------------------------------------- { - "ec2": { - "tag": { + "aws": { + "tags": { "org" : "myOrg", "owner": "userID" } diff --git a/libbeat/processors/add_cloud_metadata/provider_aws_ec2.go b/libbeat/processors/add_cloud_metadata/provider_aws_ec2.go index 1e98fae2c215..ae7dfbf9865d 100644 --- a/libbeat/processors/add_cloud_metadata/provider_aws_ec2.go +++ b/libbeat/processors/add_cloud_metadata/provider_aws_ec2.go @@ -41,7 +41,7 @@ import ( const ( eksClusterNameTagKey = "eks:cluster-name" tagsCategory = "tags/instance" - tagPrefix = "ec2.tag" + tagPrefix = "aws.tags" ) type IMDSClient interface { diff --git a/libbeat/processors/add_cloud_metadata/provider_aws_ec2_test.go b/libbeat/processors/add_cloud_metadata/provider_aws_ec2_test.go index d4e8c14a4f6d..b36d566b64d1 100644 --- a/libbeat/processors/add_cloud_metadata/provider_aws_ec2_test.go +++ b/libbeat/processors/add_cloud_metadata/provider_aws_ec2_test.go @@ -208,8 +208,8 @@ func TestRetrieveAWSMetadataEC2(t *testing.T) { "id": fmt.Sprintf("arn:aws:eks:%s:%s:cluster/%s", regionDoc1, accountIDDoc1, clusterNameValue), }, }, - "ec2": mapstr.M{ - "tag": mapstr.M{ + "aws": mapstr.M{ + "tags": mapstr.M{ eksClusterNameTagKey: clusterNameValue, }, }, @@ -247,8 +247,8 @@ func TestRetrieveAWSMetadataEC2(t *testing.T) { "id": fmt.Sprintf("arn:aws:eks:%s:%s:cluster/%s", regionDoc1, accountIDDoc1, clusterNameValue), }, }, - "ec2": mapstr.M{ - "tag": mapstr.M{ + "aws": mapstr.M{ + "tags": mapstr.M{ eksClusterNameTagKey: clusterNameValue, }, }, @@ -295,8 +295,8 @@ func TestRetrieveAWSMetadataEC2(t *testing.T) { "id": fmt.Sprintf("arn:aws:eks:%s:%s:cluster/%s", regionDoc1, accountIDDoc1, clusterNameValue), }, }, - "ec2": mapstr.M{ - "tag": mapstr.M{ + "aws": mapstr.M{ + "tags": mapstr.M{ eksClusterNameTagKey: clusterNameValue, }, }, @@ -379,8 +379,8 @@ func TestRetrieveAWSMetadataEC2(t *testing.T) { "id": fmt.Sprintf("arn:aws:eks:%s:%s:cluster/%s", regionDoc1, accountIDDoc1, clusterNameValue), }, }, - "ec2": mapstr.M{ - "tag": mapstr.M{ + "aws": mapstr.M{ + "tags": mapstr.M{ eksClusterNameTagKey: clusterNameValue, customTagKey: customTagValue, },