Skip to content

Commit d2a195e

Browse files
feat: lambda support for DSM (#622)
* removed current dsm implementation * new dsm lambda implementation * check env variable in proper way * add tests * add more detailed comment to function * fixed lint for tests * remove not needed tests * fix * error handling * fix * some fixes * renamed to extract_context_with_datastreams * changed to explicit check of dsm context * move dsm tests to test_tracing.py * add wanted tests * caught sns -> sqs bug * revert back to original tracing.py implementation * fix lint * revert spacing stuff to original * remove unneccessary checks, still set checkpoints even when dsm context fails to be propagated * remove not needed comment * fixes * lambda functions not allowed by lint * use lambda function, add checks before checkpoint * remove unneccesary import * move if statement with least work first * changed function name to original, arn exception handle w test, return None instead of {} * some fixes * remove comments that are not needed * fix * fix * extra check * remove unneccesary work associated with event_source * fix lint * add tests for empty arn logic * more descriptive name * fix lint * fix * moved arn check to checkpoint, remove comments, add variable dec * kinesis fix, tests fix * remove not needed test * formatting fix * made tests clearer and more comprehensive * fix * made comments more obvious on point of tests * update toml file * edit build make more sense * made sure tests match with table * fix * removed not needed comments * addc comment * add some more kinesis tests * add logger check in exception test cases * fix comment * fix * updated tests to use inner checkpoint check, keep build at <4 * fix * change to 3.10.2
1 parent ae7df53 commit d2a195e

File tree

7 files changed

+976
-221
lines changed

7 files changed

+976
-221
lines changed

datadog_lambda/dsm.py

Lines changed: 0 additions & 38 deletions
This file was deleted.

datadog_lambda/tracing.py

Lines changed: 44 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,24 @@
6767
LOWER_64_BITS = "LOWER_64_BITS"
6868

6969

70+
def _dsm_set_checkpoint(context_json, event_type, arn):
71+
if not config.data_streams_enabled:
72+
return
73+
74+
if not arn:
75+
return
76+
77+
try:
78+
from ddtrace.data_streams import set_consume_checkpoint
79+
80+
carrier_get = lambda k: context_json and context_json.get(k) # noqa: E731
81+
set_consume_checkpoint(event_type, arn, carrier_get, manual_checkpoint=False)
82+
except Exception as e:
83+
logger.debug(
84+
f"DSM:Failed to set consume checkpoint for {event_type} {arn}: {e}"
85+
)
86+
87+
7088
def _convert_xray_trace_id(xray_trace_id):
7189
"""
7290
Convert X-Ray trace id (hex)'s last 63 bits to a Datadog trace id (int).
@@ -202,7 +220,9 @@ def create_sns_event(message):
202220
}
203221

204222

205-
def extract_context_from_sqs_or_sns_event_or_context(event, lambda_context):
223+
def extract_context_from_sqs_or_sns_event_or_context(
224+
event, lambda_context, event_source
225+
):
206226
"""
207227
Extract Datadog trace context from an SQS event.
208228
@@ -214,7 +234,10 @@ def extract_context_from_sqs_or_sns_event_or_context(event, lambda_context):
214234
Lambda Context.
215235
216236
Falls back to lambda context if no trace data is found in the SQS message attributes.
237+
Set a DSM checkpoint if DSM is enabled and the method for context propagation is supported.
217238
"""
239+
source_arn = ""
240+
event_type = "sqs" if event_source.equals(EventTypes.SQS) else "sns"
218241

219242
# EventBridge => SQS
220243
try:
@@ -226,6 +249,7 @@ def extract_context_from_sqs_or_sns_event_or_context(event, lambda_context):
226249

227250
try:
228251
first_record = event.get("Records")[0]
252+
source_arn = first_record.get("eventSourceARN", "")
229253

230254
# logic to deal with SNS => SQS event
231255
if "body" in first_record:
@@ -241,6 +265,9 @@ def extract_context_from_sqs_or_sns_event_or_context(event, lambda_context):
241265
msg_attributes = first_record.get("messageAttributes")
242266
if msg_attributes is None:
243267
sns_record = first_record.get("Sns") or {}
268+
# SNS->SQS event would extract SNS arn without this check
269+
if event_source.equals(EventTypes.SNS):
270+
source_arn = sns_record.get("TopicArn", "")
244271
msg_attributes = sns_record.get("MessageAttributes") or {}
245272
dd_payload = msg_attributes.get("_datadog")
246273
if dd_payload:
@@ -272,8 +299,9 @@ def extract_context_from_sqs_or_sns_event_or_context(event, lambda_context):
272299
logger.debug(
273300
"Failed to extract Step Functions context from SQS/SNS event."
274301
)
275-
276-
return propagator.extract(dd_data)
302+
context = propagator.extract(dd_data)
303+
_dsm_set_checkpoint(dd_data, event_type, source_arn)
304+
return context
277305
else:
278306
# Handle case where trace context is injected into attributes.AWSTraceHeader
279307
# example: Root=1-654321ab-000000001234567890abcdef;Parent=0123456789abcdef;Sampled=1
@@ -296,9 +324,13 @@ def extract_context_from_sqs_or_sns_event_or_context(event, lambda_context):
296324
span_id=int(x_ray_context["parent_id"], 16),
297325
sampling_priority=float(x_ray_context["sampled"]),
298326
)
327+
# Still want to set a DSM checkpoint even if DSM context not propagated
328+
_dsm_set_checkpoint(None, event_type, source_arn)
299329
return extract_context_from_lambda_context(lambda_context)
300330
except Exception as e:
301331
logger.debug("The trace extractor returned with error %s", e)
332+
# Still want to set a DSM checkpoint even if DSM context not propagated
333+
_dsm_set_checkpoint(None, event_type, source_arn)
302334
return extract_context_from_lambda_context(lambda_context)
303335

304336

@@ -357,9 +389,12 @@ def extract_context_from_eventbridge_event(event, lambda_context):
357389
def extract_context_from_kinesis_event(event, lambda_context):
358390
"""
359391
Extract datadog trace context from a Kinesis Stream's base64 encoded data string
392+
Set a DSM checkpoint if DSM is enabled and the method for context propagation is supported.
360393
"""
394+
source_arn = ""
361395
try:
362396
record = get_first_record(event)
397+
source_arn = record.get("eventSourceARN", "")
363398
kinesis = record.get("kinesis")
364399
if not kinesis:
365400
return extract_context_from_lambda_context(lambda_context)
@@ -373,10 +408,13 @@ def extract_context_from_kinesis_event(event, lambda_context):
373408
data_obj = json.loads(data_str)
374409
dd_ctx = data_obj.get("_datadog")
375410
if dd_ctx:
376-
return propagator.extract(dd_ctx)
411+
context = propagator.extract(dd_ctx)
412+
_dsm_set_checkpoint(dd_ctx, "kinesis", source_arn)
413+
return context
377414
except Exception as e:
378415
logger.debug("The trace extractor returned with error %s", e)
379-
416+
# Still want to set a DSM checkpoint even if DSM context not propagated
417+
_dsm_set_checkpoint(None, "kinesis", source_arn)
380418
return extract_context_from_lambda_context(lambda_context)
381419

382420

@@ -594,7 +632,7 @@ def extract_dd_trace_context(
594632
)
595633
elif event_source.equals(EventTypes.SNS) or event_source.equals(EventTypes.SQS):
596634
context = extract_context_from_sqs_or_sns_event_or_context(
597-
event, lambda_context
635+
event, lambda_context, event_source
598636
)
599637
elif event_source.equals(EventTypes.EVENTBRIDGE):
600638
context = extract_context_from_eventbridge_event(event, lambda_context)

datadog_lambda/wrapper.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
from time import time_ns
1111

1212
from datadog_lambda.asm import asm_set_context, asm_start_response, asm_start_request
13-
from datadog_lambda.dsm import set_dsm_context
1413
from datadog_lambda.extension import should_use_extension, flush_extension
1514
from datadog_lambda.cold_start import (
1615
set_cold_start,
@@ -237,8 +236,6 @@ def _before(self, event, context):
237236
self.inferred_span = create_inferred_span(
238237
event, context, event_source, config.decode_authorizer_context
239238
)
240-
if config.data_streams_enabled:
241-
set_dsm_context(event, event_source)
242239

243240
if config.appsec_enabled:
244241
asm_set_context(event_source)

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ classifiers = [
2828
python = ">=3.8.0,<4"
2929
datadog = ">=0.51.0,<1.0.0"
3030
wrapt = "^1.11.2"
31-
ddtrace = ">=2.20.0,<4"
31+
ddtrace = ">=3.10.2,<4"
3232
ujson = ">=5.9.0"
3333
botocore = { version = "^1.34.0", optional = true }
3434
requests = { version ="^2.22.0", optional = true }

tests/test_dsm.py

Lines changed: 0 additions & 112 deletions
This file was deleted.

0 commit comments

Comments
 (0)