Skip to content

Commit 736b2d3

Browse files
committed
metricstarttimeprocessor: address edge case where reset happens immediately after the reference is cached
Signed-off-by: Ridwan Sharif <[email protected]>
1 parent f8fd66d commit 736b2d3

File tree

6 files changed

+169
-97
lines changed

6 files changed

+169
-97
lines changed

Makefile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -400,7 +400,7 @@ chlog-update: $(CHLOGGEN)
400400
.PHONY: genotelcontribcol
401401
genotelcontribcol: $(BUILDER)
402402
./internal/buildscripts/ocb-add-replaces.sh otelcontribcol
403-
$(BUILDER) --skip-compilation --config cmd/otelcontribcol/builder-config.yaml
403+
$(BUILDER) --skip-compilation --config cmd/otelcontribcol/builder-config-replaced.yaml
404404

405405
# Build the Collector executable.
406406
.PHONY: otelcontribcol
@@ -417,7 +417,7 @@ otelcontribcollite: genotelcontribcol
417417
.PHONY: genoteltestbedcol
418418
genoteltestbedcol: $(BUILDER)
419419
./internal/buildscripts/ocb-add-replaces.sh oteltestbedcol
420-
$(BUILDER) --skip-compilation --config cmd/oteltestbedcol/builder-config.yaml
420+
$(BUILDER) --skip-compilation --config cmd/oteltestbedcol/builder-config-replaced.yaml
421421

422422
# Build the Collector executable, with only components used in testbed.
423423
.PHONY: oteltestbedcol

processor/metricstarttimeprocessor/internal/datapointstorage/timeseries_map.go

Lines changed: 25 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -101,20 +101,20 @@ func (tsm *TimeseriesMap) GC() {
101101
tsm.Mark = false
102102
}
103103

