Skip to content

Commit e70295a

Browse files
swiatekmmergify[bot]
authored andcommitted
Fall back to process runtime if otel runtime is unsupported (#10087)
* Move component monitoring to its own package Agent monitoring has two separate functions - implementing the control plane monitoring server and self-monitoring for components. Having both in the same packages caused a dependency cycle involving the otel translation package. Resolve this by putting component monitoring in a subpackage * Fall back to process runtime if otel runtime is unsupported * Fix integration test * Normalize import names * Add logstash unit test * Use indices instead of allow_older_version * Add log line for skipped components * Change argument order * Fix linter warning (cherry picked from commit 2f0ba69) # Conflicts: # internal/pkg/agent/application/application.go # internal/pkg/agent/application/coordinator/coordinator.go # internal/pkg/otel/translate/otelconfig.go # internal/pkg/otel/translate/otelconfig_test.go
1 parent 1e37b36 commit e70295a

File tree

14 files changed

+416
-49
lines changed

14 files changed

+416
-49
lines changed

internal/pkg/agent/application/application.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,18 @@ import (
1111

1212
"go.elastic.co/apm/v2"
1313

14+
<<<<<<< HEAD
15+
=======
16+
componentmonitoring "github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/component"
17+
18+
"github.com/elastic/go-ucfg"
19+
20+
>>>>>>> 2f0ba69f2 (Fall back to process runtime if otel runtime is unsupported (#10087))
1421
"github.com/elastic/elastic-agent-libs/logp"
1522

1623
"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator"
1724
"github.com/elastic/elastic-agent/internal/pkg/agent/application/dispatcher"
1825
"github.com/elastic/elastic-agent/internal/pkg/agent/application/info"
19-
"github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring"
2026
"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
2127
"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade"
2228
"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details"
@@ -130,7 +136,7 @@ func New(
130136
if err != nil {
131137
return nil, nil, nil, fmt.Errorf("failed to create upgrader: %w", err)
132138
}
133-
monitor := monitoring.New(isMonitoringSupported, cfg.Settings.DownloadConfig.OS(), cfg.Settings.MonitoringConfig, agentInfo, isOtelExecModeSubprocess)
139+
monitor := componentmonitoring.New(isMonitoringSupported, cfg.Settings.DownloadConfig.OS(), cfg.Settings.MonitoringConfig, agentInfo, isOtelExecModeSubprocess)
134140

135141
runtime, err := runtime.NewManager(
136142
log,

internal/pkg/agent/application/coordinator/coordinator.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,12 @@ import (
1313
"sync/atomic"
1414
"time"
1515

16+
<<<<<<< HEAD
17+
=======
18+
"github.com/elastic/elastic-agent/internal/pkg/core/backoff"
19+
"github.com/elastic/elastic-agent/internal/pkg/otel/translate"
20+
21+
>>>>>>> 2f0ba69f2 (Fall back to process runtime if otel runtime is unsupported (#10087))
1622
"go.opentelemetry.io/collector/component/componentstatus"
1723

1824
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status"
@@ -1635,6 +1641,7 @@ func (c *Coordinator) updateManagersWithConfig(model *component.Model) {
16351641
func (c *Coordinator) splitModelBetweenManagers(model *component.Model) (runtimeModel *component.Model, otelModel *component.Model) {
16361642
var otelComponents, runtimeComponents []component.Component
16371643
for _, comp := range model.Components {
1644+
c.maybeOverrideRuntimeForComponent(&comp)
16381645
switch comp.RuntimeManager {
16391646
case component.OtelRuntimeManager:
16401647
otelComponents = append(otelComponents, comp)
@@ -1656,6 +1663,59 @@ func (c *Coordinator) splitModelBetweenManagers(model *component.Model) (runtime
16561663
return
16571664
}
16581665

1666+
<<<<<<< HEAD
1667+
=======
1668+
// maybeOverrideRuntimeForComponent sets the correct runtime for the given component.
1669+
// Normally, we use the runtime set in the component itself via the configuration, but
1670+
// we may also fall back to the process runtime if the otel runtime is unsupported for
1671+
// some reason. One example is the output using unsupported config options.
1672+
func (c *Coordinator) maybeOverrideRuntimeForComponent(comp *component.Component) {
1673+
if comp.RuntimeManager == component.ProcessRuntimeManager {
1674+
// do nothing, the process runtime can handle any component
1675+
return
1676+
}
1677+
if comp.RuntimeManager == component.OtelRuntimeManager {
1678+
// check if the component is actually supported
1679+
err := translate.VerifyComponentIsOtelSupported(comp)
1680+
if err != nil {
1681+
c.logger.Warnf("otel runtime is not supported for component %s, switching to process runtime, reason: %v", comp.ID, err)
1682+
comp.RuntimeManager = component.ProcessRuntimeManager
1683+
}
1684+
}
1685+
}
1686+
1687+
func (c *Coordinator) isFleetServer() bool {
1688+
for _, s := range c.state.Components {
1689+
if s.Component.InputType == fleetServer {
1690+
return true
1691+
}
1692+
}
1693+
return false
1694+
}
1695+
1696+
func (c *Coordinator) HasEndpoint() bool {
1697+
for _, component := range c.state.Components {
1698+
if component.Component.InputType == endpoint {
1699+
return true
1700+
}
1701+
}
1702+
1703+
return false
1704+
}
1705+
1706+
func (c *Coordinator) ackMigration(ctx context.Context, action *fleetapi.ActionMigrate, acker acker.Acker) error {
1707+
if err := acker.Ack(ctx, action); err != nil {
1708+
return fmt.Errorf("failed to ack migrate action: %w", err)
1709+
}
1710+
1711+
if err := acker.Commit(ctx); err != nil {
1712+
return fmt.Errorf("failed to commit migrate action: %w", err)
1713+
}
1714+
1715+
return nil
1716+
}
1717+
1718+
>>>>>>> 2f0ba69f2 (Fall back to process runtime if otel runtime is unsupported (#10087))
16591719
// generateComponentModel regenerates the configuration tree and
16601720
// components from the current AST and vars and returns the result.
16611721
// Called from both the main Coordinator goroutine and from external

internal/pkg/agent/application/coordinator/coordinator_unit_test.go

Lines changed: 80 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1023,8 +1023,9 @@ func TestCoordinatorPolicyChangeUpdatesRuntimeAndOTelManagerWithOtelComponents(t
10231023
secretMarkerFunc: testSecretMarkerFunc,
10241024
}
10251025

1026-
// Create a policy with one input and one output (no otel configuration)
1027-
cfg := config.MustNewConfigFrom(`
1026+
t.Run("mixed policy", func(t *testing.T) {
1027+
// Create a policy with one input and one output (no otel configuration)
1028+
cfg := config.MustNewConfigFrom(`
10281029
outputs:
10291030
default:
10301031
type: elasticsearch
@@ -1051,38 +1052,87 @@ service:
10511052
- nop
10521053
`)
10531054

1054-
// Send the policy change and make sure it was acknowledged.
1055-
cfgChange := &configChange{cfg: cfg}
1056-
configChan <- cfgChange
1057-
coord.runLoopIteration(ctx)
1058-
assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change")
1055+
// Send the policy change and make sure it was acknowledged.
1056+
cfgChange := &configChange{cfg: cfg}
1057+
configChan <- cfgChange
1058+
coord.runLoopIteration(ctx)
1059+
assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change")
1060+
1061+
// Make sure the runtime manager received the expected component update.
1062+
// An assert.Equal on the full component model doesn't play nice with
1063+
// the embedded proto structs, so instead we verify the important fields
1064+
// manually (sorry).
1065+
assert.True(t, updated, "Runtime manager should be updated after a policy change")
1066+
require.Equal(t, 1, len(components), "Test policy should generate one component")
1067+
assert.True(t, otelUpdated, "OTel manager should be updated after a policy change")
1068+
require.NotNil(t, otelConfig, "OTel manager should have config")
1069+
1070+
runtimeComponent := components[0]
1071+
assert.Equal(t, "system/metrics-default", runtimeComponent.ID)
1072+
require.NotNil(t, runtimeComponent.Err, "Input with no spec should produce a component error")
1073+
assert.Equal(t, "input not supported", runtimeComponent.Err.Error(), "Input with no spec should report 'input not supported'")
1074+
require.Equal(t, 2, len(runtimeComponent.Units))
1075+
1076+
units := runtimeComponent.Units
1077+
// Verify the input unit
1078+
assert.Equal(t, "system/metrics-default-test-other-input", units[0].ID)
1079+
assert.Equal(t, client.UnitTypeInput, units[0].Type)
1080+
assert.Equal(t, "test-other-input", units[0].Config.Id)
1081+
assert.Equal(t, "system/metrics", units[0].Config.Type)
1082+
1083+
// Verify the output unit
1084+
assert.Equal(t, "system/metrics-default", units[1].ID)
1085+
assert.Equal(t, client.UnitTypeOutput, units[1].Type)
1086+
assert.Equal(t, "elasticsearch", units[1].Config.Type)
1087+
})
1088+
1089+
t.Run("unsupported otel output option", func(t *testing.T) {
1090+
// Create a policy with one input and one output (no otel configuration)
1091+
cfg := config.MustNewConfigFrom(`
1092+
outputs:
1093+
default:
1094+
type: elasticsearch
1095+
hosts:
1096+
- localhost:9200
1097+
indices: [] # not supported by the elasticsearch exporter
1098+
inputs:
1099+
- id: test-input
1100+
type: filestream
1101+
use_output: default
1102+
_runtime_experimental: otel
1103+
- id: test-other-input
1104+
type: system/metrics
1105+
use_output: default
1106+
receivers:
1107+
nop:
1108+
exporters:
1109+
nop:
1110+
service:
1111+
pipelines:
1112+
traces:
1113+
receivers:
1114+
- nop
1115+
exporters:
1116+
- nop
1117+
`)
10591118

1060-
// Make sure the runtime manager received the expected component update.
1061-
// An assert.Equal on the full component model doesn't play nice with
1062-
// the embedded proto structs, so instead we verify the important fields
1063-
// manually (sorry).
1064-
assert.True(t, updated, "Runtime manager should be updated after a policy change")
1065-
require.Equal(t, 1, len(components), "Test policy should generate one component")
1066-
assert.True(t, otelUpdated, "OTel manager should be updated after a policy change")
1067-
require.NotNil(t, otelConfig, "OTel manager should have config")
1119+
// Send the policy change and make sure it was acknowledged.
1120+
cfgChange := &configChange{cfg: cfg}
1121+
configChan <- cfgChange
1122+
coord.runLoopIteration(ctx)
1123+
assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change")
10681124

1069-
runtimeComponent := components[0]
1070-
assert.Equal(t, "system/metrics-default", runtimeComponent.ID)
1071-
require.NotNil(t, runtimeComponent.Err, "Input with no spec should produce a component error")
1072-
assert.Equal(t, "input not supported", runtimeComponent.Err.Error(), "Input with no spec should report 'input not supported'")
1073-
require.Equal(t, 2, len(runtimeComponent.Units))
1125+
// Make sure the runtime manager received the expected component update.
1126+
// An assert.Equal on the full component model doesn't play nice with
1127+
// the embedded proto structs, so instead we verify the important fields
1128+
// manually (sorry).
1129+
assert.True(t, updated, "Runtime manager should be updated after a policy change")
1130+
assert.True(t, otelUpdated, "OTel manager should be updated after a policy change")
1131+
require.NotNil(t, otelConfig, "OTel manager should have config")
10741132

1075-
units := runtimeComponent.Units
1076-
// Verify the input unit
1077-
assert.Equal(t, "system/metrics-default-test-other-input", units[0].ID)
1078-
assert.Equal(t, client.UnitTypeInput, units[0].Type)
1079-
assert.Equal(t, "test-other-input", units[0].Config.Id)
1080-
assert.Equal(t, "system/metrics", units[0].Config.Type)
1133+
assert.Len(t, components, 2, "both components should be assigned to the runtime manager")
1134+
})
10811135

1082-
// Verify the output unit
1083-
assert.Equal(t, "system/metrics-default", units[1].ID)
1084-
assert.Equal(t, client.UnitTypeOutput, units[1].Type)
1085-
assert.Equal(t, "elasticsearch", units[1].Config.Type)
10861136
}
10871137

10881138
func TestCoordinatorReportsRuntimeManagerUpdateFailure(t *testing.T) {
File renamed without changes.

internal/pkg/agent/application/monitoring/v1_monitor.go renamed to internal/pkg/agent/application/monitoring/component/v1_monitor.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
// or more contributor license agreements. Licensed under the Elastic License 2.0;
33
// you may not use this file except in compliance with the Elastic License 2.0.
44

5-
package monitoring
5+
package component
66

77
import (
88
"crypto/sha256"

internal/pkg/agent/application/monitoring/v1_monitor_test.go renamed to internal/pkg/agent/application/monitoring/component/v1_monitor_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
// or more contributor license agreements. Licensed under the Elastic License 2.0;
33
// you may not use this file except in compliance with the Elastic License 2.0.
44

5-
package monitoring
5+
package component
66

77
import (
88
"context"

internal/pkg/agent/application/monitoring/process.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ import (
1616

1717
"github.com/gorilla/mux"
1818

19+
componentmonitoring "github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/component"
20+
1921
"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
2022
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
2123
"github.com/elastic/elastic-agent/pkg/utils"
@@ -120,7 +122,7 @@ func isProcessRedirectable(componentID string) bool {
120122
}
121123

122124
func redirectToPath(w http.ResponseWriter, r *http.Request, id, path string) error {
123-
endpoint := PrefixedEndpoint(utils.SocketURLWithFallback(id, paths.TempDir()))
125+
endpoint := componentmonitoring.PrefixedEndpoint(utils.SocketURLWithFallback(id, paths.TempDir()))
124126
metricsBytes, statusCode, metricsErr := GetProcessMetrics(r.Context(), endpoint, path)
125127
if metricsErr != nil {
126128
return metricsErr

internal/pkg/agent/application/monitoring/server.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ import (
1818
"go.elastic.co/apm/module/apmgorilla/v2"
1919
"go.elastic.co/apm/v2"
2020

21+
componentmonitoring "github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/component"
22+
2123
"github.com/elastic/elastic-agent-libs/api"
2224
"github.com/elastic/elastic-agent-libs/monitoring"
2325
"github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/reload"
@@ -74,7 +76,7 @@ func NewServer(
7476

7577
srvCfg := api.DefaultConfig()
7678
srvCfg.Enabled = cfg.Enabled
77-
srvCfg.Host = AgentMonitoringEndpoint(cfg)
79+
srvCfg.Host = componentmonitoring.AgentMonitoringEndpoint(cfg)
7880
srvCfg.Port = cfg.HTTP.Port
7981
log.Infof("creating monitoring API with cfg %#v", srvCfg)
8082
if err := createAgentMonitoringDrop(srvCfg.Host); err != nil {

internal/pkg/agent/cmd/inspect.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,12 @@ import (
1414
"github.com/spf13/cobra"
1515
"gopkg.in/yaml.v2"
1616

17+
componentmonitoring "github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/component"
18+
1719
"github.com/elastic/elastic-agent-libs/logp"
1820
"github.com/elastic/elastic-agent-libs/service"
1921

2022
"github.com/elastic/elastic-agent/internal/pkg/agent/application/info"
21-
"github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring"
2223
"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
2324
"github.com/elastic/elastic-agent/internal/pkg/agent/configuration"
2425
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
@@ -412,7 +413,7 @@ func getMonitoringFn(ctx context.Context, logger *logger.Logger, cfg map[string]
412413
}
413414
otelExecMode := otelconfig.GetExecutionModeFromConfig(logger, config)
414415
isOtelExecModeSubprocess := otelExecMode == manager.SubprocessExecutionMode
415-
monitor := monitoring.New(agentCfg.Settings.V1MonitoringEnabled, agentCfg.Settings.DownloadConfig.OS(), agentCfg.Settings.MonitoringConfig, agentInfo, isOtelExecModeSubprocess)
416+
monitor := componentmonitoring.New(agentCfg.Settings.V1MonitoringEnabled, agentCfg.Settings.DownloadConfig.OS(), agentCfg.Settings.MonitoringConfig, agentInfo, isOtelExecModeSubprocess)
416417
return monitor.MonitoringConfig, nil
417418
}
418419

internal/pkg/otel/manager/diagnostics.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"strings"
2121

2222
"github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring"
23+
componentmonitoring "github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/component"
2324

2425
"google.golang.org/protobuf/types/known/timestamppb"
2526

@@ -208,7 +209,7 @@ func GetBeatInputMetricsDiagnostics(ctx context.Context, componentID string) (*p
208209
}
209210

210211
func GetBeatMetricsPayload(ctx context.Context, componentID string, path string) ([]byte, error) {
211-
endpoint := monitoring.PrefixedEndpoint(monitoring.BeatsMonitoringEndpoint(componentID))
212+
endpoint := componentmonitoring.PrefixedEndpoint(componentmonitoring.BeatsMonitoringEndpoint(componentID))
212213
metricBytes, statusCode, err := monitoring.GetProcessMetrics(ctx, endpoint, path)
213214
if err != nil {
214215
return nil, err

0 commit comments

Comments
 (0)