Skip to content

Commit c016bb9

Browse files
authored
Fix default discovery crash on Windows (#5202)
1 parent 6ba46aa commit c016bb9

File tree

4 files changed

+295
-24
lines changed

4 files changed

+295
-24
lines changed

cmd/otelcol/main.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package main
1919

2020
import (
21+
"context"
2122
"fmt"
2223
"log"
2324
"os"
@@ -36,10 +37,14 @@ import (
3637
)
3738

3839
func main() {
40+
runFromCmdLine(os.Args)
41+
}
42+
43+
func runFromCmdLine(args []string) {
3944
// TODO: Use same format as the collector
4045
log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile)
4146

42-
collectorSettings, err := settings.New(os.Args[1:])
47+
collectorSettings, err := settings.New(args[1:])
4348
if err != nil {
4449
// Exit if --help flag was supplied and usage help was displayed.
4550
if err == flag.ErrHelp {
@@ -80,14 +85,21 @@ func main() {
8085
},
8186
}
8287

83-
os.Args = append(os.Args[:1], collectorSettings.ColCoreArgs()...)
88+
allArgs := args[:1]
89+
allArgs = append(allArgs, collectorSettings.ColCoreArgs()...)
90+
os.Args = allArgs
8491
if err = run(serviceSettings); err != nil {
8592
log.Fatal(err)
8693
}
8794
}
8895

96+
var otelcolCmdTestCtx context.Context // Use to control termination during tests.
97+
8998
func runInteractive(settings otelcol.CollectorSettings) error {
9099
cmd := otelcol.NewCommand(settings)
100+
if otelcolCmdTestCtx != nil {
101+
cmd.SetContext(otelcolCmdTestCtx)
102+
}
91103
if err := cmd.Execute(); err != nil {
92104
return fmt.Errorf("application run finished with error: %w", err)
93105
}

cmd/otelcol/main_test.go

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,111 @@
1414
// limitations under the License.
1515

1616
package main
17+
18+
import (
19+
"context"
20+
"net"
21+
"os"
22+
"runtime"
23+
"testing"
24+
"time"
25+
26+
"github.com/stretchr/testify/assert"
27+
"github.com/stretchr/testify/require"
28+
)
29+
30+
func TestRunFromCmdLine(t *testing.T) {
31+
tests := []struct {
32+
name string
33+
panicMsg string
34+
skipMsg string
35+
args []string
36+
timeout time.Duration
37+
}{
38+
{
39+
name: "agent",
40+
args: []string{"otelcol", "--config=config/collector/agent_config.yaml"},
41+
timeout: 15 * time.Second,
42+
},
43+
{
44+
name: "gateway",
45+
args: []string{"otelcol", "--config=config/collector/gateway_config.yaml"},
46+
timeout: 15 * time.Second,
47+
},
48+
{
49+
name: "default_discovery",
50+
args: []string{"otelcol", "--discovery", "--config=config/collector/agent_config.yaml"},
51+
timeout: 30 * time.Second,
52+
},
53+
// Running the discovery with --dry-run in CI is not desirable because of the use of os.Exit(0) to end the execution.
54+
// That prevents the test from releasing resources like ports. The test needs to catch the panic to not fail the test,
55+
// however, the resources won't be properly released for the remaining tests that may use the same resources.
56+
// Skipping the test by default but keeping it around to deliberate runs on dev box.
57+
{
58+
name: "dry-run_discovery",
59+
args: []string{"otelcol", "--discovery", "--dry-run", "--config=config/collector/agent_config.yaml"},
60+
timeout: 30 * time.Second,
61+
panicMsg: "unexpected call to os.Exit(0) during test", // os.Exit(0) in the normal execution is expected for '--dry-run'.
62+
skipMsg: "Skipping this test by default because --dry-run uses os.Exit(0) to end the execution",
63+
},
64+
}
65+
66+
// Set execution environment
67+
requiredEnvVars := map[string]string{
68+
"SPLUNK_ACCESS_TOKEN": "access_token",
69+
"SPLUNK_HEC_TOKEN": "hec_token",
70+
"SPLUNK_REALM": "test_realm",
71+
"SPLUNK_LISTEN_INTERFACE": "127.0.0.1",
72+
"NO_WINDOWS_SERVICE": "true", // Avoid using the Windows service manager
73+
}
74+
for key, value := range requiredEnvVars {
75+
os.Setenv(key, value)
76+
defer os.Unsetenv(key)
77+
}
78+
79+
for _, tt := range tests {
80+
t.Run(tt.name, func(t *testing.T) {
81+
if tt.skipMsg != "" {
82+
t.Skip(tt.skipMsg)
83+
}
84+
85+
// GH darwin runners don't have docker installed, skip discovery tests on them
86+
// given that the docker_observer is enabled by default.
87+
if runtime.GOOS == "darwin" && (tt.name == "default_discovery" || tt.name == "dry-run_discovery") {
88+
if os.Getenv("GITHUB_ACTIONS") == "true" {
89+
t.Skip("skipping discovery tests on darwin runners since they don't have docker installed")
90+
}
91+
}
92+
93+
testCtx, cancel := context.WithTimeout(context.Background(), tt.timeout)
94+
defer cancel()
95+
96+
otelcolCmdTestCtx = testCtx
97+
defer func() {
98+
otelcolCmdTestCtx = nil
99+
}()
100+
101+
// Wait for the ConfigServer to be down after the test.
102+
defer waitForPort(t, "55554")
103+
104+
if tt.panicMsg != "" {
105+
assert.PanicsWithValue(t, tt.panicMsg, func() { runFromCmdLine(tt.args) })
106+
return
107+
}
108+
109+
waitForPort(t, "55554")
110+
runFromCmdLine(tt.args)
111+
})
112+
}
113+
}
114+
115+
func waitForPort(t *testing.T, port string) {
116+
require.Eventually(t, func() bool {
117+
ln, err := net.Listen("tcp", "localhost:"+port)
118+
if err == nil {
119+
ln.Close()
120+
return true
121+
}
122+
return false
123+
}, 60*time.Second, 500*time.Millisecond)
124+
}

internal/confmapprovider/discovery/discoverer.go

Lines changed: 37 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"encoding/base64"
2020
"fmt"
2121
"os"
22+
"runtime"
2223
"sort"
2324
"strconv"
2425
"strings"
@@ -182,6 +183,19 @@ func (d *discoverer) discover(cfg *Config) (map[string]any, error) {
182183
return nil, nil
183184
}
184185

186+
err = d.performDiscovery(discoveryReceivers, discoveryObservers)
187+
if err != nil {
188+
return nil, err
189+
}
190+
191+
discoveryConfig, err := d.discoveryConfig(cfg)
192+
if err != nil {
193+
return nil, fmt.Errorf("failed constructing discovery config: %w", err)
194+
}
195+
return discoveryConfig, nil
196+
}
197+
198+
func (d *discoverer) performDiscovery(discoveryReceivers map[component.ID]otelcolreceiver.Logs, discoveryObservers map[component.ID]otelcolextension.Extension) error {
185199
var cancels []context.CancelFunc
186200

187201
defer func() {
@@ -199,19 +213,35 @@ func (d *discoverer) discover(cfg *Config) (map[string]any, error) {
199213
fmt.Sprintf("%q startup failed. Won't proceed with %q-based discovery", observerID, observerID.Type()),
200214
zap.Error(e),
201215
)
216+
return e
202217
}
218+
defer func(obsID component.ID, obsExt otelcolextension.Extension) {
219+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
220+
cancels = append(cancels, cancel)
221+
if e := obsExt.Shutdown(ctx); e != nil {
222+
d.logger.Warn(fmt.Sprintf("error shutting down observer %q", obsID), zap.Error(e))
223+
}
224+
}(observerID, observer)
203225
}
204226

205227
for receiverID, receiver := range discoveryReceivers {
206228
d.logger.Debug(fmt.Sprintf("starting receiver %q", receiverID))
207229
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
208230
cancels = append(cancels, cancel)
209-
if err = receiver.Start(ctx, d); err != nil {
231+
if err := receiver.Start(ctx, d); err != nil {
210232
d.logger.Warn(
211233
fmt.Sprintf("%q startup failed.", receiverID),
212234
zap.Error(err),
213235
)
236+
return err
214237
}
238+
defer func(rcvID component.ID, rcv otelcolreceiver.Logs) {
239+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
240+
cancels = append(cancels, cancel)
241+
if e := rcv.Shutdown(ctx); e != nil {
242+
d.logger.Warn(fmt.Sprintf("error shutting down receiver %q", rcvID), zap.Error(e))
243+
}
244+
}(receiverID, receiver)
215245
}
216246

217247
_, _ = fmt.Fprintf(os.Stderr, "Discovering for next %s...\n", d.duration)
@@ -221,26 +251,7 @@ func (d *discoverer) discover(cfg *Config) (map[string]any, error) {
221251
}
222252
_, _ = fmt.Fprintf(os.Stderr, "Discovery complete.\n")
223253

224-
for receiverID, receiver := range discoveryReceivers {
225-
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
226-
cancels = append(cancels, cancel)
227-
if e := receiver.Shutdown(ctx); e != nil {
228-
d.logger.Warn(fmt.Sprintf("error shutting down receiver %q", receiverID), zap.Error(e))
229-
}
230-
}
231-
for observerID, observer := range discoveryObservers {
232-
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
233-
cancels = append(cancels, cancel)
234-
if e := observer.Shutdown(ctx); e != nil {
235-
d.logger.Warn(fmt.Sprintf("error shutting down observer %q", observerID), zap.Error(e))
236-
}
237-
}
238-
239-
discoveryConfig, err := d.discoveryConfig(cfg)
240-
if err != nil {
241-
return nil, fmt.Errorf("failed constructing discovery config: %w", err)
242-
}
243-
return discoveryConfig, nil
254+
return nil
244255
}
245256

246257
func (d *discoverer) createDiscoveryReceiversAndObservers(cfg *Config) (map[component.ID]otelcolreceiver.Logs, map[component.ID]otelcolextension.Extension, error) {
@@ -449,11 +460,15 @@ func (d *discoverer) updateReceiverForObserver(receiverID component.ID, receiver
449460

450461
func factoryForObserverType(extType component.Type) (otelcolextension.Factory, error) {
451462
factories := map[component.Type]otelcolextension.Factory{
452-
component.MustNewType("docker_observer"): dockerobserver.NewFactory(),
453463
component.MustNewType("host_observer"): hostobserver.NewFactory(),
454464
component.MustNewType("k8s_observer"): k8sobserver.NewFactory(),
455465
component.MustNewType("ecs_task_observer"): ecstaskobserver.NewFactory(),
456466
}
467+
if runtime.GOOS != "windows" {
468+
// Docker observer currently always crashes on Windows with the default configuration.
469+
// The observer is being temporarily disabled until it is fixed upstream.
470+
factories[component.MustNewType("docker_observer")] = dockerobserver.NewFactory()
471+
}
457472
ef, ok := factories[extType]
458473
if !ok {
459474
return nil, fmt.Errorf("unsupported discovery observer %q. Please remove its .discovery.yaml from your config directory", extType)

0 commit comments

Comments
 (0)