Skip to content

Commit 31fcfbd

Browse files
authored
[processor/tailsampling] Add first policy match decision to tailsampling processor (#39655)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description The policies are evaluated in order, and the first policy that matches will be used to determine the sample rate. <!-- Issue number (e.g. #1234) or full URL to issue, if applicable. --> #### Link to tracking issue #36795 <!--Describe what testing was performed and which tests were added.--> #### Testing Unit-Test added <!--Describe the documentation added.--> #### Documentation ✅ new config added to README <!--Please delete paragraphs that you did not use before submitting.-->
1 parent 38c514b commit 31fcfbd

File tree

7 files changed

+166
-30
lines changed

7 files changed

+166
-30
lines changed

.chloggen/first-policy-match.yaml

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: processor/tailsampling
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Add first policy match decision to tailsampling processor
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [36795]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

processor/tailsamplingprocessor/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ The following configuration options can also be modified:
6060
- `non_sampled_cache_size` (default = 0) Configures amount of trace IDs to be kept in an LRU cache,
6161
persisting the "drop" decisions for traces that may have already been released from memory.
6262
By default, the size is 0 and the cache is inactive.
63+
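- `sample_on_first_match` (default = false): Make the sampling decision as soon as a policy matches (returns a sampled decision), without evaluating the remaining policies.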
6364

6465

6566
Each policy will result in a decision, and the processor will evaluate them to make a final decision:

processor/tailsamplingprocessor/config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,4 +264,6 @@ type Config struct {
264264
DecisionCache DecisionCacheConfig `mapstructure:"decision_cache"`
265265
// Options allows for additional configuration of the tail-based sampling processor in code.
266266
Options []Option `mapstructure:"-"`
267+
// SampleOnFirstMatch makes the sampling decision as soon as a policy matches, skipping the remaining policies.
268+
SampleOnFirstMatch bool `mapstructure:"sample_on_first_match"`
267269
}

processor/tailsamplingprocessor/factory.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,9 @@ func NewFactory() processor.Factory {
2727

2828
func createDefaultConfig() component.Config {
2929
return &Config{
30-
DecisionWait: 30 * time.Second,
31-
NumTraces: 50000,
30+
DecisionWait: 30 * time.Second,
31+
NumTraces: 50000,
32+
SampleOnFirstMatch: false,
3233
}
3334
}
3435

processor/tailsamplingprocessor/processor.go

Lines changed: 42 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"fmt"
1010
"math"
1111
"runtime"
12+
"slices"
1213
"sync"
1314
"sync/atomic"
1415
"time"
@@ -50,20 +51,21 @@ type tailSamplingSpanProcessor struct {
5051
telemetry *metadata.TelemetryBuilder
5152
logger *zap.Logger
5253

53-
nextConsumer consumer.Traces
54-
maxNumTraces uint64
55-
policies []*policy
56-
idToTrace sync.Map
57-
policyTicker timeutils.TTicker
58-
tickerFrequency time.Duration
59-
decisionBatcher idbatcher.Batcher
60-
sampledIDCache cache.Cache[bool]
61-
nonSampledIDCache cache.Cache[bool]
62-
deleteChan chan pcommon.TraceID
63-
numTracesOnMap *atomic.Uint64
64-
recordPolicy bool
65-
setPolicyMux sync.Mutex
66-
pendingPolicy []PolicyCfg
54+
nextConsumer consumer.Traces
55+
maxNumTraces uint64
56+
policies []*policy
57+
idToTrace sync.Map
58+
policyTicker timeutils.TTicker
59+
tickerFrequency time.Duration
60+
decisionBatcher idbatcher.Batcher
61+
sampledIDCache cache.Cache[bool]
62+
nonSampledIDCache cache.Cache[bool]
63+
deleteChan chan pcommon.TraceID
64+
numTracesOnMap *atomic.Uint64
65+
recordPolicy bool
66+
setPolicyMux sync.Mutex
67+
pendingPolicy []PolicyCfg
68+
sampleOnFirstMatch bool
6769
}
6870

6971
// spanAndScope a structure for holding information about span and its instrumentation scope.
@@ -113,16 +115,17 @@ func newTracesProcessor(ctx context.Context, set processor.Settings, nextConsume
113115
}
114116

115117
tsp := &tailSamplingSpanProcessor{
116-
ctx: ctx,
117-
set: set,
118-
telemetry: telemetry,
119-
nextConsumer: nextConsumer,
120-
maxNumTraces: cfg.NumTraces,
121-
sampledIDCache: sampledDecisions,
122-
nonSampledIDCache: nonSampledDecisions,
123-
logger: telemetrySettings.Logger,
124-
numTracesOnMap: &atomic.Uint64{},
125-
deleteChan: make(chan pcommon.TraceID, cfg.NumTraces),
118+
ctx: ctx,
119+
set: set,
120+
telemetry: telemetry,
121+
nextConsumer: nextConsumer,
122+
maxNumTraces: cfg.NumTraces,
123+
sampledIDCache: sampledDecisions,
124+
nonSampledIDCache: nonSampledDecisions,
125+
logger: telemetrySettings.Logger,
126+
numTracesOnMap: &atomic.Uint64{},
127+
deleteChan: make(chan pcommon.TraceID, cfg.NumTraces),
128+
sampleOnFirstMatch: cfg.SampleOnFirstMatch,
126129
}
127130
tsp.policyTicker = &timeutils.PolicyTicker{OnTickFunc: tsp.samplingPolicyOnTick}
128131

@@ -263,6 +266,7 @@ func (tsp *tailSamplingSpanProcessor) loadSamplingPolicy(cfgs []PolicyCfg) error
263266

264267
cLen := len(cfgs)
265268
policies := make([]*policy, 0, cLen)
269+
dropPolicies := make([]*policy, 0, cLen)
266270
policyNames := make(map[string]struct{}, cLen)
267271

268272
for _, cfg := range cfgs {
@@ -285,14 +289,20 @@ func (tsp *tailSamplingSpanProcessor) loadSamplingPolicy(cfgs []PolicyCfg) error
285289
uniquePolicyName = fmt.Sprintf("%s.%s", componentID, cfg.Name)
286290
}
287291

288-
policies = append(policies, &policy{
292+
p := &policy{
289293
name: cfg.Name,
290294
evaluator: eval,
291295
attribute: metric.WithAttributes(attribute.String("policy", uniquePolicyName)),
292-
})
293-
}
296+
}
294297

295-
tsp.policies = policies
298+
if cfg.Type == Drop {
299+
dropPolicies = append(dropPolicies, p)
300+
} else {
301+
policies = append(policies, p)
302+
}
303+
}
304+
// Dropped decision takes precedence over all others, therefore we evaluate them first.
305+
tsp.policies = slices.Concat(dropPolicies, policies)
296306

297307
tsp.logger.Debug("Loaded sampling policy", zap.Int("policies.len", len(policies)))
298308

@@ -430,6 +440,10 @@ func (tsp *tailSamplingSpanProcessor) makeDecision(id pcommon.TraceID, trace *sa
430440
if decision == sampling.Dropped {
431441
break
432442
}
443+
// If sampleOnFirstMatch is enabled, make decision as soon as a policy matches
444+
if tsp.sampleOnFirstMatch && decision == sampling.Sampled {
445+
break
446+
}
433447
}
434448

435449
var sampledPolicy *policy

processor/tailsamplingprocessor/processor_decisions_test.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -636,3 +636,61 @@ func TestLateSpanUsesNonSampledDecisionCache(t *testing.T) {
636636
require.Equal(t, 1, mpe.EvaluationCount)
637637
require.Equal(t, 0, nextConsumer.SpanCount(), "original final decision not honored")
638638
}
639+
640+
func TestSampleOnFirstMatch(t *testing.T) {
641+
nextConsumer := new(consumertest.TracesSink)
642+
idb := newSyncIDBatcher()
643+
644+
mpe1 := &mockPolicyEvaluator{}
645+
mpe2 := &mockPolicyEvaluator{}
646+
mpe3 := &mockPolicyEvaluator{}
647+
648+
policies := []*policy{
649+
{name: "mock-policy-1", evaluator: mpe1, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-1"))},
650+
{name: "mock-policy-2", evaluator: mpe2, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-2"))},
651+
{name: "mock-policy-3", evaluator: mpe2, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-3"))},
652+
}
653+
654+
cfg := Config{
655+
DecisionWait: defaultTestDecisionWait,
656+
NumTraces: defaultNumTraces,
657+
SampleOnFirstMatch: true,
658+
Options: []Option{
659+
withDecisionBatcher(idb),
660+
withPolicies(policies),
661+
},
662+
}
663+
p, err := newTracesProcessor(context.Background(), processortest.NewNopSettings(metadata.Type), nextConsumer, cfg)
664+
require.NoError(t, err)
665+
666+
require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost()))
667+
defer func() {
668+
require.NoError(t, p.Shutdown(context.Background()))
669+
}()
670+
671+
// Second policy matches, last policy should not be evaluated
672+
mpe1.NextDecision = sampling.NotSampled
673+
mpe2.NextDecision = sampling.Sampled
674+
675+
// Generate and deliver first span
676+
require.NoError(t, p.ConsumeTraces(context.Background(), simpleTraces()))
677+
678+
tsp := p.(*tailSamplingSpanProcessor)
679+
680+
// The first tick won't do anything
681+
tsp.policyTicker.OnTick()
682+
require.Equal(t, 0, mpe1.EvaluationCount)
683+
require.Equal(t, 0, mpe2.EvaluationCount)
684+
require.Equal(t, 0, mpe3.EvaluationCount)
685+
686+
// This will cause policy evaluations on the first span
687+
tsp.policyTicker.OnTick()
688+
689+
// Only the first policy should have been evaluated
690+
require.Equal(t, 1, mpe1.EvaluationCount)
691+
require.Equal(t, 1, mpe2.EvaluationCount)
692+
require.Equal(t, 0, mpe3.EvaluationCount)
693+
694+
// The final decision SHOULD be Sampled.
695+
require.Equal(t, 1, nextConsumer.SpanCount())
696+
}

processor/tailsamplingprocessor/processor_test.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -669,6 +669,39 @@ func TestDecisionPolicyMetrics(t *testing.T) {
669669
assert.EqualValues(t, 0, metrics.evaluateErrorCount)
670670
}
671671

672+
func TestDropPolicyIsFirstInPolicyList(t *testing.T) {
673+
idb := newSyncIDBatcher()
674+
msp := new(consumertest.TracesSink)
675+
676+
cfg := Config{
677+
DecisionWait: defaultTestDecisionWait,
678+
NumTraces: defaultNumTraces,
679+
PolicyCfgs: []PolicyCfg{
680+
{
681+
sharedPolicyCfg: sharedPolicyCfg{
682+
Name: "regular-policy",
683+
Type: AlwaysSample,
684+
},
685+
},
686+
{
687+
sharedPolicyCfg: sharedPolicyCfg{
688+
Name: "drop-policy",
689+
Type: Drop,
690+
},
691+
},
692+
},
693+
Options: []Option{
694+
withDecisionBatcher(idb),
695+
},
696+
}
697+
p, err := newTracesProcessor(context.Background(), processortest.NewNopSettings(metadata.Type), msp, cfg)
698+
require.NoError(t, err)
699+
700+
tsp := p.(*tailSamplingSpanProcessor)
701+
require.GreaterOrEqual(t, len(tsp.policies), 2)
702+
assert.Equal(t, "drop-policy", tsp.policies[0].name)
703+
}
704+
672705
func collectSpanIDs(trace ptrace.Traces) []pcommon.SpanID {
673706
var spanIDs []pcommon.SpanID
674707

0 commit comments

Comments
 (0)