104-
// IsResetHistogram compares the given histogram datapoint h, to tsi.Histogram
104+
// IsResetHistogram compares the given histogram datapoint h, to ref
105105
// and determines whether the metric has been reset based on the values. It is
106106
// a reset if any of the bucket boundaries have changed, if any of the bucket
107107
// counts have decreased or if the total sum or count have decreased.
108-
func (tsi *TimeseriesInfo) IsResetHistogram(h pmetric.HistogramDataPoint) bool {
109-
if h.Count() < tsi.Histogram.Count() {
108+
func IsResetHistogram(h, ref pmetric.HistogramDataPoint) bool {
109+
if h.Count() < ref.Count() {
110110
return true
111111
}
112-
if h.Sum() < tsi.Histogram.Sum() {
112+
if h.Sum() < ref.Sum() {
113113
return true
114114
}
115115

116116
// Guard against bucket boundaries changes.
117-
refBounds := tsi.Histogram.ExplicitBounds().AsRaw()
117+
refBounds := ref.ExplicitBounds().AsRaw()
118118
hBounds := h.ExplicitBounds().AsRaw()
119119
if len(refBounds) != len(hBounds) {
120120
return true
@@ -126,69 +126,69 @@ func (tsi *TimeseriesInfo) IsResetHistogram(h pmetric.HistogramDataPoint) bool {
126126
}
127127

128128
// We need to check individual buckets to make sure the counts are all increasing.
129-
if tsi.Histogram.BucketCounts().Len() != h.BucketCounts().Len() {
129+
if ref.BucketCounts().Len() != h.BucketCounts().Len() {
130130
return true
131131
}
132-
for i := range tsi.Histogram.BucketCounts().Len() {
133-
if h.BucketCounts().At(i) < tsi.Histogram.BucketCounts().At(i) {
132+
for i := range ref.BucketCounts().Len() {
133+
if h.BucketCounts().At(i) < ref.BucketCounts().At(i) {
134134
return true
135135
}
136136
}
137137
return false
138138
}
139139

140140
// IsResetExponentialHistogram compares the given exponential histogram
141-
// datapoint eh, to tsi.ExponentialHistogram and determines whether the metric
141+
// datapoint eh, to ref and determines whether the metric
142142
// has been reset based on the values. It is a reset if any of the bucket
143143
// boundaries have changed, if any of the bucket counts have decreased or if the
144144
// total sum or count have decreased.
145-
func (tsi *TimeseriesInfo) IsResetExponentialHistogram(eh pmetric.ExponentialHistogramDataPoint) bool {
145+
func IsResetExponentialHistogram(eh, ref pmetric.ExponentialHistogramDataPoint) bool {
146146
// Same as the histogram implementation
147-
if eh.Count() < tsi.ExponentialHistogram.Count() {
147+
if eh.Count() < ref.Count() {
148148
return true
149149
}
150-
if eh.Sum() < tsi.ExponentialHistogram.Sum() {
150+
if eh.Sum() < ref.Sum() {
151151
return true
152152
}
153153

154154
// Guard against bucket boundaries changes.
155-
if tsi.ExponentialHistogram.Scale() != eh.Scale() {
155+
if ref.Scale() != eh.Scale() {
156156
return true
157157
}
158158

159159
// We need to check individual buckets to make sure the counts are all increasing.
160-
if tsi.ExponentialHistogram.Positive().BucketCounts().Len() != eh.Positive().BucketCounts().Len() {
160+
if ref.Positive().BucketCounts().Len() != eh.Positive().BucketCounts().Len() {
161161
return true
162162
}
163-
for i := range tsi.ExponentialHistogram.Positive().BucketCounts().Len() {
164-
if eh.Positive().BucketCounts().At(i) < tsi.ExponentialHistogram.Positive().BucketCounts().At(i) {
163+
for i := range ref.Positive().BucketCounts().Len() {
164+
if eh.Positive().BucketCounts().At(i) < ref.Positive().BucketCounts().At(i) {
165165
return true
166166
}
167167
}
168-
if tsi.ExponentialHistogram.Negative().BucketCounts().Len() != eh.Negative().BucketCounts().Len() {
168+
if ref.Negative().BucketCounts().Len() != eh.Negative().BucketCounts().Len() {
169169
return true
170170
}
171-
for i := range tsi.ExponentialHistogram.Negative().BucketCounts().Len() {
172-
if eh.Negative().BucketCounts().At(i) < tsi.ExponentialHistogram.Negative().BucketCounts().At(i) {
171+
for i := range ref.Negative().BucketCounts().Len() {
172+
if eh.Negative().BucketCounts().At(i) < ref.Negative().BucketCounts().At(i) {
173173
return true
174174
}
175175
}
176176

177177
return false
178178
}
179179

180-
// IsResetSummary compares the given summary datapoint s to tsi.Summary and
180+
// IsResetSummary compares the given summary datapoint s to ref and
181181
// determines whether the metric has been reset based on the values. It is a
182182
// reset if the count or sum has decreased.
183-
func (tsi *TimeseriesInfo) IsResetSummary(s pmetric.SummaryDataPoint) bool {
184-
return s.Count() < tsi.Summary.Count() || s.Sum() < tsi.Summary.Sum()
183+
func IsResetSummary(s, ref pmetric.SummaryDataPoint) bool {
184+
return s.Count() < ref.Count() || s.Sum() < ref.Sum()
185185
}
186186

187-
// IsResetSum compares the given number datapoint s to tsi.Number and determines
187+
// IsResetSum compares the given number datapoint s to ref and determines
188188
// whether the metric has been reset based on the values. It is a reset if the
189189
// value has decreased.
190-
func (tsi *TimeseriesInfo) IsResetSum(s pmetric.NumberDataPoint) bool {
191-
return s.DoubleValue() < tsi.Number.DoubleValue()
190+
func IsResetSum(s, ref pmetric.NumberDataPoint) bool {
191+
return s.DoubleValue() < ref.DoubleValue()
192192
}
193193

194194
func newTimeseriesMap() *TimeseriesMap {

processor/metricstarttimeprocessor/internal/datapointstorage/timeseries_map_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -316,7 +316,7 @@ func TestTimeseriesInfo_IsResetHistogram(t *testing.T) {
316316
t.Run(tt.name, func(t *testing.T) {
317317
tsi := tt.setupTsi()
318318
h := tt.setupH()
319-
assert.Equal(t, tt.expectedReset, tsi.IsResetHistogram(h))
319+
assert.Equal(t, tt.expectedReset, IsResetHistogram(h, tsi.Histogram))
320320
})
321321
}
322322
}
@@ -518,7 +518,7 @@ func TestTimeseriesInfo_IsResetExponentialHistogram(t *testing.T) {
518518
t.Run(tt.name, func(t *testing.T) {
519519
tsi := tt.setupTsi()
520520
eh := tt.setupEh()
521-
assert.Equal(t, tt.expectedReset, tsi.IsResetExponentialHistogram(eh))
521+
assert.Equal(t, tt.expectedReset, IsResetExponentialHistogram(eh, tsi.ExponentialHistogram))
522522
})
523523
}
524524
}
@@ -604,7 +604,7 @@ func TestTimeseriesInfo_IsResetSummary(t *testing.T) {
604604
t.Run(tt.name, func(t *testing.T) {
605605
tsi := tt.setupTsi()
606606
s := tt.setupS()
607-
assert.Equal(t, tt.expectedReset, tsi.IsResetSummary(s))
607+
assert.Equal(t, tt.expectedReset, IsResetSummary(s, tsi.Summary))
608608
})
609609
}
610610
}
@@ -652,7 +652,7 @@ func TestTimeseriesInfo_IsResetSum(t *testing.T) {
652652
t.Run(tt.name, func(t *testing.T) {
653653
tsi := tt.setupTsi()
654654
s := tt.setupS()
655-
assert.Equal(t, tt.expectedReset, tsi.IsResetSum(s))
655+
assert.Equal(t, tt.expectedReset, IsResetSum(s, tsi.Number))
656656
})
657657
}
658658
}

processor/metricstarttimeprocessor/internal/subtractinitial/adjuster.go

Lines changed: 47 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ import (
2020
// The subtract initial point adjuster sets the start time of all points in a series by:
2121
// - Dropping the initial point, and recording its value and timestamp.
2222
// - Subtracting the initial point from all subsequent points, and using the timestamp of the initial point as the start timestamp.
23+
//
24+
// Note that when a reset is detected (eg: value of a counter is decreasing) - the strategy will set the
25+
// start time of the reset point as point timestamp - 1ms.
2326
const Type = "subtract_initial_point"
2427

2528
type Adjuster struct {
@@ -155,29 +158,33 @@ func adjustMetricHistogram(referenceTsm, previousValueTsm *datapointstorage.Time
155158
adjustedPoint.CopyTo(resHistogram.DataPoints().AppendEmpty())
156159
continue
157160
}
161+
isReset := datapointstorage.IsResetHistogram(adjustedPoint, referenceTsi.Histogram)
158162
subtractHistogramDataPoint(adjustedPoint, referenceTsi.Histogram)
159163

160164
previousTsi, found := previousValueTsm.Get(current, currentDist.Attributes())
161-
if !found {
162-
// First point after the reference. Not a reset.
163-
previousTsi.Histogram = pmetric.NewHistogramDataPoint()
164-
} else if previousTsi.IsResetHistogram(adjustedPoint) {
165+
if isReset || (found && datapointstorage.IsResetHistogram(adjustedPoint, previousTsi.Histogram)) {
165166
// reset re-initialize everything and use the non adjusted points start time.
166167
resetStartTimeStamp := pcommon.NewTimestampFromTime(currentDist.StartTimestamp().AsTime().Add(-1 * time.Millisecond))
167168
currentDist.SetStartTimestamp(resetStartTimeStamp)
168169

170+
// Update the reference value with the current point.
169171
referenceTsi.Histogram = pmetric.NewHistogramDataPoint()
172+
previousTsi.Histogram = pmetric.NewHistogramDataPoint()
170173
referenceTsi.Histogram.SetStartTimestamp(currentDist.StartTimestamp())
174+
currentDist.ExplicitBounds().CopyTo(referenceTsi.Histogram.ExplicitBounds())
175+
referenceTsi.Histogram.BucketCounts().FromRaw(make([]uint64, currentDist.BucketCounts().Len()))
171176

172-
tmp := resHistogram.DataPoints().AppendEmpty()
173-
currentDist.CopyTo(tmp)
177+
currentDist.CopyTo(resHistogram.DataPoints().AppendEmpty())
178+
currentDist.CopyTo(previousTsi.Histogram)
174179
continue
180+
} else if !found {
181+
// First point after the reference. Not a reset.
182+
previousTsi.Histogram = pmetric.NewHistogramDataPoint()
175183
}
176184

177185
// Update previous values with the current point.
178186
adjustedPoint.CopyTo(previousTsi.Histogram)
179-
tmp := resHistogram.DataPoints().AppendEmpty()
180-
adjustedPoint.CopyTo(tmp)
187+
adjustedPoint.CopyTo(resHistogram.DataPoints().AppendEmpty())
181188
}
182189
}
183190

@@ -217,29 +224,34 @@ func adjustMetricExponentialHistogram(referenceTsm, previousValueTsm *datapoints
217224
adjustedPoint.CopyTo(resExpHistogram.DataPoints().AppendEmpty())
218225
continue
219226
}
227+
228+
isReset := datapointstorage.IsResetExponentialHistogram(adjustedPoint, referenceTsi.ExponentialHistogram)
220229
subtractExponentialHistogramDataPoint(adjustedPoint, referenceTsi.ExponentialHistogram)
221230

222231
previousTsi, found := previousValueTsm.Get(current, currentDist.Attributes())
223-
if !found {
224-
// First point after the reference. Not a reset.
225-
previousTsi.ExponentialHistogram = pmetric.NewExponentialHistogramDataPoint()
226-
} else if previousTsi.IsResetExponentialHistogram(adjustedPoint) {
232+
if isReset || (found && datapointstorage.IsResetExponentialHistogram(adjustedPoint, previousTsi.ExponentialHistogram)) {
227233
// reset re-initialize everything and use the non adjusted points start time.
228234
resetStartTimeStamp := pcommon.NewTimestampFromTime(currentDist.StartTimestamp().AsTime().Add(-1 * time.Millisecond))
229235
currentDist.SetStartTimestamp(resetStartTimeStamp)
230236

231237
referenceTsi.ExponentialHistogram = pmetric.NewExponentialHistogramDataPoint()
238+
previousTsi.ExponentialHistogram = pmetric.NewExponentialHistogramDataPoint()
232239
referenceTsi.ExponentialHistogram.SetStartTimestamp(currentDist.StartTimestamp())
240+
referenceTsi.ExponentialHistogram.SetScale(currentDist.Scale())
241+
referenceTsi.ExponentialHistogram.Positive().BucketCounts().FromRaw(make([]uint64, currentDist.Positive().BucketCounts().Len()))
242+
referenceTsi.ExponentialHistogram.Negative().BucketCounts().FromRaw(make([]uint64, currentDist.Negative().BucketCounts().Len()))
233243

234-
tmp := resExpHistogram.DataPoints().AppendEmpty()
235-
currentDist.CopyTo(tmp)
244+
currentDist.CopyTo(resExpHistogram.DataPoints().AppendEmpty())
245+
currentDist.CopyTo(previousTsi.ExponentialHistogram)
236246
continue
247+
} else if !found {
248+
// First point after the reference. Not a reset.
249+
previousTsi.ExponentialHistogram = pmetric.NewExponentialHistogramDataPoint()
237250
}
238251

239252
// Update previous values with the current point.
240253
adjustedPoint.CopyTo(previousTsi.ExponentialHistogram)
241-
tmp := resExpHistogram.DataPoints().AppendEmpty()
242-
adjustedPoint.CopyTo(tmp)
254+
adjustedPoint.CopyTo(resExpHistogram.DataPoints().AppendEmpty())
243255
}
244256
}
245257

@@ -278,29 +290,30 @@ func adjustMetricSum(referenceTsm, previousValueTsm *datapointstorage.Timeseries
278290
adjustedPoint.CopyTo(resSum.DataPoints().AppendEmpty())
279291
continue
280292
}
293+
isReset := datapointstorage.IsResetSum(adjustedPoint, referenceTsi.Number)
281294
adjustedPoint.SetDoubleValue(adjustedPoint.DoubleValue() - referenceTsi.Number.DoubleValue())
282295

283296
previousTsi, found := previousValueTsm.Get(current, currentSum.Attributes())
284-
if !found {
285-
// First point after the reference. Not a reset.
286-
previousTsi.Number = pmetric.NewNumberDataPoint()
287-
} else if previousTsi.IsResetSum(adjustedPoint) {
297+
if isReset || (found && datapointstorage.IsResetSum(adjustedPoint, previousTsi.Number)) {
288298
// reset re-initialize everything and use the non adjusted points start time.
289299
resetStartTimeStamp := pcommon.NewTimestampFromTime(currentSum.StartTimestamp().AsTime().Add(-1 * time.Millisecond))
290300
currentSum.SetStartTimestamp(resetStartTimeStamp)
291301

292302
referenceTsi.Number = pmetric.NewNumberDataPoint()
303+
previousTsi.Number = pmetric.NewNumberDataPoint()
293304
referenceTsi.Number.SetStartTimestamp(currentSum.StartTimestamp())
294305

295-
tmp := resSum.DataPoints().AppendEmpty()
296-
currentSum.CopyTo(tmp)
306+
currentSum.CopyTo(resSum.DataPoints().AppendEmpty())
307+
currentSum.CopyTo(previousTsi.Number)
297308
continue
309+
} else if !found {
310+
// First point after the reference. Not a reset.
311+
previousTsi.Number = pmetric.NewNumberDataPoint()
298312
}
299313

300314
// Update previous values with the current point.
301315
adjustedPoint.CopyTo(previousTsi.Number)
302-
tmp := resSum.DataPoints().AppendEmpty()
303-
adjustedPoint.CopyTo(tmp)
316+
adjustedPoint.CopyTo(resSum.DataPoints().AppendEmpty())
304317
}
305318
}
306319

@@ -331,30 +344,32 @@ func adjustMetricSummary(referenceTsm, previousValueTsm *datapointstorage.Timese
331344
adjustedPoint.CopyTo(resSummary.DataPoints().AppendEmpty())
332345
continue
333346
}
347+
348+
isReset := datapointstorage.IsResetSummary(adjustedPoint, referenceTsi.Summary)
334349
adjustedPoint.SetCount(adjustedPoint.Count() - referenceTsi.Summary.Count())
335350
adjustedPoint.SetSum(adjustedPoint.Sum() - referenceTsi.Summary.Sum())
336351

337352
previousTsi, found := previousValueTsm.Get(current, currentSummary.Attributes())
338-
if !found {
339-
// First point after the reference. Not a reset.
340-
previousTsi.Summary = pmetric.NewSummaryDataPoint()
341-
} else if previousTsi.IsResetSummary(adjustedPoint) {
353+
if isReset || (found && datapointstorage.IsResetSummary(adjustedPoint, previousTsi.Summary)) {
342354
// reset re-initialize everything and use the non adjusted points start time.
343355
resetStartTimeStamp := pcommon.NewTimestampFromTime(currentSummary.StartTimestamp().AsTime().Add(-1 * time.Millisecond))
344356
currentSummary.SetStartTimestamp(resetStartTimeStamp)
345357

346358
referenceTsi.Summary = pmetric.NewSummaryDataPoint()
359+
previousTsi.Summary = pmetric.NewSummaryDataPoint()
347360
referenceTsi.Summary.SetStartTimestamp(currentSummary.StartTimestamp())
348361

349-
tmp := resSummary.DataPoints().AppendEmpty()
350-
currentSummary.CopyTo(tmp)
362+
currentSummary.CopyTo(resSummary.DataPoints().AppendEmpty())
363+
currentSummary.CopyTo(previousTsi.Summary)
351364
continue
365+
} else if !found {
366+
// First point after the reference. Not a reset.
367+
previousTsi.Summary = pmetric.NewSummaryDataPoint()
352368
}
353369

354370
// Update previous values with the current point.
355371
adjustedPoint.CopyTo(previousTsi.Summary)
356-
tmp := resSummary.DataPoints().AppendEmpty()
357-
adjustedPoint.CopyTo(tmp)
372+
adjustedPoint.CopyTo(resSummary.DataPoints().AppendEmpty())
358373
}
359374
}
360375

0 commit comments

Comments
 (0)