Skip to content

Commit 1e6e7a8

Browse files
committed
[cmd/opampsupervisor] Control Collectors with only the OpAMP extension
1 parent 0d3e1eb commit 1e6e7a8

File tree

3 files changed

+205
-9
lines changed

3 files changed

+205
-9
lines changed
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: opampsupervisor
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Allow controlling Collectors that don't include the nopreceiver and nopexporter
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: []
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: |
19+
This requires Collectors built with Collector API v0.122.0+. The nopreceiver
20+
and nopexporter will continue to be supported for a few releases, after which
21+
only v0.122.0+ will be supported.
22+
23+
# If your change doesn't affect end users or the exported elements of any package,
24+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
25+
# Optional: The change log or logs in which this entry should be included.
26+
# e.g. '[user]' or '[user, api]'
27+
# Include 'user' if the change is relevant to end users.
28+
# Include 'api' if there is a change to a library API.
29+
# Default: '[user]'
30+
change_logs: []

cmd/opampsupervisor/supervisor/commander/commander.go

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,102 @@ func (c *Commander) watch() {
201201
c.exitCh <- struct{}{}
202202
}
203203

204+
func (c *Commander) StartOneShot() ([]byte, []byte, error) {
205+
stdout := []byte{}
206+
stderr := []byte{}
207+
ctx := context.Background()
208+
209+
cmd := exec.CommandContext(ctx, c.cfg.Executable, c.args...) // #nosec G204
210+
cmd.Env = common.EnvVarMapToEnvMapSlice(c.cfg.Env)
211+
cmd.SysProcAttr = sysProcAttrs()
212+
// grab cmd pipes
213+
stdoutPipe, err := cmd.StdoutPipe()
214+
if err != nil {
215+
return nil, nil, fmt.Errorf("stdoutPipe: %w", err)
216+
}
217+
stderrPipe, err := cmd.StderrPipe()
218+
if err != nil {
219+
return nil, nil, fmt.Errorf("stderrPipe: %w", err)
220+
}
221+
222+
// start agent
223+
if err := cmd.Start(); err != nil {
224+
return nil, nil, fmt.Errorf("start: %w", err)
225+
}
226+
// capture agent output
227+
go func() {
228+
scanner := bufio.NewScanner(stdoutPipe)
229+
for scanner.Scan() {
230+
stdout = append(stdout, scanner.Bytes()...)
231+
stdout = append(stdout, byte('\n'))
232+
}
233+
if err := scanner.Err(); err != nil {
234+
c.logger.Error("Error reading agent stdout: %w", zap.Error(err))
235+
}
236+
}()
237+
go func() {
238+
scanner := bufio.NewScanner(stderrPipe)
239+
for scanner.Scan() {
240+
stderr = append(stdout, scanner.Bytes()...)
241+
stderr = append(stderr, byte('\n'))
242+
}
243+
if err := scanner.Err(); err != nil {
244+
c.logger.Error("Error reading agent stderr: %w", zap.Error(err))
245+
}
246+
}()
247+
248+
c.logger.Debug("Agent process started", zap.Int("pid", cmd.Process.Pid))
249+
250+
doneCh := make(chan struct{}, 1)
251+
252+
go func() {
253+
cmd.Wait()
254+
doneCh <- struct{}{}
255+
}()
256+
257+
waitCtx, cancel := context.WithTimeout(ctx, 3*time.Second)
258+
259+
defer cancel()
260+
261+
select {
262+
case <-doneCh:
263+
case <-waitCtx.Done():
264+
pid := cmd.Process.Pid
265+
c.logger.Debug("Stopping agent process", zap.Int("pid", pid))
266+
267+
// Gracefully signal process to stop.
268+
if err := sendShutdownSignal(cmd.Process); err != nil {
269+
return nil, nil, err
270+
}
271+
272+
innerWaitCtx, innerCancel := context.WithTimeout(ctx, 10*time.Second)
273+
274+
// Setup a goroutine to wait a while for process to finish and send kill signal
275+
// to the process if it doesn't finish.
276+
var innerErr error
277+
go func() {
278+
<-innerWaitCtx.Done()
279+
280+
if !errors.Is(innerWaitCtx.Err(), context.DeadlineExceeded) {
281+
c.logger.Debug("Agent process successfully stopped.", zap.Int("pid", pid))
282+
return
283+
}
284+
285+
// Time is out. Kill the process.
286+
c.logger.Debug(
287+
"Agent process is not responding to SIGTERM. Sending SIGKILL to kill forcibly.",
288+
zap.Int("pid", pid))
289+
if innerErr = cmd.Process.Signal(os.Kill); innerErr != nil {
290+
return
291+
}
292+
}()
293+
294+
innerCancel()
295+
}
296+
297+
return stdout, stderr, nil
298+
}
299+
204300
// Exited returns a channel that will send a signal when the Agent process exits.
205301
func (c *Commander) Exited() <-chan struct{} {
206302
return c.exitCh

cmd/opampsupervisor/supervisor/supervisor.go

Lines changed: 79 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package supervisor
55

66
import (
7+
"bufio"
78
"bytes"
89
"context"
910
"crypto/tls"
@@ -16,6 +17,7 @@ import (
1617
"os"
1718
"path/filepath"
1819
"sort"
20+
"strings"
1921
"sync"
2022
"sync/atomic"
2123
"text/template"
@@ -73,8 +75,9 @@ var (
7375
)
7476

7577
const (
76-
persistentStateFileName = "persistent_state.yaml"
77-
agentConfigFileName = "effective.yaml"
78+
persistentStateFileName = "persistent_state.yaml"
79+
agentConfigFileName = "effective.yaml"
80+
AllowNoPipelinesFeatureGate = "service.AllowNoPipelines"
7881
)
7982

8083
const maxBufferedCustomMessages = 10
@@ -182,6 +185,8 @@ type Supervisor struct {
182185
opampServerPort int
183186

184187
telemetrySettings telemetrySettings
188+
189+
featureGates map[string]struct{}
185190
}
186191

187192
func NewSupervisor(logger *zap.Logger, cfg config.Supervisor) (*Supervisor, error) {
@@ -196,6 +201,7 @@ func NewSupervisor(logger *zap.Logger, cfg config.Supervisor) (*Supervisor, erro
196201
doneChan: make(chan struct{}),
197202
customMessageToServer: make(chan *protobufs.CustomMessage, maxBufferedCustomMessages),
198203
agentConn: &atomic.Value{},
204+
featureGates: map[string]struct{}{},
199205
}
200206
if err := s.createTemplates(); err != nil {
201207
return nil, err
@@ -312,6 +318,10 @@ func (s *Supervisor) Start() error {
312318
return err
313319
}
314320

321+
if err = s.getFeatureGates(); err != nil {
322+
return fmt.Errorf("could not get feature gates from the Collector: %w", err)
323+
}
324+
315325
if err = s.getBootstrapInfo(); err != nil {
316326
return fmt.Errorf("could not get bootstrap info from the Collector: %w", err)
317327
}
@@ -365,6 +375,41 @@ func (s *Supervisor) Start() error {
365375
return nil
366376
}
367377

378+
func (s *Supervisor) getFeatureGates() error {
379+
cmd, err := commander.NewCommander(
380+
s.telemetrySettings.Logger,
381+
s.config.Storage.Directory,
382+
s.config.Agent,
383+
"featuregate",
384+
)
385+
if err != nil {
386+
return err
387+
}
388+
389+
stdout, _, err := cmd.StartOneShot()
390+
if err != nil {
391+
return err
392+
}
393+
scanner := bufio.NewScanner(bytes.NewBuffer(stdout))
394+
395+
// First line only contains headers, discard it.
396+
_ = scanner.Scan()
397+
for scanner.Scan() {
398+
line := scanner.Text()
399+
i := strings.Index(line, " ")
400+
flag := line[0:i]
401+
402+
if flag == AllowNoPipelinesFeatureGate {
403+
s.featureGates[AllowNoPipelinesFeatureGate] = struct{}{}
404+
}
405+
}
406+
if err := scanner.Err(); err != nil {
407+
fmt.Fprintln(os.Stderr, "reading standard input:", err)
408+
}
409+
410+
return nil
411+
}
412+
368413
func (s *Supervisor) createTemplates() error {
369414
var err error
370415

@@ -498,11 +543,18 @@ func (s *Supervisor) getBootstrapInfo() (err error) {
498543
}
499544
}()
500545

546+
flags := []string{
547+
"--config", s.agentConfigFilePath(),
548+
}
549+
featuregateFlag := s.getFeatureGateFlag()
550+
if len(featuregateFlag) > 0 {
551+
flags = append(flags, featuregateFlag...)
552+
}
501553
cmd, err := commander.NewCommander(
502554
s.telemetrySettings.Logger,
503555
s.config.Storage.Directory,
504556
s.config.Agent,
505-
"--config", s.agentConfigFilePath(),
557+
flags...,
506558
)
507559
if err != nil {
508560
span.SetStatus(codes.Error, fmt.Sprintf("Could not start Agent: %v", err))
@@ -902,12 +954,12 @@ func (s *Supervisor) onOpampConnectionSettings(_ context.Context, settings *prot
902954

903955
func (s *Supervisor) composeNoopPipeline() ([]byte, error) {
904956
var cfg bytes.Buffer
905-
err := s.noopPipelineTemplate.Execute(&cfg, map[string]any{
906-
"InstanceUid": s.persistentState.InstanceID.String(),
907-
"SupervisorPort": s.opampServerPort,
908-
})
909-
if err != nil {
910-
return nil, err
957+
958+
if !s.isFeatureGateSupported(AllowNoPipelinesFeatureGate) {
959+
err := s.noopPipelineTemplate.Execute(&cfg, map[string]any{})
960+
if err != nil {
961+
return nil, err
962+
}
911963
}
912964

913965
return cfg.Bytes(), nil
@@ -1664,6 +1716,24 @@ func (s *Supervisor) getSupervisorOpAMPServerPort() (int, error) {
16641716
return s.findRandomPort()
16651717
}
16661718

1719+
func (s *Supervisor) getFeatureGateFlag() []string {
1720+
flags := []string{}
1721+
for k, _ := range s.featureGates {
1722+
flags = append(flags, k)
1723+
}
1724+
1725+
if len(flags) == 0 {
1726+
return []string{}
1727+
}
1728+
1729+
return []string{"--feature-gates", strings.Join(flags, ",")}
1730+
}
1731+
1732+
func (s *Supervisor) isFeatureGateSupported(gate string) bool {
1733+
_, ok := s.featureGates[gate]
1734+
return ok
1735+
}
1736+
16671737
func (s *Supervisor) findRandomPort() (int, error) {
16681738
l, err := net.Listen("tcp", "localhost:0")
16691739
if err != nil {

0 commit comments

Comments
 (0)