-
Notifications
You must be signed in to change notification settings - Fork 2.9k
elasticsearchexporter: Introduce LRU cache for profiles #38606
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 26 commits
0d7d076
e663670
fdcebae
2cb32a0
6b02110
18755fb
a848fc1
8a93a3f
55998a9
f263b61
9c10cd5
2b4184f
2ad815b
ed5bdc2
df27a37
988758f
d4cf4ca
7ed361b
57ac2c0
6c95df8
1aa5864
62c6ff3
d37dc4d
00ed712
d945545
d52eedb
401b336
60dd3e4
18babf8
3f52cdc
4001866
9747793
d6365d9
3ab0f03
74dd183
bbbdaa4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
# Use this changelog template to create an entry for release notes. | ||
|
||
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' | ||
change_type: enhancement | ||
|
||
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) | ||
component: elasticsearchexporter | ||
|
||
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). | ||
note: Introduce LRU cache for profiles | ||
|
||
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. | ||
issues: [38606] | ||
|
||
# (Optional) One or more lines of additional information to render under the primary note. | ||
# These lines will be padded with 2 spaces and then inserted directly into the document. | ||
# Use pipe (|) for multiline entries. | ||
subtext: | ||
|
||
# If your change doesn't affect end users or the exported elements of any package, | ||
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. | ||
# Optional: The change log or logs in which this entry should be included. | ||
# e.g. '[user]' or '[user, api]' | ||
# Include 'user' if the change is relevant to end users. | ||
# Include 'api' if there is a change to a library API. | ||
# Default: '[user]' | ||
change_logs: [user] |
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,74 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package lru // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/lru" | ||
|
||
import ( | ||
"time" | ||
|
||
"github.com/cespare/xxhash" | ||
"github.com/elastic/go-freelru" | ||
"go.opentelemetry.io/ebpf-profiler/libpf/xsync" | ||
) | ||
|
||
// void is the zero-size value type stored in the LRU; only the keys carry information. | ||
type void struct{} | ||
|
||
func stringHashFn(s string) uint32 { | ||
return uint32(xxhash.Sum64String(s)) | ||
} | ||
|
||
// LockedLRUSet is the view of an LRUSet that is handed to the callback passed | ||
// to LRUSet.WithLock. | ||
type LockedLRUSet interface { | ||
// CheckAndAdd looks entry up in the cache and records it when it is not | ||
// stored yet. | ||
// It returns true when the entry was already present in the cache and | ||
// should therefore be excluded, and false when it was not found. | ||
CheckAndAdd(entry string) bool | ||
} | ||
|
||
// LRUSet is an LRU-backed set of string keys. Callers go through WithLock to | ||
// acquire the lock and then test and record keys via LockedLRUSet. | ||
type LRUSet struct { | ||
// syncMu wraps the underlying freelru cache. When it is nil, WithLock | ||
// hands out a set that never stores anything. | ||
syncMu *xsync.RWMutex[*freelru.LRU[string, void]] | ||
} | ||
|
||
func (l *LRUSet) WithLock(fn func(LockedLRUSet) error) error { | ||
if l == nil || l.syncMu == nil { | ||
return fn(nilLockedLRUSet{}) | ||
} | ||
dmathieu marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
lru := l.syncMu.WLock() | ||
defer l.syncMu.WUnlock(&lru) | ||
|
||
return fn(lockedLRUSet{*lru}) | ||
} | ||
|
||
func NewLRUSet(size uint32, rollover time.Duration) (*LRUSet, error) { | ||
lru, err := freelru.New[string, void](size, stringHashFn) | ||
if err != nil { | ||
return nil, err | ||
} | ||
lru.SetLifetime(rollover) | ||
|
||
syncMu := xsync.NewRWMutex(lru) | ||
return &LRUSet{syncMu: &syncMu}, nil | ||
} | ||
|
||
type lockedLRUSet struct { | ||
lru *freelru.LRU[string, void] | ||
} | ||
|
||
func (l lockedLRUSet) CheckAndAdd(entry string) (excluded bool) { | ||
if _, exclude := (l.lru).Get(entry); exclude { | ||
return true | ||
} | ||
(l.lru).Add(entry, void{}) | ||
return false | ||
} | ||
|
||
// nilLockedLRUSet is a LockedLRUSet that stores nothing.
type nilLockedLRUSet struct{}

