Skip to content

Commit 23b0a2d

Browse files
haoqixuTylerHelmuth
authored andcommitted
[processor/k8sattribute] Support adding labels and annotations from node (open-telemetry#28570)
**Description:** support adding labels and annotations from the node as additional resource attributes on telemetry processed through the `k8sattributes` processor. **Link to tracking Issue:** Resolve open-telemetry#22620 --------- Co-authored-by: Tyler Helmuth <[email protected]>
1 parent 8598d73 commit 23b0a2d

File tree

17 files changed

+536
-18
lines changed

17 files changed

+536
-18
lines changed

.chloggen/feat-22620.yaml

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: processor/k8sattribute
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: support adding labels and annotations from node
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [22620]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

.github/workflows/configs/e2e-kind-config.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,8 @@ kubeadmConfigPatches:
44
- |
55
kind: KubeletConfiguration
66
serverTLSBootstrap: true
7+
nodes:
8+
- role: control-plane
9+
labels:
10+
# used in k8sattributesprocessor e2e test
11+
foo: too

processor/k8sattributesprocessor/README.md

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -88,12 +88,12 @@ Additional container level attributes can be extracted provided that certain res
8888
instance. If it's not set, the latest container instance will be used:
8989
- container.id (not added by default, has to be specified in `metadata`)
9090

91-
The k8sattributesprocessor can also set resource attributes from k8s labels and annotations of pods and namespaces.
92-
The config for associating the data passing through the processor (spans, metrics and logs) with specific Pod/Namespace annotations/labels is configured via "annotations" and "labels" keys.
93-
This config represents a list of annotations/labels that are extracted from pods/namespaces and added to spans, metrics and logs.
91+
The k8sattributesprocessor can also set resource attributes from k8s labels and annotations of pods, namespaces and nodes.
92+
The config for associating the data passing through the processor (spans, metrics and logs) with specific Pod/Namespace/Node annotations/labels is configured via "annotations" and "labels" keys.
93+
This config represents a list of annotations/labels that are extracted from pods/namespaces/nodes and added to spans, metrics and logs.
9494
Each item is specified as a config of tag_name (representing the tag name to tag the spans with),
9595
key (representing the key used to extract value) and from (representing the kubernetes object used to extract the value).
96-
The "from" field has only two possible values "pod" and "namespace" and defaults to "pod" if none is specified.
96+
The "from" field has only three possible values "pod", "namespace" and "node" and defaults to "pod" if none is specified.
9797

9898
A few examples to use this config are as follows:
9999

@@ -106,6 +106,10 @@ annotations:
106106
key: annotation-two
107107
regex: field=(?P<value>.+)
108108
from: namespace
109+
- tag_name: a3 # extracts value of annotation from nodes with key `annotation-three` with regexp and inserts it as a tag with key `a3`
110+
key: annotation-three
111+
regex: field=(?P<value>.+)
112+
from: node
109113

110114
labels:
111115
- tag_name: l1 # extracts value of label from namespaces with key `label1` and inserts it as a tag with key `l1`
@@ -115,6 +119,9 @@ labels:
115119
key: label2
116120
regex: field=(?P<value>.+)
117121
from: pod
122+
- tag_name: l3 # extracts value of label from nodes with key `label3` and inserts it as a tag with key `l3`
123+
key: label3
124+
from: node
118125
```
119126
120127
### Config example
@@ -147,7 +154,7 @@ k8sattributes/2:
147154
148155
## Role-based access control
149156
150-
The k8sattributesprocessor needs `get`, `watch` and `list` permissions on both `pods` and `namespaces` resources, for all namespaces and pods included in the configured filters. Additionally, when using `k8s.deployment.uid` or `k8s.deployment.name` the processor also needs `get`, `watch` and `list` permissions for `replicaset` resources.
157+
The k8sattributesprocessor needs `get`, `watch` and `list` permissions on both `pods` and `namespaces` resources, for all namespaces and pods included in the configured filters. Additionally, when using `k8s.deployment.uid` or `k8s.deployment.name` the processor also needs `get`, `watch` and `list` permissions for `replicaset` resources. When extracting metadatas from `node`, the processor needs `get`, `watch` and `list` permissions for `node` resources.
151158

152159
Here is an example of a `ClusterRole` to give a `ServiceAccount` the necessary permissions for all pods and namespaces in the cluster (replace `<OTEL_COL_NAMESPACE>` with a namespace where collector is deployed):
153160

processor/k8sattributesprocessor/client_test.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@ type fakeClient struct {
2323
Informer cache.SharedInformer
2424
NamespaceInformer cache.SharedInformer
2525
ReplicaSetInformer cache.SharedInformer
26+
NodeInformer cache.SharedInformer
2627
Namespaces map[string]*kube.Namespace
28+
Nodes map[string]*kube.Node
2729
StopCh chan struct{}
2830
}
2931

@@ -44,6 +46,7 @@ func newFakeClient(_ *zap.Logger, _ k8sconfig.APIConfig, rules kube.ExtractionRu
4446
Associations: associations,
4547
Informer: kube.NewFakeInformer(cs, "", ls, fs),
4648
NamespaceInformer: kube.NewFakeInformer(cs, "", ls, fs),
49+
NodeInformer: kube.NewFakeInformer(cs, "", ls, fs),
4750
ReplicaSetInformer: kube.NewFakeInformer(cs, "", ls, fs),
4851
StopCh: make(chan struct{}),
4952
}, nil
@@ -61,6 +64,11 @@ func (f *fakeClient) GetNamespace(namespace string) (*kube.Namespace, bool) {
6164
return ns, ok
6265
}
6366

67+
func (f *fakeClient) GetNode(nodeName string) (*kube.Node, bool) {
68+
node, ok := f.Nodes[nodeName]
69+
return node, ok
70+
}
71+
6472
// Start is a noop for FakeClient.
6573
func (f *fakeClient) Start() {
6674
if f.Informer != nil {

processor/k8sattributesprocessor/config.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,9 @@ func (cfg *Config) Validate() error {
5757
}
5858

5959
switch f.From {
60-
case "", kube.MetadataFromPod, kube.MetadataFromNamespace:
60+
case "", kube.MetadataFromPod, kube.MetadataFromNamespace, kube.MetadataFromNode:
6161
default:
62-
return fmt.Errorf("%s is not a valid choice for From. Must be one of: pod, namespace", f.From)
62+
return fmt.Errorf("%s is not a valid choice for From. Must be one of: pod, namespace, node", f.From)
6363
}
6464

6565
if f.Regex != "" {
@@ -117,7 +117,7 @@ func (cfg *Config) Validate() error {
117117
// ExtractConfig section allows specifying extraction rules to extract
118118
// data from k8s pod specs.
119119
type ExtractConfig struct {
120-
// Metadata allows to extract pod/namespace metadata from a list of metadata fields.
120+
// Metadata allows to extract pod/namespace/node metadata from a list of metadata fields.
121121
// The field accepts a list of strings.
122122
//
123123
// Metadata fields supported right now are,

processor/k8sattributesprocessor/e2e_test.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ func TestE2E(t *testing.T) {
107107
"container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"),
108108
"container.image.tag": newExpectedValue(equal, "latest"),
109109
"container.id": newExpectedValue(exist, ""),
110+
"k8s.node.labels.foo": newExpectedValue(equal, "too"),
110111
},
111112
},
112113
{
@@ -129,6 +130,7 @@ func TestE2E(t *testing.T) {
129130
"container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"),
130131
"container.image.tag": newExpectedValue(equal, "latest"),
131132
"container.id": newExpectedValue(exist, ""),
133+
"k8s.node.labels.foo": newExpectedValue(equal, "too"),
132134
},
133135
},
134136
{
@@ -175,6 +177,7 @@ func TestE2E(t *testing.T) {
175177
"container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"),
176178
"container.image.tag": newExpectedValue(equal, "latest"),
177179
"container.id": newExpectedValue(exist, ""),
180+
"k8s.node.labels.foo": newExpectedValue(equal, "too"),
178181
},
179182
},
180183
{
@@ -197,6 +200,7 @@ func TestE2E(t *testing.T) {
197200
"container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"),
198201
"container.image.tag": newExpectedValue(equal, "latest"),
199202
"container.id": newExpectedValue(exist, ""),
203+
"k8s.node.labels.foo": newExpectedValue(equal, "too"),
200204
},
201205
},
202206
{
@@ -219,6 +223,7 @@ func TestE2E(t *testing.T) {
219223
"container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"),
220224
"container.image.tag": newExpectedValue(equal, "latest"),
221225
"container.id": newExpectedValue(exist, ""),
226+
"k8s.node.labels.foo": newExpectedValue(equal, "too"),
222227
},
223228
},
224229
{
@@ -265,6 +270,7 @@ func TestE2E(t *testing.T) {
265270
"container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"),
266271
"container.image.tag": newExpectedValue(equal, "latest"),
267272
"container.id": newExpectedValue(exist, ""),
273+
"k8s.node.labels.foo": newExpectedValue(equal, "too"),
268274
},
269275
},
270276
{
@@ -287,6 +293,7 @@ func TestE2E(t *testing.T) {
287293
"container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"),
288294
"container.image.tag": newExpectedValue(equal, "latest"),
289295
"container.id": newExpectedValue(exist, ""),
296+
"k8s.node.labels.foo": newExpectedValue(equal, "too"),
290297
},
291298
},
292299
{
@@ -333,6 +340,7 @@ func TestE2E(t *testing.T) {
333340
"container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"),
334341
"container.image.tag": newExpectedValue(equal, "latest"),
335342
"container.id": newExpectedValue(exist, ""),
343+
"k8s.node.labels.foo": newExpectedValue(equal, "too"),
336344
},
337345
},
338346
{
@@ -355,6 +363,7 @@ func TestE2E(t *testing.T) {
355363
"container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"),
356364
"container.image.tag": newExpectedValue(equal, "latest"),
357365
"container.id": newExpectedValue(exist, ""),
366+
"k8s.node.labels.foo": newExpectedValue(equal, "too"),
358367
},
359368
},
360369
}

processor/k8sattributesprocessor/internal/kube/client.go

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ type WatchClient struct {
4242
kc kubernetes.Interface
4343
informer cache.SharedInformer
4444
namespaceInformer cache.SharedInformer
45+
nodeInformer cache.SharedInformer
4546
replicasetInformer cache.SharedInformer
4647
replicasetRegex *regexp.Regexp
4748
cronJobRegex *regexp.Regexp
@@ -60,6 +61,10 @@ type WatchClient struct {
6061
// Key is namespace name
6162
Namespaces map[string]*Namespace
6263

64+
// A map containing Node related data, used to associate them with resources.
65+
// Key is node name
66+
Nodes map[string]*Node
67+
6368
// A map containing ReplicaSets related data, used to associate them with resources.
6469
// Key is replicaset uid
6570
ReplicaSets map[string]*ReplicaSet
@@ -89,6 +94,7 @@ func New(logger *zap.Logger, apiCfg k8sconfig.APIConfig, rules ExtractionRules,
8994

9095
c.Pods = map[PodIdentifier]*Pod{}
9196
c.Namespaces = map[string]*Namespace{}
97+
c.Nodes = map[string]*Node{}
9298
c.ReplicaSets = map[string]*ReplicaSet{}
9399
if newClientSet == nil {
94100
newClientSet = k8sconfig.MakeClient
@@ -162,6 +168,10 @@ func New(logger *zap.Logger, apiCfg k8sconfig.APIConfig, rules ExtractionRules,
162168
}
163169
}
164170

171+
if c.extractNodeLabelsAnnotations() {
172+
c.nodeInformer = newNodeSharedInformer(c.kc, c.Filters.Node)
173+
}
174+
165175
return c, err
166176
}
167177

@@ -198,6 +208,18 @@ func (c *WatchClient) Start() {
198208
}
199209
go c.replicasetInformer.Run(c.stopCh)
200210
}
211+
212+
if c.nodeInformer != nil {
213+
_, err = c.nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
214+
AddFunc: c.handleNodeAdd,
215+
UpdateFunc: c.handleNodeUpdate,
216+
DeleteFunc: c.handleNodeDelete,
217+
})
218+
if err != nil {
219+
c.logger.Error("error adding event handler to node informer", zap.Error(err))
220+
}
221+
go c.nodeInformer.Run(c.stopCh)
222+
}
201223
}
202224

203225
// Stop signals the the k8s watcher/informer to stop watching for new events.
@@ -273,6 +295,37 @@ func (c *WatchClient) handleNamespaceDelete(obj interface{}) {
273295
}
274296
}
275297

298+
func (c *WatchClient) handleNodeAdd(obj interface{}) {
299+
observability.RecordNodeAdded()
300+
if node, ok := obj.(*api_v1.Node); ok {
301+
c.addOrUpdateNode(node)
302+
} else {
303+
c.logger.Error("object received was not of type api_v1.Node", zap.Any("received", obj))
304+
}
305+
}
306+
307+
func (c *WatchClient) handleNodeUpdate(_, newNode interface{}) {
308+
observability.RecordNodeUpdated()
309+
if node, ok := newNode.(*api_v1.Node); ok {
310+
c.addOrUpdateNode(node)
311+
} else {
312+
c.logger.Error("object received was not of type api_v1.Node", zap.Any("received", newNode))
313+
}
314+
}
315+
316+
func (c *WatchClient) handleNodeDelete(obj interface{}) {
317+
observability.RecordNodeDeleted()
318+
if node, ok := ignoreDeletedFinalStateUnknown(obj).(*api_v1.Node); ok {
319+
c.m.Lock()
320+
if n, ok := c.Nodes[node.Name]; ok {
321+
delete(c.Nodes, n.Name)
322+
}
323+
c.m.Unlock()
324+
} else {
325+
c.logger.Error("object received was not of type api_v1.Node", zap.Any("received", obj))
326+
}
327+
}
328+
276329
func (c *WatchClient) deleteLoop(interval time.Duration, gracePeriod time.Duration) {
277330
// This loop runs after N seconds and deletes pods from cache.
278331
// It iterates over the delete queue and deletes all that aren't
@@ -339,6 +392,17 @@ func (c *WatchClient) GetNamespace(namespace string) (*Namespace, bool) {
339392
return nil, false
340393
}
341394

395+
// GetNode takes a node name and returns the node object the node name is associated with.
396+
func (c *WatchClient) GetNode(nodeName string) (*Node, bool) {
397+
c.m.RLock()
398+
node, ok := c.Nodes[nodeName]
399+
c.m.RUnlock()
400+
if ok {
401+
return node, ok
402+
}
403+
return nil, false
404+
}
405+
342406
func (c *WatchClient) extractPodAttributes(pod *api_v1.Pod) map[string]string {
343407
tags := map[string]string{}
344408
if c.Rules.PodName {
@@ -614,10 +678,25 @@ func (c *WatchClient) extractNamespaceAttributes(namespace *api_v1.Namespace) ma
614678
return tags
615679
}
616680

681+
func (c *WatchClient) extractNodeAttributes(node *api_v1.Node) map[string]string {
682+
tags := map[string]string{}
683+
684+
for _, r := range c.Rules.Labels {
685+
r.extractFromNodeMetadata(node.Labels, tags, "k8s.node.labels.%s")
686+
}
687+
688+
for _, r := range c.Rules.Annotations {
689+
r.extractFromNodeMetadata(node.Annotations, tags, "k8s.node.annotations.%s")
690+
}
691+
692+
return tags
693+
}
694+
617695
func (c *WatchClient) podFromAPI(pod *api_v1.Pod) *Pod {
618696
newPod := &Pod{
619697
Name: pod.Name,
620698
Namespace: pod.GetNamespace(),
699+
NodeName: pod.Spec.NodeName,
621700
Address: pod.Status.PodIP,
622701
HostNetwork: pod.Spec.HostNetwork,
623702
PodUID: string(pod.UID),
@@ -832,6 +911,36 @@ func (c *WatchClient) extractNamespaceLabelsAnnotations() bool {
832911
return false
833912
}
834913

914+
func (c *WatchClient) extractNodeLabelsAnnotations() bool {
915+
for _, r := range c.Rules.Labels {
916+
if r.From == MetadataFromNode {
917+
return true
918+
}
919+
}
920+
921+
for _, r := range c.Rules.Annotations {
922+
if r.From == MetadataFromNode {
923+
return true
924+
}
925+
}
926+
927+
return false
928+
}
929+
930+
func (c *WatchClient) addOrUpdateNode(node *api_v1.Node) {
931+
newNode := &Node{
932+
Name: node.Name,
933+
NodeUID: string(node.UID),
934+
}
935+
newNode.Attributes = c.extractNodeAttributes(node)
936+
937+
c.m.Lock()
938+
if node.Name != "" {
939+
c.Nodes[node.Name] = newNode
940+
}
941+
c.m.Unlock()
942+
}
943+
835944
func needContainerAttributes(rules ExtractionRules) bool {
836945
return rules.ContainerImageName ||
837946
rules.ContainerName ||

0 commit comments

Comments
 (0)