Skip to content

Commit 061eb3e

Browse files
author
Mikołaj Świątek
authored
[processor/k8sattributes] Only store necessary Pod data (open-telemetry#23272)
Only store Pod data we actually use for attributes in the informer store. By default, informers store all data about K8s objects, as they're primarily intended to act as a local cache for the API Server. For our use case, most of that data is unnecessary, and it eats a significant amount of memory in larger clusters. This PR uses a transform function to remove the unnecessary data from the informer store. I've measured the gains synthetically, and we use nearly 70% less memory per stored Pod. I haven't included the benchmark function in this PR, as it's a bit complicated and hacky, and I'm not convinced there's value in adding it to the codebase permanently.
1 parent ab42d69 commit 061eb3e

File tree

4 files changed

+163
-3
lines changed

4 files changed

+163
-3
lines changed
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# Use this changelog template to create an entry for release notes.
2+
# If your change doesn't affect end users, such as a test fix or a tooling change,
3+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
4+
5+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
6+
change_type: enhancement
7+
8+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
9+
component: k8sattributesprocessor
10+
11+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
12+
note: Store only necessary Pod data
13+
14+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
15+
issues: [23226]
16+
17+
# (Optional) One or more lines of additional information to render under the primary note.
18+
# These lines will be padded with 2 spaces and then inserted directly into the document.
19+
# Use pipe (|) for multiline entries.
20+
subtext:

processor/k8sattributesprocessor/internal/kube/client.go

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"go.uber.org/zap"
1515
apps_v1 "k8s.io/api/apps/v1"
1616
api_v1 "k8s.io/api/core/v1"
17+
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1718
"k8s.io/apimachinery/pkg/fields"
1819
"k8s.io/apimachinery/pkg/labels"
1920
"k8s.io/apimachinery/pkg/selection"
@@ -108,6 +109,17 @@ func New(logger *zap.Logger, apiCfg k8sconfig.APIConfig, rules ExtractionRules,
108109
}
109110

110111
c.informer = newInformer(c.kc, c.Filters.Namespace, labelSelector, fieldSelector)
112+
err = c.informer.SetTransform(
113+
func(object interface{}) (interface{}, error) {
114+
originalPod, success := object.(*api_v1.Pod)
115+
if !success { // means this is a cache.DeletedFinalStateUnknown, in which case we do nothing
116+
return object, nil
117+
}
118+
119+
return removeUnnecessaryPodData(originalPod, c.Rules), nil
120+
},
121+
)
122+
111123
if c.extractNamespaceLabelsAnnotations() {
112124
c.namespaceInformer = newNamespaceInformer(c.kc)
113125
} else {
@@ -398,6 +410,100 @@ func (c *WatchClient) extractPodAttributes(pod *api_v1.Pod) map[string]string {
398410
return tags
399411
}
400412

413+
// This function removes all data from the Pod except what is required by extraction rules and pod association
414+
func removeUnnecessaryPodData(pod *api_v1.Pod, rules ExtractionRules) *api_v1.Pod {
415+
416+
// name, namespace, uid, start time and ip are needed for identifying Pods
417+
// there's room to optimize this further, it's kept this way for simplicity
418+
transformedPod := api_v1.Pod{
419+
ObjectMeta: meta_v1.ObjectMeta{
420+
Name: pod.GetName(),
421+
Namespace: pod.GetNamespace(),
422+
UID: pod.GetUID(),
423+
},
424+
Status: api_v1.PodStatus{
425+
PodIP: pod.Status.PodIP,
426+
StartTime: pod.Status.StartTime,
427+
},
428+
Spec: api_v1.PodSpec{
429+
HostNetwork: pod.Spec.HostNetwork,
430+
},
431+
}
432+
433+
if rules.StartTime {
434+
transformedPod.SetCreationTimestamp(pod.GetCreationTimestamp())
435+
}
436+
437+
if rules.PodUID {
438+
transformedPod.SetUID(pod.GetUID())
439+
}
440+
441+
if rules.Node {
442+
transformedPod.Spec.NodeName = pod.Spec.NodeName
443+
}
444+
445+
if rules.PodHostName {
446+
transformedPod.Spec.Hostname = pod.Spec.Hostname
447+
}
448+
449+
if needContainerAttributes(rules) {
450+
for _, containerStatus := range pod.Status.ContainerStatuses {
451+
transformedPod.Status.ContainerStatuses = append(
452+
transformedPod.Status.ContainerStatuses,
453+
api_v1.ContainerStatus{
454+
Name: containerStatus.Name,
455+
ContainerID: containerStatus.ContainerID,
456+
RestartCount: containerStatus.RestartCount,
457+
},
458+
)
459+
}
460+
for _, containerStatus := range pod.Status.InitContainerStatuses {
461+
transformedPod.Status.InitContainerStatuses = append(
462+
transformedPod.Status.InitContainerStatuses,
463+
api_v1.ContainerStatus{
464+
Name: containerStatus.Name,
465+
ContainerID: containerStatus.ContainerID,
466+
RestartCount: containerStatus.RestartCount,
467+
},
468+
)
469+
}
470+
471+
removeUnnecessaryContainerData := func(c api_v1.Container) api_v1.Container {
472+
transformedContainer := api_v1.Container{}
473+
transformedContainer.Name = c.Name // we always need the name, it's used for identification
474+
if rules.ContainerImageName || rules.ContainerImageTag {
475+
transformedContainer.Image = c.Image
476+
}
477+
return transformedContainer
478+
}
479+
480+
for _, container := range pod.Spec.Containers {
481+
transformedPod.Spec.Containers = append(
482+
transformedPod.Spec.Containers, removeUnnecessaryContainerData(container),
483+
)
484+
}
485+
for _, container := range pod.Spec.InitContainers {
486+
transformedPod.Spec.InitContainers = append(
487+
transformedPod.Spec.InitContainers, removeUnnecessaryContainerData(container),
488+
)
489+
}
490+
}
491+
492+
if len(rules.Labels) > 0 {
493+
transformedPod.Labels = pod.Labels
494+
}
495+
496+
if len(rules.Annotations) > 0 {
497+
transformedPod.Annotations = pod.Annotations
498+
}
499+
500+
if rules.IncludesOwnerMetadata() {
501+
transformedPod.SetOwnerReferences(pod.GetOwnerReferences())
502+
}
503+
504+
return &transformedPod
505+
}
506+
401507
func (c *WatchClient) extractPodContainersAttributes(pod *api_v1.Pod) PodContainers {
402508
containers := PodContainers{
403509
ByID: map[string]*Container{},

processor/k8sattributesprocessor/internal/kube/client_test.go

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -846,8 +846,12 @@ func TestExtractionRules(t *testing.T) {
846846
for _, tc := range testCases {
847847
t.Run(tc.name, func(t *testing.T) {
848848
c.Rules = tc.rules
849+
850+
// manually call the data removal function here
851+
// normally the informer does this, but fully emulating the informer in this test is annoying
852+
transformedPod := removeUnnecessaryPodData(pod, c.Rules)
849853
c.handleReplicaSetAdd(replicaset)
850-
c.handlePodAdd(pod)
854+
c.handlePodAdd(transformedPod)
851855
p, ok := c.GetPod(newPodIdentifier("connection", "", pod.Status.PodIP))
852856
require.True(t, ok)
853857

@@ -996,8 +1000,12 @@ func TestReplicaSetExtractionRules(t *testing.T) {
9961000
t.Run(tc.name, func(t *testing.T) {
9971001
c.Rules = tc.rules
9981002
replicaset.OwnerReferences = tc.ownerReferences
1003+
1004+
// manually call the data removal function here
1005+
// normally the informer does this, but fully emulating the informer in this test is annoying
1006+
transformedPod := removeUnnecessaryPodData(pod, c.Rules)
9991007
c.handleReplicaSetAdd(replicaset)
1000-
c.handlePodAdd(pod)
1008+
c.handlePodAdd(transformedPod)
10011009
p, ok := c.GetPod(newPodIdentifier("connection", "", pod.Status.PodIP))
10021010
require.True(t, ok)
10031011

@@ -1462,7 +1470,10 @@ func Test_extractPodContainersAttributes(t *testing.T) {
14621470
for _, tt := range tests {
14631471
t.Run(tt.name, func(t *testing.T) {
14641472
c := WatchClient{Rules: tt.rules}
1465-
assert.Equal(t, tt.want, c.extractPodContainersAttributes(&tt.pod))
1473+
// manually call the data removal function here
1474+
// normally the informer does this, but fully emulating the informer in this test is annoying
1475+
transformedPod := removeUnnecessaryPodData(&tt.pod, c.Rules)
1476+
assert.Equal(t, tt.want, c.extractPodContainersAttributes(transformedPod))
14661477
})
14671478
}
14681479
}

processor/k8sattributesprocessor/internal/kube/kube.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,29 @@ type ExtractionRules struct {
208208
Labels []FieldExtractionRule
209209
}
210210

211+
// IncludesOwnerMetadata determines whether the ExtractionRules include metadata about Pod Owners
212+
func (rules *ExtractionRules) IncludesOwnerMetadata() bool {
213+
rulesNeedingOwnerMetadata := []bool{
214+
rules.CronJobName,
215+
rules.DeploymentName,
216+
rules.DeploymentUID,
217+
rules.DaemonSetUID,
218+
rules.DaemonSetName,
219+
rules.JobName,
220+
rules.JobUID,
221+
rules.ReplicaSetID,
222+
rules.ReplicaSetName,
223+
rules.StatefulSetUID,
224+
rules.StatefulSetName,
225+
}
226+
for _, ruleEnabled := range rulesNeedingOwnerMetadata {
227+
if ruleEnabled {
228+
return true
229+
}
230+
}
231+
return false
232+
}
233+
211234
// FieldExtractionRule is used to specify which fields to extract from pod fields
212235
// and inject into spans as attributes.
213236
type FieldExtractionRule struct {

0 commit comments

Comments
 (0)