Skip to content

Commit 8163dd0

Browse files
nijave authored and MovieStoreGuy committed
[processor/spanmetrics] Prune histograms (open-telemetry#27083)
Prune histograms when the dimension cache evictions are removed **Description:** Prunes histograms when the dimension cache is pruned. This prevents metric series from growing indefinitely **Link to tracking Issue:** open-telemetry#27080 **Testing:** I modified the the existing test to check `histograms` length instead of dimensions cache length. This required simulating ticks to hit the exportMetrics function **Documentation:** <Describe the documentation added.> Co-authored-by: Sean Marciniak <[email protected]>
1 parent 7006046 commit 8163dd0

File tree

4 files changed

+82
-23
lines changed

4 files changed

+82
-23
lines changed
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: bug_fix
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: processor/spanmetrics
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Prune histograms when dimension cache is pruned.
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [27080]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: |
19+
Dimension cache was always pruned but histograms were not being pruned. This caused metric series created
20+
by processor/spanmetrics to grow unbounded.
21+
22+
# If your change doesn't affect end users or the exported elements of any package,
23+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
24+
# Optional: The change log or logs in which this entry should be included.
25+
# e.g. '[user]' or '[user, api]'
26+
# Include 'user' if the change is relevant to end users.
27+
# Include 'api' if there is a change to a library API.
28+
# Default: '[user]'
29+
change_logs: [user]

processor/spanmetricsprocessor/internal/cache/cache.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,10 @@ func (c *Cache[K, V]) Get(key K) (V, bool) {
6464
return val, ok
6565
}
6666

67+
func (c *Cache[K, V]) Contains(key K) bool {
68+
return c.lru.Contains(key)
69+
}
70+
6771
// Len returns the number of items in the cache.
6872
func (c *Cache[K, V]) Len() int {
6973
return c.lru.Len()

processor/spanmetricsprocessor/processor.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,11 @@ func (p *processorImp) exportMetrics(ctx context.Context) {
258258
p.metricKeyToDimensions.Purge()
259259
} else {
260260
p.metricKeyToDimensions.RemoveEvictedItems()
261+
for key := range p.histograms {
262+
if !p.metricKeyToDimensions.Contains(key) {
263+
delete(p.histograms, key)
264+
}
265+
}
261266
}
262267

263268
// This component no longer needs to read the metrics once built, so it is safe to unlock.

processor/spanmetricsprocessor/processor_test.go

Lines changed: 44 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ const (
4747
notInSpanAttrName0 = "shouldBeInMetric"
4848
notInSpanAttrName1 = "shouldNotBeInMetric"
4949
regionResourceAttrName = "region"
50-
DimensionsCacheSize = 2
50+
DimensionsCacheSize = 1000
5151

5252
sampleRegion = "us-east-1"
5353
sampleLatency = float64(11)
@@ -144,7 +144,7 @@ func TestProcessorConcurrentShutdown(t *testing.T) {
144144
ticker := mockClock.NewTicker(time.Nanosecond)
145145

146146
// Test
147-
p := newProcessorImp(mexp, tcon, nil, cumulative, logger, ticker)
147+
p := newProcessorImp(mexp, tcon, nil, cumulative, logger, ticker, DimensionsCacheSize)
148148
err := p.Start(ctx, mhost)
149149
require.NoError(t, err)
150150

@@ -230,7 +230,7 @@ func TestProcessorConsumeTracesErrors(t *testing.T) {
230230
tcon := &mocks.TracesConsumer{}
231231
tcon.On("ConsumeTraces", mock.Anything, mock.Anything).Return(fakeErr)
232232

233-
p := newProcessorImp(mexp, tcon, nil, cumulative, logger, nil)
233+
p := newProcessorImp(mexp, tcon, nil, cumulative, logger, nil, DimensionsCacheSize)
234234

235235
traces := buildSampleTrace()
236236

@@ -262,7 +262,7 @@ func TestProcessorConsumeMetricsErrors(t *testing.T) {
262262

263263
mockClock := clock.NewMock(time.Now())
264264
ticker := mockClock.NewTicker(time.Nanosecond)
265-
p := newProcessorImp(mexp, tcon, nil, cumulative, logger, ticker)
265+
p := newProcessorImp(mexp, tcon, nil, cumulative, logger, ticker, DimensionsCacheSize)
266266

267267
exporters := map[component.DataType]map[component.ID]component.Component{}
268268
mhost := &mocks.Host{}
@@ -362,7 +362,7 @@ func TestProcessorConsumeTraces(t *testing.T) {
362362
mockClock := clock.NewMock(time.Now())
363363
ticker := mockClock.NewTicker(time.Nanosecond)
364364

365-
p := newProcessorImp(mexp, tcon, &defaultNullValue, tc.aggregationTemporality, zaptest.NewLogger(t), ticker)
365+
p := newProcessorImp(mexp, tcon, &defaultNullValue, tc.aggregationTemporality, zaptest.NewLogger(t), ticker, DimensionsCacheSize)
366366

367367
exporters := map[component.DataType]map[component.ID]component.Component{}
368368
mhost := &mocks.Host{}
@@ -387,39 +387,61 @@ func TestProcessorConsumeTraces(t *testing.T) {
387387
}
388388
}
389389

390-
func TestMetricKeyCache(t *testing.T) {
390+
func TestMetricCache(t *testing.T) {
391+
var wg sync.WaitGroup
392+
391393
mexp := &mocks.MetricsConsumer{}
392-
tcon := &mocks.TracesConsumer{}
394+
mexp.On("ConsumeMetrics", mock.Anything, mock.MatchedBy(func(input pmetric.Metrics) bool {
395+
wg.Done()
396+
return true
397+
})).Return(nil)
393398

394-
mexp.On("ConsumeMetrics", mock.Anything, mock.Anything).Return(nil)
399+
tcon := &mocks.TracesConsumer{}
395400
tcon.On("ConsumeTraces", mock.Anything, mock.Anything).Return(nil)
396401

397402
defaultNullValue := pcommon.NewValueStr("defaultNullValue")
398-
p := newProcessorImp(mexp, tcon, &defaultNullValue, cumulative, zaptest.NewLogger(t), nil)
399-
traces := buildSampleTrace()
403+
mockClock := clock.NewMock(time.Now())
404+
ticker := mockClock.NewTicker(time.Nanosecond)
405+
dimensionsCacheSize := 2
406+
407+
p := newProcessorImp(mexp, tcon, &defaultNullValue, cumulative, zaptest.NewLogger(t), ticker, dimensionsCacheSize)
408+
409+
exporters := map[component.DataType]map[component.ID]component.Component{}
410+
mhost := &mocks.Host{}
411+
mhost.On("GetExporters").Return(exporters)
400412

401413
// Test
402414
ctx := metadata.NewIncomingContext(context.Background(), nil)
415+
err := p.Start(ctx, mhost)
416+
require.NoError(t, err)
403417

404418
// 0 key was cached at beginning
405-
assert.Zero(t, p.metricKeyToDimensions.Len())
419+
assert.Zero(t, len(p.histograms))
420+
421+
traces := buildSampleTrace()
422+
require.Condition(t, func() bool {
423+
return traces.SpanCount() >= dimensionsCacheSize
424+
})
425+
426+
err = p.ConsumeTraces(ctx, traces)
427+
wg.Add(1)
428+
mockClock.Add(time.Nanosecond)
429+
wg.Wait()
406430

407-
err := p.ConsumeTraces(ctx, traces)
408431
// Validate
409432
require.NoError(t, err)
410433
// 2 key was cached, 1 key was evicted and cleaned after the processing
411-
assert.Eventually(t, func() bool {
412-
return p.metricKeyToDimensions.Len() == DimensionsCacheSize
413-
}, 10*time.Second, time.Millisecond*100)
434+
assert.Equal(t, len(p.histograms), dimensionsCacheSize)
414435

415436
// consume another batch of traces
416437
err = p.ConsumeTraces(ctx, traces)
417438
require.NoError(t, err)
439+
wg.Add(1)
440+
mockClock.Add(time.Nanosecond)
441+
wg.Wait()
418442

419443
// 2 key was cached, other keys were evicted and cleaned after the processing
420-
assert.Eventually(t, func() bool {
421-
return p.metricKeyToDimensions.Len() == DimensionsCacheSize
422-
}, 10*time.Second, time.Millisecond*100)
444+
assert.Equal(t, len(p.histograms), dimensionsCacheSize)
423445
}
424446

425447
func BenchmarkProcessorConsumeTraces(b *testing.B) {
@@ -431,7 +453,7 @@ func BenchmarkProcessorConsumeTraces(b *testing.B) {
431453
tcon.On("ConsumeTraces", mock.Anything, mock.Anything).Return(nil)
432454

433455
defaultNullValue := pcommon.NewValueStr("defaultNullValue")
434-
p := newProcessorImp(mexp, tcon, &defaultNullValue, cumulative, zaptest.NewLogger(b), nil)
456+
p := newProcessorImp(mexp, tcon, &defaultNullValue, cumulative, zaptest.NewLogger(b), nil, DimensionsCacheSize)
435457

436458
traces := buildSampleTrace()
437459

@@ -442,10 +464,10 @@ func BenchmarkProcessorConsumeTraces(b *testing.B) {
442464
}
443465
}
444466

445-
func newProcessorImp(mexp *mocks.MetricsConsumer, tcon *mocks.TracesConsumer, defaultNullValue *pcommon.Value, temporality string, logger *zap.Logger, ticker *clock.Ticker) *processorImp {
467+
func newProcessorImp(mexp *mocks.MetricsConsumer, tcon *mocks.TracesConsumer, defaultNullValue *pcommon.Value, temporality string, logger *zap.Logger, ticker *clock.Ticker, cacheSize int) *processorImp {
446468
defaultNotInSpanAttrVal := pcommon.NewValueStr("defaultNotInSpanAttrVal")
447469
// use size 2 for LRU cache for testing purpose
448-
metricKeyToDimensions, err := cache.NewCache[metricKey, pcommon.Map](DimensionsCacheSize)
470+
metricKeyToDimensions, err := cache.NewCache[metricKey, pcommon.Map](cacheSize)
449471
if err != nil {
450472
panic(err)
451473
}
@@ -979,8 +1001,7 @@ func TestConsumeTracesEvictedCacheKey(t *testing.T) {
9791001
mockClock := clock.NewMock(time.Now())
9801002
ticker := mockClock.NewTicker(time.Nanosecond)
9811003

982-
// Note: default dimension key cache size is 2.
983-
p := newProcessorImp(mexp, tcon, &defaultNullValue, cumulative, zaptest.NewLogger(t), ticker)
1004+
p := newProcessorImp(mexp, tcon, &defaultNullValue, cumulative, zaptest.NewLogger(t), ticker, DimensionsCacheSize)
9841005

9851006
exporters := map[component.DataType]map[component.ID]component.Component{}
9861007

0 commit comments

Comments
 (0)