
Commit 1666327

[processor/tailsampling] Minor rework to improve performance (#37560)
This pull request reworks the consume traces, sampling decision, and policy loading paths to improve performance and readability. It presses the current architecture for the best results by reducing allocations and calls where sensible. The `makeDecision()` changes alone resulted in a noticeable improvement on the existing benchmark: `Sampling-8 16.37µ ± 2% 10.96µ ± 3% -33.02% (p=0.000 n=10)`

Signed-off-by: Sean Porter <[email protected]>
1 parent 3bbc836 commit 1666327
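
The quoted figures read like `benchstat` output: old time per operation, new time per operation, and the delta with its significance. Purely as a hypothetical illustration of how such a comparison is produced with standard Go tooling (this is not the repository's actual benchmark), a stand-in benchmark and workflow might look like:

```go
// bench_sketch_test.go: hypothetical stand-in, not the real Sampling benchmark.
//
// Assumed workflow with standard tooling:
//   go test -bench=Sampling -count=10 > old.txt   # on the old code
//   go test -bench=Sampling -count=10 > new.txt   # on the new code
//   benchstat old.txt new.txt                     # prints old, new, delta, p, n
package bench

import "testing"

// BenchmarkSampling only demonstrates the naming and shape benchstat keys on.
func BenchmarkSampling(b *testing.B) {
	b.ReportAllocs()
	for i := 0; i < b.N; i++ {
		_ = make(map[string]struct{}, 8) // placeholder for the code under test
	}
}
```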

File tree

2 files changed: 123 additions & 88 deletions
Lines changed: 27 additions & 0 deletions

@@ -0,0 +1,27 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: tailsamplingprocessor
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: "Reworked the consume traces, sampling decision, and policy loading paths to improve performance and readability"
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [37560]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [user]

processor/tailsamplingprocessor/processor.go

Lines changed: 96 additions & 88 deletions
@@ -249,34 +249,40 @@ func (tsp *tailSamplingSpanProcessor) loadSamplingPolicy(cfgs []PolicyCfg) error
 	telemetrySettings := tsp.set.TelemetrySettings
 	componentID := tsp.set.ID.Name()
 
-	policyNames := map[string]bool{}
-	tsp.policies = make([]*policy, len(cfgs))
+	cLen := len(cfgs)
+	policies := make([]*policy, 0, cLen)
+	policyNames := make(map[string]struct{}, cLen)
 
-	for i := range cfgs {
-		policyCfg := &cfgs[i]
+	for _, cfg := range cfgs {
+		if cfg.Name == "" {
+			return fmt.Errorf("policy name cannot be empty")
+		}
 
-		if policyNames[policyCfg.Name] {
-			return fmt.Errorf("duplicate policy name %q", policyCfg.Name)
+		if _, exists := policyNames[cfg.Name]; exists {
+			return fmt.Errorf("duplicate policy name %q", cfg.Name)
 		}
-		policyNames[policyCfg.Name] = true
+		policyNames[cfg.Name] = struct{}{}
 
-		eval, err := getPolicyEvaluator(telemetrySettings, policyCfg)
+		eval, err := getPolicyEvaluator(telemetrySettings, &cfg)
 		if err != nil {
-			return err
+			return fmt.Errorf("failed to create policy evaluator for %q: %w", cfg.Name, err)
 		}
-		uniquePolicyName := policyCfg.Name
+
+		uniquePolicyName := cfg.Name
 		if componentID != "" {
-			uniquePolicyName = fmt.Sprintf("%s.%s", componentID, policyCfg.Name)
+			uniquePolicyName = fmt.Sprintf("%s.%s", componentID, cfg.Name)
 		}
-		p := &policy{
-			name:      policyCfg.Name,
+
+		policies = append(policies, &policy{
+			name:      cfg.Name,
 			evaluator: eval,
 			attribute: metric.WithAttributes(attribute.String("policy", uniquePolicyName)),
-		}
-		tsp.policies[i] = p
+		})
 	}
 
-	tsp.logger.Debug("Loaded sampling policy", zap.Int("policies.len", len(tsp.policies)))
+	tsp.policies = policies
+
+	tsp.logger.Debug("Loaded sampling policy", zap.Int("policies.len", len(policies)))
 
 	return nil
 }
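
For readers less familiar with the idioms in this hunk: `map[string]struct{}` is Go's usual zero-byte-value set type, and building into a local `policies` slice that is only assigned to `tsp.policies` after every config validates means a failed reload leaves the previously loaded policies untouched. A minimal, generic sketch of that shape (illustrative names, not this processor's code):

```go
package main

import "fmt"

// loadNames validates and collects policy-like names. It builds into a local
// slice and only returns it on success, so the caller can keep its old value
// when validation fails, the same build-then-swap shape as the hunk above.
func loadNames(cfgs []string) ([]string, error) {
	seen := make(map[string]struct{}, len(cfgs)) // struct{} values take no space
	out := make([]string, 0, len(cfgs))          // preallocate, no regrowth

	for _, name := range cfgs {
		if name == "" {
			return nil, fmt.Errorf("name cannot be empty")
		}
		if _, exists := seen[name]; exists {
			return nil, fmt.Errorf("duplicate name %q", name)
		}
		seen[name] = struct{}{}
		out = append(out, name)
	}
	return out, nil
}

func main() {
	names, err := loadNames([]string{"probabilistic", "latency"})
	fmt.Println(names, err)
}
```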
@@ -302,9 +308,6 @@ func (tsp *tailSamplingSpanProcessor) loadPendingSamplingPolicy() {
 
 	tsp.logger.Debug("Loading pending sampling policy", zap.Int("pending.len", pLen))
 
-	// In case something goes wrong.
-	prev := tsp.policies
-
 	err := tsp.loadSamplingPolicy(tsp.pendingPolicy)
 
 	// Empty pending regardless of error. If policy is invalid, it will fail on
@@ -313,20 +316,22 @@ func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() {
 
 	if err != nil {
 		tsp.logger.Error("Failed to load pending sampling policy", zap.Error(err))
-		tsp.logger.Debug("Falling back to previous sampling policy")
-		tsp.policies = prev
+		tsp.logger.Debug("Continuing to use the previously loaded sampling policy")
 	}
 }
 
 func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() {
+	tsp.logger.Debug("Sampling Policy Evaluation ticked")
+
 	tsp.loadPendingSamplingPolicy()
 
+	ctx := context.Background()
 	metrics := policyMetrics{}
-
 	startTime := time.Now()
+
 	batch, _ := tsp.decisionBatcher.CloseCurrentAndTakeFirstBatch()
 	batchLen := len(batch)
-	tsp.logger.Debug("Sampling Policy Evaluation ticked")
+
 	for _, id := range batch {
 		d, ok := tsp.idToTrace.Load(id)
 		if !ok {
@@ -337,9 +342,8 @@ func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() {
 		trace.DecisionTime = time.Now()
 
 		decision := tsp.makeDecision(id, trace, &metrics)
+
 		tsp.telemetry.ProcessorTailSamplingSamplingDecisionTimerLatency.Record(tsp.ctx, int64(time.Since(startTime)/time.Microsecond))
-		tsp.telemetry.ProcessorTailSamplingSamplingTraceDroppedTooEarly.Add(tsp.ctx, metrics.idNotFoundOnMapCount)
-		tsp.telemetry.ProcessorTailSamplingSamplingPolicyEvaluationError.Add(tsp.ctx, metrics.evaluateErrorCount)
 		tsp.telemetry.ProcessorTailSamplingSamplingTracesOnMemory.Record(tsp.ctx, int64(tsp.numTracesOnMap.Load()))
 		tsp.telemetry.ProcessorTailSamplingGlobalCountTracesSampled.Add(tsp.ctx, 1, decisionToAttribute[decision])
 
@@ -352,12 +356,15 @@ func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() {
 
 		switch decision {
 		case sampling.Sampled:
-			tsp.releaseSampledTrace(context.Background(), id, allSpans)
+			tsp.releaseSampledTrace(ctx, id, allSpans)
 		case sampling.NotSampled:
 			tsp.releaseNotSampledTrace(id)
 		}
 	}
 
+	tsp.telemetry.ProcessorTailSamplingSamplingTraceDroppedTooEarly.Add(tsp.ctx, metrics.idNotFoundOnMapCount)
+	tsp.telemetry.ProcessorTailSamplingSamplingPolicyEvaluationError.Add(tsp.ctx, metrics.evaluateErrorCount)
+
 	tsp.logger.Debug("Sampling policy evaluation completed",
 		zap.Int("batch.len", batchLen),
 		zap.Int64("sampled", metrics.decisionSampled),
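
The two hunks above also move the dropped-too-early and evaluation-error counter updates out of the per-trace loop: the counts accumulate in the local `policyMetrics` value while iterating and are added to the instruments once per tick. A tiny, generic sketch of that accumulate-then-record shape (illustrative types, not the processor's):

```go
package main

import "fmt"

// tickMetrics mirrors the role of policyMetrics: cheap integer fields bumped
// in the hot loop, flushed to the (comparatively expensive) metrics API once.
type tickMetrics struct {
	dropped int64
	errors  int64
}

func main() {
	results := []string{"ok", "dropped", "error", "ok"}

	var m tickMetrics
	for _, r := range results { // hot loop: increments only
		switch r {
		case "dropped":
			m.dropped++
		case "error":
			m.errors++
		}
	}

	// One recording call per tick instead of one per item.
	fmt.Printf("dropped=%d errors=%d\n", m.dropped, m.errors)
}
```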
@@ -368,49 +375,47 @@ func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() {
 }
 
 func (tsp *tailSamplingSpanProcessor) makeDecision(id pcommon.TraceID, trace *sampling.TraceData, metrics *policyMetrics) sampling.Decision {
-	finalDecision := sampling.NotSampled
-	samplingDecision := map[sampling.Decision]bool{
-		sampling.Error:            false,
-		sampling.Sampled:          false,
-		sampling.NotSampled:       false,
-		sampling.InvertSampled:    false,
-		sampling.InvertNotSampled: false,
-	}
-
+	var decisions [8]bool
 	ctx := context.Background()
-	// Check all policies before making a final decision
+	startTime := time.Now()
+
+	// Check all policies before making a final decision.
 	for _, p := range tsp.policies {
-		policyEvaluateStartTime := time.Now()
 		decision, err := p.evaluator.Evaluate(ctx, id, trace)
-		tsp.telemetry.ProcessorTailSamplingSamplingDecisionLatency.Record(ctx, int64(time.Since(policyEvaluateStartTime)/time.Microsecond), p.attribute)
+		latency := time.Since(startTime)
+		tsp.telemetry.ProcessorTailSamplingSamplingDecisionLatency.Record(ctx, int64(latency/time.Microsecond), p.attribute)
+
 		if err != nil {
-			samplingDecision[sampling.Error] = true
+			decisions[sampling.Error] = true
 			metrics.evaluateErrorCount++
 			tsp.logger.Debug("Sampling policy error", zap.Error(err))
-		} else {
-			tsp.telemetry.ProcessorTailSamplingCountTracesSampled.Add(ctx, 1, p.attribute, decisionToAttribute[decision])
-			if telemetry.IsMetricStatCountSpansSampledEnabled() {
-				tsp.telemetry.ProcessorTailSamplingCountSpansSampled.Add(ctx, trace.SpanCount.Load(), p.attribute, decisionToAttribute[decision])
-			}
+			continue
+		}
 
-			samplingDecision[decision] = true
+		tsp.telemetry.ProcessorTailSamplingCountTracesSampled.Add(ctx, 1, p.attribute, decisionToAttribute[decision])
+
+		if telemetry.IsMetricStatCountSpansSampledEnabled() {
+			tsp.telemetry.ProcessorTailSamplingCountSpansSampled.Add(ctx, trace.SpanCount.Load(), p.attribute, decisionToAttribute[decision])
 		}
+
+		decisions[decision] = true
 	}
 
-	// InvertNotSampled takes precedence over any other decision
+	var finalDecision sampling.Decision
 	switch {
-	case samplingDecision[sampling.InvertNotSampled]:
+	case decisions[sampling.InvertNotSampled]: // InvertNotSampled takes precedence
 		finalDecision = sampling.NotSampled
-	case samplingDecision[sampling.Sampled]:
+	case decisions[sampling.Sampled]:
 		finalDecision = sampling.Sampled
-	case samplingDecision[sampling.InvertSampled] && !samplingDecision[sampling.NotSampled]:
+	case decisions[sampling.InvertSampled] && !decisions[sampling.NotSampled]:
 		finalDecision = sampling.Sampled
+	default:
+		finalDecision = sampling.NotSampled
 	}
 
-	switch finalDecision {
-	case sampling.Sampled:
+	if finalDecision == sampling.Sampled {
 		metrics.decisionSampled++
-	case sampling.NotSampled:
+	} else {
 		metrics.decisionNotSampled++
 	}
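
The main change in `makeDecision()` is replacing the per-call `map[sampling.Decision]bool` with a fixed-size `[8]bool` indexed by the decision value, which avoids a map allocation and hashing on every evaluation. A self-contained, hypothetical microbenchmark contrasting the two styles (this is not the repository's `Sampling` benchmark; the decision values here are made up):

```go
// flags_bench_test.go: run with `go test -bench=Flags -benchmem`
package bench

import "testing"

type decision int

const (
	unspecified decision = iota
	errored
	sampled
	notSampled
	invertSampled
	invertNotSampled
)

var evaluations = []decision{sampled, notSampled, invertSampled, notSampled}

func BenchmarkMapFlags(b *testing.B) {
	b.ReportAllocs()
	for i := 0; i < b.N; i++ {
		flags := map[decision]bool{} // allocated and hashed on every call
		for _, d := range evaluations {
			flags[d] = true
		}
		_ = flags[sampled]
	}
}

func BenchmarkArrayFlags(b *testing.B) {
	b.ReportAllocs()
	for i := 0; i < b.N; i++ {
		var flags [8]bool // lives on the stack; indexing is a single store
		for _, d := range evaluations {
			flags[d] = true
		}
		_ = flags[sampled]
	}
}
```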

@@ -447,6 +452,8 @@ func (tsp *tailSamplingSpanProcessor) groupSpansByTraceKey(resourceSpans ptrace.
 }
 
 func (tsp *tailSamplingSpanProcessor) processTraces(resourceSpans ptrace.ResourceSpans) {
+	currTime := time.Now()
+
 	// Group spans per their traceId to minimize contention on idToTrace
 	idToSpansAndScope := tsp.groupSpansByTraceKey(resourceSpans)
 	var newTraceIDs int64
@@ -476,59 +483,60 @@ func (tsp *tailSamplingSpanProcessor) processTraces(resourceSpans ptrace.Resourc
 		if !loaded {
 			spanCount := &atomic.Int64{}
 			spanCount.Store(lenSpans)
-			d, loaded = tsp.idToTrace.LoadOrStore(id, &sampling.TraceData{
-				ArrivalTime:     time.Now(),
+
+			td := &sampling.TraceData{
+				ArrivalTime:     currTime,
 				SpanCount:       spanCount,
 				ReceivedBatches: ptrace.NewTraces(),
-			})
+			}
+
+			if d, loaded = tsp.idToTrace.LoadOrStore(id, td); !loaded {
+				newTraceIDs++
+				tsp.decisionBatcher.AddToCurrentBatch(id)
+				tsp.numTracesOnMap.Add(1)
+				postDeletion := false
+				for !postDeletion {
+					select {
+					case tsp.deleteChan <- id:
+						postDeletion = true
+					default:
+						traceKeyToDrop := <-tsp.deleteChan
+						tsp.dropTrace(traceKeyToDrop, currTime)
+					}
+				}
+			}
 		}
+
 		actualData := d.(*sampling.TraceData)
 		if loaded {
 			actualData.SpanCount.Add(lenSpans)
-		} else {
-			newTraceIDs++
-			tsp.decisionBatcher.AddToCurrentBatch(id)
-			tsp.numTracesOnMap.Add(1)
-			postDeletion := false
-			currTime := time.Now()
-			for !postDeletion {
-				select {
-				case tsp.deleteChan <- id:
-					postDeletion = true
-				default:
-					traceKeyToDrop := <-tsp.deleteChan
-					tsp.dropTrace(traceKeyToDrop, currTime)
-				}
-			}
 		}
 
-		// The only thing we really care about here is the final decision.
 		actualData.Lock()
 		finalDecision := actualData.FinalDecision
 
 		if finalDecision == sampling.Unspecified {
 			// If the final decision hasn't been made, add the new spans under the lock.
 			appendToTraces(actualData.ReceivedBatches, resourceSpans, spans)
 			actualData.Unlock()
-		} else {
-			actualData.Unlock()
+			continue
+		}
 
-			switch finalDecision {
-			case sampling.Sampled:
-				// Forward the spans to the policy destinations
-				traceTd := ptrace.NewTraces()
-				appendToTraces(traceTd, resourceSpans, spans)
-				tsp.releaseSampledTrace(tsp.ctx, id, traceTd)
-			case sampling.NotSampled:
-				tsp.releaseNotSampledTrace(id)
-			default:
-				tsp.logger.Warn("Encountered unexpected sampling decision",
-					zap.Int("decision", int(finalDecision)))
-			}
+		actualData.Unlock()
 
-			if !actualData.DecisionTime.IsZero() {
-				tsp.telemetry.ProcessorTailSamplingSamplingLateSpanAge.Record(tsp.ctx, int64(time.Since(actualData.DecisionTime)/time.Second))
-			}
+		switch finalDecision {
+		case sampling.Sampled:
+			traceTd := ptrace.NewTraces()
+			appendToTraces(traceTd, resourceSpans, spans)
+			tsp.releaseSampledTrace(tsp.ctx, id, traceTd)
+		case sampling.NotSampled:
+			tsp.releaseNotSampledTrace(id)
+		default:
+			tsp.logger.Warn("Unexpected sampling decision", zap.Int("decision", int(finalDecision)))
+		}
+
+		if !actualData.DecisionTime.IsZero() {
+			tsp.telemetry.ProcessorTailSamplingSamplingLateSpanAge.Record(tsp.ctx, int64(time.Since(actualData.DecisionTime)/time.Second))
 		}
 	}

0 commit comments