Skip to content

Commit dc63ccf

Browse files
[cmd/opampsupervisor] Refactor NewSupervisor function (#34597)
**Description:** Pass config structure to `NewSupervisor` instead of a file path. **Link to tracking Issue:** #34379 **Testing:** I run tests and adjusted the existing ones **Documentation:** - --------- Co-authored-by: Evan Bradley <[email protected]>
1 parent 2797fa0 commit dc63ccf

File tree

6 files changed

+150
-32
lines changed

6 files changed

+150
-32
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: opampsupervisor
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Pass config structure instead of file path when using NewSupervisor function
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [34379]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [api]

cmd/opampsupervisor/e2e_test.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,11 @@ func newUnstartedOpAMPServer(t *testing.T, connectingCallback onConnectingFuncFa
154154

155155
func newSupervisor(t *testing.T, configType string, extraConfigData map[string]string) *supervisor.Supervisor {
156156
cfgFile := getSupervisorConfig(t, configType, extraConfigData)
157-
s, err := supervisor.NewSupervisor(zap.NewNop(), cfgFile.Name())
157+
158+
cfg, err := config.Load(cfgFile.Name())
159+
require.NoError(t, err)
160+
161+
s, err := supervisor.NewSupervisor(zap.NewNop(), cfg)
158162
require.NoError(t, err)
159163

160164
return s

cmd/opampsupervisor/main.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"go.uber.org/zap"
1212

1313
"github.com/open-telemetry/opentelemetry-collector-contrib/cmd/opampsupervisor/supervisor"
14+
"github.com/open-telemetry/opentelemetry-collector-contrib/cmd/opampsupervisor/supervisor/config"
1415
)
1516

1617
func main() {
@@ -19,7 +20,14 @@ func main() {
1920

2021
logger, _ := zap.NewDevelopment()
2122

22-
supervisor, err := supervisor.NewSupervisor(logger, *configFlag)
23+
cfg, err := config.Load(*configFlag)
24+
if err != nil {
25+
logger.Error(err.Error())
26+
os.Exit(-1)
27+
return
28+
}
29+
30+
supervisor, err := supervisor.NewSupervisor(logger, cfg)
2331
if err != nil {
2432
logger.Error(err.Error())
2533
os.Exit(-1)

cmd/opampsupervisor/supervisor/config/config.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@ import (
1313
"runtime"
1414
"time"
1515

16+
"github.com/knadh/koanf/parsers/yaml"
17+
"github.com/knadh/koanf/providers/file"
18+
"github.com/knadh/koanf/v2"
1619
"github.com/open-telemetry/opamp-go/protobufs"
1720
"go.opentelemetry.io/collector/config/configtls"
1821
)
@@ -25,6 +28,33 @@ type Supervisor struct {
2528
Storage Storage `mapstructure:"storage"`
2629
}
2730

31+
// Load loads the Supervisor config from a file.
32+
func Load(configFile string) (Supervisor, error) {
33+
if configFile == "" {
34+
return Supervisor{}, errors.New("path to config file cannot be empty")
35+
}
36+
37+
k := koanf.New("::")
38+
if err := k.Load(file.Provider(configFile), yaml.Parser()); err != nil {
39+
return Supervisor{}, err
40+
}
41+
42+
decodeConf := koanf.UnmarshalConf{
43+
Tag: "mapstructure",
44+
}
45+
46+
cfg := DefaultSupervisor()
47+
if err := k.UnmarshalWithConf("", &cfg, decodeConf); err != nil {
48+
return Supervisor{}, fmt.Errorf("cannot parse %s: %w", configFile, err)
49+
}
50+
51+
if err := cfg.Validate(); err != nil {
52+
return Supervisor{}, fmt.Errorf("cannot validate supervisor config %s: %w", configFile, err)
53+
}
54+
55+
return cfg, nil
56+
}
57+
2858
func (s Supervisor) Validate() error {
2959
if err := s.Server.Validate(); err != nil {
3060
return err

cmd/opampsupervisor/supervisor/supervisor.go

Lines changed: 11 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import (
2323
"github.com/google/uuid"
2424
"github.com/knadh/koanf/maps"
2525
"github.com/knadh/koanf/parsers/yaml"
26-
"github.com/knadh/koanf/providers/file"
2726
"github.com/knadh/koanf/providers/rawbytes"
2827
"github.com/knadh/koanf/v2"
2928
"github.com/open-telemetry/opamp-go/client"
@@ -136,7 +135,7 @@ type Supervisor struct {
136135
opampServerPort int
137136
}
138137

139-
func NewSupervisor(logger *zap.Logger, configFile string) (*Supervisor, error) {
138+
func NewSupervisor(logger *zap.Logger, cfg config.Supervisor) (*Supervisor, error) {
140139
s := &Supervisor{
141140
logger: logger,
142141
pidProvider: defaultPIDProvider{},
@@ -153,14 +152,12 @@ func NewSupervisor(logger *zap.Logger, configFile string) (*Supervisor, error) {
153152
return nil, err
154153
}
155154

156-
if err := s.loadConfig(configFile); err != nil {
157-
return nil, fmt.Errorf("error loading config: %w", err)
158-
}
159-
160-
if err := s.config.Validate(); err != nil {
155+
if err := cfg.Validate(); err != nil {
161156
return nil, fmt.Errorf("error validating config: %w", err)
162157
}
163158

159+
s.config = cfg
160+
164161
if err := os.MkdirAll(s.config.Storage.Directory, 0700); err != nil {
165162
return nil, fmt.Errorf("error creating storage dir: %w", err)
166163
}
@@ -248,28 +245,6 @@ func (s *Supervisor) createTemplates() error {
248245
return nil
249246
}
250247

251-
func (s *Supervisor) loadConfig(configFile string) error {
252-
if configFile == "" {
253-
return errors.New("path to config file cannot be empty")
254-
}
255-
256-
k := koanf.New("::")
257-
if err := k.Load(file.Provider(configFile), yaml.Parser()); err != nil {
258-
return err
259-
}
260-
261-
decodeConf := koanf.UnmarshalConf{
262-
Tag: "mapstructure",
263-
}
264-
265-
s.config = config.DefaultSupervisor()
266-
if err := k.UnmarshalWithConf("", &s.config, decodeConf); err != nil {
267-
return fmt.Errorf("cannot parse %v: %w", configFile, err)
268-
}
269-
270-
return nil
271-
}
272-
273248
// getBootstrapInfo obtains the Collector's agent description by
274249
// starting a Collector with a specific config that only starts
275250
// an OpAMP extension, obtains the agent description, then
@@ -461,11 +436,17 @@ func (s *Supervisor) startOpAMPClient() error {
461436
func (s *Supervisor) startOpAMPServer() error {
462437
s.opampServer = server.New(newLoggerFromZap(s.logger))
463438

439+
var err error
440+
s.opampServerPort, err = s.findRandomPort()
441+
if err != nil {
442+
return err
443+
}
444+
464445
s.logger.Debug("Starting OpAMP server...")
465446

466447
connected := &atomic.Bool{}
467448

468-
err := s.opampServer.Start(flattenedSettings{
449+
err = s.opampServer.Start(flattenedSettings{
469450
endpoint: fmt.Sprintf("localhost:%d", s.opampServerPort),
470451
onConnectingFunc: func(_ *http.Request) (bool, int) {
471452
// Only allow one agent to be connected the this server at a time.

cmd/opampsupervisor/supervisor/supervisor_test.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@ package supervisor
66
import (
77
"bytes"
88
"context"
9+
"fmt"
910
"net"
1011
"os"
12+
"path/filepath"
1113
"sync/atomic"
1214
"testing"
1315
"time"
@@ -23,6 +25,72 @@ import (
2325
"github.com/open-telemetry/opentelemetry-collector-contrib/cmd/opampsupervisor/supervisor/config"
2426
)
2527

28+
func setupSupervisorConfig(t *testing.T) config.Supervisor {
29+
t.Helper()
30+
31+
tmpDir, err := os.MkdirTemp(os.TempDir(), "*")
32+
require.NoError(t, err)
33+
34+
executablePath := filepath.Join(tmpDir, "binary")
35+
err = os.WriteFile(executablePath, []byte{}, 0o600)
36+
require.NoError(t, err)
37+
38+
configuration := `
39+
server:
40+
endpoint: ws://localhost/v1/opamp
41+
tls:
42+
insecure: true
43+
44+
capabilities:
45+
reports_effective_config: true
46+
reports_own_metrics: true
47+
reports_health: true
48+
accepts_remote_config: true
49+
reports_remote_config: true
50+
accepts_restart_command: true
51+
52+
storage:
53+
directory: %s
54+
55+
agent:
56+
executable: %s
57+
`
58+
configuration = fmt.Sprintf(configuration, filepath.Join(tmpDir, "storage"), executablePath)
59+
60+
cfgPath := filepath.Join(tmpDir, "config.yaml")
61+
err = os.WriteFile(cfgPath, []byte(configuration), 0o600)
62+
require.NoError(t, err)
63+
64+
cfg, err := config.Load(cfgPath)
65+
require.NoError(t, err)
66+
67+
t.Cleanup(func() {
68+
require.NoError(t, os.Chmod(tmpDir, 0o700))
69+
require.NoError(t, os.RemoveAll(tmpDir))
70+
})
71+
72+
return cfg
73+
}
74+
75+
func Test_NewSupervisor(t *testing.T) {
76+
cfg := setupSupervisorConfig(t)
77+
supervisor, err := NewSupervisor(zap.L(), cfg)
78+
require.NoError(t, err)
79+
require.NotNil(t, supervisor)
80+
}
81+
82+
func Test_NewSupervisorFailedStorageCreation(t *testing.T) {
83+
cfg := setupSupervisorConfig(t)
84+
85+
dir := filepath.Dir(cfg.Storage.Directory)
86+
require.NoError(t, os.Chmod(dir, 0o500))
87+
88+
supervisor, err := NewSupervisor(zap.L(), cfg)
89+
require.Error(t, err)
90+
require.ErrorContains(t, err, "error creating storage dir")
91+
require.Nil(t, supervisor)
92+
}
93+
2694
func Test_composeEffectiveConfig(t *testing.T) {
2795
acceptsRemoteConfig := true
2896
s := Supervisor{

0 commit comments

Comments
 (0)