Skip to content

Commit 16c681e

Browse files
authored
Split Collector from the cobra.Command. (#4074)
Signed-off-by: Bogdan Drutu <[email protected]>
1 parent 37f476a commit 16c681e

File tree

6 files changed

+84
-68
lines changed

6 files changed

+84
-68
lines changed

CHANGELOG.md

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,11 @@
66

77
- Move ValidateConfig from configcheck to configtest (#3956)
88
- Remove AttributeMessageType (#4020)
9-
- Remove `mem-ballast-size-mib`, already deprecated and no-op (#4005).
10-
- Remove `AttributeHTTPStatusText` const, replaced with `"http.status_text"` (#4015, contrib/#5182).
11-
- Remove squash on `configtls.TLSClientSetting` and move TLS client configs under `tls` (#4063).
12-
- Rename TLS server config `*configtls.TLSServerSetting` from `tls_settings` to `tls` (#4063).
9+
- Remove `mem-ballast-size-mib`, already deprecated and no-op (#4005)
10+
- Remove `AttributeHTTPStatusText` const (#4015)
11+
- Remove squash on `configtls.TLSClientSetting` and move TLS client configs under `tls` (#4063)
12+
- Rename TLS server config `*configtls.TLSServerSetting` from `tls_settings` to `tls` (#4063)
13+
- Split `service.Collector` from the `cobra.Command` (#4074)
1314

1415
## v0.35.0 Beta
1516

cmd/otelcol/main.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,8 @@ func runInteractive(settings service.CollectorSettings) error {
4848
return fmt.Errorf("failed to construct the collector server: %w", err)
4949
}
5050

51-
err = app.Run()
52-
if err != nil {
51+
cmd := service.NewCommand(app)
52+
if err = cmd.Execute(); err != nil {
5353
return fmt.Errorf("collector server run finished with error: %w", err)
5454
}
5555

service/collector.go

Lines changed: 11 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,12 @@ package service
1919
import (
2020
"context"
2121
"errors"
22-
"flag"
2322
"fmt"
2423
"os"
2524
"os/signal"
2625
"runtime"
2726
"syscall"
2827

29-
"github.com/spf13/cobra"
3028
"go.opentelemetry.io/contrib/zpages"
3129
"go.opentelemetry.io/otel"
3230
"go.opentelemetry.io/otel/metric"
@@ -36,12 +34,10 @@ import (
3634

3735
"go.opentelemetry.io/collector/component"
3836
"go.opentelemetry.io/collector/config/configcheck"
39-
"go.opentelemetry.io/collector/config/configtelemetry"
4037
"go.opentelemetry.io/collector/config/configunmarshaler"
4138
"go.opentelemetry.io/collector/config/experimental/configsource"
4239
"go.opentelemetry.io/collector/consumer/consumererror"
4340
"go.opentelemetry.io/collector/extension/ballastextension"
44-
"go.opentelemetry.io/collector/internal/collector/telemetry"
4541
"go.opentelemetry.io/collector/service/internal"
4642
"go.opentelemetry.io/collector/service/internal/telemetrylogs"
4743
"go.opentelemetry.io/collector/service/parserprovider"
@@ -59,20 +55,19 @@ const (
5955

6056
// (Internal note) Collector Lifecycle:
6157
// - New constructs a new Collector.
62-
// - Run starts the collector and calls (*Collector).execute.
63-
// - execute calls setupConfigurationComponents to handle configuration.
58+
// - Run starts the collector.
59+
// - Run calls setupConfigurationComponents to handle configuration.
6460
// If configuration parser fails, collector's config can be reloaded.
6561
// Collector can be shutdown if parser gets a shutdown error.
66-
// - execute runs runAndWaitForShutdownEvent and waits for a shutdown event.
62+
// - Run runs runAndWaitForShutdownEvent and waits for a shutdown event.
6763
// SIGINT and SIGTERM, errors, and (*Collector).Shutdown can trigger the shutdown events.
6864
// - Upon shutdown, pipelines are notified, then pipelines and extensions are shut down.
69-
// - Users can call (*Collector).Shutdown anytime to shutdown the collector.
65+
// - Users can call (*Collector).Shutdown anytime to shut down the collector.
7066

7167
// Collector represents a server providing the OpenTelemetry Collector service.
7268
type Collector struct {
73-
set CollectorSettings
74-
rootCmd *cobra.Command
75-
logger *zap.Logger
69+
set CollectorSettings
70+
logger *zap.Logger
7671

7772
tracerProvider trace.TracerProvider
7873
meterProvider metric.MeterProvider
@@ -107,57 +102,17 @@ func New(set CollectorSettings) (*Collector, error) {
107102
set.ConfigUnmarshaler = configunmarshaler.NewDefault()
108103
}
109104

110-
col := &Collector{
105+
return &Collector{
111106
set: set,
112107
stateChannel: make(chan State, Closed+1),
113-
}
114-
115-
rootCmd := &cobra.Command{
116-
Use: set.BuildInfo.Command,
117-
Version: set.BuildInfo.Version,
118-
RunE: func(cmd *cobra.Command, args []string) error {
119-
return col.execute(cmd.Context())
120-
},
121-
}
122-
123-
// TODO: coalesce this code and expose this information to other components.
124-
flagSet := new(flag.FlagSet)
125-
addFlagsFns := []func(*flag.FlagSet){
126-
configtelemetry.Flags,
127-
parserprovider.Flags,
128-
telemetry.Flags,
129-
telemetrylogs.Flags,
130-
}
131-
for _, addFlags := range addFlagsFns {
132-
addFlags(flagSet)
133-
}
134-
rootCmd.Flags().AddGoFlagSet(flagSet)
135-
col.rootCmd = rootCmd
136-
137-
return col, nil
138-
}
139-
140-
// Run starts the collector according to the command and configuration
141-
// given by the user, and waits for it to complete.
142-
// Consecutive calls to Run are not allowed, Run shouldn't be called
143-
// once a collector is shut down.
144-
func (col *Collector) Run() error {
145-
// From this point on do not show usage in case of error.
146-
col.rootCmd.SilenceUsage = true
147-
148-
return col.rootCmd.Execute()
108+
}, nil
149109
}
150110

151111
// GetStateChannel returns state channel of the collector server.
152112
func (col *Collector) GetStateChannel() chan State {
153113
return col.stateChannel
154114
}
155115

156-
// Command returns Collector's root command.
157-
func (col *Collector) Command() *cobra.Command {
158-
return col.rootCmd
159-
}
160-
161116
// GetLogger returns logger used by the Collector.
162117
// The logger is initialized after collector server start.
163118
func (col *Collector) GetLogger() *zap.Logger {
@@ -246,7 +201,9 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error {
246201
return nil
247202
}
248203

249-
func (col *Collector) execute(ctx context.Context) error {
204+
// Run starts the collector according to the given configuration given, and waits for it to complete.
205+
// Consecutive calls to Run are not allowed, Run shouldn't be called once a collector is shut down.
206+
func (col *Collector) Run(ctx context.Context) error {
250207
col.zPagesSpanProcessor = zpages.NewSpanProcessor()
251208
col.tracerProvider = sdktrace.NewTracerProvider(
252209
sdktrace.WithSampler(internal.AlwaysRecord()),

service/collector_test.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -58,11 +58,11 @@ func TestCollector_Start(t *testing.T) {
5858
LoggingOptions: []zap.Option{zap.Hooks(hook)},
5959
})
6060
require.NoError(t, err)
61-
assert.Equal(t, col.rootCmd, col.Command())
6261

6362
const testPrefix = "a_test"
6463
metricsPort := testutil.GetAvailablePort(t)
65-
col.rootCmd.SetArgs([]string{
64+
cmd := NewCommand(col)
65+
cmd.SetArgs([]string{
6666
"--config=testdata/otelcol-config.yaml",
6767
"--metrics-addr=localhost:" + strconv.FormatUint(uint64(metricsPort), 10),
6868
"--metrics-prefix=" + testPrefix,
@@ -71,7 +71,7 @@ func TestCollector_Start(t *testing.T) {
7171
colDone := make(chan struct{})
7272
go func() {
7373
defer close(colDone)
74-
assert.NoError(t, col.Run())
74+
assert.NoError(t, cmd.Execute())
7575
}()
7676

7777
assert.Equal(t, Starting, <-col.GetStateChannel())
@@ -123,12 +123,13 @@ func TestCollector_ReportError(t *testing.T) {
123123
col, err := New(CollectorSettings{BuildInfo: component.DefaultBuildInfo(), Factories: factories})
124124
require.NoError(t, err)
125125

126-
col.rootCmd.SetArgs([]string{"--config=testdata/otelcol-config-minimal.yaml"})
126+
cmd := NewCommand(col)
127+
cmd.SetArgs([]string{"--config=testdata/otelcol-config-minimal.yaml"})
127128

128129
colDone := make(chan struct{})
129130
go func() {
130131
defer close(colDone)
131-
assert.EqualError(t, col.Run(), "failed to shutdown collector telemetry: err1")
132+
assert.EqualError(t, cmd.Execute(), "failed to shutdown collector telemetry: err1")
132133
}()
133134

134135
assert.Equal(t, Starting, <-col.GetStateChannel())
@@ -154,7 +155,7 @@ func TestCollector_StartAsGoRoutine(t *testing.T) {
154155
colDone := make(chan struct{})
155156
go func() {
156157
defer close(colDone)
157-
colErr := col.Run()
158+
colErr := col.Run(context.Background())
158159
if colErr != nil {
159160
err = colErr
160161
}

service/collector_windows.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,9 +87,12 @@ func (s *WindowsService) start(elog *eventlog.Log, colErrorChannel chan error) e
8787
return err
8888
}
8989

90-
// col.Start blocks until receiving a SIGTERM signal, so needs to be started
90+
// col.Run blocks until receiving a SIGTERM signal, so needs to be started
9191
// asynchronously, but it will exit early if an error occurs on startup
92-
go func() { colErrorChannel <- s.col.Run() }()
92+
go func() {
93+
cmd := NewCommand(s.col)
94+
colErrorChannel <- cmd.Execute()
95+
}()
9396

9497
// wait until the collector server is in the Running state
9598
go func() {

service/command.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package service
16+
17+
import (
18+
"flag"
19+
20+
"github.com/spf13/cobra"
21+
22+
"go.opentelemetry.io/collector/config/configtelemetry"
23+
"go.opentelemetry.io/collector/internal/collector/telemetry"
24+
"go.opentelemetry.io/collector/service/internal/telemetrylogs"
25+
"go.opentelemetry.io/collector/service/parserprovider"
26+
)
27+
28+
// NewCommand constructs a new cobra.Command using the given Collector.
29+
// TODO: Make this independent of the collector internals.
30+
func NewCommand(col *Collector) *cobra.Command {
31+
rootCmd := &cobra.Command{
32+
Use: col.set.BuildInfo.Command,
33+
Version: col.set.BuildInfo.Version,
34+
SilenceUsage: true,
35+
RunE: func(cmd *cobra.Command, args []string) error {
36+
return col.Run(cmd.Context())
37+
},
38+
}
39+
40+
// TODO: coalesce this code and expose this information to other components.
41+
flagSet := new(flag.FlagSet)
42+
addFlagsFns := []func(*flag.FlagSet){
43+
configtelemetry.Flags,
44+
parserprovider.Flags,
45+
telemetry.Flags,
46+
telemetrylogs.Flags,
47+
}
48+
for _, addFlags := range addFlagsFns {
49+
addFlags(flagSet)
50+
}
51+
52+
rootCmd.Flags().AddGoFlagSet(flagSet)
53+
return rootCmd
54+
}

0 commit comments

Comments
 (0)