Skip to content

Commit 7101cbd

Browse files
committed
move span aggregator implementation details to reset method
1 parent 4d311a8 commit 7101cbd

File tree

3 files changed

+84
-20
lines changed

3 files changed

+84
-20
lines changed

ddtrace/_trace/processor/__init__.py

Lines changed: 33 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -493,23 +493,48 @@ def _queue_span_count_metrics(self, metric_name: str, tag_name: str, min_count:
493493
)
494494
self._span_metrics[metric_name] = defaultdict(int)
495495

496-
def _reset(self):
496+
def _reset(
497+
self,
498+
user_processors: Optional[List[TraceProcessor]] = None,
499+
compute_stats: Optional[bool] = None,
500+
apm_opt_out: Optional[bool] = None,
501+
appsec_enabled: Optional[bool] = None,
502+
reset_buffer: bool = True,
503+
) -> None:
497504
try:
498505
self.writer.stop()
499506
except ServiceStatusError:
500507
# Some writers (ex: AgentWriter), start when the first trace chunk is encoded. Stopping
501508
# the writer before that point will raise a ServiceStatusError.
502509
pass
510+
511+
if isinstance(self.writer, AgentWriter) and appsec_enabled:
512+
# If appsec is enabled, we need to reset the API version to v0.4
513+
# to ensure that the appsec meta_struct data is encoded.
514+
self.writer._api_version = "v0.4"
503515
# Re-create the background writer thread
504516
self.writer = self.writer.recreate()
517+
# Re-create the sampling processor with the compute_stats and apm_opt_out values
518+
# Avoids overriding the agent sampling rules
519+
if compute_stats is None:
520+
compute_stats = self.sampling_processor._compute_stats_enabled
521+
if apm_opt_out is None:
522+
apm_opt_out = self.sampling_processor.apm_opt_out
505523
self.sampling_processor = TraceSamplingProcessor(
506-
config._trace_compute_stats,
524+
compute_stats,
507525
get_span_sampling_rules(),
508-
asm_config._apm_opt_out,
526+
apm_opt_out,
509527
self.sampling_processor.sampler._agent_based_samplers,
510528
)
511-
self._traces = defaultdict(lambda: _Trace())
512-
self._span_metrics = {
513-
"spans_created": defaultdict(int),
514-
"spans_finished": defaultdict(int),
515-
}
529+
# Override previously set user processors
530+
if user_processors is not None:
531+
self.user_processors = user_processors
532+
# Reset the trace buffer and span metrics. This is useful when a process forks and we would like to
533+
# avoid sending traces from both the parent and child processes.
534+
if reset_buffer:
535+
self._traces = defaultdict(lambda: _Trace())
536+
self._span_metrics = {
537+
"spans_created": defaultdict(int),
538+
"spans_finished": defaultdict(int),
539+
}
540+
self._lock = RLock()

ddtrace/_trace/tracer.py

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -368,13 +368,6 @@ def configure(
368368
if compute_stats_enabled is not None:
369369
config._trace_compute_stats = compute_stats_enabled
370370

371-
if isinstance(self._span_aggregator.writer, AgentWriter):
372-
if appsec_enabled:
373-
self._span_aggregator.writer._api_version = "v0.4"
374-
375-
if trace_processors:
376-
self._span_aggregator.user_processors = trace_processors
377-
378371
if any(
379372
x is not None
380373
for x in [
@@ -384,7 +377,7 @@ def configure(
384377
iast_enabled,
385378
]
386379
):
387-
self._recreate()
380+
self._recreate(trace_processors, compute_stats_enabled, appsec_enabled, reset_buffer=False)
388381

389382
if context_provider is not None:
390383
self.context_provider = context_provider
@@ -412,15 +405,26 @@ def _generate_diagnostic_logs(self):
412405

413406
def _child_after_fork(self):
414407
self._pid = getpid()
415-
self._recreate()
408+
self._recreate(reset_buffer=True)
416409
self._new_process = True
417410

418-
def _recreate(self):
411+
def _recreate(
412+
self,
413+
trace_processors: Optional[List[TraceProcessor]] = None,
414+
compute_stats_enabled: Optional[bool] = None,
415+
appsec_enabled: Optional[bool] = None,
416+
reset_buffer: bool = True,
417+
) -> None:
419418
"""Re-initialize the tracer's processors and trace writer"""
420419
# Stop the writer.
421420
# This will stop the periodic thread in HTTPWriters, preventing memory leaks and unnecessary I/O.
422421
self.enabled = config._tracing_enabled
423-
self._span_aggregator._reset()
422+
self._span_aggregator._reset(
423+
user_processors=trace_processors,
424+
compute_stats=compute_stats_enabled,
425+
appsec_enabled=appsec_enabled,
426+
reset_buffer=reset_buffer,
427+
)
424428
self._span_processors, self._appsec_processor = _default_span_processors_factory(
425429
self._endpoint_call_counter_span_processor,
426430
)

tests/tracer/test_processors.py

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
from ddtrace.internal.sampling import SamplingMechanism
2525
from ddtrace.internal.sampling import SpanSamplingRule
2626
from ddtrace.internal.telemetry.constants import TELEMETRY_NAMESPACE
27+
from ddtrace.internal.writer import AgentWriter
2728
from ddtrace.trace import Context
2829
from ddtrace.trace import Span
2930
from tests.utils import DummyTracer
@@ -116,7 +117,9 @@ def process_trace(self, trace):
116117

117118

118119
def test_aggregator_reset():
119-
"""Test that the aggregator can reset trace buffers, sampling processor and trace writer"""
120+
"""Test that on reset, the aggregator clears user processors,
121+
recreates the sampling processor, trace writer, and trace buffer
122+
"""
120123

121124
class DDProc(TraceProcessor):
122125
def process_trace(self, trace):
@@ -155,6 +158,38 @@ def process_trace(self, trace):
155158
assert len(aggr._span_metrics["spans_created"]) == 0
156159

157160

161+
def test_aggregator_reset_with_args():
162+
"""Test that the span aggregator can reset trace buffers, sampling processor and trace api version"""
163+
164+
class DDProc(TraceProcessor):
165+
def process_trace(self, trace):
166+
return trace
167+
168+
class UserProc(TraceProcessor):
169+
def process_trace(self, trace):
170+
return trace
171+
172+
dd_proc = DDProc()
173+
user_proc = UserProc()
174+
aggr = SpanAggregator(
175+
partial_flush_enabled=False,
176+
partial_flush_min_spans=1,
177+
dd_processors=[dd_proc],
178+
user_processors=[user_proc],
179+
)
180+
181+
aggr.writer = AgentWriter("", api_version="v0.5")
182+
span = Span("span", on_finish=[aggr.on_span_finish])
183+
aggr.on_span_start(span)
184+
aggr._reset(user_processors=[], apm_opt_out=False, compute_stats=False, appsec_enabled=True, reset_buffer=False)
185+
assert aggr.user_processors == []
186+
assert aggr.sampling_processor.apm_opt_out is False
187+
assert aggr.sampling_processor._compute_stats_enabled is False
188+
assert aggr.writer._api_version == "v0.4"
189+
assert span.trace_id in aggr._traces
190+
assert len(aggr._span_metrics["spans_created"]) == 1
191+
192+
158193
def test_aggregator_bad_processor():
159194
class Proc(TraceProcessor):
160195
def process_trace(self, trace):

0 commit comments

Comments
 (0)