Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions internal/pkg/agent/application/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@ import (

"go.elastic.co/apm/v2"

componentmonitoring "github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/component"

"github.com/elastic/go-ucfg"

"github.com/elastic/elastic-agent-libs/logp"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/dispatcher"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/info"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details"
Expand Down Expand Up @@ -132,7 +133,7 @@ func New(
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to create upgrader: %w", err)
}
monitor := monitoring.New(isMonitoringSupported, cfg.Settings.DownloadConfig.OS(), cfg.Settings.MonitoringConfig, agentInfo, isOtelExecModeSubprocess)
monitor := componentmonitoring.New(isMonitoringSupported, cfg.Settings.DownloadConfig.OS(), cfg.Settings.MonitoringConfig, agentInfo, isOtelExecModeSubprocess)

runtime, err := runtime.NewManager(
log,
Expand Down
21 changes: 21 additions & 0 deletions internal/pkg/agent/application/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"time"

"github.com/elastic/elastic-agent/internal/pkg/core/backoff"
"github.com/elastic/elastic-agent/internal/pkg/otel/translate"

"go.opentelemetry.io/collector/component/componentstatus"

Expand Down Expand Up @@ -1749,6 +1750,7 @@ func (c *Coordinator) updateManagersWithConfig(model *component.Model) {
func (c *Coordinator) splitModelBetweenManagers(model *component.Model) (runtimeModel *component.Model, otelModel *component.Model) {
var otelComponents, runtimeComponents []component.Component
for _, comp := range model.Components {
c.maybeOverrideRuntimeForComponent(&comp)
switch comp.RuntimeManager {
case component.OtelRuntimeManager:
otelComponents = append(otelComponents, comp)
Expand All @@ -1770,6 +1772,25 @@ func (c *Coordinator) splitModelBetweenManagers(model *component.Model) (runtime
return
}

// maybeOverrideRuntimeForComponent sets the correct runtime for the given component.
// Normally, we use the runtime set in the component itself via the configuration, but
// we may also fall back to the process runtime if the otel runtime is unsupported for
// some reason. One example is the output using unsupported config options.
func (c *Coordinator) maybeOverrideRuntimeForComponent(comp *component.Component) {
if comp.RuntimeManager == component.ProcessRuntimeManager {
// do nothing, the process runtime can handle any component
return
}
if comp.RuntimeManager == component.OtelRuntimeManager {
// check if the component is actually supported
err := translate.VerifyComponentIsOtelSupported(comp)
if err != nil {
c.logger.Warnf("otel runtime is not supported for component %s, switching to process runtime, reason: %v", comp.ID, err)
comp.RuntimeManager = component.ProcessRuntimeManager
}
}
}

func (c *Coordinator) isFleetServer() bool {
for _, s := range c.state.Components {
if s.Component.InputType == fleetServer {
Expand Down
112 changes: 81 additions & 31 deletions internal/pkg/agent/application/coordinator/coordinator_unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1036,8 +1036,9 @@ func TestCoordinatorPolicyChangeUpdatesRuntimeAndOTelManagerWithOtelComponents(t
secretMarkerFunc: testSecretMarkerFunc,
}

// Create a policy with one input and one output (no otel configuration)
cfg := config.MustNewConfigFrom(`
t.Run("mixed policy", func(t *testing.T) {
// Create a policy with one input and one output (no otel configuration)
cfg := config.MustNewConfigFrom(`
outputs:
default:
type: elasticsearch
Expand All @@ -1064,38 +1065,87 @@ service:
- nop
`)

// Send the policy change and make sure it was acknowledged.
cfgChange := &configChange{cfg: cfg}
configChan <- cfgChange
coord.runLoopIteration(ctx)
assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change")

// Make sure the runtime manager received the expected component update.
// An assert.Equal on the full component model doesn't play nice with
// the embedded proto structs, so instead we verify the important fields
// manually (sorry).
assert.True(t, updated, "Runtime manager should be updated after a policy change")
require.Equal(t, 1, len(components), "Test policy should generate one component")
assert.True(t, otelUpdated, "OTel manager should be updated after a policy change")
require.NotNil(t, otelConfig, "OTel manager should have config")
// Send the policy change and make sure it was acknowledged.
cfgChange := &configChange{cfg: cfg}
configChan <- cfgChange
coord.runLoopIteration(ctx)
assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change")

// Make sure the runtime manager received the expected component update.
// An assert.Equal on the full component model doesn't play nice with
// the embedded proto structs, so instead we verify the important fields
// manually (sorry).
assert.True(t, updated, "Runtime manager should be updated after a policy change")
require.Equal(t, 1, len(components), "Test policy should generate one component")
assert.True(t, otelUpdated, "OTel manager should be updated after a policy change")
require.NotNil(t, otelConfig, "OTel manager should have config")

runtimeComponent := components[0]
assert.Equal(t, "system/metrics-default", runtimeComponent.ID)
require.NotNil(t, runtimeComponent.Err, "Input with no spec should produce a component error")
assert.Equal(t, "input not supported", runtimeComponent.Err.Error(), "Input with no spec should report 'input not supported'")
require.Equal(t, 2, len(runtimeComponent.Units))

units := runtimeComponent.Units
// Verify the input unit
assert.Equal(t, "system/metrics-default-test-other-input", units[0].ID)
assert.Equal(t, client.UnitTypeInput, units[0].Type)
assert.Equal(t, "test-other-input", units[0].Config.Id)
assert.Equal(t, "system/metrics", units[0].Config.Type)

// Verify the output unit
assert.Equal(t, "system/metrics-default", units[1].ID)
assert.Equal(t, client.UnitTypeOutput, units[1].Type)
assert.Equal(t, "elasticsearch", units[1].Config.Type)
})

runtimeComponent := components[0]
assert.Equal(t, "system/metrics-default", runtimeComponent.ID)
require.NotNil(t, runtimeComponent.Err, "Input with no spec should produce a component error")
assert.Equal(t, "input not supported", runtimeComponent.Err.Error(), "Input with no spec should report 'input not supported'")
require.Equal(t, 2, len(runtimeComponent.Units))
t.Run("unsupported otel output option", func(t *testing.T) {
// Create a policy with one input and one output (no otel configuration)
cfg := config.MustNewConfigFrom(`
outputs:
default:
type: elasticsearch
hosts:
- localhost:9200
indices: [] # not supported by the elasticsearch exporter
inputs:
- id: test-input
type: filestream
use_output: default
_runtime_experimental: otel
- id: test-other-input
type: system/metrics
use_output: default
receivers:
nop:
exporters:
nop:
service:
pipelines:
traces:
receivers:
- nop
exporters:
- nop
`)

units := runtimeComponent.Units
// Verify the input unit
assert.Equal(t, "system/metrics-default-test-other-input", units[0].ID)
assert.Equal(t, client.UnitTypeInput, units[0].Type)
assert.Equal(t, "test-other-input", units[0].Config.Id)
assert.Equal(t, "system/metrics", units[0].Config.Type)
// Send the policy change and make sure it was acknowledged.
cfgChange := &configChange{cfg: cfg}
configChan <- cfgChange
coord.runLoopIteration(ctx)
assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change")

// Make sure the runtime manager received the expected component update.
// An assert.Equal on the full component model doesn't play nice with
// the embedded proto structs, so instead we verify the important fields
// manually (sorry).
assert.True(t, updated, "Runtime manager should be updated after a policy change")
assert.True(t, otelUpdated, "OTel manager should be updated after a policy change")
require.NotNil(t, otelConfig, "OTel manager should have config")

assert.Len(t, components, 2, "both components should be assigned to the runtime manager")
})

// Verify the output unit
assert.Equal(t, "system/metrics-default", units[1].ID)
assert.Equal(t, client.UnitTypeOutput, units[1].Type)
assert.Equal(t, "elasticsearch", units[1].Config.Type)
}

func TestCoordinatorReportsRuntimeManagerUpdateFailure(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

package monitoring
package component

import (
"crypto/sha256"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

package monitoring
package component

import (
"context"
Expand Down
4 changes: 3 additions & 1 deletion internal/pkg/agent/application/monitoring/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (

"github.com/gorilla/mux"

componentmonitoring "github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/component"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
"github.com/elastic/elastic-agent/pkg/utils"
Expand Down Expand Up @@ -120,7 +122,7 @@ func isProcessRedirectable(componentID string) bool {
}

func redirectToPath(w http.ResponseWriter, r *http.Request, id, path string) error {
endpoint := PrefixedEndpoint(utils.SocketURLWithFallback(id, paths.TempDir()))
endpoint := componentmonitoring.PrefixedEndpoint(utils.SocketURLWithFallback(id, paths.TempDir()))
metricsBytes, statusCode, metricsErr := GetProcessMetrics(r.Context(), endpoint, path)
if metricsErr != nil {
return metricsErr
Expand Down
4 changes: 3 additions & 1 deletion internal/pkg/agent/application/monitoring/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"go.elastic.co/apm/module/apmgorilla/v2"
"go.elastic.co/apm/v2"

componentmonitoring "github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/component"

"github.com/elastic/elastic-agent-libs/api"
"github.com/elastic/elastic-agent-libs/monitoring"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/reload"
Expand Down Expand Up @@ -74,7 +76,7 @@ func NewServer(

srvCfg := api.DefaultConfig()
srvCfg.Enabled = cfg.Enabled
srvCfg.Host = AgentMonitoringEndpoint(cfg)
srvCfg.Host = componentmonitoring.AgentMonitoringEndpoint(cfg)
srvCfg.Port = cfg.HTTP.Port
log.Infof("creating monitoring API with cfg %#v", srvCfg)
if err := createAgentMonitoringDrop(srvCfg.Host); err != nil {
Expand Down
5 changes: 3 additions & 2 deletions internal/pkg/agent/cmd/inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ import (
"github.com/spf13/cobra"
"gopkg.in/yaml.v2"

componentmonitoring "github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/component"

"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/service"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/info"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
"github.com/elastic/elastic-agent/internal/pkg/agent/configuration"
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
Expand Down Expand Up @@ -412,7 +413,7 @@ func getMonitoringFn(ctx context.Context, logger *logger.Logger, cfg map[string]
}
otelExecMode := otelconfig.GetExecutionModeFromConfig(logger, config)
isOtelExecModeSubprocess := otelExecMode == manager.SubprocessExecutionMode
monitor := monitoring.New(agentCfg.Settings.V1MonitoringEnabled, agentCfg.Settings.DownloadConfig.OS(), agentCfg.Settings.MonitoringConfig, agentInfo, isOtelExecModeSubprocess)
monitor := componentmonitoring.New(agentCfg.Settings.V1MonitoringEnabled, agentCfg.Settings.DownloadConfig.OS(), agentCfg.Settings.MonitoringConfig, agentInfo, isOtelExecModeSubprocess)
return monitor.MonitoringConfig, nil
}

Expand Down
3 changes: 2 additions & 1 deletion internal/pkg/otel/manager/diagnostics.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"strings"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring"
componentmonitoring "github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/component"

"google.golang.org/protobuf/types/known/timestamppb"

Expand Down Expand Up @@ -208,7 +209,7 @@ func GetBeatInputMetricsDiagnostics(ctx context.Context, componentID string) (*p
}

func GetBeatMetricsPayload(ctx context.Context, componentID string, path string) ([]byte, error) {
endpoint := monitoring.PrefixedEndpoint(monitoring.BeatsMonitoringEndpoint(componentID))
endpoint := componentmonitoring.PrefixedEndpoint(componentmonitoring.BeatsMonitoringEndpoint(componentID))
metricBytes, statusCode, err := monitoring.GetProcessMetrics(ctx, endpoint, path)
if err != nil {
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/otel/manager/diagnostics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ import (
"testing"
"time"

componentmonitoring "github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/component"
"github.com/elastic/elastic-agent/internal/pkg/otel/translate"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
"github.com/elastic/elastic-agent/pkg/utils"

Expand Down Expand Up @@ -150,7 +150,7 @@ func TestBeatMetrics(t *testing.T) {
expectedMetricData, err := json.MarshalIndent(map[string]any{"test": "test"}, "", " ")
require.NoError(t, err)

fileName := strings.TrimPrefix(monitoring.BeatsMonitoringEndpoint(compID), fmt.Sprintf("%s://", utils.SocketScheme))
fileName := strings.TrimPrefix(componentmonitoring.BeatsMonitoringEndpoint(compID), fmt.Sprintf("%s://", utils.SocketScheme))
err = os.MkdirAll(filepath.Dir(fileName), 0o755)
require.NoError(t, err)

Expand Down
43 changes: 31 additions & 12 deletions internal/pkg/otel/translate/otelconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,17 @@
package translate

import (
"errors"
"fmt"
"path/filepath"
"slices"
"strings"

"github.com/elastic/elastic-agent-libs/logp"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring"

koanfmaps "github.com/knadh/koanf/maps"

"github.com/elastic/elastic-agent-libs/logp"
componentmonitoring "github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/component"

otelcomponent "go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/pipeline"
Expand Down Expand Up @@ -59,7 +59,7 @@ func GetOtelConfig(
beatMonitoringConfigGetter BeatMonitoringConfigGetter,
logger *logp.Logger,
) (*confmap.Conf, error) {
components := getSupportedComponents(model)
components := getSupportedComponents(logger, model)
if len(components) == 0 {
return nil, nil
}
Expand Down Expand Up @@ -92,19 +92,38 @@ func GetOtelConfig(
return otelConfig, nil
}

// IsComponentOtelSupported checks if the given component can be run in an Otel Collector.
func IsComponentOtelSupported(comp *component.Component) bool {
return slices.Contains(OtelSupportedOutputTypes, comp.OutputType) &&
slices.Contains(OtelSupportedInputTypes, comp.InputType)
// VerifyComponentIsOtelSupported verifies that the given component can be run in an Otel Collector. It returns an error
// indicating what the problem is, if it can't.
func VerifyComponentIsOtelSupported(comp *component.Component) error {
if !slices.Contains(OtelSupportedOutputTypes, comp.OutputType) {
return fmt.Errorf("unsupported output type: %s", comp.OutputType)
}

if !slices.Contains(OtelSupportedInputTypes, comp.InputType) {
return fmt.Errorf("unsupported input type: %s", comp.InputType)
}

// check if the actual configuration is supported. We need to actually generate the config and look for
// the right kind of error
_, compErr := getCollectorConfigForComponent(comp, &info.AgentInfo{}, func(unitID, binary string) map[string]any {
return nil
}, logp.NewNopLogger())
if errors.Is(compErr, errors.ErrUnsupported) {
return fmt.Errorf("unsupported configuration for %s: %w", comp.ID, compErr)
}

return nil
}

// getSupportedComponents returns components from the given model that can be run in an Otel Collector.
func getSupportedComponents(model *component.Model) []*component.Component {
func getSupportedComponents(logger *logp.Logger, model *component.Model) []*component.Component {
var supportedComponents []*component.Component

for _, comp := range model.Components {
if IsComponentOtelSupported(&comp) {
if err := VerifyComponentIsOtelSupported(&comp); err == nil {
supportedComponents = append(supportedComponents, &comp)
} else {
logger.Errorf("unsupported component %s submitted to otel manager, skipping: %v", comp.ID, err)
}
}

Expand Down Expand Up @@ -271,7 +290,7 @@ func getReceiversConfigForComponent(
// agent self-monitoring is disabled
monitoringConfig := beatMonitoringConfigGetter(comp.ID, beatName)
if monitoringConfig == nil {
endpoint := monitoring.BeatsMonitoringEndpoint(comp.ID)
endpoint := componentmonitoring.BeatsMonitoringEndpoint(comp.ID)
monitoringConfig = map[string]any{
"http": map[string]any{
"enabled": true,
Expand Down
Loading
Loading