Skip to content

Commit 8253bee

Browse files
swiatekmmergify[bot]
authored andcommitted
Fall back to process runtime if otel runtime is unsupported (#10087)
* Move component monitoring to its own package Agent monitoring has two separate functions - implementing the control plane monitoring server and self-monitoring for components. Having both in the same packages caused a dependency cycle involving the otel translation package. Resolve this by putting component monitoring in a subpackage * Fall back to process runtime if otel runtime is unsupported * Fix integration test * Normalize import names * Add logstash unit test * Use indices instead of allow_older_version * Add log line for skipped components * Change argument order * Fix linter warning (cherry picked from commit 2f0ba69) # Conflicts: # internal/pkg/agent/application/application.go # internal/pkg/agent/application/coordinator/coordinator.go # internal/pkg/agent/application/monitoring/process.go # internal/pkg/agent/cmd/inspect.go # internal/pkg/otel/manager/diagnostics.go # internal/pkg/otel/manager/diagnostics_test.go # internal/pkg/otel/translate/otelconfig.go # internal/pkg/otel/translate/otelconfig_test.go # testing/integration/ess/beat_receivers_test.go
1 parent 1f32480 commit 8253bee

File tree

14 files changed

+1481
-42
lines changed

14 files changed

+1481
-42
lines changed

internal/pkg/agent/application/application.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,15 @@ import (
1111

1212
"go.elastic.co/apm/v2"
1313

14+
componentmonitoring "github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/component"
15+
1416
"github.com/elastic/go-ucfg"
1517

1618
"github.com/elastic/elastic-agent-libs/logp"
1719

1820
"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator"
1921
"github.com/elastic/elastic-agent/internal/pkg/agent/application/dispatcher"
2022
"github.com/elastic/elastic-agent/internal/pkg/agent/application/info"
21-
"github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring"
2223
"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
2324
"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade"
2425
"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details"
@@ -128,7 +129,11 @@ func New(
128129
if err != nil {
129130
return nil, nil, nil, fmt.Errorf("failed to create upgrader: %w", err)
130131
}
132+
<<<<<<< HEAD
131133
monitor := monitoring.New(isMonitoringSupported, cfg.Settings.DownloadConfig.OS(), cfg.Settings.MonitoringConfig, agentInfo)
134+
=======
135+
monitor := componentmonitoring.New(isMonitoringSupported, cfg.Settings.DownloadConfig.OS(), cfg.Settings.MonitoringConfig, agentInfo, isOtelExecModeSubprocess)
136+
>>>>>>> 2f0ba69f2 (Fall back to process runtime if otel runtime is unsupported (#10087))
132137

133138
runtime, err := runtime.NewManager(
134139
log,

internal/pkg/agent/application/coordinator/coordinator.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@ import (
1313
"sync/atomic"
1414
"time"
1515

16+
<<<<<<< HEAD
17+
=======
18+
"github.com/elastic/elastic-agent/internal/pkg/core/backoff"
19+
>>>>>>> 2f0ba69f2 (Fall back to process runtime if otel runtime is unsupported (#10087))
1620
"github.com/elastic/elastic-agent/internal/pkg/otel/translate"
1721

1822
"go.opentelemetry.io/collector/component/componentstatus"
@@ -1678,6 +1682,7 @@ func (c *Coordinator) updateOtelManagerConfig(model *component.Model) error {
16781682
func (c *Coordinator) splitModelBetweenManagers(model *component.Model) (runtimeModel *component.Model, otelModel *component.Model) {
16791683
var otelComponents, runtimeComponents []component.Component
16801684
for _, comp := range model.Components {
1685+
c.maybeOverrideRuntimeForComponent(&comp)
16811686
switch comp.RuntimeManager {
16821687
case component.OtelRuntimeManager:
16831688
otelComponents = append(otelComponents, comp)
@@ -1699,6 +1704,59 @@ func (c *Coordinator) splitModelBetweenManagers(model *component.Model) (runtime
16991704
return
17001705
}
17011706

1707+
<<<<<<< HEAD
1708+
=======
1709+
// maybeOverrideRuntimeForComponent sets the correct runtime for the given component.
1710+
// Normally, we use the runtime set in the component itself via the configuration, but
1711+
// we may also fall back to the process runtime if the otel runtime is unsupported for
1712+
// some reason. One example is the output using unsupported config options.
1713+
func (c *Coordinator) maybeOverrideRuntimeForComponent(comp *component.Component) {
1714+
if comp.RuntimeManager == component.ProcessRuntimeManager {
1715+
// do nothing, the process runtime can handle any component
1716+
return
1717+
}
1718+
if comp.RuntimeManager == component.OtelRuntimeManager {
1719+
// check if the component is actually supported
1720+
err := translate.VerifyComponentIsOtelSupported(comp)
1721+
if err != nil {
1722+
c.logger.Warnf("otel runtime is not supported for component %s, switching to process runtime, reason: %v", comp.ID, err)
1723+
comp.RuntimeManager = component.ProcessRuntimeManager
1724+
}
1725+
}
1726+
}
1727+
1728+
func (c *Coordinator) isFleetServer() bool {
1729+
for _, s := range c.state.Components {
1730+
if s.Component.InputType == fleetServer {
1731+
return true
1732+
}
1733+
}
1734+
return false
1735+
}
1736+
1737+
func (c *Coordinator) HasEndpoint() bool {
1738+
for _, component := range c.state.Components {
1739+
if component.Component.InputType == endpoint {
1740+
return true
1741+
}
1742+
}
1743+
1744+
return false
1745+
}
1746+
1747+
func (c *Coordinator) ackMigration(ctx context.Context, action *fleetapi.ActionMigrate, acker acker.Acker) error {
1748+
if err := acker.Ack(ctx, action); err != nil {
1749+
return fmt.Errorf("failed to ack migrate action: %w", err)
1750+
}
1751+
1752+
if err := acker.Commit(ctx); err != nil {
1753+
return fmt.Errorf("failed to commit migrate action: %w", err)
1754+
}
1755+
1756+
return nil
1757+
}
1758+
1759+
>>>>>>> 2f0ba69f2 (Fall back to process runtime if otel runtime is unsupported (#10087))
17021760
// generateComponentModel regenerates the configuration tree and
17031761
// components from the current AST and vars and returns the result.
17041762
// Called from both the main Coordinator goroutine and from external

internal/pkg/agent/application/coordinator/coordinator_unit_test.go

Lines changed: 80 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1024,8 +1024,9 @@ func TestCoordinatorPolicyChangeUpdatesRuntimeAndOTelManagerWithOtelComponents(t
10241024
secretMarkerFunc: testSecretMarkerFunc,
10251025
}
10261026

1027-
// Create a policy with one input and one output (no otel configuration)
1028-
cfg := config.MustNewConfigFrom(`
1027+
t.Run("mixed policy", func(t *testing.T) {
1028+
// Create a policy with one input and one output (no otel configuration)
1029+
cfg := config.MustNewConfigFrom(`
10291030
outputs:
10301031
default:
10311032
type: elasticsearch
@@ -1052,38 +1053,87 @@ service:
10521053
- nop
10531054
`)
10541055

1055-
// Send the policy change and make sure it was acknowledged.
1056-
cfgChange := &configChange{cfg: cfg}
1057-
configChan <- cfgChange
1058-
coord.runLoopIteration(ctx)
1059-
assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change")
1056+
// Send the policy change and make sure it was acknowledged.
1057+
cfgChange := &configChange{cfg: cfg}
1058+
configChan <- cfgChange
1059+
coord.runLoopIteration(ctx)
1060+
assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change")
1061+
1062+
// Make sure the runtime manager received the expected component update.
1063+
// An assert.Equal on the full component model doesn't play nice with
1064+
// the embedded proto structs, so instead we verify the important fields
1065+
// manually (sorry).
1066+
assert.True(t, updated, "Runtime manager should be updated after a policy change")
1067+
require.Equal(t, 1, len(components), "Test policy should generate one component")
1068+
assert.True(t, otelUpdated, "OTel manager should be updated after a policy change")
1069+
require.NotNil(t, otelConfig, "OTel manager should have config")
1070+
1071+
runtimeComponent := components[0]
1072+
assert.Equal(t, "system/metrics-default", runtimeComponent.ID)
1073+
require.NotNil(t, runtimeComponent.Err, "Input with no spec should produce a component error")
1074+
assert.Equal(t, "input not supported", runtimeComponent.Err.Error(), "Input with no spec should report 'input not supported'")
1075+
require.Equal(t, 2, len(runtimeComponent.Units))
1076+
1077+
units := runtimeComponent.Units
1078+
// Verify the input unit
1079+
assert.Equal(t, "system/metrics-default-test-other-input", units[0].ID)
1080+
assert.Equal(t, client.UnitTypeInput, units[0].Type)
1081+
assert.Equal(t, "test-other-input", units[0].Config.Id)
1082+
assert.Equal(t, "system/metrics", units[0].Config.Type)
1083+
1084+
// Verify the output unit
1085+
assert.Equal(t, "system/metrics-default", units[1].ID)
1086+
assert.Equal(t, client.UnitTypeOutput, units[1].Type)
1087+
assert.Equal(t, "elasticsearch", units[1].Config.Type)
1088+
})
1089+
1090+
t.Run("unsupported otel output option", func(t *testing.T) {
1091+
// Create a policy with one input and one output (no otel configuration)
1092+
cfg := config.MustNewConfigFrom(`
1093+
outputs:
1094+
default:
1095+
type: elasticsearch
1096+
hosts:
1097+
- localhost:9200
1098+
indices: [] # not supported by the elasticsearch exporter
1099+
inputs:
1100+
- id: test-input
1101+
type: filestream
1102+
use_output: default
1103+
_runtime_experimental: otel
1104+
- id: test-other-input
1105+
type: system/metrics
1106+
use_output: default
1107+
receivers:
1108+
nop:
1109+
exporters:
1110+
nop:
1111+
service:
1112+
pipelines:
1113+
traces:
1114+
receivers:
1115+
- nop
1116+
exporters:
1117+
- nop
1118+
`)
10601119

1061-
// Make sure the runtime manager received the expected component update.
1062-
// An assert.Equal on the full component model doesn't play nice with
1063-
// the embedded proto structs, so instead we verify the important fields
1064-
// manually (sorry).
1065-
assert.True(t, updated, "Runtime manager should be updated after a policy change")
1066-
require.Equal(t, 1, len(components), "Test policy should generate one component")
1067-
assert.True(t, otelUpdated, "OTel manager should be updated after a policy change")
1068-
require.NotNil(t, otelConfig, "OTel manager should have config")
1120+
// Send the policy change and make sure it was acknowledged.
1121+
cfgChange := &configChange{cfg: cfg}
1122+
configChan <- cfgChange
1123+
coord.runLoopIteration(ctx)
1124+
assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change")
10691125

1070-
runtimeComponent := components[0]
1071-
assert.Equal(t, "system/metrics-default", runtimeComponent.ID)
1072-
require.NotNil(t, runtimeComponent.Err, "Input with no spec should produce a component error")
1073-
assert.Equal(t, "input not supported", runtimeComponent.Err.Error(), "Input with no spec should report 'input not supported'")
1074-
require.Equal(t, 2, len(runtimeComponent.Units))
1126+
// Make sure the runtime manager received the expected component update.
1127+
// An assert.Equal on the full component model doesn't play nice with
1128+
// the embedded proto structs, so instead we verify the important fields
1129+
// manually (sorry).
1130+
assert.True(t, updated, "Runtime manager should be updated after a policy change")
1131+
assert.True(t, otelUpdated, "OTel manager should be updated after a policy change")
1132+
require.NotNil(t, otelConfig, "OTel manager should have config")
10751133

1076-
units := runtimeComponent.Units
1077-
// Verify the input unit
1078-
assert.Equal(t, "system/metrics-default-test-other-input", units[0].ID)
1079-
assert.Equal(t, client.UnitTypeInput, units[0].Type)
1080-
assert.Equal(t, "test-other-input", units[0].Config.Id)
1081-
assert.Equal(t, "system/metrics", units[0].Config.Type)
1134+
assert.Len(t, components, 2, "both components should be assigned to the runtime manager")
1135+
})
10821136

1083-
// Verify the output unit
1084-
assert.Equal(t, "system/metrics-default", units[1].ID)
1085-
assert.Equal(t, client.UnitTypeOutput, units[1].Type)
1086-
assert.Equal(t, "elasticsearch", units[1].Config.Type)
10871137
}
10881138

10891139
func TestCoordinatorReportsRuntimeManagerUpdateFailure(t *testing.T) {
File renamed without changes.

internal/pkg/agent/application/monitoring/v1_monitor.go renamed to internal/pkg/agent/application/monitoring/component/v1_monitor.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
// or more contributor license agreements. Licensed under the Elastic License 2.0;
33
// you may not use this file except in compliance with the Elastic License 2.0.
44

5-
package monitoring
5+
package component
66

77
import (
88
"crypto/sha256"

internal/pkg/agent/application/monitoring/v1_monitor_test.go renamed to internal/pkg/agent/application/monitoring/component/v1_monitor_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
// or more contributor license agreements. Licensed under the Elastic License 2.0;
33
// you may not use this file except in compliance with the Elastic License 2.0.
44

5-
package monitoring
5+
package component
66

77
import (
88
"context"

internal/pkg/agent/application/monitoring/process.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ import (
1616

1717
"github.com/gorilla/mux"
1818

19+
componentmonitoring "github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/component"
20+
1921
"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
2022
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
2123
"github.com/elastic/elastic-agent/pkg/utils"
@@ -120,8 +122,13 @@ func isProcessRedirectable(componentID string) bool {
120122
}
121123

122124
func redirectToPath(w http.ResponseWriter, r *http.Request, id, path string) error {
125+
<<<<<<< HEAD
123126
endpoint := prefixedEndpoint(utils.SocketURLWithFallback(id, paths.TempDir()))
124127
metricsBytes, statusCode, metricsErr := processMetrics(r.Context(), endpoint, path)
128+
=======
129+
endpoint := componentmonitoring.PrefixedEndpoint(utils.SocketURLWithFallback(id, paths.TempDir()))
130+
metricsBytes, statusCode, metricsErr := GetProcessMetrics(r.Context(), endpoint, path)
131+
>>>>>>> 2f0ba69f2 (Fall back to process runtime if otel runtime is unsupported (#10087))
125132
if metricsErr != nil {
126133
return metricsErr
127134
}

internal/pkg/agent/application/monitoring/server.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ import (
1818
"go.elastic.co/apm/module/apmgorilla/v2"
1919
"go.elastic.co/apm/v2"
2020

21+
componentmonitoring "github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/component"
22+
2123
"github.com/elastic/elastic-agent-libs/api"
2224
"github.com/elastic/elastic-agent-libs/monitoring"
2325
"github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/reload"
@@ -74,7 +76,7 @@ func NewServer(
7476

7577
srvCfg := api.DefaultConfig()
7678
srvCfg.Enabled = cfg.Enabled
77-
srvCfg.Host = AgentMonitoringEndpoint(cfg)
79+
srvCfg.Host = componentmonitoring.AgentMonitoringEndpoint(cfg)
7880
srvCfg.Port = cfg.HTTP.Port
7981
log.Infof("creating monitoring API with cfg %#v", srvCfg)
8082
if err := createAgentMonitoringDrop(srvCfg.Host); err != nil {

internal/pkg/agent/cmd/inspect.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,12 @@ import (
1414
"github.com/spf13/cobra"
1515
"gopkg.in/yaml.v2"
1616

17+
componentmonitoring "github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/component"
18+
1719
"github.com/elastic/elastic-agent-libs/logp"
1820
"github.com/elastic/elastic-agent-libs/service"
1921

2022
"github.com/elastic/elastic-agent/internal/pkg/agent/application/info"
21-
"github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring"
2223
"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
2324
"github.com/elastic/elastic-agent/internal/pkg/agent/configuration"
2425
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
@@ -408,8 +409,14 @@ func getMonitoringFn(ctx context.Context, cfg map[string]interface{}) (component
408409
if err != nil {
409410
return nil, fmt.Errorf("could not load agent info: %w", err)
410411
}
412+
<<<<<<< HEAD
411413

412414
monitor := monitoring.New(agentCfg.Settings.V1MonitoringEnabled, agentCfg.Settings.DownloadConfig.OS(), agentCfg.Settings.MonitoringConfig, agentInfo)
415+
=======
416+
otelExecMode := otelconfig.GetExecutionModeFromConfig(logger, config)
417+
isOtelExecModeSubprocess := otelExecMode == manager.SubprocessExecutionMode
418+
monitor := componentmonitoring.New(agentCfg.Settings.V1MonitoringEnabled, agentCfg.Settings.DownloadConfig.OS(), agentCfg.Settings.MonitoringConfig, agentInfo, isOtelExecModeSubprocess)
419+
>>>>>>> 2f0ba69f2 (Fall back to process runtime if otel runtime is unsupported (#10087))
413420
return monitor.MonitoringConfig, nil
414421
}
415422

0 commit comments

Comments
 (0)