Skip to content

Commit c001a51

Browse files
authored
Ensure same time is used for updated time, and for expiration test (#2745)
Ensure consistency, data points coming in the same batch are either all dropped or all exported. Signed-off-by: Bogdan Drutu <[email protected]>
1 parent 2dbc66c commit c001a51

File tree

2 files changed

+42
-46
lines changed

2 files changed

+42
-46
lines changed

exporter/prometheusexporter/accumulator.go

Lines changed: 40 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,10 @@ import (
2626
)
2727

2828
type accumulatedValue struct {
29-
// value contains a metric with exactly one aggregated datapoint
29+
// value contains a metric with exactly one aggregated datapoint.
3030
value pdata.Metric
31-
// stored indicates when metric was stored
32-
stored time.Time
31+
// updated indicates when metric was last changed.
32+
updated time.Time
3333

3434
instrumentationLibrary pdata.InstrumentationLibrary
3535
}
@@ -49,7 +49,7 @@ type lastValueAccumulator struct {
4949
registeredMetrics sync.Map
5050

5151
// metricExpiration contains duration for which metric
52-
// should be served after it was stored
52+
// should be served after it was updated
5353
metricExpiration time.Duration
5454
}
5555

@@ -63,104 +63,103 @@ func newAccumulator(logger *zap.Logger, metricExpiration time.Duration) accumula
6363

6464
// Accumulate stores one datapoint per metric
6565
func (a *lastValueAccumulator) Accumulate(rm pdata.ResourceMetrics) (n int) {
66+
now := time.Now()
6667
ilms := rm.InstrumentationLibraryMetrics()
6768

6869
for i := 0; i < ilms.Len(); i++ {
6970
ilm := ilms.At(i)
7071

7172
metrics := ilm.Metrics()
7273
for j := 0; j < metrics.Len(); j++ {
73-
n += a.addMetric(metrics.At(j), ilm.InstrumentationLibrary())
74+
n += a.addMetric(metrics.At(j), ilm.InstrumentationLibrary(), now)
7475
}
7576
}
7677

7778
return
7879
}
7980

80-
func (a *lastValueAccumulator) addMetric(metric pdata.Metric, il pdata.InstrumentationLibrary) int {
81+
func (a *lastValueAccumulator) addMetric(metric pdata.Metric, il pdata.InstrumentationLibrary, now time.Time) int {
8182
a.logger.Debug(fmt.Sprintf("accumulating metric: %s", metric.Name()))
8283

8384
switch metric.DataType() {
8485
case pdata.MetricDataTypeIntGauge:
85-
return a.accumulateIntGauge(metric, il)
86+
return a.accumulateIntGauge(metric, il, now)
8687
case pdata.MetricDataTypeIntSum:
87-
return a.accumulateIntSum(metric, il)
88+
return a.accumulateIntSum(metric, il, now)
8889
case pdata.MetricDataTypeDoubleGauge:
89-
return a.accumulateDoubleGauge(metric, il)
90+
return a.accumulateDoubleGauge(metric, il, now)
9091
case pdata.MetricDataTypeDoubleSum:
91-
return a.accumulateDoubleSum(metric, il)
92+
return a.accumulateDoubleSum(metric, il, now)
9293
case pdata.MetricDataTypeIntHistogram:
93-
return a.accumulateIntHistogram(metric, il)
94+
return a.accumulateIntHistogram(metric, il, now)
9495
case pdata.MetricDataTypeHistogram:
95-
return a.accumulateDoubleHistogram(metric, il)
96+
return a.accumulateDoubleHistogram(metric, il, now)
9697
}
9798

9899
return 0
99100
}
100101

101-
func (a *lastValueAccumulator) accumulateIntGauge(metric pdata.Metric, il pdata.InstrumentationLibrary) (n int) {
102+
func (a *lastValueAccumulator) accumulateIntGauge(metric pdata.Metric, il pdata.InstrumentationLibrary, now time.Time) (n int) {
102103
dps := metric.IntGauge().DataPoints()
103104
for i := 0; i < dps.Len(); i++ {
104105
ip := dps.At(i)
105106

106-
ts := ip.Timestamp().AsTime()
107107
signature := timeseriesSignature(il.Name(), metric, ip.LabelsMap())
108108

109109
v, ok := a.registeredMetrics.Load(signature)
110110
if !ok {
111111
m := createMetric(metric)
112112
m.IntGauge().DataPoints().Append(ip)
113-
a.registeredMetrics.Store(signature, &accumulatedValue{value: m, instrumentationLibrary: il, stored: time.Now()})
113+
a.registeredMetrics.Store(signature, &accumulatedValue{value: m, instrumentationLibrary: il, updated: now})
114114
n++
115115
continue
116116
}
117117
mv := v.(*accumulatedValue)
118118

119-
if ts.Before(mv.value.IntGauge().DataPoints().At(0).Timestamp().AsTime()) {
119+
if ip.Timestamp().AsTime().Before(mv.value.IntGauge().DataPoints().At(0).Timestamp().AsTime()) {
120120
// only keep datapoint with latest timestamp
121121
continue
122122
}
123123

124124
m := createMetric(metric)
125125
m.IntGauge().DataPoints().Append(ip)
126-
a.registeredMetrics.Store(signature, &accumulatedValue{value: m, instrumentationLibrary: il, stored: time.Now()})
126+
a.registeredMetrics.Store(signature, &accumulatedValue{value: m, instrumentationLibrary: il, updated: now})
127127
n++
128128
}
129129
return
130130
}
131131

132-
func (a *lastValueAccumulator) accumulateDoubleGauge(metric pdata.Metric, il pdata.InstrumentationLibrary) (n int) {
132+
func (a *lastValueAccumulator) accumulateDoubleGauge(metric pdata.Metric, il pdata.InstrumentationLibrary, now time.Time) (n int) {
133133
dps := metric.DoubleGauge().DataPoints()
134134
for i := 0; i < dps.Len(); i++ {
135135
ip := dps.At(i)
136136

137-
ts := ip.Timestamp().AsTime()
138137
signature := timeseriesSignature(il.Name(), metric, ip.LabelsMap())
139138

140139
v, ok := a.registeredMetrics.Load(signature)
141140
if !ok {
142141
m := createMetric(metric)
143142
m.DoubleGauge().DataPoints().Append(ip)
144-
a.registeredMetrics.Store(signature, &accumulatedValue{value: m, instrumentationLibrary: il, stored: time.Now()})
143+
a.registeredMetrics.Store(signature, &accumulatedValue{value: m, instrumentationLibrary: il, updated: now})
145144
n++
146145
continue
147146
}
148147
mv := v.(*accumulatedValue)
149148

150-
if ts.Before(mv.value.DoubleGauge().DataPoints().At(0).Timestamp().AsTime()) {
149+
if ip.Timestamp().AsTime().Before(mv.value.DoubleGauge().DataPoints().At(0).Timestamp().AsTime()) {
151150
// only keep datapoint with latest timestamp
152151
continue
153152
}
154153

155154
m := createMetric(metric)
156155
m.DoubleGauge().DataPoints().Append(ip)
157-
a.registeredMetrics.Store(signature, &accumulatedValue{value: m, instrumentationLibrary: il, stored: time.Now()})
156+
a.registeredMetrics.Store(signature, &accumulatedValue{value: m, instrumentationLibrary: il, updated: now})
158157
n++
159158
}
160159
return
161160
}
162161

163-
func (a *lastValueAccumulator) accumulateIntSum(metric pdata.Metric, il pdata.InstrumentationLibrary) (n int) {
162+
func (a *lastValueAccumulator) accumulateIntSum(metric pdata.Metric, il pdata.InstrumentationLibrary, now time.Time) (n int) {
164163
intSum := metric.IntSum()
165164

166165
// Drop metrics with non-cumulative aggregations
@@ -172,7 +171,6 @@ func (a *lastValueAccumulator) accumulateIntSum(metric pdata.Metric, il pdata.In
172171
for i := 0; i < dps.Len(); i++ {
173172
ip := dps.At(i)
174173

175-
ts := ip.Timestamp().AsTime()
176174
signature := timeseriesSignature(il.Name(), metric, ip.LabelsMap())
177175

178176
v, ok := a.registeredMetrics.Load(signature)
@@ -181,13 +179,13 @@ func (a *lastValueAccumulator) accumulateIntSum(metric pdata.Metric, il pdata.In
181179
m.IntSum().SetIsMonotonic(metric.IntSum().IsMonotonic())
182180
m.IntSum().SetAggregationTemporality(pdata.AggregationTemporalityCumulative)
183181
m.IntSum().DataPoints().Append(ip)
184-
a.registeredMetrics.Store(signature, &accumulatedValue{value: m, instrumentationLibrary: il, stored: time.Now()})
182+
a.registeredMetrics.Store(signature, &accumulatedValue{value: m, instrumentationLibrary: il, updated: now})
185183
n++
186184
continue
187185
}
188186
mv := v.(*accumulatedValue)
189187

190-
if ts.Before(mv.value.IntSum().DataPoints().At(0).Timestamp().AsTime()) {
188+
if ip.Timestamp().AsTime().Before(mv.value.IntSum().DataPoints().At(0).Timestamp().AsTime()) {
191189
// only keep datapoint with latest timestamp
192190
continue
193191
}
@@ -196,13 +194,13 @@ func (a *lastValueAccumulator) accumulateIntSum(metric pdata.Metric, il pdata.In
196194
m.IntSum().SetIsMonotonic(metric.IntSum().IsMonotonic())
197195
m.IntSum().SetAggregationTemporality(pdata.AggregationTemporalityCumulative)
198196
m.IntSum().DataPoints().Append(ip)
199-
a.registeredMetrics.Store(signature, &accumulatedValue{value: m, instrumentationLibrary: il, stored: time.Now()})
197+
a.registeredMetrics.Store(signature, &accumulatedValue{value: m, instrumentationLibrary: il, updated: now})
200198
n++
201199
}
202200
return
203201
}
204202

205-
func (a *lastValueAccumulator) accumulateDoubleSum(metric pdata.Metric, il pdata.InstrumentationLibrary) (n int) {
203+
func (a *lastValueAccumulator) accumulateDoubleSum(metric pdata.Metric, il pdata.InstrumentationLibrary, now time.Time) (n int) {
206204
doubleSum := metric.DoubleSum()
207205

208206
// Drop metrics with non-cumulative aggregations
@@ -214,7 +212,6 @@ func (a *lastValueAccumulator) accumulateDoubleSum(metric pdata.Metric, il pdata
214212
for i := 0; i < dps.Len(); i++ {
215213
ip := dps.At(i)
216214

217-
ts := ip.Timestamp().AsTime()
218215
signature := timeseriesSignature(il.Name(), metric, ip.LabelsMap())
219216

220217
v, ok := a.registeredMetrics.Load(signature)
@@ -223,13 +220,13 @@ func (a *lastValueAccumulator) accumulateDoubleSum(metric pdata.Metric, il pdata
223220
m.DoubleSum().SetIsMonotonic(metric.DoubleSum().IsMonotonic())
224221
m.DoubleSum().SetAggregationTemporality(pdata.AggregationTemporalityCumulative)
225222
m.DoubleSum().DataPoints().Append(ip)
226-
a.registeredMetrics.Store(signature, &accumulatedValue{value: m, instrumentationLibrary: il, stored: time.Now()})
223+
a.registeredMetrics.Store(signature, &accumulatedValue{value: m, instrumentationLibrary: il, updated: now})
227224
n++
228225
continue
229226
}
230227
mv := v.(*accumulatedValue)
231228

232-
if ts.Before(mv.value.DoubleSum().DataPoints().At(0).Timestamp().AsTime()) {
229+
if ip.Timestamp().AsTime().Before(mv.value.DoubleSum().DataPoints().At(0).Timestamp().AsTime()) {
233230
// only keep datapoint with latest timestamp
234231
continue
235232
}
@@ -238,13 +235,13 @@ func (a *lastValueAccumulator) accumulateDoubleSum(metric pdata.Metric, il pdata
238235
m.DoubleSum().SetIsMonotonic(metric.DoubleSum().IsMonotonic())
239236
m.DoubleSum().SetAggregationTemporality(pdata.AggregationTemporalityCumulative)
240237
m.DoubleSum().DataPoints().Append(ip)
241-
a.registeredMetrics.Store(signature, &accumulatedValue{value: m, instrumentationLibrary: il, stored: time.Now()})
238+
a.registeredMetrics.Store(signature, &accumulatedValue{value: m, instrumentationLibrary: il, updated: now})
242239
n++
243240
}
244241
return
245242
}
246243

247-
func (a *lastValueAccumulator) accumulateIntHistogram(metric pdata.Metric, il pdata.InstrumentationLibrary) (n int) {
244+
func (a *lastValueAccumulator) accumulateIntHistogram(metric pdata.Metric, il pdata.InstrumentationLibrary, now time.Time) (n int) {
248245
intHistogram := metric.IntHistogram()
249246

250247
// Drop metrics with non-cumulative aggregations
@@ -256,34 +253,33 @@ func (a *lastValueAccumulator) accumulateIntHistogram(metric pdata.Metric, il pd
256253
for i := 0; i < dps.Len(); i++ {
257254
ip := dps.At(i)
258255

259-
ts := ip.Timestamp().AsTime()
260256
signature := timeseriesSignature(il.Name(), metric, ip.LabelsMap())
261257

262258
v, ok := a.registeredMetrics.Load(signature)
263259
if !ok {
264260
m := createMetric(metric)
265261
m.IntHistogram().DataPoints().Append(ip)
266-
a.registeredMetrics.Store(signature, &accumulatedValue{value: m, instrumentationLibrary: il, stored: time.Now()})
262+
a.registeredMetrics.Store(signature, &accumulatedValue{value: m, instrumentationLibrary: il, updated: now})
267263
n++
268264
continue
269265
}
270266
mv := v.(*accumulatedValue)
271267

272-
if ts.Before(mv.value.IntHistogram().DataPoints().At(0).Timestamp().AsTime()) {
268+
if ip.Timestamp().AsTime().Before(mv.value.IntHistogram().DataPoints().At(0).Timestamp().AsTime()) {
273269
// only keep datapoint with latest timestamp
274270
continue
275271
}
276272

277273
m := createMetric(metric)
278274
m.IntHistogram().DataPoints().Append(ip)
279275
m.IntHistogram().SetAggregationTemporality(pdata.AggregationTemporalityCumulative)
280-
a.registeredMetrics.Store(signature, &accumulatedValue{value: m, instrumentationLibrary: il, stored: time.Now()})
276+
a.registeredMetrics.Store(signature, &accumulatedValue{value: m, instrumentationLibrary: il, updated: now})
281277
n++
282278
}
283279
return
284280
}
285281

286-
func (a *lastValueAccumulator) accumulateDoubleHistogram(metric pdata.Metric, il pdata.InstrumentationLibrary) (n int) {
282+
func (a *lastValueAccumulator) accumulateDoubleHistogram(metric pdata.Metric, il pdata.InstrumentationLibrary, now time.Time) (n int) {
287283
doubleHistogram := metric.Histogram()
288284

289285
// Drop metrics with non-cumulative aggregations
@@ -295,28 +291,27 @@ func (a *lastValueAccumulator) accumulateDoubleHistogram(metric pdata.Metric, il
295291
for i := 0; i < dps.Len(); i++ {
296292
ip := dps.At(i)
297293

298-
ts := ip.Timestamp().AsTime()
299294
signature := timeseriesSignature(il.Name(), metric, ip.LabelsMap())
300295

301296
v, ok := a.registeredMetrics.Load(signature)
302297
if !ok {
303298
m := createMetric(metric)
304299
m.Histogram().DataPoints().Append(ip)
305-
a.registeredMetrics.Store(signature, &accumulatedValue{value: m, instrumentationLibrary: il, stored: time.Now()})
300+
a.registeredMetrics.Store(signature, &accumulatedValue{value: m, instrumentationLibrary: il, updated: now})
306301
n++
307302
continue
308303
}
309304
mv := v.(*accumulatedValue)
310305

311-
if ts.Before(mv.value.Histogram().DataPoints().At(0).Timestamp().AsTime()) {
306+
if ip.Timestamp().AsTime().Before(mv.value.Histogram().DataPoints().At(0).Timestamp().AsTime()) {
312307
// only keep datapoint with latest timestamp
313308
continue
314309
}
315310

316311
m := createMetric(metric)
317312
m.Histogram().DataPoints().Append(ip)
318313
m.Histogram().SetAggregationTemporality(pdata.AggregationTemporalityCumulative)
319-
a.registeredMetrics.Store(signature, &accumulatedValue{value: m, instrumentationLibrary: il, stored: time.Now()})
314+
a.registeredMetrics.Store(signature, &accumulatedValue{value: m, instrumentationLibrary: il, updated: now})
320315
n++
321316
}
322317
return
@@ -326,11 +321,12 @@ func (a *lastValueAccumulator) accumulateDoubleHistogram(metric pdata.Metric, il
326321
func (a *lastValueAccumulator) Collect() []pdata.Metric {
327322
a.logger.Debug("Accumulator collect called")
328323

329-
res := make([]pdata.Metric, 0)
324+
var res []pdata.Metric
325+
expirationTime := time.Now().Add(-a.metricExpiration)
330326

331327
a.registeredMetrics.Range(func(key, value interface{}) bool {
332328
v := value.(*accumulatedValue)
333-
if time.Now().After(v.stored.Add(a.metricExpiration)) {
329+
if expirationTime.After(v.updated) {
334330
a.logger.Debug(fmt.Sprintf("metric expired: %s", v.value.Name()))
335331
a.registeredMetrics.Delete(key)
336332
return true

exporter/prometheusexporter/accumulator_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ func TestInvalidDataType(t *testing.T) {
2929
a := newAccumulator(zap.NewNop(), 1*time.Hour).(*lastValueAccumulator)
3030
metric := pdata.NewMetric()
3131
metric.SetDataType(-100)
32-
n := a.addMetric(metric, pdata.NewInstrumentationLibrary())
32+
n := a.addMetric(metric, pdata.NewInstrumentationLibrary(), time.Now())
3333
require.Zero(t, n)
3434
}
3535

@@ -351,7 +351,7 @@ func TestAccumulateMetrics(t *testing.T) {
351351
require.Equal(t, m2Labels.Len(), vLabels.Len())
352352
require.Equal(t, m2Value, vValue)
353353
require.Equal(t, ts2.Unix(), vTS.Unix())
354-
require.Greater(t, v.stored.Unix(), vTS.Unix())
354+
require.Greater(t, v.updated.Unix(), vTS.Unix())
355355
require.Equal(t, m2Temporality, vTemporality)
356356
require.Equal(t, m2IsMonotonic, vIsMonotonic)
357357

0 commit comments

Comments
 (0)