Skip to content

Commit ea50558

Browse files
[opampsupervisor]: Skip executing the collector if no config is provided (open-telemetry#35430)
**Description:** <Describe what has changed.> If an empty config map is received, the supervisor does not run the agent. ~The current logic here works fine, but I'm considering adding an option to only do this if the user opts into it. I'm not sure if there's a reason why a user might want to run the collector with the noop config though (maybe for the agent's self-telemetry?)~ I've thought about it some more, and I don't think we need a config option here. If users want the collector to use a noop config, they can send a basic noop config. I think we should also implement open-telemetry#32598 (closed as stale, we'll want to re-open this or open a new issue for it), which would allow users to configure a backup config to use when no config is provided by the server, if they would like. **Link to tracking Issue:** Closes open-telemetry#33680 **Testing:** e2e test added Manually tested with a modified OpAMP server to send an empty config map **Documentation:** Update spec where it seemed applicable to call out this behavior.
1 parent b084ef1 commit ea50558

File tree

5 files changed

+169
-44
lines changed

5 files changed

+169
-44
lines changed
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: opampsupervisor
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Skip executing the collector if no config is provided
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [33680]

cmd/opampsupervisor/e2e_test.go

Lines changed: 93 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -349,20 +349,10 @@ func TestSupervisorStartsWithNoOpAMPServer(t *testing.T) {
349349
require.Nil(t, s.Start())
350350
defer s.Shutdown()
351351

352-
// Verify the collector is running by checking the metrics endpoint
353-
require.Eventually(t, func() bool {
354-
resp, err := http.DefaultClient.Get("http://localhost:12345")
355-
if err != nil {
356-
t.Logf("Failed agent healthcheck request: %s", err)
357-
return false
358-
}
359-
require.NoError(t, resp.Body.Close())
360-
if resp.StatusCode >= 300 || resp.StatusCode < 200 {
361-
t.Logf("Got non-2xx status code: %d", resp.StatusCode)
362-
return false
363-
}
364-
return true
365-
}, 3*time.Second, 100*time.Millisecond)
352+
// Verify the collector is not running after 250 ms by checking the healthcheck endpoint
353+
time.Sleep(250 * time.Millisecond)
354+
_, err := http.DefaultClient.Get("http://localhost:12345")
355+
require.ErrorContains(t, err, "connection refused")
366356

367357
// Start the server and wait for the supervisor to connect
368358
server.start()
@@ -1266,6 +1256,95 @@ func TestSupervisorWritesAgentFilesToStorageDir(t *testing.T) {
12661256
require.FileExists(t, filepath.Join(storageDir, "effective.yaml"))
12671257
}
12681258

1259+
func TestSupervisorStopsAgentProcessWithEmptyConfigMap(t *testing.T) {
1260+
agentCfgChan := make(chan string, 1)
1261+
server := newOpAMPServer(
1262+
t,
1263+
defaultConnectingHandler,
1264+
server.ConnectionCallbacksStruct{
1265+
OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
1266+
if message.EffectiveConfig != nil {
1267+
config := message.EffectiveConfig.ConfigMap.ConfigMap[""]
1268+
if config != nil {
1269+
select {
1270+
case agentCfgChan <- string(config.Body):
1271+
default:
1272+
}
1273+
}
1274+
}
1275+
1276+
return &protobufs.ServerToAgent{}
1277+
},
1278+
})
1279+
1280+
s := newSupervisor(t, "healthcheck_port", map[string]string{
1281+
"url": server.addr,
1282+
"healthcheck_port": "12345",
1283+
})
1284+
1285+
require.Nil(t, s.Start())
1286+
defer s.Shutdown()
1287+
1288+
waitForSupervisorConnection(server.supervisorConnected, true)
1289+
1290+
cfg, hash, _, _ := createSimplePipelineCollectorConf(t)
1291+
1292+
server.sendToSupervisor(&protobufs.ServerToAgent{
1293+
RemoteConfig: &protobufs.AgentRemoteConfig{
1294+
Config: &protobufs.AgentConfigMap{
1295+
ConfigMap: map[string]*protobufs.AgentConfigFile{
1296+
"": {Body: cfg.Bytes()},
1297+
},
1298+
},
1299+
ConfigHash: hash,
1300+
},
1301+
})
1302+
1303+
select {
1304+
case <-agentCfgChan:
1305+
case <-time.After(1 * time.Second):
1306+
require.FailNow(t, "timed out waitng for agent to report its initial config")
1307+
}
1308+
1309+
// Use health check endpoint to determine if the collector is actually running
1310+
require.Eventually(t, func() bool {
1311+
resp, err := http.DefaultClient.Get("http://localhost:12345")
1312+
if err != nil {
1313+
t.Logf("Failed agent healthcheck request: %s", err)
1314+
return false
1315+
}
1316+
require.NoError(t, resp.Body.Close())
1317+
if resp.StatusCode >= 300 || resp.StatusCode < 200 {
1318+
t.Logf("Got non-2xx status code: %d", resp.StatusCode)
1319+
return false
1320+
}
1321+
return true
1322+
}, 3*time.Second, 100*time.Millisecond)
1323+
1324+
// Send empty config
1325+
emptyHash := sha256.Sum256([]byte{})
1326+
server.sendToSupervisor(&protobufs.ServerToAgent{
1327+
RemoteConfig: &protobufs.AgentRemoteConfig{
1328+
Config: &protobufs.AgentConfigMap{
1329+
ConfigMap: map[string]*protobufs.AgentConfigFile{},
1330+
},
1331+
ConfigHash: emptyHash[:],
1332+
},
1333+
})
1334+
1335+
select {
1336+
case <-agentCfgChan:
1337+
case <-time.After(1 * time.Second):
1338+
require.FailNow(t, "timed out waitng for agent to report its noop config")
1339+
}
1340+
1341+
// Verify the collector is not running after 250 ms by checking the healthcheck endpoint
1342+
time.Sleep(250 * time.Millisecond)
1343+
_, err := http.DefaultClient.Get("http://localhost:12345")
1344+
require.ErrorContains(t, err, "connection refused")
1345+
1346+
}
1347+
12691348
func findRandomPort() (int, error) {
12701349
l, err := net.Listen("tcp", "localhost:0")
12711350

cmd/opampsupervisor/specification/README.md

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -166,9 +166,9 @@ agent:
166166
### Operation When OpAMP Server is Unavailable
167167
168168
When the supervisor cannot connect to the OpAMP server, the collector will
169-
be run with the last known configuration, or with a "noop" configuration
170-
if no previous configuration is persisted. The supervisor will continually
171-
attempt to reconnect to the OpAMP server with exponential backoff.
169+
be run with the last known configuration if a previous configuration is persisted.
170+
If no previous configuration has been persisted, the collector does not run.
171+
The supervisor will continually attempt to reconnect to the OpAMP server with exponential backoff.
172172
173173
### Executing Collector
174174
@@ -204,6 +204,10 @@ Configuration*](https://github.com/open-telemetry/opamp-spec/blob/main/specifica
204204
from the OpAMP Backend, merges it with an optional local config file and
205205
writes it to the Collector's config file, then restarts the Collector.
206206
207+
If the remote configuration from the OpAMP Backend contains an empty config map,
208+
the collector will be stopped and will not be run again until a non-empty config map
209+
is received from the OpAMP Backend.
210+
207211
In the future once config file watching is implemented the Collector can
208212
reload the config without the need for the Supervisor to restart the
209213
Collector process.
@@ -244,13 +248,13 @@ configuration.
244248
To overcome this problem the Supervisor starts the Collector with an
245249
"noop" configuration that collects nothing but allows the opamp
246250
extension to be started. The "noop" configuration is a single pipeline
247-
with an OTLP receiver that listens on a random port and a debug
248-
exporter, and the opamp extension. The purpose of the "noop"
249-
configuration is to make sure the Collector starts and the opamp
250-
extension communicates with the Supervisor.
251+
with an nop receiver, a nop exporter, and the opamp extension.
252+
The purpose of the "noop" configuration is to make sure the Collector starts
253+
and the opamp extension communicates with the Supervisor. The Collector is stopped
254+
after the AgentDescription is received from the Collector.
251255
252256
Once the initial Collector launch is successful and the remote
253-
configuration is received by the Supervisor the Supervisor restarts the
257+
configuration is received by the Supervisor the Supervisor starts the
254258
Collector with the new config. The new config is also cached by the
255259
Supervisor in a local file, so that subsequent restarts no longer need
256260
to start the Collector using the "noop" configuration. Caching of the

cmd/opampsupervisor/supervisor/supervisor.go

Lines changed: 40 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,17 @@ const (
6767

6868
const maxBufferedCustomMessages = 10
6969

70+
type configState struct {
71+
// Supervisor-assembled config to be given to the Collector.
72+
mergedConfig string
73+
// true if the server provided configmap was empty
74+
configMapIsEmpty bool
75+
}
76+
77+
func (c *configState) equal(other *configState) bool {
78+
return other.mergedConfig == c.mergedConfig && other.configMapIsEmpty == c.configMapIsEmpty
79+
}
80+
7081
// Supervisor implements supervising of OpenTelemetry Collector and uses OpAMPClient
7182
// to work with an OpAMP Server.
7283
type Supervisor struct {
@@ -107,8 +118,8 @@ type Supervisor struct {
107118
// will listen on for health check requests from the Supervisor.
108119
agentHealthCheckEndpoint string
109120

110-
// Supervisor-assembled config to be given to the Collector.
111-
mergedConfig *atomic.Value
121+
// Internal config state for agent use. See the configState struct for more details.
122+
cfgState *atomic.Value
112123

113124
// Final effective config of the Collector.
114125
effectiveConfig *atomic.Value
@@ -143,7 +154,7 @@ func NewSupervisor(logger *zap.Logger, cfg config.Supervisor) (*Supervisor, erro
143154
pidProvider: defaultPIDProvider{},
144155
hasNewConfig: make(chan struct{}, 1),
145156
agentConfigOwnMetricsSection: &atomic.Value{},
146-
mergedConfig: &atomic.Value{},
157+
cfgState: &atomic.Value{},
147158
effectiveConfig: &atomic.Value{},
148159
agentDescription: &atomic.Value{},
149160
doneChan: make(chan struct{}),
@@ -793,8 +804,8 @@ func (s *Supervisor) loadAndWriteInitialMergedConfig() error {
793804
}
794805

795806
// write the initial merged config to disk
796-
cfg := s.mergedConfig.Load().(string)
797-
if err := os.WriteFile(s.agentConfigFilePath(), []byte(cfg), 0600); err != nil {
807+
cfgState := s.cfgState.Load().(*configState)
808+
if err := os.WriteFile(s.agentConfigFilePath(), []byte(cfgState.mergedConfig), 0600); err != nil {
798809
s.logger.Error("Failed to write agent config.", zap.Error(err))
799810
}
800811

@@ -806,9 +817,11 @@ func (s *Supervisor) loadAndWriteInitialMergedConfig() error {
806817
func (s *Supervisor) createEffectiveConfigMsg() *protobufs.EffectiveConfig {
807818
cfgStr, ok := s.effectiveConfig.Load().(string)
808819
if !ok {
809-
cfgStr, ok = s.mergedConfig.Load().(string)
820+
cfgState, ok := s.cfgState.Load().(*configState)
810821
if !ok {
811822
cfgStr = ""
823+
} else {
824+
cfgStr = cfgState.mergedConfig
812825
}
813826
}
814827

@@ -870,7 +883,11 @@ func (s *Supervisor) setupOwnMetrics(_ context.Context, settings *protobufs.Tele
870883
func (s *Supervisor) composeMergedConfig(config *protobufs.AgentRemoteConfig) (configChanged bool, err error) {
871884
var k = koanf.New("::")
872885

873-
if c := config.GetConfig(); c != nil {
886+
configMapIsEmpty := len(config.GetConfig().GetConfigMap()) == 0
887+
888+
if !configMapIsEmpty {
889+
c := config.GetConfig()
890+
874891
// Sort to make sure the order of merging is stable.
875892
var names []string
876893
for name := range c.ConfigMap {
@@ -939,11 +956,16 @@ func (s *Supervisor) composeMergedConfig(config *protobufs.AgentRemoteConfig) (c
939956
}
940957

941958
// Check if supervisor's merged config is changed.
942-
newMergedConfig := string(newMergedConfigBytes)
959+
960+
newConfigState := &configState{
961+
mergedConfig: string(newMergedConfigBytes),
962+
configMapIsEmpty: configMapIsEmpty,
963+
}
964+
943965
configChanged = false
944966

945-
oldConfig := s.mergedConfig.Swap(newMergedConfig)
946-
if oldConfig == nil || oldConfig.(string) != newMergedConfig {
967+
oldConfigState := s.cfgState.Swap(newConfigState)
968+
if oldConfigState == nil || !oldConfigState.(*configState).equal(newConfigState) {
947969
s.logger.Debug("Merged config changed.")
948970
configChanged = true
949971
}
@@ -963,6 +985,12 @@ func (s *Supervisor) handleRestartCommand() error {
963985
}
964986

965987
func (s *Supervisor) startAgent() {
988+
if s.cfgState.Load().(*configState).configMapIsEmpty {
989+
// Don't start the agent if there is no config to run
990+
s.logger.Info("No config present, not starting agent.")
991+
return
992+
}
993+
966994
err := s.commander.Start(context.Background())
967995
if err != nil {
968996
s.logger.Error("Cannot start the agent", zap.Error(err))
@@ -1104,14 +1132,14 @@ func (s *Supervisor) runAgentProcess() {
11041132

11051133
func (s *Supervisor) stopAgentApplyConfig() {
11061134
s.logger.Debug("Stopping the agent to apply new config")
1107-
cfg := s.mergedConfig.Load().(string)
1135+
cfgState := s.cfgState.Load().(*configState)
11081136
err := s.commander.Stop(context.Background())
11091137

11101138
if err != nil {
11111139
s.logger.Error("Could not stop agent process", zap.Error(err))
11121140
}
11131141

1114-
if err := os.WriteFile(s.agentConfigFilePath(), []byte(cfg), 0600); err != nil {
1142+
if err := os.WriteFile(s.agentConfigFilePath(), []byte(cfgState.mergedConfig), 0600); err != nil {
11151143
s.logger.Error("Failed to write agent config.", zap.Error(err))
11161144
}
11171145
}

cmd/opampsupervisor/supervisor/supervisor_test.go

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ func Test_composeEffectiveConfig(t *testing.T) {
104104
pidProvider: staticPIDProvider(1234),
105105
hasNewConfig: make(chan struct{}, 1),
106106
agentConfigOwnMetricsSection: &atomic.Value{},
107-
mergedConfig: &atomic.Value{},
107+
cfgState: &atomic.Value{},
108108
agentHealthCheckEndpoint: "localhost:8000",
109109
}
110110

@@ -159,7 +159,7 @@ service:
159159
expectedConfig = bytes.ReplaceAll(expectedConfig, []byte("\r\n"), []byte("\n"))
160160

161161
require.True(t, configChanged)
162-
require.Equal(t, string(expectedConfig), s.mergedConfig.Load().(string))
162+
require.Equal(t, string(expectedConfig), s.cfgState.Load().(*configState).mergedConfig)
163163
}
164164

165165
func Test_onMessage(t *testing.T) {
@@ -176,7 +176,7 @@ func Test_onMessage(t *testing.T) {
176176
persistentState: &persistentState{InstanceID: initialID},
177177
agentDescription: agentDesc,
178178
agentConfigOwnMetricsSection: &atomic.Value{},
179-
mergedConfig: &atomic.Value{},
179+
cfgState: &atomic.Value{},
180180
effectiveConfig: &atomic.Value{},
181181
agentHealthCheckEndpoint: "localhost:8000",
182182
opampClient: client.NewHTTP(newLoggerFromZap(zap.NewNop())),
@@ -205,7 +205,7 @@ func Test_onMessage(t *testing.T) {
205205
persistentState: &persistentState{InstanceID: testUUID},
206206
agentDescription: agentDesc,
207207
agentConfigOwnMetricsSection: &atomic.Value{},
208-
mergedConfig: &atomic.Value{},
208+
cfgState: &atomic.Value{},
209209
effectiveConfig: &atomic.Value{},
210210
agentHealthCheckEndpoint: "localhost:8000",
211211
}
@@ -251,7 +251,7 @@ func Test_onMessage(t *testing.T) {
251251
hasNewConfig: make(chan struct{}, 1),
252252
persistentState: &persistentState{InstanceID: testUUID},
253253
agentConfigOwnMetricsSection: &atomic.Value{},
254-
mergedConfig: &atomic.Value{},
254+
cfgState: &atomic.Value{},
255255
effectiveConfig: &atomic.Value{},
256256
agentConn: agentConnAtomic,
257257
agentHealthCheckEndpoint: "localhost:8000",
@@ -332,7 +332,7 @@ func Test_onMessage(t *testing.T) {
332332
persistentState: &persistentState{InstanceID: initialID},
333333
agentDescription: agentDesc,
334334
agentConfigOwnMetricsSection: &atomic.Value{},
335-
mergedConfig: &atomic.Value{},
335+
cfgState: &atomic.Value{},
336336
effectiveConfig: &atomic.Value{},
337337
agentHealthCheckEndpoint: "localhost:8000",
338338
opampClient: client.NewHTTP(newLoggerFromZap(zap.NewNop())),
@@ -358,10 +358,11 @@ func Test_onMessage(t *testing.T) {
358358
})
359359

360360
require.Equal(t, newID, s.persistentState.InstanceID)
361-
t.Log(s.mergedConfig.Load())
362-
require.Contains(t, s.mergedConfig.Load(), "prometheus/own_metrics")
363-
require.Contains(t, s.mergedConfig.Load(), newID.String())
364-
require.Contains(t, s.mergedConfig.Load(), "runtime.type: test")
361+
t.Log(s.cfgState.Load())
362+
mergedCfg := s.cfgState.Load().(*configState).mergedConfig
363+
require.Contains(t, mergedCfg, "prometheus/own_metrics")
364+
require.Contains(t, mergedCfg, newID.String())
365+
require.Contains(t, mergedCfg, "runtime.type: test")
365366
})
366367
}
367368

0 commit comments

Comments
 (0)