// CheckAndAdd ignores its argument and always reports false, so no entry is
// ever excluded.
func (nilLockedLRUSet) CheckAndAdd(string) bool {
	return false
}
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,102 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package lru | ||
|
||
import ( | ||
"testing" | ||
"time" | ||
|
||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
func TestLRUSet(t *testing.T) { | ||
cache, err := NewLRUSet(5, time.Minute) | ||
require.NoError(t, err) | ||
|
||
err = cache.WithLock(func(lock LockedLRUSet) error { | ||
assert.False(t, lock.CheckAndAdd("a")) | ||
assert.True(t, lock.CheckAndAdd("a")) | ||
assert.False(t, lock.CheckAndAdd("b")) | ||
|
||
assert.InDelta(t, 0.0, testing.AllocsPerRun(5, func() { | ||
dmathieu marked this conversation as resolved.
Show resolved
Hide resolved
|
||
_ = lock.CheckAndAdd("c") | ||
}), 0) | ||
|
||
return nil | ||
}) | ||
|
||
assert.NoError(t, err) | ||
} | ||
|
||
func TestLRUSetLifeTime(t *testing.T) { | ||
const lifetime = 100 * time.Millisecond | ||
cache, err := NewLRUSet(5, lifetime) | ||
require.NoError(t, err) | ||
|
||
err = cache.WithLock(func(lock LockedLRUSet) error { | ||
assert.False(t, lock.CheckAndAdd("a")) | ||
assert.True(t, lock.CheckAndAdd("a")) | ||
return nil | ||
}) | ||
require.NoError(t, err) | ||
|
||
// Wait until cache item is expired. | ||
time.Sleep(lifetime) | ||
err = cache.WithLock(func(lock LockedLRUSet) error { | ||
assert.False(t, lock.CheckAndAdd("a")) | ||
assert.True(t, lock.CheckAndAdd("a")) | ||
return nil | ||
}) | ||
require.NoError(t, err) | ||
|
||
// Wait 50% of the lifetime, so the item is not expired. | ||
time.Sleep(lifetime / 2) | ||
err = cache.WithLock(func(lock LockedLRUSet) error { | ||
assert.True(t, lock.CheckAndAdd("a")) | ||
return nil | ||
}) | ||
require.NoError(t, err) | ||
|
||
// Wait another 50% of the lifetime, so the item should be expired. | ||
time.Sleep(lifetime / 2) | ||
err = cache.WithLock(func(lock LockedLRUSet) error { | ||
assert.False(t, lock.CheckAndAdd("a")) | ||
return nil | ||
}) | ||
require.NoError(t, err) | ||
} | ||
|
||
func TestNilLRUSet(t *testing.T) { | ||
cache := &LRUSet{} | ||
|
||
err := cache.WithLock(func(lock LockedLRUSet) error { | ||
assert.False(t, lock.CheckAndAdd("a")) | ||
assert.False(t, lock.CheckAndAdd("a")) | ||
assert.False(t, lock.CheckAndAdd("b")) | ||
|
||
assert.InDelta(t, 0.0, testing.AllocsPerRun(5, func() { | ||
_ = lock.CheckAndAdd("c") | ||
}), 0) | ||
|
||
return nil | ||
}) | ||
|
||
assert.NoError(t, err) | ||
} | ||
|
||
func BenchmarkLRUSetCheck(b *testing.B) { | ||
cache, err := NewLRUSet(5, time.Minute) | ||
require.NoError(b, err) | ||
|
||
_ = cache.WithLock(func(lock LockedLRUSet) error { | ||
b.ReportAllocs() | ||
b.ResetTimer() | ||
for i := 0; i < b.N; i++ { | ||
lock.CheckAndAdd("a") | ||
} | ||
|
||
return nil | ||
}) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,9 +3,34 @@ | |
|
||
package otelserializer // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/serializer/otelserializer" | ||
|
||
type Serializer struct{} | ||
import ( | ||
"sync" | ||
"time" | ||
|
||
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/lru" | ||
) | ||
|
||
const ( | ||
// NOTE(review): presumably the capacities (in entries) of the matching | ||
// known* LRU sets on Serializer; their use is not visible here, confirm. | ||
knownExecutablesCacheSize = 16 * 1024 | ||
knownFramesCacheSize = 128 * 1024 | ||
knownTracesCacheSize = 128 * 1024 | ||
knownUnsymbolizedFramesCacheSize = 128 * 1024 | ||
knownUnsymbolizedExecutablesCacheSize = 16 * 1024 | ||
|
||
// NOTE(review): presumably the entry lifetime given to the LRU sets, tied | ||
// to the ILM rollover interval; its use is not visible here, confirm. | ||
minILMRolloverTime = 3 * time.Hour | ||
) | ||
|
||
// Serializer carries the profiles data caches declared below. | ||
type Serializer struct { | ||
// Data cache for profiles | ||
// NOTE(review): loadLRUsOnce presumably guards one-time, on-demand creation | ||
// of the LRU sets below; the initialization is not visible here, confirm. | ||
loadLRUsOnce sync.Once | ||
knownTraces *lru.LRUSet | ||
knownFrames *lru.LRUSet | ||
knownExecutables *lru.LRUSet | ||
knownUnsymbolizedFrames *lru.LRUSet | ||
knownUnsymbolizedExecutables *lru.LRUSet | ||
} | ||
|
||
// New builds a new Serializer | ||
func New() *Serializer { | ||
return &Serializer{} | ||
func New() (*Serializer, error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this New is shared by all signal types, but is called once per signal type IIUC. Given There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, unless we start having per-signal exporter structs, we don't really have a solution here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We're talking about for each LRUSet
for Let's say each element is 28B, it will be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's approximately correct. With the suggestion to reduce the size for two of the 5, it would be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. go bench is not the correct tool to measure the memory usage, as it gets averaged out over b.N. I compiled contrib with and without this PR, and I use this config:
before:
after:
To confirm it is related to the number of pipelines, I added 2 more pipelines with different es exporter configs:
This time the RSS increases to 269 MiB. This pretty much matches up with our math above. This is a significant resource increase and needs to be resolved before moving forward with this PR. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have changed the implementation to only create the LRUs when they are needed. |
||
return &Serializer{}, nil | ||
} |
Uh oh!
There was an error while loading. Please reload this page.