Skip to content

Commit d2825ea

Browse files
authored
[processor/tailsampling] Drop policy type (#39668)
#### Description This pull-request introduces a new policy type to explicitly drop traces regardless of other policy decisions. The new `drop` policy type behaves much like an `and`, however, it returns a `Dropped` decision when all of its policies return `Sampled`. A `Dropped` decision takes precedence over all others. The `Dropped` decision is not new, however, it was unused and therefor safe to use for this purpose. This new policy type is very **approachable** and capable. This new policy type can be used to replace the top-level (not within an `and` policy) use of `invert_match` to control the final sampling decision. We could deprecate the current `invert_match` top-level decision logic. The string, numeric, and boolean filter policies still support `invert_match`, which continues to flip the decision for the individual policy. Let `invert_match` be simple. Example: ``` { name: drop-it-like-its-hot, type: drop, drop: { drop_sub_policy: [ { name: do-not-sample, type: boolean_attribute, boolean_attribute: { key: app.do_not_sample, value: true }, } ] } } ``` This is a reduced version of a previous PR #37760 #### Related Issues - #36673 - #33656 - #36795 - #34296 - #34085 - #29637 - #27049 - Probably more 😅 --------- Signed-off-by: Sean Porter <[email protected]>
1 parent 7619824 commit d2825ea

File tree

10 files changed

+287
-28
lines changed

10 files changed

+287
-28
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: processor/tailsampling
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: New policy type to explicitly drop traces regardless of other policy decisions.
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [39668]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: []

processor/tailsamplingprocessor/config.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ const (
3333
Composite PolicyType = "composite"
3434
// And allows defining a And policy, combining the other policies in one
3535
And PolicyType = "and"
36+
// Drop allows defining a Drop policy, combining one or more policies to drop traces.
37+
Drop PolicyType = "drop"
3638
// SpanCount sample traces that are have more spans per Trace than a given threshold.
3739
SpanCount PolicyType = "span_count"
3840
// TraceState sample traces with specified values by the given key
@@ -100,6 +102,11 @@ type AndCfg struct {
100102
SubPolicyCfg []AndSubPolicyCfg `mapstructure:"and_sub_policy"`
101103
}
102104

105+
// DropCfg holds the common configuration to all policies under drop policy.
106+
type DropCfg struct {
107+
SubPolicyCfg []AndSubPolicyCfg `mapstructure:"drop_sub_policy"`
108+
}
109+
103110
// CompositeCfg holds the configurable settings to create a composite
104111
// sampling policy evaluator.
105112
type CompositeCfg struct {
@@ -123,6 +130,8 @@ type PolicyCfg struct {
123130
CompositeCfg CompositeCfg `mapstructure:"composite"`
124131
// Configs for defining and policy
125132
AndCfg AndCfg `mapstructure:"and"`
133+
// Configs for defining drop policy
134+
DropCfg DropCfg `mapstructure:"drop"`
126135
}
127136

128137
// LatencyCfg holds the configurable settings to create a latency filter sampling policy
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package tailsamplingprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor"
5+
6+
import (
7+
"go.opentelemetry.io/collector/component"
8+
9+
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/sampling"
10+
)
11+
12+
func getNewDropPolicy(settings component.TelemetrySettings, config *DropCfg) (sampling.PolicyEvaluator, error) {
13+
subPolicyEvaluators := make([]sampling.PolicyEvaluator, len(config.SubPolicyCfg))
14+
for i := range config.SubPolicyCfg {
15+
policyCfg := &config.SubPolicyCfg[i]
16+
policy, err := getDropSubPolicyEvaluator(settings, policyCfg)
17+
if err != nil {
18+
return nil, err
19+
}
20+
subPolicyEvaluators[i] = policy
21+
}
22+
return sampling.NewDrop(settings.Logger, subPolicyEvaluators), nil
23+
}
24+
25+
// Return instance of and sub-policy
26+
func getDropSubPolicyEvaluator(settings component.TelemetrySettings, cfg *AndSubPolicyCfg) (sampling.PolicyEvaluator, error) {
27+
return getSharedPolicyEvaluator(settings, &cfg.sharedPolicyCfg)
28+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package tailsamplingprocessor
5+
6+
import (
7+
"testing"
8+
9+
"github.com/stretchr/testify/assert"
10+
"github.com/stretchr/testify/require"
11+
"go.opentelemetry.io/collector/component/componenttest"
12+
"go.uber.org/zap"
13+
14+
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/sampling"
15+
)
16+
17+
func TestDropHelper(t *testing.T) {
18+
t.Run("valid", func(t *testing.T) {
19+
actual, err := getNewDropPolicy(componenttest.NewNopTelemetrySettings(), &DropCfg{
20+
SubPolicyCfg: []AndSubPolicyCfg{
21+
{
22+
sharedPolicyCfg: sharedPolicyCfg{
23+
Name: "test-and-policy-1",
24+
Type: Latency,
25+
LatencyCfg: LatencyCfg{ThresholdMs: 100},
26+
},
27+
},
28+
},
29+
})
30+
require.NoError(t, err)
31+
32+
expected := sampling.NewDrop(zap.NewNop(), []sampling.PolicyEvaluator{
33+
sampling.NewLatency(componenttest.NewNopTelemetrySettings(), 100, 0),
34+
})
35+
assert.Equal(t, expected, actual)
36+
})
37+
38+
t.Run("unsupported sampling policy type", func(t *testing.T) {
39+
_, err := getNewDropPolicy(componenttest.NewNopTelemetrySettings(), &DropCfg{
40+
SubPolicyCfg: []AndSubPolicyCfg{
41+
{
42+
sharedPolicyCfg: sharedPolicyCfg{
43+
Name: "test-and-policy-2",
44+
Type: Drop, // nested drop is not allowed
45+
},
46+
},
47+
},
48+
})
49+
require.EqualError(t, err, "unknown sampling policy type drop")
50+
})
51+
}

processor/tailsamplingprocessor/internal/sampling/and.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,3 @@ func (c *And) Evaluate(ctx context.Context, traceID pcommon.TraceID, trace *Trac
4141
}
4242
return Sampled, nil
4343
}
44-
45-
// OnDroppedSpans is called when the trace needs to be dropped, due to memory
46-
// pressure, before the decision_wait time has been reached.
47-
func (c *And) OnDroppedSpans(pcommon.TraceID, *TraceData) (Decision, error) {
48-
return Sampled, nil
49-
}

processor/tailsamplingprocessor/internal/sampling/composite.go

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -132,21 +132,3 @@ func (c *Composite) Evaluate(ctx context.Context, traceID pcommon.TraceID, trace
132132

133133
return NotSampled, nil
134134
}
135-
136-
// OnDroppedSpans is called when the trace needs to be dropped, due to memory
137-
// pressure, before the decision_wait time has been reached.
138-
func (c *Composite) OnDroppedSpans(pcommon.TraceID, *TraceData) (Decision, error) {
139-
// Here we have a number of possible solutions:
140-
// 1. Random sample traces based on maxTotalSPS.
141-
// 2. Perform full composite sampling logic by calling Composite.Evaluate(), essentially
142-
// using partial trace data for sampling.
143-
// 3. Sample everything.
144-
//
145-
// It seems that #2 may be the best choice from end user perspective, but
146-
// it is not certain and it is also additional performance penalty when we are
147-
// already under a memory (and possibly CPU) pressure situation.
148-
//
149-
// For now we are playing safe and go with #3. Investigating alternate options
150-
// should be a future task.
151-
return Sampled, nil
152-
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package sampling // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/sampling"
5+
6+
import (
7+
"context"
8+
9+
"go.opentelemetry.io/collector/pdata/pcommon"
10+
"go.uber.org/zap"
11+
)
12+
13+
type Drop struct {
14+
// the subpolicy evaluators
15+
subpolicies []PolicyEvaluator
16+
logger *zap.Logger
17+
}
18+
19+
func NewDrop(
20+
logger *zap.Logger,
21+
subpolicies []PolicyEvaluator,
22+
) PolicyEvaluator {
23+
return &Drop{
24+
subpolicies: subpolicies,
25+
logger: logger,
26+
}
27+
}
28+
29+
// Evaluate looks at the trace data and returns a corresponding SamplingDecision.
30+
func (c *Drop) Evaluate(ctx context.Context, traceID pcommon.TraceID, trace *TraceData) (Decision, error) {
31+
// The policy iterates over all sub-policies and returns Dropped if all
32+
// sub-policies returned a Sampled Decision. If any subpolicy returns
33+
// NotSampled, it returns NotSampled Decision.
34+
for _, sub := range c.subpolicies {
35+
decision, err := sub.Evaluate(ctx, traceID, trace)
36+
if err != nil {
37+
return Unspecified, err
38+
}
39+
if decision == NotSampled || decision == InvertNotSampled {
40+
return NotSampled, nil
41+
}
42+
}
43+
return Dropped, nil
44+
}
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package sampling
5+
6+
import (
7+
"context"
8+
"testing"
9+
10+
"github.com/stretchr/testify/assert"
11+
"github.com/stretchr/testify/require"
12+
"go.opentelemetry.io/collector/component/componenttest"
13+
"go.opentelemetry.io/collector/pdata/ptrace"
14+
"go.uber.org/zap"
15+
)
16+
17+
func TestDropEvaluatorNotSampled(t *testing.T) {
18+
n1 := NewStringAttributeFilter(componenttest.NewNopTelemetrySettings(), "name", []string{"value"}, false, 0, false)
19+
n2, err := NewStatusCodeFilter(componenttest.NewNopTelemetrySettings(), []string{"ERROR"})
20+
require.NoError(t, err)
21+
22+
and := NewDrop(zap.NewNop(), []PolicyEvaluator{n1, n2})
23+
24+
traces := ptrace.NewTraces()
25+
rs := traces.ResourceSpans().AppendEmpty()
26+
ils := rs.ScopeSpans().AppendEmpty()
27+
28+
span := ils.Spans().AppendEmpty()
29+
span.Status().SetCode(ptrace.StatusCodeError)
30+
span.SetTraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16})
31+
span.SetSpanID([8]byte{1, 2, 3, 4, 5, 6, 7, 8})
32+
33+
trace := &TraceData{
34+
ReceivedBatches: traces,
35+
}
36+
decision, err := and.Evaluate(context.Background(), traceID, trace)
37+
require.NoError(t, err, "Failed to evaluate and policy: %v", err)
38+
assert.Equal(t, NotSampled, decision)
39+
}
40+
41+
func TestDropEvaluatorSampled(t *testing.T) {
42+
n1 := NewStringAttributeFilter(componenttest.NewNopTelemetrySettings(), "attribute_name", []string{"attribute_value"}, false, 0, false)
43+
n2, err := NewStatusCodeFilter(componenttest.NewNopTelemetrySettings(), []string{"ERROR"})
44+
require.NoError(t, err)
45+
46+
and := NewDrop(zap.NewNop(), []PolicyEvaluator{n1, n2})
47+
48+
traces := ptrace.NewTraces()
49+
rs := traces.ResourceSpans().AppendEmpty()
50+
ils := rs.ScopeSpans().AppendEmpty()
51+
52+
span := ils.Spans().AppendEmpty()
53+
span.Attributes().PutStr("attribute_name", "attribute_value")
54+
span.Status().SetCode(ptrace.StatusCodeError)
55+
span.SetTraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16})
56+
span.SetSpanID([8]byte{1, 2, 3, 4, 5, 6, 7, 8})
57+
58+
trace := &TraceData{
59+
ReceivedBatches: traces,
60+
}
61+
decision, err := and.Evaluate(context.Background(), traceID, trace)
62+
require.NoError(t, err, "Failed to evaluate and policy: %v", err)
63+
assert.Equal(t, Dropped, decision)
64+
}
65+
66+
func TestDropEvaluatorStringInvertMatch(t *testing.T) {
67+
n1 := NewStringAttributeFilter(componenttest.NewNopTelemetrySettings(), "attribute_name", []string{"no_match"}, false, 0, true)
68+
n2, err := NewStatusCodeFilter(componenttest.NewNopTelemetrySettings(), []string{"ERROR"})
69+
require.NoError(t, err)
70+
71+
and := NewDrop(zap.NewNop(), []PolicyEvaluator{n1, n2})
72+
73+
traces := ptrace.NewTraces()
74+
rs := traces.ResourceSpans().AppendEmpty()
75+
ils := rs.ScopeSpans().AppendEmpty()
76+
77+
span := ils.Spans().AppendEmpty()
78+
span.Attributes().PutStr("attribute_name", "attribute_value")
79+
span.Status().SetCode(ptrace.StatusCodeError)
80+
span.SetTraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16})
81+
span.SetSpanID([8]byte{1, 2, 3, 4, 5, 6, 7, 8})
82+
83+
trace := &TraceData{
84+
ReceivedBatches: traces,
85+
}
86+
decision, err := and.Evaluate(context.Background(), traceID, trace)
87+
require.NoError(t, err, "Failed to evaluate and policy: %v", err)
88+
assert.Equal(t, Dropped, decision)
89+
}
90+
91+
func TestDropEvaluatorStringInvertNotMatch(t *testing.T) {
92+
n1 := NewStringAttributeFilter(componenttest.NewNopTelemetrySettings(), "attribute_name", []string{"attribute_value"}, false, 0, true)
93+
n2, err := NewStatusCodeFilter(componenttest.NewNopTelemetrySettings(), []string{"ERROR"})
94+
require.NoError(t, err)
95+
96+
and := NewDrop(zap.NewNop(), []PolicyEvaluator{n1, n2})
97+
98+
traces := ptrace.NewTraces()
99+
rs := traces.ResourceSpans().AppendEmpty()
100+
ils := rs.ScopeSpans().AppendEmpty()
101+
102+
span := ils.Spans().AppendEmpty()
103+
span.Attributes().PutStr("attribute_name", "attribute_value")
104+
span.Status().SetCode(ptrace.StatusCodeError)
105+
span.SetTraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16})
106+
span.SetSpanID([8]byte{1, 2, 3, 4, 5, 6, 7, 8})
107+
108+
trace := &TraceData{
109+
ReceivedBatches: traces,
110+
}
111+
decision, err := and.Evaluate(context.Background(), traceID, trace)
112+
require.NoError(t, err, "Failed to evaluate and policy: %v", err)
113+
assert.Equal(t, NotSampled, decision)
114+
}

