diff --git a/CHANGELOG.md b/CHANGELOG.md index 5d59f3f79b57a..443fca5526da0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ ## Unreleased +## 💡 Enhancements 💡 + +- `tailsampling` processor: Add new policy `probabilistic` (#3876) + ## v0.33.0 # 🎉 OpenTelemetry Collector Contrib v0.33.0 (Beta) 🎉 diff --git a/processor/tailsamplingprocessor/README.md b/processor/tailsamplingprocessor/README.md index e4187a06a04e2..a3798d579a585 100644 --- a/processor/tailsamplingprocessor/README.md +++ b/processor/tailsamplingprocessor/README.md @@ -15,6 +15,7 @@ Multiple policies exist today and it is straight forward to add more. These incl - `always_sample`: Sample all traces - `latency`: Sample based on the duration of the trace. The duration is determined by looking at the earliest start time and latest end time, without taking into consideration what happened in between. - `numeric_attribute`: Sample based on number attributes +- `probabilistic`: Sample a percentage of traces. Read [a comparison with the Probabilistic Sampling Processor](#probabilistic-sampling-processor-compared-to-the-tail-sampling-processor-with-the-probabilistic-policy). 
- `status_code`: Sample based upon the status code (`OK`, `ERROR` or `UNSET`) - `string_attribute`: Sample based on string attributes value matches, both exact and regex value matches are supported - `rate_limiting`: Sample based on rate @@ -50,21 +51,26 @@ processors: }, { name: test-policy-4, + type: probabilistic, + probabilistic: {sampling_percentage: 10} + }, + { + name: test-policy-5, type: status_code, status_code: {status_codes: [ERROR, UNSET]} }, { - name: test-policy-5, + name: test-policy-6, type: string_attribute, string_attribute: {key: key2, values: [value1, value2]} }, { - name: test-policy-6, + name: test-policy-7, type: string_attribute, string_attribute: {key: key2, values: [value1, val*], enabled_regex_matching: true, cache_max_size: 10} }, { - name: test-policy-7, + name: test-policy-8, type: rate_limiting, rate_limiting: {spans_per_second: 35} } @@ -73,3 +79,21 @@ processors: Refer to [tail_sampling_config.yaml](./testdata/tail_sampling_config.yaml) for detailed examples on using the processor. + +### Probabilistic Sampling Processor compared to the Tail Sampling Processor with the Probabilistic policy + +The [probabilistic sampling processor][probabilistic_sampling_processor] and the probabilistic tail sampling processor policy work very similarly: +based upon a configurable sampling percentage they will sample a fixed ratio of received traces. +But depending on the overall processing pipeline you should prefer using one over the other. + +As a rule of thumb, if you want to add probabilistic sampling and... + +...you are not using the tail sampling processor already: use the [probabilistic sampling processor][probabilistic_sampling_processor]. +Running the probabilistic sampling processor is more efficient than the tail sampling processor. +The probabilistic sampling policy makes its decision based upon the trace ID, so waiting until more spans have arrived will not influence its decision. 
+ +...you are already using the tail sampling processor: add the probabilistic sampling policy. +You are already incurring the cost of running the tail sampling processor, so the additional cost of the probabilistic policy will be negligible. +Additionally, using the policy within the tail sampling processor will ensure traces that are sampled by other policies will not be dropped. + +[probabilistic_sampling_processor]: https://github.com/open-telemetry/opentelemetry-collector/tree/main/processor/probabilisticsamplerprocessor diff --git a/processor/tailsamplingprocessor/config.go b/processor/tailsamplingprocessor/config.go index 7b9311a4517a3..541560c8c0301 100644 --- a/processor/tailsamplingprocessor/config.go +++ b/processor/tailsamplingprocessor/config.go @@ -31,6 +31,8 @@ const ( // NumericAttribute sample traces that have a given numeric attribute in a specified // range, e.g.: attribute "http.status_code" >= 399 and <= 999. NumericAttribute PolicyType = "numeric_attribute" + // Probabilistic samples a given percentage of traces. + Probabilistic PolicyType = "probabilistic" // StatusCode sample traces that have a given status code. StatusCode PolicyType = "status_code" // StringAttribute sample traces that a attribute, of type string, matching @@ -50,6 +52,8 @@ type PolicyCfg struct { LatencyCfg LatencyCfg `mapstructure:"latency"` // Configs for numeric attribute filter sampling policy evaluator. NumericAttributeCfg NumericAttributeCfg `mapstructure:"numeric_attribute"` + // Configs for probabilistic sampling policy evaluator. + ProbabilisticCfg ProbabilisticCfg `mapstructure:"probabilistic"` // Configs for status code filter sampling policy evaluator. StatusCodeCfg StatusCodeCfg `mapstructure:"status_code"` // Configs for string attribute filter sampling policy evaluator. @@ -76,6 +80,18 @@ type NumericAttributeCfg struct { MaxValue int64 `mapstructure:"max_value"` } +// ProbabilisticCfg holds the configurable settings to create a probabilistic +// sampling policy evaluator. 
+type ProbabilisticCfg struct { + // HashSalt allows one to configure the hashing salt. This is important in scenarios where multiple layers of collectors + // have different sampling rates: if they use the same salt, all traces passing one layer may pass the other even if they have + // different sampling rates; configuring different salts avoids that. + HashSalt string `mapstructure:"hash_salt"` + // SamplingPercentage is the percentage rate at which traces are going to be sampled. Defaults to zero, i.e.: no sample. + // Values greater than or equal to 100 are treated as "sample all traces". + SamplingPercentage float64 `mapstructure:"sampling_percentage"` +} + // StatusCodeCfg holds the configurable settings to create a status code filter sampling // policy evaluator. type StatusCodeCfg struct { diff --git a/processor/tailsamplingprocessor/config_test.go b/processor/tailsamplingprocessor/config_test.go index fc596eff651d8..4feeb7c2a9979 100644 --- a/processor/tailsamplingprocessor/config_test.go +++ b/processor/tailsamplingprocessor/config_test.go @@ -59,17 +59,22 @@ func TestLoadConfig(t *testing.T) { NumericAttributeCfg: NumericAttributeCfg{Key: "key1", MinValue: 50, MaxValue: 100}, }, { - Name: "test-policy-4", + Name: "test-policy-4", + Type: Probabilistic, + ProbabilisticCfg: ProbabilisticCfg{HashSalt: "custom-salt", SamplingPercentage: 0.1}, + }, + { + Name: "test-policy-5", Type: StatusCode, StatusCodeCfg: StatusCodeCfg{StatusCodes: []string{"ERROR", "UNSET"}}, }, { - Name: "test-policy-5", + Name: "test-policy-6", Type: StringAttribute, StringAttributeCfg: StringAttributeCfg{Key: "key2", Values: []string{"value1", "value2"}}, }, { - Name: "test-policy-6", + Name: "test-policy-7", Type: RateLimiting, RateLimitingCfg: RateLimitingCfg{SpansPerSecond: 35}, }, diff --git a/processor/tailsamplingprocessor/internal/sampling/probabilistic.go b/processor/tailsamplingprocessor/internal/sampling/probabilistic.go new file mode 100644 index 0000000000000..e8bd039a6f8b8 --- 
/dev/null
+++ b/processor/tailsamplingprocessor/internal/sampling/probabilistic.go
@@ -0,0 +1,111 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sampling
+
+import (
+	"hash/fnv"
+	"math"
+	"math/big"
+
+	"go.opentelemetry.io/collector/model/pdata"
+	"go.uber.org/zap"
+)
+
+const (
+	// defaultHashSalt is used when NewProbabilisticSampler is given an empty salt.
+	defaultHashSalt = "default-hash-seed"
+)
+
+// probabilisticSampler samples a trace when the salted hash of its trace ID
+// does not exceed threshold.
+type probabilisticSampler struct {
+	logger    *zap.Logger
+	threshold uint64
+	hashSalt  string
+}
+
+var _ PolicyEvaluator = (*probabilisticSampler)(nil)
+
+// NewProbabilisticSampler creates a policy evaluator that samples a percentage of
+// traces.
+//
+// An empty hashSalt selects defaultHashSalt. samplingPercentage is given in
+// percent: NaN and values of zero or less yield a threshold of 0, values of 100
+// or more yield math.MaxUint64, which samples every trace.
+func NewProbabilisticSampler(logger *zap.Logger, hashSalt string, samplingPercentage float64) PolicyEvaluator {
+	if hashSalt == "" {
+		hashSalt = defaultHashSalt
+	}
+
+	return &probabilisticSampler{
+		logger: logger,
+		// calculate threshold once
+		threshold: calculateThreshold(samplingPercentage / 100),
+		hashSalt:  hashSalt,
+	}
+}
+
+// OnLateArrivingSpans notifies the evaluator that the given list of spans arrived
+// after the sampling decision was already taken for the trace.
+// This gives the evaluator a chance to log any message/metrics and/or update any
+// related internal state.
+func (s *probabilisticSampler) OnLateArrivingSpans(Decision, []*pdata.Span) error {
+	s.logger.Debug("Triggering action for late arriving spans in probabilistic filter")
+	return nil
+}
+
+// Evaluate looks at the trace data and returns a corresponding SamplingDecision.
+// Only the trace ID is used: the trace is sampled when its salted hash is at or
+// below the threshold, so the contents of the trace never influence the outcome.
+func (s *probabilisticSampler) Evaluate(traceID pdata.TraceID, _ *TraceData) (Decision, error) {
+	s.logger.Debug("Evaluating spans in probabilistic filter")
+
+	traceIDBytes := traceID.Bytes()
+	if hashTraceID(s.hashSalt, traceIDBytes[:]) <= s.threshold {
+		return Sampled, nil
+	}
+
+	return NotSampled, nil
+}
+
+// calculateThreshold converts a ratio into a value between 0 and MaxUint64.
+// NaN and ratios of zero or less map to 0; ratios of one or more map to
+// math.MaxUint64.
+func calculateThreshold(ratio float64) uint64 {
+	switch {
+	case math.IsNaN(ratio) || ratio <= 0:
+		// big.NewFloat panics on NaN, so it must never reach the conversion below.
+		return 0
+	case ratio >= 1:
+		return math.MaxUint64
+	}
+
+	// Use big.Float and big.Int to calculate the threshold: converting
+	// math.MaxUint64 directly to float64 would cut off digits/bits, because the
+	// value does not fit into the bits float64 uses to store its digits.
+	boundary := new(big.Float).SetInt(new(big.Int).SetUint64(math.MaxUint64))
+	res, _ := boundary.Mul(boundary, big.NewFloat(ratio)).Uint64()
+	return res
+}
+
+// hashTraceID creates a hash using the FNV-1a algorithm.
+// The salt is written to the hasher first, followed by b.
+func hashTraceID(salt string, b []byte) uint64 {
+	hasher := fnv.New64a()
+	// the implementation fnv.Write() never returns an error, see hash/fnv/fnv.go
+	_, _ = hasher.Write([]byte(salt))
+	_, _ = hasher.Write(b)
+	return hasher.Sum64()
+}
diff --git a/processor/tailsamplingprocessor/internal/sampling/probabilistic_test.go b/processor/tailsamplingprocessor/internal/sampling/probabilistic_test.go
new file mode 100644
index 0000000000000..ce0279095c74d
--- /dev/null
+++ b/processor/tailsamplingprocessor/internal/sampling/probabilistic_test.go
@@ -0,0 +1,122 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sampling + +import ( + "encoding/binary" + "math/rand" + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/model/pdata" + "go.uber.org/zap" +) + +func TestProbabilisticSampling(t *testing.T) { + tests := []struct { + name string + samplingPercentage float64 + hashSalt string + expectedSamplingPercentage float64 + }{ + { + "100%", + 100, + "", + 100, + }, + { + "0%", + 0, + "", + 0, + }, + { + "25%", + 25, + "", + 25, + }, + { + "33%", + 33, + "", + 33, + }, + { + "33% - custom salt", + 33, + "test-salt", + 33, + }, + { + "-%50", + -50, + "", + 0, + }, + { + "150%", + 150, + "", + 100, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + traceCount := 100_000 + + var emptyAttrs = map[string]pdata.AttributeValue{} + + probabilisticSampler := NewProbabilisticSampler(zap.NewNop(), tt.hashSalt, tt.samplingPercentage) + + sampled := 0 + for _, traceID := range genRandomTraceIDs(traceCount) { + trace := newTraceStringAttrs(emptyAttrs, "example", "value") + + decision, err := probabilisticSampler.Evaluate(traceID, trace) + assert.NoError(t, err) + + if decision == Sampled { + sampled++ + } + } + + effectiveSamplingPercentage := float32(sampled) / float32(traceCount) * 100 + assert.InDelta(t, tt.expectedSamplingPercentage, effectiveSamplingPercentage, 0.2, + "Effective sampling percentage is %f, expected %f", effectiveSamplingPercentage, tt.expectedSamplingPercentage, + ) + }) + } +} + +func TestOnLateArrivingSpans_PercentageSampling(t *testing.T) { + probabilisticSampler := 
NewProbabilisticSampler(zap.NewNop(), "", 10) + + err := probabilisticSampler.OnLateArrivingSpans(NotSampled, nil) + assert.Nil(t, err) +} + +func genRandomTraceIDs(num int) (ids []pdata.TraceID) { + r := rand.New(rand.NewSource(1)) + ids = make([]pdata.TraceID, 0, num) + for i := 0; i < num; i++ { + traceID := [16]byte{} + binary.BigEndian.PutUint64(traceID[:8], r.Uint64()) + binary.BigEndian.PutUint64(traceID[8:], r.Uint64()) + ids = append(ids, pdata.NewTraceID(traceID)) + } + return ids +} diff --git a/processor/tailsamplingprocessor/internal/sampling/status_code_test.go b/processor/tailsamplingprocessor/internal/sampling/status_code_test.go index 7fbc2e5f123c3..1666dbd98fa1a 100644 --- a/processor/tailsamplingprocessor/internal/sampling/status_code_test.go +++ b/processor/tailsamplingprocessor/internal/sampling/status_code_test.go @@ -30,7 +30,7 @@ func TestNewStatusCodeFilter_errorHandling(t *testing.T) { assert.EqualError(t, err, "unknown status code \"ERR\", supported: OK, ERROR, UNSET") } -func TestPercentageSampling(t *testing.T) { +func TestStatusCodeSampling(t *testing.T) { traceID := pdata.NewTraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}) cases := []struct { @@ -92,7 +92,7 @@ func TestPercentageSampling(t *testing.T) { } } -func TestOnLateArrivingSpans_PercentageSampling(t *testing.T) { +func TestOnLateArrivingSpans_StatusCodeSampling(t *testing.T) { statusCode, err := NewStatusCodeFilter(zap.NewNop(), []string{"ERROR"}) assert.Nil(t, err) diff --git a/processor/tailsamplingprocessor/processor.go b/processor/tailsamplingprocessor/processor.go index d82e3d690af37..73bf985b9bf05 100644 --- a/processor/tailsamplingprocessor/processor.go +++ b/processor/tailsamplingprocessor/processor.go @@ -123,6 +123,9 @@ func getPolicyEvaluator(logger *zap.Logger, cfg *PolicyCfg) (sampling.PolicyEval case NumericAttribute: nafCfg := cfg.NumericAttributeCfg return sampling.NewNumericAttributeFilter(logger, nafCfg.Key, nafCfg.MinValue, 
nafCfg.MaxValue), nil + case Probabilistic: + pCfg := cfg.ProbabilisticCfg + return sampling.NewProbabilisticSampler(logger, pCfg.HashSalt, pCfg.SamplingPercentage), nil case StringAttribute: safCfg := cfg.StringAttributeCfg return sampling.NewStringAttributeFilter(logger, safCfg.Key, safCfg.Values, safCfg.EnabledRegexMatching, safCfg.CacheMaxSize), nil diff --git a/processor/tailsamplingprocessor/testdata/tail_sampling_config.yaml b/processor/tailsamplingprocessor/testdata/tail_sampling_config.yaml index 04e1787f907ca..7e3539c9019ca 100644 --- a/processor/tailsamplingprocessor/testdata/tail_sampling_config.yaml +++ b/processor/tailsamplingprocessor/testdata/tail_sampling_config.yaml @@ -27,16 +27,21 @@ processors: }, { name: test-policy-4, + type: probabilistic, + probabilistic: {hash_salt: "custom-salt", sampling_percentage: 0.1} + }, + { + name: test-policy-5, type: status_code, status_code: {status_codes: [ERROR, UNSET]} }, { - name: test-policy-5, + name: test-policy-6, type: string_attribute, string_attribute: {key: key2, values: [value1, value2]} }, { - name: test-policy-6, + name: test-policy-7, type: rate_limiting, rate_limiting: {spans_per_second: 35} },