diff --git a/.chloggen/supervisor-no-pipelines.yaml b/.chloggen/supervisor-no-pipelines.yaml new file mode 100644 index 0000000000000..0116156cd8867 --- /dev/null +++ b/.chloggen/supervisor-no-pipelines.yaml @@ -0,0 +1,30 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: opampsupervisor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Allow controlling Collectors that don't include the nopreceiver and nopexporer + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [38809] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + This requires Collectors built with Collector API v0.122.0+. The nopreceiver + and nopexporter will continue to be supported for a few releases, after which + only v0.122.0+ will be supported. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/cmd/opampsupervisor/e2e_test.go b/cmd/opampsupervisor/e2e_test.go index f57493cf12a88..27c35bf673226 100644 --- a/cmd/opampsupervisor/e2e_test.go +++ b/cmd/opampsupervisor/e2e_test.go @@ -617,77 +617,109 @@ func TestSupervisorConfiguresCapabilities(t *testing.T) { } func TestSupervisorBootstrapsCollector(t *testing.T) { - agentDescription := atomic.Value{} + tests := []struct { + name string + cfg string + env []string + precheck func(t *testing.T) + }{ + { + name: "With service.AllowNoPipelines", + cfg: "nocap", + }, + { + name: "Without service.AllowNoPipelines", + cfg: "no_fg", + env: []string{ + "COLLECTOR_BIN=../../bin/otelcontribcol_" + runtime.GOOS + "_" + runtime.GOARCH, + }, + precheck: func(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("This test requires a shell script, which may not be supported by Windows") + } + }, + }, + } - // Load the Supervisor config so we can get the location of - // the Collector that will be run. - var cfg config.Supervisor - cfgFile := getSupervisorConfig(t, "nocap", map[string]string{}) - k := koanf.New("::") - err := k.Load(file.Provider(cfgFile.Name()), yaml.Parser()) - require.NoError(t, err) - err = k.UnmarshalWithConf("", &cfg, koanf.UnmarshalConf{ - Tag: "mapstructure", - }) - require.NoError(t, err) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + agentDescription := atomic.Value{} - // Get the binary name and version from the Collector binary - // using the `components` command that prints a YAML-encoded - // map of information about the Collector build. Some of this - // information will be used as defaults for the telemetry - // attributes. - agentPath := cfg.Agent.Executable - componentsInfo, err := exec.Command(agentPath, "components").Output() - require.NoError(t, err) - k = koanf.New("::") - err = k.Load(rawbytes.Provider(componentsInfo), yaml.Parser()) - require.NoError(t, err) - buildinfo := k.StringMap("buildinfo") - command := buildinfo["command"] - version := buildinfo["version"] + // Load the Supervisor config so we can get the location of + // the Collector that will be run. + var cfg config.Supervisor + cfgFile := getSupervisorConfig(t, tt.cfg, map[string]string{}) + k := koanf.New("::") + err := k.Load(file.Provider(cfgFile.Name()), yaml.Parser()) + require.NoError(t, err) + err = k.UnmarshalWithConf("", &cfg, koanf.UnmarshalConf{ + Tag: "mapstructure", + }) + require.NoError(t, err) - server := newOpAMPServer( - t, - defaultConnectingHandler, - types.ConnectionCallbacks{ - OnMessage: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { - if message.AgentDescription != nil { - agentDescription.Store(message.AgentDescription) - } + // Get the binary name and version from the Collector binary + // using the `components` command that prints a YAML-encoded + // map of information about the Collector build. Some of this + // information will be used as defaults for the telemetry + // attributes. + agentPath := cfg.Agent.Executable + cmd := exec.Command(agentPath, "components") + for _, env := range tt.env { + cmd.Env = append(cmd.Env, env) + } + componentsInfo, err := cmd.Output() + require.NoError(t, err) + k = koanf.New("::") + err = k.Load(rawbytes.Provider(componentsInfo), yaml.Parser()) + require.NoError(t, err) + buildinfo := k.StringMap("buildinfo") + command := buildinfo["command"] + version := buildinfo["version"] + + server := newOpAMPServer( + t, + defaultConnectingHandler, + types.ConnectionCallbacks{ + OnMessage: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { + if message.AgentDescription != nil { + agentDescription.Store(message.AgentDescription) + } - return &protobufs.ServerToAgent{} - }, - }) + return &protobufs.ServerToAgent{} + }, + }) - s := newSupervisor(t, "nocap", map[string]string{"url": server.addr}) + s := newSupervisor(t, "nocap", map[string]string{"url": server.addr}) - require.Nil(t, s.Start()) - defer s.Shutdown() + require.Nil(t, s.Start()) + defer s.Shutdown() - waitForSupervisorConnection(server.supervisorConnected, true) + waitForSupervisorConnection(server.supervisorConnected, true) - require.Eventually(t, func() bool { - ad, ok := agentDescription.Load().(*protobufs.AgentDescription) - if !ok { - return false - } + require.Eventually(t, func() bool { + ad, ok := agentDescription.Load().(*protobufs.AgentDescription) + if !ok { + return false + } - var agentName, agentVersion string - identAttr := ad.IdentifyingAttributes - for _, attr := range identAttr { - switch attr.Key { - case semconv.AttributeServiceName: - agentName = attr.Value.GetStringValue() - case semconv.AttributeServiceVersion: - agentVersion = attr.Value.GetStringValue() - } - } + var agentName, agentVersion string + identAttr := ad.IdentifyingAttributes + for _, attr := range identAttr { + switch attr.Key { + case semconv.AttributeServiceName: + agentName = attr.Value.GetStringValue() + case semconv.AttributeServiceVersion: + agentVersion = attr.Value.GetStringValue() + } + } - // By default the Collector should report its name and version - // from the component.BuildInfo struct built into the Collector - // binary. - return agentName == command && agentVersion == version - }, 5*time.Second, 250*time.Millisecond) + // By default the Collector should report its name and version + // from the component.BuildInfo struct built into the Collector + // binary. + return agentName == command && agentVersion == version + }, 5*time.Second, 250*time.Millisecond) + }) + } } func TestSupervisorBootstrapsCollectorAvailableComponents(t *testing.T) { diff --git a/cmd/opampsupervisor/supervisor/commander/commander.go b/cmd/opampsupervisor/supervisor/commander/commander.go index 009f066c77333..d13df3fd90cf7 100644 --- a/cmd/opampsupervisor/supervisor/commander/commander.go +++ b/cmd/opampsupervisor/supervisor/commander/commander.go @@ -201,6 +201,108 @@ func (c *Commander) watch() { c.exitCh <- struct{}{} } +// StartOneShot starts the Collector with the expectation that it will immediately +// exit after it finishes a quick operation. This is useful for situations like reading stdout/sterr +// to e.g. check the feature gate the Collector supports. +func (c *Commander) StartOneShot() ([]byte, []byte, error) { + stdout := []byte{} + stderr := []byte{} + ctx := context.Background() + + cmd := exec.CommandContext(ctx, c.cfg.Executable, c.args...) // #nosec G204 + cmd.Env = common.EnvVarMapToEnvMapSlice(c.cfg.Env) + cmd.SysProcAttr = sysProcAttrs() + // grab cmd pipes + stdoutPipe, err := cmd.StdoutPipe() + if err != nil { + return nil, nil, fmt.Errorf("stdoutPipe: %w", err) + } + stderrPipe, err := cmd.StderrPipe() + if err != nil { + return nil, nil, fmt.Errorf("stderrPipe: %w", err) + } + + // start agent + if err := cmd.Start(); err != nil { + return nil, nil, fmt.Errorf("start: %w", err) + } + // capture agent output + go func() { + scanner := bufio.NewScanner(stdoutPipe) + for scanner.Scan() { + stdout = append(stdout, scanner.Bytes()...) + stdout = append(stdout, byte('\n')) + } + if err := scanner.Err(); err != nil { + c.logger.Error("Error reading agent stdout: %w", zap.Error(err)) + } + }() + go func() { + scanner := bufio.NewScanner(stderrPipe) + for scanner.Scan() { + stderr = append(stderr, scanner.Bytes()...) + stderr = append(stderr, byte('\n')) + } + if err := scanner.Err(); err != nil { + c.logger.Error("Error reading agent stderr: %w", zap.Error(err)) + } + }() + + c.logger.Debug("Agent process started", zap.Int("pid", cmd.Process.Pid)) + + doneCh := make(chan struct{}, 1) + + go func() { + err := cmd.Wait() + if err != nil { + c.logger.Error("One-shot Collector encountered an error during execution", zap.Error(err)) + } + doneCh <- struct{}{} + }() + + waitCtx, cancel := context.WithTimeout(ctx, 3*time.Second) + + defer cancel() + + select { + case <-doneCh: + case <-waitCtx.Done(): + pid := cmd.Process.Pid + c.logger.Debug("Stopping agent process", zap.Int("pid", pid)) + + // Gracefully signal process to stop. + if err := sendShutdownSignal(cmd.Process); err != nil { + return nil, nil, err + } + + innerWaitCtx, innerCancel := context.WithTimeout(ctx, 10*time.Second) + + // Setup a goroutine to wait a while for process to finish and send kill signal + // to the process if it doesn't finish. + var innerErr error + go func() { + <-innerWaitCtx.Done() + + if !errors.Is(innerWaitCtx.Err(), context.DeadlineExceeded) { + c.logger.Debug("Agent process successfully stopped.", zap.Int("pid", pid)) + return + } + + // Time is out. Kill the process. + c.logger.Debug( + "Agent process is not responding to SIGTERM. Sending SIGKILL to kill forcibly.", + zap.Int("pid", pid)) + if innerErr = cmd.Process.Signal(os.Kill); innerErr != nil { + return + } + }() + + innerCancel() + } + + return stdout, stderr, nil +} + // Exited returns a channel that will send a signal when the Agent process exits. func (c *Commander) Exited() <-chan struct{} { return c.exitCh diff --git a/cmd/opampsupervisor/supervisor/supervisor.go b/cmd/opampsupervisor/supervisor/supervisor.go index c938e27153ff2..409f9477b634a 100644 --- a/cmd/opampsupervisor/supervisor/supervisor.go +++ b/cmd/opampsupervisor/supervisor/supervisor.go @@ -4,6 +4,7 @@ package supervisor import ( + "bufio" "bytes" "context" "crypto/tls" @@ -16,6 +17,7 @@ import ( "os" "path/filepath" "sort" + "strings" "sync" "sync/atomic" "text/template" @@ -73,8 +75,9 @@ var ( ) const ( - persistentStateFileName = "persistent_state.yaml" - agentConfigFileName = "effective.yaml" + persistentStateFileName = "persistent_state.yaml" + agentConfigFileName = "effective.yaml" + AllowNoPipelinesFeatureGate = "service.AllowNoPipelines" ) const maxBufferedCustomMessages = 10 @@ -182,6 +185,8 @@ type Supervisor struct { opampServerPort int telemetrySettings telemetrySettings + + featureGates map[string]struct{} } func NewSupervisor(logger *zap.Logger, cfg config.Supervisor) (*Supervisor, error) { @@ -196,6 +201,7 @@ func NewSupervisor(logger *zap.Logger, cfg config.Supervisor) (*Supervisor, erro doneChan: make(chan struct{}), customMessageToServer: make(chan *protobufs.CustomMessage, maxBufferedCustomMessages), agentConn: &atomic.Value{}, + featureGates: map[string]struct{}{}, } if err := s.createTemplates(); err != nil { return nil, err @@ -312,6 +318,10 @@ func (s *Supervisor) Start() error { return err } + if err = s.getFeatureGates(); err != nil { + return fmt.Errorf("could not get feature gates from the Collector: %w", err) + } + if err = s.getBootstrapInfo(); err != nil { return fmt.Errorf("could not get bootstrap info from the Collector: %w", err) } @@ -365,6 +375,41 @@ func (s *Supervisor) Start() error { return nil } +func (s *Supervisor) getFeatureGates() error { + cmd, err := commander.NewCommander( + s.telemetrySettings.Logger, + s.config.Storage.Directory, + s.config.Agent, + "featuregate", + ) + if err != nil { + return err + } + + stdout, _, err := cmd.StartOneShot() + if err != nil { + return err + } + scanner := bufio.NewScanner(bytes.NewBuffer(stdout)) + + // First line only contains headers, discard it. + _ = scanner.Scan() + for scanner.Scan() { + line := scanner.Text() + i := strings.Index(line, " ") + flag := line[0:i] + + if flag == AllowNoPipelinesFeatureGate { + s.featureGates[AllowNoPipelinesFeatureGate] = struct{}{} + } + } + if err := scanner.Err(); err != nil { + fmt.Fprintln(os.Stderr, "reading standard input:", err) + } + + return nil +} + func (s *Supervisor) createTemplates() error { var err error @@ -498,11 +543,18 @@ func (s *Supervisor) getBootstrapInfo() (err error) { } }() + flags := []string{ + "--config", s.agentConfigFilePath(), + } + featuregateFlag := s.getFeatureGateFlag() + if len(featuregateFlag) > 0 { + flags = append(flags, featuregateFlag...) + } cmd, err := commander.NewCommander( s.telemetrySettings.Logger, s.config.Storage.Directory, s.config.Agent, - "--config", s.agentConfigFilePath(), + flags..., ) if err != nil { span.SetStatus(codes.Error, fmt.Sprintf("Could not start Agent: %v", err)) @@ -902,12 +954,12 @@ func (s *Supervisor) onOpampConnectionSettings(_ context.Context, settings *prot func (s *Supervisor) composeNoopPipeline() ([]byte, error) { var cfg bytes.Buffer - err := s.noopPipelineTemplate.Execute(&cfg, map[string]any{ - "InstanceUid": s.persistentState.InstanceID.String(), - "SupervisorPort": s.opampServerPort, - }) - if err != nil { - return nil, err + + if !s.isFeatureGateSupported(AllowNoPipelinesFeatureGate) { + err := s.noopPipelineTemplate.Execute(&cfg, map[string]any{}) + if err != nil { + return nil, err + } } return cfg.Bytes(), nil @@ -1664,6 +1716,24 @@ func (s *Supervisor) getSupervisorOpAMPServerPort() (int, error) { return s.findRandomPort() } +func (s *Supervisor) getFeatureGateFlag() []string { + flags := []string{} + for k := range s.featureGates { + flags = append(flags, k) + } + + if len(flags) == 0 { + return []string{} + } + + return []string{"--feature-gates", strings.Join(flags, ",")} +} + +func (s *Supervisor) isFeatureGateSupported(gate string) bool { + _, ok := s.featureGates[gate] + return ok +} + func (s *Supervisor) findRandomPort() (int, error) { l, err := net.Listen("tcp", "localhost:0") if err != nil { diff --git a/cmd/opampsupervisor/testdata/collector/no_pipelines_fg.sh b/cmd/opampsupervisor/testdata/collector/no_pipelines_fg.sh new file mode 100755 index 0000000000000..f6d46b1773bf8 --- /dev/null +++ b/cmd/opampsupervisor/testdata/collector/no_pipelines_fg.sh @@ -0,0 +1,6 @@ +#!/usr/bin/env bash +# +# Copyright The OpenTelemetry Authors +# SPDX-License-Identifier: Apache-2.0 + +${COLLECTOR_BIN} "$@" | grep -v "service.AllowNoPipelines" diff --git a/cmd/opampsupervisor/testdata/supervisor/supervisor_no_fg.yaml b/cmd/opampsupervisor/testdata/supervisor/supervisor_no_fg.yaml new file mode 100644 index 0000000000000..188eecd95284d --- /dev/null +++ b/cmd/opampsupervisor/testdata/supervisor/supervisor_no_fg.yaml @@ -0,0 +1,20 @@ +server: + endpoint: ws://{{.url}}/v1/opamp + +capabilities: + reports_available_components: false + reports_effective_config: false + reports_own_metrics: false + reports_own_logs: false + reports_own_traces: false + reports_health: false + accepts_remote_config: false + reports_remote_config: false + +storage: + directory: '{{.storage_dir}}' + +agent: + executable: ./testdata/collector/no_pipelines_fg.sh + env: + COLLECTOR_BIN: ../../bin/otelcontribcol_{{.goos}}_{{.goarch}}{{.extension}}