Skip to content

Commit c5ae763

Browse files
committed
metricstarttimeprocessor: add subtractinitial strategy implementation
This change addresses open-telemetry#38379.
1 parent d7544ff commit c5ae763

File tree

5 files changed

+1413
-10
lines changed

5 files changed

+1413
-10
lines changed

.chloggen/decouple-cache.yaml

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: metricstarttimeprocessor
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Decouples the cache from the strategies for adjusting
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [38382]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: []

processor/metricstarttimeprocessor/internal/subtractinitial/adjuster.go

Lines changed: 339 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,14 @@ package subtractinitial // import "github.com/open-telemetry/opentelemetry-colle
55

66
import (
77
"context"
8-
"errors"
98
"time"
109

1110
"go.opentelemetry.io/collector/component"
1211
"go.opentelemetry.io/collector/pdata/pmetric"
12+
"go.uber.org/zap"
13+
14+
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil"
15+
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/metricstarttimeprocessor/internal/datapointstorage"
1316
)
1417

1518
// Type is the value users can use to configure the subtract initial point adjuster.
@@ -19,19 +22,349 @@ import (
1922
const Type = "subtract_initial_point"
2023

2124
type Adjuster struct {
22-
set component.TelemetrySettings
25+
referenceCache *datapointstorage.Cache
26+
previousValueCache *datapointstorage.Cache
27+
set component.TelemetrySettings
2328
}
2429

2530
// NewAdjuster returns a new Adjuster which adjust metrics' start times based on the initial received points.
26-
func NewAdjuster(set component.TelemetrySettings, _ time.Duration) *Adjuster {
31+
func NewAdjuster(set component.TelemetrySettings, gcInterval time.Duration) *Adjuster {
2732
return &Adjuster{
28-
set: set,
33+
referenceCache: datapointstorage.NewCache(gcInterval),
34+
previousValueCache: datapointstorage.NewCache(gcInterval),
35+
set: set,
2936
}
3037
}
3138

3239
// AdjustMetrics takes a sequence of metrics and adjust their start times based on the initial and
3340
// previous points in the timeseriesMap.
41+
// For each metric:
42+
// - Check if it exists in the map already.
43+
// - If it doesn't, save its value in the reference cache and move on
44+
// - If it does, find its reference cache value
45+
// - Add new entry to the result with the value - reference value
46+
// - When a reset is discovered, update the reference as well.
3447
func (a *Adjuster) AdjustMetrics(_ context.Context, metrics pmetric.Metrics) (pmetric.Metrics, error) {
35-
// TODO(#38379): Implement the subtract_initial_point adjuster
36-
return metrics, errors.New("not implemented")
48+
// Create a copy of metrics to store the results
49+
resultMetrics := pmetric.NewMetrics()
50+
for i := 0; i < metrics.ResourceMetrics().Len(); i++ {
51+
rm := metrics.ResourceMetrics().At(i)
52+
53+
// Copy over resource info to the result.
54+
resResource := resultMetrics.ResourceMetrics().AppendEmpty()
55+
resResource.SetSchemaUrl(rm.SchemaUrl())
56+
rm.Resource().CopyTo(resResource.Resource())
57+
58+
attrHash := pdatautil.MapHash(rm.Resource().Attributes())
59+
referenceTsm, _ := a.referenceCache.Get(attrHash)
60+
previousValueTsm, _ := a.previousValueCache.Get(attrHash)
61+
62+
// The lock on the relevant timeseriesMap is held throughout the adjustment process to ensure that
63+
// nothing else can modify the data used for adjustment.
64+
referenceTsm.Lock()
65+
previousValueTsm.Lock()
66+
for j := 0; j < rm.ScopeMetrics().Len(); j++ {
67+
ilm := rm.ScopeMetrics().At(j)
68+
69+
// Copy over scope info to the result.
70+
resScope := resResource.ScopeMetrics().AppendEmpty()
71+
resScope.SetSchemaUrl(ilm.SchemaUrl())
72+
ilm.Scope().CopyTo(resScope.Scope())
73+
74+
for k := 0; k < ilm.Metrics().Len(); k++ {
75+
metric := ilm.Metrics().At(k)
76+
77+
// Copy over metric info to the result.
78+
resMetric := resScope.Metrics().AppendEmpty()
79+
resMetric.SetName(metric.Name())
80+
resMetric.SetDescription(metric.Description())
81+
resMetric.SetUnit(metric.Unit())
82+
metric.Metadata().CopyTo(resMetric.Metadata())
83+
84+
switch dataType := metric.Type(); dataType {
85+
case pmetric.MetricTypeGauge:
86+
// gauges don't need to be adjusted so no additional processing is necessary
87+
metric.CopyTo(resMetric)
88+
89+
case pmetric.MetricTypeHistogram:
90+
a.adjustMetricHistogram(referenceTsm, previousValueTsm, metric, resMetric.SetEmptyHistogram())
91+
92+
case pmetric.MetricTypeSummary:
93+
a.adjustMetricSummary(referenceTsm, previousValueTsm, metric, resMetric.SetEmptySummary())
94+
95+
case pmetric.MetricTypeSum:
96+
a.adjustMetricSum(referenceTsm, previousValueTsm, metric, resMetric.SetEmptySum())
97+
98+
case pmetric.MetricTypeExponentialHistogram:
99+
a.adjustMetricExponentialHistogram(referenceTsm, previousValueTsm, metric, resMetric.SetEmptyExponentialHistogram())
100+
101+
case pmetric.MetricTypeEmpty:
102+
fallthrough
103+
104+
default:
105+
// this shouldn't happen
106+
a.set.Logger.Info("Adjust - skipping unexpected point", zap.String("type", dataType.String()))
107+
}
108+
}
109+
}
110+
referenceTsm.Unlock()
111+
previousValueTsm.Unlock()
112+
}
113+
114+
return resultMetrics, nil
115+
}
116+
117+
func (a *Adjuster) adjustMetricHistogram(referenceTsm, previousValueTsm *datapointstorage.TimeseriesMap, current pmetric.Metric, resHistogram pmetric.Histogram) {
118+
resHistogram.SetAggregationTemporality(current.Histogram().AggregationTemporality())
119+
120+
histogram := current.Histogram()
121+
if histogram.AggregationTemporality() != pmetric.AggregationTemporalityCumulative {
122+
// Only dealing with CumulativeDistributions.
123+
histogram.CopyTo(resHistogram)
124+
return
125+
}
126+
127+
currentPoints := histogram.DataPoints()
128+
for i := 0; i < currentPoints.Len(); i++ {
129+
currentDist := currentPoints.At(i)
130+
131+
referenceTsi, found := referenceTsm.Get(current, currentDist.Attributes())
132+
if !found {
133+
// initialize everything. Don't add the datapoint to the result.
134+
referenceTsi.Histogram = pmetric.NewHistogramDataPoint()
135+
currentDist.CopyTo(referenceTsi.Histogram)
136+
continue
137+
}
138+
139+
// Adjust the datapoint based on the reference value.
140+
adjustedPoint := pmetric.NewHistogramDataPoint()
141+
currentDist.CopyTo(adjustedPoint)
142+
adjustedPoint.SetStartTimestamp(referenceTsi.Histogram.StartTimestamp())
143+
if currentDist.Flags().NoRecordedValue() {
144+
// TODO: Investigate why this does not reset.
145+
tmp := resHistogram.DataPoints().AppendEmpty()
146+
adjustedPoint.CopyTo(tmp)
147+
continue
148+
}
149+
subtractHistogramDataPoint(adjustedPoint, referenceTsi.Histogram)
150+
151+
previousTsi, found := previousValueTsm.Get(current, currentDist.Attributes())
152+
if !found {
153+
// First point after the reference. Not a reset.
154+
previousTsi.Histogram = pmetric.NewHistogramDataPoint()
155+
} else if previousTsi.IsResetHistogram(adjustedPoint) {
156+
// reset re-initialize everything and use the non adjusted points start time.
157+
referenceTsi.Histogram = pmetric.NewHistogramDataPoint()
158+
referenceTsi.Histogram.SetStartTimestamp(currentDist.StartTimestamp())
159+
160+
tmp := resHistogram.DataPoints().AppendEmpty()
161+
currentDist.CopyTo(tmp)
162+
continue
163+
}
164+
165+
// Update previous values with the current point.
166+
adjustedPoint.CopyTo(previousTsi.Histogram)
167+
tmp := resHistogram.DataPoints().AppendEmpty()
168+
adjustedPoint.CopyTo(tmp)
169+
}
170+
}
171+
172+
func (a *Adjuster) adjustMetricExponentialHistogram(referenceTsm, previousValueTsm *datapointstorage.TimeseriesMap, current pmetric.Metric, resExpHistogram pmetric.ExponentialHistogram) {
173+
resExpHistogram.SetAggregationTemporality(current.ExponentialHistogram().AggregationTemporality())
174+
175+
histogram := current.ExponentialHistogram()
176+
if histogram.AggregationTemporality() != pmetric.AggregationTemporalityCumulative {
177+
// Only dealing with CumulativeDistributions.
178+
histogram.CopyTo(resExpHistogram)
179+
return
180+
}
181+
182+
currentPoints := histogram.DataPoints()
183+
for i := 0; i < currentPoints.Len(); i++ {
184+
currentDist := currentPoints.At(i)
185+
186+
referenceTsi, found := referenceTsm.Get(current, currentDist.Attributes())
187+
if !found {
188+
// initialize everything. Don't add the datapoint to the result.
189+
referenceTsi.ExponentialHistogram = pmetric.NewExponentialHistogramDataPoint()
190+
currentDist.CopyTo(referenceTsi.ExponentialHistogram)
191+
continue
192+
}
193+
194+
// Adjust the datapoint based on the reference value.
195+
adjustedPoint := pmetric.NewExponentialHistogramDataPoint()
196+
currentDist.CopyTo(adjustedPoint)
197+
adjustedPoint.SetStartTimestamp(referenceTsi.ExponentialHistogram.StartTimestamp())
198+
if currentDist.Flags().NoRecordedValue() {
199+
// TODO: Investigate why this does not reset.
200+
tmp := resExpHistogram.DataPoints().AppendEmpty()
201+
adjustedPoint.CopyTo(tmp)
202+
continue
203+
}
204+
subtractExponentialHistogramDataPoint(adjustedPoint, referenceTsi.ExponentialHistogram)
205+
206+
previousTsi, found := previousValueTsm.Get(current, currentDist.Attributes())
207+
if !found {
208+
// First point after the reference. Not a reset.
209+
previousTsi.ExponentialHistogram = pmetric.NewExponentialHistogramDataPoint()
210+
} else if previousTsi.IsResetExponentialHistogram(adjustedPoint) {
211+
// reset re-initialize everything and use the non adjusted points start time.
212+
referenceTsi.ExponentialHistogram = pmetric.NewExponentialHistogramDataPoint()
213+
referenceTsi.ExponentialHistogram.SetStartTimestamp(currentDist.StartTimestamp())
214+
215+
tmp := resExpHistogram.DataPoints().AppendEmpty()
216+
currentDist.CopyTo(tmp)
217+
continue
218+
}
219+
220+
// Update previous values with the current point.
221+
adjustedPoint.CopyTo(previousTsi.ExponentialHistogram)
222+
tmp := resExpHistogram.DataPoints().AppendEmpty()
223+
adjustedPoint.CopyTo(tmp)
224+
}
225+
}
226+
227+
func (a *Adjuster) adjustMetricSum(referenceTsm, previousValueTsm *datapointstorage.TimeseriesMap, current pmetric.Metric, resSum pmetric.Sum) {
228+
resSum.SetAggregationTemporality(current.Sum().AggregationTemporality())
229+
resSum.SetIsMonotonic(current.Sum().IsMonotonic())
230+
231+
currentPoints := current.Sum().DataPoints()
232+
for i := 0; i < currentPoints.Len(); i++ {
233+
currentDist := currentPoints.At(i)
234+
235+
referenceTsi, found := referenceTsm.Get(current, currentDist.Attributes())
236+
if !found {
237+
// initialize everything. Don't add the datapoint to the result.
238+
referenceTsi.Number = pmetric.NewNumberDataPoint()
239+
currentDist.CopyTo(referenceTsi.Number)
240+
continue
241+
}
242+
243+
// Adjust the datapoint based on the reference value.
244+
adjustedPoint := pmetric.NewNumberDataPoint()
245+
currentDist.CopyTo(adjustedPoint)
246+
adjustedPoint.SetStartTimestamp(referenceTsi.Number.StartTimestamp())
247+
if currentDist.Flags().NoRecordedValue() {
248+
// TODO: Investigate why this does not reset.
249+
tmp := resSum.DataPoints().AppendEmpty()
250+
adjustedPoint.CopyTo(tmp)
251+
continue
252+
}
253+
adjustedPoint.SetDoubleValue(adjustedPoint.DoubleValue() - referenceTsi.Number.DoubleValue())
254+
255+
previousTsi, found := previousValueTsm.Get(current, currentDist.Attributes())
256+
if !found {
257+
// First point after the reference. Not a reset.
258+
previousTsi.Number = pmetric.NewNumberDataPoint()
259+
} else if previousTsi.IsResetSum(adjustedPoint) {
260+
// reset re-initialize everything and use the non adjusted points start time.
261+
referenceTsi.Number = pmetric.NewNumberDataPoint()
262+
referenceTsi.Number.SetStartTimestamp(currentDist.StartTimestamp())
263+
264+
tmp := resSum.DataPoints().AppendEmpty()
265+
currentDist.CopyTo(tmp)
266+
continue
267+
}
268+
269+
// Update previous values with the current point.
270+
adjustedPoint.CopyTo(previousTsi.Number)
271+
tmp := resSum.DataPoints().AppendEmpty()
272+
adjustedPoint.CopyTo(tmp)
273+
}
274+
}
275+
276+
func (a *Adjuster) adjustMetricSummary(referenceTsm, previousValueTsm *datapointstorage.TimeseriesMap, current pmetric.Metric, resSummary pmetric.Summary) {
277+
currentPoints := current.Summary().DataPoints()
278+
for i := 0; i < currentPoints.Len(); i++ {
279+
currentDist := currentPoints.At(i)
280+
281+
referenceTsi, found := referenceTsm.Get(current, currentDist.Attributes())
282+
if !found {
283+
// initialize everything. Don't add the datapoint to the result.
284+
referenceTsi.Summary = pmetric.NewSummaryDataPoint()
285+
currentDist.CopyTo(referenceTsi.Summary)
286+
continue
287+
}
288+
289+
// Adjust the datapoint based on the reference value.
290+
adjustedPoint := pmetric.NewSummaryDataPoint()
291+
currentDist.CopyTo(adjustedPoint)
292+
adjustedPoint.SetStartTimestamp(referenceTsi.Summary.StartTimestamp())
293+
if currentDist.Flags().NoRecordedValue() {
294+
// TODO: Investigate why this does not reset.
295+
tmp := resSummary.DataPoints().AppendEmpty()
296+
adjustedPoint.CopyTo(tmp)
297+
continue
298+
}
299+
adjustedPoint.SetCount(adjustedPoint.Count() - referenceTsi.Summary.Count())
300+
adjustedPoint.SetSum(adjustedPoint.Sum() - referenceTsi.Summary.Sum())
301+
302+
previousTsi, found := previousValueTsm.Get(current, currentDist.Attributes())
303+
if !found {
304+
// First point after the reference. Not a reset.
305+
previousTsi.Summary = pmetric.NewSummaryDataPoint()
306+
} else if previousTsi.IsResetSummary(adjustedPoint) {
307+
// reset re-initialize everything and use the non adjusted points start time.
308+
referenceTsi.Summary = pmetric.NewSummaryDataPoint()
309+
referenceTsi.Summary.SetStartTimestamp(currentDist.StartTimestamp())
310+
311+
tmp := resSummary.DataPoints().AppendEmpty()
312+
currentDist.CopyTo(tmp)
313+
continue
314+
}
315+
316+
// Update previous values with the current point.
317+
adjustedPoint.CopyTo(previousTsi.Summary)
318+
tmp := resSummary.DataPoints().AppendEmpty()
319+
adjustedPoint.CopyTo(tmp)
320+
}
321+
}
322+
323+
// subtractHistogramDataPoint subtracts b from a.
324+
func subtractHistogramDataPoint(a, b pmetric.HistogramDataPoint) {
325+
a.SetStartTimestamp(b.StartTimestamp())
326+
a.SetCount(a.Count() - b.Count())
327+
a.SetSum(a.Sum() - b.Sum())
328+
aBuckets := a.BucketCounts()
329+
bBuckets := b.BucketCounts()
330+
if bBuckets.Len() != aBuckets.Len() {
331+
// Post reset, the reference histogram will have no buckets.
332+
return
333+
}
334+
newBuckets := make([]uint64, aBuckets.Len())
335+
for i := 0; i < aBuckets.Len(); i++ {
336+
newBuckets[i] = aBuckets.At(i) - bBuckets.At(i)
337+
}
338+
a.BucketCounts().FromRaw(newBuckets)
339+
}
340+
341+
// subtractExponentialHistogramDataPoint subtracts b from a.
342+
func subtractExponentialHistogramDataPoint(a, b pmetric.ExponentialHistogramDataPoint) {
343+
a.SetStartTimestamp(b.StartTimestamp())
344+
a.SetCount(a.Count() - b.Count())
345+
a.SetSum(a.Sum() - b.Sum())
346+
a.SetZeroCount(a.ZeroCount() - b.ZeroCount())
347+
if a.Positive().BucketCounts().Len() != b.Positive().BucketCounts().Len() ||
348+
a.Negative().BucketCounts().Len() != b.Negative().BucketCounts().Len() {
349+
// Post reset, the reference histogram will have no buckets.
350+
return
351+
}
352+
a.Positive().BucketCounts().FromRaw(subtractExponentialBuckets(a.Positive(), b.Positive()))
353+
a.Negative().BucketCounts().FromRaw(subtractExponentialBuckets(a.Negative(), b.Negative()))
354+
}
355+
356+
// subtractExponentialBuckets subtracts b from a.
357+
func subtractExponentialBuckets(a, b pmetric.ExponentialHistogramDataPointBuckets) []uint64 {
358+
newBuckets := make([]uint64, a.BucketCounts().Len())
359+
offsetDiff := int(a.Offset() - b.Offset())
360+
for i := 0; i < a.BucketCounts().Len(); i++ {
361+
bOffset := i + offsetDiff
362+
// if there is no corresponding bucket for the starting BucketCounts, don't normalize
363+
if bOffset < 0 || bOffset >= b.BucketCounts().Len() {
364+
newBuckets[i] = a.BucketCounts().At(i)
365+
} else {
366+
newBuckets[i] = a.BucketCounts().At(i) - b.BucketCounts().At(bOffset)
367+
}
368+
}
369+
return newBuckets
37370
}

0 commit comments

Comments
 (0)