Skip to content

Support opening up container and service ports based on agent config YAML #238

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 85 additions & 1 deletion internal/manifests/collector/adapters/config_to_ports.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,24 @@
package adapters

import (
"fmt"
"net"
"sort"
"strconv"
"strings"

"github.com/go-logr/logr"
"github.com/mitchellh/mapstructure"
corev1 "k8s.io/api/core/v1"

"github.com/aws/amazon-cloudwatch-agent-operator/internal/manifests/collector/parser"
receiverParser "github.com/aws/amazon-cloudwatch-agent-operator/internal/manifests/collector/parser/receiver"
)

type ComponentType int

const (
ComponentTypeReceiver ComponentType = iota
ComponentTypeExporter
)

func (c ComponentType) String() string {
Expand Down Expand Up @@ -58,3 +63,82 @@ func ConfigToMetricsPort(logger logr.Logger, config map[interface{}]interface{})

return int32(i64), nil
}

func GetServicePortsFromCWAgentOtelConfig(logger logr.Logger, config map[interface{}]interface{}) ([]corev1.ServicePort, error) {
ports, err := ConfigToComponentPorts(logger, ComponentTypeReceiver, config)
if err != nil {
logger.Error(err, "there was a problem while getting the ports from the receivers")
return nil, err
}

return ports, nil
}

// ConfigToComponentPorts converts the incoming configuration object into a set of service ports required by the exporters.
func ConfigToComponentPorts(logger logr.Logger, cType ComponentType, config map[interface{}]interface{}) ([]corev1.ServicePort, error) {
// now, we gather which ports we might need to open
// for that, we get all the exporters and check their `endpoint` properties,
// extracting the port from it. The port name has to be a "DNS_LABEL", so, we try to make it follow the pattern:
// examples:
// ```yaml
// components:
// componentexample:
// endpoint: 0.0.0.0:12345
// componentexample/settings:
// endpoint: 0.0.0.0:12346
// in this case, we have 2 ports, named: "componentexample" and "componentexample-settings"
componentsProperty, ok := config[fmt.Sprintf("%ss", cType.String())]
if !ok {
return nil, fmt.Errorf("no %ss available as part of the configuration", cType)
}

components, ok := componentsProperty.(map[interface{}]interface{})
if !ok {
return nil, fmt.Errorf("%ss doesn't contain valid components", cType.String())
}

compEnabled := getEnabledComponents(config, cType)

if compEnabled == nil {
return nil, fmt.Errorf("no enabled %ss available as part of the configuration", cType)
}

ports := []corev1.ServicePort{}
for key, val := range components {
// This check will pass only the enabled components,
// then only the related ports will be opened.
if !compEnabled[key] {
continue
}
extractedComponent, ok := val.(map[interface{}]interface{})
if !ok {
logger.V(2).Info("component doesn't seem to be a map of properties", cType.String(), key)
extractedComponent = map[interface{}]interface{}{}
}

cmptName := key.(string)
var cmptParser parser.ComponentPortParser
var err error
cmptParser, err = receiverParser.For(logger, cmptName, extractedComponent)
if err != nil {
logger.V(2).Info("no parser found for '%s'", cmptName)
continue
}

exprtPorts, err := cmptParser.Ports()
if err != nil {
logger.Error(err, "parser for '%s' has returned an error: %w", cmptName, err)
continue
}

if len(exprtPorts) > 0 {
ports = append(ports, exprtPorts...)
}
}

sort.Slice(ports, func(i, j int) bool {
return ports[i].Name < ports[j].Name
})

return ports, nil
}
43 changes: 42 additions & 1 deletion internal/manifests/collector/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package collector

