feat: lambda support for DSM #622

Open
wants to merge 59 commits into
base: main

Conversation

michael-zhao459
Collaborator

@michael-zhao459 michael-zhao459 commented Jun 20, 2025

What does this PR do?

This PR adds lambda support for Data Streams Monitoring (DSM) and reworks the original implementation.

  1. DSM context is passed through the trace propagation headers, so the code is refactored to reuse the existing extraction logic (dsm.py is deleted; it was reinventing the wheel).

  2. If DSM is enabled, custom DSM logic is applied to the extracted context afterwards.
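
The two steps above might look roughly like the following sketch. It is not the code in this PR: `extract_context`, the `set_checkpoint` callback, the `data_streams_enabled` argument, and the value of `PATHWAY_KEY` are all stand-ins assumed for illustration (the real constant is imported from ddtrace as `PROPAGATION_KEY_BASE_64`).

```python
import json

# Assumed header name for the DSM pathway context; illustration only.
PATHWAY_KEY = "dd-pathway-ctx-base64"


def extract_context(message_attributes, data_streams_enabled, set_checkpoint):
    """Return the trace headers found in `_datadog`, then run DSM logic.

    `set_checkpoint` is a hypothetical callback standing in for the DSM
    checkpoint call.
    """
    raw = (message_attributes.get("_datadog") or {}).get("stringValue")
    try:
        dd_data = json.loads(raw) if raw else {}
    except ValueError:
        dd_data = {}
    if not isinstance(dd_data, dict):
        dd_data = {}

    # Step 1: trace context comes from the shared propagation headers.
    trace_headers = {k: v for k, v in dd_data.items() if k != PATHWAY_KEY}

    # Step 2: DSM logic is layered on afterwards, only when enabled.
    if data_streams_enabled:
        set_checkpoint(dd_data.get(PATHWAY_KEY))

    return trace_headers
```

With DSM disabled the function reduces to plain trace-context extraction, which is the point of reusing one extraction path.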

Motivation

Remove redundant code. DSM customers want Lambda support; currently, context is not propagated correctly through Lambda functions.

Testing Guidelines

Sources covered: SQS, SNS, SNS -> SQS (ARN from SQS is used), Kinesis

| Test case | Expected outcome |
| --- | --- |
| Datadog context propagated properly through stringValue | Data streams context propagated & data streams checkpoint set (DNE) |
| Datadog context propagated properly through binaryValue | Data streams context propagated & data streams checkpoint set |
| No _datadog message attribute | Checkpoint set, no context propagation |
| Empty datadog message attribute | Checkpoint set, no context propagation |
| No data streams context in _datadog message attribute | Checkpoint set, no context propagation |
| Invalid datadog message attribute | Checkpoint set, no context propagation, debug logger called |
| source_arn is not found | No checkpoint set |
| Data streams disabled | No checkpoint set |

The tests go through all of the SQS cases first, then all of the SNS cases, then all of the SNS -> SQS cases, then all of the Kinesis cases.
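
As a rough illustration of how the expected outcomes above can be asserted, here is a minimal sketch using `unittest.mock`. `process_record`, `run_case`, `PATHWAY_KEY`, and the ARN are hypothetical and exist only for this example; they are not the functions under test in this PR.

```python
import json
from unittest.mock import MagicMock

PATHWAY_KEY = "dd-pathway-ctx-base64"  # assumed key name, illustration only
ARN = "arn:aws:sqs:us-east-1:123456789012:example-queue"  # made-up ARN


def process_record(attrs, source_arn, dsm_enabled, checkpoint, logger):
    """Hypothetical handler mirroring the expected outcomes listed above."""
    if not dsm_enabled or not source_arn:
        return  # no checkpoint without DSM or a source ARN
    pathway_ctx = None
    raw = (attrs.get("_datadog") or {}).get("stringValue")
    if raw:
        try:
            pathway_ctx = json.loads(raw).get(PATHWAY_KEY)
        except (ValueError, AttributeError) as e:
            logger.debug("Unable to parse _datadog attribute: %s", e)
    # The checkpoint is set even when no context could be propagated.
    checkpoint(pathway_ctx, source_arn)


def run_case(attrs, source_arn=ARN, dsm_enabled=True):
    checkpoint, logger = MagicMock(), MagicMock()
    process_record(attrs, source_arn, dsm_enabled, checkpoint, logger)
    return checkpoint, logger


# Valid context: propagated and checkpoint set.
checkpoint, _ = run_case({"_datadog": {"stringValue": json.dumps({PATHWAY_KEY: "ctx"})}})
checkpoint.assert_called_once_with("ctx", ARN)

# Invalid attribute: checkpoint set without context, debug logger called.
checkpoint, logger = run_case({"_datadog": {"stringValue": "not json"}})
checkpoint.assert_called_once_with(None, ARN)
logger.debug.assert_called_once()

# Missing source ARN: no checkpoint.
checkpoint, _ = run_case({}, source_arn="")
checkpoint.assert_not_called()
```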

Additional Notes

Types of Changes

  • Bug fix
  • New feature
  • Breaking change
  • Misc (docs, refactoring, dependency upgrade, etc.)

Check all that apply

  • This PR's description is comprehensive
  • This PR contains breaking changes that are documented in the description
  • This PR introduces new APIs or parameters that are documented and unlikely to change in the foreseeable future
  • This PR impacts documentation, and it has been updated (or a ticket has been logged)
  • This PR's changes are covered by the automated tests
  • This PR collects user input/sensitive content into Datadog
  • This PR passes the integration tests (ask a Datadog member to run the tests)

if config.data_streams_enabled:
    from ddtrace.data_streams import PROPAGATION_KEY_BASE_64

    data_streams_ctx = {
Collaborator Author

I know the else is redundant, but Datadog gets mad if I just do the if: too many indents.

 except Exception as e:
     logger.debug("The trace extractor returned with error %s", e)
-    return extract_context_from_lambda_context(lambda_context)
+    return extract_context_from_lambda_context(lambda_context), None

we should not return None here

@michael-zhao459 michael-zhao459 marked this pull request as ready for review June 25, 2025 13:25
@michael-zhao459 michael-zhao459 requested review from a team as code owners June 25, 2025 13:25
@@ -265,15 +265,27 @@ def extract_context_from_sqs_or_sns_event_or_context(event, lambda_context):
if dd_json_data:
    dd_data = json.loads(dd_json_data)

data_streams_ctx = {}
if config.data_streams_enabled:
    from ddtrace.data_streams import PROPAGATION_KEY_BASE_64
Contributor

@joeyzhao2018 joeyzhao2018 Jun 26, 2025

My main concerns are

  1. Creating dictionary objects and bound methods for every invocation is inefficient.
  2. It is very hard to follow the logic and hard to maintain and may introduce unexpected behaviors that are hard to debug in the future.

May I suggest the following alternative implementation? Let me know what you think.

def _create_dsm_carrier_func(dd_data):
    """Create a carrier function for DSM context extraction."""
    def carrier_get(key):
        return dd_data.get(key) if dd_data else None
    return carrier_get

# Then, in the extraction functions:
if config.data_streams_enabled:
    dsm_carrier = _create_dsm_carrier_func(dd_data)  # Pass the original dd_data
else:
    dsm_carrier = None

Collaborator Author

I agree with the justifications you made for this change; I will change the code now!

@joeyzhao2018 joeyzhao2018 self-requested a review June 26, 2025 14:02
Contributor

@joeyzhao2018 joeyzhao2018 left a comment

LGTM

Contributor

@joeyzhao2018 joeyzhao2018 left a comment

LGTM

@michael-zhao459 michael-zhao459 force-pushed the michael.zhao/dsm-lambda branch from 66fdbb3 to c4fa49b Compare July 3, 2025 19:43
pyproject.toml Outdated
@@ -28,7 +28,7 @@ classifiers = [
 python = ">=3.8.0,<4"
 datadog = ">=0.51.0,<1.0.0"
 wrapt = "^1.11.2"
-ddtrace = ">=2.20.0,<4"
+ddtrace = ">=3.10.0"

This changes the major version. Is that what we want to do? Also, should we keep <4?

Collaborator Author

My mistake on the <4. The code will break without ddtrace version 3.10.0.
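
For reference, the difference between the two constraints discussed here can be sketched with a deliberately simplified checker. `parse` and `satisfies` are hypothetical helpers that handle only `>=` and `<` clauses on plain numeric versions; they are not how Poetry or pip resolve versions, and they ignore pre-release and other PEP 440 details.

```python
def parse(version):
    """Turn '3.10.0' into (3, 10, 0); numeric release segments only."""
    return tuple(int(part) for part in version.split("."))


def satisfies(version, constraint):
    """Check a version against comma-separated '>=' and '<' clauses."""
    v = parse(version)
    for clause in constraint.split(","):
        clause = clause.strip()
        if clause.startswith(">="):
            if v < parse(clause[2:]):
                return False
        elif clause.startswith("<"):
            if v >= parse(clause[1:]):
                return False
        else:
            raise ValueError(f"unsupported clause: {clause}")
    return True


# Without the upper bound, a future 4.x release is admitted.
print(satisfies("4.0.0", ">=3.10.0"))     # True
print(satisfies("4.0.0", ">=3.10.0,<4"))  # False
print(satisfies("3.10.0", ">=3.10.0,<4")) # True
```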

)

@patch("datadog_lambda.tracing._dsm_set_checkpoint")
def test_sqs_incorrect_datadog_message_attribute(self, mock_dsm_set_checkpoint):

nit: incorrect -> invalid


@patch("datadog_lambda.tracing._dsm_set_checkpoint")
@patch("datadog_lambda.tracing.logger")
def test_sqs_invalid_datadog_message_attribute_raises_exception(

nit: remove raises_exception from the test name. We care about what the test case tests, not the logic under the hood. Maybe in the future, the code won't raise an exception, and it's still OK because the function accepts invalid datadog message attributes.

event, self.lambda_context, parse_event_source(event)
)

mock_dsm_set_checkpoint.assert_called_once_with(None, "sqs", "")

I don't understand: are we testing mock_dsm_set_checkpoint or self.mock_checkpoint? We should test only one of them, and it should be consistent across all tests. If possible, it's better to test the lower-level one (self.mock_checkpoint).
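
One way to keep assertions at the lower level consistently is sketched below. Everything in it is hypothetical: `processor` stands in for whatever object owns the lower-level checkpoint call, `dsm_set_checkpoint` stands in for the wrapper, and the tag format and ARN are invented for the example.

```python
from types import SimpleNamespace
from unittest.mock import patch

# Hypothetical lower-level object; the real one lives in the tracing library.
processor = SimpleNamespace(set_checkpoint=lambda tags, ctx: None)

ARN = "arn:aws:sqs:us-east-1:123456789012:example-queue"  # made-up ARN


def dsm_set_checkpoint(ctx, event_type, arn):
    """Hypothetical wrapper: skips the checkpoint when no ARN is available."""
    if not arn:
        return
    tags = ["direction:in", f"topic:{arn}", f"type:{event_type}"]
    processor.set_checkpoint(tags, ctx)


# Patch only the lower-level call and assert on it in every test.
with patch.object(processor, "set_checkpoint") as mock_checkpoint:
    dsm_set_checkpoint(None, "sqs", ARN)
    dsm_set_checkpoint(None, "sqs", "")  # empty ARN: lower-level call skipped

mock_checkpoint.assert_called_once_with(
    ["direction:in", f"topic:{ARN}", "type:sqs"], None
)
```

Asserting on the lower-level mock also exercises the wrapper's own branching, which a mock of the wrapper itself would hide.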
