
Commit adcdc4e

mabdinur authored and brettlangdon committed
fix(sampling): ensure agent based sampling is not reset after forking and on tracer.configure (#13560)
Builds on 7fbdc9f

- Fix: Avoids reinitializing the `SpanAggregator` on `tracer.configure(...)` and when an application forks. Instead, `SpanAggregator.reset()` is called. This operation ensures global configurations are re-applied, the trace buffer can be reset, and the trace writer is recreated, while agent-based sampling rules are not reset.
- Clean up: Removes the `writer` parameter from `SpanAggregator.__init__(...)`. With this change, initialization of the global writer is an implementation detail of the `SpanAggregator`; there is no longer a need to supply the `SpanAggregator` with a writer when initializing the global tracer.
- Moves all implementation details of resetting the `SpanAggregator` from `Tracer.configure(...)` and `Tracer._recreate(...)` to `SpanAggregator.reset(...)`.
- Removes the initialization of the `SpanAggregator` from `_default_span_processors_factory`. With this change, the global tracer's `SpanAggregator` is never re-created; it is only modified when `tracer.configure(...)` is used.
- Renames the `DatadogSampler._by_service_samplers` property to `DatadogSampler._agent_based_samplers` to improve clarity. These sampling rules are no longer supplied via environment variables or a programmatic API; they can only be set by the Datadog Agent.
- Splits `SpanAggregator.trace_processors` into two properties: `SpanAggregator.dd_processors` and `SpanAggregator.user_processors`. `SpanAggregator.user_processors` is set after application startup via `Tracer.configure(...)`, while `SpanAggregator.dd_processors` is internal to the ddtrace library and should only be set by ddtrace components. This separation allows us to avoid recreating all trace processors when `tracer.configure()` is called.
- Adds a more descriptive release note to an unreleased fix.
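The core idea above — reset the existing aggregator instead of rebuilding it, so agent-pushed sampling rates survive — can be sketched with minimal stand-in classes (these are illustrative, not the real ddtrace `SpanAggregator`/`DatadogSampler`):

```python
from collections import defaultdict
from typing import Dict, List, Optional


class MiniSampler:
    """Holds per-"service:<name>,env:<env>" rates pushed by the Datadog Agent."""

    def __init__(self, agent_based_samplers: Optional[Dict[str, float]] = None):
        self._agent_based_samplers = agent_based_samplers or {}


class MiniAggregator:
    def __init__(self):
        self.sampler = MiniSampler()
        self.user_processors: List[object] = []
        self._traces = defaultdict(list)  # buffered, not-yet-flushed traces

    def reset(self, user_processors=None, reset_buffer=True):
        # Recreate the sampler but carry the agent-pushed rates forward,
        # instead of dropping them as a full re-initialization would.
        self.sampler = MiniSampler(self.sampler._agent_based_samplers)
        if user_processors is not None:
            self.user_processors = user_processors
        if reset_buffer:
            # After a fork, drop inherited traces so they are not sent twice.
            self._traces = defaultdict(list)


agg = MiniAggregator()
agg.sampler._agent_based_samplers["service:web,env:prod"] = 0.25  # pushed by the Agent
agg.reset(reset_buffer=True)  # e.g. after fork or tracer.configure()
print(agg.sampler._agent_based_samplers)  # the agent rates survive the reset
```

Before this change, reconfiguration rebuilt the aggregator from scratch, which is exactly the path that discarded the agent-based rates.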
## Checklist

- [x] PR author has checked that all the criteria below are met
  - The PR description includes an overview of the change
  - The PR description articulates the motivation for the change
  - The change includes tests OR the PR description describes a testing strategy
  - The PR description notes risks associated with the change, if any
  - Newly-added code is easy to change
  - The change follows the [library release note guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html)
  - The change includes or references documentation updates if necessary
  - Backport labels are set (if [applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting))

## Reviewer Checklist

- [x] Reviewer has checked that all the criteria below are met
  - Title is accurate
  - All changes are related to the pull request's stated goal
  - Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes
  - Testing strategy adequately addresses listed risks
  - Newly-added code is easy to change
  - Release note makes sense to a user of the library
  - If necessary, author has acknowledged and discussed the performance implications of this PR as reported in the benchmarks PR comment
  - Backport labels are set in a manner that is consistent with the [release branch maintenance policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)

---------

Co-authored-by: Brett Langdon <[email protected]>

(cherry picked from commit ca79351)
1 parent a3c6229 commit adcdc4e

File tree

13 files changed: +300 −100 lines changed

ddtrace/_trace/processor/__init__.py

Lines changed: 81 additions & 14 deletions
```diff
@@ -4,7 +4,6 @@
 from os import environ
 from threading import RLock
 from typing import Dict
-from typing import Iterable
 from typing import List
 from typing import Optional
@@ -128,7 +127,13 @@ class TraceSamplingProcessor(TraceProcessor):
     Agent even if the dropped trace is not (as is the case when trace stats computation is enabled).
     """

-    def __init__(self, compute_stats_enabled: bool, single_span_rules: List[SpanSamplingRule], apm_opt_out: bool):
+    def __init__(
+        self,
+        compute_stats_enabled: bool,
+        single_span_rules: List[SpanSamplingRule],
+        apm_opt_out: bool,
+        agent_based_samplers: Optional[dict] = None,
+    ):
         super(TraceSamplingProcessor, self).__init__()
         self._compute_stats_enabled = compute_stats_enabled
         self.single_span_rules = single_span_rules
@@ -137,9 +142,14 @@ def __init__(self, compute_stats_enabled: bool, single_span_rules: List[SpanSamp
             # If ASM is enabled but tracing is disabled,
             # we need to set the rate limiting to 1 trace per minute
             # for the backend to consider the service as alive.
-            self.sampler = DatadogSampler(rate_limit=1, rate_limit_window=60e9, rate_limit_always_on=True)
+            self.sampler = DatadogSampler(
+                rate_limit=1,
+                rate_limit_window=60e9,
+                rate_limit_always_on=True,
+                agent_based_samplers=agent_based_samplers,
+            )
         else:
-            self.sampler = DatadogSampler()
+            self.sampler = DatadogSampler(agent_based_samplers=agent_based_samplers)

     def process_trace(self, trace: List[Span]) -> Optional[List[Span]]:
         if trace:
@@ -260,8 +270,8 @@ def __init__(
         self,
         partial_flush_enabled: bool,
         partial_flush_min_spans: int,
-        trace_processors: Iterable[TraceProcessor],
-        writer: Optional[TraceWriter] = None,
+        dd_processors: Optional[List[TraceProcessor]] = None,
+        user_processors: Optional[List[TraceProcessor]] = None,
     ):
         # Set partial flushing
         self.partial_flush_enabled = partial_flush_enabled
@@ -271,12 +281,10 @@ def __init__(
             config._trace_compute_stats, get_span_sampling_rules(), asm_config._apm_opt_out
         )
         self.tags_processor = TraceTagsProcessor()
-        self.trace_processors = trace_processors
-        # Initialize writer
-        if writer is not None:
-            self.writer: TraceWriter = writer
-        elif SpanAggregator._use_log_writer():
-            self.writer = LogWriter()
+        self.dd_processors = dd_processors or []
+        self.user_processors = user_processors or []
+        if SpanAggregator._use_log_writer():
+            self.writer: TraceWriter = LogWriter()
         else:
             verify_url(agent_config.trace_agent_url)
             self.writer = AgentWriter(
@@ -307,7 +315,9 @@ def __repr__(self) -> str:
             f"{self.partial_flush_min_spans}, "
             f"{self.sampling_processor},"
             f"{self.tags_processor},"
-            f"{self.trace_processors}, "
+            f"{self.dd_processors}, "
+            f"{self.user_processors}, "
+            f"{self._span_metrics}, "
             f"{self.writer})"
         )
@@ -369,7 +379,9 @@ def on_span_finish(self, span: Span) -> None:
             finished[0].set_metric("_dd.py.partial_flush", num_finished)

         spans: Optional[List[Span]] = finished
-        for tp in chain(self.trace_processors, [self.sampling_processor, self.tags_processor]):
+        for tp in chain(
+            self.dd_processors, self.user_processors, [self.sampling_processor, self.tags_processor]
+        ):
             try:
                 if spans is None:
                     return
@@ -480,3 +492,58 @@ def _queue_span_count_metrics(self, metric_name: str, tag_name: str, min_count:
                     TELEMETRY_NAMESPACE.TRACERS, metric_name, count, tags=((tag_name, tag_value),)
                 )
             self._span_metrics[metric_name] = defaultdict(int)
+
+    def reset(
+        self,
+        user_processors: Optional[List[TraceProcessor]] = None,
+        compute_stats: Optional[bool] = None,
+        apm_opt_out: Optional[bool] = None,
+        appsec_enabled: Optional[bool] = None,
+        reset_buffer: bool = True,
+    ) -> None:
+        """
+        Resets the internal state of the SpanAggregator, including the writer, sampling processor,
+        user-defined processors, and optionally the trace buffer and span metrics.
+
+        This method is typically used after a process fork or during runtime reconfiguration.
+        Arguments that are None will not override existing values.
+        """
+        try:
+            # Stop the writer to ensure it is not running while we reconfigure it.
+            self.writer.stop()
+        except ServiceStatusError:
+            # Writers like AgentWriter may not start until the first trace is encoded.
+            # Stopping them before that will raise a ServiceStatusError.
+            pass
+
+        if isinstance(self.writer, AgentWriter) and appsec_enabled:
+            # Ensure AppSec metadata is encoded by setting the API version to v0.4.
+            self.writer._api_version = "v0.4"
+        # Re-create the writer to ensure it is consistent with updated configurations (ex: api_version)
+        self.writer = self.writer.recreate()
+
+        # Recreate the sampling processor using new or existing config values.
+        # If an argument is None, the current value is preserved.
+        if compute_stats is None:
+            compute_stats = self.sampling_processor._compute_stats_enabled
+        if apm_opt_out is None:
+            apm_opt_out = self.sampling_processor.apm_opt_out
+        self.sampling_processor = TraceSamplingProcessor(
+            compute_stats,
+            get_span_sampling_rules(),
+            apm_opt_out,
+            self.sampling_processor.sampler._agent_based_samplers,
+        )
+
+        # Update user processors if provided.
+        if user_processors is not None:
+            self.user_processors = user_processors
+
+        # Reset the trace buffer and span metrics.
+        # Useful when forking to prevent sending duplicate spans from parent and child processes.
+        if reset_buffer:
+            self._traces = defaultdict(lambda: _Trace())
+            self._span_metrics = {
+                "spans_created": defaultdict(int),
+                "spans_finished": defaultdict(int),
+            }
```
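The `on_span_finish` change above fixes the processing order: Datadog-internal processors run first, then user processors, then the sampling and tag processors, and any processor returning `None` drops the trace. A hedged sketch of that chain (the processor names here are illustrative, not the real ddtrace classes):

```python
from itertools import chain


def run_chain(trace, dd_processors, user_processors, tail_processors):
    """Mirror of the on_span_finish loop: each processor transforms the
    span list; a None result short-circuits and drops the trace."""
    spans = trace
    for tp in chain(dd_processors, user_processors, tail_processors):
        if spans is None:
            return None
        spans = tp(spans)
    return spans


# dd-internal step (e.g. service-name normalization), kept deliberately tiny:
tag_all = lambda spans: [s + ":tagged" for s in spans]
# user-supplied filter installed via tracer.configure(trace_processors=...):
drop_health = lambda spans: None if "health" in spans else spans

print(run_chain(["health"], [], [drop_health], [tag_all]))    # trace dropped
print(run_chain(["db.query"], [], [drop_health], [tag_all]))  # trace kept
```

Because `dd_processors` and `user_processors` are now separate lists, `tracer.configure()` can swap only the user list without rebuilding the internal one.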

ddtrace/_trace/sampler.py

Lines changed: 8 additions & 7 deletions
```diff
@@ -75,7 +75,7 @@ class DatadogSampler:
         "limiter",
         "rules",
         "_rate_limit_always_on",
-        "_by_service_samplers",
+        "_agent_based_samplers",
     )
     _default_key = "service:,env:"
@@ -85,6 +85,7 @@ def __init__(
         rate_limit: Optional[int] = None,
         rate_limit_window: float = 1e9,
         rate_limit_always_on: bool = False,
+        agent_based_samplers: Optional[Dict[str, RateSampler]] = None,
     ):
         """
         Constructor for DatadogSampler sampler
@@ -101,7 +102,7 @@ def __init__(
         else:
             self.rules: List[SamplingRule] = rules or []
         # Set Agent based samplers
-        self._by_service_samplers: Dict[str, RateSampler] = {}
+        self._agent_based_samplers = agent_based_samplers or {}
         # Set rate limiter
         self._rate_limit_always_on: bool = rate_limit_always_on
         if rate_limit is None:
@@ -119,10 +120,10 @@ def update_rate_by_service_sample_rates(self, rate_by_service: Dict[str, float])
         samplers: Dict[str, RateSampler] = {}
         for key, sample_rate in rate_by_service.items():
             samplers[key] = RateSampler(sample_rate)
-        self._by_service_samplers = samplers
+        self._agent_based_samplers = samplers

     def __str__(self):
-        rates = {key: sampler.sample_rate for key, sampler in self._by_service_samplers.items()}
+        rates = {key: sampler.sample_rate for key, sampler in self._agent_based_samplers.items()}
         return "{}(agent_rates={!r}, limiter={!r}, rules={!r}), rate_limit_always_on={!r}".format(
             self.__class__.__name__,
             rates,
@@ -181,11 +182,11 @@ def sample(self, span: Span) -> bool:
             sample_rate = matched_rule.sample_rate
         else:
             key = self._key(span.service, span.get_tag(ENV_KEY))
-            if key in self._by_service_samplers:
+            if key in self._agent_based_samplers:
                 # Agent service based sampling
                 agent_service_based = True
-                sampled = self._by_service_samplers[key].sample(span)
-                sample_rate = self._by_service_samplers[key].sample_rate
+                sampled = self._agent_based_samplers[key].sample(span)
+                sample_rate = self._agent_based_samplers[key].sample_rate

             if matched_rule or self._rate_limit_always_on:
                 # Avoid rate limiting when trace sample rules and/or sample rates are NOT provided
```
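For context on the renamed field: the Agent pushes a map of rates keyed by `"service:<name>,env:<env>"`, and each trace looks up its key in that map. A simplified sketch of the keying and update (names mirror `_key` and `update_rate_by_service_sample_rates`; the default-key fallback and the plain-float `RateSampler` here are simplifying assumptions, not the exact library behavior):

```python
class RateSampler:
    """Simplified: just stores a sample rate."""

    def __init__(self, sample_rate: float):
        self.sample_rate = sample_rate


class MiniDatadogSampler:
    _default_key = "service:,env:"

    def __init__(self, agent_based_samplers=None):
        self._agent_based_samplers = agent_based_samplers or {}

    @staticmethod
    def _key(service=None, env=None):
        # Matches the "service:<name>,env:<env>" key format.
        return "service:" + (service or "") + ",env:" + (env or "")

    def update_rate_by_service_sample_rates(self, rate_by_service):
        # The Agent sends the full mapping each time, so replace it wholesale.
        self._agent_based_samplers = {
            key: RateSampler(rate) for key, rate in rate_by_service.items()
        }

    def rate_for(self, service=None, env=None):
        key = self._key(service, env)
        sampler = self._agent_based_samplers.get(
            key, self._agent_based_samplers.get(self._default_key)
        )
        return sampler.sample_rate if sampler else 1.0


s = MiniDatadogSampler()
s.update_rate_by_service_sample_rates(
    {"service:,env:": 1.0, "service:web,env:prod": 0.1}
)
print(s.rate_for("web", "prod"))  # 0.1
print(s.rate_for("other", None))  # falls back to the default key: 1.0
```

Since these rates arrive only from the Agent, losing the map on reconfigure meant sampling reverted to defaults until the Agent's next response — the bug this PR fixes.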

ddtrace/_trace/tracer.py

Lines changed: 30 additions & 56 deletions
```diff
@@ -46,12 +46,10 @@
 from ddtrace.internal.processor.endpoint_call_counter import EndpointCallCounterProcessor
 from ddtrace.internal.runtime import get_runtime_id
 from ddtrace.internal.schema.processor import BaseServiceProcessor
-from ddtrace.internal.service import ServiceStatusError
 from ddtrace.internal.utils import _get_metas_to_propagate
 from ddtrace.internal.utils.formats import format_trace_id
 from ddtrace.internal.writer import AgentWriter
 from ddtrace.internal.writer import HTTPWriter
-from ddtrace.internal.writer import TraceWriter
 from ddtrace.settings._config import config
 from ddtrace.settings.asm import config as asm_config
 from ddtrace.settings.peer_service import _ps_config
@@ -89,20 +87,9 @@ def _start_appsec_processor() -> Optional["AppSecSpanProcessor"]:


 def _default_span_processors_factory(
-    trace_filters: List[TraceProcessor],
-    writer: Optional[TraceWriter],
-    partial_flush_enabled: bool,
-    partial_flush_min_spans: int,
     profiling_span_processor: EndpointCallCounterProcessor,
-) -> Tuple[List[SpanProcessor], Optional["AppSecSpanProcessor"], SpanAggregator]:
+) -> Tuple[List[SpanProcessor], Optional["AppSecSpanProcessor"]]:
     """Construct the default list of span processors to use."""
-    trace_processors: List[TraceProcessor] = []
-    trace_processors += [
-        PeerServiceProcessor(_ps_config),
-        BaseServiceProcessor(),
-    ]
-    trace_processors += trace_filters
-
     span_processors: List[SpanProcessor] = []
     span_processors += [TopLevelSpanProcessor()]
@@ -140,14 +127,7 @@ def _default_span_processors_factory(

     span_processors.append(profiling_span_processor)

-    # These need to run after all the other processors
-    span_aggregagtor = SpanAggregator(
-        partial_flush_enabled=partial_flush_enabled,
-        partial_flush_min_spans=partial_flush_min_spans,
-        trace_processors=trace_processors,
-        writer=writer,
-    )
-    return span_processors, appsec_processor, span_aggregagtor
+    return span_processors, appsec_processor


 class Tracer(object):
@@ -181,8 +161,6 @@ def __init__(self) -> None:
             "Initializing multiple Tracer instances is not supported. Use ``ddtrace.trace.tracer`` instead.",
         )

-        self._user_trace_processors: List[TraceProcessor] = []
-
         # globally set tags
         self._tags = config.tags.copy()
@@ -199,12 +177,13 @@ def __init__(self) -> None:
             config._trace_compute_stats = False
         # Direct link to the appsec processor
         self._endpoint_call_counter_span_processor = EndpointCallCounterProcessor()
-        self._span_processors, self._appsec_processor, self._span_aggregator = _default_span_processors_factory(
-            self._user_trace_processors,
-            None,
-            config._partial_flush_enabled,
-            config._partial_flush_min_spans,
-            self._endpoint_call_counter_span_processor,
+        self._span_processors, self._appsec_processor = _default_span_processors_factory(
+            self._endpoint_call_counter_span_processor
+        )
+        self._span_aggregator = SpanAggregator(
+            partial_flush_enabled=config._partial_flush_enabled,
+            partial_flush_min_spans=config._partial_flush_min_spans,
+            dd_processors=[PeerServiceProcessor(_ps_config), BaseServiceProcessor()],
         )
         if config._data_streams_enabled:
             # Inline the import to avoid pulling in ddsketch or protobuf
@@ -389,13 +368,6 @@ def configure(
         if compute_stats_enabled is not None:
             config._trace_compute_stats = compute_stats_enabled

-        if isinstance(self._span_aggregator.writer, AgentWriter):
-            if appsec_enabled:
-                self._span_aggregator.writer._api_version = "v0.4"
-
-        if trace_processors:
-            self._user_trace_processors = trace_processors
-
         if any(
             x is not None
             for x in [
@@ -405,7 +377,9 @@ def configure(
                 iast_enabled,
             ]
         ):
-            self._recreate()
+            self._recreate(
+                trace_processors, compute_stats_enabled, asm_config._apm_opt_out, appsec_enabled, reset_buffer=False
+            )

         if context_provider is not None:
             self.context_provider = context_provider
@@ -433,31 +407,31 @@ def _generate_diagnostic_logs(self):

     def _child_after_fork(self):
         self._pid = getpid()
-        self._recreate()
+        self._recreate(reset_buffer=True)
         self._new_process = True

-    def _recreate(self):
-        """Re-initialize the tracer's processors and trace writer. This method should only be used in tests."""
+    def _recreate(
+        self,
+        trace_processors: Optional[List[TraceProcessor]] = None,
+        compute_stats_enabled: Optional[bool] = None,
+        apm_opt_out: Optional[bool] = None,
+        appsec_enabled: Optional[bool] = None,
+        reset_buffer: bool = True,
+    ) -> None:
+        """Re-initialize the tracer's processors and trace writer"""
         # Stop the writer.
         # This will stop the periodic thread in HTTPWriters, preventing memory leaks and unnecessary I/O.
-        try:
-            self._span_aggregator.writer.stop()
-        except ServiceStatusError:
-            # Some writers (ex: AgentWriter), start when the first trace chunk is encoded. Stopping
-            # the writer before that point will raise a ServiceStatusError.
-            pass
-        # Re-create the background writer thread
-        rules = self._span_aggregator.sampling_processor.sampler._by_service_samplers
-        self._span_aggregator.writer = self._span_aggregator.writer.recreate()
         self.enabled = config._tracing_enabled
-        self._span_processors, self._appsec_processor, self._span_aggregator = _default_span_processors_factory(
-            self._user_trace_processors,
-            self._span_aggregator.writer,
-            self._span_aggregator.partial_flush_enabled,
-            self._span_aggregator.partial_flush_min_spans,
+        self._span_aggregator.reset(
+            user_processors=trace_processors,
+            compute_stats=compute_stats_enabled,
+            apm_opt_out=apm_opt_out,
+            appsec_enabled=appsec_enabled,
+            reset_buffer=reset_buffer,
+        )
+        self._span_processors, self._appsec_processor = _default_span_processors_factory(
             self._endpoint_call_counter_span_processor,
         )
-        self._span_aggregator.sampling_processor.sampler._by_service_samplers = rules.copy()

     def _start_span_after_shutdown(
         self,
```
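Note the asymmetry in the rewritten `_recreate` call sites: `tracer.configure(...)` passes `reset_buffer=False` (in-flight traces should survive reconfiguration), while `_child_after_fork` passes `reset_buffer=True` (the child must drop traces inherited from the parent so they are not flushed twice). A minimal illustrative sketch of the fork side — `MiniTracer` is a stand-in, and the hook is invoked directly here rather than via the fork-registration machinery the real library uses:

```python
import os
from collections import defaultdict


class MiniTracer:
    def __init__(self):
        self._pid = os.getpid()
        self._buffer = defaultdict(list)  # open traces awaiting flush

    def _child_after_fork(self):
        self._pid = os.getpid()
        # Drop inherited, partially-built traces instead of re-sending them
        # from both the parent and the child.
        self._buffer = defaultdict(list)


tracer = MiniTracer()
tracer._buffer["trace-1"].append("span-a")  # buffered in the parent

# The real library registers this hook for fork; we call it directly
# to show the effect on the buffer.
tracer._child_after_fork()
print(dict(tracer._buffer))  # {} — the child starts with an empty buffer
```

Combined with `SpanAggregator.reset()` carrying the agent sampling rates forward, the child keeps the Agent's rates while discarding the parent's buffered spans.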

ddtrace/internal/ci_visibility/recorder.py

Lines changed: 1 addition & 1 deletion
```diff
@@ -630,7 +630,7 @@ def disable(cls) -> None:
         log.debug("%s disabled", cls.__name__)

     def _start_service(self) -> None:
-        tracer_filters = self.tracer._user_trace_processors
+        tracer_filters = self.tracer._span_aggregator.user_processors
         if not any(isinstance(tracer_filter, TraceCiVisibilityFilter) for tracer_filter in tracer_filters):
             tracer_filters += [TraceCiVisibilityFilter(self._tags, self._service)]  # type: ignore[arg-type]
             self.tracer.configure(trace_processors=tracer_filters)
```

ddtrace/internal/writer/writer.py

Lines changed: 1 addition & 0 deletions
```diff
@@ -553,6 +553,7 @@ def recreate(self) -> HTTPWriter:
             api_version=self._api_version,
             headers=self._headers,
             report_metrics=self._report_metrics,
+            response_callback=self._response_cb,
         )
         return new_instance
```
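This one-line fix illustrates a general hazard: a `recreate()` that forwards only some constructor arguments silently loses state on every reset. Here the dropped argument was the response callback that feeds agent sample rates back into the sampler. An illustrative sketch (`MiniWriter` is a stand-in, not the real ddtrace `HTTPWriter`):

```python
class MiniWriter:
    def __init__(self, url, response_callback=None):
        self.url = url
        self._response_cb = response_callback

    def recreate_buggy(self):
        # Forgets to forward the callback: the new writer stops
        # delivering agent responses to whoever registered it.
        return MiniWriter(self.url)

    def recreate_fixed(self):
        # Forwards every constructor argument, as the fix does.
        return MiniWriter(self.url, response_callback=self._response_cb)


def on_response(payload):
    # e.g. push rate_by_service into the sampler
    pass


w = MiniWriter("http://localhost:8126", response_callback=on_response)
print(w.recreate_buggy()._response_cb)               # None — rates stop flowing
print(w.recreate_fixed()._response_cb is on_response)  # True
```

Since `reset()` now calls `writer.recreate()` on every fork and reconfigure, this omission would have re-broken agent-based sampling each time.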

ddtrace/opentracer/tracer.py

Lines changed: 1 addition & 1 deletion
```diff
@@ -104,7 +104,7 @@ def __init__(
         trace_processors = None
         if isinstance(self._config.get(keys.SETTINGS), dict) and self._config[keys.SETTINGS].get("FILTERS"):  # type: ignore[union-attr]
             trace_processors = self._config[keys.SETTINGS]["FILTERS"]  # type: ignore[index]
-            self._dd_tracer._user_trace_processors = trace_processors
+            self._dd_tracer._span_aggregator.user_processors = trace_processors

         if self._config[keys.ENABLED]:
             self._dd_tracer.enabled = self._config[keys.ENABLED]
```
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
---
2+
fixes:
3+
- |
4+
tracing: Resolves a sampling issue where agent-based sampling rates were not correctly applied after a process forked or the tracer was reconfigured.
