[processor/tailsampling] Minor rework to improve performance #37560

Merged: 7 commits, Jan 29, 2025
27 changes: 27 additions & 0 deletions .chloggen/tailsamplingprocessor-performance-rework.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: tailsamplingprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Reworked the consume traces, sampling decision, and policy loading paths to improve performance and readability"

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [37560]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
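
One micro-optimization that recurs in the diff below is swapping bool-valued maps for preallocated struct{}-valued sets. A minimal standalone sketch of the idiom, with hypothetical names rather than the processor's own identifiers:

package main

import "fmt"

// Small illustration (names are hypothetical) of the set idiom used in the
// diff: a preallocated map[string]struct{} instead of map[string]bool, so
// membership costs no value storage and the map never grows while loading.
func checkUniqueNames(names []string) error {
    seen := make(map[string]struct{}, len(names)) // preallocate for the known size
    for _, n := range names {
        if n == "" {
            return fmt.Errorf("name cannot be empty")
        }
        if _, exists := seen[n]; exists {
            return fmt.Errorf("duplicate name %q", n)
        }
        seen[n] = struct{}{}
    }
    return nil
}

func main() {
    fmt.Println(checkUniqueNames([]string{"always-sample", "latency", "always-sample"}))
    // prints: duplicate name "always-sample"
}
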
184 changes: 96 additions & 88 deletions processor/tailsamplingprocessor/processor.go
@@ -249,34 +249,40 @@ func (tsp *tailSamplingSpanProcessor) loadSamplingPolicy(cfgs []PolicyCfg) error
telemetrySettings := tsp.set.TelemetrySettings
componentID := tsp.set.ID.Name()

policyNames := map[string]bool{}
tsp.policies = make([]*policy, len(cfgs))
cLen := len(cfgs)
policies := make([]*policy, 0, cLen)
policyNames := make(map[string]struct{}, cLen)

for i := range cfgs {
policyCfg := &cfgs[i]
for _, cfg := range cfgs {
if cfg.Name == "" {
return fmt.Errorf("policy name cannot be empty")
}

if policyNames[policyCfg.Name] {
return fmt.Errorf("duplicate policy name %q", policyCfg.Name)
if _, exists := policyNames[cfg.Name]; exists {
return fmt.Errorf("duplicate policy name %q", cfg.Name)
}
policyNames[policyCfg.Name] = true
policyNames[cfg.Name] = struct{}{}

eval, err := getPolicyEvaluator(telemetrySettings, policyCfg)
eval, err := getPolicyEvaluator(telemetrySettings, &cfg)
if err != nil {
return err
return fmt.Errorf("failed to create policy evaluator for %q: %w", cfg.Name, err)
}
uniquePolicyName := policyCfg.Name

uniquePolicyName := cfg.Name
if componentID != "" {
uniquePolicyName = fmt.Sprintf("%s.%s", componentID, policyCfg.Name)
uniquePolicyName = fmt.Sprintf("%s.%s", componentID, cfg.Name)
}
p := &policy{
name: policyCfg.Name,

policies = append(policies, &policy{
name: cfg.Name,
evaluator: eval,
attribute: metric.WithAttributes(attribute.String("policy", uniquePolicyName)),
}
tsp.policies[i] = p
})
}

tsp.logger.Debug("Loaded sampling policy", zap.Int("policies.len", len(tsp.policies)))
tsp.policies = policies

tsp.logger.Debug("Loaded sampling policy", zap.Int("policies.len", len(policies)))

return nil
}
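
The reworked loadSamplingPolicy assembles the replacement policy slice locally and assigns it to tsp.policies only once every config has validated, so a bad reload leaves the previously loaded policies in place and the explicit prev fallback can go away. A rough sketch of that build-then-swap shape, using simplified placeholder types rather than the real policy and evaluator types:

package main

import (
    "fmt"
    "sync"
)

// Placeholder types for the sketch; not the processor's real policy types.
type evaluator struct{ name string }

type loader struct {
    mu       sync.Mutex
    policies []*evaluator
}

func (l *loader) load(names []string) error {
    next := make([]*evaluator, 0, len(names)) // build the full replacement first
    for _, n := range names {
        if n == "" {
            // Returning early leaves l.policies exactly as it was.
            return fmt.Errorf("policy name cannot be empty")
        }
        next = append(next, &evaluator{name: n})
    }
    l.mu.Lock()
    l.policies = next // swap in only after every entry validated
    l.mu.Unlock()
    return nil
}

func main() {
    l := &loader{}
    fmt.Println(l.load([]string{"ok"}), len(l.policies))     // <nil> 1
    fmt.Println(l.load([]string{"ok", ""}), len(l.policies)) // error, still 1
}
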
@@ -302,9 +308,6 @@ func (tsp *tailSamplingSpanProcessor) loadPendingSamplingPolicy() {

tsp.logger.Debug("Loading pending sampling policy", zap.Int("pending.len", pLen))

// In case something goes wrong.
prev := tsp.policies

err := tsp.loadSamplingPolicy(tsp.pendingPolicy)

// Empty pending regardless of error. If policy is invalid, it will fail on
@@ -313,20 +316,22 @@ func (tsp *tailSamplingSpanProcessor) loadPendingSamplingPolicy() {

if err != nil {
tsp.logger.Error("Failed to load pending sampling policy", zap.Error(err))
tsp.logger.Debug("Falling back to previous sampling policy")
tsp.policies = prev
tsp.logger.Debug("Continuing to use the previously loaded sampling policy")
}
}

func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() {
tsp.logger.Debug("Sampling Policy Evaluation ticked")

tsp.loadPendingSamplingPolicy()

ctx := context.Background()
metrics := policyMetrics{}

startTime := time.Now()

batch, _ := tsp.decisionBatcher.CloseCurrentAndTakeFirstBatch()
batchLen := len(batch)
tsp.logger.Debug("Sampling Policy Evaluation ticked")

for _, id := range batch {
d, ok := tsp.idToTrace.Load(id)
if !ok {
@@ -337,9 +342,8 @@ func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() {
trace.DecisionTime = time.Now()

decision := tsp.makeDecision(id, trace, &metrics)

tsp.telemetry.ProcessorTailSamplingSamplingDecisionTimerLatency.Record(tsp.ctx, int64(time.Since(startTime)/time.Microsecond))
tsp.telemetry.ProcessorTailSamplingSamplingTraceDroppedTooEarly.Add(tsp.ctx, metrics.idNotFoundOnMapCount)
tsp.telemetry.ProcessorTailSamplingSamplingPolicyEvaluationError.Add(tsp.ctx, metrics.evaluateErrorCount)
tsp.telemetry.ProcessorTailSamplingSamplingTracesOnMemory.Record(tsp.ctx, int64(tsp.numTracesOnMap.Load()))
tsp.telemetry.ProcessorTailSamplingGlobalCountTracesSampled.Add(tsp.ctx, 1, decisionToAttribute[decision])

@@ -352,12 +356,15 @@ func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() {

switch decision {
case sampling.Sampled:
tsp.releaseSampledTrace(context.Background(), id, allSpans)
tsp.releaseSampledTrace(ctx, id, allSpans)
case sampling.NotSampled:
tsp.releaseNotSampledTrace(id)
}
}

tsp.telemetry.ProcessorTailSamplingSamplingTraceDroppedTooEarly.Add(tsp.ctx, metrics.idNotFoundOnMapCount)
tsp.telemetry.ProcessorTailSamplingSamplingPolicyEvaluationError.Add(tsp.ctx, metrics.evaluateErrorCount)

tsp.logger.Debug("Sampling policy evaluation completed",
zap.Int("batch.len", batchLen),
zap.Int64("sampled", metrics.decisionSampled),
@@ -368,49 +375,47 @@ func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() {
}

func (tsp *tailSamplingSpanProcessor) makeDecision(id pcommon.TraceID, trace *sampling.TraceData, metrics *policyMetrics) sampling.Decision {
finalDecision := sampling.NotSampled
samplingDecision := map[sampling.Decision]bool{
sampling.Error: false,
sampling.Sampled: false,
sampling.NotSampled: false,
sampling.InvertSampled: false,
sampling.InvertNotSampled: false,
}

var decisions [8]bool
ctx := context.Background()
// Check all policies before making a final decision
startTime := time.Now()

// Check all policies before making a final decision.
for _, p := range tsp.policies {
policyEvaluateStartTime := time.Now()
decision, err := p.evaluator.Evaluate(ctx, id, trace)
tsp.telemetry.ProcessorTailSamplingSamplingDecisionLatency.Record(ctx, int64(time.Since(policyEvaluateStartTime)/time.Microsecond), p.attribute)
latency := time.Since(startTime)
tsp.telemetry.ProcessorTailSamplingSamplingDecisionLatency.Record(ctx, int64(latency/time.Microsecond), p.attribute)

if err != nil {
samplingDecision[sampling.Error] = true
decisions[sampling.Error] = true
metrics.evaluateErrorCount++
tsp.logger.Debug("Sampling policy error", zap.Error(err))
} else {
tsp.telemetry.ProcessorTailSamplingCountTracesSampled.Add(ctx, 1, p.attribute, decisionToAttribute[decision])
if telemetry.IsMetricStatCountSpansSampledEnabled() {
tsp.telemetry.ProcessorTailSamplingCountSpansSampled.Add(ctx, trace.SpanCount.Load(), p.attribute, decisionToAttribute[decision])
}
continue
}

samplingDecision[decision] = true
tsp.telemetry.ProcessorTailSamplingCountTracesSampled.Add(ctx, 1, p.attribute, decisionToAttribute[decision])

if telemetry.IsMetricStatCountSpansSampledEnabled() {
tsp.telemetry.ProcessorTailSamplingCountSpansSampled.Add(ctx, trace.SpanCount.Load(), p.attribute, decisionToAttribute[decision])
}

decisions[decision] = true
}

// InvertNotSampled takes precedence over any other decision
var finalDecision sampling.Decision
switch {
case samplingDecision[sampling.InvertNotSampled]:
case decisions[sampling.InvertNotSampled]: // InvertNotSampled takes precedence
finalDecision = sampling.NotSampled
case samplingDecision[sampling.Sampled]:
case decisions[sampling.Sampled]:
finalDecision = sampling.Sampled
case samplingDecision[sampling.InvertSampled] && !samplingDecision[sampling.NotSampled]:
case decisions[sampling.InvertSampled] && !decisions[sampling.NotSampled]:
finalDecision = sampling.Sampled
default:
finalDecision = sampling.NotSampled
}

switch finalDecision {
case sampling.Sampled:
if finalDecision == sampling.Sampled {
metrics.decisionSampled++
case sampling.NotSampled:
} else {
metrics.decisionNotSampled++
}
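
makeDecision now records per-policy outcomes in a fixed-size bool array indexed by the decision value instead of building a map on every call, then applies the same precedence rules as before. An illustrative sketch of that precedence logic; the constants and their values here are assumptions, the real ones live in the internal sampling package:

package main

import "fmt"

// decision and its constants are stand-ins for the sampling package's values.
type decision int

const (
    unspecified decision = iota
    sampled
    notSampled
    invertSampled
    invertNotSampled
    errDecision
    numDecisions
)

func finalDecision(results []decision) decision {
    var seen [numDecisions]bool // fixed-size array instead of a per-call map
    for _, d := range results {
        seen[d] = true
    }
    switch {
    case seen[invertNotSampled]: // overrides every other policy decision
        return notSampled
    case seen[sampled]:
        return sampled
    case seen[invertSampled] && !seen[notSampled]:
        return sampled
    default:
        return notSampled
    }
}

func main() {
    fmt.Println(finalDecision([]decision{invertSampled, notSampled}) == sampled) // false: plain notSampled blocks invertSampled
    fmt.Println(finalDecision([]decision{sampled, invertNotSampled}) == sampled) // false: invertNotSampled wins over sampled
    fmt.Println(finalDecision([]decision{notSampled, sampled}) == sampled)       // true
}
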

@@ -447,6 +452,8 @@ func (tsp *tailSamplingSpanProcessor) groupSpansByTraceKey(resourceSpans ptrace.
}

func (tsp *tailSamplingSpanProcessor) processTraces(resourceSpans ptrace.ResourceSpans) {
currTime := time.Now()

// Group spans per their traceId to minimize contention on idToTrace
idToSpansAndScope := tsp.groupSpansByTraceKey(resourceSpans)
var newTraceIDs int64
Expand Down Expand Up @@ -476,59 +483,60 @@ func (tsp *tailSamplingSpanProcessor) processTraces(resourceSpans ptrace.Resourc
if !loaded {
spanCount := &atomic.Int64{}
spanCount.Store(lenSpans)
d, loaded = tsp.idToTrace.LoadOrStore(id, &sampling.TraceData{
ArrivalTime: time.Now(),

td := &sampling.TraceData{
ArrivalTime: currTime,
SpanCount: spanCount,
ReceivedBatches: ptrace.NewTraces(),
})
}

if d, loaded = tsp.idToTrace.LoadOrStore(id, td); !loaded {
newTraceIDs++
tsp.decisionBatcher.AddToCurrentBatch(id)
tsp.numTracesOnMap.Add(1)
postDeletion := false
for !postDeletion {
select {
case tsp.deleteChan <- id:
postDeletion = true
default:
traceKeyToDrop := <-tsp.deleteChan
tsp.dropTrace(traceKeyToDrop, currTime)
}
}
}
}

actualData := d.(*sampling.TraceData)
if loaded {
actualData.SpanCount.Add(lenSpans)
} else {
newTraceIDs++
tsp.decisionBatcher.AddToCurrentBatch(id)
tsp.numTracesOnMap.Add(1)
postDeletion := false
currTime := time.Now()
for !postDeletion {
select {
case tsp.deleteChan <- id:
postDeletion = true
default:
traceKeyToDrop := <-tsp.deleteChan
tsp.dropTrace(traceKeyToDrop, currTime)
}
}
}

// The only thing we really care about here is the final decision.
actualData.Lock()
finalDecision := actualData.FinalDecision

if finalDecision == sampling.Unspecified {
// If the final decision hasn't been made, add the new spans under the lock.
appendToTraces(actualData.ReceivedBatches, resourceSpans, spans)
actualData.Unlock()
} else {
actualData.Unlock()
continue
}

switch finalDecision {
case sampling.Sampled:
// Forward the spans to the policy destinations
traceTd := ptrace.NewTraces()
appendToTraces(traceTd, resourceSpans, spans)
tsp.releaseSampledTrace(tsp.ctx, id, traceTd)
case sampling.NotSampled:
tsp.releaseNotSampledTrace(id)
default:
tsp.logger.Warn("Encountered unexpected sampling decision",
zap.Int("decision", int(finalDecision)))
}
actualData.Unlock()

if !actualData.DecisionTime.IsZero() {
tsp.telemetry.ProcessorTailSamplingSamplingLateSpanAge.Record(tsp.ctx, int64(time.Since(actualData.DecisionTime)/time.Second))
}
switch finalDecision {
case sampling.Sampled:
traceTd := ptrace.NewTraces()
appendToTraces(traceTd, resourceSpans, spans)
tsp.releaseSampledTrace(tsp.ctx, id, traceTd)
case sampling.NotSampled:
tsp.releaseNotSampledTrace(id)
default:
tsp.logger.Warn("Unexpected sampling decision", zap.Int("decision", int(finalDecision)))
}

if !actualData.DecisionTime.IsZero() {
tsp.telemetry.ProcessorTailSamplingSamplingLateSpanAge.Record(tsp.ctx, int64(time.Since(actualData.DecisionTime)/time.Second))
}
}
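
processTraces keeps its existing bookkeeping shape: sync.Map.LoadOrStore registers a trace exactly once, and a fixed-capacity channel of trace IDs bounds how many traces stay in memory, evicting the oldest ID when the channel is full. A simplified sketch of that pattern, with placeholder types in place of the real TraceData:

package main

import (
    "fmt"
    "sync"
)

// traceData is a stand-in for the real sampling.TraceData.
type traceData struct{ spanCount int }

type buffer struct {
    idToTrace  sync.Map
    deleteChan chan string // capacity bounds the number of traces held in memory
}

func newBuffer(maxTraces int) *buffer {
    return &buffer{deleteChan: make(chan string, maxTraces)}
}

func (b *buffer) add(id string) *traceData {
    d, loaded := b.idToTrace.LoadOrStore(id, &traceData{})
    if !loaded {
        // New trace: reserve a slot, evicting the oldest trace if necessary.
        for {
            select {
            case b.deleteChan <- id:
                return d.(*traceData)
            default:
                oldest := <-b.deleteChan
                b.idToTrace.Delete(oldest)
            }
        }
    }
    return d.(*traceData)
}

func main() {
    b := newBuffer(2)
    b.add("t1")
    b.add("t2")
    b.add("t3") // evicts t1
    _, ok := b.idToTrace.Load("t1")
    fmt.Println(ok) // false
}
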
