Skip to content

Commit c031332

Browse files
authored
[cmd/opampsupervisor]: Configurable Supervisor OpAmp server port (#36002)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description Allows the Supervisor's OpAmp server port to be configurable. This is useful for restricted environments deploying the supervisor. <!-- Issue number (e.g. #1234) or full URL to issue, if applicable. --> #### Link to tracking issue Fixes #36001 <!--Describe what testing was performed and which tests were added.--> #### Testing Updated tests and added an e2e test <!--Describe the documentation added.--> #### Documentation Spec was updated accordingly. Also added documentation for previously added `health_check_port` that wasn't present in spec. <!--Please delete paragraphs that you did not use before submitting.-->
1 parent 92e336e commit c031332

File tree

7 files changed

+181
-6
lines changed

7 files changed

+181
-6
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: opampsupervisor
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Makes the Supervisor's OpAmp server port configurable with 'agent::opamp_server_port'.
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [36001]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: []

cmd/opampsupervisor/e2e_test.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1443,6 +1443,71 @@ func TestSupervisorLogging(t *testing.T) {
14431443
require.NoError(t, logFile.Close())
14441444
}
14451445

1446+
func TestSupervisorOpAmpServerPort(t *testing.T) {
1447+
var agentConfig atomic.Value
1448+
server := newOpAMPServer(
1449+
t,
1450+
defaultConnectingHandler,
1451+
server.ConnectionCallbacksStruct{
1452+
OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
1453+
if message.EffectiveConfig != nil {
1454+
config := message.EffectiveConfig.ConfigMap.ConfigMap[""]
1455+
if config != nil {
1456+
agentConfig.Store(string(config.Body))
1457+
}
1458+
}
1459+
1460+
return &protobufs.ServerToAgent{}
1461+
},
1462+
})
1463+
1464+
supervisorOpAmpServerPort, err := findRandomPort()
1465+
require.NoError(t, err)
1466+
1467+
s := newSupervisor(t, "server_port", map[string]string{"url": server.addr, "supervisor_opamp_server_port": fmt.Sprintf("%d", supervisorOpAmpServerPort)})
1468+
1469+
require.Nil(t, s.Start())
1470+
defer s.Shutdown()
1471+
1472+
waitForSupervisorConnection(server.supervisorConnected, true)
1473+
1474+
cfg, hash, inputFile, outputFile := createSimplePipelineCollectorConf(t)
1475+
1476+
server.sendToSupervisor(&protobufs.ServerToAgent{
1477+
RemoteConfig: &protobufs.AgentRemoteConfig{
1478+
Config: &protobufs.AgentConfigMap{
1479+
ConfigMap: map[string]*protobufs.AgentConfigFile{
1480+
"": {Body: cfg.Bytes()},
1481+
},
1482+
},
1483+
ConfigHash: hash,
1484+
},
1485+
})
1486+
1487+
require.Eventually(t, func() bool {
1488+
cfg, ok := agentConfig.Load().(string)
1489+
if ok {
1490+
// The effective config may be structurally different compared to what was sent,
1491+
// and will also have some data redacted,
1492+
// so just check that it includes the filelog receiver
1493+
return strings.Contains(cfg, "filelog")
1494+
}
1495+
1496+
return false
1497+
}, 5*time.Second, 500*time.Millisecond, "Collector was not started with remote config")
1498+
1499+
n, err := inputFile.WriteString("{\"body\":\"hello, world\"}\n")
1500+
require.NotZero(t, n, "Could not write to input file")
1501+
require.NoError(t, err)
1502+
1503+
require.Eventually(t, func() bool {
1504+
logRecord := make([]byte, 1024)
1505+
n, _ := outputFile.Read(logRecord)
1506+
1507+
return n != 0
1508+
}, 10*time.Second, 500*time.Millisecond, "Log never appeared in output")
1509+
}
1510+
14461511
func findRandomPort() (int, error) {
14471512
l, err := net.Listen("tcp", "localhost:0")
14481513

cmd/opampsupervisor/specification/README.md

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,14 @@ agent:
160160
client.id: "01HWWSK84BMT7J45663MBJMTPJ"
161161
non_identifying_attributes:
162162
custom.attribute: "custom-value"
163-
163+
164+
# The port the Collector's health check extension will be configured to use
165+
health_check_port:
166+
167+
# The port the Supervisor will start its OpAmp server on and the Collector's
168+
# OpAmp extension will connect to
169+
opamp_server_port:
170+
164171
```
165172

166173
### Operation When OpAMP Server is Unavailable

cmd/opampsupervisor/supervisor/config/config.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@ type Agent struct {
155155
Description AgentDescription `mapstructure:"description"`
156156
BootstrapTimeout time.Duration `mapstructure:"bootstrap_timeout"`
157157
HealthCheckPort int `mapstructure:"health_check_port"`
158+
OpAMPServerPort int `mapstructure:"opamp_server_port"`
158159
PassthroughLogs bool `mapstructure:"passthrough_logs"`
159160
}
160161

@@ -171,6 +172,10 @@ func (a Agent) Validate() error {
171172
return errors.New("agent::health_check_port must be a valid port number")
172173
}
173174

175+
if a.OpAMPServerPort < 0 || a.OpAMPServerPort > 65535 {
176+
return errors.New("agent::opamp_server_port must be a valid port number")
177+
}
178+
174179
if a.Executable == "" {
175180
return errors.New("agent::executable must be specified")
176181
}

cmd/opampsupervisor/supervisor/config/config_test.go

Lines changed: 50 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,7 @@ func TestValidate(t *testing.T) {
227227
expectedError: "agent::orphan_detection_interval must be positive",
228228
},
229229
{
230-
name: "Invalid port number",
230+
name: "Invalid health check port number",
231231
config: Supervisor{
232232
Server: OpAMPServer{
233233
Endpoint: "wss://localhost:9090/opamp",
@@ -254,7 +254,7 @@ func TestValidate(t *testing.T) {
254254
expectedError: "agent::health_check_port must be a valid port number",
255255
},
256256
{
257-
name: "Zero value port number",
257+
name: "Zero value health check port number",
258258
config: Supervisor{
259259
Server: OpAMPServer{
260260
Endpoint: "wss://localhost:9090/opamp",
@@ -280,7 +280,7 @@ func TestValidate(t *testing.T) {
280280
},
281281
},
282282
{
283-
name: "Normal port number",
283+
name: "Normal health check port number",
284284
config: Supervisor{
285285
Server: OpAMPServer{
286286
Endpoint: "wss://localhost:9090/opamp",
@@ -331,6 +331,53 @@ func TestValidate(t *testing.T) {
331331
},
332332
expectedError: "agent::bootstrap_timeout must be positive",
333333
},
334+
{
335+
name: "Invalid opamp server port number",
336+
config: Supervisor{
337+
Server: OpAMPServer{
338+
Endpoint: "wss://localhost:9090/opamp",
339+
Headers: http.Header{
340+
"Header1": []string{"HeaderValue"},
341+
},
342+
},
343+
Agent: Agent{
344+
Executable: "${file_path}",
345+
OrphanDetectionInterval: 5 * time.Second,
346+
OpAMPServerPort: 65536,
347+
BootstrapTimeout: 5 * time.Second,
348+
},
349+
Capabilities: Capabilities{
350+
AcceptsRemoteConfig: true,
351+
},
352+
Storage: Storage{
353+
Directory: "/etc/opamp-supervisor/storage",
354+
},
355+
},
356+
expectedError: "agent::opamp_server_port must be a valid port number",
357+
},
358+
{
359+
name: "Zero value opamp server port number",
360+
config: Supervisor{
361+
Server: OpAMPServer{
362+
Endpoint: "wss://localhost:9090/opamp",
363+
Headers: http.Header{
364+
"Header1": []string{"HeaderValue"},
365+
},
366+
},
367+
Agent: Agent{
368+
Executable: "${file_path}",
369+
OrphanDetectionInterval: 5 * time.Second,
370+
OpAMPServerPort: 0,
371+
BootstrapTimeout: 5 * time.Second,
372+
},
373+
Capabilities: Capabilities{
374+
AcceptsRemoteConfig: true,
375+
},
376+
Storage: Storage{
377+
Directory: "/etc/opamp-supervisor/storage",
378+
},
379+
},
380+
},
334381
}
335382

336383
// create some fake files for validating agent config

cmd/opampsupervisor/supervisor/supervisor.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,7 @@ func (s *Supervisor) createTemplates() error {
263263
// shuts down the Collector. This only needs to happen
264264
// once per Collector binary.
265265
func (s *Supervisor) getBootstrapInfo() (err error) {
266-
s.opampServerPort, err = s.findRandomPort()
266+
s.opampServerPort, err = s.getSupervisorOpAMPServerPort()
267267
if err != nil {
268268
return err
269269
}
@@ -457,7 +457,7 @@ func (s *Supervisor) startOpAMPServer() error {
457457
s.opampServer = server.New(newLoggerFromZap(s.logger))
458458

459459
var err error
460-
s.opampServerPort, err = s.findRandomPort()
460+
s.opampServerPort, err = s.getSupervisorOpAMPServerPort()
461461
if err != nil {
462462
return err
463463
}
@@ -1345,6 +1345,13 @@ func (s *Supervisor) agentConfigFilePath() string {
13451345
return filepath.Join(s.config.Storage.Directory, agentConfigFileName)
13461346
}
13471347

1348+
func (s *Supervisor) getSupervisorOpAMPServerPort() (int, error) {
1349+
if s.config.Agent.OpAMPServerPort != 0 {
1350+
return s.config.Agent.OpAMPServerPort, nil
1351+
}
1352+
return s.findRandomPort()
1353+
}
1354+
13481355
func (s *Supervisor) findRandomPort() (int, error) {
13491356
l, err := net.Listen("tcp", "localhost:0")
13501357

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
server:
2+
endpoint: ws://{{.url}}/v1/opamp
3+
4+
capabilities:
5+
reports_effective_config: true
6+
reports_own_metrics: true
7+
reports_health: true
8+
accepts_remote_config: true
9+
reports_remote_config: true
10+
accepts_restart_command: true
11+
12+
storage:
13+
directory: '{{.storage_dir}}'
14+
15+
agent:
16+
executable: ../../bin/otelcontribcol_{{.goos}}_{{.goarch}}{{.extension}}
17+
opamp_server_port: {{ .supervisor_opamp_server_port }}

0 commit comments

Comments
 (0)