Skip to content

Commit c4d8b9d

Browse files
receiver/prometheus: remove assumption that scraped metrics share a resource (#36479)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description This change removes as assumption that all metrics in a single scrape come from the same resource. This is indeed not true when `honor_labels` is set to `true` AND when the scraped metrics contain a `job` or `instance` label. <!-- Issue number (e.g. #1234) or full URL to issue, if applicable. --> #### Link to tracking issue Fixes #36477 <!--Describe what testing was performed and which tests were added.--> #### Testing Added unit tests <!--Describe the documentation added.--> #### Documentation N/A <!--Please delete paragraphs that you did not use before submitting.-->
1 parent df405c5 commit c4d8b9d

File tree

4 files changed

+124
-20
lines changed

4 files changed

+124
-20
lines changed

.chloggen/adjuster-resource-fix.yaml

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: bug_fix
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: receiver/prometheusreceiver
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Metric adjuster no longer assumes that all metrics from a scrape come from the same resource
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [36477]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: []

receiver/prometheusreceiver/internal/metrics_adjuster.go

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -255,25 +255,28 @@ func NewInitialPointAdjuster(logger *zap.Logger, gcInterval time.Duration, useCr
255255
// AdjustMetrics takes a sequence of metrics and adjust their start times based on the initial and
256256
// previous points in the timeseriesMap.
257257
func (a *initialPointAdjuster) AdjustMetrics(metrics pmetric.Metrics) error {
258-
// By contract metrics will have at least 1 data point, so for sure will have at least one ResourceMetrics.
259-
260-
job, found := metrics.ResourceMetrics().At(0).Resource().Attributes().Get(semconv.AttributeServiceName)
261-
if !found {
262-
return errors.New("adjusting metrics without job")
263-
}
258+
for i := 0; i < metrics.ResourceMetrics().Len(); i++ {
259+
rm := metrics.ResourceMetrics().At(i)
260+
_, found := rm.Resource().Attributes().Get(semconv.AttributeServiceName)
261+
if !found {
262+
return errors.New("adjusting metrics without job")
263+
}
264264

265-
instance, found := metrics.ResourceMetrics().At(0).Resource().Attributes().Get(semconv.AttributeServiceInstanceID)
266-
if !found {
267-
return errors.New("adjusting metrics without instance")
265+
_, found = rm.Resource().Attributes().Get(semconv.AttributeServiceInstanceID)
266+
if !found {
267+
return errors.New("adjusting metrics without instance")
268+
}
268269
}
269-
tsm := a.jobsMap.get(job.Str(), instance.Str())
270270

271-
// The lock on the relevant timeseriesMap is held throughout the adjustment process to ensure that
272-
// nothing else can modify the data used for adjustment.
273-
tsm.Lock()
274-
defer tsm.Unlock()
275271
for i := 0; i < metrics.ResourceMetrics().Len(); i++ {
276272
rm := metrics.ResourceMetrics().At(i)
273+
job, _ := rm.Resource().Attributes().Get(semconv.AttributeServiceName)
274+
instance, _ := rm.Resource().Attributes().Get(semconv.AttributeServiceInstanceID)
275+
tsm := a.jobsMap.get(job.Str(), instance.Str())
276+
277+
// The lock on the relevant timeseriesMap is held throughout the adjustment process to ensure that
278+
// nothing else can modify the data used for adjustment.
279+
tsm.Lock()
277280
for j := 0; j < rm.ScopeMetrics().Len(); j++ {
278281
ilm := rm.ScopeMetrics().At(j)
279282
for k := 0; k < ilm.Metrics().Len(); k++ {
@@ -303,6 +306,7 @@ func (a *initialPointAdjuster) AdjustMetrics(metrics pmetric.Metrics) error {
303306
}
304307
}
305308
}
309+
tsm.Unlock()
306310
}
307311
return nil
308312
}

receiver/prometheusreceiver/internal/metrics_adjuster_test.go

Lines changed: 56 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ var (
2626
percent0 = []float64{10, 50, 90}
2727

2828
sum1 = "sum1"
29+
sum2 = "sum2"
2930
gauge1 = "gauge1"
3031
histogram1 = "histogram1"
3132
summary1 = "summary1"
@@ -103,6 +104,37 @@ func TestSum(t *testing.T) {
103104
runScript(t, NewInitialPointAdjuster(zap.NewNop(), time.Minute, true), "job", "0", script)
104105
}
105106

107+
func TestSumWithDifferentResources(t *testing.T) {
108+
script := []*metricsAdjusterTest{
109+
{
110+
description: "Sum: round 1 - initial instance, start time is established",
111+
metrics: metricsFromResourceMetrics(resourceMetrics("job1", "instance1", sumMetric(sum1, doublePoint(k1v1k2v2, t1, t1, 44))), resourceMetrics("job2", "instance2", sumMetric(sum2, doublePoint(k1v1k2v2, t1, t1, 44)))),
112+
adjusted: metricsFromResourceMetrics(resourceMetrics("job1", "instance1", sumMetric(sum1, doublePoint(k1v1k2v2, t1, t1, 44))), resourceMetrics("job2", "instance2", sumMetric(sum2, doublePoint(k1v1k2v2, t1, t1, 44)))),
113+
},
114+
{
115+
description: "Sum: round 2 - instance adjusted based on round 1 (metrics in different order)",
116+
metrics: metricsFromResourceMetrics(resourceMetrics("job2", "instance2", sumMetric(sum2, doublePoint(k1v1k2v2, t2, t2, 66))), resourceMetrics("job1", "instance1", sumMetric(sum1, doublePoint(k1v1k2v2, t2, t2, 66)))),
117+
adjusted: metricsFromResourceMetrics(resourceMetrics("job2", "instance2", sumMetric(sum2, doublePoint(k1v1k2v2, t1, t2, 66))), resourceMetrics("job1", "instance1", sumMetric(sum1, doublePoint(k1v1k2v2, t1, t2, 66)))),
118+
},
119+
{
120+
description: "Sum: round 3 - instance reset (value less than previous value), start time is reset",
121+
metrics: metricsFromResourceMetrics(resourceMetrics("job1", "instance1", sumMetric(sum1, doublePoint(k1v1k2v2, t3, t3, 55))), resourceMetrics("job2", "instance2", sumMetric(sum2, doublePoint(k1v1k2v2, t3, t3, 55)))),
122+
adjusted: metricsFromResourceMetrics(resourceMetrics("job1", "instance1", sumMetric(sum1, doublePoint(k1v1k2v2, t3, t3, 55))), resourceMetrics("job2", "instance2", sumMetric(sum2, doublePoint(k1v1k2v2, t3, t3, 55)))),
123+
},
124+
{
125+
description: "Sum: round 4 - instance adjusted based on round 3",
126+
metrics: metricsFromResourceMetrics(resourceMetrics("job1", "instance1", sumMetric(sum1, doublePoint(k1v1k2v2, t4, t4, 72))), resourceMetrics("job2", "instance2", sumMetric(sum2, doublePoint(k1v1k2v2, t4, t4, 72)))),
127+
adjusted: metricsFromResourceMetrics(resourceMetrics("job1", "instance1", sumMetric(sum1, doublePoint(k1v1k2v2, t3, t4, 72))), resourceMetrics("job2", "instance2", sumMetric(sum2, doublePoint(k1v1k2v2, t3, t4, 72)))),
128+
},
129+
{
130+
description: "Sum: round 5 - instance adjusted based on round 4, sum2 metric resets but sum1 doesn't",
131+
metrics: metricsFromResourceMetrics(resourceMetrics("job1", "instance1", sumMetric(sum1, doublePoint(k1v1k2v2, t5, t5, 72))), resourceMetrics("job2", "instance2", sumMetric(sum2, doublePoint(k1v1k2v2, t5, t5, 10)))),
132+
adjusted: metricsFromResourceMetrics(resourceMetrics("job1", "instance1", sumMetric(sum1, doublePoint(k1v1k2v2, t3, t5, 72))), resourceMetrics("job2", "instance2", sumMetric(sum2, doublePoint(k1v1k2v2, t5, t5, 10)))),
133+
},
134+
}
135+
runScript(t, NewInitialPointAdjuster(zap.NewNop(), time.Minute, true), "job", "0", script)
136+
}
137+
106138
func TestSummaryNoCount(t *testing.T) {
107139
script := []*metricsAdjusterTest{
108140
{
@@ -710,14 +742,32 @@ func runScript(t *testing.T, ma MetricsAdjuster, job, instance string, tests []*
710742
t.Run(test.description, func(t *testing.T) {
711743
adjusted := pmetric.NewMetrics()
712744
test.metrics.CopyTo(adjusted)
713-
// Add the instance/job to the input metrics.
714-
adjusted.ResourceMetrics().At(0).Resource().Attributes().PutStr(semconv.AttributeServiceInstanceID, instance)
715-
adjusted.ResourceMetrics().At(0).Resource().Attributes().PutStr(semconv.AttributeServiceName, job)
745+
// Add the instance/job to the input metrics if they aren't already present.
746+
for i := 0; i < adjusted.ResourceMetrics().Len(); i++ {
747+
rm := adjusted.ResourceMetrics().At(i)
748+
_, found := rm.Resource().Attributes().Get(semconv.AttributeServiceName)
749+
if !found {
750+
rm.Resource().Attributes().PutStr(semconv.AttributeServiceName, job)
751+
}
752+
_, found = rm.Resource().Attributes().Get(semconv.AttributeServiceInstanceID)
753+
if !found {
754+
rm.Resource().Attributes().PutStr(semconv.AttributeServiceInstanceID, instance)
755+
}
756+
}
716757
assert.NoError(t, ma.AdjustMetrics(adjusted))
717758

718-
// Add the instance/job to the expected metrics as well.
719-
test.adjusted.ResourceMetrics().At(0).Resource().Attributes().PutStr(semconv.AttributeServiceInstanceID, instance)
720-
test.adjusted.ResourceMetrics().At(0).Resource().Attributes().PutStr(semconv.AttributeServiceName, job)
759+
// Add the instance/job to the expected metrics as well if they aren't already present.
760+
for i := 0; i < test.adjusted.ResourceMetrics().Len(); i++ {
761+
rm := test.adjusted.ResourceMetrics().At(i)
762+
_, found := rm.Resource().Attributes().Get(semconv.AttributeServiceName)
763+
if !found {
764+
rm.Resource().Attributes().PutStr(semconv.AttributeServiceName, job)
765+
}
766+
_, found = rm.Resource().Attributes().Get(semconv.AttributeServiceInstanceID)
767+
if !found {
768+
rm.Resource().Attributes().PutStr(semconv.AttributeServiceInstanceID, instance)
769+
}
770+
}
721771
assert.EqualValues(t, test.adjusted, adjusted)
722772
})
723773
}

receiver/prometheusreceiver/internal/metricsutil_test.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package internal
66
import (
77
"go.opentelemetry.io/collector/pdata/pcommon"
88
"go.opentelemetry.io/collector/pdata/pmetric"
9+
semconv "go.opentelemetry.io/collector/semconv/v1.27.0"
910
)
1011

1112
type kv struct {
@@ -23,6 +24,28 @@ func metrics(metrics ...pmetric.Metric) pmetric.Metrics {
2324
return md
2425
}
2526

27+
func metricsFromResourceMetrics(metrics ...pmetric.ResourceMetrics) pmetric.Metrics {
28+
md := pmetric.NewMetrics()
29+
for _, metric := range metrics {
30+
mr := md.ResourceMetrics().AppendEmpty()
31+
metric.CopyTo(mr)
32+
}
33+
return md
34+
}
35+
36+
func resourceMetrics(job, instance string, metrics ...pmetric.Metric) pmetric.ResourceMetrics {
37+
mr := pmetric.NewResourceMetrics()
38+
mr.Resource().Attributes().PutStr(semconv.AttributeServiceName, job)
39+
mr.Resource().Attributes().PutStr(semconv.AttributeServiceInstanceID, instance)
40+
ms := mr.ScopeMetrics().AppendEmpty().Metrics()
41+
42+
for _, metric := range metrics {
43+
destMetric := ms.AppendEmpty()
44+
metric.CopyTo(destMetric)
45+
}
46+
return mr
47+
}
48+
2649
func histogramPointRaw(attributes []*kv, startTimestamp, timestamp pcommon.Timestamp) pmetric.HistogramDataPoint {
2750
hdp := pmetric.NewHistogramDataPoint()
2851
hdp.SetStartTimestamp(startTimestamp)

0 commit comments

Comments
 (0)