Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datadog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# datadog
from datadog import api
from datadog.dogstatsd import DogStatsd, statsd # noqa
from datadog.threadstats import ThreadStats # noqa
from datadog.threadstats import ThreadStats, datadog_lambda_wrapper, lambda_metric # noqa
from datadog.util.compat import iteritems, NullHandler
from datadog.util.config import get_version
from datadog.util.hostname import get_hostname
Expand Down
1 change: 1 addition & 0 deletions datadog/threadstats/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
from datadog.threadstats.base import ThreadStats # noqa
from datadog.threadstats.aws_lambda import lambda_metric, datadog_lambda_wrapper # noqa
72 changes: 72 additions & 0 deletions datadog/threadstats/aws_lambda.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
from datadog.threadstats import ThreadStats
from threading import Lock
from datadog import api
import os


"""
Usage:

from datadog import datadog_lambda_wrapper, lambda_metric

@datadog_lambda_wrapper
def my_lambda_handle(event, context):
lambda_metric("some_metric", 10)
"""


class _LambdaDecorator(object):
""" Decorator to automatically init & flush metrics, created for Lambda functions"""

# Number of opened wrappers, flush when 0
_counter = 0
_counter_lock = Lock()
_flush_lock = Lock()
_was_initialized = False

def __init__(self, func):
self.func = func

@classmethod
def _enter(cls):
with cls._counter_lock:
if not cls._was_initialized:
cls._was_initialized = True
api._api_key = os.environ.get('DATADOG_API_KEY')
api._api_host = os.environ.get('DATADOG_HOST', 'https://api.datadoghq.com')
cls._counter = cls._counter + 1

@classmethod
def _close(cls):
should_flush = False
with cls._counter_lock:
cls._counter = cls._counter - 1

# Flush only when all wrappers are closed
if cls._counter <= 0:
should_flush = True

if should_flush:
with cls._flush_lock:
# Don't flush if other wrappers were opened while _flush_lock was locked
with cls._counter_lock:
if cls._counter > 0:
should_flush = False
if should_flush:
_lambda_stats.flush(float("inf"))

def __call__(self, *args, **kw):
_LambdaDecorator._enter()
result = self.func(*args, **kw)
_LambdaDecorator._close()
return result


_lambda_stats = ThreadStats()
_lambda_stats.start(flush_in_greenlet=False, flush_in_thread=False)
datadog_lambda_wrapper = _LambdaDecorator


def lambda_metric(*args, **kw):
""" Alias to expose only distributions for lambda functions"""
_lambda_stats.distribution(*args, **kw)
47 changes: 47 additions & 0 deletions tests/performance/test_lambda_wrapper_thread_safety.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import time
# import unittest
import threading

from datadog import lambda_metric, datadog_lambda_wrapper
from datadog.threadstats.aws_lambda import _lambda_stats


TOTAL_NUMBER_OF_THREADS = 1000


class MemoryReporter(object):
""" A reporting class that reports to memory for testing. """

def __init__(self):
self.distributions = []
self.dist_flush_counter = 0

def flush_distributions(self, dists):
self.distributions += dists
self.dist_flush_counter = self.dist_flush_counter + 1


@datadog_lambda_wrapper
def wrapped_function(id):
lambda_metric("dist_" + str(id), 42)
# sleep makes the os continue another thread
time.sleep(0.001)

lambda_metric("common_dist", 42)


class TestWrapperThreadSafety(object):

def test_wrapper_thread_safety(self):
_lambda_stats.reporter = MemoryReporter()

for i in range(TOTAL_NUMBER_OF_THREADS):
threading.Thread(target=wrapped_function, args=[i]).start()
# Wait all threads to finish
time.sleep(10)

# Check that at least one flush happened
self.assertGreater(_lambda_stats.reporter.dist_flush_counter, 0)

dists = _lambda_stats.reporter.distributions
self.assertEqual(len(dists), TOTAL_NUMBER_OF_THREADS + 1)
43 changes: 41 additions & 2 deletions tests/unit/threadstats/test_threadstats.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@
import nose.tools as nt

# datadog
from datadog import ThreadStats
from datadog import ThreadStats, lambda_metric, datadog_lambda_wrapper
from datadog.threadstats.aws_lambda import _lambda_stats
from tests.util.contextmanagers import preserve_environment_variable


# Silence the logger.
logger = logging.getLogger('dd.datadogpy')
logger.setLevel(logging.ERROR)
Expand All @@ -32,9 +32,11 @@ def __init__(self):
self.distributions = []
self.metrics = []
self.events = []
self.dist_flush_counter = 0

def flush_distributions(self, distributions):
self.distributions += distributions
self.dist_flush_counter = self.dist_flush_counter + 1

def flush_metrics(self, metrics):
self.metrics += metrics
Expand Down Expand Up @@ -740,3 +742,40 @@ def test_metric_type(self):
nt.assert_equal(cnt['type'], 'rate')
nt.assert_equal(max_['type'], 'gauge')
nt.assert_equal(min_['type'], 'gauge')


# Test lambda_wrapper (uses ThreadStats under the hood)

def test_basic_lambda_decorator(self):

@datadog_lambda_wrapper
def basic_wrapped_function():
lambda_metric("lambda.somemetric", 100)

_lambda_stats.reporter = self.reporter
basic_wrapped_function()

nt.assert_equal(_lambda_stats.reporter.dist_flush_counter, 1)
dists = self.sort_metrics(_lambda_stats.reporter.distributions)
nt.assert_equal(len(dists), 1)

def test_embedded_lambda_decorator(self):
"""
Test that the lambda decorator flushes metrics correctly and only once
"""

@datadog_lambda_wrapper
def wrapped_function_1():
lambda_metric("lambda.dist.1", 10)

@datadog_lambda_wrapper
def wrapped_function_2():
wrapped_function_1()
lambda_metric("lambda.dist.2", 30)

_lambda_stats.reporter = self.reporter
wrapped_function_2()
nt.assert_equal(_lambda_stats.reporter.dist_flush_counter, 1)

dists = self.sort_metrics(_lambda_stats.reporter.distributions)
nt.assert_equal(len(dists), 2)