Add resource metrics cache cleanup #39957

Merged
28 changes: 28 additions & 0 deletions .chloggen/add-resource-metrics-cache-cleanup.yaml
@@ -0,0 +1,28 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'enhancement'

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: prometheusremotewritereceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add LRU cache for resource metrics in prometheusremotewritereceiver.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [37277]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: To avoid memory leaks, the receiver now uses an LRU cache to store resource metrics.
When the cache reaches its maximum size, the least recently used resource metrics are evicted. For now the maximum size is 1000.

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
11 changes: 11 additions & 0 deletions receiver/prometheusremotewritereceiver/README.md
@@ -11,3 +11,14 @@

[development]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-stability.md#development
<!-- end autogenerated section -->

# Resource Metrics Cache

`target_info` metrics and "normal" metrics match when they have the same job/instance labels (see the [specification](https://opentelemetry.io/docs/specs/otel/compatibility/prometheus_and_openmetrics/#resource-attributes-1) for more details), but they do not always arrive in the same Remote-Write request. For this reason, the receiver uses an internal, stateless LRU (Least Recently Used) cache to store resource metrics across requests.

The cache holds a maximum of 1000 resource metrics; when the limit is reached, the least recently used resource metrics are evicted.

This approach has some limitations, for example:
- If the process dies or restarts, the cache will be lost.
- Inconsistencies can occur depending on the order of the requests and the current cache size.
- The limit of 1000 resource metrics is hardcoded and not configurable for now.
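
For illustration, here is a minimal sketch of the lookup/insert pattern described above. The `resourceKey` helper, the `main` wrapper, and the attribute value are illustrative only; the receiver's actual implementation lives in `receiver.go`.

```go
package main

import (
	"fmt"

	"github.com/cespare/xxhash/v2"
	lru "github.com/hashicorp/golang-lru/v2"
	"go.opentelemetry.io/collector/pdata/pmetric"
)

// resourceKey mirrors how the receiver derives a cache key: an xxhash of the
// job and instance labels, separated by a 0xFF byte.
func resourceKey(job, instance string) uint64 {
	return xxhash.Sum64String(job + string([]byte{'\xff'}) + instance)
}

func main() {
	// A fixed-size cache; 1000 matches the hardcoded limit described above.
	cache, err := lru.New[uint64, pmetric.ResourceMetrics](1000)
	if err != nil {
		panic(err)
	}

	metrics := pmetric.NewMetrics()
	key := resourceKey("production/service_a", "host1")

	// Reuse the cached resource when this job/instance pair was seen in a
	// recent request; otherwise create a new resource and cache it.
	rm, ok := cache.Get(key)
	if !ok {
		rm = metrics.ResourceMetrics().AppendEmpty()
		rm.Resource().Attributes().PutStr("service.instance.id", "host1")
		cache.Add(key, rm)
	}
	fmt.Println("reused cached resource:", ok, "- cached resources:", cache.Len())
}
```

Keying on a hash of the job/instance pair keeps the cache key small and fixed-size instead of retaining the label strings themselves.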
1 change: 1 addition & 0 deletions receiver/prometheusremotewritereceiver/go.mod
@@ -6,6 +6,7 @@ require (
github.com/cespare/xxhash/v2 v2.3.0
github.com/gogo/protobuf v1.3.2
github.com/golang/snappy v1.0.0
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics v0.127.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.127.0
github.com/prometheus/prometheus v0.302.1
2 changes: 2 additions & 0 deletions receiver/prometheusremotewritereceiver/go.sum

Some generated files are not rendered by default.

24 changes: 15 additions & 9 deletions receiver/prometheusremotewritereceiver/receiver.go
@@ -14,6 +14,7 @@ import (

"github.com/cespare/xxhash/v2"
"github.com/gogo/protobuf/proto"
lru "github.com/hashicorp/golang-lru/v2"
promconfig "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/model/labels"
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
@@ -30,25 +31,30 @@ import (
)

func newRemoteWriteReceiver(settings receiver.Settings, cfg *Config, nextConsumer consumer.Metrics) (receiver.Metrics, error) {
cache, err := lru.New[uint64, pmetric.ResourceMetrics](1000)
if err != nil {
return nil, fmt.Errorf("failed to create LRU cache: %w", err)
}
return &prometheusRemoteWriteReceiver{
settings: settings,
nextConsumer: nextConsumer,
config: cfg,
server: &http.Server{
ReadTimeout: 60 * time.Second,
},
rmCache: make(map[uint64]pmetric.ResourceMetrics),
rmCache: cache,
}, nil
}

type prometheusRemoteWriteReceiver struct {
settings receiver.Settings
nextConsumer consumer.Metrics

config *Config
server *http.Server
wg sync.WaitGroup
rmCache map[uint64]pmetric.ResourceMetrics
config *Config
server *http.Server
wg sync.WaitGroup

rmCache *lru.Cache[uint64, pmetric.ResourceMetrics]
}

// MetricIdentity contains all the components that uniquely identify a metric
@@ -237,7 +243,7 @@ func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *wr
var rm pmetric.ResourceMetrics
hashedLabels := xxhash.Sum64String(ls.Get("job") + string([]byte{'\xff'}) + ls.Get("instance"))

if existingRM, ok := prw.rmCache[hashedLabels]; ok {
if existingRM, ok := prw.rmCache.Get(hashedLabels); ok {
rm = existingRM
} else {
rm = otelMetrics.ResourceMetrics().AppendEmpty()
@@ -252,20 +258,20 @@ func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *wr
attrs.PutStr(l.Name, l.Value)
}
}
prw.rmCache[hashedLabels] = rm
prw.rmCache.Add(hashedLabels, rm)
continue
}

// For metrics other than target_info, we need to follow the standard process of creating a metric.
var rm pmetric.ResourceMetrics
hashedLabels := xxhash.Sum64String(ls.Get("job") + string([]byte{'\xff'}) + ls.Get("instance"))
existingRM, ok := prw.rmCache[hashedLabels]
existingRM, ok := prw.rmCache.Get(hashedLabels)
if ok {
rm = existingRM
} else {
rm = otelMetrics.ResourceMetrics().AppendEmpty()
parseJobAndInstance(rm.Resource().Attributes(), ls.Get("job"), ls.Get("instance"))
prw.rmCache[hashedLabels] = rm
prw.rmCache.Add(hashedLabels, rm)
}

scopeName, scopeVersion := prw.extractScopeInfo(ls)
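For reference, a small standalone sketch of the `github.com/hashicorp/golang-lru/v2` calls this change relies on (`New`, `Get`, `Add`, `Resize`, `Purge`); the cache size and values below are arbitrary.

```go
package main

import (
	"fmt"

	lru "github.com/hashicorp/golang-lru/v2"
)

func main() {
	// A fixed-size cache keyed by uint64, as in the receiver; size 2 for brevity.
	cache, err := lru.New[uint64, string](2)
	if err != nil {
		panic(err)
	}

	cache.Add(1, "resource for job A")
	cache.Add(2, "resource for job B")

	// Adding a third entry evicts the least recently used one (key 1).
	evicted := cache.Add(3, "resource for job C")
	fmt.Println("eviction happened:", evicted) // true

	_, ok := cache.Get(1)
	fmt.Println("key 1 still cached:", ok) // false

	// Resize changes the maximum size (evicting as needed); Purge empties the cache.
	cache.Resize(1)
	cache.Purge()
	fmt.Println("entries after purge:", cache.Len()) // 0
}
```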
208 changes: 206 additions & 2 deletions receiver/prometheusremotewritereceiver/receiver_test.go
@@ -63,8 +63,14 @@ func setupMetricsReceiver(t *testing.T) *prometheusRemoteWriteReceiver {
prwReceiver, err := factory.CreateMetrics(context.Background(), receivertest.NewNopSettings(metadata.Type), cfg, consumertest.NewNop())
assert.NoError(t, err)
assert.NotNil(t, prwReceiver, "metrics receiver creation failed")
writeReceiver := prwReceiver.(*prometheusRemoteWriteReceiver)

return prwReceiver.(*prometheusRemoteWriteReceiver)
// Add cleanup to ensure LRU cache is properly purged
t.Cleanup(func() {
writeReceiver.rmCache.Purge()
})

return writeReceiver
}

func TestHandlePRWContentTypeNegotiation(t *testing.T) {
@@ -474,7 +480,7 @@ func TestTranslateV2(t *testing.T) {
} {
t.Run(tc.name, func(t *testing.T) {
// since we are using the rmCache to store values across requests, we need to clear it after each test, otherwise it will affect the next test
prwReceiver.rmCache = make(map[uint64]pmetric.ResourceMetrics)
prwReceiver.rmCache.Purge()
metrics, stats, err := prwReceiver.translateV2(ctx, tc.request)
if tc.expectError != "" {
assert.ErrorContains(t, err, tc.expectError)
@@ -654,3 +660,201 @@ func TestTargetInfoWithMultipleRequests(t *testing.T) {
})
}
}

// TestLRUCacheResourceMetrics verifies the LRU cache behavior for resource metrics:
// 1. Caching: Metrics with same job/instance share resource attributes
// 2. Eviction: When cache is full, least recently used entries are evicted
// 3. Reuse: After eviction, same job/instance metrics create new resource attributes
func TestLRUCacheResourceMetrics(t *testing.T) {
prwReceiver := setupMetricsReceiver(t)

// Set a small cache size to emulate the cache eviction
prwReceiver.rmCache.Resize(1)

t.Cleanup(func() {
prwReceiver.rmCache.Purge()
})

// The target_info request.
targetInfoRequest := &writev2.Request{
Symbols: []string{
"",
"job", "production/service_a", // 1, 2
"instance", "host1", // 3, 4
"machine_type", "n1-standard-1", // 5, 6
"cloud_provider", "gcp", // 7, 8
"region", "us-central1", // 9, 10
"__name__", "target_info", // 11, 12
},
Timeseries: []writev2.TimeSeries{
{
Metadata: writev2.Metadata{Type: writev2.Metadata_METRIC_TYPE_GAUGE},
LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12},
},
},
}

// Metric 1. It matches the target_info metric above because it has the same job/instance.
metric1 := &writev2.Request{
Symbols: []string{
"",
"job", "production/service_a", // 1, 2
"instance", "host1", // 3, 4
"__name__", "normal_metric", // 5, 6
"foo", "bar", // 7, 8
},
Timeseries: []writev2.TimeSeries{
{
Metadata: writev2.Metadata{Type: writev2.Metadata_METRIC_TYPE_GAUGE},
LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8},
Samples: []writev2.Sample{{Value: 1, Timestamp: 1}},
},
},
}

// Metric 2. Different job/instance than metric1/target_info.
metric2 := &writev2.Request{
Symbols: []string{
"",
"job", "production/service_b", // 1, 2
"instance", "host2", // 3, 4
"__name__", "different_metric", // 5, 6
"baz", "qux", // 7, 8
},
Timeseries: []writev2.TimeSeries{
{
Metadata: writev2.Metadata{Type: writev2.Metadata_METRIC_TYPE_GAUGE},
LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8},
Samples: []writev2.Sample{{Value: 2, Timestamp: 2}},
},
},
}

// Metric 1_1. Same job/instance as metric 1. Since it arrives after the corresponding cache entry has been evicted, it will not find a cached resource
// and should generate a new resource metric, even though it has the same job/instance as metric1.
metric1_1 := &writev2.Request{
Symbols: []string{
"",
"job", "production/service_a", // 1, 2
"instance", "host1", // 3, 4
"__name__", "normal_metric2", // 5, 6
"joo", "kar", // 7, 8
},
Timeseries: []writev2.TimeSeries{
{
Metadata: writev2.Metadata{Type: writev2.Metadata_METRIC_TYPE_GAUGE},
LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8},
Samples: []writev2.Sample{{Value: 11, Timestamp: 11}},
},
},
}

// This metric is the result of target_info and metric1.
expectedMetrics1 := func() pmetric.Metrics {
metrics := pmetric.NewMetrics()
rm := metrics.ResourceMetrics().AppendEmpty()
attrs := rm.Resource().Attributes()
attrs.PutStr("service.namespace", "production")
attrs.PutStr("service.name", "service_a")
attrs.PutStr("service.instance.id", "host1")
attrs.PutStr("machine_type", "n1-standard-1")
attrs.PutStr("cloud_provider", "gcp")
attrs.PutStr("region", "us-central1")

sm := rm.ScopeMetrics().AppendEmpty()
sm.Scope().SetName("OpenTelemetry Collector")
sm.Scope().SetVersion("latest")

m1 := sm.Metrics().AppendEmpty()
m1.SetName("normal_metric")
m1.SetUnit("")
m1.SetDescription("")

dp1 := m1.SetEmptyGauge().DataPoints().AppendEmpty()
dp1.SetDoubleValue(1.0)
dp1.SetTimestamp(pcommon.Timestamp(1 * int64(time.Millisecond)))
dp1.Attributes().PutStr("foo", "bar")

return metrics
}()

// Result of metric2.
expectedMetrics2 := func() pmetric.Metrics {
metrics := pmetric.NewMetrics()
rm := metrics.ResourceMetrics().AppendEmpty()
attrs := rm.Resource().Attributes()
attrs.PutStr("service.namespace", "production")
attrs.PutStr("service.name", "service_b")
attrs.PutStr("service.instance.id", "host2")

sm := rm.ScopeMetrics().AppendEmpty()
sm.Scope().SetName("OpenTelemetry Collector")
sm.Scope().SetVersion("latest")

m2 := sm.Metrics().AppendEmpty()
m2.SetName("different_metric")
m2.SetUnit("")

dp1 := m2.SetEmptyGauge().DataPoints().AppendEmpty()
dp1.SetDoubleValue(2.0)
dp1.SetTimestamp(pcommon.Timestamp(2 * int64(time.Millisecond)))
dp1.Attributes().PutStr("baz", "qux")

return metrics
}()

// Result of metric1_1.
expectedMetrics1_1 := func() pmetric.Metrics {
metrics := pmetric.NewMetrics()
rm := metrics.ResourceMetrics().AppendEmpty()
attrs := rm.Resource().Attributes()
attrs.PutStr("service.namespace", "production")
attrs.PutStr("service.name", "service_a")
attrs.PutStr("service.instance.id", "host1")

sm := rm.ScopeMetrics().AppendEmpty()
sm.Scope().SetName("OpenTelemetry Collector")
sm.Scope().SetVersion("latest")

m1_1 := sm.Metrics().AppendEmpty()
m1_1.SetName("normal_metric2")
m1_1.SetUnit("")

dp1_1 := m1_1.SetEmptyGauge().DataPoints().AppendEmpty()
dp1_1.SetDoubleValue(11.0)
dp1_1.SetTimestamp(pcommon.Timestamp(11 * int64(time.Millisecond)))
dp1_1.Attributes().PutStr("joo", "kar")

return metrics
}()

mockConsumer := new(MockConsumer)
prwReceiver.nextConsumer = mockConsumer

ts := httptest.NewServer(http.HandlerFunc(prwReceiver.handlePRW))
defer ts.Close()

for _, req := range []*writev2.Request{targetInfoRequest, metric1, metric2, metric1_1} {
pBuf := proto.NewBuffer(nil)
err := pBuf.Marshal(req)
assert.NoError(t, err)

resp, err := http.Post(
ts.URL,
fmt.Sprintf("application/x-protobuf;proto=%s", promconfig.RemoteWriteProtoMsgV2),
bytes.NewBuffer(pBuf.Bytes()),
)
assert.NoError(t, err)

body, err := io.ReadAll(resp.Body)
assert.NoError(t, err)
assert.Equal(t, http.StatusNoContent, resp.StatusCode, string(body))
}

// As target_info and metric1 have the same job/instance, they contribute to the same resulting metrics: mockConsumer.metrics[0].
assert.NoError(t, pmetrictest.CompareMetrics(expectedMetrics1, mockConsumer.metrics[0]))
// As metric2 has a different job/instance, it generates a separate result: mockConsumer.metrics[2]. At this point the cache (size 1) is full, so the target_info entry is evicted to make room for metric2.
assert.NoError(t, pmetrictest.CompareMetrics(expectedMetrics2, mockConsumer.metrics[2]))
// Since the cache has only one slot and the entry for metric1 was evicted, metric1_1 generates a new resource metric even though it has the same job/instance as metric1.
assert.NoError(t, pmetrictest.CompareMetrics(expectedMetrics1_1, mockConsumer.metrics[3]))
}