@@ -249,34 +249,40 @@ func (tsp *tailSamplingSpanProcessor) loadSamplingPolicy(cfgs []PolicyCfg) error
249
249
telemetrySettings := tsp .set .TelemetrySettings
250
250
componentID := tsp .set .ID .Name ()
251
251
252
- policyNames := map [string ]bool {}
253
- tsp .policies = make ([]* policy , len (cfgs ))
252
+ cLen := len (cfgs )
253
+ policies := make ([]* policy , 0 , cLen )
254
+ policyNames := make (map [string ]struct {}, cLen )
254
255
255
- for i := range cfgs {
256
- policyCfg := & cfgs [i ]
256
+ for _ , cfg := range cfgs {
257
+ if cfg .Name == "" {
258
+ return fmt .Errorf ("policy name cannot be empty" )
259
+ }
257
260
258
- if policyNames [policyCfg .Name ] {
259
- return fmt .Errorf ("duplicate policy name %q" , policyCfg .Name )
261
+ if _ , exists := policyNames [cfg .Name ]; exists {
262
+ return fmt .Errorf ("duplicate policy name %q" , cfg .Name )
260
263
}
261
- policyNames [policyCfg .Name ] = true
264
+ policyNames [cfg .Name ] = struct {}{}
262
265
263
- eval , err := getPolicyEvaluator (telemetrySettings , policyCfg )
266
+ eval , err := getPolicyEvaluator (telemetrySettings , & cfg )
264
267
if err != nil {
265
- return err
268
+ return fmt . Errorf ( "failed to create policy evaluator for %q: %w" , cfg . Name , err )
266
269
}
267
- uniquePolicyName := policyCfg .Name
270
+
271
+ uniquePolicyName := cfg .Name
268
272
if componentID != "" {
269
- uniquePolicyName = fmt .Sprintf ("%s.%s" , componentID , policyCfg .Name )
273
+ uniquePolicyName = fmt .Sprintf ("%s.%s" , componentID , cfg .Name )
270
274
}
271
- p := & policy {
272
- name : policyCfg .Name ,
275
+
276
+ policies = append (policies , & policy {
277
+ name : cfg .Name ,
273
278
evaluator : eval ,
274
279
attribute : metric .WithAttributes (attribute .String ("policy" , uniquePolicyName )),
275
- }
276
- tsp .policies [i ] = p
280
+ })
277
281
}
278
282
279
- tsp .logger .Debug ("Loaded sampling policy" , zap .Int ("policies.len" , len (tsp .policies )))
283
+ tsp .policies = policies
284
+
285
+ tsp .logger .Debug ("Loaded sampling policy" , zap .Int ("policies.len" , len (policies )))
280
286
281
287
return nil
282
288
}
@@ -302,9 +308,6 @@ func (tsp *tailSamplingSpanProcessor) loadPendingSamplingPolicy() {
302
308
303
309
tsp .logger .Debug ("Loading pending sampling policy" , zap .Int ("pending.len" , pLen ))
304
310
305
- // In case something goes wrong.
306
- prev := tsp .policies
307
-
308
311
err := tsp .loadSamplingPolicy (tsp .pendingPolicy )
309
312
310
313
// Empty pending regardless of error. If policy is invalid, it will fail on
@@ -313,20 +316,22 @@ func (tsp *tailSamplingSpanProcessor) loadPendingSamplingPolicy() {
313
316
314
317
if err != nil {
315
318
tsp .logger .Error ("Failed to load pending sampling policy" , zap .Error (err ))
316
- tsp .logger .Debug ("Falling back to previous sampling policy" )
317
- tsp .policies = prev
319
+ tsp .logger .Debug ("Continuing to use the previously loaded sampling policy" )
318
320
}
319
321
}
320
322
321
323
func (tsp * tailSamplingSpanProcessor ) samplingPolicyOnTick () {
324
+ tsp .logger .Debug ("Sampling Policy Evaluation ticked" )
325
+
322
326
tsp .loadPendingSamplingPolicy ()
323
327
328
+ ctx := context .Background ()
324
329
metrics := policyMetrics {}
325
-
326
330
startTime := time .Now ()
331
+
327
332
batch , _ := tsp .decisionBatcher .CloseCurrentAndTakeFirstBatch ()
328
333
batchLen := len (batch )
329
- tsp . logger . Debug ( "Sampling Policy Evaluation ticked" )
334
+
330
335
for _ , id := range batch {
331
336
d , ok := tsp .idToTrace .Load (id )
332
337
if ! ok {
@@ -337,9 +342,8 @@ func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() {
337
342
trace .DecisionTime = time .Now ()
338
343
339
344
decision := tsp .makeDecision (id , trace , & metrics )
345
+
340
346
tsp .telemetry .ProcessorTailSamplingSamplingDecisionTimerLatency .Record (tsp .ctx , int64 (time .Since (startTime )/ time .Microsecond ))
341
- tsp .telemetry .ProcessorTailSamplingSamplingTraceDroppedTooEarly .Add (tsp .ctx , metrics .idNotFoundOnMapCount )
342
- tsp .telemetry .ProcessorTailSamplingSamplingPolicyEvaluationError .Add (tsp .ctx , metrics .evaluateErrorCount )
343
347
tsp .telemetry .ProcessorTailSamplingSamplingTracesOnMemory .Record (tsp .ctx , int64 (tsp .numTracesOnMap .Load ()))
344
348
tsp .telemetry .ProcessorTailSamplingGlobalCountTracesSampled .Add (tsp .ctx , 1 , decisionToAttribute [decision ])
345
349
@@ -352,12 +356,15 @@ func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() {
352
356
353
357
switch decision {
354
358
case sampling .Sampled :
355
- tsp .releaseSampledTrace (context . Background () , id , allSpans )
359
+ tsp .releaseSampledTrace (ctx , id , allSpans )
356
360
case sampling .NotSampled :
357
361
tsp .releaseNotSampledTrace (id )
358
362
}
359
363
}
360
364
365
+ tsp .telemetry .ProcessorTailSamplingSamplingTraceDroppedTooEarly .Add (tsp .ctx , metrics .idNotFoundOnMapCount )
366
+ tsp .telemetry .ProcessorTailSamplingSamplingPolicyEvaluationError .Add (tsp .ctx , metrics .evaluateErrorCount )
367
+
361
368
tsp .logger .Debug ("Sampling policy evaluation completed" ,
362
369
zap .Int ("batch.len" , batchLen ),
363
370
zap .Int64 ("sampled" , metrics .decisionSampled ),
@@ -368,49 +375,47 @@ func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() {
368
375
}
369
376
370
377
func (tsp * tailSamplingSpanProcessor ) makeDecision (id pcommon.TraceID , trace * sampling.TraceData , metrics * policyMetrics ) sampling.Decision {
371
- finalDecision := sampling .NotSampled
372
- samplingDecision := map [sampling.Decision ]bool {
373
- sampling .Error : false ,
374
- sampling .Sampled : false ,
375
- sampling .NotSampled : false ,
376
- sampling .InvertSampled : false ,
377
- sampling .InvertNotSampled : false ,
378
- }
379
-
378
+ var decisions [8 ]bool
380
379
ctx := context .Background ()
381
- // Check all policies before making a final decision
380
+ startTime := time .Now ()
381
+
382
+ // Check all policies before making a final decision.
382
383
for _ , p := range tsp .policies {
383
- policyEvaluateStartTime := time .Now ()
384
384
decision , err := p .evaluator .Evaluate (ctx , id , trace )
385
- tsp .telemetry .ProcessorTailSamplingSamplingDecisionLatency .Record (ctx , int64 (time .Since (policyEvaluateStartTime )/ time .Microsecond ), p .attribute )
385
+ latency := time .Since (startTime )
386
+ tsp .telemetry .ProcessorTailSamplingSamplingDecisionLatency .Record (ctx , int64 (latency / time .Microsecond ), p .attribute )
387
+
386
388
if err != nil {
387
- samplingDecision [sampling .Error ] = true
389
+ decisions [sampling .Error ] = true
388
390
metrics .evaluateErrorCount ++
389
391
tsp .logger .Debug ("Sampling policy error" , zap .Error (err ))
390
- } else {
391
- tsp .telemetry .ProcessorTailSamplingCountTracesSampled .Add (ctx , 1 , p .attribute , decisionToAttribute [decision ])
392
- if telemetry .IsMetricStatCountSpansSampledEnabled () {
393
- tsp .telemetry .ProcessorTailSamplingCountSpansSampled .Add (ctx , trace .SpanCount .Load (), p .attribute , decisionToAttribute [decision ])
394
- }
392
+ continue
393
+ }
395
394
396
- samplingDecision [decision ] = true
395
+ tsp .telemetry .ProcessorTailSamplingCountTracesSampled .Add (ctx , 1 , p .attribute , decisionToAttribute [decision ])
396
+
397
+ if telemetry .IsMetricStatCountSpansSampledEnabled () {
398
+ tsp .telemetry .ProcessorTailSamplingCountSpansSampled .Add (ctx , trace .SpanCount .Load (), p .attribute , decisionToAttribute [decision ])
397
399
}
400
+
401
+ decisions [decision ] = true
398
402
}
399
403
400
- // InvertNotSampled takes precedence over any other decision
404
+ var finalDecision sampling. Decision
401
405
switch {
402
- case samplingDecision [sampling .InvertNotSampled ]:
406
+ case decisions [sampling .InvertNotSampled ]: // InvertNotSampled takes precedence
403
407
finalDecision = sampling .NotSampled
404
- case samplingDecision [sampling .Sampled ]:
408
+ case decisions [sampling .Sampled ]:
405
409
finalDecision = sampling .Sampled
406
- case samplingDecision [sampling .InvertSampled ] && ! samplingDecision [sampling .NotSampled ]:
410
+ case decisions [sampling .InvertSampled ] && ! decisions [sampling .NotSampled ]:
407
411
finalDecision = sampling .Sampled
412
+ default :
413
+ finalDecision = sampling .NotSampled
408
414
}
409
415
410
- switch finalDecision {
411
- case sampling .Sampled :
416
+ if finalDecision == sampling .Sampled {
412
417
metrics .decisionSampled ++
413
- case sampling . NotSampled :
418
+ } else {
414
419
metrics .decisionNotSampled ++
415
420
}
416
421
@@ -447,6 +452,8 @@ func (tsp *tailSamplingSpanProcessor) groupSpansByTraceKey(resourceSpans ptrace.
447
452
}
448
453
449
454
func (tsp * tailSamplingSpanProcessor ) processTraces (resourceSpans ptrace.ResourceSpans ) {
455
+ currTime := time .Now ()
456
+
450
457
// Group spans per their traceId to minimize contention on idToTrace
451
458
idToSpansAndScope := tsp .groupSpansByTraceKey (resourceSpans )
452
459
var newTraceIDs int64
@@ -476,59 +483,60 @@ func (tsp *tailSamplingSpanProcessor) processTraces(resourceSpans ptrace.Resourc
476
483
if ! loaded {
477
484
spanCount := & atomic.Int64 {}
478
485
spanCount .Store (lenSpans )
479
- d , loaded = tsp .idToTrace .LoadOrStore (id , & sampling.TraceData {
480
- ArrivalTime : time .Now (),
486
+
487
+ td := & sampling.TraceData {
488
+ ArrivalTime : currTime ,
481
489
SpanCount : spanCount ,
482
490
ReceivedBatches : ptrace .NewTraces (),
483
- })
491
+ }
492
+
493
+ if d , loaded = tsp .idToTrace .LoadOrStore (id , td ); ! loaded {
494
+ newTraceIDs ++
495
+ tsp .decisionBatcher .AddToCurrentBatch (id )
496
+ tsp .numTracesOnMap .Add (1 )
497
+ postDeletion := false
498
+ for ! postDeletion {
499
+ select {
500
+ case tsp .deleteChan <- id :
501
+ postDeletion = true
502
+ default :
503
+ traceKeyToDrop := <- tsp .deleteChan
504
+ tsp .dropTrace (traceKeyToDrop , currTime )
505
+ }
506
+ }
507
+ }
484
508
}
509
+
485
510
actualData := d .(* sampling.TraceData )
486
511
if loaded {
487
512
actualData .SpanCount .Add (lenSpans )
488
- } else {
489
- newTraceIDs ++
490
- tsp .decisionBatcher .AddToCurrentBatch (id )
491
- tsp .numTracesOnMap .Add (1 )
492
- postDeletion := false
493
- currTime := time .Now ()
494
- for ! postDeletion {
495
- select {
496
- case tsp .deleteChan <- id :
497
- postDeletion = true
498
- default :
499
- traceKeyToDrop := <- tsp .deleteChan
500
- tsp .dropTrace (traceKeyToDrop , currTime )
501
- }
502
- }
503
513
}
504
514
505
- // The only thing we really care about here is the final decision.
506
515
actualData .Lock ()
507
516
finalDecision := actualData .FinalDecision
508
517
509
518
if finalDecision == sampling .Unspecified {
510
519
// If the final decision hasn't been made, add the new spans under the lock.
511
520
appendToTraces (actualData .ReceivedBatches , resourceSpans , spans )
512
521
actualData .Unlock ()
513
- } else {
514
- actualData . Unlock ()
522
+ continue
523
+ }
515
524
516
- switch finalDecision {
517
- case sampling .Sampled :
518
- // Forward the spans to the policy destinations
519
- traceTd := ptrace .NewTraces ()
520
- appendToTraces (traceTd , resourceSpans , spans )
521
- tsp .releaseSampledTrace (tsp .ctx , id , traceTd )
522
- case sampling .NotSampled :
523
- tsp .releaseNotSampledTrace (id )
524
- default :
525
- tsp .logger .Warn ("Encountered unexpected sampling decision" ,
526
- zap .Int ("decision" , int (finalDecision )))
527
- }
525
+ actualData .Unlock ()
528
526
529
- if ! actualData .DecisionTime .IsZero () {
530
- tsp .telemetry .ProcessorTailSamplingSamplingLateSpanAge .Record (tsp .ctx , int64 (time .Since (actualData .DecisionTime )/ time .Second ))
531
- }
527
+ switch finalDecision {
528
+ case sampling .Sampled :
529
+ traceTd := ptrace .NewTraces ()
530
+ appendToTraces (traceTd , resourceSpans , spans )
531
+ tsp .releaseSampledTrace (tsp .ctx , id , traceTd )
532
+ case sampling .NotSampled :
533
+ tsp .releaseNotSampledTrace (id )
534
+ default :
535
+ tsp .logger .Warn ("Unexpected sampling decision" , zap .Int ("decision" , int (finalDecision )))
536
+ }
537
+
538
+ if ! actualData .DecisionTime .IsZero () {
539
+ tsp .telemetry .ProcessorTailSamplingSamplingLateSpanAge .Record (tsp .ctx , int64 (time .Since (actualData .DecisionTime )/ time .Second ))
532
540
}
533
541
}
534
542
0 commit comments