import (
"errors"
"fmt"
"sort"

Expand All @@ -27,7 +28,7 @@ func Container(cfg config.Config, logger logr.Logger, agent v1alpha1.AmazonCloud
image = cfg.CollectorImage()
}

ports := getContainerPorts(logger, agent.Spec.Config, agent.Spec.Ports)
ports := getContainerPorts(logger, agent.Spec.Config, agent.Spec.OtelConfig, agent.Spec.Ports)

var volumeMounts []corev1.VolumeMount
argsMap := agent.Spec.Args
Expand Down Expand Up @@ -79,6 +80,19 @@ func Container(cfg config.Config, logger logr.Logger, agent v1alpha1.AmazonCloud
logger.Error(err, "error parsing config")
}

var livenessProbe *corev1.Probe
if configFromString, err := adapters.ConfigFromString(agent.Spec.OtelConfig); err == nil {
if probe, err := getLivenessProbe(configFromString, agent.Spec.LivenessProbe); err == nil {
livenessProbe = probe
} else if errors.Is(err, adapters.ErrNoServiceExtensions) {
logger.Info("extensions not configured, skipping liveness probe creation")
} else if errors.Is(err, adapters.ErrNoServiceExtensionHealthCheck) {
logger.Info("healthcheck extension not configured, skipping liveness probe creation")
} else {
logger.Error(err, "cannot create liveness probe.")
}
}

return corev1.Container{
Name: naming.Container(),
Image: image,
Expand All @@ -91,6 +105,7 @@ func Container(cfg config.Config, logger logr.Logger, agent v1alpha1.AmazonCloud
Resources: agent.Spec.Resources,
Ports: portMapToContainerPortList(ports),
SecurityContext: agent.Spec.SecurityContext,
LivenessProbe: livenessProbe,
Lifecycle: agent.Spec.Lifecycle,
}
}
Expand Down Expand Up @@ -121,3 +136,29 @@ func portMapToContainerPortList(portMap map[string]corev1.ContainerPort) []corev
})
return ports
}

func getLivenessProbe(config map[interface{}]interface{}, probeConfig *v1alpha1.Probe) (*corev1.Probe, error) {
probe, err := adapters.ConfigToContainerProbe(config)
if err != nil {
return nil, err
}
if probeConfig != nil {
if probeConfig.InitialDelaySeconds != nil {
probe.InitialDelaySeconds = *probeConfig.InitialDelaySeconds
}
if probeConfig.PeriodSeconds != nil {
probe.PeriodSeconds = *probeConfig.PeriodSeconds
}
if probeConfig.FailureThreshold != nil {
probe.FailureThreshold = *probeConfig.FailureThreshold
}
if probeConfig.SuccessThreshold != nil {
probe.SuccessThreshold = *probeConfig.SuccessThreshold
}
if probeConfig.TimeoutSeconds != nil {
probe.TimeoutSeconds = *probeConfig.TimeoutSeconds
}
probe.TerminationGracePeriodSeconds = probeConfig.TerminationGracePeriodSeconds
}
return probe, nil
}
187 changes: 187 additions & 0 deletions internal/manifests/collector/container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,33 @@ package collector
import (
"testing"

corev1 "k8s.io/api/core/v1"

"github.com/aws/amazon-cloudwatch-agent-operator/apis/v1alpha1"
"github.com/aws/amazon-cloudwatch-agent-operator/internal/config"

"github.com/stretchr/testify/assert"
)

var metricContainerPort = corev1.ContainerPort{
Name: "metrics",
ContainerPort: 8888,
Protocol: corev1.ProtocolTCP,
}

var emfContainerPort = []corev1.ContainerPort{
{
Name: "emf-tcp",
ContainerPort: 25888,
Protocol: corev1.ProtocolTCP,
},
{
Name: "emf-udp",
ContainerPort: 25888,
Protocol: corev1.ProtocolUDP,
},
}

