
Commit 0afd75f

Add resource metrics cache cleanup (#39957)
#### Description
Add an LRU cache to improve cache cleanup and avoid a memory leak across Prometheus Remote-Write (PRW) requests.

#### Link to tracking issue
Related to #37277
1 parent 7f89604 commit 0afd75f
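For context, a minimal standalone sketch (not from this commit; capacity of 3 chosen for illustration) of the failure mode being fixed: a plain map keyed by job/instance hash grows without bound as unique series arrive, while a size-bounded LRU caps memory:

```go
package main

import (
	"fmt"

	lru "github.com/hashicorp/golang-lru/v2"
)

func main() {
	// Before this change: an unbounded map never frees entries, so every
	// unique job/instance pair seen in a remote-write request stays forever.
	unbounded := make(map[uint64]string)

	// After: an LRU capped at 3 entries here (the receiver uses 1000).
	bounded, err := lru.New[uint64, string](3)
	if err != nil {
		panic(err)
	}

	for key := uint64(0); key < 100; key++ {
		unbounded[key] = "resource metrics"
		bounded.Add(key, "resource metrics")
	}

	fmt.Println(len(unbounded)) // 100: grows with every unique key
	fmt.Println(bounded.Len())  // 3: oldest entries were evicted
}
```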

File tree

6 files changed: +263 −11 lines
Lines changed: 28 additions & 0 deletions
```yaml
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'enhancement'

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: prometheusremotewritereceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add LRU cache for resource metrics in prometheusremotewritereceiver.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [37277]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: To avoid memory leaks, the receiver will now use an LRU cache to store resource metrics.
  The least recently used resource metrics are evicted once a maximum size is reached. For now the maximum size is 1000.

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
```

receiver/prometheusremotewritereceiver/README.md

Lines changed: 11 additions & 0 deletions
```diff
@@ -11,3 +11,14 @@
 
 [development]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-stability.md#development
 <!-- end autogenerated section -->
+
+# Resource Metrics Cache
+
+`target_info` metrics and "normal" metrics match when they share the same job/instance labels (see the [specification](https://opentelemetry.io/docs/specs/otel/compatibility/prometheus_and_openmetrics/#resource-attributes-1) for details). Because these metrics do not always arrive in the same Remote-Write request, the receiver uses an internal, in-memory LRU (Least Recently Used) cache to store resource metrics across requests.
+
+The cache holds at most 1000 resource metrics; once that limit is reached, the least recently used entries are evicted.
+
+This approach has some limitations, for example:
+- If the process dies or restarts, the cache is lost.
+- Inconsistencies can occur depending on the order of requests and the current cache size.
+- The limit of 1000 resource metrics is hardcoded and not configurable for now.
```
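To make the eviction policy concrete, here is a small, self-contained sketch (not part of the commit) of the `hashicorp/golang-lru/v2` behavior the receiver relies on, with a capacity of 2 instead of 1000 and plain string keys for readability (the receiver hashes job/instance to a `uint64`):

```go
package main

import (
	"fmt"

	lru "github.com/hashicorp/golang-lru/v2"
)

func main() {
	// Capacity of 2 to make eviction visible; the receiver uses 1000.
	cache, err := lru.New[string, string](2)
	if err != nil {
		panic(err)
	}

	cache.Add("production/service_a|host1", "resource A")
	cache.Add("production/service_b|host2", "resource B")

	// Get marks an entry as recently used.
	cache.Get("production/service_a|host1")

	// Adding a third entry evicts the least recently used one (service_b|host2).
	cache.Add("production/service_c|host3", "resource C")

	_, ok := cache.Get("production/service_b|host2")
	fmt.Println(ok) // false: evicted
	_, ok = cache.Get("production/service_a|host1")
	fmt.Println(ok) // true: refreshed by the earlier Get, so it survived
}
```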

receiver/prometheusremotewritereceiver/go.mod

Lines changed: 1 addition & 0 deletions
```diff
@@ -6,6 +6,7 @@ require (
 	github.com/cespare/xxhash/v2 v2.3.0
 	github.com/gogo/protobuf v1.3.2
 	github.com/golang/snappy v1.0.0
+	github.com/hashicorp/golang-lru/v2 v2.0.7
 	github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics v0.127.0
 	github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.127.0
 	github.com/prometheus/prometheus v0.302.1
```

receiver/prometheusremotewritereceiver/go.sum

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default.

receiver/prometheusremotewritereceiver/receiver.go

Lines changed: 15 additions & 9 deletions
```diff
@@ -14,6 +14,7 @@ import (
 
 	"github.com/cespare/xxhash/v2"
 	"github.com/gogo/protobuf/proto"
+	lru "github.com/hashicorp/golang-lru/v2"
 	promconfig "github.com/prometheus/prometheus/config"
 	"github.com/prometheus/prometheus/model/labels"
 	writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
@@ -30,25 +31,30 @@ import (
 )
 
 func newRemoteWriteReceiver(settings receiver.Settings, cfg *Config, nextConsumer consumer.Metrics) (receiver.Metrics, error) {
+	cache, err := lru.New[uint64, pmetric.ResourceMetrics](1000)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create LRU cache: %w", err)
+	}
 	return &prometheusRemoteWriteReceiver{
 		settings:     settings,
 		nextConsumer: nextConsumer,
 		config:       cfg,
 		server: &http.Server{
 			ReadTimeout: 60 * time.Second,
 		},
-		rmCache: make(map[uint64]pmetric.ResourceMetrics),
+		rmCache: cache,
 	}, nil
 }
 
 type prometheusRemoteWriteReceiver struct {
 	settings     receiver.Settings
 	nextConsumer consumer.Metrics
 
-	config  *Config
-	server  *http.Server
-	wg      sync.WaitGroup
-	rmCache map[uint64]pmetric.ResourceMetrics
+	config *Config
+	server *http.Server
+	wg     sync.WaitGroup
+
+	rmCache *lru.Cache[uint64, pmetric.ResourceMetrics]
 }
 
 // MetricIdentity contains all the components that uniquely identify a metric
@@ -237,7 +243,7 @@ func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *wr
 			var rm pmetric.ResourceMetrics
 			hashedLabels := xxhash.Sum64String(ls.Get("job") + string([]byte{'\xff'}) + ls.Get("instance"))
 
-			if existingRM, ok := prw.rmCache[hashedLabels]; ok {
+			if existingRM, ok := prw.rmCache.Get(hashedLabels); ok {
 				rm = existingRM
 			} else {
 				rm = otelMetrics.ResourceMetrics().AppendEmpty()
@@ -252,20 +258,20 @@ func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *wr
 					attrs.PutStr(l.Name, l.Value)
 				}
 			}
-			prw.rmCache[hashedLabels] = rm
+			prw.rmCache.Add(hashedLabels, rm)
 			continue
 		}
 
 		// For metrics other than target_info, we need to follow the standard process of creating a metric.
 		var rm pmetric.ResourceMetrics
 		hashedLabels := xxhash.Sum64String(ls.Get("job") + string([]byte{'\xff'}) + ls.Get("instance"))
-		existingRM, ok := prw.rmCache[hashedLabels]
+		existingRM, ok := prw.rmCache.Get(hashedLabels)
 		if ok {
 			rm = existingRM
 		} else {
 			rm = otelMetrics.ResourceMetrics().AppendEmpty()
 			parseJobAndInstance(rm.Resource().Attributes(), ls.Get("job"), ls.Get("instance"))
-			prw.rmCache[hashedLabels] = rm
+			prw.rmCache.Add(hashedLabels, rm)
 		}
 
 		scopeName, scopeVersion := prw.extractScopeInfo(ls)
```
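A side note on the cache key used above: job and instance are joined with a `0xFF` separator byte before hashing. That byte never occurs in valid UTF-8 label values, so differently split label pairs cannot collide. A standalone sketch (not part of the commit) showing the effect:

```go
package main

import (
	"fmt"

	"github.com/cespare/xxhash/v2"
)

// cacheKey mirrors the key construction in translateV2: job and instance
// joined by a 0xFF byte, which keeps the boundary between the two label
// values unambiguous.
func cacheKey(job, instance string) uint64 {
	return xxhash.Sum64String(job + string([]byte{'\xff'}) + instance)
}

func main() {
	// Without a separator, ("ab", "c") and ("a", "bc") would both
	// concatenate to "abc" and collide; the separator keeps them distinct.
	fmt.Println(cacheKey("ab", "c") == cacheKey("a", "bc")) // false
	// The hash is deterministic, so the same job/instance pair always maps
	// to the same cache entry across requests.
	fmt.Println(cacheKey("job", "host") == cacheKey("job", "host")) // true
}
```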

receiver/prometheusremotewritereceiver/receiver_test.go

Lines changed: 206 additions & 2 deletions
```diff
@@ -63,8 +63,14 @@ func setupMetricsReceiver(t *testing.T) *prometheusRemoteWriteReceiver {
 	prwReceiver, err := factory.CreateMetrics(context.Background(), receivertest.NewNopSettings(metadata.Type), cfg, consumertest.NewNop())
 	assert.NoError(t, err)
 	assert.NotNil(t, prwReceiver, "metrics receiver creation failed")
+	writeReceiver := prwReceiver.(*prometheusRemoteWriteReceiver)
 
-	return prwReceiver.(*prometheusRemoteWriteReceiver)
+	// Add cleanup to ensure LRU cache is properly purged
+	t.Cleanup(func() {
+		writeReceiver.rmCache.Purge()
+	})
+
+	return writeReceiver
 }
 
 func TestHandlePRWContentTypeNegotiation(t *testing.T) {
@@ -474,7 +480,7 @@ func TestTranslateV2(t *testing.T) {
 	} {
 		t.Run(tc.name, func(t *testing.T) {
 			// since we are using the rmCache to store values across requests, we need to clear it after each test, otherwise it will affect the next test
-			prwReceiver.rmCache = make(map[uint64]pmetric.ResourceMetrics)
+			prwReceiver.rmCache.Purge()
 			metrics, stats, err := prwReceiver.translateV2(ctx, tc.request)
 			if tc.expectError != "" {
 				assert.ErrorContains(t, err, tc.expectError)
@@ -654,3 +660,201 @@ func TestTargetInfoWithMultipleRequests(t *testing.T) {
 		})
 	}
 }
+
+// TestLRUCacheResourceMetrics verifies the LRU cache behavior for resource metrics:
+// 1. Caching: Metrics with same job/instance share resource attributes
+// 2. Eviction: When cache is full, least recently used entries are evicted
+// 3. Reuse: After eviction, same job/instance metrics create new resource attributes
+func TestLRUCacheResourceMetrics(t *testing.T) {
+	prwReceiver := setupMetricsReceiver(t)
+
+	// Set a small cache size to emulate the cache eviction
+	prwReceiver.rmCache.Resize(1)
+
+	t.Cleanup(func() {
+		prwReceiver.rmCache.Purge()
+	})
+
+	// Target info request (same job/instance as metric 1).
+	targetInfoRequest := &writev2.Request{
+		Symbols: []string{
+			"",
+			"job", "production/service_a", // 1, 2
+			"instance", "host1", // 3, 4
+			"machine_type", "n1-standard-1", // 5, 6
+			"cloud_provider", "gcp", // 7, 8
+			"region", "us-central1", // 9, 10
+			"__name__", "target_info", // 11, 12
+		},
+		Timeseries: []writev2.TimeSeries{
+			{
+				Metadata:   writev2.Metadata{Type: writev2.Metadata_METRIC_TYPE_GAUGE},
+				LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12},
+			},
+		},
+	}
+
+	// Metric 1 because it has the same job/instance as the target_info metric.
+	metric1 := &writev2.Request{
+		Symbols: []string{
+			"",
+			"job", "production/service_a", // 1, 2
+			"instance", "host1", // 3, 4
+			"__name__", "normal_metric", // 5, 6
+			"foo", "bar", // 7, 8
+		},
+		Timeseries: []writev2.TimeSeries{
+			{
+				Metadata:   writev2.Metadata{Type: writev2.Metadata_METRIC_TYPE_GAUGE},
+				LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8},
+				Samples:    []writev2.Sample{{Value: 1, Timestamp: 1}},
+			},
+		},
+	}
+
+	// Metric 2. Different job/instance than metric1/target_info.
+	metric2 := &writev2.Request{
+		Symbols: []string{
+			"",
+			"job", "production/service_b", // 1, 2
+			"instance", "host2", // 3, 4
+			"__name__", "different_metric", // 5, 6
+			"baz", "qux", // 7, 8
+		},
+		Timeseries: []writev2.TimeSeries{
+			{
+				Metadata:   writev2.Metadata{Type: writev2.Metadata_METRIC_TYPE_GAUGE},
+				LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8},
+				Samples:    []writev2.Sample{{Value: 2, Timestamp: 2}},
+			},
+		},
+	}
+
+	// Metric 1_1. Same job/instance as metric 1. As it is inserted after cache eviction, it should not be cached,
+	// and should generate a new resource metric even though it has the same job/instance as metric1.
+	metric1_1 := &writev2.Request{
+		Symbols: []string{
+			"",
+			"job", "production/service_a", // 1, 2
+			"instance", "host1", // 3, 4
+			"__name__", "normal_metric2", // 5, 6
+			"joo", "kar", // 7, 8
+		},
+		Timeseries: []writev2.TimeSeries{
+			{
+				Metadata:   writev2.Metadata{Type: writev2.Metadata_METRIC_TYPE_GAUGE},
+				LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8},
+				Samples:    []writev2.Sample{{Value: 11, Timestamp: 11}},
+			},
+		},
+	}
+
+	// This metric is the result of target_info and metric1.
+	expectedMetrics1 := func() pmetric.Metrics {
+		metrics := pmetric.NewMetrics()
+		rm := metrics.ResourceMetrics().AppendEmpty()
+		attrs := rm.Resource().Attributes()
+		attrs.PutStr("service.namespace", "production")
+		attrs.PutStr("service.name", "service_a")
+		attrs.PutStr("service.instance.id", "host1")
+		attrs.PutStr("machine_type", "n1-standard-1")
+		attrs.PutStr("cloud_provider", "gcp")
+		attrs.PutStr("region", "us-central1")
+
+		sm := rm.ScopeMetrics().AppendEmpty()
+		sm.Scope().SetName("OpenTelemetry Collector")
+		sm.Scope().SetVersion("latest")
+
+		m1 := sm.Metrics().AppendEmpty()
+		m1.SetName("normal_metric")
+		m1.SetUnit("")
+		m1.SetDescription("")
+
+		dp1 := m1.SetEmptyGauge().DataPoints().AppendEmpty()
+		dp1.SetDoubleValue(1.0)
+		dp1.SetTimestamp(pcommon.Timestamp(1 * int64(time.Millisecond)))
+		dp1.Attributes().PutStr("foo", "bar")
+
+		return metrics
+	}()
+
+	// Result of metric2.
+	expectedMetrics2 := func() pmetric.Metrics {
+		metrics := pmetric.NewMetrics()
+		rm := metrics.ResourceMetrics().AppendEmpty()
+		attrs := rm.Resource().Attributes()
+		attrs.PutStr("service.namespace", "production")
+		attrs.PutStr("service.name", "service_b")
+		attrs.PutStr("service.instance.id", "host2")
+
+		sm := rm.ScopeMetrics().AppendEmpty()
+		sm.Scope().SetName("OpenTelemetry Collector")
+		sm.Scope().SetVersion("latest")
+
+		m2 := sm.Metrics().AppendEmpty()
+		m2.SetName("different_metric")
+		m2.SetUnit("")
+
+		dp1 := m2.SetEmptyGauge().DataPoints().AppendEmpty()
+		dp1.SetDoubleValue(2.0)
+		dp1.SetTimestamp(pcommon.Timestamp(2 * int64(time.Millisecond)))
+		dp1.Attributes().PutStr("baz", "qux")
+
+		return metrics
+	}()
+
+	// Result of metric1_1.
+	expectedMetrics1_1 := func() pmetric.Metrics {
+		metrics := pmetric.NewMetrics()
+		rm := metrics.ResourceMetrics().AppendEmpty()
+		attrs := rm.Resource().Attributes()
+		attrs.PutStr("service.namespace", "production")
+		attrs.PutStr("service.name", "service_a")
+		attrs.PutStr("service.instance.id", "host1")
+
+		sm := rm.ScopeMetrics().AppendEmpty()
+		sm.Scope().SetName("OpenTelemetry Collector")
+		sm.Scope().SetVersion("latest")
+
+		m1_1 := sm.Metrics().AppendEmpty()
+		m1_1.SetName("normal_metric2")
+		m1_1.SetUnit("")
+
+		dp1_1 := m1_1.SetEmptyGauge().DataPoints().AppendEmpty()
+		dp1_1.SetDoubleValue(11.0)
+		dp1_1.SetTimestamp(pcommon.Timestamp(11 * int64(time.Millisecond)))
+		dp1_1.Attributes().PutStr("joo", "kar")
+
+		return metrics
+	}()
+
+	mockConsumer := new(MockConsumer)
+	prwReceiver.nextConsumer = mockConsumer
+
+	ts := httptest.NewServer(http.HandlerFunc(prwReceiver.handlePRW))
+	defer ts.Close()
+
+	for _, req := range []*writev2.Request{targetInfoRequest, metric1, metric2, metric1_1} {
+		pBuf := proto.NewBuffer(nil)
+		err := pBuf.Marshal(req)
+		assert.NoError(t, err)
+
+		resp, err := http.Post(
+			ts.URL,
+			fmt.Sprintf("application/x-protobuf;proto=%s", promconfig.RemoteWriteProtoMsgV2),
+			bytes.NewBuffer(pBuf.Bytes()),
+		)
+		assert.NoError(t, err)
+
+		body, err := io.ReadAll(resp.Body)
+		assert.NoError(t, err)
+		assert.Equal(t, http.StatusNoContent, resp.StatusCode, string(body))
+	}
+
+	// As target_info and metric1 have the same job/instance, they generate the same end metric: mockConsumer.metrics[0].
+	assert.NoError(t, pmetrictest.CompareMetrics(expectedMetrics1, mockConsumer.metrics[0]))
+	// As metric2 has a different job/instance, it generates a different end metric: mockConsumer.metrics[2]. At this point the cache is full, so it evicts the target_info entry to store metric2.
+	assert.NoError(t, pmetrictest.CompareMetrics(expectedMetrics2, mockConsumer.metrics[2]))
+	// As there is just 1 slot in the cache and the entry for metric1 was evicted, metric1_1 generates a new resource metric, even though it has the same job/instance as metric1.
+	assert.NoError(t, pmetrictest.CompareMetrics(expectedMetrics1_1, mockConsumer.metrics[3]))
+}
```
