[processor/k8sattributes] Automatic resource attributes #37114

Closed
27 commits
1021e4e
automatically set service instance id
zeitlinger Jan 8, 2025
4bee42d
automatically set service instance id
zeitlinger Jan 8, 2025
a5e6ddc
automatically set service instance id
zeitlinger Jan 9, 2025
8b3196f
automatically set service instance id
zeitlinger Jan 9, 2025
a9bb403
add label rules, test annotation override
zeitlinger Jan 9, 2025
02c50e8
add service version
zeitlinger Jan 9, 2025
0e1aeb0
automatically set service name
zeitlinger Jan 9, 2025
575b8d9
automatically set service name
zeitlinger Jan 9, 2025
74b9b1a
automatically set service name
zeitlinger Jan 9, 2025
109c985
automatically set service name
zeitlinger Jan 9, 2025
fa8ede3
automatically set service name
zeitlinger Jan 9, 2025
18143ad
lint
zeitlinger Jan 10, 2025
1a3372e
add changelog entry
zeitlinger Jan 10, 2025
abdd9ec
add docs
zeitlinger Jan 10, 2025
b0a0e07
part-of is not part of semconv
zeitlinger Mar 10, 2025
529df25
add app.kubernetes.io/instance
zeitlinger Mar 10, 2025
1029b1e
update service.version
zeitlinger Mar 10, 2025
e59e647
add app.kubernetes.io/instance
zeitlinger Mar 11, 2025
7a66579
rename
zeitlinger Mar 18, 2025
ccaf3fe
fix merge, revert implicit "enabled: true"
zeitlinger Mar 18, 2025
0560d17
custom prefixes
zeitlinger Mar 18, 2025
dec8a07
add, customizations and missing namespace rule
zeitlinger Mar 18, 2025
8f3afb6
add, customizations and missing namespace rule
zeitlinger Mar 18, 2025
890ed61
add, customizations and missing namespace rule
zeitlinger Mar 18, 2025
387ebca
format
zeitlinger Mar 19, 2025
43e39fa
format
zeitlinger Mar 19, 2025
c30d1ed
format
zeitlinger Mar 19, 2025
13 changes: 13 additions & 0 deletions .chloggen/operator-resource-attributes.yaml
@@ -0,0 +1,13 @@
change_type: enhancement

component: k8sattributesprocessor

note: Add option to configure resource attributes using the same logic as the OTel operator

issues: [37114]

subtext: |
If you are using the file log receiver, you can now create the same resource attributes as traces (via OTLP) received
from an application instrumented with the OpenTelemetry Operator -
simply by adding the `extract: { automatic_attributes: { enabled: true }}` configuration to the `k8sattributesprocessor` processor.
See the [documentation](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/processor/k8sattributesprocessor/README.md#config-example) for more details.
14 changes: 10 additions & 4 deletions processor/k8sattributesprocessor/README.md
@@ -271,10 +271,16 @@ k8sattributes/2:
- k8s.node.name
- k8s.pod.start_time
labels:
# This label extraction rule takes the value 'app.kubernetes.io/component' label and maps it to the 'app.label.component' attribute which will be added to the associated resources
- tag_name: app.label.component
key: app.kubernetes.io/component
from: pod
# This label extraction rule takes the value 'app.kubernetes.io/component' label and maps it to the 'app.label.component' attribute which will be added to the associated resources
- tag_name: app.label.component
key: app.kubernetes.io/component
from: pod
automatic_attributes:
# Apply the operator rules - see https://github.com/open-telemetry/semantic-conventions/blob/main/docs/non-normative/k8s-attributes.md
enabled: true
well_known_labels: true # default is false
annotation_prefixes: ["foo/"] # default is ["resource.opentelemetry.io/"] - use empty list to disable
exclude: ["service.version"] # default is empty list
Review comment (Member):

Can we move the well-known attributes from this section to extract.metadata, where all the other pre-defined resource attributes are configured, and keep them disabled by default?

Then this section would be responsible only for taking resource attributes from annotations. Instead of calling it automatic_attributes, it could be something like:

extract:
  otel_annotations: <bool> // false by default

Then, if we want to provide flexibility for annotation extraction, we could add extract.annotations with the same syntax we currently have for extract.labels. That is not really needed right now.

Providing such an interface would be consistent with the existing one and easier for end users to understand. Please let me know what you think.

pod_association:
- sources:
# This rule associates all resources containing the 'k8s.pod.ip' attribute with the matching pods. If this attribute is not present in the resource, this rule will not be able to find the matching pod.
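
The `annotation_prefixes` option in the README change above suggests that a pod annotation such as `resource.opentelemetry.io/service.name` is turned into the resource attribute `service.name`. A minimal sketch of that prefix stripping follows. The helper `attributesFromAnnotations` and the sample annotations are hypothetical and are not taken from this PR; the real processor may resolve conflicts differently.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// attributesFromAnnotations is a hypothetical helper (not part of the PR).
// For every annotation whose key starts with one of the prefixes, it strips
// the prefix and uses the remainder as the resource attribute name.
// Assumption: when two prefixes yield the same attribute, the earlier prefix wins.
func attributesFromAnnotations(annotations map[string]string, prefixes []string) map[string]string {
	attrs := map[string]string{}
	for _, prefix := range prefixes {
		for key, value := range annotations {
			name, ok := strings.CutPrefix(key, prefix)
			if !ok || name == "" {
				continue
			}
			if _, exists := attrs[name]; !exists {
				attrs[name] = value
			}
		}
	}
	return attrs
}

func main() {
	annotations := map[string]string{
		"resource.opentelemetry.io/service.name":    "checkout",
		"resource.opentelemetry.io/service.version": "1.4.2",
		"example.com/owner":                         "team-a", // no matching prefix, ignored
	}
	attrs := attributesFromAnnotations(annotations, []string{"resource.opentelemetry.io/"})

	// Sort the keys so the printed order is stable.
	keys := make([]string, 0, len(attrs))
	for k := range attrs {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	for _, k := range keys {
		fmt.Printf("%s=%s\n", k, attrs[k])
	}
}
```

Passing an empty prefix list returns an empty map, which matches the README note that an empty list disables annotation-based attributes.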
2 changes: 2 additions & 0 deletions processor/k8sattributesprocessor/config.go
@@ -166,6 +166,8 @@ type ExtractConfig struct {
// It is a list of FieldExtractConfig type. See FieldExtractConfig
// documentation for more details.
Labels []FieldExtractConfig `mapstructure:"labels"`

AutomaticRules kube.AutomaticRules `mapstructure:"automatic_attributes"`
}

// FieldExtractConfig allows specifying an extraction rule to extract a resource attribute from pod (or namespace)
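
The definition of `kube.AutomaticRules` is not shown in this part of the diff. Based only on the README keys (`enabled`, `well_known_labels`, `annotation_prefixes`, `exclude`) and on the `IsEnabled` and `NeedContainer` calls in `client.go`, it might look roughly like the sketch below. The field names, the method bodies, and the set of attributes checked in `NeedContainer` are all assumptions.

```go
package main

import (
	"fmt"
	"slices"
)

// AutomaticRules is a hypothetical reconstruction; the real type lives in the
// internal kube package and may differ.
type AutomaticRules struct {
	Enabled            bool     `mapstructure:"enabled"`
	WellKnownLabels    bool     `mapstructure:"well_known_labels"`
	AnnotationPrefixes []string `mapstructure:"annotation_prefixes"`
	Exclude            []string `mapstructure:"exclude"`
}

// IsEnabled reports whether an attribute should be derived automatically:
// the feature must be switched on and the attribute must not be excluded.
func (r AutomaticRules) IsEnabled(attribute string) bool {
	return r.Enabled && !slices.Contains(r.Exclude, attribute)
}

// NeedContainer reports whether any attribute that depends on container data
// is active. Assumption: these three attributes are the container-derived ones.
func (r AutomaticRules) NeedContainer() bool {
	return r.IsEnabled("service.name") ||
		r.IsEnabled("service.version") ||
		r.IsEnabled("service.instance.id")
}

func main() {
	rules := AutomaticRules{Enabled: true, Exclude: []string{"service.version"}}
	fmt.Println(rules.IsEnabled("service.name"))    // enabled and not excluded
	fmt.Println(rules.IsEnabled("service.version")) // excluded
	fmt.Println(rules.NeedContainer())
}
```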
1 change: 1 addition & 0 deletions processor/k8sattributesprocessor/factory.go
@@ -192,6 +192,7 @@ func createProcessorOpts(cfg component.Config) []option {
opts = append(opts, withExtractMetadata(oCfg.Extract.Metadata...))
opts = append(opts, withExtractLabels(oCfg.Extract.Labels...))
opts = append(opts, withExtractAnnotations(oCfg.Extract.Annotations...))
opts = append(opts, withAutomaticRules(oCfg.Extract.AutomaticRules))

// filters
opts = append(opts, withFilterNode(oCfg.Filter.Node, oCfg.Filter.NodeFromEnvVar))
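
`createProcessorOpts` collects functional options, and the new `withAutomaticRules` option follows the same pattern as the existing `withExtract...` options. Its body is not part of this diff, so the following is only a sketch of the pattern. All type names here (`processor`, `extractionRules`, `automaticRules`) are hypothetical stand-ins.

```go
package main

import "fmt"

// Hypothetical stand-ins for the processor's internal types.
type automaticRules struct{ Enabled bool }
type extractionRules struct{ AutomaticRules automaticRules }
type processor struct{ rules extractionRules }

// option mutates a processor during construction and may fail.
type option func(*processor) error

// withAutomaticRules copies the configured rules into the extraction rules.
func withAutomaticRules(r automaticRules) option {
	return func(p *processor) error {
		p.rules.AutomaticRules = r
		return nil
	}
}

// newProcessor applies the options in order and stops at the first error.
func newProcessor(opts ...option) (*processor, error) {
	p := &processor{}
	for _, opt := range opts {
		if err := opt(p); err != nil {
			return nil, err
		}
	}
	return p, nil
}

func main() {
	p, err := newProcessor(withAutomaticRules(automaticRules{Enabled: true}))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(p.rules.AutomaticRules.Enabled)
}
```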
3 changes: 2 additions & 1 deletion processor/k8sattributesprocessor/go.mod
@@ -3,6 +3,7 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sat
go 1.23.0

require (
github.com/distribution/reference v0.6.0
github.com/google/go-cmp v0.7.0
github.com/google/uuid v1.6.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.122.0
@@ -38,6 +39,7 @@ require (
go.uber.org/goleak v1.3.0
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.27.0
golang.org/x/exp v0.0.0-20220827204233-334a2380cb91
k8s.io/api v0.32.3
k8s.io/apimachinery v0.32.3
k8s.io/client-go v0.32.3
@@ -47,7 +49,6 @@ require (
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 // indirect
github.com/Microsoft/go-winio v0.6.2 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/distribution/reference v0.6.0 // indirect
github.com/docker/docker v27.5.1+incompatible // indirect
github.com/docker/go-connections v0.5.0 // indirect
github.com/docker/go-units v0.5.0 // indirect
1 change: 1 addition & 0 deletions processor/k8sattributesprocessor/go.sum

Some generated files are not rendered by default.

107 changes: 92 additions & 15 deletions processor/k8sattributesprocessor/internal/kube/client.go
@@ -12,6 +12,7 @@ import (
"sync"
"time"

"github.com/distribution/reference"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/featuregate"
conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
@@ -86,6 +87,8 @@ var rRegex = regexp.MustCompile(`^(.*)-[0-9a-zA-Z]+$`)
// format: [cronjob-name]-[time-hash-int]
var cronJobRegex = regexp.MustCompile(`^(.*)-[0-9]+$`)

var errCannotRetrieveImage = errors.New("cannot retrieve image name")

// New initializes a new k8s Client.
func New(
set component.TelemetrySettings,
@@ -457,11 +460,15 @@ func (c *WatchClient) GetNode(nodeName string) (*Node, bool) {
return nil, false
}

func (c *WatchClient) extractPodAttributes(pod *api_v1.Pod) map[string]string {
func (c *WatchClient) extractPodAttributes(pod *api_v1.Pod) (map[string]string, map[string]string) {
tags := map[string]string{}
serviceNames := map[string]string{}
if c.Rules.PodName {
tags[conventions.AttributeK8SPodName] = pod.Name
}
if c.Rules.AutomaticRules.IsEnabled(conventions.AttributeServiceName) {
serviceNames[conventions.AttributeK8SPodName] = pod.Name
}

if c.Rules.PodHostName {
tags[tagHostName] = pod.Spec.Hostname
@@ -500,7 +507,7 @@ func (c *WatchClient) extractPodAttributes(pod *api_v1.Pod) map[string]string {
c.Rules.JobUID || c.Rules.JobName ||
c.Rules.StatefulSetUID || c.Rules.StatefulSetName ||
c.Rules.DeploymentName || c.Rules.DeploymentUID ||
c.Rules.CronJobName {
c.Rules.CronJobName || c.Rules.AutomaticRules.IsEnabled(conventions.AttributeServiceName) {
for _, ref := range pod.OwnerReferences {
switch ref.Kind {
case "ReplicaSet":
@@ -510,10 +517,19 @@ func (c *WatchClient) extractPodAttributes(pod *api_v1.Pod) map[string]string {
if c.Rules.ReplicaSetName {
tags[conventions.AttributeK8SReplicaSetName] = ref.Name
}
if c.Rules.DeploymentName {
if c.Rules.AutomaticRules.IsEnabled(conventions.AttributeServiceName) {
serviceNames[conventions.AttributeK8SReplicaSetName] = ref.Name
}
if c.Rules.DeploymentName || c.Rules.AutomaticRules.IsEnabled(conventions.AttributeServiceName) {
if replicaset, ok := c.getReplicaSet(string(ref.UID)); ok {
if replicaset.Deployment.Name != "" {
tags[conventions.AttributeK8SDeploymentName] = replicaset.Deployment.Name
name := replicaset.Deployment.Name
if name != "" {
if c.Rules.DeploymentName {
tags[conventions.AttributeK8SDeploymentName] = name
}
if c.Rules.AutomaticRules.IsEnabled(conventions.AttributeServiceName) {
serviceNames[conventions.AttributeK8SDeploymentName] = name
}
}
}
}
@@ -531,18 +547,30 @@ func (c *WatchClient) extractPodAttributes(pod *api_v1.Pod) map[string]string {
if c.Rules.DaemonSetName {
tags[conventions.AttributeK8SDaemonSetName] = ref.Name
}
if c.Rules.AutomaticRules.IsEnabled(conventions.AttributeServiceName) {
serviceNames[conventions.AttributeK8SDaemonSetName] = ref.Name
}
case "StatefulSet":
if c.Rules.StatefulSetUID {
tags[conventions.AttributeK8SStatefulSetUID] = string(ref.UID)
}
if c.Rules.StatefulSetName {
tags[conventions.AttributeK8SStatefulSetName] = ref.Name
}
if c.Rules.AutomaticRules.IsEnabled(conventions.AttributeServiceName) {
serviceNames[conventions.AttributeK8SStatefulSetName] = ref.Name
}
case "Job":
if c.Rules.CronJobName {
if c.Rules.CronJobName || c.Rules.AutomaticRules.IsEnabled(conventions.AttributeServiceName) {
parts := c.cronJobRegex.FindStringSubmatch(ref.Name)
if len(parts) == 2 {
tags[conventions.AttributeK8SCronJobName] = parts[1]
name := parts[1]
if c.Rules.CronJobName {
tags[conventions.AttributeK8SCronJobName] = name
}
if c.Rules.AutomaticRules.IsEnabled(conventions.AttributeServiceName) {
serviceNames[conventions.AttributeK8SCronJobName] = name
}
}
}
if c.Rules.JobUID {
@@ -551,6 +579,9 @@ func (c *WatchClient) extractPodAttributes(pod *api_v1.Pod) map[string]string {
if c.Rules.JobName {
tags[conventions.AttributeK8SJobName] = ref.Name
}
if c.Rules.AutomaticRules.IsEnabled(conventions.AttributeServiceName) {
serviceNames[conventions.AttributeK8SJobName] = ref.Name
}
}
}
}
@@ -574,7 +605,7 @@ func (c *WatchClient) extractPodAttributes(pod *api_v1.Pod) map[string]string {
for _, r := range c.Rules.Annotations {
r.extractFromPodMetadata(pod.Annotations, tags, "k8s.pod.annotations.%s")
}
return tags
return tags, serviceNames
}
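
`extractPodAttributes` now returns a second map, `serviceNames`, keyed by the k8s attribute each candidate name came from. How a single `service.name` is chosen from that map is not shown in this diff. The sketch below assumes the precedence is deployment, replicaset, statefulset, daemonset, cronjob, job, pod, then container name, which is how the linked non-normative k8s-attributes document is understood here; that order should be checked against the document. `pickServiceName` is a hypothetical helper, and annotation or label overrides are deliberately left out.

```go
package main

import "fmt"

// serviceNamePrecedence lists candidate sources from most to least specific.
// Assumption: this order mirrors the semantic-conventions k8s-attributes guidance.
var serviceNamePrecedence = []string{
	"k8s.deployment.name",
	"k8s.replicaset.name",
	"k8s.statefulset.name",
	"k8s.daemonset.name",
	"k8s.cronjob.name",
	"k8s.job.name",
	"k8s.pod.name",
}

// pickServiceName is a hypothetical helper: it returns the first non-empty
// candidate in precedence order and falls back to the container name.
func pickServiceName(candidates map[string]string, containerName string) string {
	for _, key := range serviceNamePrecedence {
		if name := candidates[key]; name != "" {
			return name
		}
	}
	return containerName
}

func main() {
	candidates := map[string]string{
		"k8s.pod.name":        "web-6d4cf56db6-x2k9p",
		"k8s.replicaset.name": "web-6d4cf56db6",
		"k8s.deployment.name": "web",
	}
	fmt.Println(pickServiceName(candidates, "app"))
	fmt.Println(pickServiceName(map[string]string{}, "app"))
}
```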

// This function removes all data from the Pod except what is required by extraction rules and pod association
@@ -639,7 +670,7 @@ func removeUnnecessaryPodData(pod *api_v1.Pod, rules ExtractionRules) *api_v1.Po
removeUnnecessaryContainerData := func(c api_v1.Container) api_v1.Container {
transformedContainer := api_v1.Container{}
transformedContainer.Name = c.Name // we always need the name, it's used for identification
if rules.ContainerImageName || rules.ContainerImageTag {
if rules.ContainerImageName || rules.ContainerImageTag || rules.AutomaticRules.NeedContainer() {
transformedContainer.Image = c.Image
}
return transformedContainer
@@ -672,6 +703,38 @@ func removeUnnecessaryPodData(pod *api_v1.Pod, rules ExtractionRules) *api_v1.Po
return &transformedPod
}

// parseServiceVersionFromImage parses the service version for differently-formatted image names
// according to https://github.com/open-telemetry/semantic-conventions/blob/main/docs/non-normative/k8s-attributes.md#how-serviceversion-should-be-calculated
func parseServiceVersionFromImage(image string) (string, error) {
ref, err := reference.Parse(image)
if err != nil {
return "", err
}

namedRef, ok := ref.(reference.Named)
if !ok {
return "", errCannotRetrieveImage
}
var tag, digest string
if taggedRef, ok := namedRef.(reference.Tagged); ok {
tag = taggedRef.Tag()
}
if digestedRef, ok := namedRef.(reference.Digested); ok {
digest = digestedRef.Digest().String()
}
if digest != "" {
if tag != "" {
return fmt.Sprintf("%s@%s", tag, digest), nil
}
return digest, nil
}
if tag != "" {
return tag, nil
}

return "", errCannotRetrieveImage
}
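
The tag and digest handling in `parseServiceVersionFromImage` can be tried out without the `github.com/distribution/reference` dependency. The sketch below is a simplified stand-in that uses plain string splitting, so unlike the PR's function it does not validate the image reference. The function name `serviceVersionFromImage` is hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// serviceVersionFromImage is a simplified, hypothetical stand-in for the PR's
// parseServiceVersionFromImage. It returns "<tag>@<digest>", the digest alone,
// or the tag alone, and false when the image has neither.
func serviceVersionFromImage(image string) (string, bool) {
	name, digest, _ := strings.Cut(image, "@")

	// A tag follows the last ':' only if that ':' comes after the last '/';
	// otherwise the colon belongs to a registry port such as localhost:5000.
	tag := ""
	if i := strings.LastIndex(name, ":"); i > strings.LastIndex(name, "/") {
		tag = name[i+1:]
	}

	switch {
	case digest != "" && tag != "":
		return tag + "@" + digest, true
	case digest != "":
		return digest, true
	case tag != "":
		return tag, true
	}
	return "", false
}

func main() {
	for _, image := range []string{
		"nginx:1.27",
		"ghcr.io/org/app:2.0@sha256:abc",
		"app@sha256:abc",
		"localhost:5000/app",
	} {
		version, ok := serviceVersionFromImage(image)
		fmt.Printf("%s -> %q (found=%t)\n", image, version, ok)
	}
}
```

As in the PR's function, an image without a tag or digest yields no version rather than defaulting to `latest`.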

func (c *WatchClient) extractPodContainersAttributes(pod *api_v1.Pod) PodContainers {
containers := PodContainers{
ByID: map[string]*Container{},
@@ -680,7 +743,7 @@ func (c *WatchClient) extractPodContainersAttributes(pod *api_v1.Pod) PodContain
if !needContainerAttributes(c.Rules) {
return containers
}
if c.Rules.ContainerImageName || c.Rules.ContainerImageTag {
if c.Rules.ContainerImageName || c.Rules.ContainerImageTag || c.Rules.AutomaticRules.NeedContainer() {
for _, spec := range append(pod.Spec.Containers, pod.Spec.InitContainers...) {
container := &Container{}
imageRef, err := dcommon.ParseImageName(spec.Image)
@@ -691,18 +754,31 @@ func (c *WatchClient) extractPodContainersAttributes(pod *api_v1.Pod) PodContain
if c.Rules.ContainerImageTag {
container.ImageTag = imageRef.Tag
}
serviceVersion, err := parseServiceVersionFromImage(spec.Image)
if err == nil {
if c.Rules.AutomaticRules.IsEnabled(conventions.AttributeServiceVersion) {
container.ServiceVersion = serviceVersion
}
}
}
containers.ByName[spec.Name] = container
}
}
for _, apiStatus := range append(pod.Status.ContainerStatuses, pod.Status.InitContainerStatuses...) {
container, ok := containers.ByName[apiStatus.Name]
containerName := apiStatus.Name
container, ok := containers.ByName[containerName]
if !ok {
container = &Container{}
containers.ByName[apiStatus.Name] = container
containers.ByName[containerName] = container
}
if c.Rules.ContainerName {
container.Name = apiStatus.Name
container.Name = containerName
}
if c.Rules.AutomaticRules.IsEnabled(conventions.AttributeServiceInstanceID) {
container.ServiceInstanceID = automaticServiceInstanceID(pod, containerName)
}
if c.Rules.AutomaticRules.IsEnabled(conventions.AttributeServiceName) {
container.ServiceName = containerName
}
containerID := apiStatus.ContainerID
// Remove container runtime prefix
@@ -774,7 +850,7 @@ func (c *WatchClient) podFromAPI(pod *api_v1.Pod) *Pod {
if c.shouldIgnorePod(pod) {
newPod.Ignore = true
} else {
newPod.Attributes = c.extractPodAttributes(pod)
newPod.Attributes, newPod.ServiceNames = c.extractPodAttributes(pod)
if needContainerAttributes(c.Rules) {
newPod.Containers = c.extractPodContainersAttributes(pod)
}
@@ -1035,7 +1111,8 @@ func needContainerAttributes(rules ExtractionRules) bool {
rules.ContainerName ||
rules.ContainerImageTag ||
rules.ContainerImageRepoDigests ||
rules.ContainerID
rules.ContainerID ||
rules.AutomaticRules.NeedContainer()
}
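
The `automaticServiceInstanceID(pod, containerName)` helper called in `extractPodContainersAttributes` is not included in the visible hunks. The k8s-attributes document referenced by this PR describes `service.instance.id` as the namespace, pod name, and container name joined with dots, so a minimal sketch could look like this. The signature with three plain strings is an assumption, and the real helper may also honor annotation overrides.

```go
package main

import (
	"fmt"
	"strings"
)

// serviceInstanceID is a hypothetical simplification of the PR's
// automaticServiceInstanceID: it joins namespace, pod and container name with '.'.
func serviceInstanceID(namespace, podName, containerName string) string {
	return strings.Join([]string{namespace, podName, containerName}, ".")
}

func main() {
	fmt.Println(serviceInstanceID("shop", "checkout-7d9f8-abcde", "app"))
}
```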

func (c *WatchClient) handleReplicaSetAdd(obj any) {