diff --git a/clients/python/pyproject.toml b/clients/python/pyproject.toml index da8304e5..9a72e839 100644 --- a/clients/python/pyproject.toml +++ b/clients/python/pyproject.toml @@ -32,12 +32,16 @@ dev = [ "mypy>=1.17.1", "time-machine>=2.16.0", "types-protobuf>=5.27.0.20240626,<6.0.0", + "datadog>=0.49", ] [project.optional-dependencies] examples = [ "click>=8.3", "setuptools>=80.0", ] +datadog = [ + "datadog>=0.49", +] [build-system] requires = ["uv_build>=0.8.2"] diff --git a/clients/python/src/taskbroker_client/metrics.py b/clients/python/src/taskbroker_client/metrics.py index 6324029a..2a76f99c 100644 --- a/clients/python/src/taskbroker_client/metrics.py +++ b/clients/python/src/taskbroker_client/metrics.py @@ -1,9 +1,15 @@ from __future__ import annotations +import atexit +import resource +import time from abc import abstractmethod from collections.abc import Mapping from contextlib import contextmanager -from typing import Generator, Protocol, runtime_checkable +from typing import TYPE_CHECKING, Generator, Protocol, runtime_checkable + +if TYPE_CHECKING: + from datadog.dogstatsd.base import DogStatsd Tags = Mapping[str, str | int | float] @@ -21,7 +27,7 @@ def gauge( value: float, instance: str | None = None, tags: Tags | None = None, - sample_rate: float = 1, + sample_rate: float | None = None, unit: str | None = None, stacklevel: int = 0, ) -> None: @@ -93,7 +99,7 @@ def gauge( value: float, instance: str | None = None, tags: Tags | None = None, - sample_rate: float = 1, + sample_rate: float | None = None, unit: str | None = None, stacklevel: int = 0, ) -> None: @@ -139,3 +145,162 @@ def track_memory_usage( of rss_usage between the context manager opening and closing. """ yield None + + +class DatadogMetrics(MetricsBackend): + """ + An opinionated metrics backend that emits to Datadog via DogStatsD. + + All metrics are tagged with ``application`` and ``processing_pool`` so that + dashboards and alerts can be built once and shared across every application + using taskbroker-client, without depending on a host-application metrics + prefix. + + When ``enable_prefixed_metrics`` is enabled each metric is emitted twice: once + prefix-free with ``application`` as a tag, and once with ``application`` + as a metric prefix (and not included in tags). This eases migrating existing + alerts and dashboards from the prefixed form to the prefix-free form. + + The ``datadog`` package is an optional dependency. Install it with + ``pip install taskbroker-client[datadog]``. + """ + + def __init__( + self, + application: str, + processing_pool: str | None = None, + statsd_host: str | None = None, + statsd_port: str | int | None = None, + sample_rate: float = 1.0, + enable_prefixed_metrics: bool = False, + client: DogStatsd | None = None, + ) -> None: + self.application = application + self.processing_pool = processing_pool or "unknown" + self.sample_rate = sample_rate + self.enable_prefixed_metrics = enable_prefixed_metrics + if client is None: + from datadog.dogstatsd.base import DogStatsd + + client = DogStatsd( + host=statsd_host or "localhost", + port=int(statsd_port) if statsd_port is not None else 8125, + disable_telemetry=True, + # Use a background thread to send metrics + disable_background_sender=False, + # Allow buffering and background delivery + disable_buffering=False, + ) + # Origin detection is enabled after 0.45 by default. + # Disable it since it silently fails. + # Ref: https://github.com/DataDog/datadogpy/issues/764 + client._container_id = None + + # Call wait_for_pending() before exiting to make sure all pending metrics are sent. + atexit.register(client.wait_for_pending) + + self.client = client + + def _build_tag_list(self, tags: Tags | None, *, with_application: bool) -> list[str]: + merged: dict[str, str | int | float] = {"processing_pool": self.processing_pool} + if with_application: + merged["application"] = self.application + if tags: + # Per-call tags win so call sites can override the structural defaults. + merged.update(tags) + return [f"{key}:{value}" for key, value in merged.items()] + + def _emit( + self, + method: str, + name: str, + value: float, + tags: Tags | None, + sample_rate: float | None, + ) -> None: + rate = self.sample_rate if sample_rate is None else sample_rate + emit = getattr(self.client, method) + + # Prefix-free form: application is carried as a tag. + emit( + name, + value, + tags=self._build_tag_list(tags, with_application=True), + sample_rate=rate, + ) + + # Prefixed form: application is in the metric name and removed from the tags. + if self.enable_prefixed_metrics: + emit( + f"{self.application}.{name}", + value, + tags=self._build_tag_list(tags, with_application=False), + sample_rate=rate, + ) + + def gauge( + self, + key: str, + value: float, + instance: str | None = None, + tags: Tags | None = None, + sample_rate: float | None = None, + unit: str | None = None, + stacklevel: int = 0, + ) -> None: + # instance, unit and stacklevel have no DogStatsD equivalent and are ignored. + self._emit("gauge", key, value, tags, sample_rate) + + def incr( + self, + name: str, + value: int | float = 1, + tags: Tags | None = None, + sample_rate: float | None = None, + ) -> None: + self._emit("increment", name, value, tags, sample_rate) + + def distribution( + self, + name: str, + value: int | float, + tags: Tags | None = None, + unit: str | None = None, + sample_rate: float | None = None, + ) -> None: + # unit has no DogStatsD equivalent and is ignored. + self._emit("distribution", name, value, tags, sample_rate) + + @contextmanager + def timer( + self, + key: str, + tags: Tags | None = None, + sample_rate: float | None = None, + stacklevel: int = 0, + ) -> Generator[None]: + start = time.monotonic() + try: + yield None + finally: + self._emit("timing", key, time.monotonic() - start, tags, sample_rate) + + @contextmanager + def track_memory_usage( + self, + key: str, + tags: Tags | None = None, + ) -> Generator[None]: + """ + Records a distribution metric that tracks the delta + of rss usage between the context manager opening and closing. + """ + start = _rss_bytes() + try: + yield None + finally: + self._emit("distribution", key, _rss_bytes() - start, tags, None) + + +def _rss_bytes() -> int: + return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss diff --git a/clients/python/tests/test_metrics.py b/clients/python/tests/test_metrics.py new file mode 100644 index 00000000..8de6dba0 --- /dev/null +++ b/clients/python/tests/test_metrics.py @@ -0,0 +1,150 @@ +from __future__ import annotations + +from unittest.mock import Mock + +import pytest + +from taskbroker_client.metrics import DatadogMetrics + + +def make_metrics( + *, + enable_prefixed_metrics: bool = False, + processing_pool: str | None = "ingest-errors", + sample_rate: float = 1.0, + client: Mock | None = None, +) -> tuple[DatadogMetrics, Mock]: + mock_client = client or Mock() + metrics = DatadogMetrics( + application="sentry", + processing_pool=processing_pool, + sample_rate=sample_rate, + enable_prefixed_metrics=enable_prefixed_metrics, + client=mock_client, + ) + return metrics, mock_client + + +def test_incr_prefix_off() -> None: + metrics, client = make_metrics(sample_rate=0.5) + metrics.incr("taskworker.x", tags={"namespace": "n"}) + + client.increment.assert_called_once() + args, kwargs = client.increment.call_args + assert args[0] == "taskworker.x" + assert args[1] == 1 + assert set(kwargs["tags"]) == { + "application:sentry", + "processing_pool:ingest-errors", + "namespace:n", + } + assert kwargs["sample_rate"] == 0.5 + + +def test_incr_prefix_on() -> None: + metrics, client = make_metrics(enable_prefixed_metrics=True) + metrics.incr("taskworker.x") + + assert client.increment.call_count == 2 + + first_args, first_kwargs = client.increment.call_args_list[0] + assert first_args[0] == "taskworker.x" + assert "application:sentry" in first_kwargs["tags"] + + second_args, second_kwargs = client.increment.call_args_list[1] + assert second_args[0] == "sentry.taskworker.x" + assert not any(tag.startswith("application:") for tag in second_kwargs["tags"]) + assert "processing_pool:ingest-errors" in second_kwargs["tags"] + + +def test_gauge_ignores_unsupported_params() -> None: + metrics, client = make_metrics() + metrics.gauge("taskworker.size", 12.0, instance="i", unit="bytes", stacklevel=3) + + client.gauge.assert_called_once() + args, kwargs = client.gauge.call_args + assert args[0] == "taskworker.size" + assert args[1] == 12.0 + assert set(kwargs) == {"tags", "sample_rate"} + + +def test_distribution_ignores_unit() -> None: + metrics, client = make_metrics() + metrics.distribution("taskworker.duration", 0.25, unit="seconds") + + client.distribution.assert_called_once() + args, kwargs = client.distribution.call_args + assert args[0] == "taskworker.duration" + assert args[1] == 0.25 + assert "unit" not in kwargs + + +def test_tag_precedence() -> None: + metrics, client = make_metrics() + metrics.incr("taskworker.x", tags={"processing_pool": "override", "namespace": "n"}) + + _, kwargs = client.increment.call_args + assert "processing_pool:override" in kwargs["tags"] + assert "processing_pool:ingest-errors" not in kwargs["tags"] + + +def test_none_tags_still_emit_structural_tags() -> None: + metrics, client = make_metrics() + metrics.incr("taskworker.x") + + _, kwargs = client.increment.call_args + assert set(kwargs["tags"]) == {"application:sentry", "processing_pool:ingest-errors"} + + +def test_sample_rate_defaulting() -> None: + metrics, client = make_metrics(sample_rate=0.1) + + metrics.incr("taskworker.x") + assert client.increment.call_args.kwargs["sample_rate"] == 0.1 + + metrics.incr("taskworker.x", sample_rate=0.5) + assert client.increment.call_args.kwargs["sample_rate"] == 0.5 + + +def test_timer_happy_path() -> None: + metrics, client = make_metrics(enable_prefixed_metrics=True) + with metrics.timer("taskworker.duration", tags={"host": "h"}): + pass + + assert client.timing.call_count == 2 + args, kwargs = client.timing.call_args_list[0] + assert args[0] == "taskworker.duration" + assert isinstance(args[1], float) + assert "host:h" in kwargs["tags"] + + +def test_timer_emits_on_exception() -> None: + metrics, client = make_metrics() + with pytest.raises(ValueError): + with metrics.timer("taskworker.dur"): + raise ValueError("boom") + + client.timing.assert_called_once() + assert isinstance(client.timing.call_args.args[1], float) + + +def test_track_memory_usage() -> None: + metrics, client = make_metrics() + with metrics.track_memory_usage("taskworker.mem"): + var = "a" * 1000000 + var += "b" + + client.distribution.assert_called_once() + args, _ = client.distribution.call_args + assert args[0] == "taskworker.mem" + assert isinstance(args[1], int) + assert args[1] > 0 + + +def test_track_memory_usage_prefixed() -> None: + metrics, client = make_metrics(enable_prefixed_metrics=True) + with metrics.track_memory_usage("taskworker.mem"): + pass + + assert client.distribution.call_count == 2 + assert client.distribution.call_args_list[1].args[0] == "sentry.taskworker.mem" diff --git a/uv.lock b/uv.lock index 59c43911..76d2b5f2 100644 --- a/uv.lock +++ b/uv.lock @@ -80,6 +80,25 @@ wheels = [ { url = "https://pypi.devinfra.sentry.io/wheels/cfgv-3.4.0-py2.py3-none-any.whl", hash = "sha256:b7265b1f29fd3316bfcd2b330d63d024f2bfd8bcb8b0272f8e19a504856c48f9" }, ] +[[package]] +name = "charset-normalizer" +version = "3.4.7" +source = { registry = "https://pypi.devinfra.sentry.io/simple" } +wheels = [ + { url = "https://pypi.devinfra.sentry.io/wheels/charset_normalizer-3.4.7-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:7641bb8895e77f921102f72833904dcd9901df5d6d72a2ab8f31d04b7e51e4e7" }, + { url = "https://pypi.devinfra.sentry.io/wheels/charset_normalizer-3.4.7-cp311-cp311-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:202389074300232baeb53ae2569a60901f7efadd4245cf3a3bf0617d60b439d7" }, + { url = "https://pypi.devinfra.sentry.io/wheels/charset_normalizer-3.4.7-cp311-cp311-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:2fe249cb4651fd12605b7288b24751d8bfd46d35f12a20b1ba33dea122e690df" }, + { url = "https://pypi.devinfra.sentry.io/wheels/charset_normalizer-3.4.7-cp312-cp312-macosx_10_13_universal2.whl", hash = "sha256:eca9705049ad3c7345d574e3510665cb2cf844c2f2dcfe675332677f081cbd46" }, + { url = "https://pypi.devinfra.sentry.io/wheels/charset_normalizer-3.4.7-cp312-cp312-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:6178f72c5508bfc5fd446a5905e698c6212932f25bcdd4b47a757a50605a90e2" }, + { url = "https://pypi.devinfra.sentry.io/wheels/charset_normalizer-3.4.7-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:5649fd1c7bade02f320a462fdefd0b4bd3ce036065836d4f42e0de958038e116" }, + { url = "https://pypi.devinfra.sentry.io/wheels/charset_normalizer-3.4.7-cp313-cp313-macosx_10_13_universal2.whl", hash = "sha256:f496c9c3cc02230093d8330875c4c3cdfc3b73612a5fd921c65d39cbcef08063" }, + { url = "https://pypi.devinfra.sentry.io/wheels/charset_normalizer-3.4.7-cp313-cp313-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:0ea948db76d31190bf08bd371623927ee1339d5f2a0b4b1b4a4439a65298703c" }, + { url = "https://pypi.devinfra.sentry.io/wheels/charset_normalizer-3.4.7-cp313-cp313-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:e044c39e41b92c845bc815e5ae4230804e8e7bc29e399b0437d64222d92809dd" }, + { url = "https://pypi.devinfra.sentry.io/wheels/charset_normalizer-3.4.7-cp314-cp314-macosx_10_15_universal2.whl", hash = "sha256:c36c333c39be2dbca264d7803333c896ab8fa7d4d6f0ab7edb7dfd7aea6e98c0" }, + { url = "https://pypi.devinfra.sentry.io/wheels/charset_normalizer-3.4.7-cp314-cp314-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:1c2aed2e5e41f24ea8ef1590b8e848a79b56f3a5564a65ceec43c9d692dc7d8a" }, + { url = "https://pypi.devinfra.sentry.io/wheels/charset_normalizer-3.4.7-cp314-cp314-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:bd6c2a1c7573c64738d716488d2cdd3c00e340e4835707d8fdb8dc1a66ef164e" }, +] + [[package]] name = "click" version = "8.3.0" @@ -142,6 +161,17 @@ wheels = [ { url = "https://pypi.devinfra.sentry.io/wheels/cronsim-2.6-py3-none-any.whl", hash = "sha256:a3a823ea834c29100a17ab1d4af6179c3149612d28d8e0dec8044057570246be" }, ] +[[package]] +name = "datadog" +version = "0.52.1" +source = { registry = "https://pypi.devinfra.sentry.io/simple" } +dependencies = [ + { name = "requests", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, +] +wheels = [ + { url = "https://pypi.devinfra.sentry.io/wheels/datadog-0.52.1-py2.py3-none-any.whl", hash = "sha256:b8c92cd761618ee062f114171067e4c400d48c9f0dad16cb285042439d9d5d4e" }, +] + [[package]] name = "devservices" version = "1.2.1" @@ -292,6 +322,14 @@ wheels = [ { url = "https://pypi.devinfra.sentry.io/wheels/identify-2.6.9-py2.py3-none-any.whl", hash = "sha256:c98b4322da415a8e5a70ff6e51fbc2d2932c015532d77e9f8537b4ba7813b150" }, ] +[[package]] +name = "idna" +version = "3.17" +source = { registry = "https://pypi.devinfra.sentry.io/simple" } +wheels = [ + { url = "https://pypi.devinfra.sentry.io/wheels/idna-3.17-py3-none-any.whl", hash = "sha256:466e48829084efe2548012b855df21540b96f2e20e51bd124c851536556a592c" }, +] + [[package]] name = "iniconfig" version = "2.0.0" @@ -586,6 +624,20 @@ wheels = [ { url = "https://pypi.devinfra.sentry.io/wheels/redis_py_cluster-2.1.3-py2.py3-none-any.whl", hash = "sha256:38f08850fde469ffd76bced7309721114acc487e52b76f374a0502c34c69b4ec" }, ] +[[package]] +name = "requests" +version = "2.33.1" +source = { registry = "https://pypi.devinfra.sentry.io/simple" } +dependencies = [ + { name = "certifi", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, + { name = "charset-normalizer", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, + { name = "idna", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, + { name = "urllib3", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, +] +wheels = [ + { url = "https://pypi.devinfra.sentry.io/wheels/requests-2.33.1-py3-none-any.whl", hash = "sha256:4e6d1ef462f3626a1f0a0a9c42dd93c63bad33f9f1c1937509b8c5c8718ab56a" }, +] + [[package]] name = "sentry-arroyo" version = "2.38.7" @@ -734,6 +786,9 @@ dependencies = [ ] [package.optional-dependencies] +datadog = [ + { name = "datadog", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, +] examples = [ { name = "click", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "setuptools", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, @@ -742,6 +797,7 @@ examples = [ [package.dev-dependencies] dev = [ { name = "black", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, + { name = "datadog", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "devservices", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "flake8", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "isort", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, @@ -759,6 +815,7 @@ requires-dist = [ { name = "click", marker = "extra == 'examples'", specifier = ">=8.3" }, { name = "confluent-kafka", specifier = ">=2.3.0" }, { name = "cronsim", specifier = ">=2.6" }, + { name = "datadog", marker = "extra == 'datadog'", specifier = ">=0.49" }, { name = "grpcio", specifier = ">=1.67.1" }, { name = "grpcio-health-checking", specifier = ">=1.67.1" }, { name = "msgpack", specifier = ">=1.0.0" }, @@ -772,11 +829,12 @@ requires-dist = [ { name = "setuptools", marker = "extra == 'examples'", specifier = ">=80.0" }, { name = "zstandard", specifier = ">=0.18.0" }, ] -provides-extras = ["examples"] +provides-extras = ["examples", "datadog"] [package.metadata.requires-dev] dev = [ { name = "black", specifier = "==26.3.1" }, + { name = "datadog", specifier = ">=0.49" }, { name = "devservices", specifier = ">=1.2.1" }, { name = "flake8", specifier = ">=7.3.0" }, { name = "isort", specifier = ">=5.13.2" },