Skip to content

Commit 816d165

Browse files
authored
Add ability to process metrics in schema processor (#38628)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description Add ability to process metrics in schema processor <!-- Issue number (e.g. #1234) or full URL to issue, if applicable. --> #### Link to tracking issue Fixes <!--Describe what testing was performed and which tests were added.--> #### Testing <!--Describe the documentation added.--> #### Documentation <!--Please delete paragraphs that you did not use before submitting.-->
1 parent 5b28ee0 commit 816d165

File tree

7 files changed

+351
-1
lines changed

7 files changed

+351
-1
lines changed
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: schemaprocessor
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: |
11+
Add functionality to transform metrics for the schema processor.
12+
13+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
14+
issues: [38628]
15+
16+
# (Optional) One or more lines of additional information to render under the primary note.
17+
# These lines will be padded with 2 spaces and then inserted directly into the document.
18+
# Use pipe (|) for multiline entries.
19+
subtext: |
20+
Adds functionality to transform metrics using the target schema version.
21+
22+
# If your change doesn't affect end users or the exported elements of any package,
23+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
24+
# Optional: The change log or logs in which this entry should be included.
25+
# e.g. '[user]' or '[user, api]'
26+
# Include 'user' if the change is relevant to end users.
27+
# Include 'api' if there is a change to a library API.
28+
# Default: '[user]'
29+
change_logs: []

processor/schemaprocessor/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ go 1.23.0
44

55
require (
66
github.com/google/go-cmp v0.7.0
7+
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.122.0
78
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.122.0
89
github.com/patrickmn/go-cache v2.1.0+incompatible
910
github.com/stretchr/testify v1.10.0

processor/schemaprocessor/processor.go

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,46 @@ func (t schemaProcessor) processLogs(ctx context.Context, ld plog.Logs) (plog.Lo
8989
return ld, nil
9090
}
9191

92-
func (t schemaProcessor) processMetrics(_ context.Context, md pmetric.Metrics) (pmetric.Metrics, error) {
92+
func (t schemaProcessor) processMetrics(ctx context.Context, md pmetric.Metrics) (pmetric.Metrics, error) {
93+
for mt := 0; mt < md.ResourceMetrics().Len(); mt++ {
94+
rMetric := md.ResourceMetrics().At(mt)
95+
resourceSchemaURL := rMetric.SchemaUrl()
96+
if resourceSchemaURL != "" {
97+
t.log.Debug("requesting translation for resourceSchemaURL", zap.String("resourceSchemaURL", resourceSchemaURL))
98+
tr, err := t.manager.RequestTranslation(context.Background(), resourceSchemaURL)
99+
if err != nil {
100+
t.log.Error("failed to request translation", zap.Error(err))
101+
return md, err
102+
}
103+
err = tr.ApplyAllResourceChanges(rMetric, resourceSchemaURL)
104+
if err != nil {
105+
t.log.Error("failed to apply resource changes", zap.Error(err))
106+
return md, err
107+
}
108+
}
109+
for sm := 0; sm < rMetric.ScopeMetrics().Len(); sm++ {
110+
metric := rMetric.ScopeMetrics().At(sm)
111+
metricSchemaURL := metric.SchemaUrl()
112+
if metricSchemaURL == "" {
113+
metricSchemaURL = resourceSchemaURL
114+
}
115+
if metricSchemaURL == "" {
116+
continue
117+
}
118+
tr, err := t.manager.
119+
RequestTranslation(ctx, metricSchemaURL)
120+
if err != nil {
121+
t.log.Error("failed to request translation", zap.Error(err))
122+
return md, err
123+
}
124+
err = tr.ApplyScopeMetricChanges(metric, metricSchemaURL)
125+
if err != nil {
126+
t.log.Error("failed to apply scope metric changes", zap.Error(err))
127+
return md, err
128+
}
129+
}
130+
}
131+
93132
return md, nil
94133
}
95134

Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package schemaprocessor
5+
6+
import (
7+
"context"
8+
"path/filepath"
9+
"testing"
10+
11+
"github.com/stretchr/testify/assert"
12+
"github.com/stretchr/testify/require"
13+
"go.opentelemetry.io/collector/pdata/pmetric"
14+
15+
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden"
16+
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest"
17+
)
18+
19+
func TestMetrics_Rename(t *testing.T) {
20+
t.Parallel()
21+
22+
tests := []struct {
23+
name string
24+
in pmetric.Metrics
25+
out pmetric.Metrics
26+
transformations string
27+
targetVersion string
28+
}{
29+
{
30+
name: "one_version_downgrade",
31+
in: func() pmetric.Metrics {
32+
in, err := golden.ReadMetrics(filepath.Join("testdata", "new-metric.yaml"))
33+
assert.NoError(t, err, "Failed to read input metrics")
34+
return in
35+
}(),
36+
out: func() pmetric.Metrics {
37+
out, err := golden.ReadMetrics(filepath.Join("testdata", "old-metric.yaml"))
38+
assert.NoError(t, err, "Failed to read expected output metrics")
39+
return out
40+
}(),
41+
transformations: `
42+
1.9.0:
43+
all:
44+
changes:
45+
- rename_attributes:
46+
attribute_map:
47+
old.resource.name: new.resource.name
48+
metrics:
49+
changes:
50+
- rename_attributes:
51+
attribute_map:
52+
old.attr.name: new.attr.name
53+
- rename_metrics:
54+
old.sum.metric: new.sum.metric
55+
old.gauge.metric: new.gauge.metric
56+
- rename_metrics:
57+
old.histogram.metric: new.histogram.metric
58+
old.summary.metric: new.summary.metric
59+
1.8.0:`,
60+
targetVersion: "1.8.0",
61+
},
62+
{
63+
name: "one_version_upgrade",
64+
in: func() pmetric.Metrics {
65+
in, err := golden.ReadMetrics(filepath.Join("testdata", "old-metric.yaml"))
66+
assert.NoError(t, err, "Failed to read input metrics")
67+
return in
68+
}(),
69+
out: func() pmetric.Metrics {
70+
out, err := golden.ReadMetrics(filepath.Join("testdata", "new-metric.yaml"))
71+
assert.NoError(t, err, "Failed to read expected output metrics")
72+
return out
73+
}(),
74+
transformations: `
75+
1.9.0:
76+
all:
77+
changes:
78+
- rename_attributes:
79+
attribute_map:
80+
old.resource.name: new.resource.name
81+
metrics:
82+
changes:
83+
- rename_attributes:
84+
attribute_map:
85+
old.attr.name: new.attr.name
86+
- rename_metrics:
87+
old.sum.metric: new.sum.metric
88+
old.gauge.metric: new.gauge.metric
89+
- rename_metrics:
90+
old.histogram.metric: new.histogram.metric
91+
old.summary.metric: new.summary.metric
92+
1.8.0:`,
93+
targetVersion: "1.9.0",
94+
},
95+
}
96+
97+
for _, tt := range tests {
98+
t.Run(tt.name, func(t *testing.T) {
99+
pr := newTestSchemaProcessor(t, tt.transformations, tt.targetVersion)
100+
ctx := context.Background()
101+
out, err := pr.processMetrics(ctx, tt.in)
102+
if err != nil {
103+
t.Errorf("Error while processing metrics: %v", err)
104+
}
105+
require.NoError(t, pmetrictest.CompareMetrics(tt.out, out, pmetrictest.IgnoreStartTimestamp(),
106+
pmetrictest.IgnoreTimestamp()))
107+
})
108+
}
109+
}
110+
111+
func TestMetrics_Errors(t *testing.T) {
112+
t.Parallel()
113+
114+
tests := []struct {
115+
name string
116+
in pmetric.Metrics
117+
errormsg string
118+
transformations string
119+
targetVersion string
120+
}{
121+
{
122+
name: "target_attribute_already_exists",
123+
in: func() pmetric.Metrics {
124+
in, err := golden.ReadMetrics(filepath.Join("testdata", "metric-with-old-attr.yaml"))
125+
assert.NoError(t, err, "Failed to read input metrics")
126+
return in
127+
}(),
128+
errormsg: "value \"old.attr.name\" already exists",
129+
transformations: `
130+
1.9.0:
131+
all:
132+
changes:
133+
- rename_attributes:
134+
attribute_map:
135+
old.resource.name: new.resource.name
136+
metrics:
137+
changes:
138+
- rename_attributes:
139+
attribute_map:
140+
old.attr.name: new.attr.name
141+
1.8.0:`,
142+
targetVersion: "1.8.0",
143+
},
144+
{
145+
name: "invalid_schema_translation",
146+
in: func() pmetric.Metrics {
147+
in, err := golden.ReadMetrics(filepath.Join("testdata", "metric-with-old-attr.yaml"))
148+
assert.NoError(t, err, "Failed to read input metrics")
149+
in.ResourceMetrics().At(0).SetSchemaUrl("invalid_schema_url")
150+
return in
151+
}(),
152+
errormsg: "invalid schema version",
153+
transformations: `
154+
1.9.0:
155+
all:
156+
changes:
157+
- rename_attributes:
158+
attribute_map:
159+
old.resource.name: new.resource.name
160+
metrics:
161+
changes:
162+
- rename_attributes:
163+
attribute_map:
164+
old.attr.name: new.attr.name
165+
1.8.0:`,
166+
targetVersion: "1.8.0",
167+
},
168+
}
169+
170+
for _, tt := range tests {
171+
t.Run(tt.name, func(t *testing.T) {
172+
pr := newTestSchemaProcessor(t, tt.transformations, tt.targetVersion)
173+
ctx := context.Background()
174+
_, err := pr.processMetrics(ctx, tt.in)
175+
require.Error(t, err)
176+
assert.Equal(t, tt.errormsg, err.Error())
177+
})
178+
}
179+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
resourceMetrics:
2+
- resource:
3+
attributes:
4+
- key: new.resource.name
5+
value:
6+
stringValue: "test-cluster"
7+
schemaUrl: http://opentelemetry.io/schemas/1.9.0
8+
scopeMetrics:
9+
- schemaUrl: http://opentelemetry.io/schemas/1.9.0
10+
- scope: {}
11+
schemaUrl: http://opentelemetry.io/schemas/1.9.0
12+
metrics:
13+
- sum:
14+
dataPoints:
15+
- attributes:
16+
- key: new.attr.name
17+
value:
18+
stringValue: "test-cluster"
19+
- key: old.attr.name
20+
value:
21+
stringValue: "old-cluster"
22+
name: "new.sum.metric"
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
resourceMetrics:
2+
- resource:
3+
attributes:
4+
- key: new.resource.name
5+
value:
6+
stringValue: "test-cluster"
7+
schemaUrl: http://opentelemetry.io/schemas/1.9.0
8+
scopeMetrics:
9+
- schemaUrl: http://opentelemetry.io/schemas/1.9.0
10+
- scope: {}
11+
schemaUrl: http://opentelemetry.io/schemas/1.9.0
12+
metrics:
13+
- sum:
14+
dataPoints:
15+
- attributes:
16+
- key: new.attr.name
17+
value:
18+
stringValue: "test-cluster"
19+
name: "new.sum.metric"
20+
- gauge:
21+
dataPoints:
22+
- attributes:
23+
- key: new.attr.name
24+
value:
25+
stringValue: "test-cluster"
26+
name: "new.gauge.metric"
27+
- histogram:
28+
dataPoints:
29+
- attributes:
30+
- key: new.attr.name
31+
value:
32+
stringValue: "test-cluster"
33+
name: "new.histogram.metric"
34+
- summary:
35+
dataPoints:
36+
- attributes:
37+
- key: new.attr.name
38+
value:
39+
stringValue: "test-cluster"
40+
name: "new.summary.metric"
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
resourceMetrics:
2+
- resource:
3+
attributes:
4+
- key: old.resource.name
5+
value:
6+
stringValue: "test-cluster"
7+
schemaUrl: http://opentelemetry.io/schemas/1.8.0
8+
scopeMetrics:
9+
- schemaUrl: http://opentelemetry.io/schemas/1.8.0
10+
- scope:
11+
schemaUrl: http://opentelemetry.io/schemas/1.8.0
12+
metrics:
13+
- sum:
14+
dataPoints:
15+
- attributes:
16+
- key: old.attr.name
17+
value:
18+
stringValue: "test-cluster"
19+
name: "old.sum.metric"
20+
- gauge:
21+
dataPoints:
22+
- attributes:
23+
- key: old.attr.name
24+
value:
25+
stringValue: "test-cluster"
26+
name: "old.gauge.metric"
27+
- histogram:
28+
dataPoints:
29+
- attributes:
30+
- key: old.attr.name
31+
value:
32+
stringValue: "test-cluster"
33+
name: "old.histogram.metric"
34+
- summary:
35+
dataPoints:
36+
- attributes:
37+
- key: old.attr.name
38+
value:
39+
stringValue: "test-cluster"
40+
name: "old.summary.metric"

0 commit comments

Comments
 (0)