Commit dcf7b2a
[exporter/elasticsearch] Add downsampling for profiling events (#37893)
Stratified downsampling is essential to reduce query latency for profiling data.
1 parent 8c70241 commit dcf7b2a

File tree: 4 files changed, +220 -23 lines changed
Lines changed: 27 additions & 0 deletions
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: elasticsearchexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add stratified downsampling to the profiles support in the elasticsearch exporter

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [37893]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]

exporter/elasticsearchexporter/internal/serializer/otelserializer/profile.go

Lines changed: 16 additions & 23 deletions
@@ -22,52 +22,45 @@ const (
 
 // SerializeProfile serializes a profile and calls the `pushData` callback for each generated document.
 func SerializeProfile(resource pcommon.Resource, scope pcommon.InstrumentationScope, profile pprofile.Profile, pushData func(*bytes.Buffer, string, string) error) error {
+	pushDataAsJSON := func(data any, id, index string) error {
+		c, err := toJSON(data)
+		if err != nil {
+			return err
+		}
+		return pushData(c, id, index)
+	}
+
 	data, err := serializeprofiles.Transform(resource, scope, profile)
 	if err != nil {
 		return err
 	}
 
 	for _, payload := range data {
-		if payload.StackTraceEvent.StackTraceID != "" {
-			c, err := toJSON(payload.StackTraceEvent)
-			if err != nil {
+		event := payload.StackTraceEvent
+
+		if event.StackTraceID != "" {
+			if err = pushDataAsJSON(event, "", AllEventsIndex); err != nil {
 				return err
 			}
-			err = pushData(c, "", AllEventsIndex)
-			if err != nil {
+			if err = serializeprofiles.IndexDownsampledEvent(event, pushDataAsJSON); err != nil {
 				return err
 			}
 		}
 
 		if payload.StackTrace.DocID != "" {
-			c, err := toJSON(payload.StackTrace)
-			if err != nil {
-				return err
-			}
-			err = pushData(c, payload.StackTrace.DocID, StackTraceIndex)
-			if err != nil {
+			if err = pushDataAsJSON(payload.StackTrace, payload.StackTrace.DocID, StackTraceIndex); err != nil {
 				return err
 			}
 		}
 
 		for _, stackFrame := range payload.StackFrames {
-			c, err := toJSON(stackFrame)
-			if err != nil {
-				return err
-			}
-			err = pushData(c, stackFrame.DocID, StackFrameIndex)
-			if err != nil {
+			if err = pushDataAsJSON(stackFrame, stackFrame.DocID, StackFrameIndex); err != nil {
 				return err
 			}
 		}
 
 		for _, executable := range payload.Executables {
-			c, err := toJSON(executable)
-			if err != nil {
-				return err
-			}
-			err = pushData(c, executable.DocID, ExecutablesIndex)
-			if err != nil {
+			if err = pushDataAsJSON(executable, executable.DocID, ExecutablesIndex); err != nil {
 				return err
 			}
 		}
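For orientation, the following minimal, self-contained sketch illustrates the callback contract this refactor builds on: pushData receives one serialized document plus an optional document ID and a target index, and a pushDataAsJSON-style wrapper JSON-encodes arbitrary values before forwarding them. The stackTraceEvent struct, its field tags, and the index name used here are illustrative placeholders, not the exporter's actual types or index constants.

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// pushData stands in for the exporter's callback: it receives one serialized
// document, an optional document ID and the name of the target index.
func pushData(doc *bytes.Buffer, docID, index string) error {
	fmt.Printf("index=%s id=%q body=%s", index, docID, doc.String())
	return nil
}

// pushDataAsJSON mirrors the helper introduced in the diff above: encode any
// value to JSON once, then forward the resulting buffer to pushData.
func pushDataAsJSON(data any, id, index string) error {
	var buf bytes.Buffer
	if err := json.NewEncoder(&buf).Encode(data); err != nil {
		return err
	}
	return pushData(&buf, id, index)
}

func main() {
	// Illustrative event type; the real exporter defines its own document structs.
	type stackTraceEvent struct {
		StackTraceID string `json:"stacktrace_id"`
		Count        uint16 `json:"count"`
	}
	// One call per target index, mirroring how SerializeProfile pushes the same
	// event to the full index and, via IndexDownsampledEvent, to downsampled ones.
	_ = pushDataAsJSON(stackTraceEvent{StackTraceID: "abc", Count: 3}, "", "profiling-events-all")
}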
Lines changed: 135 additions & 0 deletions
@@ -0,0 +1,135 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package serializeprofiles // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/serializer/otelserializer/serializeprofiles"

import (
	"fmt"
	"math/rand/v2"
)

// ## Why do we need downsampling?
// For every (virtual) CPU core, the host agent (HA) retrieves 20 stacktrace events which are
// stored as a timeseries in an ES index (name 'profiling-events'). With an increasing number
// of hosts and/or an increasing number of cores, the number of stored events per second
// quickly becomes high. E.g. data from 10000 cores generates 846 million events per day.
// Since users want to drill down into e.g. single hosts and/or single applications, we can't
// reduce the amount of data in advance. Querying such amounts of data is costly, even when
// using highly specialised database backends - costly in terms of I/O and CPU. And this
// results in increased latency - the user may end up waiting a long time for query results.
// In order to reduce the costs and to keep the latency as low as possible, we add 'helper'
// indexes with downsampled subsets of the stacktrace events.
//
// ## How does our downsampling approach work?
// The idea is to create downsampled indexes with factors of 5^N (5, 25, 125, 625, ...).
// In the 5^1 index we would store 1/5th of the original events, in the 5^2 index we store
// 1/25th of the original events and so on.
// So each event has a probability of p=1/5=0.2 to also be stored in the next downsampled index.
// Since we aggregate identical stacktrace events by timestamp when reported and stored, we have
// a 'Count' value for each. To be statistically correct, we have to apply p=0.2 to each single
// event independently and not just to the aggregate. We can do so by looping over 'Count' and
// applying p=0.2 on every iteration to generate a new 'Count' value for the next downsampled
// index. We only store aggregates with 'Count' > 0.
//
// At some point we decided that 20k events per query is good enough. With 5^N it means that we
// possibly can end up with 5x more events (100k) from an index. As of this writing, retrieving
// and processing 100k events is still fast enough. While in Clickhouse we could further
// downsample on-the-fly to get 20k, ES currently doesn't allow this (may change in the future).
//
// At query time we have to find the index that has enough data to be statistically sound,
// without having too much data to avoid costs and latency. The code for that is implemented on
// the read path (Kibana profiler plugin) and described there in detail.
//
// ## Example of a query / calculation
// Let's imagine a given query spans a time range of 7 days and would result in 100 million
// events without downsampling. But we only really need 20k events for a good enough result.
// In the 5^1 downsampled index we have 5x less data - this still results in 20 million events.
// Going deeper, we end up in the 5^5 downsampled index with 32k results - 5^4 would give us
// 160k (too many) and 5^6 would give us 6.4k events (not enough).
// We now read and process all 32k events from the 5^5 index. The counts for any aggregation
// (TopN, Flamegraph, ...) need to be multiplied by 5^5, which is an estimate of what we would
// have found in the full events index (the non-downsampled index).
//
// ## How deep do we have to downsample?
// The current code assumes an arbitrary upper limit of 100k CPU cores and a query time range
// of 7 days. (Please be aware that we get 20 events per core per second only if the core is
// 100% busy.)
//
// The condition is
//
//	(100k * 86400 * 7 * 20) / 5^N in [20k, 100k-1]
//	                    ^-- max number of events per second
//	                ^------ number of days
//	        ^-------------- seconds per day
//	 ^--------------------- number of cores
//
// For N=11 the condition is satisfied with a value of 24772.
// In numbers, the 5^11 downsampled index holds 48828125x fewer entries than the full events table.
//
// ## What is the cost of downsampling?
// The additional cost in terms of storage size is
//
//	1/5^1 + 1/5^2 + ... + 1/5^11 = 25%
//
// The same goes for the additional CPU cost on the write path.
//
// The average benefit on the read/query path depends on the query. But it seems that on average
// a factor of a few hundred to a few thousand in terms of I/O, CPU and latency can be achieved.
const (
	maxEventsIndexes = 11
	samplingFactor   = 5
	samplingRatio    = 1.0 / float64(samplingFactor)

	eventsIndexPrefix = "profiling-events"
)

var eventIndices = initEventIndexes(maxEventsIndexes)

// A fixed seed is used for deterministic tests and development.
// There is no downside in using a fixed seed in production.
var rnd = rand.New(rand.NewPCG(0, 0))

// initEventIndexes initializes eventIndexes to avoid calculations for every TraceEvent later.
func initEventIndexes(count int) []string {
	indices := make([]string, 0, count)

	for i := range count {
		indices = append(indices, fmt.Sprintf("%s-%dpow%02d",
			eventsIndexPrefix, samplingFactor, i+1))
	}

	return indices
}

func IndexDownsampledEvent(event StackTraceEvent, pushData func(any, string, string) error) error {
	// Each event has a probability of p=1/5=0.2 to go from one index into the next downsampled
	// index. Since we aggregate identical stacktrace events by timestamp when reported and stored,
	// we have a 'Count' value for each. To be statistically correct, we have to apply p=0.2 to
	// each single stacktrace event independently and not just to the aggregate. We can do so by
	// looping over 'Count' and applying p=0.2 on every iteration to generate a new 'Count' value
	// for the next downsampled index.
	// We only store aggregates with 'Count' > 0. If 'Count' becomes 0, we are done and can
	// continue with the next stacktrace event.
	for _, index := range eventIndices {
		var count uint16
		for range event.Count {
			// samplingRatio is the probability p=0.2 for an event to be copied into the next
			// downsampled index.
			if rnd.Float64() < samplingRatio {
				count++
			}
		}
		if count == 0 {
			return nil
		}

		// Store the event with its new downsampled count in the downsampled index.
		event.Count = count

		if err := pushData(event, "", index); err != nil {
			return err
		}
	}

	return nil
}
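To make the query-time index selection described in the comments above concrete, here is a small sketch (not part of this commit, and not the Kibana profiler plugin's actual implementation) of how a reader could pick the downsampled index from an estimated full-index event count and derive the factor by which the returned counts must be scaled back up. The helper name chooseDownsampledIndex and the full-index name "profiling-events-all" are assumptions for illustration; the ~100k upper bound and the 5^N index naming follow the numbers given above.

package main

import "fmt"

// chooseDownsampledIndex walks down the 5^N indexes until the estimated number
// of events to read drops below the upper bound, and reports the factor that
// counts read from that index must be multiplied by to approximate the full index.
func chooseDownsampledIndex(estimatedFullEvents float64) (index string, scalingFactor int) {
	const (
		samplingFactor = 5
		maxIndexes     = 11
		upperBound     = 100_000 // reading more than ~100k events gets too costly
	)

	scalingFactor = 1
	index = "profiling-events-all" // assumed name of the full (non-downsampled) index
	estimate := estimatedFullEvents

	for n := 1; n <= maxIndexes && estimate >= upperBound; n++ {
		scalingFactor *= samplingFactor
		estimate /= samplingFactor
		index = fmt.Sprintf("profiling-events-%dpow%02d", samplingFactor, n)
	}
	return index, scalingFactor
}

func main() {
	// The worked example from the comments: ~100 million events over 7 days.
	index, factor := chooseDownsampledIndex(100_000_000)
	fmt.Printf("query %s and multiply counts by %d\n", index, factor)
	// Expected: profiling-events-5pow05 with factor 5^5 = 3125 (~32k events read).
}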
Lines changed: 42 additions & 0 deletions

@@ -0,0 +1,42 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package serializeprofiles

import (
	"math/rand/v2"
	"testing"

	"github.com/stretchr/testify/require"
)

func TestIndexDownsampledEvent(t *testing.T) {
	type result struct {
		index string
		count uint16
	}

	var pushedData []result
	pushData := func(data any, _, index string) error {
		pushedData = append(pushedData, result{index, data.(StackTraceEvent).Count})
		return nil
	}

	// To make the expected data deterministic, seed the random number generator.
	// If the seed changes or the random number generator changes, this test will fail.
	rnd = rand.New(rand.NewPCG(0, 0))

	err := IndexDownsampledEvent(StackTraceEvent{Count: 1000}, pushData)
	require.NoError(t, err)

	// With p=0.2 per level, the expected counts decay roughly as 1000/5^N
	// (200, 40, 8, ...); the fixed seed yields the exact values below.
	expectedData := []result{
		{"profiling-events-5pow01", 201},
		{"profiling-events-5pow02", 42},
		{"profiling-events-5pow03", 9},
		{"profiling-events-5pow04", 2},
		{"profiling-events-5pow05", 1},
		{"profiling-events-5pow06", 1},
	}

	require.Equal(t, expectedData, pushedData)
}
