Skip to content

Commit 4ba000d

Browse files
authored
[beatreceiver] - add manager for otel mode (#46539)
* iniital commit * cleanupt * license * comments * comments * comments * rename * comments and unit test case * go.mod * lint and notice * use existing flag * go.sum * log input test and UnderAgent test * test disabled * stub and cleanup * comment * fixes and comments
1 parent d3be9bf commit 4ba000d

File tree

6 files changed

+366
-228
lines changed

6 files changed

+366
-228
lines changed

NOTICE.txt

Lines changed: 212 additions & 212 deletions
Large diffs are not rendered by default.

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,7 @@ require (
238238
github.com/cilium/ebpf v0.19.0
239239
github.com/elastic/opentelemetry-collector-components/extension/beatsauthextension v0.2.0
240240
go.opentelemetry.io/collector/client v1.41.0
241+
go.opentelemetry.io/collector/consumer/consumertest v0.135.0
241242
go.opentelemetry.io/collector/exporter v0.135.0
242243
go.opentelemetry.io/collector/exporter/exporterhelper v0.135.0
243244
go.opentelemetry.io/collector/exporter/exportertest v0.135.0
@@ -441,7 +442,6 @@ require (
441442
go.opentelemetry.io/collector/connector/connectortest v0.135.0 // indirect
442443
go.opentelemetry.io/collector/connector/xconnector v0.135.0 // indirect
443444
go.opentelemetry.io/collector/consumer/consumererror/xconsumererror v0.135.0 // indirect
444-
go.opentelemetry.io/collector/consumer/consumertest v0.135.0 // indirect
445445
go.opentelemetry.io/collector/consumer/xconsumer v0.135.0 // indirect
446446
go.opentelemetry.io/collector/exporter/exporterhelper/xexporterhelper v0.135.0 // indirect
447447
go.opentelemetry.io/collector/exporter/xexporter v0.135.0 // indirect

libbeat/management/management.go

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ func NewManager(cfg *config.C, registry *reload.Registry, logger *logp.Logger) (
101101
return managerFactory(cfg, registry, logger)
102102
}
103103
}
104-
return &fallbackManager{
104+
return &FallbackManager{
105105
logger: logger.Named("mgmt"),
106106
status: status.Unknown,
107107
msg: "",
@@ -118,8 +118,8 @@ func SetManagerFactory(factory ManagerFactory) {
118118
managerFactory = factory
119119
}
120120

121-
// fallbackManager, fallback when no manager is present
122-
type fallbackManager struct {
121+
// FallbackManager, fallback when no manager is present
122+
type FallbackManager struct {
123123
logger *logp.Logger
124124
lock sync.Mutex
125125
status status.Status
@@ -128,7 +128,7 @@ type fallbackManager struct {
128128
stopOnce sync.Once
129129
}
130130

131-
func (n *fallbackManager) UpdateStatus(status status.Status, msg string) {
131+
func (n *FallbackManager) UpdateStatus(status status.Status, msg string) {
132132
n.lock.Lock()
133133
defer n.lock.Unlock()
134134
if n.status != status || n.msg != msg {
@@ -138,13 +138,13 @@ func (n *fallbackManager) UpdateStatus(status status.Status, msg string) {
138138
}
139139
}
140140

141-
func (n *fallbackManager) SetStopCallback(f func()) {
141+
func (n *FallbackManager) SetStopCallback(f func()) {
142142
n.lock.Lock()
143143
n.stopFunc = f
144144
n.lock.Unlock()
145145
}
146146

147-
func (n *fallbackManager) Stop() {
147+
func (n *FallbackManager) Stop() {
148148
n.lock.Lock()
149149
defer n.lock.Unlock()
150150
if n.stopFunc != nil {
@@ -162,12 +162,12 @@ func (n *fallbackManager) Stop() {
162162
// the nilManager is still used for shutdown on some cases,
163163
// but that does not mean the Beat is being managed externally,
164164
// hence it will always return false.
165-
func (n *fallbackManager) Enabled() bool { return false }
166-
func (n *fallbackManager) AgentInfo() client.AgentInfo { return client.AgentInfo{} }
167-
func (n *fallbackManager) Start() error { return nil }
168-
func (n *fallbackManager) CheckRawConfig(cfg *config.C) error { return nil }
169-
func (n *fallbackManager) RegisterAction(action client.Action) {}
170-
func (n *fallbackManager) UnregisterAction(action client.Action) {}
171-
func (n *fallbackManager) SetPayload(map[string]interface{}) {}
172-
func (n *fallbackManager) RegisterDiagnosticHook(_ string, _ string, _ string, _ string, _ client.DiagnosticHook) {
165+
func (n *FallbackManager) Enabled() bool { return false }
166+
func (n *FallbackManager) AgentInfo() client.AgentInfo { return client.AgentInfo{} }
167+
func (n *FallbackManager) Start() error { return nil }
168+
func (n *FallbackManager) CheckRawConfig(cfg *config.C) error { return nil }
169+
func (n *FallbackManager) RegisterAction(action client.Action) {}
170+
func (n *FallbackManager) UnregisterAction(action client.Action) {}
171+
func (n *FallbackManager) SetPayload(map[string]interface{}) {}
172+
func (n *FallbackManager) RegisterDiagnosticHook(_ string, _ string, _ string, _ string, _ client.DiagnosticHook) {
173173
}

x-pack/libbeat/cmd/instance/beat.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"github.com/elastic/beats/v7/libbeat/publisher/pipeline"
2525
"github.com/elastic/beats/v7/libbeat/publisher/processing"
2626
"github.com/elastic/beats/v7/libbeat/version"
27+
"github.com/elastic/beats/v7/x-pack/libbeat/common/otelbeat/otelmanager"
2728
"github.com/elastic/elastic-agent-libs/config"
2829
"github.com/elastic/elastic-agent-libs/keystore"
2930
"github.com/elastic/elastic-agent-libs/logp"
@@ -187,8 +188,12 @@ func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]an
187188
b.Info.FQDN = fqdn
188189
}
189190

191+
// register NewOtelManager
192+
management.SetManagerFactory(otelmanager.NewOtelManager)
193+
190194
// initialize config manager
191-
m, err := management.NewManager(b.Config.Management, b.Registry, logger)
195+
oCfg, _ := cfg.Child("management.otel", -1)
196+
m, err := management.NewManager(oCfg, b.Registry, logger)
192197
if err != nil {
193198
return nil, fmt.Errorf("error creating new manager: %w", err)
194199
}
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
2+
// or more contributor license agreements. Licensed under the Elastic License;
3+
// you may not use this file except in compliance with the Elastic License.
4+
5+
package instance
6+
7+
import (
8+
"maps"
9+
"testing"
10+
11+
"github.com/stretchr/testify/assert"
12+
"github.com/stretchr/testify/require"
13+
"go.opentelemetry.io/collector/consumer/consumertest"
14+
"go.uber.org/zap/zapcore"
15+
16+
"github.com/elastic/beats/v7/filebeat/cmd"
17+
"github.com/elastic/beats/v7/filebeat/input/log"
18+
"github.com/elastic/beats/v7/libbeat/management"
19+
"github.com/elastic/beats/v7/x-pack/libbeat/common/otelbeat/otelmanager"
20+
conf "github.com/elastic/elastic-agent-libs/config"
21+
)
22+
23+
func TestManager(t *testing.T) {
24+
tmpDir := t.TempDir()
25+
cfg := map[string]any{
26+
"filebeat": map[string]any{
27+
"inputs": []map[string]any{
28+
{
29+
"type": "benchmark",
30+
"enabled": true,
31+
"message": "test",
32+
"count": 10,
33+
},
34+
},
35+
},
36+
"output": map[string]any{
37+
"otelconsumer": map[string]any{},
38+
},
39+
"path.home": tmpDir,
40+
}
41+
t.Run("otel management disabled - key missing", func(t *testing.T) {
42+
beat, err := NewBeatForReceiver(cmd.FilebeatSettings("filebeat"), cfg, false, consumertest.NewNop(), "testcomponent", zapcore.NewNopCore())
43+
assert.NoError(t, err)
44+
assert.NotNil(t, beat.Manager)
45+
// it should fallback to FallbackManager if key is missing
46+
assert.IsType(t, beat.Manager, &management.FallbackManager{})
47+
assert.False(t, management.UnderAgent())
48+
})
49+
t.Run("otel management enabled", func(t *testing.T) {
50+
tmpCfg := map[string]any{}
51+
maps.Copy(tmpCfg, cfg)
52+
tmpCfg["management.otel.enabled"] = true
53+
defer func() {
54+
management.SetUnderAgent(false) // reset to false
55+
}()
56+
beat, err := NewBeatForReceiver(cmd.FilebeatSettings("filebeat"), tmpCfg, false, consumertest.NewNop(), "testcomponent", zapcore.NewNopCore())
57+
assert.NoError(t, err)
58+
assert.NotNil(t, beat.Manager)
59+
assert.IsType(t, beat.Manager, &otelmanager.OtelManager{})
60+
assert.True(t, management.UnderAgent())
61+
62+
// test if log input is enabled
63+
cfg, err := conf.NewConfigFrom(`
64+
type: "log"`)
65+
require.NoError(t, err)
66+
assert.True(t, log.AllowDeprecatedUse(cfg))
67+
})
68+
t.Run("otel management disabled", func(t *testing.T) {
69+
tmpCfg := map[string]any{}
70+
maps.Copy(tmpCfg, cfg)
71+
tmpCfg["management.otel.enabled"] = false
72+
defer func() {
73+
management.SetUnderAgent(false) // reset to false
74+
}()
75+
beat, err := NewBeatForReceiver(cmd.FilebeatSettings("filebeat"), tmpCfg, false, consumertest.NewNop(), "testcomponent", zapcore.NewNopCore())
76+
assert.NoError(t, err)
77+
assert.NotNil(t, beat.Manager)
78+
assert.IsType(t, beat.Manager, &management.FallbackManager{})
79+
assert.False(t, management.UnderAgent())
80+
81+
// test if log input is disabled
82+
cfg, err := conf.NewConfigFrom(`
83+
type: "log"`)
84+
require.NoError(t, err)
85+
assert.False(t, log.AllowDeprecatedUse(cfg))
86+
})
87+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
2+
// or more contributor license agreements. Licensed under the Elastic License;
3+
// you may not use this file except in compliance with the Elastic License.
4+
5+
package otelmanager
6+
7+
import (
8+
"github.com/elastic/beats/v7/libbeat/common/reload"
9+
"github.com/elastic/beats/v7/libbeat/management"
10+
"github.com/elastic/beats/v7/libbeat/management/status"
11+
"github.com/elastic/elastic-agent-client/v7/pkg/client"
12+
"github.com/elastic/elastic-agent-libs/config"
13+
"github.com/elastic/elastic-agent-libs/logp"
14+
)
15+
16+
var _ management.Manager = (*OtelManager)(nil)
17+
18+
func NewOtelManager(cfg *config.C, registry *reload.Registry, logger *logp.Logger) (management.Manager, error) {
19+
management.SetUnderAgent(true)
20+
return &OtelManager{}, nil
21+
}
22+
23+
// OtelManager is the main manager for managing beatreceivers
24+
type OtelManager struct{}
25+
26+
func (n *OtelManager) UpdateStatus(_ status.Status, _ string) {
27+
// a stub implemtation for now.
28+
// TODO(@VihasMakwana): Explore the option to tidy and refactor the status reporting for beatsreceivers.
29+
}
30+
31+
func (n *OtelManager) SetStopCallback(func()) {
32+
}
33+
34+
func (n *OtelManager) Stop() {}
35+
36+
// Enabled returns false because many places inside beats call manager.Enabled() for various purposes
37+
// Returning true might lead to side effects.
38+
func (n *OtelManager) Enabled() bool { return false }
39+
func (n *OtelManager) AgentInfo() client.AgentInfo { return client.AgentInfo{} }
40+
func (n *OtelManager) Start() error { return nil }
41+
func (n *OtelManager) CheckRawConfig(cfg *config.C) error { return nil }
42+
func (n *OtelManager) RegisterAction(action client.Action) {}
43+
func (n *OtelManager) UnregisterAction(action client.Action) {}
44+
func (n *OtelManager) SetPayload(map[string]interface{}) {}
45+
func (n *OtelManager) RegisterDiagnosticHook(_ string, _ string, _ string, _ string, _ client.DiagnosticHook) {
46+
}

0 commit comments

Comments
 (0)