Skip to content

Commit 2f0ba69

Browse files
authored
Fall back to process runtime if otel runtime is unsupported (#10087)
* Move component monitoring to its own package. Agent monitoring has two separate functions: implementing the control plane monitoring server and self-monitoring for components. Having both in the same package caused a dependency cycle involving the otel translation package. Resolve this by putting component monitoring in a subpackage. * Fall back to process runtime if otel runtime is unsupported. * Fix integration test. * Normalize import names. * Add logstash unit test. * Use indices instead of allow_older_version. * Add log line for skipped components. * Change argument order. * Fix linter warning.
1 parent aae77c4 commit 2f0ba69

File tree

14 files changed

+368
-54
lines changed

14 files changed

+368
-54
lines changed

internal/pkg/agent/application/application.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,15 @@ import (
1111

1212
"go.elastic.co/apm/v2"
1313

14+
componentmonitoring "github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/component"
15+
1416
"github.com/elastic/go-ucfg"
1517

1618
"github.com/elastic/elastic-agent-libs/logp"
1719

1820
"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator"
1921
"github.com/elastic/elastic-agent/internal/pkg/agent/application/dispatcher"
2022
"github.com/elastic/elastic-agent/internal/pkg/agent/application/info"
21-
"github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring"
2223
"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
2324
"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade"
2425
"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details"
@@ -132,7 +133,7 @@ func New(
132133
if err != nil {
133134
return nil, nil, nil, fmt.Errorf("failed to create upgrader: %w", err)
134135
}
135-
monitor := monitoring.New(isMonitoringSupported, cfg.Settings.DownloadConfig.OS(), cfg.Settings.MonitoringConfig, agentInfo, isOtelExecModeSubprocess)
136+
monitor := componentmonitoring.New(isMonitoringSupported, cfg.Settings.DownloadConfig.OS(), cfg.Settings.MonitoringConfig, agentInfo, isOtelExecModeSubprocess)
136137

137138
runtime, err := runtime.NewManager(
138139
log,

internal/pkg/agent/application/coordinator/coordinator.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"time"
1616

1717
"github.com/elastic/elastic-agent/internal/pkg/core/backoff"
18+
"github.com/elastic/elastic-agent/internal/pkg/otel/translate"
1819

1920
"go.opentelemetry.io/collector/component/componentstatus"
2021

@@ -1787,6 +1788,7 @@ func (c *Coordinator) updateManagersWithConfig(model *component.Model) {
17871788
func (c *Coordinator) splitModelBetweenManagers(model *component.Model) (runtimeModel *component.Model, otelModel *component.Model) {
17881789
var otelComponents, runtimeComponents []component.Component
17891790
for _, comp := range model.Components {
1791+
c.maybeOverrideRuntimeForComponent(&comp)
17901792
switch comp.RuntimeManager {
17911793
case component.OtelRuntimeManager:
17921794
otelComponents = append(otelComponents, comp)
@@ -1808,6 +1810,25 @@ func (c *Coordinator) splitModelBetweenManagers(model *component.Model) (runtime
18081810
return
18091811
}
18101812

1813+
// maybeOverrideRuntimeForComponent sets the correct runtime for the given component.
1814+
// Normally, we use the runtime set in the component itself via the configuration, but
1815+
// we may also fall back to the process runtime if the otel runtime is unsupported for
1816+
// some reason. One example is the output using unsupported config options.
1817+
func (c *Coordinator) maybeOverrideRuntimeForComponent(comp *component.Component) {
1818+
if comp.RuntimeManager == component.ProcessRuntimeManager {
1819+
// do nothing, the process runtime can handle any component
1820+
return
1821+
}
1822+
if comp.RuntimeManager == component.OtelRuntimeManager {
1823+
// check if the component is actually supported
1824+
err := translate.VerifyComponentIsOtelSupported(comp)
1825+
if err != nil {
1826+
c.logger.Warnf("otel runtime is not supported for component %s, switching to process runtime, reason: %v", comp.ID, err)
1827+
comp.RuntimeManager = component.ProcessRuntimeManager
1828+
}
1829+
}
1830+
}
1831+
18111832
func (c *Coordinator) isFleetServer() bool {
18121833
for _, s := range c.state.Components {
18131834
if s.Component.InputType == fleetServer {

internal/pkg/agent/application/coordinator/coordinator_unit_test.go

Lines changed: 81 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1036,8 +1036,9 @@ func TestCoordinatorPolicyChangeUpdatesRuntimeAndOTelManagerWithOtelComponents(t
10361036
secretMarkerFunc: testSecretMarkerFunc,
10371037
}
10381038

1039-
// Create a policy with one input and one output (no otel configuration)
1040-
cfg := config.MustNewConfigFrom(`
1039+
t.Run("mixed policy", func(t *testing.T) {
1040+
// Create a policy with one input and one output (no otel configuration)
1041+
cfg := config.MustNewConfigFrom(`
10411042
outputs:
10421043
default:
10431044
type: elasticsearch
@@ -1064,38 +1065,87 @@ service:
10641065
- nop
10651066
`)
10661067

1067-
// Send the policy change and make sure it was acknowledged.
1068-
cfgChange := &configChange{cfg: cfg}
1069-
configChan <- cfgChange
1070-
coord.runLoopIteration(ctx)
1071-
assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change")
1072-
1073-
// Make sure the runtime manager received the expected component update.
1074-
// An assert.Equal on the full component model doesn't play nice with
1075-
// the embedded proto structs, so instead we verify the important fields
1076-
// manually (sorry).
1077-
assert.True(t, updated, "Runtime manager should be updated after a policy change")
1078-
require.Equal(t, 1, len(components), "Test policy should generate one component")
1079-
assert.True(t, otelUpdated, "OTel manager should be updated after a policy change")
1080-
require.NotNil(t, otelConfig, "OTel manager should have config")
1068+
// Send the policy change and make sure it was acknowledged.
1069+
cfgChange := &configChange{cfg: cfg}
1070+
configChan <- cfgChange
1071+
coord.runLoopIteration(ctx)
1072+
assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change")
1073+
1074+
// Make sure the runtime manager received the expected component update.
1075+
// An assert.Equal on the full component model doesn't play nice with
1076+
// the embedded proto structs, so instead we verify the important fields
1077+
// manually (sorry).
1078+
assert.True(t, updated, "Runtime manager should be updated after a policy change")
1079+
require.Equal(t, 1, len(components), "Test policy should generate one component")
1080+
assert.True(t, otelUpdated, "OTel manager should be updated after a policy change")
1081+
require.NotNil(t, otelConfig, "OTel manager should have config")
1082+
1083+
runtimeComponent := components[0]
1084+
assert.Equal(t, "system/metrics-default", runtimeComponent.ID)
1085+
require.NotNil(t, runtimeComponent.Err, "Input with no spec should produce a component error")
1086+
assert.Equal(t, "input not supported", runtimeComponent.Err.Error(), "Input with no spec should report 'input not supported'")
1087+
require.Equal(t, 2, len(runtimeComponent.Units))
1088+
1089+
units := runtimeComponent.Units
1090+
// Verify the input unit
1091+
assert.Equal(t, "system/metrics-default-test-other-input", units[0].ID)
1092+
assert.Equal(t, client.UnitTypeInput, units[0].Type)
1093+
assert.Equal(t, "test-other-input", units[0].Config.Id)
1094+
assert.Equal(t, "system/metrics", units[0].Config.Type)
1095+
1096+
// Verify the output unit
1097+
assert.Equal(t, "system/metrics-default", units[1].ID)
1098+
assert.Equal(t, client.UnitTypeOutput, units[1].Type)
1099+
assert.Equal(t, "elasticsearch", units[1].Config.Type)
1100+
})
10811101

1082-
runtimeComponent := components[0]
1083-
assert.Equal(t, "system/metrics-default", runtimeComponent.ID)
1084-
require.NotNil(t, runtimeComponent.Err, "Input with no spec should produce a component error")
1085-
assert.Equal(t, "input not supported", runtimeComponent.Err.Error(), "Input with no spec should report 'input not supported'")
1086-
require.Equal(t, 2, len(runtimeComponent.Units))
1102+
t.Run("unsupported otel output option", func(t *testing.T) {
1103+
// Create a policy with two inputs (one requesting the otel runtime) sharing one output that uses an option the otel runtime does not support
1104+
cfg := config.MustNewConfigFrom(`
1105+
outputs:
1106+
default:
1107+
type: elasticsearch
1108+
hosts:
1109+
- localhost:9200
1110+
indices: [] # not supported by the elasticsearch exporter
1111+
inputs:
1112+
- id: test-input
1113+
type: filestream
1114+
use_output: default
1115+
_runtime_experimental: otel
1116+
- id: test-other-input
1117+
type: system/metrics
1118+
use_output: default
1119+
receivers:
1120+
nop:
1121+
exporters:
1122+
nop:
1123+
service:
1124+
pipelines:
1125+
traces:
1126+
receivers:
1127+
- nop
1128+
exporters:
1129+
- nop
1130+
`)
10871131

1088-
units := runtimeComponent.Units
1089-
// Verify the input unit
1090-
assert.Equal(t, "system/metrics-default-test-other-input", units[0].ID)
1091-
assert.Equal(t, client.UnitTypeInput, units[0].Type)
1092-
assert.Equal(t, "test-other-input", units[0].Config.Id)
1093-
assert.Equal(t, "system/metrics", units[0].Config.Type)
1132+
// Send the policy change and make sure it was acknowledged.
1133+
cfgChange := &configChange{cfg: cfg}
1134+
configChan <- cfgChange
1135+
coord.runLoopIteration(ctx)
1136+
assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change")
1137+
1138+
// Make sure the runtime manager received the expected component update.
1139+
// An assert.Equal on the full component model doesn't play nice with
1140+
// the embedded proto structs, so instead we verify the important fields
1141+
// manually (sorry).
1142+
assert.True(t, updated, "Runtime manager should be updated after a policy change")
1143+
assert.True(t, otelUpdated, "OTel manager should be updated after a policy change")
1144+
require.NotNil(t, otelConfig, "OTel manager should have config")
1145+
1146+
assert.Len(t, components, 2, "both components should be assigned to the runtime manager")
1147+
})
10941148

1095-
// Verify the output unit
1096-
assert.Equal(t, "system/metrics-default", units[1].ID)
1097-
assert.Equal(t, client.UnitTypeOutput, units[1].Type)
1098-
assert.Equal(t, "elasticsearch", units[1].Config.Type)
10991149
}
11001150

11011151
func TestCoordinatorReportsRuntimeManagerUpdateFailure(t *testing.T) {
File renamed without changes.

internal/pkg/agent/application/monitoring/v1_monitor.go renamed to internal/pkg/agent/application/monitoring/component/v1_monitor.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
// or more contributor license agreements. Licensed under the Elastic License 2.0;
33
// you may not use this file except in compliance with the Elastic License 2.0.
44

5-
package monitoring
5+
package component
66

77
import (
88
"crypto/sha256"

internal/pkg/agent/application/monitoring/v1_monitor_test.go renamed to internal/pkg/agent/application/monitoring/component/v1_monitor_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
// or more contributor license agreements. Licensed under the Elastic License 2.0;
33
// you may not use this file except in compliance with the Elastic License 2.0.
44

5-
package monitoring
5+
package component
66

77
import (
88
"context"

internal/pkg/agent/application/monitoring/process.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ import (
1616

1717
"github.com/gorilla/mux"
1818

19+
componentmonitoring "github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/component"
20+
1921
"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
2022
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
2123
"github.com/elastic/elastic-agent/pkg/utils"
@@ -120,7 +122,7 @@ func isProcessRedirectable(componentID string) bool {
120122
}
121123

122124
func redirectToPath(w http.ResponseWriter, r *http.Request, id, path string) error {
123-
endpoint := PrefixedEndpoint(utils.SocketURLWithFallback(id, paths.TempDir()))
125+
endpoint := componentmonitoring.PrefixedEndpoint(utils.SocketURLWithFallback(id, paths.TempDir()))
124126
metricsBytes, statusCode, metricsErr := GetProcessMetrics(r.Context(), endpoint, path)
125127
if metricsErr != nil {
126128
return metricsErr

internal/pkg/agent/application/monitoring/server.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ import (
1818
"go.elastic.co/apm/module/apmgorilla/v2"
1919
"go.elastic.co/apm/v2"
2020

21+
componentmonitoring "github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/component"
22+
2123
"github.com/elastic/elastic-agent-libs/api"
2224
"github.com/elastic/elastic-agent-libs/monitoring"
2325
"github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/reload"
@@ -74,7 +76,7 @@ func NewServer(
7476

7577
srvCfg := api.DefaultConfig()
7678
srvCfg.Enabled = cfg.Enabled
77-
srvCfg.Host = AgentMonitoringEndpoint(cfg)
79+
srvCfg.Host = componentmonitoring.AgentMonitoringEndpoint(cfg)
7880
srvCfg.Port = cfg.HTTP.Port
7981
log.Infof("creating monitoring API with cfg %#v", srvCfg)
8082
if err := createAgentMonitoringDrop(srvCfg.Host); err != nil {

internal/pkg/agent/cmd/inspect.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,12 @@ import (
1414
"github.com/spf13/cobra"
1515
"gopkg.in/yaml.v2"
1616

17+
componentmonitoring "github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/component"
18+
1719
"github.com/elastic/elastic-agent-libs/logp"
1820
"github.com/elastic/elastic-agent-libs/service"
1921

2022
"github.com/elastic/elastic-agent/internal/pkg/agent/application/info"
21-
"github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring"
2223
"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
2324
"github.com/elastic/elastic-agent/internal/pkg/agent/configuration"
2425
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
@@ -412,7 +413,7 @@ func getMonitoringFn(ctx context.Context, logger *logger.Logger, cfg map[string]
412413
}
413414
otelExecMode := otelconfig.GetExecutionModeFromConfig(logger, config)
414415
isOtelExecModeSubprocess := otelExecMode == manager.SubprocessExecutionMode
415-
monitor := monitoring.New(agentCfg.Settings.V1MonitoringEnabled, agentCfg.Settings.DownloadConfig.OS(), agentCfg.Settings.MonitoringConfig, agentInfo, isOtelExecModeSubprocess)
416+
monitor := componentmonitoring.New(agentCfg.Settings.V1MonitoringEnabled, agentCfg.Settings.DownloadConfig.OS(), agentCfg.Settings.MonitoringConfig, agentInfo, isOtelExecModeSubprocess)
416417
return monitor.MonitoringConfig, nil
417418
}
418419

internal/pkg/otel/manager/diagnostics.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"strings"
2121

2222
"github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring"
23+
componentmonitoring "github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/component"
2324

2425
"google.golang.org/protobuf/types/known/timestamppb"
2526

@@ -208,7 +209,7 @@ func GetBeatInputMetricsDiagnostics(ctx context.Context, componentID string) (*p
208209
}
209210

210211
func GetBeatMetricsPayload(ctx context.Context, componentID string, path string) ([]byte, error) {
211-
endpoint := monitoring.PrefixedEndpoint(monitoring.BeatsMonitoringEndpoint(componentID))
212+
endpoint := componentmonitoring.PrefixedEndpoint(componentmonitoring.BeatsMonitoringEndpoint(componentID))
212213
metricBytes, statusCode, err := monitoring.GetProcessMetrics(ctx, endpoint, path)
213214
if err != nil {
214215
return nil, err

0 commit comments

Comments
 (0)