Skip to content

Commit 01c2963

Browse files
FrapschenatoulmeChrsMark
authored andcommitted
[receiver/k8sobjectsreceiver] k8sobject receiver support leader election (open-telemetry#39054)
#### Description As open-telemetry#38426 move the k8sleaderelector from development to alpha, we can use it in k8sobjectsreceiver. --------- Co-authored-by: Antoine Toulme <[email protected]> Co-authored-by: Christos Markou <[email protected]>
1 parent fcb73fa commit 01c2963

File tree

7 files changed

+159
-13
lines changed

7 files changed

+159
-13
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: k8sobjectsreceiver
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: k8sobject receiver support leader election
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [39054]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

receiver/k8sobjectsreceiver/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ The following is example configuration
2626
```yaml
2727
k8sobjects:
2828
auth_type: serviceAccount
29+
k8s_leader_elector: k8s_leader_elector
2930
objects:
3031
- name: pods
3132
mode: pull
@@ -61,6 +62,7 @@ the K8s API server. This can be one of `none` (for no auth), `serviceAccount`
6162
use this config to specify the group to select. By default, it will select the first group.
6263
For example, `events` resource is available in both `v1` and `events.k8s.io/v1` APIGroup. In
6364
this case, it will select `v1` by default.
65+
- `k8s_leader_elector` (default: none): if specified, will enable Leader Election by using `k8sleaderelector` extension
6466

6567

6668
The full list of settings exposed for this receiver are documented in [config.go](./config.go)

receiver/k8sobjectsreceiver/config.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"strings"
1010
"time"
1111

12+
"go.opentelemetry.io/collector/component"
1213
"k8s.io/apimachinery/pkg/runtime/schema"
1314
apiWatch "k8s.io/apimachinery/pkg/watch"
1415
"k8s.io/client-go/discovery"
@@ -61,6 +62,8 @@ type Config struct {
6162
Objects []*K8sObjectsConfig `mapstructure:"objects"`
6263
ErrorMode ErrorMode `mapstructure:"error_mode"`
6364

65+
K8sLeaderElector *component.ID `mapstructure:"k8s_leader_elector"`
66+
6467
// For mocking purposes only.
6568
makeDiscoveryClient func() (discovery.ServerResourcesInterface, error)
6669
makeDynamicClient func() (dynamic.Interface, error)

receiver/k8sobjectsreceiver/go.mod

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ go 1.23.0
44

55
require (
66
github.com/google/uuid v1.6.0
7+
github.com/open-telemetry/opentelemetry-collector-contrib/extension/k8sleaderelector v0.125.0
78
github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig v0.125.0
89
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.125.0
910
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.125.0
@@ -95,6 +96,7 @@ require (
9596
go.opentelemetry.io/collector/config/configtls v1.31.1-0.20250509190408-4ca0f1829e0a // indirect
9697
go.opentelemetry.io/collector/consumer/consumererror v0.125.1-0.20250509190408-4ca0f1829e0a // indirect
9798
go.opentelemetry.io/collector/consumer/xconsumer v0.125.1-0.20250509190408-4ca0f1829e0a // indirect
99+
go.opentelemetry.io/collector/extension v1.31.1-0.20250509190408-4ca0f1829e0a // indirect
98100
go.opentelemetry.io/collector/extension/extensionauth v1.31.1-0.20250509190408-4ca0f1829e0a // indirect
99101
go.opentelemetry.io/collector/extension/extensionmiddleware v0.125.1-0.20250509190408-4ca0f1829e0a // indirect
100102
go.opentelemetry.io/collector/featuregate v1.31.1-0.20250509190408-4ca0f1829e0a // indirect
@@ -136,7 +138,7 @@ require (
136138
k8s.io/kube-openapi v0.0.0-20241105132330-32ad38e42d3f // indirect
137139
k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738 // indirect
138140
sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3 // indirect
139-
sigs.k8s.io/structured-merge-diff/v4 v4.4.2 // indirect
141+
sigs.k8s.io/structured-merge-diff/v4 v4.4.3 // indirect
140142
sigs.k8s.io/yaml v1.4.0 // indirect
141143
)
142144

@@ -158,3 +160,5 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest
158160
replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../pkg/pdatautil
159161

160162
replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden
163+
164+
replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/k8sleaderelector => ../../extension/k8sleaderelector

receiver/k8sobjectsreceiver/go.sum

Lines changed: 6 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

receiver/k8sobjectsreceiver/receiver.go

Lines changed: 38 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"k8s.io/client-go/tools/cache"
2525
"k8s.io/client-go/tools/watch"
2626

27+
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/k8sleaderelector"
2728
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sobjectsreceiver/internal/metadata"
2829
)
2930

@@ -75,7 +76,7 @@ func newReceiver(params receiver.Settings, config *Config, consumer consumer.Log
7576
}, nil
7677
}
7778

78-
func (kr *k8sobjectsreceiver) Start(ctx context.Context, _ component.Host) error {
79+
func (kr *k8sobjectsreceiver) Start(ctx context.Context, host component.Host) error {
7980
client, err := kr.config.getDynamicClient()
8081
if err != nil {
8182
return err
@@ -96,7 +97,7 @@ func (kr *k8sobjectsreceiver) Start(ctx context.Context, _ component.Host) error
9697
for k := range validObjects {
9798
availableResource = append(availableResource, k)
9899
}
99-
err := fmt.Errorf("resource not found: %s. Available resources in cluster: %v", object.Name, availableResource)
100+
err = fmt.Errorf("resource not found: %s. Available resources in cluster: %v", object.Name, availableResource)
100101
if handlerErr := kr.handleError(err, ""); handlerErr != nil {
101102
return handlerErr
102103
}
@@ -116,17 +117,46 @@ func (kr *k8sobjectsreceiver) Start(ctx context.Context, _ component.Host) error
116117
}
117118

118119
if len(validConfigs) == 0 {
119-
err := errors.New("no valid Kubernetes objects found to watch")
120+
err = errors.New("no valid Kubernetes objects found to watch")
120121
return err
121122
}
122123

123-
kr.setting.Logger.Info("Object Receiver started")
124-
cctx, cancel := context.WithCancel(ctx)
125-
kr.cancel = cancel
124+
if kr.config.K8sLeaderElector != nil {
125+
k8sLeaderElector := host.GetExtensions()[*kr.config.K8sLeaderElector]
126+
if k8sLeaderElector == nil {
127+
return fmt.Errorf("unknown k8s leader elector %q", kr.config.K8sLeaderElector)
128+
}
129+
130+
kr.setting.Logger.Info("registering the receiver in leader election")
131+
elector, ok := k8sLeaderElector.(k8sleaderelector.LeaderElection)
132+
if !ok {
133+
return fmt.Errorf("the extension %T is not implement k8sleaderelector.LeaderElection", k8sLeaderElector)
134+
}
126135

127-
for _, object := range validConfigs {
128-
kr.start(cctx, object)
136+
elector.SetCallBackFuncs(
137+
func(ctx context.Context) {
138+
cctx, cancel := context.WithCancel(ctx)
139+
kr.cancel = cancel
140+
for _, object := range validConfigs {
141+
kr.start(cctx, object)
142+
}
143+
kr.setting.Logger.Info("Object Receiver started as leader")
144+
},
145+
func() {
146+
kr.setting.Logger.Info("no longer leader, stopping")
147+
err = kr.Shutdown(context.Background())
148+
if err != nil {
149+
kr.setting.Logger.Error("shutdown receiver error:", zap.Error(err))
150+
}
151+
})
152+
} else {
153+
cctx, cancel := context.WithCancel(ctx)
154+
kr.cancel = cancel
155+
for _, object := range validConfigs {
156+
kr.start(cctx, object)
157+
}
129158
}
159+
130160
return nil
131161
}
132162

receiver/k8sobjectsreceiver/receiver_test.go

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,13 @@ import (
1010

1111
"github.com/stretchr/testify/assert"
1212
"github.com/stretchr/testify/require"
13+
"go.opentelemetry.io/collector/component"
1314
"go.opentelemetry.io/collector/component/componenttest"
1415
"go.opentelemetry.io/collector/consumer/consumertest"
1516
"go.opentelemetry.io/collector/receiver/receivertest"
1617
apiWatch "k8s.io/apimachinery/pkg/watch"
1718

19+
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/k8sleaderelector/k8sleaderelectortest"
1820
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sobjectsreceiver/internal/metadata"
1921
)
2022

@@ -294,3 +296,79 @@ func TestExcludeDeletedTrue(t *testing.T) {
294296

295297
assert.NoError(t, r.Shutdown(ctx))
296298
}
299+
300+
func TestReceiverWithLeaderElection(t *testing.T) {
301+
fakeLeaderElection := &k8sleaderelectortest.FakeLeaderElection{}
302+
fakeHost := &k8sleaderelectortest.FakeHost{
303+
FakeLeaderElection: fakeLeaderElection,
304+
}
305+
leaderElectorID := component.MustNewID("k8s_leader_elector")
306+
307+
mockClient := newMockDynamicClient()
308+
rCfg := createDefaultConfig().(*Config)
309+
rCfg.makeDynamicClient = mockClient.getMockDynamicClient
310+
rCfg.makeDiscoveryClient = getMockDiscoveryClient
311+
rCfg.ErrorMode = PropagateError
312+
rCfg.Objects = []*K8sObjectsConfig{
313+
{
314+
Name: "pods",
315+
Mode: PullMode,
316+
},
317+
}
318+
rCfg.K8sLeaderElector = &leaderElectorID
319+
320+
r, err := newReceiver(
321+
receivertest.NewNopSettings(metadata.Type),
322+
rCfg,
323+
consumertest.NewNop(),
324+
)
325+
require.NoError(t, err)
326+
kr := r.(*k8sobjectsreceiver)
327+
sink := new(consumertest.LogsSink)
328+
kr.consumer = sink
329+
330+
// Setup k8s resources.
331+
mockClient.createPods(
332+
generatePod("pod1", "default", map[string]any{
333+
"environment": "production",
334+
}, "1"),
335+
)
336+
337+
err = kr.Start(context.Background(), fakeHost)
338+
require.NoError(t, err)
339+
340+
// elected leader
341+
fakeLeaderElection.InvokeOnLeading()
342+
343+
require.Eventually(t, func() bool {
344+
// expect get 2 log records
345+
return sink.LogRecordCount() == 1
346+
}, 20*time.Second, 100*time.Millisecond,
347+
"logs not collected")
348+
349+
// lost election
350+
fakeLeaderElection.InvokeOnStopping()
351+
352+
// mock create pod again which not collected
353+
mockClient.createPods(
354+
generatePod("pod1", "default", map[string]any{
355+
"environment": "production",
356+
}, "1"),
357+
)
358+
359+
// get back election
360+
fakeLeaderElection.InvokeOnLeading()
361+
362+
// mock create pod finally
363+
mockClient.createPods(
364+
generatePod("pod1", "default", map[string]any{
365+
"environment": "production",
366+
}, "1"),
367+
)
368+
369+
require.Eventually(t, func() bool {
370+
// expect get 4 log records
371+
return sink.LogRecordCount() == 2
372+
}, 20*time.Second, 100*time.Millisecond,
373+
"logs not collected")
374+
}

0 commit comments

Comments
 (0)