Skip to content

Commit 81d5190

Browse files
committed
[receiver/k8sobjectsreceiver] k8sobject receiver support leader election
Signed-off-by: Murphy Chen <[email protected]>
1 parent 4930224 commit 81d5190

File tree

6 files changed

+65
-6
lines changed

6 files changed

+65
-6
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: k8sobjectsreceiver
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: k8sobject receiver support leader election
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [39054]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

receiver/k8sobjectsreceiver/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ The following is example configuration
2525
```yaml
2626
k8sobjects:
2727
auth_type: serviceAccount
28+
k8s_leader_elector: k8s_leader_elector
2829
objects:
2930
- name: pods
3031
mode: pull
@@ -56,6 +57,7 @@ the K8s API server. This can be one of `none` (for no auth), `serviceAccount`
5657
use this config to specify the group to select. By default, it will select the first group.
5758
For example, `events` resource is available in both `v1` and `events.k8s.io/v1` APIGroup. In
5859
this case, it will select `v1` by default.
60+
- `k8s_leader_elector` (default: none): if specified, will enable Leader Extension by using `k8sleaderelector` extension
5961

6062

6163
The full list of settings exposed for this receiver are documented in [config.go](./config.go)

receiver/k8sobjectsreceiver/config.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"strings"
1010
"time"
1111

12+
"go.opentelemetry.io/collector/component"
1213
"k8s.io/apimachinery/pkg/runtime/schema"
1314
apiWatch "k8s.io/apimachinery/pkg/watch"
1415
"k8s.io/client-go/discovery"
@@ -52,6 +53,8 @@ type Config struct {
5253

5354
Objects []*K8sObjectsConfig `mapstructure:"objects"`
5455

56+
K8sLeaderElector *component.ID `mapstructure:"k8s_leader_elector"`
57+
5558
// For mocking purposes only.
5659
makeDiscoveryClient func() (discovery.ServerResourcesInterface, error)
5760
makeDynamicClient func() (dynamic.Interface, error)

receiver/k8sobjectsreceiver/go.mod

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ go 1.23.0
44

55
require (
66
github.com/google/uuid v1.6.0
7+
github.com/open-telemetry/opentelemetry-collector-contrib/extension/k8sleaderelector v0.122.0
78
github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig v0.122.0
89
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.122.0
910
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.122.0
@@ -130,7 +131,7 @@ require (
130131
k8s.io/kube-openapi v0.0.0-20241105132330-32ad38e42d3f // indirect
131132
k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738 // indirect
132133
sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3 // indirect
133-
sigs.k8s.io/structured-merge-diff/v4 v4.4.2 // indirect
134+
sigs.k8s.io/structured-merge-diff/v4 v4.4.3 // indirect
134135
sigs.k8s.io/yaml v1.4.0 // indirect
135136
)
136137

@@ -152,3 +153,5 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest
152153
replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../pkg/pdatautil
153154

154155
replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden
156+
157+
replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/k8sleaderelector => ../../extension/k8sleaderelector

receiver/k8sobjectsreceiver/go.sum

Lines changed: 4 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

receiver/k8sobjectsreceiver/receiver.go

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"k8s.io/client-go/tools/cache"
2525
"k8s.io/client-go/tools/watch"
2626

27+
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/k8sleaderelector"
2728
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sobjectsreceiver/internal/metadata"
2829
)
2930

@@ -66,17 +67,38 @@ func newReceiver(params receiver.Settings, config *Config, consumer consumer.Log
6667
}, nil
6768
}
6869

69-
func (kr *k8sobjectsreceiver) Start(ctx context.Context, _ component.Host) error {
70+
func (kr *k8sobjectsreceiver) Start(ctx context.Context, host component.Host) error {
7071
client, err := kr.config.getDynamicClient()
7172
if err != nil {
7273
return err
7374
}
7475
kr.client = client
75-
kr.setting.Logger.Info("Object Receiver started")
76-
7776
cctx, cancel := context.WithCancel(ctx)
7877
kr.cancel = cancel
7978

79+
if kr.config.K8sLeaderElector != nil {
80+
k8sLeaderElector := host.GetExtensions()[*kr.config.K8sLeaderElector]
81+
if k8sLeaderElector == nil {
82+
return fmt.Errorf("unknown k8s leader elector %q", kr.config.K8sLeaderElector)
83+
}
84+
85+
kr.setting.Logger.Info("Trying to become the leader")
86+
elector := k8sLeaderElector.(k8sleaderelector.LeaderElection)
87+
elector.SetCallBackFuncs(
88+
func(_ context.Context) {
89+
kr.setting.Logger.Info("Object Receiver started as leader")
90+
for _, object := range kr.config.Objects {
91+
kr.start(cctx, object)
92+
}
93+
},
94+
func() {
95+
kr.setting.Logger.Info("Object Receiver stopped as leader lose")
96+
_ = kr.Shutdown(context.Background())
97+
})
98+
return nil
99+
}
100+
101+
kr.setting.Logger.Info("Object Receiver started")
80102
for _, object := range kr.config.Objects {
81103
kr.start(cctx, object)
82104
}

0 commit comments

Comments
 (0)