func TestGetVolumeMounts(t *testing.T) {
volumeMount := getVolumeMounts("windows")
assert.Equal(t, volumeMount.MountPath, "C:\\Program Files\\Amazon\\AmazonCloudWatchAgent\\cwagentconfig")
Expand All @@ -19,3 +43,166 @@ func TestGetVolumeMounts(t *testing.T) {
volumeMount = getVolumeMounts("")
assert.Equal(t, volumeMount.MountPath, "/etc/cwagentconfig")
}

func TestContainerPorts(t *testing.T) {
var sampleJSONConfig = `{
"logs": {
"metrics_collected": {
"emf": {}
}
}
}`

var goodOtelConfig = `receivers:
examplereceiver:
endpoint: "0.0.0.0:12345"
exporters:
debug:
service:
pipelines:
metrics:
receivers: [examplereceiver]
exporters: [debug]`

tests := []struct {
description string
specConfig string
specPorts []corev1.ServicePort
expectedPorts []corev1.ContainerPort
}{
{
description: "bad otel spec config",
specConfig: "🦄",
specPorts: nil,
expectedPorts: emfContainerPort,
},
{
description: "couldn't build ports from spec config",
specConfig: "",
specPorts: []corev1.ServicePort{
{
Name: "metrics",
Port: 8888,
Protocol: corev1.ProtocolTCP,
},
},
expectedPorts: append(emfContainerPort, metricContainerPort),
},
{
description: "ports in spec Config",
specConfig: goodOtelConfig,
specPorts: nil,
expectedPorts: append(emfContainerPort, corev1.ContainerPort{
Name: "examplereceiver",
ContainerPort: 12345,
}),
},
}

for _, testCase := range tests {
t.Run(testCase.description, func(t *testing.T) {
// prepare
otelcol := v1alpha1.AmazonCloudWatchAgent{
Spec: v1alpha1.AmazonCloudWatchAgentSpec{
Config: sampleJSONConfig,
OtelConfig: testCase.specConfig,
Ports: testCase.specPorts,
},
}

cfg := config.New(config.WithCollectorImage("default-image"))

// test
c := Container(cfg, logger, otelcol, true)
// verify
assert.ElementsMatch(t, testCase.expectedPorts, c.Ports, testCase.description)
})
}
}

func TestContainerProbe(t *testing.T) {
// prepare
initialDelaySeconds := int32(10)
timeoutSeconds := int32(11)
periodSeconds := int32(12)
successThreshold := int32(13)
failureThreshold := int32(14)
terminationGracePeriodSeconds := int64(15)
otelcol := v1alpha1.AmazonCloudWatchAgent{
Spec: v1alpha1.AmazonCloudWatchAgentSpec{
OtelConfig: `extensions:
health_check:
service:
extensions: [health_check]`,
LivenessProbe: &v1alpha1.Probe{
InitialDelaySeconds: &initialDelaySeconds,
TimeoutSeconds: &timeoutSeconds,
PeriodSeconds: &periodSeconds,
SuccessThreshold: &successThreshold,
FailureThreshold: &failureThreshold,
TerminationGracePeriodSeconds: &terminationGracePeriodSeconds,
},
},
}
cfg := config.New()

// test
c := Container(cfg, logger, otelcol, true)

// verify
assert.Equal(t, "/", c.LivenessProbe.HTTPGet.Path)
assert.Equal(t, int32(13133), c.LivenessProbe.HTTPGet.Port.IntVal)
assert.Equal(t, "", c.LivenessProbe.HTTPGet.Host)

assert.Equal(t, initialDelaySeconds, c.LivenessProbe.InitialDelaySeconds)
assert.Equal(t, timeoutSeconds, c.LivenessProbe.TimeoutSeconds)
assert.Equal(t, periodSeconds, c.LivenessProbe.PeriodSeconds)
assert.Equal(t, successThreshold, c.LivenessProbe.SuccessThreshold)
assert.Equal(t, failureThreshold, c.LivenessProbe.FailureThreshold)
assert.Equal(t, terminationGracePeriodSeconds, *c.LivenessProbe.TerminationGracePeriodSeconds)
}

func TestContainerProbeEmptyConfig(t *testing.T) {
// prepare

otelcol := v1alpha1.AmazonCloudWatchAgent{
Spec: v1alpha1.AmazonCloudWatchAgentSpec{
OtelConfig: `extensions:
health_check:
service:
extensions: [health_check]`,
LivenessProbe: &v1alpha1.Probe{},
},
}
cfg := config.New()

// test
c := Container(cfg, logger, otelcol, true)

// verify
assert.Equal(t, "/", c.LivenessProbe.HTTPGet.Path)
assert.Equal(t, int32(13133), c.LivenessProbe.HTTPGet.Port.IntVal)
assert.Equal(t, "", c.LivenessProbe.HTTPGet.Host)
}

func TestContainerProbeNoConfig(t *testing.T) {
// prepare

otelcol := v1alpha1.AmazonCloudWatchAgent{
Spec: v1alpha1.AmazonCloudWatchAgentSpec{
OtelConfig: `extensions:
health_check:
service:
extensions: [health_check]`,
},
}
cfg := config.New()

// test
c := Container(cfg, logger, otelcol, true)

// verify
assert.Equal(t, "/", c.LivenessProbe.HTTPGet.Path)
assert.Equal(t, int32(13133), c.LivenessProbe.HTTPGet.Port.IntVal)
assert.Equal(t, "", c.LivenessProbe.HTTPGet.Host)
}
Loading