Skip to content

Commit 04e745e

Browse files
authored
[processor/k8sattributes] Wait for ReplicaSet informer before starting pod informer (#37138)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description This PR is an alternative approach to the previous fix made for #37056, which did not fully solve the issue of the deployment name not being added to a pod after the initial informer sync <!-- Issue number (e.g. #1234) or full URL to issue, if applicable. --> #### Link to tracking issue Fixes #37056 <!--Describe what testing was performed and which tests were added.--> #### Testing Re-enabled the flaky E2E test --------- Signed-off-by: Florian Bacher <[email protected]>
1 parent afbb05f commit 04e745e

File tree

3 files changed

+60
-16
lines changed

3 files changed

+60
-16
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: bug_fix
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: k8sattributesprocessor
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Wait for the other informers to complete their initial sync before starting the pod informer
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [37056]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: []

processor/k8sattributesprocessor/e2e_test.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1099,8 +1099,6 @@ func TestE2E_NamespacedRBACNoPodIP(t *testing.T) {
10991099
// make docker-otelcontribcol
11001100
// KUBECONFIG=/tmp/kube-config-otelcol-e2e-testing kind load docker-image otelcontribcol:latest
11011101
func TestE2E_ClusterRBACCollectorStartAfterTelemetryGen(t *testing.T) {
1102-
// TODO: Re-enable this test when the issue being tested here is fully solved: https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/37056
1103-
t.Skip("Skipping test as https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/37056 is not fully solved yet")
11041102
testDir := filepath.Join("testdata", "e2e", "clusterrbac")
11051103

11061104
k8sClient, err := k8stest.NewK8sClient(testKubeConfig)

processor/k8sattributesprocessor/internal/kube/client.go

Lines changed: 33 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,6 @@ func New(
209209
// Start registers pod event handlers and starts watching the kubernetes cluster for pod changes.
210210
func (c *WatchClient) Start() error {
211211
synced := make([]cache.InformerSynced, 0)
212-
213212
// start the replicaSet informer first, as the replica sets need to be
214213
// present at the time the pods are handled, to correctly establish the connection between pods and deployments
215214
if c.Rules.DeploymentName || c.Rules.DeploymentUID {
@@ -225,18 +224,7 @@ func (c *WatchClient) Start() error {
225224
go c.replicasetInformer.Run(c.stopCh)
226225
}
227226

228-
reg, err := c.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
229-
AddFunc: c.handlePodAdd,
230-
UpdateFunc: c.handlePodUpdate,
231-
DeleteFunc: c.handlePodDelete,
232-
})
233-
if err != nil {
234-
return err
235-
}
236-
synced = append(synced, reg.HasSynced)
237-
go c.informer.Run(c.stopCh)
238-
239-
reg, err = c.namespaceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
227+
reg, err := c.namespaceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
240228
AddFunc: c.handleNamespaceAdd,
241229
UpdateFunc: c.handleNamespaceUpdate,
242230
DeleteFunc: c.handleNamespaceDelete,
@@ -260,13 +248,28 @@ func (c *WatchClient) Start() error {
260248
go c.nodeInformer.Run(c.stopCh)
261249
}
262250

251+
reg, err = c.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
252+
AddFunc: c.handlePodAdd,
253+
UpdateFunc: c.handlePodUpdate,
254+
DeleteFunc: c.handlePodDelete,
255+
})
256+
if err != nil {
257+
return err
258+
}
259+
260+
// start the pod informer only after the other informers have completed their initial sync
261+
go c.runInformerWithDependencies(c.informer, synced)
262+
263263
if c.waitForMetadata {
264264
timeoutCh := make(chan struct{})
265265
t := time.AfterFunc(c.waitForMetadataTimeout, func() {
266266
close(timeoutCh)
267267
})
268268
defer t.Stop()
269-
if !cache.WaitForCacheSync(timeoutCh, synced...) {
269+
// Wait for the pod informer to complete its initial sync.
270+
// The other informers will normally have synced by this point, as the pod informer
271+
// waits for them to finish (up to a fixed timeout) before it starts running
272+
if !cache.WaitForCacheSync(timeoutCh, reg.HasSynced) {
270273
return errors.New("failed to wait for caches to sync")
271274
}
272275
}
@@ -1123,6 +1126,22 @@ func (c *WatchClient) getReplicaSet(uid string) (*ReplicaSet, bool) {
11231126
return nil, false
11241127
}
11251128

1129+
// runInformerWithDependencies runs the given informer with c.stopCh once its dependencies have synced. The second argument holds
1130+
// the InformerSynced functions of the other informers that should complete first. This is necessary e.g. for the pod informer,
1131+
// which needs the replica set informer to have synced to link a pod to its replicaset/deployment. The wait is capped at 5 seconds.
1132+
func (c *WatchClient) runInformerWithDependencies(informer cache.SharedInformer, dependencies []cache.InformerSynced) {
1133+
if len(dependencies) > 0 {
1134+
timeoutCh := make(chan struct{})
1135+
// TODO: the timeout is hard-coded for now; check whether it should be configurable. The WaitForCacheSync result below is not checked, so the informer is started even if the dependencies have not synced in time.
1136+
t := time.AfterFunc(5*time.Second, func() {
1137+
close(timeoutCh)
1138+
})
1139+
defer t.Stop()
1140+
cache.WaitForCacheSync(timeoutCh, dependencies...)
1141+
}
1142+
informer.Run(c.stopCh)
1143+
}
1144+
11261145
// ignoreDeletedFinalStateUnknown returns the object wrapped in
11271146
// DeletedFinalStateUnknown. Useful in OnDelete resource event handlers that do
11281147
// not need the additional context.

0 commit comments

Comments
 (0)