diff --git a/datadog/__init__.py b/datadog/__init__.py index 110ddfa5b..5a061a89c 100644 --- a/datadog/__init__.py +++ b/datadog/__init__.py @@ -15,7 +15,7 @@ # datadog from datadog import api from datadog.dogstatsd import DogStatsd, statsd # noqa -from datadog.threadstats import ThreadStats # noqa +from datadog.threadstats import ThreadStats, datadog_lambda_wrapper, lambda_metric # noqa from datadog.util.compat import iteritems, NullHandler from datadog.util.config import get_version from datadog.util.hostname import get_hostname diff --git a/datadog/threadstats/__init__.py b/datadog/threadstats/__init__.py index eec391a83..6b6d3e4a1 100644 --- a/datadog/threadstats/__init__.py +++ b/datadog/threadstats/__init__.py @@ -1 +1,2 @@ from datadog.threadstats.base import ThreadStats # noqa +from datadog.threadstats.aws_lambda import lambda_metric, datadog_lambda_wrapper # noqa diff --git a/datadog/threadstats/aws_lambda.py b/datadog/threadstats/aws_lambda.py new file mode 100644 index 000000000..6e8c3e057 --- /dev/null +++ b/datadog/threadstats/aws_lambda.py @@ -0,0 +1,72 @@ +from datadog.threadstats import ThreadStats +from threading import Lock +from datadog import api +import os + + +""" +Usage: + +from datadog import datadog_lambda_wrapper, lambda_metric + +@datadog_lambda_wrapper +def my_lambda_handle(event, context): + lambda_metric("some_metric", 10) +""" + + +class _LambdaDecorator(object): + """ Decorator to automatically init & flush metrics, created for Lambda functions""" + + # Number of opened wrappers, flush when 0 + _counter = 0 + _counter_lock = Lock() + _flush_lock = Lock() + _was_initialized = False + + def __init__(self, func): + self.func = func + + @classmethod + def _enter(cls): + with cls._counter_lock: + if not cls._was_initialized: + cls._was_initialized = True + api._api_key = os.environ.get('DATADOG_API_KEY') + api._api_host = os.environ.get('DATADOG_HOST', 'https://api.datadoghq.com') + cls._counter = cls._counter + 1 + + @classmethod + def _close(cls): + should_flush = False + with cls._counter_lock: + cls._counter = cls._counter - 1 + + # Flush only when all wrappers are closed + if cls._counter <= 0: + should_flush = True + + if should_flush: + with cls._flush_lock: + # Don't flush if other wrappers were opened while _flush_lock was locked + with cls._counter_lock: + if cls._counter > 0: + should_flush = False + if should_flush: + _lambda_stats.flush(float("inf")) + + def __call__(self, *args, **kw): + _LambdaDecorator._enter() + result = self.func(*args, **kw) + _LambdaDecorator._close() + return result + + +_lambda_stats = ThreadStats() +_lambda_stats.start(flush_in_greenlet=False, flush_in_thread=False) +datadog_lambda_wrapper = _LambdaDecorator + + +def lambda_metric(*args, **kw): + """ Alias to expose only distributions for lambda functions""" + _lambda_stats.distribution(*args, **kw) diff --git a/tests/performance/test_lambda_wrapper_thread_safety.py b/tests/performance/test_lambda_wrapper_thread_safety.py new file mode 100644 index 000000000..bb73f754e --- /dev/null +++ b/tests/performance/test_lambda_wrapper_thread_safety.py @@ -0,0 +1,47 @@ +import time +# import unittest +import threading + +from datadog import lambda_metric, datadog_lambda_wrapper +from datadog.threadstats.aws_lambda import _lambda_stats + + +TOTAL_NUMBER_OF_THREADS = 1000 + + +class MemoryReporter(object): + """ A reporting class that reports to memory for testing. """ + + def __init__(self): + self.distributions = [] + self.dist_flush_counter = 0 + + def flush_distributions(self, dists): + self.distributions += dists + self.dist_flush_counter = self.dist_flush_counter + 1 + + +@datadog_lambda_wrapper +def wrapped_function(id): + lambda_metric("dist_" + str(id), 42) + # sleep makes the os continue another thread + time.sleep(0.001) + + lambda_metric("common_dist", 42) + + +class TestWrapperThreadSafety(object): + + def test_wrapper_thread_safety(self): + _lambda_stats.reporter = MemoryReporter() + + for i in range(TOTAL_NUMBER_OF_THREADS): + threading.Thread(target=wrapped_function, args=[i]).start() + # Wait all threads to finish + time.sleep(10) + + # Check that at least one flush happened + self.assertGreater(_lambda_stats.reporter.dist_flush_counter, 0) + + dists = _lambda_stats.reporter.distributions + self.assertEqual(len(dists), TOTAL_NUMBER_OF_THREADS + 1) diff --git a/tests/unit/threadstats/test_threadstats.py b/tests/unit/threadstats/test_threadstats.py index 5e14d99fc..e4c8e6059 100644 --- a/tests/unit/threadstats/test_threadstats.py +++ b/tests/unit/threadstats/test_threadstats.py @@ -14,10 +14,10 @@ import nose.tools as nt # datadog -from datadog import ThreadStats +from datadog import ThreadStats, lambda_metric, datadog_lambda_wrapper +from datadog.threadstats.aws_lambda import _lambda_stats from tests.util.contextmanagers import preserve_environment_variable - # Silence the logger. logger = logging.getLogger('dd.datadogpy') logger.setLevel(logging.ERROR) @@ -32,9 +32,11 @@ def __init__(self): self.distributions = [] self.metrics = [] self.events = [] + self.dist_flush_counter = 0 def flush_distributions(self, distributions): self.distributions += distributions + self.dist_flush_counter = self.dist_flush_counter + 1 def flush_metrics(self, metrics): self.metrics += metrics @@ -740,3 +742,40 @@ def test_metric_type(self): nt.assert_equal(cnt['type'], 'rate') nt.assert_equal(max_['type'], 'gauge') nt.assert_equal(min_['type'], 'gauge') + + + # Test lambda_wrapper (uses ThreadStats under the hood) + + def test_basic_lambda_decorator(self): + + @datadog_lambda_wrapper + def basic_wrapped_function(): + lambda_metric("lambda.somemetric", 100) + + _lambda_stats.reporter = self.reporter + basic_wrapped_function() + + nt.assert_equal(_lambda_stats.reporter.dist_flush_counter, 1) + dists = self.sort_metrics(_lambda_stats.reporter.distributions) + nt.assert_equal(len(dists), 1) + + def test_embedded_lambda_decorator(self): + """ + Test that the lambda decorator flushes metrics correctly and only once + """ + + @datadog_lambda_wrapper + def wrapped_function_1(): + lambda_metric("lambda.dist.1", 10) + + @datadog_lambda_wrapper + def wrapped_function_2(): + wrapped_function_1() + lambda_metric("lambda.dist.2", 30) + + _lambda_stats.reporter = self.reporter + wrapped_function_2() + nt.assert_equal(_lambda_stats.reporter.dist_flush_counter, 1) + + dists = self.sort_metrics(_lambda_stats.reporter.distributions) + nt.assert_equal(len(dists), 2)