diff --git a/.chloggen/k8s0bject-leader-election.yaml b/.chloggen/k8s0bject-leader-election.yaml new file mode 100644 index 0000000000000..0d717a0f0d7a6 --- /dev/null +++ b/.chloggen/k8s0bject-leader-election.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: k8sobjectsreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: k8sobject receiver support leader election + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [39054] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/receiver/k8sobjectsreceiver/README.md b/receiver/k8sobjectsreceiver/README.md index 9a4f962d29ee4..5797c6db35774 100644 --- a/receiver/k8sobjectsreceiver/README.md +++ b/receiver/k8sobjectsreceiver/README.md @@ -26,6 +26,7 @@ The following is example configuration ```yaml k8sobjects: auth_type: serviceAccount + k8s_leader_elector: k8s_leader_elector objects: - name: pods mode: pull @@ -61,6 +62,7 @@ the K8s API server. This can be one of `none` (for no auth), `serviceAccount` use this config to specify the group to select. By default, it will select the first group. For example, `events` resource is available in both `v1` and `events.k8s.io/v1` APIGroup. In this case, it will select `v1` by default. +- `k8s_leader_elector` (default: none): if specified, will enable Leader Election by using `k8sleaderelector` extension The full list of settings exposed for this receiver are documented in [config.go](./config.go) diff --git a/receiver/k8sobjectsreceiver/config.go b/receiver/k8sobjectsreceiver/config.go index 005a7ee6060dd..f062a19064b6b 100644 --- a/receiver/k8sobjectsreceiver/config.go +++ b/receiver/k8sobjectsreceiver/config.go @@ -9,6 +9,7 @@ import ( "strings" "time" + "go.opentelemetry.io/collector/component" "k8s.io/apimachinery/pkg/runtime/schema" apiWatch "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/discovery" @@ -61,6 +62,8 @@ type Config struct { Objects []*K8sObjectsConfig `mapstructure:"objects"` ErrorMode ErrorMode `mapstructure:"error_mode"` + K8sLeaderElector *component.ID `mapstructure:"k8s_leader_elector"` + // For mocking purposes only. makeDiscoveryClient func() (discovery.ServerResourcesInterface, error) makeDynamicClient func() (dynamic.Interface, error) diff --git a/receiver/k8sobjectsreceiver/go.mod b/receiver/k8sobjectsreceiver/go.mod index 603d0fc2e589e..2fe713026d852 100644 --- a/receiver/k8sobjectsreceiver/go.mod +++ b/receiver/k8sobjectsreceiver/go.mod @@ -4,6 +4,7 @@ go 1.23.0 require ( github.com/google/uuid v1.6.0 + github.com/open-telemetry/opentelemetry-collector-contrib/extension/k8sleaderelector v0.125.0 github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig v0.125.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.125.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.125.0 @@ -96,6 +97,7 @@ require ( go.opentelemetry.io/collector/config/configtls v1.31.1-0.20250509190408-4ca0f1829e0a // indirect go.opentelemetry.io/collector/consumer/consumererror v0.125.1-0.20250509190408-4ca0f1829e0a // indirect go.opentelemetry.io/collector/consumer/xconsumer v0.125.1-0.20250509190408-4ca0f1829e0a // indirect + go.opentelemetry.io/collector/extension v1.31.1-0.20250509190408-4ca0f1829e0a // indirect go.opentelemetry.io/collector/extension/extensionauth v1.31.1-0.20250509190408-4ca0f1829e0a // indirect go.opentelemetry.io/collector/extension/extensionmiddleware v0.125.1-0.20250509190408-4ca0f1829e0a // indirect go.opentelemetry.io/collector/featuregate v1.31.1-0.20250509190408-4ca0f1829e0a // indirect @@ -137,7 +139,7 @@ require ( k8s.io/kube-openapi v0.0.0-20241105132330-32ad38e42d3f // indirect k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738 // indirect sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3 // indirect - sigs.k8s.io/structured-merge-diff/v4 v4.4.2 // indirect + sigs.k8s.io/structured-merge-diff/v4 v4.4.3 // indirect sigs.k8s.io/yaml v1.4.0 // indirect ) @@ -159,3 +161,5 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../pkg/pdatautil replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden + +replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/k8sleaderelector => ../../extension/k8sleaderelector diff --git a/receiver/k8sobjectsreceiver/go.sum b/receiver/k8sobjectsreceiver/go.sum index ed49a844db0db..d4099742e8712 100644 --- a/receiver/k8sobjectsreceiver/go.sum +++ b/receiver/k8sobjectsreceiver/go.sum @@ -354,8 +354,8 @@ go.opentelemetry.io/collector/consumer/consumertest v0.125.1-0.20250509190408-4c go.opentelemetry.io/collector/consumer/consumertest v0.125.1-0.20250509190408-4ca0f1829e0a/go.mod h1:vkHf3y85cFLDHARO/cTREVjLjOPAV+cQg7lkC44DWOY= go.opentelemetry.io/collector/consumer/xconsumer v0.125.1-0.20250509190408-4ca0f1829e0a h1:WtOwFkF+GCZo1Sg2le2U7v+cLS3U+kqq2KvzlA8PaAA= go.opentelemetry.io/collector/consumer/xconsumer v0.125.1-0.20250509190408-4ca0f1829e0a/go.mod h1:FX0G37r0W+wXRgxxFtwEJ4rlsCB+p0cIaxtU3C4hskw= -go.opentelemetry.io/collector/extension v1.31.0 h1:DaqSl50jOA3BGtqPfPtSGJy4XwyXtQwvemVl/L9fDb4= -go.opentelemetry.io/collector/extension v1.31.0/go.mod h1:SiRNOZIJ6R0JbHEvs3g84hPEmiys5CZyIlMOE1RQ85s= +go.opentelemetry.io/collector/extension v1.31.1-0.20250509190408-4ca0f1829e0a h1:/ISd2YKoGKQ1VxZ5M/pNhimdL4JjHPqn7+MePXhtlZM= +go.opentelemetry.io/collector/extension v1.31.1-0.20250509190408-4ca0f1829e0a/go.mod h1:SiRNOZIJ6R0JbHEvs3g84hPEmiys5CZyIlMOE1RQ85s= go.opentelemetry.io/collector/extension/extensionauth v1.31.1-0.20250509190408-4ca0f1829e0a h1:zeJXGiopic2u7T+EDXKKv4GULjX0iI8iemz7/kBYRJc= go.opentelemetry.io/collector/extension/extensionauth v1.31.1-0.20250509190408-4ca0f1829e0a/go.mod h1:qaGbjJ+33Xv8sx4cPv/OXmc/LcQORSVbzcAE6O1n31o= go.opentelemetry.io/collector/extension/extensionauth/extensionauthtest v0.125.0 h1:nbVD4x955iJOseijofn/FwdVcYEQZ08IM505Y/deJCs= @@ -364,6 +364,8 @@ go.opentelemetry.io/collector/extension/extensionmiddleware v0.125.1-0.202505091 go.opentelemetry.io/collector/extension/extensionmiddleware v0.125.1-0.20250509190408-4ca0f1829e0a/go.mod h1:yZYfdaxnDOCNWruM0GrF5lBBmFoBorAXqXtCeLrcllU= go.opentelemetry.io/collector/extension/extensionmiddleware/extensionmiddlewaretest v0.125.0 h1:ht7rm4y4LM3tM1spkvahv+HlEylD5zg2+IfCirPD7A4= go.opentelemetry.io/collector/extension/extensionmiddleware/extensionmiddlewaretest v0.125.0/go.mod h1:D68nA5XzO/OPeXrxzlLSx2z+tT8e6YyFxdfPQbMXGms= +go.opentelemetry.io/collector/extension/extensiontest v0.125.1-0.20250509190408-4ca0f1829e0a h1:ug9oYcvRJb7+8qT71iLOQ6DLipo8NomTao/vFUsB+Qk= +go.opentelemetry.io/collector/extension/extensiontest v0.125.1-0.20250509190408-4ca0f1829e0a/go.mod h1:HABANc94xQmUtOSZokG5E6Z02GrHfKYSkQqOz+oCpPQ= go.opentelemetry.io/collector/featuregate v1.31.1-0.20250509190408-4ca0f1829e0a h1:zVm4S4J6FpHBqKCCS3+82WjhQVjMCVezphaVMapz7Co= go.opentelemetry.io/collector/featuregate v1.31.1-0.20250509190408-4ca0f1829e0a/go.mod h1:Y/KsHbvREENKvvN9RlpiWk/IGBK+CATBYzIIpU7nccc= go.opentelemetry.io/collector/internal/sharedcomponent v0.125.1-0.20250509190408-4ca0f1829e0a h1:J993IX3r0Vv5wuZDdvDDA+gDnV2I/G5O7OyCejO4rOk= @@ -710,8 +712,8 @@ sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3 h1:/Rv+M11QRah1itp8VhT6HoVx1 sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3/go.mod h1:18nIHnGi6636UCz6m8i4DhaJ65T6EruyzmoQqI2BVDo= sigs.k8s.io/structured-merge-diff/v4 v4.0.2/go.mod h1:bJZC9H9iH24zzfZ/41RGcq60oK1F7G282QMXDPYydCw= sigs.k8s.io/structured-merge-diff/v4 v4.1.0/go.mod h1:bJZC9H9iH24zzfZ/41RGcq60oK1F7G282QMXDPYydCw= -sigs.k8s.io/structured-merge-diff/v4 v4.4.2 h1:MdmvkGuXi/8io6ixD5wud3vOLwc1rj0aNqRlpuvjmwA= -sigs.k8s.io/structured-merge-diff/v4 v4.4.2/go.mod h1:N8f93tFZh9U6vpxwRArLiikrE5/2tiu1w1AGfACIGE4= +sigs.k8s.io/structured-merge-diff/v4 v4.4.3 h1:sCP7Vv3xx/CWIuTPVN38lUPx0uw0lcLfzaiDa8Ja01A= +sigs.k8s.io/structured-merge-diff/v4 v4.4.3/go.mod h1:N8f93tFZh9U6vpxwRArLiikrE5/2tiu1w1AGfACIGE4= sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc= sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E= sigs.k8s.io/yaml v1.4.0/go.mod h1:Ejl7/uTz7PSA4eKMyQCUTnhZYNmLIl+5c2lQPGR2BPY= diff --git a/receiver/k8sobjectsreceiver/receiver.go b/receiver/k8sobjectsreceiver/receiver.go index 27381657f8d73..d9c3b7f66ca9a 100644 --- a/receiver/k8sobjectsreceiver/receiver.go +++ b/receiver/k8sobjectsreceiver/receiver.go @@ -24,6 +24,7 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/watch" + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/k8sleaderelector" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sobjectsreceiver/internal/metadata" ) @@ -75,7 +76,7 @@ func newReceiver(params receiver.Settings, config *Config, consumer consumer.Log }, nil } -func (kr *k8sobjectsreceiver) Start(ctx context.Context, _ component.Host) error { +func (kr *k8sobjectsreceiver) Start(ctx context.Context, host component.Host) error { client, err := kr.config.getDynamicClient() if err != nil { return err @@ -96,7 +97,7 @@ func (kr *k8sobjectsreceiver) Start(ctx context.Context, _ component.Host) error for k := range validObjects { availableResource = append(availableResource, k) } - err := fmt.Errorf("resource not found: %s. Available resources in cluster: %v", object.Name, availableResource) + err = fmt.Errorf("resource not found: %s. Available resources in cluster: %v", object.Name, availableResource) if handlerErr := kr.handleError(err, ""); handlerErr != nil { return handlerErr } @@ -116,17 +117,46 @@ func (kr *k8sobjectsreceiver) Start(ctx context.Context, _ component.Host) error } if len(validConfigs) == 0 { - err := errors.New("no valid Kubernetes objects found to watch") + err = errors.New("no valid Kubernetes objects found to watch") return err } - kr.setting.Logger.Info("Object Receiver started") - cctx, cancel := context.WithCancel(ctx) - kr.cancel = cancel + if kr.config.K8sLeaderElector != nil { + k8sLeaderElector := host.GetExtensions()[*kr.config.K8sLeaderElector] + if k8sLeaderElector == nil { + return fmt.Errorf("unknown k8s leader elector %q", kr.config.K8sLeaderElector) + } + + kr.setting.Logger.Info("registering the receiver in leader election") + elector, ok := k8sLeaderElector.(k8sleaderelector.LeaderElection) + if !ok { + return fmt.Errorf("the extension %T is not implement k8sleaderelector.LeaderElection", k8sLeaderElector) + } - for _, object := range validConfigs { - kr.start(cctx, object) + elector.SetCallBackFuncs( + func(ctx context.Context) { + cctx, cancel := context.WithCancel(ctx) + kr.cancel = cancel + for _, object := range validConfigs { + kr.start(cctx, object) + } + kr.setting.Logger.Info("Object Receiver started as leader") + }, + func() { + kr.setting.Logger.Info("no longer leader, stopping") + err = kr.Shutdown(context.Background()) + if err != nil { + kr.setting.Logger.Error("shutdown receiver error:", zap.Error(err)) + } + }) + } else { + cctx, cancel := context.WithCancel(ctx) + kr.cancel = cancel + for _, object := range validConfigs { + kr.start(cctx, object) + } } + return nil } diff --git a/receiver/k8sobjectsreceiver/receiver_test.go b/receiver/k8sobjectsreceiver/receiver_test.go index adc2833921d56..04c79df88d81c 100644 --- a/receiver/k8sobjectsreceiver/receiver_test.go +++ b/receiver/k8sobjectsreceiver/receiver_test.go @@ -10,11 +10,13 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/receiver/receivertest" apiWatch "k8s.io/apimachinery/pkg/watch" + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/k8sleaderelector/k8sleaderelectortest" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sobjectsreceiver/internal/metadata" ) @@ -294,3 +296,79 @@ func TestExcludeDeletedTrue(t *testing.T) { assert.NoError(t, r.Shutdown(ctx)) } + +func TestReceiverWithLeaderElection(t *testing.T) { + fakeLeaderElection := &k8sleaderelectortest.FakeLeaderElection{} + fakeHost := &k8sleaderelectortest.FakeHost{ + FakeLeaderElection: fakeLeaderElection, + } + leaderElectorID := component.MustNewID("k8s_leader_elector") + + mockClient := newMockDynamicClient() + rCfg := createDefaultConfig().(*Config) + rCfg.makeDynamicClient = mockClient.getMockDynamicClient + rCfg.makeDiscoveryClient = getMockDiscoveryClient + rCfg.ErrorMode = PropagateError + rCfg.Objects = []*K8sObjectsConfig{ + { + Name: "pods", + Mode: PullMode, + }, + } + rCfg.K8sLeaderElector = &leaderElectorID + + r, err := newReceiver( + receivertest.NewNopSettings(metadata.Type), + rCfg, + consumertest.NewNop(), + ) + require.NoError(t, err) + kr := r.(*k8sobjectsreceiver) + sink := new(consumertest.LogsSink) + kr.consumer = sink + + // Setup k8s resources. + mockClient.createPods( + generatePod("pod1", "default", map[string]any{ + "environment": "production", + }, "1"), + ) + + err = kr.Start(context.Background(), fakeHost) + require.NoError(t, err) + + // elected leader + fakeLeaderElection.InvokeOnLeading() + + require.Eventually(t, func() bool { + // expect get 2 log records + return sink.LogRecordCount() == 1 + }, 20*time.Second, 100*time.Millisecond, + "logs not collected") + + // lost election + fakeLeaderElection.InvokeOnStopping() + + // mock create pod again which not collected + mockClient.createPods( + generatePod("pod1", "default", map[string]any{ + "environment": "production", + }, "1"), + ) + + // get back election + fakeLeaderElection.InvokeOnLeading() + + // mock create pod finally + mockClient.createPods( + generatePod("pod1", "default", map[string]any{ + "environment": "production", + }, "1"), + ) + + require.Eventually(t, func() bool { + // expect get 4 log records + return sink.LogRecordCount() == 2 + }, 20*time.Second, 100*time.Millisecond, + "logs not collected") +}