
Commit efb017b

enisocatoulme authored and committed
[processor/metricstransform] Fix aggregation of exponential histograms (open-telemetry#39143)
#### Description

A panic would occur if the histograms being merged had different numbers of populated entries in the BucketCounts array. Also, the counts for the Zero buckets were not being merged.

Example config:

```yaml
processors:
  metricstransform:
    transforms:
      - action: combine
        aggregation_type: sum
        include: traces.span.metrics.duration
        new_name: traces.span.metrics.duration
```

Example panic:

```
panic: runtime error: index out of range [133] with length 133

goroutine 177 [running]:
go.opentelemetry.io/collector/pdata/pcommon.UInt64Slice.At(...)
	go.opentelemetry.io/collector/[email protected]/pcommon/generated_uint64slice.go:55
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/aggregateutil.mergeExponentialHistogramDataPoints(0x40014b57e0?, {0x4007bc7220?, 0x400744fee8?})
	github.com/open-telemetry/opentelemetry-collector-contrib/internal/[email protected]/aggregateutil/aggregate.go:309 +0x888
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/aggregateutil.MergeDataPoints({0x4001a435c0?, 0x400744fee8?}, {0x40011b4078?, 0x60?}, {0x0?, 0x0?, 0x0?, 0x4006a73410?})
	github.com/open-telemetry/opentelemetry-collector-contrib/internal/[email protected]/aggregateutil/aggregate.go:96 +0x208
github.com/open-telemetry/opentelemetry-collector-contrib/processor/metricstransformprocessor.groupMetrics({0x4001b048a0?, 0x400744fed8?}, {0x40011b4078, 0x3}, {0x4001a435c0?, 0x400744fee8?})
	github.com/open-telemetry/opentelemetry-collector-contrib/processor/[email protected]/metrics_transform_processor_otlp.go:451 +0xb8
github.com/open-telemetry/opentelemetry-collector-contrib/processor/metricstransformprocessor.combine({{0xadce910, 0x40018a0fd8}, {0x40011b4066, 0x7}, {0x4000777800, 0x1c}, 0x0, {0x40011b4078, 0x3}, {0x0, ...}, ...}, ...)
	github.com/open-telemetry/opentelemetry-collector-contrib/processor/[email protected]/metrics_transform_processor_otlp.go:438 +0x2c0
github.com/open-telemetry/opentelemetry-collector-contrib/processor/metricstransformprocessor.(*metricsTransformProcessor).processMetrics.func1.1({0x4000c0fd50?, 0x4007038b80?})
	github.com/open-telemetry/opentelemetry-collector-contrib/processor/[email protected]/metrics_transform_processor_otlp.go:258 +0x4e8
go.opentelemetry.io/collector/pdata/pmetric.ScopeMetricsSlice.RemoveIf({0x40041de338?, 0x4007038b80?}, 0x4000e3f5f8)
	go.opentelemetry.io/collector/[email protected]/pmetric/generated_scopemetricsslice.go:111 +0x80
github.com/open-telemetry/opentelemetry-collector-contrib/processor/metricstransformprocessor.(*metricsTransformProcessor).processMetrics.func1({0x40041de300?, 0x4007038b80?})
	github.com/open-telemetry/opentelemetry-collector-contrib/processor/[email protected]/metrics_transform_processor_otlp.go:235 +0x64
go.opentelemetry.io/collector/pdata/pmetric.ResourceMetricsSlice.RemoveIf({0x4000e3c378?, 0x4007038b80?}, 0x4000e3f6e8)
	go.opentelemetry.io/collector/[email protected]/pmetric/generated_resourcemetricsslice.go:111 +0x80
github.com/open-telemetry/opentelemetry-collector-contrib/processor/metricstransformprocessor.(*metricsTransformProcessor).processMetrics(0x4000e3f748?, {0x50ac40c?, 0x10?}, {0x4000e3c378?, 0x4007038b80?})
	github.com/open-telemetry/opentelemetry-collector-contrib/processor/[email protected]/metrics_transform_processor_otlp.go:234 +0x74
go.opentelemetry.io/collector/processor/processorhelper.NewMetrics.func1({0xad9c090, 0x11b475c0}, {0x4000e3c378?, 0x4007038b80?})
	go.opentelemetry.io/collector/[email protected]/processorhelper/metrics.go:55 +0xfc
go.opentelemetry.io/collector/consumer.ConsumeMetricsFunc.ConsumeMetrics(...)
	go.opentelemetry.io/collector/[email protected]/metrics.go:27
go.opentelemetry.io/collector/processor/processorhelper.NewMetrics.func1({0xad9c090, 0x11b475c0}, {0x4000e3c378?, 0x4007038b80?})
	go.opentelemetry.io/collector/[email protected]/processorhelper/metrics.go:66 +0x21c
go.opentelemetry.io/collector/consumer.ConsumeMetricsFunc.ConsumeMetrics(...)
	go.opentelemetry.io/collector/[email protected]/metrics.go:27
go.opentelemetry.io/collector/processor/processorhelper.NewMetrics.func1({0xad9c090, 0x11b475c0}, {0x4000e3c2e8?, 0x4007038390?})
	go.opentelemetry.io/collector/[email protected]/processorhelper/metrics.go:66 +0x21c
go.opentelemetry.io/collector/consumer.ConsumeMetricsFunc.ConsumeMetrics(...)
	go.opentelemetry.io/collector/[email protected]/metrics.go:27
go.opentelemetry.io/collector/processor/processorhelper.NewMetrics.func1({0xad9c090, 0x11b475c0}, {0x4000e3c2e8?, 0x4007038390?})
	go.opentelemetry.io/collector/[email protected]/processorhelper/metrics.go:66 +0x21c
go.opentelemetry.io/collector/consumer.ConsumeMetricsFunc.ConsumeMetrics(...)
	go.opentelemetry.io/collector/[email protected]/metrics.go:27
go.opentelemetry.io/collector/processor/processorhelper.NewMetrics.func1({0xad9c090, 0x11b475c0}, {0x4000e3c2e8?, 0x4007038390?})
	go.opentelemetry.io/collector/[email protected]/processorhelper/metrics.go:66 +0x21c
go.opentelemetry.io/collector/consumer.ConsumeMetricsFunc.ConsumeMetrics(...)
	go.opentelemetry.io/collector/[email protected]/metrics.go:27
go.opentelemetry.io/collector/consumer.ConsumeMetricsFunc.ConsumeMetrics(...)
	go.opentelemetry.io/collector/[email protected]/metrics.go:27
go.opentelemetry.io/collector/internal/fanoutconsumer.(*metricsConsumer).ConsumeMetrics(0x400197bd40, {0xad9c090, 0x11b475c0}, {0x4000e3c2e8?, 0x4007038390?})
	go.opentelemetry.io/collector/internal/[email protected]/metrics.go:60 +0x1ec
github.com/open-telemetry/opentelemetry-collector-contrib/connector/spanmetricsconnector.(*connectorImp).exportMetrics(0x4000203a40, {0xad9c090, 0x11b475c0})
	github.com/open-telemetry/opentelemetry-collector-contrib/connector/[email protected]/connector.go:258 +0x110
github.com/open-telemetry/opentelemetry-collector-contrib/connector/spanmetricsconnector.(*connectorImp).Start.func1()
	github.com/open-telemetry/opentelemetry-collector-contrib/connector/[email protected]/connector.go:213 +0x48
created by github.com/open-telemetry/opentelemetry-collector-contrib/connector/spanmetricsconnector.(*connectorImp).Start in goroutine 1
	github.com/open-telemetry/opentelemetry-collector-contrib/connector/[email protected]/connector.go:207 +0xa8
```

---------

Co-authored-by: Antoine Toulme <[email protected]>
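To make the failure mode concrete: the old code indexed the destination BucketCounts at every position present in the source, so a source histogram with more populated buckets ran past the end of the destination slice, and ZeroCount was never added. The following minimal sketch (not the commit's code; plain Go slices stand in for pcommon.UInt64Slice, and mergeBucketCounts is a hypothetical helper) illustrates the merge semantics the fix implements:

```go
package main

import "fmt"

// mergeBucketCounts sums src into dst bucket-by-bucket, growing dst when src
// has more populated buckets. This mirrors the behaviour of the fixed merge
// over plain slices.
func mergeBucketCounts(dst, src []uint64) []uint64 {
	for b, c := range src {
		if b < len(dst) {
			dst[b] += c // bucket exists in both: add counts
		} else {
			dst = append(dst, c) // src has more buckets: extend dst
		}
	}
	return dst
}

func main() {
	// Two exponential histogram data points with the same scale and offset
	// but different numbers of populated positive buckets.
	aZero, aPos := uint64(1), []uint64{0, 1, 2}
	bZero, bPos := uint64(2), []uint64{0, 1, 2, 3}

	mergedZero := aZero + bZero // zero-bucket counts are summed too
	mergedPos := mergeBucketCounts(aPos, bPos)

	fmt.Println(mergedZero, mergedPos) // 3 [0 2 4 3]
}
```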
1 parent 7fb4b0d · commit efb017b

3 files changed: +70 −7 lines changed

Lines changed: 27 additions & 0 deletions
@@ -0,0 +1,27 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: bug_fix
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: metricstransformprocessor
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Fix aggregation of exponential histograms in metricstransform processor.
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [39143]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext: Fix a panic when the number of populated buckets varies, and fix summing of counts for the Zero bucket.
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: []

internal/coreinternal/aggregateutil/aggregate.go

Lines changed: 16 additions & 2 deletions
@@ -296,17 +296,31 @@ func mergeExponentialHistogramDataPoints(dpsMap map[string]pmetric.ExponentialHi
 		}
 		dp.SetCount(dp.Count() + dps.At(i).Count())
 		dp.SetSum(dp.Sum() + dps.At(i).Sum())
+		dp.SetZeroCount(dp.ZeroCount() + dps.At(i).ZeroCount())
 		if dp.HasMin() && dp.Min() > dps.At(i).Min() {
 			dp.SetMin(dps.At(i).Min())
 		}
 		if dp.HasMax() && dp.Max() < dps.At(i).Max() {
 			dp.SetMax(dps.At(i).Max())
 		}
+		// Merge bucket counts.
+		// Note that groupExponentialHistogramDataPoints() has already ensured that we only try
+		// to merge exponential histograms with matching Scale and Positive/Negative Offsets,
+		// so the corresponding array items in BucketCounts have the same bucket boundaries.
+		// However, the number of buckets may differ depending on what values have been observed.
 		for b := 0; b < dps.At(i).Negative().BucketCounts().Len(); b++ {
-			negatives.SetAt(b, negatives.At(b)+dps.At(i).Negative().BucketCounts().At(b))
+			if b < negatives.Len() {
+				negatives.SetAt(b, negatives.At(b)+dps.At(i).Negative().BucketCounts().At(b))
+			} else {
+				negatives.Append(dps.At(i).Negative().BucketCounts().At(b))
+			}
 		}
 		for b := 0; b < dps.At(i).Positive().BucketCounts().Len(); b++ {
-			positives.SetAt(b, positives.At(b)+dps.At(i).Positive().BucketCounts().At(b))
+			if b < positives.Len() {
+				positives.SetAt(b, positives.At(b)+dps.At(i).Positive().BucketCounts().At(b))
+			} else {
+				positives.Append(dps.At(i).Positive().BucketCounts().At(b))
+			}
 		}
 		dps.At(i).Exemplars().MoveAndAppendTo(dp.Exemplars())
 		if dps.At(i).StartTimestamp() < dp.StartTimestamp() {
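The comment added in this hunk relies on a property of the exponential histogram data model: with equal Scale and Offset, array position b refers to the same bucket boundaries in both histograms, so counts can simply be added positionally. A small sketch of that relationship (not from the commit; bucketBounds is a hypothetical helper, following the OpenTelemetry definition base = 2^(2^-scale), where bucket index i covers (base^i, base^(i+1)] and the stored array starts at Offset):

```go
package main

import (
	"fmt"
	"math"
)

// bucketBounds returns the (lower, upper] boundaries of the positive bucket
// stored at array position b for an exponential histogram with the given
// scale and offset.
func bucketBounds(scale, offset int32, b int) (float64, float64) {
	base := math.Pow(2, math.Pow(2, float64(-scale)))
	index := float64(offset) + float64(b)
	return math.Pow(base, index), math.Pow(base, index+1)
}

func main() {
	// Same scale and offset: array position 2 has identical boundaries in
	// both histograms, so their counts can be added directly. Only the
	// number of populated positions may differ between the two.
	lo, hi := bucketBounds(0, 0, 2)
	fmt.Printf("scale=0 offset=0 position=2 -> (%g, %g]\n", lo, hi) // (4, 8]
}
```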

internal/coreinternal/aggregateutil/aggregate_test.go

Lines changed: 27 additions & 5 deletions
@@ -365,7 +365,10 @@ func Test_GroupDataPoints(t *testing.T) {
 				d := s.DataPoints().AppendEmpty()
 				d.SetTimestamp(pcommon.NewTimestampFromTime(time.Time{}))
 				d.Attributes().PutStr("attr1", "val1")
-				d.SetCount(1)
+				d.SetCount(9)
+				d.SetZeroCount(2)
+				d.Positive().BucketCounts().Append(0, 1, 2, 3)
+				d.Negative().BucketCounts().Append(0, 1)
 				return m
 			},
 			want: AggGroups{
@@ -483,8 +486,11 @@ func Test_MergeDataPoints(t *testing.T) {
 				s := m.SetEmptyExponentialHistogram()
 				d := s.DataPoints().AppendEmpty()
 				d.Attributes().PutStr("attr1", "val1")
-				d.SetCount(3)
+				d.SetCount(16)
 				d.SetSum(0)
+				d.SetZeroCount(3)
+				d.Positive().BucketCounts().Append(0, 2, 4, 3)
+				d.Negative().BucketCounts().Append(0, 2, 2)
 				return m
 			},
 			in: func() pmetric.Metric {
@@ -547,18 +553,34 @@ func testDataExpHistogram() pmetric.ExponentialHistogramDataPointSlice {
 	data := pmetric.NewExponentialHistogramDataPointSlice()
 	d := data.AppendEmpty()
 	d.Attributes().PutStr("attr1", "val1")
-	d.SetCount(2)
+	d.SetCount(7)
+	d.SetZeroCount(1)
+	d.Positive().BucketCounts().Append(0, 1, 2)
+	d.Negative().BucketCounts().Append(0, 1, 2)
 	return data
 }
 
 func testDataExpHistogramDouble() pmetric.ExponentialHistogramDataPointSlice {
 	dataWant := pmetric.NewExponentialHistogramDataPointSlice()
+
 	dWant := dataWant.AppendEmpty()
 	dWant.Attributes().PutStr("attr1", "val1")
-	dWant.SetCount(2)
+	dWant.SetCount(7)
+	dWant.SetZeroCount(1)
+	dWant.Positive().BucketCounts().Append(0, 1, 2)
+	dWant.Negative().BucketCounts().Append(0, 1, 2)
+
 	dWant2 := dataWant.AppendEmpty()
 	dWant2.SetTimestamp(pcommon.NewTimestampFromTime(time.Time{}))
 	dWant2.Attributes().PutStr("attr1", "val1")
-	dWant2.SetCount(1)
+	dWant2.SetCount(9)
+	dWant2.SetZeroCount(2)
+	// Use a larger number of buckets than above to check that we expand the
+	// destination array as needed while merging.
+	dWant2.Positive().BucketCounts().Append(0, 1, 2, 3)
+	// Use a smaller number of buckets than above to check that we merge values
+	// into the correct, existing buckets.
+	dWant2.Negative().BucketCounts().Append(0, 1)
+
 	return dataWant
}
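For reference, assuming the merged data point expected in Test_MergeDataPoints is the sum of the two points built above, the numbers line up: Count 7 + 9 = 16, ZeroCount 1 + 2 = 3, positive buckets [0, 1, 2] + [0, 1, 2, 3] = [0, 2, 4, 3] (the destination array is extended by one bucket), and negative buckets [0, 1, 2] + [0, 1] = [0, 2, 2] (the shorter input only touches the first two buckets).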
