
Commit c51bea5

Author: Koenraad Verheyden
Add new tail sampling processor policy: probabilistic (#3876)
* Add new tail sampling processor policy: percentage
* Update CHANGELOG.md
* Fix tail_sampling_config.yaml
* Fix typo
* Reset counters to avoid overflow, improve tests
* Move into internal as well
* Use sampling_percentage similar to the probabilistic sampling processor
  Use sampling_percentage instead of percentage
  Make sampling_percentage a value between 0 and 100
* Combine tests in a single table-test
* Rename percentage -> probabilistic for consistency with probabilistic processor
* Add IncludeAlreadySampled option
* Clarify test cases
* Use the same algorithm as the probabilistic sampling processor
* Simplify if-else case
* Typo
* Order of tail sampling policies does not matter
* Switch hashing implementation from murmur3 to fnv-1a
* Lower amount of traces to sample in test to speed up tests
1 parent bc2781f commit c51bea5

File tree: 9 files changed (+280, -10 lines)

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
@@ -2,6 +2,10 @@
 
 ## Unreleased
 
+## 💡 Enhancements 💡
+
+- `tailsampling` processor: Add new policy `probabilistic` (#3876)
+
 ## v0.33.0
 
 # 🎉 OpenTelemetry Collector Contrib v0.33.0 (Beta) 🎉

processor/tailsamplingprocessor/README.md

Lines changed: 27 additions & 3 deletions
@@ -15,6 +15,7 @@ Multiple policies exist today and it is straight forward to add more. These incl
 - `always_sample`: Sample all traces
 - `latency`: Sample based on the duration of the trace. The duration is determined by looking at the earliest start time and latest end time, without taking into consideration what happened in between.
 - `numeric_attribute`: Sample based on number attributes
+- `probabilistic`: Sample a percentage of traces. Read [a comparison with the Probabilistic Sampling Processor](#probabilistic-sampling-processor-compared-to-the-tail-sampling-processor-with-the-probabilistic-policy).
 - `status_code`: Sample based upon the status code (`OK`, `ERROR` or `UNSET`)
 - `string_attribute`: Sample based on string attributes value matches, both exact and regex value matches are supported
 - `rate_limiting`: Sample based on rate
@@ -50,21 +51,26 @@ processors:
           },
           {
             name: test-policy-4,
+            type: probabilistic,
+            probabilistic: {sampling_percentage: 10}
+          },
+          {
+            name: test-policy-5,
             type: status_code,
             status_code: {status_codes: [ERROR, UNSET]}
           },
           {
-            name: test-policy-5,
+            name: test-policy-6,
             type: string_attribute,
             string_attribute: {key: key2, values: [value1, value2]}
           },
           {
-            name: test-policy-6,
+            name: test-policy-7,
             type: string_attribute,
             string_attribute: {key: key2, values: [value1, val*], enabled_regex_matching: true, cache_max_size: 10}
           },
           {
-            name: test-policy-7,
+            name: test-policy-8,
             type: rate_limiting,
             rate_limiting: {spans_per_second: 35}
           }
@@ -73,3 +79,21 @@ processors:
 
 Refer to [tail_sampling_config.yaml](./testdata/tail_sampling_config.yaml) for detailed
 examples on using the processor.
+
+### Probabilistic Sampling Processor compared to the Tail Sampling Processor with the Probabilistic policy
+
+The [probabilistic sampling processor][probabilistic_sampling_processor] and the probabilistic tail sampling processor policy work very similarly:
+based upon a configurable sampling percentage they will sample a fixed ratio of received traces.
+But depending on the overall processing pipeline you should prefer one over the other.
+
+As a rule of thumb, if you want to add probabilistic sampling and...
+
+...you are not using the tail sampling processor already: use the [probabilistic sampling processor][probabilistic_sampling_processor].
+Running the probabilistic sampling processor is more efficient than the tail sampling processor.
+The probabilistic sampling policy makes its decision based upon the trace ID, so waiting until more spans have arrived will not influence its decision.
+
+...you are already using the tail sampling processor: add the probabilistic sampling policy.
+You are already incurring the cost of running the tail sampling processor, so the additional cost of the probabilistic policy is negligible.
+Additionally, using the policy within the tail sampling processor will ensure that traces sampled by other policies will not be dropped.
+
+[probabilistic_sampling_processor]: https://github.com/open-telemetry/opentelemetry-collector/tree/main/processor/probabilisticsamplerprocessor

processor/tailsamplingprocessor/config.go

Lines changed: 16 additions & 0 deletions
@@ -31,6 +31,8 @@ const (
 	// NumericAttribute sample traces that have a given numeric attribute in a specified
 	// range, e.g.: attribute "http.status_code" >= 399 and <= 999.
 	NumericAttribute PolicyType = "numeric_attribute"
+	// Probabilistic samples a given percentage of traces.
+	Probabilistic PolicyType = "probabilistic"
 	// StatusCode sample traces that have a given status code.
 	StatusCode PolicyType = "status_code"
 	// StringAttribute sample traces that a attribute, of type string, matching
@@ -50,6 +52,8 @@ type PolicyCfg struct {
 	LatencyCfg LatencyCfg `mapstructure:"latency"`
 	// Configs for numeric attribute filter sampling policy evaluator.
 	NumericAttributeCfg NumericAttributeCfg `mapstructure:"numeric_attribute"`
+	// Configs for probabilistic sampling policy evaluator.
+	ProbabilisticCfg ProbabilisticCfg `mapstructure:"probabilistic"`
 	// Configs for status code filter sampling policy evaluator.
 	StatusCodeCfg StatusCodeCfg `mapstructure:"status_code"`
 	// Configs for string attribute filter sampling policy evaluator.
@@ -76,6 +80,18 @@ type NumericAttributeCfg struct {
 	MaxValue int64 `mapstructure:"max_value"`
 }
 
+// ProbabilisticCfg holds the configurable settings to create a probabilistic
+// sampling policy evaluator.
+type ProbabilisticCfg struct {
+	// HashSalt allows one to configure the hashing salt. This is important in scenarios where multiple layers of collectors
+	// have different sampling rates: if they use the same salt, a trace that passes one layer may pass the other even though
+	// they have different sampling rates; configuring different salts avoids that.
+	HashSalt string `mapstructure:"hash_salt"`
+	// SamplingPercentage is the percentage rate at which traces are going to be sampled. Defaults to zero, i.e. no traces are sampled.
+	// Values greater than or equal to 100 are treated as "sample all traces".
+	SamplingPercentage float64 `mapstructure:"sampling_percentage"`
+}
+
 // StatusCodeCfg holds the configurable settings to create a status code filter sampling
 // policy evaluator.
 type StatusCodeCfg struct {
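
To illustrate the `HashSalt` comment above, here is a standalone sketch of a hypothetical two-layer deployment (not code from this commit; the salts `salt-a`/`salt-b` and the 10%/25% rates are made up). With a shared salt, the 10% layer's sample set is nested inside the 25% layer's set because both compare the same hash against nested thresholds; distinct salts de-correlate the two decisions:

```go
// Sketch: why a configurable hash salt matters across collector layers.
package main

import (
    "encoding/binary"
    "fmt"
    "hash/fnv"
    "math"
    "math/big"
    "math/rand"
)

// threshold maps a sampling ratio onto the uint64 space, as the new evaluator does.
func threshold(ratio float64) uint64 {
    boundary := new(big.Float).SetInt(new(big.Int).SetUint64(math.MaxUint64))
    res, _ := boundary.Mul(boundary, big.NewFloat(ratio)).Uint64()
    return res
}

// sampled hashes salt + trace ID with FNV-1a and compares against the threshold.
func sampled(salt string, traceID [16]byte, ratio float64) bool {
    h := fnv.New64a()
    _, _ = h.Write([]byte(salt))
    _, _ = h.Write(traceID[:])
    return h.Sum64() <= threshold(ratio)
}

func main() {
    r := rand.New(rand.NewSource(1))
    firstLayer, sameSaltOverlap, diffSaltOverlap := 0, 0, 0
    for i := 0; i < 100_000; i++ {
        var id [16]byte
        binary.BigEndian.PutUint64(id[:8], r.Uint64())
        binary.BigEndian.PutUint64(id[8:], r.Uint64())
        if sampled("salt-a", id, 0.10) { // first layer samples 10%
            firstLayer++
            if sampled("salt-a", id, 0.25) {
                sameSaltOverlap++ // always true: same hash, nested thresholds
            }
            if sampled("salt-b", id, 0.25) {
                diffSaltOverlap++ // only ~25% of the time: independent decision
            }
        }
    }
    fmt.Println(firstLayer, sameSaltOverlap, diffSaltOverlap)
}
```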

processor/tailsamplingprocessor/config_test.go

Lines changed: 8 additions & 3 deletions
@@ -59,17 +59,22 @@ func TestLoadConfig(t *testing.T) {
 			NumericAttributeCfg: NumericAttributeCfg{Key: "key1", MinValue: 50, MaxValue: 100},
 		},
 		{
-			Name: "test-policy-4",
+			Name:             "test-policy-4",
+			Type:             Probabilistic,
+			ProbabilisticCfg: ProbabilisticCfg{HashSalt: "custom-salt", SamplingPercentage: 0.1},
+		},
+		{
+			Name: "test-policy-5",
 			Type: StatusCode,
 			StatusCodeCfg: StatusCodeCfg{StatusCodes: []string{"ERROR", "UNSET"}},
 		},
 		{
-			Name: "test-policy-5",
+			Name: "test-policy-6",
 			Type: StringAttribute,
 			StringAttributeCfg: StringAttributeCfg{Key: "key2", Values: []string{"value1", "value2"}},
 		},
 		{
-			Name: "test-policy-6",
+			Name: "test-policy-7",
 			Type: RateLimiting,
 			RateLimitingCfg: RateLimitingCfg{SpansPerSecond: 35},
 		},

processor/tailsamplingprocessor/internal/sampling/probabilistic.go

Lines changed: 91 additions & 0 deletions
@@ -0,0 +1,91 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//       http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sampling
+
+import (
+	"hash/fnv"
+	"math"
+	"math/big"
+
+	"go.opentelemetry.io/collector/model/pdata"
+	"go.uber.org/zap"
+)
+
+const (
+	defaultHashSalt = "default-hash-seed"
+)
+
+type probabilisticSampler struct {
+	logger    *zap.Logger
+	threshold uint64
+	hashSalt  string
+}
+
+var _ PolicyEvaluator = (*probabilisticSampler)(nil)
+
+// NewProbabilisticSampler creates a policy evaluator that samples a percentage of
+// traces.
+func NewProbabilisticSampler(logger *zap.Logger, hashSalt string, samplingPercentage float64) PolicyEvaluator {
+	if hashSalt == "" {
+		hashSalt = defaultHashSalt
+	}
+
+	return &probabilisticSampler{
+		logger: logger,
+		// calculate threshold once
+		threshold: calculateThreshold(samplingPercentage / 100),
+		hashSalt:  hashSalt,
+	}
+}
+
+// OnLateArrivingSpans notifies the evaluator that the given list of spans arrived
+// after the sampling decision was already taken for the trace.
+// This gives the evaluator a chance to log any message/metrics and/or update any
+// related internal state.
+func (s *probabilisticSampler) OnLateArrivingSpans(Decision, []*pdata.Span) error {
+	s.logger.Debug("Triggering action for late arriving spans in probabilistic filter")
+	return nil
+}
+
+// Evaluate looks at the trace data and returns a corresponding SamplingDecision.
+func (s *probabilisticSampler) Evaluate(traceID pdata.TraceID, _ *TraceData) (Decision, error) {
+	s.logger.Debug("Evaluating spans in probabilistic filter")
+
+	traceIDBytes := traceID.Bytes()
+	if hashTraceID(s.hashSalt, traceIDBytes[:]) <= s.threshold {
+		return Sampled, nil
+	}
+
+	return NotSampled, nil
+}
+
+// calculateThreshold converts a ratio into a value between 0 and MaxUint64
+func calculateThreshold(ratio float64) uint64 {
+	// Use big.Float and big.Int to calculate threshold because directly convert
+	// math.MaxUint64 to float64 will cause digits/bits to be cut off if the converted value
+	// doesn't fit into bits that are used to store digits for float64 in Golang
+	boundary := new(big.Float).SetInt(new(big.Int).SetUint64(math.MaxUint64))
+	res, _ := boundary.Mul(boundary, big.NewFloat(ratio)).Uint64()
+	return res
+}
+
+// hashTraceID creates a hash using the FNV-1a algorithm.
+func hashTraceID(salt string, b []byte) uint64 {
+	hasher := fnv.New64a()
+	// the implementation fnv.Write() never returns an error, see hash/fnv/fnv.go
+	_, _ = hasher.Write([]byte(salt))
+	_, _ = hasher.Write(b)
+	return hasher.Sum64()
+}
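
As a worked example of the threshold arithmetic, the sketch below copies the `calculateThreshold` body from the file above into a small program (the surrounding `main` is only illustrative). `big.Float.Uint64` truncates toward zero and saturates at the bounds, which is what clamps negative percentages to "sample nothing" and percentages of 100 or more to "sample everything", matching the -50% and 150% cases in the tests that follow:

```go
// Sketch of the sampling_percentage -> threshold mapping used by the policy.
package main

import (
    "fmt"
    "math"
    "math/big"
)

// calculateThreshold is copied from the evaluator: big.Float keeps full
// precision and Uint64 truncates toward zero, saturating at 0 and MaxUint64.
func calculateThreshold(ratio float64) uint64 {
    boundary := new(big.Float).SetInt(new(big.Int).SetUint64(math.MaxUint64))
    res, _ := boundary.Mul(boundary, big.NewFloat(ratio)).Uint64()
    return res
}

func main() {
    for _, pct := range []float64{-50, 0, 25, 100, 150} {
        t := calculateThreshold(pct / 100)
        fmt.Printf("sampling_percentage=%6.1f -> threshold=%20d (%.0f%% of MaxUint64)\n",
            pct, t, float64(t)/float64(math.MaxUint64)*100)
    }
    // -50 and 0 yield threshold 0 (only a hash of exactly 0 would be sampled),
    // while 100 and 150 yield math.MaxUint64 (every hash passes).
}
```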

processor/tailsamplingprocessor/internal/sampling/probabilistic_test.go

Lines changed: 122 additions & 0 deletions
@@ -0,0 +1,122 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//       http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sampling
+
+import (
+	"encoding/binary"
+	"math/rand"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"go.opentelemetry.io/collector/model/pdata"
+	"go.uber.org/zap"
+)
+
+func TestProbabilisticSampling(t *testing.T) {
+	tests := []struct {
+		name                       string
+		samplingPercentage         float64
+		hashSalt                   string
+		expectedSamplingPercentage float64
+	}{
+		{
+			"100%",
+			100,
+			"",
+			100,
+		},
+		{
+			"0%",
+			0,
+			"",
+			0,
+		},
+		{
+			"25%",
+			25,
+			"",
+			25,
+		},
+		{
+			"33%",
+			33,
+			"",
+			33,
+		},
+		{
+			"33% - custom salt",
+			33,
+			"test-salt",
+			33,
+		},
+		{
+			"-%50",
+			-50,
+			"",
+			0,
+		},
+		{
+			"150%",
+			150,
+			"",
+			100,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			traceCount := 100_000
+
+			var emptyAttrs = map[string]pdata.AttributeValue{}
+
+			probabilisticSampler := NewProbabilisticSampler(zap.NewNop(), tt.hashSalt, tt.samplingPercentage)
+
+			sampled := 0
+			for _, traceID := range genRandomTraceIDs(traceCount) {
+				trace := newTraceStringAttrs(emptyAttrs, "example", "value")
+
+				decision, err := probabilisticSampler.Evaluate(traceID, trace)
+				assert.NoError(t, err)
+
+				if decision == Sampled {
+					sampled++
+				}
+			}
+
+			effectiveSamplingPercentage := float32(sampled) / float32(traceCount) * 100
+			assert.InDelta(t, tt.expectedSamplingPercentage, effectiveSamplingPercentage, 0.2,
+				"Effective sampling percentage is %f, expected %f", effectiveSamplingPercentage, tt.expectedSamplingPercentage,
+			)
+		})
+	}
+}
+
+func TestOnLateArrivingSpans_PercentageSampling(t *testing.T) {
+	probabilisticSampler := NewProbabilisticSampler(zap.NewNop(), "", 10)
+
+	err := probabilisticSampler.OnLateArrivingSpans(NotSampled, nil)
+	assert.Nil(t, err)
+}
+
+func genRandomTraceIDs(num int) (ids []pdata.TraceID) {
+	r := rand.New(rand.NewSource(1))
+	ids = make([]pdata.TraceID, 0, num)
+	for i := 0; i < num; i++ {
+		traceID := [16]byte{}
+		binary.BigEndian.PutUint64(traceID[:8], r.Uint64())
+		binary.BigEndian.PutUint64(traceID[8:], r.Uint64())
+		ids = append(ids, pdata.NewTraceID(traceID))
+	}
+	return ids
+}

processor/tailsamplingprocessor/internal/sampling/status_code_test.go

Lines changed: 2 additions & 2 deletions
@@ -30,7 +30,7 @@ func TestNewStatusCodeFilter_errorHandling(t *testing.T) {
 	assert.EqualError(t, err, "unknown status code \"ERR\", supported: OK, ERROR, UNSET")
 }
 
-func TestPercentageSampling(t *testing.T) {
+func TestStatusCodeSampling(t *testing.T) {
 	traceID := pdata.NewTraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16})
 
 	cases := []struct {
@@ -92,7 +92,7 @@ func TestPercentageSampling(t *testing.T) {
 	}
 }
 
-func TestOnLateArrivingSpans_PercentageSampling(t *testing.T) {
+func TestOnLateArrivingSpans_StatusCodeSampling(t *testing.T) {
 	statusCode, err := NewStatusCodeFilter(zap.NewNop(), []string{"ERROR"})
 	assert.Nil(t, err)

processor/tailsamplingprocessor/processor.go

Lines changed: 3 additions & 0 deletions
@@ -123,6 +123,9 @@ func getPolicyEvaluator(logger *zap.Logger, cfg *PolicyCfg) (sampling.PolicyEval
 	case NumericAttribute:
 		nafCfg := cfg.NumericAttributeCfg
 		return sampling.NewNumericAttributeFilter(logger, nafCfg.Key, nafCfg.MinValue, nafCfg.MaxValue), nil
+	case Probabilistic:
+		pCfg := cfg.ProbabilisticCfg
+		return sampling.NewProbabilisticSampler(logger, pCfg.HashSalt, pCfg.SamplingPercentage), nil
 	case StringAttribute:
 		safCfg := cfg.StringAttributeCfg
 		return sampling.NewStringAttributeFilter(logger, safCfg.Key, safCfg.Values, safCfg.EnabledRegexMatching, safCfg.CacheMaxSize), nil