processor/tailsamplingprocessor/internal/sampling/policy.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,8 @@ const (
4242
// NotSampled is used to indicate that the decision was already taken
4343
// to not sample the data.
4444
NotSampled
45-
// Dropped is used when data needs to be purged before the sampling policy
46-
// had a chance to evaluate it.
45+
// Dropped is used to indicate that a trace should be dropped regardless of
46+
// all other decisions.
4747
Dropped
4848
// Error is used to indicate that policy evaluation was not succeeded.
4949
Error

processor/tailsamplingprocessor/processor.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ var (
8282
sampling.NotSampled: attrSampledFalse,
8383
sampling.InvertNotSampled: attrSampledFalse,
8484
sampling.InvertSampled: attrSampledTrue,
85+
sampling.Dropped: attrSampledFalse,
8586
}
8687
)
8788

@@ -201,6 +202,8 @@ func getPolicyEvaluator(settings component.TelemetrySettings, cfg *PolicyCfg) (s
201202
return getNewCompositePolicy(settings, &cfg.CompositeCfg)
202203
case And:
203204
return getNewAndPolicy(settings, &cfg.AndCfg)
205+
case Drop:
206+
return getNewDropPolicy(settings, &cfg.DropCfg)
204207
default:
205208
return getSharedPolicyEvaluator(settings, &cfg.sharedPolicyCfg)
206209
}
@@ -391,6 +394,7 @@ func (tsp *tailSamplingSpanProcessor) makeDecision(id pcommon.TraceID, trace *sa
391394
sampling.NotSampled: nil,
392395
sampling.InvertSampled: nil,
393396
sampling.InvertNotSampled: nil,
397+
sampling.Dropped: nil,
394398
}
395399

396400
ctx := context.Background()
@@ -421,13 +425,19 @@ func (tsp *tailSamplingSpanProcessor) makeDecision(id pcommon.TraceID, trace *sa
421425
if samplingDecisions[decision] == nil {
422426
samplingDecisions[decision] = p
423427
}
428+
429+
// Break early if dropped. This can drastically reduce tick/decision latency.
430+
if decision == sampling.Dropped {
431+
break
432+
}
424433
}
425434

426435
var sampledPolicy *policy
427436

428-
// InvertNotSampled takes precedence over any other decision
429437
switch {
430-
case samplingDecisions[sampling.InvertNotSampled] != nil:
438+
case samplingDecisions[sampling.Dropped] != nil: // Dropped takes precedence
439+
finalDecision = sampling.NotSampled
440+
case samplingDecisions[sampling.InvertNotSampled] != nil: // Then InvertNotSampled
431441
finalDecision = sampling.NotSampled
432442
case samplingDecisions[sampling.Sampled] != nil:
433443
finalDecision = sampling.Sampled

0 commit comments

Comments
 (0)