Skip to content

Commit f9b1f51

Browse files
committed
introduce lru cache for profiles
1 parent 5d32648 commit f9b1f51

File tree

6 files changed

+232
-20
lines changed

6 files changed

+232
-20
lines changed

exporter/elasticsearchexporter/exporter.go

Lines changed: 78 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,11 @@ import (
88
"context"
99
"errors"
1010
"fmt"
11+
"time"
1112

13+
"github.com/cespare/xxhash"
1214
"github.com/elastic/go-docappender/v2"
15+
"github.com/elastic/go-freelru"
1316
"go.opentelemetry.io/collector/client"
1417
"go.opentelemetry.io/collector/component"
1518
"go.opentelemetry.io/collector/exporter"
@@ -22,10 +25,26 @@ import (
2225

2326
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/datapoints"
2427
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/elasticsearch"
28+
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/lru"
2529
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/pool"
2630
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/serializer/otelserializer"
2731
)
2832

33+
// Size multipliers used to express the cache capacities below.
const (
	KiB = 1 << 10
	MiB = 1 << 20
)

// Capacities handed to freelru.New for the profile de-duplication caches
// created in newExporter.
const (
	knownExecutablesCacheSize = 128 * KiB
	knownFramesCacheSize      = 128 * KiB
	knownTracesCacheSize      = 128 * KiB
)

// minILMRolloverTime is the lifetime applied to every entry of the profile
// caches (via SetLifetime in newExporter).
const minILMRolloverTime = 3 * time.Hour
43+
44+
var stringHashFn = func(s string) uint32 {
45+
return uint32(xxhash.Sum64String(s))
46+
}
47+
2948
type elasticsearchExporter struct {
3049
set exporter.Settings
3150
config *Config
@@ -36,16 +55,41 @@ type elasticsearchExporter struct {
3655
allowedMappingModes map[string]MappingMode
3756
bulkIndexers bulkIndexers
3857
bufferPool *pool.BufferPool
58+
59+
// Data cache for profiles
60+
knownTraces *lru.LRUSet[string]
61+
knownFrames *lru.LRUSet[string]
62+
knownExecutables *lru.LRUSet[string]
3963
}
4064

4165
func newExporter(
4266
cfg *Config,
4367
set exporter.Settings,
4468
index string,
4569
dynamicIndex bool,
46-
) *elasticsearchExporter {
70+
) (*elasticsearchExporter, error) {
4771
allowedMappingModes := cfg.allowedMappingModes()
4872
defaultMappingMode := allowedMappingModes[canonicalMappingModeName(cfg.Mapping.Mode)]
73+
74+
// Create LRUs with minILMRolloverTime as entry lifetime to avoid losing data on ILM rollover.
75+
knownTraces, err := freelru.New[string, lru.Void](knownTracesCacheSize, stringHashFn)
76+
if err != nil {
77+
return nil, fmt.Errorf("failed to create traces LRU: %w", err)
78+
}
79+
knownTraces.SetLifetime(minILMRolloverTime)
80+
81+
knownFrames, err := freelru.New[string, lru.Void](knownFramesCacheSize, stringHashFn)
82+
if err != nil {
83+
return nil, fmt.Errorf("failed to create frames LRU: %w", err)
84+
}
85+
knownFrames.SetLifetime(minILMRolloverTime)
86+
87+
knownExecutables, err := freelru.New[string, lru.Void](knownExecutablesCacheSize, stringHashFn)
88+
if err != nil {
89+
return nil, fmt.Errorf("failed to create executables LRU: %w", err)
90+
}
91+
knownExecutables.SetLifetime(minILMRolloverTime)
92+
4993
return &elasticsearchExporter{
5094
set: set,
5195
config: cfg,
@@ -55,7 +99,11 @@ func newExporter(
5599
allowedMappingModes: allowedMappingModes,
56100
defaultMappingMode: defaultMappingMode,
57101
bufferPool: pool.NewBufferPool(),
58-
}
102+
103+
knownTraces: lru.NewLRUSet(knownTraces),
104+
knownFrames: lru.NewLRUSet(knownFrames),
105+
knownExecutables: lru.NewLRUSet(knownExecutables),
106+
}, nil
59107
}
60108

61109
func (e *elasticsearchExporter) Start(ctx context.Context, host component.Host) error {
@@ -579,19 +627,34 @@ func (e *elasticsearchExporter) pushProfileRecord(
579627
profile pprofile.Profile,
580628
defaultSession, eventsSession, stackTracesSession, stackFramesSession, executablesSession bulkIndexerSession,
581629
) error {
582-
return encoder.encodeProfile(ec, profile, func(buf *bytes.Buffer, docID, index string) error {
583-
switch index {
584-
case otelserializer.StackTraceIndex:
585-
return stackTracesSession.Add(ctx, index, docID, "", buf, nil, docappender.ActionCreate)
586-
case otelserializer.StackFrameIndex:
587-
return stackFramesSession.Add(ctx, index, docID, "", buf, nil, docappender.ActionCreate)
588-
case otelserializer.AllEventsIndex:
589-
return eventsSession.Add(ctx, index, docID, "", buf, nil, docappender.ActionCreate)
590-
case otelserializer.ExecutablesIndex:
591-
return executablesSession.Add(ctx, index, docID, "", buf, nil, docappender.ActionUpdate)
592-
default:
593-
return defaultSession.Add(ctx, index, docID, "", buf, nil, docappender.ActionCreate)
594-
}
630+
return e.knownTraces.WithLock(func(tracesSet lru.LockedLRUSet[string]) error {
631+
return e.knownFrames.WithLock(func(framesSet lru.LockedLRUSet[string]) error {
632+
return e.knownExecutables.WithLock(func(executablesSet lru.LockedLRUSet[string]) error {
633+
return encoder.encodeProfile(ec, profile, func(buf *bytes.Buffer, docID, index string) error {
634+
switch index {
635+
case otelserializer.StackTraceIndex:
636+
if !tracesSet.CheckAndAdd(docID) {
637+
return stackTracesSession.Add(ctx, index, docID, "", buf, nil, docappender.ActionCreate)
638+
}
639+
return nil
640+
case otelserializer.StackFrameIndex:
641+
if !framesSet.CheckAndAdd(docID) {
642+
return stackFramesSession.Add(ctx, index, docID, "", buf, nil, docappender.ActionCreate)
643+
}
644+
return nil
645+
case otelserializer.AllEventsIndex:
646+
return eventsSession.Add(ctx, index, docID, "", buf, nil, docappender.ActionCreate)
647+
case otelserializer.ExecutablesIndex:
648+
if !executablesSet.CheckAndAdd(docID) {
649+
return executablesSession.Add(ctx, index, docID, "", buf, nil, docappender.ActionUpdate)
650+
}
651+
return nil
652+
default:
653+
return defaultSession.Add(ctx, index, docID, "", buf, nil, docappender.ActionCreate)
654+
}
655+
})
656+
})
657+
})
595658
})
596659
}
597660

exporter/elasticsearchexporter/factory.go

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,10 @@ func createLogsExporter(
127127

128128
handleDeprecatedConfig(cf, set.Logger)
129129

130-
exporter := newExporter(cf, set, cf.LogsIndex, cf.LogsDynamicIndex.Enabled)
130+
exporter, err := newExporter(cf, set, cf.LogsIndex, cf.LogsDynamicIndex.Enabled)
131+
if err != nil {
132+
return nil, err
133+
}
131134

132135
return exporterhelper.NewLogs(
133136
ctx,
@@ -146,7 +149,10 @@ func createMetricsExporter(
146149
cf := cfg.(*Config)
147150
handleDeprecatedConfig(cf, set.Logger)
148151

149-
exporter := newExporter(cf, set, cf.MetricsIndex, cf.MetricsDynamicIndex.Enabled)
152+
exporter, err := newExporter(cf, set, cf.MetricsIndex, cf.MetricsDynamicIndex.Enabled)
153+
if err != nil {
154+
return nil, err
155+
}
150156

151157
return exporterhelper.NewMetrics(
152158
ctx,
@@ -164,7 +170,10 @@ func createTracesExporter(ctx context.Context,
164170
cf := cfg.(*Config)
165171
handleDeprecatedConfig(cf, set.Logger)
166172

167-
exporter := newExporter(cf, set, cf.TracesIndex, cf.TracesDynamicIndex.Enabled)
173+
exporter, err := newExporter(cf, set, cf.TracesIndex, cf.TracesDynamicIndex.Enabled)
174+
if err != nil {
175+
return nil, err
176+
}
168177

169178
return exporterhelper.NewTraces(
170179
ctx,
@@ -187,7 +196,10 @@ func createProfilesExporter(
187196

188197
handleDeprecatedConfig(cf, set.Logger)
189198

190-
exporter := newExporter(cf, set, "", false)
199+
exporter, err := newExporter(cf, set, "", false)
200+
if err != nil {
201+
return nil, err
202+
}
191203

192204
return xexporterhelper.NewProfilesExporter(
193205
ctx,

exporter/elasticsearchexporter/go.mod

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,10 @@ go 1.23.0
44

55
require (
66
github.com/cenkalti/backoff/v4 v4.3.0
7+
github.com/cespare/xxhash v1.1.0
78
github.com/elastic/go-docappender/v2 v2.6.1
89
github.com/elastic/go-elasticsearch/v8 v8.17.1
10+
github.com/elastic/go-freelru v0.16.0
911
github.com/elastic/go-structform v0.0.12
1012
github.com/klauspost/compress v1.18.0
1113
github.com/lestrrat-go/strftime v1.1.0
@@ -42,7 +44,6 @@ require (
4244
github.com/cilium/ebpf v0.16.0 // indirect
4345
github.com/davecgh/go-spew v1.1.1 // indirect
4446
github.com/elastic/elastic-transport-go/v8 v8.6.1 // indirect
45-
github.com/elastic/go-freelru v0.16.0 // indirect
4647
github.com/elastic/go-sysinfo v1.15.0 // indirect
4748
github.com/elastic/go-windows v1.0.2 // indirect
4849
github.com/felixge/httpsnoop v1.0.4 // indirect

exporter/elasticsearchexporter/go.sum

Lines changed: 6 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package lru // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/lru"
5+
6+
import (
7+
"github.com/elastic/go-freelru"
8+
"go.opentelemetry.io/ebpf-profiler/libpf/xsync"
9+
)
10+
11+
// Void is the empty value type stored in the underlying freelru cache; only
// the keys of an LRUSet carry information.
type Void struct{}
12+
13+
// LockedLRUSet is the view of an LRUSet handed to the callback of
// LRUSet.WithLock, i.e. while the set's lock is held.
type LockedLRUSet[T comparable] interface {
	// CheckAndAdd makes sure entry is stored in the cache and reports whether
	// it had been stored before the call. A true result means the caller
	// should exclude the entry because it was already present.
	CheckAndAdd(entry T) bool
}
22+
23+
// LRUSet is an LRU cache implementation that allows acquiring a lock, and
24+
// checking whether specific keys have already been stored.
25+
type LRUSet[T comparable] struct {
26+
syncMu *xsync.RWMutex[*freelru.LRU[T, Void]]
27+
}
28+
29+
func (l *LRUSet[T]) WithLock(fn func(LockedLRUSet[T]) error) error {
30+
if l == nil || l.syncMu == nil {
31+
return fn(nilLockedLRUSet[T]{})
32+
}
33+
34+
excluded := l.syncMu.WLock()
35+
defer l.syncMu.WUnlock(&excluded)
36+
37+
return fn(lockedLRUSet[T]{*excluded})
38+
}
39+
40+
func NewLRUSet[T comparable](lru *freelru.LRU[T, Void]) *LRUSet[T] {
41+
syncMu := xsync.NewRWMutex(lru)
42+
return &LRUSet[T]{syncMu: &syncMu}
43+
}
44+
45+
type lockedLRUSet[T comparable] struct {
46+
excluded *freelru.LRU[T, Void]
47+
}
48+
49+
func (l lockedLRUSet[T]) CheckAndAdd(entry T) bool {
50+
defer (l.excluded).Add(entry, Void{})
51+
52+
if _, exclude := (l.excluded).Get(entry); exclude {
53+
return true
54+
}
55+
return false
56+
}
57+
58+
// nilLockedLRUSet is the LockedLRUSet used when no backing cache exists. It
// remembers nothing.
type nilLockedLRUSet[T comparable] struct{}

// CheckAndAdd always reports false: without a cache no entry is ever
// considered already present, so nothing is excluded.
func (nilLockedLRUSet[T]) CheckAndAdd(_ T) bool {
	const alreadyPresent = false
	return alreadyPresent
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package lru
2+
3+
import (
4+
"testing"
5+
6+
"github.com/cespare/xxhash"
7+
"github.com/elastic/go-freelru"
8+
"github.com/stretchr/testify/assert"
9+
"github.com/stretchr/testify/require"
10+
)
11+
12+
func TestLRUSet(t *testing.T) {
13+
lru, err := freelru.New[string, Void](5, func(s string) uint32 {
14+
return uint32(xxhash.Sum64String(s))
15+
})
16+
require.NoError(t, err)
17+
cache := NewLRUSet(lru)
18+
19+
err = cache.WithLock(func(lock LockedLRUSet[string]) error {
20+
assert.False(t, lock.CheckAndAdd("a"))
21+
assert.True(t, lock.CheckAndAdd("a"))
22+
assert.False(t, lock.CheckAndAdd("b"))
23+
24+
assert.InDelta(t, 0.0, testing.AllocsPerRun(5, func() {
25+
_ = lock.CheckAndAdd("c")
26+
}), 0)
27+
28+
return nil
29+
})
30+
31+
assert.NoError(t, err)
32+
}
33+
34+
func TestNilLRUSet(t *testing.T) {
35+
cache := &LRUSet[string]{}
36+
37+
err := cache.WithLock(func(lock LockedLRUSet[string]) error {
38+
assert.False(t, lock.CheckAndAdd("a"))
39+
assert.False(t, lock.CheckAndAdd("a"))
40+
assert.False(t, lock.CheckAndAdd("b"))
41+
42+
assert.InDelta(t, 0.0, testing.AllocsPerRun(5, func() {
43+
_ = lock.CheckAndAdd("c")
44+
}), 0)
45+
46+
return nil
47+
})
48+
49+
assert.NoError(t, err)
50+
}
51+
52+
func BenchmarkLRUSetCheck(b *testing.B) {
53+
lru, err := freelru.New[string, Void](5, func(s string) uint32 {
54+
return uint32(xxhash.Sum64String(s))
55+
})
56+
require.NoError(b, err)
57+
cache := NewLRUSet(lru)
58+
59+
_ = cache.WithLock(func(lock LockedLRUSet[string]) error {
60+
b.ReportAllocs()
61+
b.ResetTimer()
62+
for i := 0; i < b.N; i++ {
63+
lock.CheckAndAdd("a")
64+
}
65+
66+
return nil
67+
})
68+
}

0 commit comments

Comments
 (0)