Merged
Changes from 5 commits
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@

## 💡 Enhancements 💡

-- `tailsampling` processor: Add new policy `status_code` (#3754)
+- `tailsampling` processor:
+  - Add new policy `status_code` (#3754)
+  - Add new policy `percentage` (#3876)

## v0.29.0

12 changes: 9 additions & 3 deletions processor/tailsamplingprocessor/README.md
Expand Up @@ -15,6 +15,7 @@ Multiple policies exist today and it is straight forward to add more. These incl
- `always_sample`: Sample all traces
- `latency`: Sample based on the duration of the trace. The duration is determined by looking at the earliest start time and latest end time, without taking into consideration what happened in between.
- `numeric_attribute`: Sample based on number attributes
- `percentage`: Sample a percentage of traces. Only traces that have not been sampled yet by another policy are taken into account.
Contributor Author

Oh, wasn't aware of the probabilistic sampling processor (for some reason) 🤦🏻 I'll take a look at it and make sure our terminology is consistent.

I think the main difference is when the sampling decision happens. If you only want percentage or probabilistic sampling, it doesn't make sense to use the tail sampling processor.
But combining it with other tail sampling policies makes it more meaningful. For instance:

    policies:
      [
          {
            name: all-errors,
            type: status_code,
            status_code: {status_codes: [ERROR]}
          },
          {
            name: half-of-remaining,
            type: percentage,
            percentage: {percentage: 0.5}
          },
      ]

This pipeline would sample all traces with status code `ERROR` and 50% of the remaining traces. With the probabilistic sampling processor on its own, you risk dropping traces with errors.

Member

Good point. Make sure to mention that in the readme then.

Contributor

Is there a difference between this and just putting the probabilistic sampling processor after tail sampling at 50%?

Contributor Author

The result will be slightly different: putting the probabilistic sampling processor after the tail sampling processor will filter what the tail sampling processor samples.
The percentage sampler as implemented here will filter what is not sampled by another policy.

So for instance a pipeline:

tail sampling (sample all errors) -> probabilistic (at 50%)

Result: 50% of traces with errors, and no traces without errors
Why:

  • the tail sampler drops every non-error trace
  • the probabilistic sampler drops 50% of what the tail sampler returns

While the following:

tail sampling (sample all errors -> sample 50%)

Result: all traces with errors + 50% of traces without
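
To make the difference concrete, here is a rough, self-contained sketch that counts what each arrangement would keep. Everything in it is hypothetical (the `trace` type, the `everyOther` stand-in for a 50% sampler, and the `chained`/`combined` helpers); it does not use the collector's APIs and only models the two pipelines described above.

```go
package main

import "fmt"

// trace is a hypothetical stand-in for a trace; only its error status matters here.
type trace struct{ hasError bool }

// everyOther is a deterministic stand-in for a 50% sampler: it keeps every
// second item it sees. A real probabilistic sampler would decide differently.
type everyOther struct{ seen int }

func (s *everyOther) keep() bool {
	s.seen++
	return s.seen%2 == 1
}

// chained models: tail sampling (sample all errors) -> probabilistic (at 50%).
func chained(traces []trace) (errs, rest int) {
	half := &everyOther{}
	for _, t := range traces {
		if !t.hasError {
			continue // already dropped by the tail sampler
		}
		if half.keep() {
			errs++
		}
	}
	return errs, rest
}

// combined models: tail sampling (sample all errors -> sample 50%).
func combined(traces []trace) (errs, rest int) {
	half := &everyOther{}
	for _, t := range traces {
		if t.hasError {
			errs++ // kept by the error policy
			continue
		}
		if half.keep() {
			rest++ // kept by the percentage policy
		}
	}
	return errs, rest
}

// makeTraces builds total traces, the first withError of which carry an error.
func makeTraces(total, withError int) []trace {
	traces := make([]trace, total)
	for i := 0; i < withError; i++ {
		traces[i].hasError = true
	}
	return traces
}

func main() {
	traces := makeTraces(1000, 100)
	e, r := chained(traces)
	fmt.Printf("chained:  %d error traces, %d others\n", e, r)
	e, r = combined(traces)
	fmt.Printf("combined: %d error traces, %d others\n", e, r)
}
```

With 1000 traces of which 100 have errors, the chained arrangement keeps 50 error traces and nothing else, while the combined policies keep all 100 error traces plus 450 of the other 900.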

- `status_code`: Sample based upon the status code (`OK`, `ERROR` or `UNSET`)
- `string_attribute`: Sample based on string attributes value matches, both exact and regex value matches are supported
- `rate_limiting`: Sample based on rate
Expand Down Expand Up @@ -50,21 +51,26 @@ processors:
       },
       {
         name: test-policy-4,
+        type: percentage,
+        percentage: {percentage: 0.1}
+      },
+      {
+        name: test-policy-5,
         type: status_code,
         status_code: {status_codes: [ERROR, UNSET]}
       },
       {
-        name: test-policy-5,
+        name: test-policy-6,
         type: string_attribute,
         string_attribute: {key: key2, values: [value1, value2]}
       },
       {
-        name: test-policy-6,
+        name: test-policy-7,
         type: string_attribute,
         string_attribute: {key: key2, values: [value1, val*], enabled_regex_matching: true, cache_max_size: 10}
       },
       {
-        name: test-policy-7,
+        name: test-policy-8,
         type: rate_limiting,
         rate_limiting: {spans_per_second: 35}
       }
10 changes: 10 additions & 0 deletions processor/tailsamplingprocessor/config.go
Expand Up @@ -31,6 +31,8 @@ const (
 	// NumericAttribute sample traces that have a given numeric attribute in a specified
 	// range, e.g.: attribute "http.status_code" >= 399 and <= 999.
 	NumericAttribute PolicyType = "numeric_attribute"
+	// Percentage samples a given percentage of traces.
+	Percentage PolicyType = "percentage"
 	// StatusCode sample traces that have a given status code.
 	StatusCode PolicyType = "status_code"
 	// StringAttribute sample traces that a attribute, of type string, matching
Expand All @@ -50,6 +52,8 @@ type PolicyCfg struct {
 	LatencyCfg LatencyCfg `mapstructure:"latency"`
 	// Configs for numeric attribute filter sampling policy evaluator.
 	NumericAttributeCfg NumericAttributeCfg `mapstructure:"numeric_attribute"`
+	// Configs for percentage filter sampling policy evaluator.
+	PercentageCfg PercentageCfg `mapstructure:"percentage"`
 	// Configs for status code filter sampling policy evaluator.
 	StatusCodeCfg StatusCodeCfg `mapstructure:"status_code"`
 	// Configs for string attribute filter sampling policy evaluator.
Expand All @@ -76,6 +80,12 @@ type NumericAttributeCfg struct {
 	MaxValue int64 `mapstructure:"max_value"`
 }

+// PercentageCfg holds the configurable settings to create a percentage filter sampling
+// policy evaluator.
+type PercentageCfg struct {
+	Percentage float32 `mapstructure:"percentage"`
+}
+
 // StatusCodeCfg holds the configurable settings to create a status code filter sampling
 // policy evaluator.
 type StatusCodeCfg struct {
9 changes: 7 additions & 2 deletions processor/tailsamplingprocessor/config_test.go
Expand Up @@ -60,16 +60,21 @@ func TestLoadConfig(t *testing.T) {
 		},
 		{
 			Name: "test-policy-4",
+			Type: Percentage,
+			PercentageCfg: PercentageCfg{Percentage: 0.1},
+		},
+		{
+			Name: "test-policy-5",
 			Type: StatusCode,
 			StatusCodeCfg: StatusCodeCfg{StatusCodes: []string{"ERROR", "UNSET"}},
 		},
 		{
-			Name: "test-policy-5",
+			Name: "test-policy-6",
 			Type: StringAttribute,
 			StringAttributeCfg: StringAttributeCfg{Key: "key2", Values: []string{"value1", "value2"}},
 		},
 		{
-			Name: "test-policy-6",
+			Name: "test-policy-7",
 			Type: RateLimiting,
 			RateLimitingCfg: RateLimitingCfg{SpansPerSecond: 35},
 		},
3 changes: 3 additions & 0 deletions processor/tailsamplingprocessor/processor.go
Expand Up @@ -123,6 +123,9 @@ func getPolicyEvaluator(logger *zap.Logger, cfg *PolicyCfg) (sampling.PolicyEval
 	case NumericAttribute:
 		nafCfg := cfg.NumericAttributeCfg
 		return sampling.NewNumericAttributeFilter(logger, nafCfg.Key, nafCfg.MinValue, nafCfg.MaxValue), nil
+	case Percentage:
+		pCfg := cfg.PercentageCfg
+		return sampling.NewPercentageFilter(logger, pCfg.Percentage)
 	case StringAttribute:
 		safCfg := cfg.StringAttributeCfg
 		return sampling.NewStringAttributeFilter(logger, safCfg.Key, safCfg.Values, safCfg.EnabledRegexMatching, safCfg.CacheMaxSize), nil
81 changes: 81 additions & 0 deletions processor/tailsamplingprocessor/sampling/percentage.go
@@ -0,0 +1,81 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sampling

import (
	"errors"

	"go.opentelemetry.io/collector/consumer/pdata"
	"go.uber.org/zap"
)

type percentageFilter struct {
	logger          *zap.Logger
	percentage      float32
	tracesSampled   int
	tracesProcessed int
}

var _ PolicyEvaluator = (*percentageFilter)(nil)

// NewPercentageFilter creates a policy evaluator that samples a percentage of
// traces.
func NewPercentageFilter(logger *zap.Logger, percentage float32) (PolicyEvaluator, error) {
	if percentage < 0 || percentage > 1 {
		return nil, errors.New("expected a percentage between 0 and 1")
	}

	return &percentageFilter{
		logger:     logger,
		percentage: percentage,
	}, nil
}

// OnLateArrivingSpans notifies the evaluator that the given list of spans arrived
// after the sampling decision was already taken for the trace.
// This gives the evaluator a chance to log any message/metrics and/or update any
// related internal state.
func (r *percentageFilter) OnLateArrivingSpans(Decision, []*pdata.Span) error {
	r.logger.Debug("Triggering action for late arriving spans in percentage filter")
	return nil
}

// Evaluate looks at the trace data and returns a corresponding SamplingDecision.
func (r *percentageFilter) Evaluate(_ pdata.TraceID, trace *TraceData) (Decision, error) {
	r.logger.Debug("Evaluating spans in percentage filter")

	// ignore traces that have already been sampled before
	for _, decision := range trace.Decisions {
		if decision == Sampled {
			return NotSampled, nil
		}
	}

	decision := NotSampled

	if float32(r.tracesSampled)/float32(r.tracesProcessed) <= r.percentage {
		r.tracesSampled++
		decision = Sampled
	}
	r.tracesProcessed++

	// reset counters to avoid overflow
	if r.tracesProcessed == 1000 {
		r.tracesSampled = 0
		r.tracesProcessed = 0
	}

	return decision, nil
}
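
The counter logic in `Evaluate` can be tried in isolation with a small standalone sketch. The `counterSampler` type and the `countSampled` helper below are hypothetical names made up for illustration; the sketch leaves out the logger, the `PolicyEvaluator` interface and the already-sampled check, and only mirrors the ratio comparison and the reset at 1000 shown in the diff.

```go
package main

import "fmt"

// window mirrors the reset threshold of 1000 used in the diff.
const window = 1000

// counterSampler is a hypothetical reduction of the counter logic;
// it is not the processor's PolicyEvaluator.
type counterSampler struct {
	ratio     float32
	sampled   int
	processed int
}

// next reports whether the next trace would be sampled.
func (s *counterSampler) next() bool {
	keep := false
	// With processed == 0 the quotient is 0/0, which is NaN for floats in Go.
	// NaN <= ratio is false, so the first trace of each window is not sampled.
	if float32(s.sampled)/float32(s.processed) <= s.ratio {
		s.sampled++
		keep = true
	}
	s.processed++

	// Start a new window, as the diff does, to keep the counters small.
	if s.processed == window {
		s.sampled, s.processed = 0, 0
	}
	return keep
}

// countSampled feeds n traces through a fresh sampler and counts the kept ones.
func countSampled(ratio float32, n int) int {
	s := &counterSampler{ratio: ratio}
	kept := 0
	for i := 0; i < n; i++ {
		if s.next() {
			kept++
		}
	}
	return kept
}

func main() {
	fmt.Println(countSampled(0.5, 2000)) // prints 1000
}
```

Because the decision depends only on running counters, the outcome is deterministic: for a ratio of 0.5 the sketch keeps exactly 500 traces in every window of 1000, which is the kind of exact count the accompanying test asserts.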
103 changes: 103 additions & 0 deletions processor/tailsamplingprocessor/sampling/percentage_test.go
@@ -0,0 +1,103 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sampling

import (
	"fmt"
	"testing"

	"github.com/stretchr/testify/assert"
	"go.opentelemetry.io/collector/consumer/pdata"
	"go.uber.org/zap"
)

func TestNewPercentageFilter_errorHandling(t *testing.T) {
	_, err := NewPercentageFilter(zap.NewNop(), -1)
	assert.EqualError(t, err, "expected a percentage between 0 and 1")

	_, err = NewPercentageFilter(zap.NewNop(), 1.5)
	assert.EqualError(t, err, "expected a percentage between 0 and 1")
}

func TestPercentageSampling(t *testing.T) {
	var empty = map[string]pdata.AttributeValue{}

	cases := []float32{0.01, 0.1, 0.125, 0.33, 0.5, 0.66}

	for _, percentage := range cases {
		t.Run(fmt.Sprintf("sample %.2f", percentage), func(t *testing.T) {
			trace := newTraceStringAttrs(empty, "example", "value")
			traceID := pdata.NewTraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16})

			percentageFilter, err := NewPercentageFilter(zap.NewNop(), percentage)
			assert.NoError(t, err)

			traceCount := 2000
			sampled := 0

			for i := 0; i < traceCount; i++ {
				decision, err := percentageFilter.Evaluate(traceID, trace)
				assert.NoError(t, err)

				if decision == Sampled {
					sampled++
				}
			}

			assert.InDelta(t, percentage*float32(traceCount), sampled, 0.001, "Amount of sampled traces")
		})
	}
}

func TestPercentageSampling_ignoreAlreadySampledTraces(t *testing.T) {
	var empty = map[string]pdata.AttributeValue{}

	trace := newTraceStringAttrs(empty, "example", "value")
	traceID := pdata.NewTraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16})

	var percentage float32 = 0.33

	percentageFilter, err := NewPercentageFilter(zap.NewNop(), percentage)
	assert.NoError(t, err)

	traceCount := 100
	sampled := 0

	for i := 0; i < traceCount; i++ {
		trace.Decisions = []Decision{NotSampled, NotSampled}
		decision, err := percentageFilter.Evaluate(traceID, trace)
		assert.NoError(t, err)

		if decision == Sampled {
			sampled++
		}

		// trace has been sampled, should be ignored
		trace.Decisions = []Decision{NotSampled, Sampled}
		decision, err = percentageFilter.Evaluate(traceID, trace)
		assert.NoError(t, err)
		assert.Equal(t, decision, NotSampled)
	}

	assert.EqualValues(t, percentage*float32(traceCount), sampled)
}

func TestOnLateArrivingSpans_PercentageSampling(t *testing.T) {
	percentageFilter, err := NewPercentageFilter(zap.NewNop(), 0.1)
	assert.Nil(t, err)

	err = percentageFilter.OnLateArrivingSpans(NotSampled, nil)
	assert.Nil(t, err)
}
4 changes: 2 additions & 2 deletions processor/tailsamplingprocessor/sampling/status_code_test.go
Expand Up @@ -30,7 +30,7 @@ func TestNewStatusCodeFilter_errorHandling(t *testing.T) {
assert.EqualError(t, err, "unknown status code \"ERR\", supported: OK, ERROR, UNSET")
}

-func TestPercentageSampling(t *testing.T) {
+func TestStatusCodeSampling(t *testing.T) {
traceID := pdata.NewTraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16})

cases := []struct {
Expand Down Expand Up @@ -92,7 +92,7 @@ func TestPercentageSampling(t *testing.T) {
}
}

-func TestOnLateArrivingSpans_PercentageSampling(t *testing.T) {
+func TestOnLateArrivingSpans_StatusCodeSampling(t *testing.T) {
statusCode, err := NewStatusCodeFilter(zap.NewNop(), []string{"ERROR"})
assert.Nil(t, err)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,21 @@ processors:
       },
       {
         name: test-policy-4,
+        type: percentage,
+        percentage: {percentage: 0.1}
+      },
+      {
+        name: test-policy-5,
         type: status_code,
         status_code: {status_codes: [ERROR, UNSET]}
       },
       {
-        name: test-policy-5,
+        name: test-policy-6,
         type: string_attribute,
         string_attribute: {key: key2, values: [value1, value2]}
       },
       {
-        name: test-policy-6,
+        name: test-policy-7,
         type: rate_limiting,
         rate_limiting: {spans_per_second: 35}
       },