
Commit d4a23bf

srikanthccv and michael-burt authored and committed
[cmd/opampsupervisor]: Supervisor waits for configurable healthchecks to report remote config status (open-telemetry#34907)
**Description:** This pull request addresses the remote config status reporting issue discussed in open-telemetry#21079 by introducing the following option to the Agent config:

1. `config_apply_timeout`: a config update is reported as successful if a healthy status is received and no failure updates are observed for the entire duration of the timeout; otherwise, a failure is reported.

**Link to tracking issue:** open-telemetry#21079

**Testing:** Added an e2e test.
1 parent 435ad0a commit d4a23bf
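For orientation, the sketch below shows a minimal supervisor configuration with the new setting in place. Only `agent::config_apply_timeout` comes from this change (it must be a positive duration and defaults to 5s); the surrounding keys and all values are illustrative placeholders rather than part of this diff.

```yaml
server:
  # Placeholder OpAMP server endpoint.
  endpoint: wss://localhost:4320/v1/opamp

agent:
  # Placeholder path to the Collector binary.
  executable: ./otelcol-contrib
  # Introduced by this change: after a remote config update, the supervisor
  # waits for a healthy report and watches for failures for this long before
  # reporting the remote config status (APPLIED on success, FAILED otherwise).
  # Must be a positive duration; the default is 5s.
  config_apply_timeout: 10s
```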

File tree

7 files changed: +283 -25 lines
Lines changed: 27 additions & 0 deletions

@@ -0,0 +1,27 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: opampsupervisor
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Supervisor waits for configurable healthchecks to report remote config status.
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [21079]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [user]

cmd/opampsupervisor/e2e_test.go

Lines changed: 126 additions & 1 deletion
@@ -162,7 +162,10 @@ func newSupervisor(t *testing.T, configType string, extraConfigData map[string]s
 	cfg, err := config.Load(cfgFile.Name())
 	require.NoError(t, err)

-	s, err := supervisor.NewSupervisor(zap.NewNop(), cfg)
+	logger, err := zap.NewDevelopment()
+	require.NoError(t, err)
+
+	s, err := supervisor.NewSupervisor(logger, cfg)
 	require.NoError(t, err)

 	return s
@@ -1443,6 +1446,128 @@ func TestSupervisorLogging(t *testing.T) {
 	require.NoError(t, logFile.Close())
 }

+func TestSupervisorRemoteConfigApplyStatus(t *testing.T) {
+	var agentConfig atomic.Value
+	var healthReport atomic.Value
+	var remoteConfigStatus atomic.Value
+	server := newOpAMPServer(
+		t,
+		defaultConnectingHandler,
+		server.ConnectionCallbacksStruct{
+			OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
+				if message.EffectiveConfig != nil {
+					config := message.EffectiveConfig.ConfigMap.ConfigMap[""]
+					if config != nil {
+						agentConfig.Store(string(config.Body))
+					}
+				}
+				if message.Health != nil {
+					healthReport.Store(message.Health)
+				}
+				if message.RemoteConfigStatus != nil {
+					remoteConfigStatus.Store(message.RemoteConfigStatus)
+				}
+
+				return &protobufs.ServerToAgent{}
+			},
+		})
+
+	s := newSupervisor(t, "report_status", map[string]string{
+		"url":                  server.addr,
+		"config_apply_timeout": "3s",
+	})
+	require.Nil(t, s.Start())
+	defer s.Shutdown()
+
+	waitForSupervisorConnection(server.supervisorConnected, true)
+
+	cfg, hash, inputFile, outputFile := createSimplePipelineCollectorConf(t)
+
+	server.sendToSupervisor(&protobufs.ServerToAgent{
+		RemoteConfig: &protobufs.AgentRemoteConfig{
+			Config: &protobufs.AgentConfigMap{
+				ConfigMap: map[string]*protobufs.AgentConfigFile{
+					"": {Body: cfg.Bytes()},
+				},
+			},
+			ConfigHash: hash,
+		},
+	})
+
+	// Check that the status is set to APPLYING
+	require.Eventually(t, func() bool {
+		status, ok := remoteConfigStatus.Load().(*protobufs.RemoteConfigStatus)
+		return ok && status.Status == protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLYING
+	}, 5*time.Second, 100*time.Millisecond, "Remote config status was not set to APPLYING")
+
+	// Wait for collector to become healthy
+	require.Eventually(t, func() bool {
+		health, ok := healthReport.Load().(*protobufs.ComponentHealth)
+		return ok && health.Healthy
+	}, 10*time.Second, 10*time.Millisecond, "Collector did not become healthy")
+
+	// Check that the status is set to APPLIED
+	require.Eventually(t, func() bool {
+		status, ok := remoteConfigStatus.Load().(*protobufs.RemoteConfigStatus)
+		return ok && status.Status == protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED
+	}, 5*time.Second, 10*time.Millisecond, "Remote config status was not set to APPLIED")
+
+	require.Eventually(t, func() bool {
+		cfg, ok := agentConfig.Load().(string)
+		if ok {
+			// The effective config may be structurally different compared to what was sent,
+			// and will also have some data redacted,
+			// so just check that it includes the filelog receiver
+			return strings.Contains(cfg, "filelog")
+		}
+
+		return false
+	}, 5*time.Second, 10*time.Millisecond, "Collector was not started with remote config")
+
+	n, err := inputFile.WriteString("{\"body\":\"hello, world\"}\n")
+	require.NotZero(t, n, "Could not write to input file")
+	require.NoError(t, err)
+
+	require.Eventually(t, func() bool {
+		logRecord := make([]byte, 1024)
+		n, _ := outputFile.Read(logRecord)
+
+		return n != 0
+	}, 10*time.Second, 100*time.Millisecond, "Log never appeared in output")
+
+	// Test with bad configuration
+	badCfg, badHash := createBadCollectorConf(t)
+
+	server.sendToSupervisor(&protobufs.ServerToAgent{
+		RemoteConfig: &protobufs.AgentRemoteConfig{
+			Config: &protobufs.AgentConfigMap{
+				ConfigMap: map[string]*protobufs.AgentConfigFile{
+					"": {Body: badCfg.Bytes()},
+				},
+			},
+			ConfigHash: badHash,
+		},
+	})
+
+	// Check that the status is set to APPLYING
+	require.Eventually(t, func() bool {
+		status, ok := remoteConfigStatus.Load().(*protobufs.RemoteConfigStatus)
+		return ok && status.Status == protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLYING
+	}, 5*time.Second, 200*time.Millisecond, "Remote config status was not set to APPLYING for bad config")
+
+	// Wait for the health checks to fail
+	require.Eventually(t, func() bool {
+		health, ok := healthReport.Load().(*protobufs.ComponentHealth)
+		return ok && !health.Healthy
+	}, 30*time.Second, 100*time.Millisecond, "Collector did not become unhealthy with bad config")
+
+	// Check that the status is set to FAILED after failed health checks
+	require.Eventually(t, func() bool {
+		status, ok := remoteConfigStatus.Load().(*protobufs.RemoteConfigStatus)
+		return ok && status.Status == protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED
+	}, 15*time.Second, 100*time.Millisecond, "Remote config status was not set to FAILED for bad config")
+}
+
 func TestSupervisorOpAmpServerPort(t *testing.T) {
 	var agentConfig atomic.Value
 	server := newOpAMPServer(

cmd/opampsupervisor/supervisor/config/config.go

Lines changed: 6 additions & 0 deletions
@@ -153,6 +153,7 @@ type Agent struct {
 	Executable              string
 	OrphanDetectionInterval time.Duration    `mapstructure:"orphan_detection_interval"`
 	Description             AgentDescription `mapstructure:"description"`
+	ConfigApplyTimeout      time.Duration    `mapstructure:"config_apply_timeout"`
 	BootstrapTimeout        time.Duration    `mapstructure:"bootstrap_timeout"`
 	HealthCheckPort         int              `mapstructure:"health_check_port"`
 	OpAMPServerPort         int              `mapstructure:"opamp_server_port"`
@@ -185,6 +186,10 @@ func (a Agent) Validate() error {
 		return fmt.Errorf("could not stat agent::executable path: %w", err)
 	}

+	if a.ConfigApplyTimeout <= 0 {
+		return errors.New("agent::config_apply_timeout must be valid duration")
+	}
+
 	return nil
 }

@@ -234,6 +239,7 @@ func DefaultSupervisor() Supervisor {
 		},
 		Agent: Agent{
 			OrphanDetectionInterval: 5 * time.Second,
+			ConfigApplyTimeout:      5 * time.Second,
 			BootstrapTimeout:        3 * time.Second,
 			PassthroughLogs:         false,
 		},

cmd/opampsupervisor/supervisor/config/config_test.go

Lines changed: 41 additions & 0 deletions
@@ -36,6 +36,7 @@ func TestValidate(t *testing.T) {
 				Agent: Agent{
 					Executable:              "${file_path}",
 					OrphanDetectionInterval: 5 * time.Second,
+					ConfigApplyTimeout:      2 * time.Second,
 					BootstrapTimeout:        5 * time.Second,
 				},
 				Capabilities: Capabilities{
@@ -59,6 +60,7 @@ func TestValidate(t *testing.T) {
 				},
 				Agent: Agent{
 					Executable:              "${file_path}",
+					ConfigApplyTimeout:      2 * time.Second,
 					OrphanDetectionInterval: 5 * time.Second,
 				},
 				Capabilities: Capabilities{
@@ -84,6 +86,7 @@ func TestValidate(t *testing.T) {
 				},
 				Agent: Agent{
 					Executable:              "${file_path}",
+					ConfigApplyTimeout:      2 * time.Second,
 					OrphanDetectionInterval: 5 * time.Second,
 				},
 				Capabilities: Capabilities{
@@ -109,6 +112,7 @@ func TestValidate(t *testing.T) {
 				},
 				Agent: Agent{
 					Executable:              "${file_path}",
+					ConfigApplyTimeout:      2 * time.Second,
 					OrphanDetectionInterval: 5 * time.Second,
 				},
 				Capabilities: Capabilities{
@@ -138,6 +142,7 @@ func TestValidate(t *testing.T) {
 				},
 				Agent: Agent{
 					Executable:              "${file_path}",
+					ConfigApplyTimeout:      2 * time.Second,
 					OrphanDetectionInterval: 5 * time.Second,
 				},
 				Capabilities: Capabilities{
@@ -164,6 +169,7 @@ func TestValidate(t *testing.T) {
 				Agent: Agent{
 					Executable:              "",
 					OrphanDetectionInterval: 5 * time.Second,
+					ConfigApplyTimeout:      2 * time.Second,
 					BootstrapTimeout:        5 * time.Second,
 				},
 				Capabilities: Capabilities{
@@ -190,6 +196,7 @@ func TestValidate(t *testing.T) {
 				Agent: Agent{
 					Executable:              "./path/does/not/exist",
 					OrphanDetectionInterval: 5 * time.Second,
+					ConfigApplyTimeout:      2 * time.Second,
 					BootstrapTimeout:        5 * time.Second,
 				},
 				Capabilities: Capabilities{
@@ -215,6 +222,7 @@ func TestValidate(t *testing.T) {
 				},
 				Agent: Agent{
 					Executable:              "${file_path}",
+					ConfigApplyTimeout:      2 * time.Second,
 					OrphanDetectionInterval: -1,
 				},
 				Capabilities: Capabilities{
@@ -242,6 +250,7 @@ func TestValidate(t *testing.T) {
 					Executable:              "${file_path}",
 					OrphanDetectionInterval: 5 * time.Second,
 					HealthCheckPort:         65536,
+					ConfigApplyTimeout:      2 * time.Second,
 					BootstrapTimeout:        5 * time.Second,
 				},
 				Capabilities: Capabilities{
@@ -269,6 +278,7 @@ func TestValidate(t *testing.T) {
 					Executable:              "${file_path}",
 					OrphanDetectionInterval: 5 * time.Second,
 					HealthCheckPort:         0,
+					ConfigApplyTimeout:      2 * time.Second,
 					BootstrapTimeout:        5 * time.Second,
 				},
 				Capabilities: Capabilities{
@@ -295,6 +305,7 @@ func TestValidate(t *testing.T) {
 					Executable:              "${file_path}",
 					OrphanDetectionInterval: 5 * time.Second,
 					HealthCheckPort:         29848,
+					ConfigApplyTimeout:      2 * time.Second,
 					BootstrapTimeout:        5 * time.Second,
 				},
 				Capabilities: Capabilities{
@@ -320,6 +331,7 @@ func TestValidate(t *testing.T) {
 				Agent: Agent{
 					Executable:              "${file_path}",
 					OrphanDetectionInterval: 5 * time.Second,
+					ConfigApplyTimeout:      2 * time.Second,
 					BootstrapTimeout:        -5 * time.Second,
 				},
 				Capabilities: Capabilities{
@@ -343,6 +355,7 @@ func TestValidate(t *testing.T) {
 				Agent: Agent{
 					Executable:              "${file_path}",
 					OrphanDetectionInterval: 5 * time.Second,
+					ConfigApplyTimeout:      2 * time.Second,
 					OpAMPServerPort:         65536,
 					BootstrapTimeout:        5 * time.Second,
 				},
@@ -367,6 +380,7 @@ func TestValidate(t *testing.T) {
 				Agent: Agent{
 					Executable:              "${file_path}",
 					OrphanDetectionInterval: 5 * time.Second,
+					ConfigApplyTimeout:      2 * time.Second,
 					OpAMPServerPort:         0,
 					BootstrapTimeout:        5 * time.Second,
 				},
@@ -378,6 +392,33 @@ func TestValidate(t *testing.T) {
 				},
 			},
 		},
+		{
+			name: "Invalid config apply timeout",
+			config: Supervisor{
+				Server: OpAMPServer{
+					Endpoint: "wss://localhost:9090/opamp",
+					Headers: http.Header{
+						"Header1": []string{"HeaderValue"},
+					},
+					TLSSetting: configtls.ClientConfig{
+						Insecure: true,
+					},
+				},
+				Agent: Agent{
+					Executable:              "${file_path}",
+					OrphanDetectionInterval: 5 * time.Second,
+					OpAMPServerPort:         8080,
+					BootstrapTimeout:        5 * time.Second,
+				},
+				Capabilities: Capabilities{
+					AcceptsRemoteConfig: true,
+				},
+				Storage: Storage{
+					Directory: "/etc/opamp-supervisor/storage",
+				},
+			},
+			expectedError: "agent::config_apply_timeout must be valid duration",
+		},
 	}

 	// create some fake files for validating agent config
