Commit e254878

bripkens and JaredTan95 authored
[connector/spanmetrics] Fix memory leak (#28847)
# Why

The `spanmetrics` connector has a memory leak that occurs when the cumulative temporality is used: the `connectorImp#resourceMetrics` map is only ever cleaned up in delta temporality.

# What

Turn `connectorImp#resourceMetrics` into an LRU cache with a maximum size. To correctly handle metric resets, we also introduce a start timestamp per `resourceMetrics` instance.

# References

Fixes #27654

Co-authored-by: Jared Tan <[email protected]>
1 parent d189c00 commit e254878

File tree: 9 files changed, +138 -55 lines changed
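Before the per-file diffs, the leak mechanism is worth seeing in miniature: with cumulative temporality the connector never cleared its per-resource map, so every distinct resource (e.g. short-lived pods with unique attributes) added an entry that lived forever, while an LRU cache caps that growth. Below is a minimal, stdlib-only sketch of the idea; the real connector wraps an LRU library behind its internal `cache` package, and `boundedCache` and its methods are illustrative names, not the connector's actual types.

```go
package main

import (
	"container/list"
	"fmt"
)

// boundedCache is a minimal LRU: once maxSize entries exist, inserting a
// new key evicts the least recently used one. This is the property that
// turns unbounded growth into a fixed memory ceiling.
type boundedCache struct {
	maxSize int
	order   *list.List               // front = most recently used
	items   map[string]*list.Element // key -> node in order
}

func newBoundedCache(maxSize int) *boundedCache {
	return &boundedCache{maxSize: maxSize, order: list.New(), items: map[string]*list.Element{}}
}

func (c *boundedCache) add(key string) {
	if el, ok := c.items[key]; ok {
		c.order.MoveToFront(el) // already cached: just mark as recently used
		return
	}
	if c.order.Len() >= c.maxSize {
		oldest := c.order.Back() // least recently used entry
		delete(c.items, oldest.Value.(string))
		c.order.Remove(oldest)
	}
	c.items[key] = c.order.PushFront(key)
}

func main() {
	leaky := map[string]struct{}{}   // how the connector behaved before
	bounded := newBoundedCache(1000) // how it behaves after
	for i := 0; i < 100000; i++ {
		key := fmt.Sprintf("service-%d", i)
		leaky[key] = struct{}{}
		bounded.add(key)
	}
	fmt.Println(len(leaky), bounded.order.Len()) // 100000 vs 1000
}
```

The trade-off is explicit: once more than `maxSize` resources are live, the least recently used entry is dropped, which is why the PR pairs the cache with a per-resource start timestamp (see connector.go below).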
Lines changed: 11 additions & 0 deletions

```diff
@@ -0,0 +1,11 @@
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: bug_fix
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: connector/spanmetrics
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Fix memory leak when the cumulative temporality is used.
+
+# One or more tracking issues related to the change
+issues: [27654]
```

connector/spanmetricsconnector/README.md

Lines changed: 2 additions & 0 deletions

```diff
@@ -109,6 +109,8 @@ The following settings can be optionally configured:
   If no `default` is provided, this dimension will be **omitted** from the metric.
 - `exclude_dimensions`: the list of dimensions to be excluded from the default set of dimensions. Use to exclude unneeded data from metrics.
 - `dimensions_cache_size` (default: `1000`): the size of cache for storing Dimensions to improve collectors memory usage. Must be a positive number.
+- `resource_metrics_cache_size` (default: `1000`): the size of the cache holding metrics for a service. This is mostly relevant for
+  cumulative temporality to avoid memory leaks and correct metric timestamp resets.
 - `aggregation_temporality` (default: `AGGREGATION_TEMPORALITY_CUMULATIVE`): Defines the aggregation temporality of the generated metrics.
   One of either `AGGREGATION_TEMPORALITY_CUMULATIVE` or `AGGREGATION_TEMPORALITY_DELTA`.
 - `namespace`: Defines the namespace of the generated metrics. If `namespace` provided, generated metric name will be added `namespace.` prefix.
```

connector/spanmetricsconnector/config.go

Lines changed: 5 additions & 0 deletions

```diff
@@ -46,6 +46,11 @@ type Config struct {
     // Optional. See defaultDimensionsCacheSize in connector.go for the default value.
     DimensionsCacheSize int `mapstructure:"dimensions_cache_size"`
 
+    // ResourceMetricsCacheSize defines the size of the cache holding metrics for a service. This is mostly relevant for
+    // cumulative temporality to avoid memory leaks and correct metric timestamp resets.
+    // Optional. See defaultResourceMetricsCacheSize in connector.go for the default value.
+    ResourceMetricsCacheSize int `mapstructure:"resource_metrics_cache_size"`
+
     AggregationTemporality string `mapstructure:"aggregation_temporality"`
 
     Histogram HistogramConfig `mapstructure:"histogram"`
```
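The `mapstructure` tag above is what binds the `resource_metrics_cache_size` YAML key to the `ResourceMetricsCacheSize` field. As a rough standalone illustration, here is that binding exercised with the `github.com/mitchellh/mapstructure` library directly; the collector's confmap machinery does the equivalent under the hood, and the trimmed-down `Config` here is a copy for demonstration only.

```go
package main

import (
	"fmt"

	"github.com/mitchellh/mapstructure"
)

// Config is a trimmed-down copy of the connector's config struct.
type Config struct {
	DimensionsCacheSize      int `mapstructure:"dimensions_cache_size"`
	ResourceMetricsCacheSize int `mapstructure:"resource_metrics_cache_size"`
}

func main() {
	// What the parsed YAML section looks like as a generic map.
	raw := map[string]any{
		"dimensions_cache_size":       1500,
		"resource_metrics_cache_size": 1600,
	}
	var cfg Config
	if err := mapstructure.Decode(raw, &cfg); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", cfg) // {DimensionsCacheSize:1500 ResourceMetricsCacheSize:1600}
}
```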

connector/spanmetricsconnector/config_test.go

Lines changed: 13 additions & 10 deletions

```diff
@@ -47,8 +47,9 @@ func TestLoadConfig(t *testing.T) {
                 {Name: "http.method", Default: &defaultMethod},
                 {Name: "http.status_code", Default: (*string)(nil)},
             },
-            DimensionsCacheSize:  1500,
-            MetricsFlushInterval: 30 * time.Second,
+            DimensionsCacheSize:      1500,
+            ResourceMetricsCacheSize: 1600,
+            MetricsFlushInterval:     30 * time.Second,
             Exemplars: ExemplarsConfig{
                 Enabled: true,
             },
@@ -66,9 +67,10 @@ func TestLoadConfig(t *testing.T) {
         {
             id: component.NewIDWithName(metadata.Type, "exponential_histogram"),
             expected: &Config{
-                AggregationTemporality: cumulative,
-                DimensionsCacheSize:    1000,
-                MetricsFlushInterval:   15 * time.Second,
+                AggregationTemporality:   cumulative,
+                DimensionsCacheSize:      defaultDimensionsCacheSize,
+                ResourceMetricsCacheSize: defaultResourceMetricsCacheSize,
+                MetricsFlushInterval:     15 * time.Second,
                 Histogram: HistogramConfig{
                     Unit: metrics.Milliseconds,
                     Exponential: &ExponentialHistogramConfig{
@@ -88,11 +90,12 @@ func TestLoadConfig(t *testing.T) {
         {
             id: component.NewIDWithName(metadata.Type, "exemplars_enabled"),
             expected: &Config{
-                AggregationTemporality: "AGGREGATION_TEMPORALITY_CUMULATIVE",
-                DimensionsCacheSize:    defaultDimensionsCacheSize,
-                MetricsFlushInterval:   15 * time.Second,
-                Histogram:              HistogramConfig{Disable: false, Unit: defaultUnit},
-                Exemplars:              ExemplarsConfig{Enabled: true},
+                AggregationTemporality:   "AGGREGATION_TEMPORALITY_CUMULATIVE",
+                DimensionsCacheSize:      defaultDimensionsCacheSize,
+                ResourceMetricsCacheSize: defaultResourceMetricsCacheSize,
+                MetricsFlushInterval:     15 * time.Second,
+                Histogram:                HistogramConfig{Disable: false, Unit: defaultUnit},
+                Exemplars:                ExemplarsConfig{Enabled: true},
             },
         },
     }
```

connector/spanmetricsconnector/connector.go

Lines changed: 28 additions & 22 deletions

```diff
@@ -32,7 +32,8 @@ const (
     statusCodeKey      = "status.code" // OpenTelemetry non-standard constant.
     metricKeySeparator = string(byte(0))
 
-    defaultDimensionsCacheSize = 1000
+    defaultDimensionsCacheSize      = 1000
+    defaultResourceMetricsCacheSize = 1000
 
     metricNameDuration = "duration"
     metricNameCalls    = "calls"
@@ -51,10 +52,7 @@ type connectorImp struct {
     // Additional dimensions to add to metrics.
     dimensions []dimension
 
-    // The starting time of the data points.
-    startTimestamp pcommon.Timestamp
-
-    resourceMetrics map[resourceKey]*resourceMetrics
+    resourceMetrics *cache.Cache[resourceKey, *resourceMetrics]
 
     keyBuf *bytes.Buffer
 
@@ -79,6 +77,8 @@ type resourceMetrics struct {
     sums       metrics.SumMetrics
     events     metrics.SumMetrics
     attributes pcommon.Map
+    // startTimestamp captures when the first data points for this resource are recorded.
+    startTimestamp pcommon.Timestamp
 }
 
 type dimension struct {
@@ -110,11 +110,15 @@ func newConnector(logger *zap.Logger, config component.Config, ticker *clock.Ticker) (*connectorImp, error) {
         return nil, err
     }
 
+    resourceMetricsCache, err := cache.NewCache[resourceKey, *resourceMetrics](cfg.ResourceMetricsCacheSize)
+    if err != nil {
+        return nil, err
+    }
+
     return &connectorImp{
         logger:                logger,
         config:                *cfg,
-        startTimestamp:        pcommon.NewTimestampFromTime(time.Now()),
-        resourceMetrics:       make(map[resourceKey]*resourceMetrics),
+        resourceMetrics:       resourceMetricsCache,
         dimensions:            newDimensions(cfg.Dimensions),
         keyBuf:                bytes.NewBuffer(make([]byte, 0, 1024)),
         metricKeyToDimensions: metricKeyToDimensionsCache,
@@ -236,7 +240,8 @@ func (p *connectorImp) exportMetrics(ctx context.Context) {
 // buildMetrics collects the computed raw metrics data and builds OTLP metrics.
 func (p *connectorImp) buildMetrics() pmetric.Metrics {
     m := pmetric.NewMetrics()
-    for _, rawMetrics := range p.resourceMetrics {
+
+    p.resourceMetrics.ForEach(func(_ resourceKey, rawMetrics *resourceMetrics) {
         rm := m.ResourceMetrics().AppendEmpty()
         rawMetrics.attributes.CopyTo(rm.Resource().Attributes())
 
@@ -246,42 +251,42 @@ func (p *connectorImp) buildMetrics() pmetric.Metrics {
         sums := rawMetrics.sums
         metric := sm.Metrics().AppendEmpty()
         metric.SetName(buildMetricName(p.config.Namespace, metricNameCalls))
-        sums.BuildMetrics(metric, p.startTimestamp, p.config.GetAggregationTemporality())
+        sums.BuildMetrics(metric, rawMetrics.startTimestamp, p.config.GetAggregationTemporality())
         if !p.config.Histogram.Disable {
             histograms := rawMetrics.histograms
             metric = sm.Metrics().AppendEmpty()
             metric.SetName(buildMetricName(p.config.Namespace, metricNameDuration))
             metric.SetUnit(p.config.Histogram.Unit.String())
-            histograms.BuildMetrics(metric, p.startTimestamp, p.config.GetAggregationTemporality())
+            histograms.BuildMetrics(metric, rawMetrics.startTimestamp, p.config.GetAggregationTemporality())
         }
 
         events := rawMetrics.events
         if p.events.Enabled {
             metric = sm.Metrics().AppendEmpty()
             metric.SetName(buildMetricName(p.config.Namespace, metricNameEvents))
-            events.BuildMetrics(metric, p.startTimestamp, p.config.GetAggregationTemporality())
+            events.BuildMetrics(metric, rawMetrics.startTimestamp, p.config.GetAggregationTemporality())
         }
-    }
+    })
 
     return m
 }
 
 func (p *connectorImp) resetState() {
     // If delta metrics, reset accumulated data
     if p.config.GetAggregationTemporality() == pmetric.AggregationTemporalityDelta {
-        p.resourceMetrics = make(map[resourceKey]*resourceMetrics)
+        p.resourceMetrics.Purge()
         p.metricKeyToDimensions.Purge()
-        p.startTimestamp = pcommon.NewTimestampFromTime(time.Now())
     } else {
+        p.resourceMetrics.RemoveEvictedItems()
         p.metricKeyToDimensions.RemoveEvictedItems()
 
         // Exemplars are only relevant to this batch of traces, so must be cleared within the lock
         if p.config.Histogram.Disable {
             return
         }
-        for _, m := range p.resourceMetrics {
+        p.resourceMetrics.ForEach(func(_ resourceKey, m *resourceMetrics) {
             m.histograms.Reset(true)
-        }
+        })
 
     }
 }
@@ -387,15 +392,16 @@ type resourceKey [16]byte
 
 func (p *connectorImp) getOrCreateResourceMetrics(attr pcommon.Map) *resourceMetrics {
     key := resourceKey(pdatautil.MapHash(attr))
-    v, ok := p.resourceMetrics[key]
+    v, ok := p.resourceMetrics.Get(key)
     if !ok {
         v = &resourceMetrics{
-            histograms: initHistogramMetrics(p.config),
-            sums:       metrics.NewSumMetrics(),
-            events:     metrics.NewSumMetrics(),
-            attributes: attr,
+            histograms:     initHistogramMetrics(p.config),
+            sums:           metrics.NewSumMetrics(),
+            events:         metrics.NewSumMetrics(),
+            attributes:     attr,
+            startTimestamp: pcommon.NewTimestampFromTime(time.Now()),
         }
-        p.resourceMetrics[key] = v
+        p.resourceMetrics.Add(key, v)
     }
     return v
 }
```
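The per-resource `startTimestamp` introduced above matters because of the eviction behavior the LRU adds: when a resource's entry is evicted and the resource later reappears, its cumulative counters restart from zero, and a backend can only interpret that drop as a legitimate counter reset if the re-created series carries a fresh start timestamp. A small illustrative sketch of those semantics, using made-up names (`seriesState`, `observe`) and plain `time.Time` instead of `pcommon.Timestamp`:

```go
package main

import (
	"fmt"
	"time"
)

// seriesState stands in for the connector's per-resource metrics: a start
// timestamp reported on every cumulative data point, plus accumulated state.
type seriesState struct {
	start time.Time
	count int64
}

func main() {
	cache := map[string]*seriesState{}

	observe := func(key string) *seriesState {
		s, ok := cache[key]
		if !ok {
			// New entry, or re-created after eviction: the fresh start
			// timestamp tells the backend "this cumulative series reset".
			s = &seriesState{start: time.Now()}
			cache[key] = s
		}
		s.count++
		return s
	}

	a := observe("service-a")
	fmt.Println("first start:", a.start.UnixNano(), "count:", a.count)

	// Simulate LRU eviction of service-a...
	delete(cache, "service-a")
	time.Sleep(10 * time.Millisecond)

	// ...and its reappearance: the count restarts at 1, and the newer start
	// timestamp makes the drop interpretable as a reset rather than bad data.
	b := observe("service-a")
	fmt.Println("new start:  ", b.start.UnixNano(), "count:", b.count)
}
```

Under the old code there was a single connector-wide `startTimestamp`, so a series re-created after eviction would have reported the old start time alongside a lower cumulative value, which backends typically reject or misread.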

connector/spanmetricsconnector/connector_test.go

Lines changed: 59 additions & 19 deletions

```diff
@@ -32,18 +32,19 @@ import (
 )
 
 const (
-    stringAttrName         = "stringAttrName"
-    intAttrName            = "intAttrName"
-    doubleAttrName         = "doubleAttrName"
-    boolAttrName           = "boolAttrName"
-    nullAttrName           = "nullAttrName"
-    mapAttrName            = "mapAttrName"
-    arrayAttrName          = "arrayAttrName"
-    notInSpanAttrName0     = "shouldBeInMetric"
-    notInSpanAttrName1     = "shouldNotBeInMetric"
-    regionResourceAttrName = "region"
-    exceptionTypeAttrName  = "exception.type"
-    DimensionsCacheSize    = 2
+    stringAttrName           = "stringAttrName"
+    intAttrName              = "intAttrName"
+    doubleAttrName           = "doubleAttrName"
+    boolAttrName             = "boolAttrName"
+    nullAttrName             = "nullAttrName"
+    mapAttrName              = "mapAttrName"
+    arrayAttrName            = "arrayAttrName"
+    notInSpanAttrName0       = "shouldBeInMetric"
+    notInSpanAttrName1       = "shouldNotBeInMetric"
+    regionResourceAttrName   = "region"
+    exceptionTypeAttrName    = "exception.type"
+    dimensionsCacheSize      = 2
+    resourceMetricsCacheSize = 5
 
     sampleRegion   = "us-east-1"
     sampleDuration = float64(11)
@@ -881,7 +882,7 @@ func TestMetricKeyCache(t *testing.T) {
     require.NoError(t, err)
     // 2 key was cached, 1 key was evicted and cleaned after the processing
     assert.Eventually(t, func() bool {
-        return p.metricKeyToDimensions.Len() == DimensionsCacheSize
+        return p.metricKeyToDimensions.Len() == dimensionsCacheSize
     }, 10*time.Second, time.Millisecond*100)
 
     // consume another batch of traces
@@ -890,10 +891,48 @@ func TestMetricKeyCache(t *testing.T) {
 
     // 2 key was cached, other keys were evicted and cleaned after the processing
     assert.Eventually(t, func() bool {
-        return p.metricKeyToDimensions.Len() == DimensionsCacheSize
+        return p.metricKeyToDimensions.Len() == dimensionsCacheSize
     }, 10*time.Second, time.Millisecond*100)
 }
 
+func TestResourceMetricsCache(t *testing.T) {
+    mcon := consumertest.NewNop()
+
+    p := newConnectorImp(t, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, zaptest.NewLogger(t), nil)
+
+    // Test
+    ctx := metadata.NewIncomingContext(context.Background(), nil)
+
+    // 0 resources in the beginning
+    assert.Zero(t, p.resourceMetrics.Len())
+
+    err := p.ConsumeTraces(ctx, buildSampleTrace())
+    // Validate
+    require.NoError(t, err)
+    assert.Equal(t, 2, p.resourceMetrics.Len())
+
+    // consume another batch of traces for the same resources
+    err = p.ConsumeTraces(ctx, buildSampleTrace())
+    require.NoError(t, err)
+    assert.Equal(t, 2, p.resourceMetrics.Len())
+
+    // consume more batches for new resources. Max size is exceeded causing old resource entries to be discarded
+    for i := 0; i < resourceMetricsCacheSize; i++ {
+        traces := buildSampleTrace()
+
+        // add resource attributes to simulate additional resources providing data
+        for j := 0; j < traces.ResourceSpans().Len(); j++ {
+            traces.ResourceSpans().At(j).Resource().Attributes().PutStr("dummy", fmt.Sprintf("%d", i))
+        }
+
+        err = p.ConsumeTraces(ctx, traces)
+        require.NoError(t, err)
+    }
+
+    // validate that the cache doesn't grow past its limit
+    assert.Equal(t, resourceMetricsCacheSize, p.resourceMetrics.Len())
+}
+
 func BenchmarkConnectorConsumeTraces(b *testing.B) {
     // Prepare
     mcon := consumertest.NewNop()
@@ -964,11 +1003,12 @@ func TestExcludeDimensionsConsumeTraces(t *testing.T) {
 func newConnectorImp(t *testing.T, mcon consumer.Metrics, defaultNullValue *string, histogramConfig func() HistogramConfig, exemplarsConfig func() ExemplarsConfig, eventsConfig func() EventsConfig, temporality string, logger *zap.Logger, ticker *clock.Ticker, excludedDimensions ...string) *connectorImp {
 
     cfg := &Config{
-        AggregationTemporality: temporality,
-        Histogram:              histogramConfig(),
-        Exemplars:              exemplarsConfig(),
-        ExcludeDimensions:      excludedDimensions,
-        DimensionsCacheSize:    DimensionsCacheSize,
+        AggregationTemporality:   temporality,
+        Histogram:                histogramConfig(),
+        Exemplars:                exemplarsConfig(),
+        ExcludeDimensions:        excludedDimensions,
+        DimensionsCacheSize:      dimensionsCacheSize,
+        ResourceMetricsCacheSize: resourceMetricsCacheSize,
         Dimensions: []Dimension{
             // Set nil defaults to force a lookup for the attribute in the span.
             {stringAttrName, nil},
```

connector/spanmetricsconnector/factory.go

Lines changed: 5 additions & 4 deletions

```diff
@@ -28,10 +28,11 @@ func NewFactory() connector.Factory {
 
 func createDefaultConfig() component.Config {
     return &Config{
-        AggregationTemporality: "AGGREGATION_TEMPORALITY_CUMULATIVE",
-        DimensionsCacheSize:    defaultDimensionsCacheSize,
-        MetricsFlushInterval:   15 * time.Second,
-        Histogram:              HistogramConfig{Disable: false, Unit: defaultUnit},
+        AggregationTemporality:   "AGGREGATION_TEMPORALITY_CUMULATIVE",
+        DimensionsCacheSize:      defaultDimensionsCacheSize,
+        ResourceMetricsCacheSize: defaultResourceMetricsCacheSize,
+        MetricsFlushInterval:     15 * time.Second,
+        Histogram:                HistogramConfig{Disable: false, Unit: defaultUnit},
     }
 }
 
```

connector/spanmetricsconnector/internal/cache/cache.go

Lines changed: 14 additions & 0 deletions

```diff
@@ -74,3 +74,17 @@ func (c *Cache[K, V]) Purge() {
     c.lru.Purge()
     c.RemoveEvictedItems()
 }
+
+// ForEach iterates over all the items within the cache, as well as the evicted items (if any).
+func (c *Cache[K, V]) ForEach(fn func(k K, v V)) {
+    for _, k := range c.lru.Keys() {
+        v, ok := c.lru.Get(k)
+        if ok {
+            fn(k.(K), v.(V))
+        }
+    }
+
+    for k, v := range c.evictedItems {
+        fn(k, v)
+    }
+}
```
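Assuming the internal cache API as it appears in this diff (`NewCache`, `Add`, `Get`, `Purge`, `RemoveEvictedItems`, and now `ForEach`), usage looks roughly like the sketch below. The package is internal to the spanmetrics connector module, so this compiles only inside it, and the claim that evicted items stay visible until `RemoveEvictedItems` is called rests on `ForEach`'s doc comment rather than on code shown in this diff.

```go
package spanmetricsconnector

import (
	"fmt"

	"github.com/open-telemetry/opentelemetry-collector-contrib/connector/spanmetricsconnector/internal/cache"
)

func exampleForEach() error {
	c, err := cache.NewCache[string, int](2) // capacity 2 forces an eviction below
	if err != nil {
		return err
	}

	c.Add("a", 1)
	c.Add("b", 2)
	c.Add("c", 3) // evicts "a" from the LRU into the evicted-items holding area

	// ForEach visits the live entries ("b", "c") and, per its doc comment,
	// evicted items that have not yet been cleaned up, so "a" appears too.
	c.ForEach(func(k string, v int) {
		fmt.Println(k, v)
	})

	c.RemoveEvictedItems() // after this, "a" is gone for good
	return nil
}
```

This is the behavior the connector relies on in cumulative mode: evicted resource entries survive just long enough for `buildMetrics` to flush them via `ForEach`, then `resetState` calls `RemoveEvictedItems` to drop them.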

connector/spanmetricsconnector/testdata/config.yaml

Lines changed: 1 addition & 0 deletions

```diff
@@ -15,6 +15,7 @@ spanmetrics/full:
   exemplars:
     enabled: true
   dimensions_cache_size: 1500
+  resource_metrics_cache_size: 1600
 
   # Additional list of dimensions on top of:
   # - service.name
```
