From fe820a117144cacb3dec493f275c449288e09149 Mon Sep 17 00:00:00 2001 From: Mark Story Date: Fri, 12 Jun 2026 15:55:36 -0400 Subject: [PATCH 1/4] feat: Add datadog metrics backend We currently require applications to implement their own metrics backend. This has resulted in metrics from the taskworker runtime having inconsistent tags and metric names, which makes building dashboards and alerting for taskworkers tedious. Having a more opinionated metrics backend will allow us to make metric names and tags that are important to observability structurally required. I've also included a prefixed metric shim which will allow us to switch metrics providers without gaps in observability. Refs STREAM-816 --- clients/python/pyproject.toml | 4 + .../python/src/taskbroker_client/metrics.py | 153 +++++++++++++++++- clients/python/tests/test_metrics.py | 150 +++++++++++++++++ uv.lock | 60 ++++++- 4 files changed, 365 insertions(+), 2 deletions(-) create mode 100644 clients/python/tests/test_metrics.py diff --git a/clients/python/pyproject.toml b/clients/python/pyproject.toml index da8304e5..9a72e839 100644 --- a/clients/python/pyproject.toml +++ b/clients/python/pyproject.toml @@ -32,12 +32,16 @@ dev = [ "mypy>=1.17.1", "time-machine>=2.16.0", "types-protobuf>=5.27.0.20240626,<6.0.0", + "datadog>=0.49", ] [project.optional-dependencies] examples = [ "click>=8.3", "setuptools>=80.0", ] +datadog = [ + "datadog>=0.49", +] [build-system] requires = ["uv_build>=0.8.2"] diff --git a/clients/python/src/taskbroker_client/metrics.py b/clients/python/src/taskbroker_client/metrics.py index 6324029a..fa171168 100644 --- a/clients/python/src/taskbroker_client/metrics.py +++ b/clients/python/src/taskbroker_client/metrics.py @@ -1,9 +1,14 @@ from __future__ import annotations +import resource +import time from abc import abstractmethod from collections.abc import Mapping from contextlib import contextmanager -from typing import Generator, Protocol, runtime_checkable +from typing import TYPE_CHECKING, Generator, Protocol, runtime_checkable + +if TYPE_CHECKING: + from datadog.dogstatsd.base import DogStatsd Tags = Mapping[str, str | int | float] @@ -139,3 +144,149 @@ def track_memory_usage( of rss_usage between the context manager opening and closing. """ yield None + + +class DatadogMetrics(MetricsBackend): + """ + An opinionated metrics backend that emits to Datadog via DogStatsD. + + All metrics are tagged with ``application`` and ``processing_pool`` so that + dashboards and alerts can be built once and shared across every application + using taskbroker-client, without depending on a host-application metrics + prefix. + + When ``enable_prefixed_metrics`` is enabled each metric is emitted twice: once + prefix-free with ``application`` as a tag, and once with ``application`` + as a metric prefix (and not included in tags). This eases migrating existing + alerts and dashboards from the prefixed form to the prefix-free form. + + The ``datadog`` package is an optional dependency. Install it with + ``pip install taskbroker-client[datadog]``. + """ + + def __init__( + self, + application: str, + processing_pool: str | None = None, + statsd_host: str | None = None, + statsd_port: str | int | None = None, + sample_rate: float = 1.0, + enable_prefixed_metrics: bool = False, + client: DogStatsd | None = None, + ) -> None: + self.application = application + self.processing_pool = processing_pool or "unknown" + self.sample_rate = sample_rate + self.enable_prefixed_metrics = enable_prefixed_metrics + if client is None: + from datadog.dogstatsd.base import DogStatsd + + client = DogStatsd( + host=statsd_host or "localhost", + port=int(statsd_port) if statsd_port is not None else 8125, + ) + self.client = client + + def _build_tag_list(self, tags: Tags | None, *, with_application: bool) -> list[str]: + merged: dict[str, str | int | float] = {"processing_pool": self.processing_pool} + if with_application: + merged["application"] = self.application + if tags: + # Per-call tags win so call sites can override the structural defaults. + merged.update(tags) + return [f"{key}:{value}" for key, value in merged.items()] + + def _emit( + self, + method: str, + name: str, + value: float, + tags: Tags | None, + sample_rate: float | None, + ) -> None: + rate = self.sample_rate if sample_rate is None else sample_rate + emit = getattr(self.client, method) + + # Prefix-free form: application is carried as a tag. + emit( + name, + value, + tags=self._build_tag_list(tags, with_application=True), + sample_rate=rate, + ) + + # Prefixed form: application is in the metric name and removed from the tags. + if self.enable_prefixed_metrics: + emit( + f"{self.application}.{name}", + value, + tags=self._build_tag_list(tags, with_application=False), + sample_rate=rate, + ) + + def gauge( + self, + key: str, + value: float, + instance: str | None = None, + tags: Tags | None = None, + sample_rate: float = 1, + unit: str | None = None, + stacklevel: int = 0, + ) -> None: + # instance, unit and stacklevel have no DogStatsD equivalent and are ignored. + self._emit("gauge", key, value, tags, sample_rate) + + def incr( + self, + name: str, + value: int | float = 1, + tags: Tags | None = None, + sample_rate: float | None = None, + ) -> None: + self._emit("increment", name, value, tags, sample_rate) + + def distribution( + self, + name: str, + value: int | float, + tags: Tags | None = None, + unit: str | None = None, + sample_rate: float | None = None, + ) -> None: + # unit has no DogStatsD equivalent and is ignored. + self._emit("distribution", name, value, tags, sample_rate) + + @contextmanager + def timer( + self, + key: str, + tags: Tags | None = None, + sample_rate: float | None = None, + stacklevel: int = 0, + ) -> Generator[None]: + start = time.monotonic() + try: + yield None + finally: + self._emit("timing", key, time.monotonic() - start, tags, sample_rate) + + @contextmanager + def track_memory_usage( + self, + key: str, + tags: Tags | None = None, + ) -> Generator[None]: + """ + Records a distribution metric that tracks the delta + of rss usage between the context manager opening and closing. + """ + start = _rss_bytes() + try: + yield None + finally: + self._emit("distribution", key, _rss_bytes() - start, tags, None) + + +def _rss_bytes() -> int: + return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss diff --git a/clients/python/tests/test_metrics.py b/clients/python/tests/test_metrics.py new file mode 100644 index 00000000..8de6dba0 --- /dev/null +++ b/clients/python/tests/test_metrics.py @@ -0,0 +1,150 @@ +from __future__ import annotations + +from unittest.mock import Mock + +import pytest + +from taskbroker_client.metrics import DatadogMetrics + + +def make_metrics( + *, + enable_prefixed_metrics: bool = False, + processing_pool: str | None = "ingest-errors", + sample_rate: float = 1.0, + client: Mock | None = None, +) -> tuple[DatadogMetrics, Mock]: + mock_client = client or Mock() + metrics = DatadogMetrics( + application="sentry", + processing_pool=processing_pool, + sample_rate=sample_rate, + enable_prefixed_metrics=enable_prefixed_metrics, + client=mock_client, + ) + return metrics, mock_client + + +def test_incr_prefix_off() -> None: + metrics, client = make_metrics(sample_rate=0.5) + metrics.incr("taskworker.x", tags={"namespace": "n"}) + + client.increment.assert_called_once() + args, kwargs = client.increment.call_args + assert args[0] == "taskworker.x" + assert args[1] == 1 + assert set(kwargs["tags"]) == { + "application:sentry", + "processing_pool:ingest-errors", + "namespace:n", + } + assert kwargs["sample_rate"] == 0.5 + + +def test_incr_prefix_on() -> None: + metrics, client = make_metrics(enable_prefixed_metrics=True) + metrics.incr("taskworker.x") + + assert client.increment.call_count == 2 + + first_args, first_kwargs = client.increment.call_args_list[0] + assert first_args[0] == "taskworker.x" + assert "application:sentry" in first_kwargs["tags"] + + second_args, second_kwargs = client.increment.call_args_list[1] + assert second_args[0] == "sentry.taskworker.x" + assert not any(tag.startswith("application:") for tag in second_kwargs["tags"]) + assert "processing_pool:ingest-errors" in second_kwargs["tags"] + + +def test_gauge_ignores_unsupported_params() -> None: + metrics, client = make_metrics() + metrics.gauge("taskworker.size", 12.0, instance="i", unit="bytes", stacklevel=3) + + client.gauge.assert_called_once() + args, kwargs = client.gauge.call_args + assert args[0] == "taskworker.size" + assert args[1] == 12.0 + assert set(kwargs) == {"tags", "sample_rate"} + + +def test_distribution_ignores_unit() -> None: + metrics, client = make_metrics() + metrics.distribution("taskworker.duration", 0.25, unit="seconds") + + client.distribution.assert_called_once() + args, kwargs = client.distribution.call_args + assert args[0] == "taskworker.duration" + assert args[1] == 0.25 + assert "unit" not in kwargs + + +def test_tag_precedence() -> None: + metrics, client = make_metrics() + metrics.incr("taskworker.x", tags={"processing_pool": "override", "namespace": "n"}) + + _, kwargs = client.increment.call_args + assert "processing_pool:override" in kwargs["tags"] + assert "processing_pool:ingest-errors" not in kwargs["tags"] + + +def test_none_tags_still_emit_structural_tags() -> None: + metrics, client = make_metrics() + metrics.incr("taskworker.x") + + _, kwargs = client.increment.call_args + assert set(kwargs["tags"]) == {"application:sentry", "processing_pool:ingest-errors"} + + +def test_sample_rate_defaulting() -> None: + metrics, client = make_metrics(sample_rate=0.1) + + metrics.incr("taskworker.x") + assert client.increment.call_args.kwargs["sample_rate"] == 0.1 + + metrics.incr("taskworker.x", sample_rate=0.5) + assert client.increment.call_args.kwargs["sample_rate"] == 0.5 + + +def test_timer_happy_path() -> None: + metrics, client = make_metrics(enable_prefixed_metrics=True) + with metrics.timer("taskworker.duration", tags={"host": "h"}): + pass + + assert client.timing.call_count == 2 + args, kwargs = client.timing.call_args_list[0] + assert args[0] == "taskworker.duration" + assert isinstance(args[1], float) + assert "host:h" in kwargs["tags"] + + +def test_timer_emits_on_exception() -> None: + metrics, client = make_metrics() + with pytest.raises(ValueError): + with metrics.timer("taskworker.dur"): + raise ValueError("boom") + + client.timing.assert_called_once() + assert isinstance(client.timing.call_args.args[1], float) + + +def test_track_memory_usage() -> None: + metrics, client = make_metrics() + with metrics.track_memory_usage("taskworker.mem"): + var = "a" * 1000000 + var += "b" + + client.distribution.assert_called_once() + args, _ = client.distribution.call_args + assert args[0] == "taskworker.mem" + assert isinstance(args[1], int) + assert args[1] > 0 + + +def test_track_memory_usage_prefixed() -> None: + metrics, client = make_metrics(enable_prefixed_metrics=True) + with metrics.track_memory_usage("taskworker.mem"): + pass + + assert client.distribution.call_count == 2 + assert client.distribution.call_args_list[1].args[0] == "sentry.taskworker.mem" diff --git a/uv.lock b/uv.lock index 59c43911..76d2b5f2 100644 --- a/uv.lock +++ b/uv.lock @@ -80,6 +80,25 @@ wheels = [ { url = "https://pypi.devinfra.sentry.io/wheels/cfgv-3.4.0-py2.py3-none-any.whl", hash = "sha256:b7265b1f29fd3316bfcd2b330d63d024f2bfd8bcb8b0272f8e19a504856c48f9" }, ] +[[package]] +name = "charset-normalizer" +version = "3.4.7" +source = { registry = "https://pypi.devinfra.sentry.io/simple" } +wheels = [ + { url = "https://pypi.devinfra.sentry.io/wheels/charset_normalizer-3.4.7-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:7641bb8895e77f921102f72833904dcd9901df5d6d72a2ab8f31d04b7e51e4e7" }, + { url = "https://pypi.devinfra.sentry.io/wheels/charset_normalizer-3.4.7-cp311-cp311-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:202389074300232baeb53ae2569a60901f7efadd4245cf3a3bf0617d60b439d7" }, + { url = "https://pypi.devinfra.sentry.io/wheels/charset_normalizer-3.4.7-cp311-cp311-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:2fe249cb4651fd12605b7288b24751d8bfd46d35f12a20b1ba33dea122e690df" }, + { url = "https://pypi.devinfra.sentry.io/wheels/charset_normalizer-3.4.7-cp312-cp312-macosx_10_13_universal2.whl", hash = "sha256:eca9705049ad3c7345d574e3510665cb2cf844c2f2dcfe675332677f081cbd46" }, + { url = "https://pypi.devinfra.sentry.io/wheels/charset_normalizer-3.4.7-cp312-cp312-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:6178f72c5508bfc5fd446a5905e698c6212932f25bcdd4b47a757a50605a90e2" }, + { url = "https://pypi.devinfra.sentry.io/wheels/charset_normalizer-3.4.7-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:5649fd1c7bade02f320a462fdefd0b4bd3ce036065836d4f42e0de958038e116" }, + { url = "https://pypi.devinfra.sentry.io/wheels/charset_normalizer-3.4.7-cp313-cp313-macosx_10_13_universal2.whl", hash = "sha256:f496c9c3cc02230093d8330875c4c3cdfc3b73612a5fd921c65d39cbcef08063" }, + { url = "https://pypi.devinfra.sentry.io/wheels/charset_normalizer-3.4.7-cp313-cp313-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:0ea948db76d31190bf08bd371623927ee1339d5f2a0b4b1b4a4439a65298703c" }, + { url = "https://pypi.devinfra.sentry.io/wheels/charset_normalizer-3.4.7-cp313-cp313-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:e044c39e41b92c845bc815e5ae4230804e8e7bc29e399b0437d64222d92809dd" }, + { url = "https://pypi.devinfra.sentry.io/wheels/charset_normalizer-3.4.7-cp314-cp314-macosx_10_15_universal2.whl", hash = "sha256:c36c333c39be2dbca264d7803333c896ab8fa7d4d6f0ab7edb7dfd7aea6e98c0" }, + { url = "https://pypi.devinfra.sentry.io/wheels/charset_normalizer-3.4.7-cp314-cp314-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:1c2aed2e5e41f24ea8ef1590b8e848a79b56f3a5564a65ceec43c9d692dc7d8a" }, + { url = "https://pypi.devinfra.sentry.io/wheels/charset_normalizer-3.4.7-cp314-cp314-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:bd6c2a1c7573c64738d716488d2cdd3c00e340e4835707d8fdb8dc1a66ef164e" }, +] + [[package]] name = "click" version = "8.3.0" @@ -142,6 +161,17 @@ wheels = [ { url = "https://pypi.devinfra.sentry.io/wheels/cronsim-2.6-py3-none-any.whl", hash = "sha256:a3a823ea834c29100a17ab1d4af6179c3149612d28d8e0dec8044057570246be" }, ] +[[package]] +name = "datadog" +version = "0.52.1" +source = { registry = "https://pypi.devinfra.sentry.io/simple" } +dependencies = [ + { name = "requests", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, +] +wheels = [ + { url = "https://pypi.devinfra.sentry.io/wheels/datadog-0.52.1-py2.py3-none-any.whl", hash = "sha256:b8c92cd761618ee062f114171067e4c400d48c9f0dad16cb285042439d9d5d4e" }, +] + [[package]] name = "devservices" version = "1.2.1" @@ -292,6 +322,14 @@ wheels = [ { url = "https://pypi.devinfra.sentry.io/wheels/identify-2.6.9-py2.py3-none-any.whl", hash = "sha256:c98b4322da415a8e5a70ff6e51fbc2d2932c015532d77e9f8537b4ba7813b150" }, ] +[[package]] +name = "idna" +version = "3.17" +source = { registry = "https://pypi.devinfra.sentry.io/simple" } +wheels = [ + { url = "https://pypi.devinfra.sentry.io/wheels/idna-3.17-py3-none-any.whl", hash = "sha256:466e48829084efe2548012b855df21540b96f2e20e51bd124c851536556a592c" }, +] + [[package]] name = "iniconfig" version = "2.0.0" @@ -586,6 +624,20 @@ wheels = [ { url = "https://pypi.devinfra.sentry.io/wheels/redis_py_cluster-2.1.3-py2.py3-none-any.whl", hash = "sha256:38f08850fde469ffd76bced7309721114acc487e52b76f374a0502c34c69b4ec" }, ] +[[package]] +name = "requests" +version = "2.33.1" +source = { registry = "https://pypi.devinfra.sentry.io/simple" } +dependencies = [ + { name = "certifi", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, + { name = "charset-normalizer", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, + { name = "idna", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, + { name = "urllib3", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, +] +wheels = [ + { url = "https://pypi.devinfra.sentry.io/wheels/requests-2.33.1-py3-none-any.whl", hash = "sha256:4e6d1ef462f3626a1f0a0a9c42dd93c63bad33f9f1c1937509b8c5c8718ab56a" }, +] + [[package]] name = "sentry-arroyo" version = "2.38.7" @@ -734,6 +786,9 @@ dependencies = [ ] [package.optional-dependencies] +datadog = [ + { name = "datadog", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, +] examples = [ { name = "click", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "setuptools", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, @@ -742,6 +797,7 @@ examples = [ [package.dev-dependencies] dev = [ { name = "black", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, + { name = "datadog", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "devservices", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "flake8", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "isort", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, @@ -759,6 +815,7 @@ requires-dist = [ { name = "click", marker = "extra == 'examples'", specifier = ">=8.3" }, { name = "confluent-kafka", specifier = ">=2.3.0" }, { name = "cronsim", specifier = ">=2.6" }, + { name = "datadog", marker = "extra == 'datadog'", specifier = ">=0.49" }, { name = "grpcio", specifier = ">=1.67.1" }, { name = "grpcio-health-checking", specifier = ">=1.67.1" }, { name = "msgpack", specifier = ">=1.0.0" }, @@ -772,11 +829,12 @@ requires-dist = [ { name = "setuptools", marker = "extra == 'examples'", specifier = ">=80.0" }, { name = "zstandard", specifier = ">=0.18.0" }, ] -provides-extras = ["examples"] +provides-extras = ["examples", "datadog"] [package.metadata.requires-dev] dev = [ { name = "black", specifier = "==26.3.1" }, + { name = "datadog", specifier = ">=0.49" }, { name = "devservices", specifier = ">=1.2.1" }, { name = "flake8", specifier = ">=7.3.0" }, { name = "isort", specifier = ">=5.13.2" }, From 2078b84bc98b4cc0abf7d060e503103e4dc420d9 Mon Sep 17 00:00:00 2001 From: Mark Story Date: Fri, 12 Jun 2026 16:38:52 -0400 Subject: [PATCH 2/4] Port more configuration from sentry in --- clients/python/src/taskbroker_client/metrics.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/clients/python/src/taskbroker_client/metrics.py b/clients/python/src/taskbroker_client/metrics.py index fa171168..33dce063 100644 --- a/clients/python/src/taskbroker_client/metrics.py +++ b/clients/python/src/taskbroker_client/metrics.py @@ -1,5 +1,6 @@ from __future__ import annotations +import atexit import resource import time from abc import abstractmethod @@ -184,7 +185,18 @@ def __init__( client = DogStatsd( host=statsd_host or "localhost", port=int(statsd_port) if statsd_port is not None else 8125, + disable_telemetry=True, + # Use a background thread to send metrics + disable_background_sender=False, ) + # Origin detection is enabled after 0.45 by default. + # Disable it since it silently fails. + # Ref: https://github.com/DataDog/datadogpy/issues/764 + client._container_id = None + + # Call wait_for_pending() before exiting to make sure all pending metrics are sent. + atexit.register(client.wait_for_pending) + self.client = client def _build_tag_list(self, tags: Tags | None, *, with_application: bool) -> list[str]: From 0c255ffb1a316d78a0230352cc732a00daf086b2 Mon Sep 17 00:00:00 2001 From: Mark Story Date: Fri, 12 Jun 2026 16:45:02 -0400 Subject: [PATCH 3/4] Fix up default sample_rate for gauge() It should be aligned with the other methods. --- clients/python/src/taskbroker_client/metrics.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/clients/python/src/taskbroker_client/metrics.py b/clients/python/src/taskbroker_client/metrics.py index 33dce063..f0279364 100644 --- a/clients/python/src/taskbroker_client/metrics.py +++ b/clients/python/src/taskbroker_client/metrics.py @@ -27,7 +27,7 @@ def gauge( value: float, instance: str | None = None, tags: Tags | None = None, - sample_rate: float = 1, + sample_rate: float | None = None, unit: str | None = None, stacklevel: int = 0, ) -> None: @@ -99,7 +99,7 @@ def gauge( value: float, instance: str | None = None, tags: Tags | None = None, - sample_rate: float = 1, + sample_rate: float | None = None, unit: str | None = None, stacklevel: int = 0, ) -> None: @@ -242,7 +242,7 @@ def gauge( value: float, instance: str | None = None, tags: Tags | None = None, - sample_rate: float = 1, + sample_rate: float | None = None, unit: str | None = None, stacklevel: int = 0, ) -> None: From c3a4dced1ecadbda27d5c2ec0f6fa46d1515eaea Mon Sep 17 00:00:00 2001 From: Mark Story Date: Wed, 17 Jun 2026 11:30:17 -0400 Subject: [PATCH 4/4] Align another setting with sentry --- clients/python/src/taskbroker_client/metrics.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/clients/python/src/taskbroker_client/metrics.py b/clients/python/src/taskbroker_client/metrics.py index f0279364..2a76f99c 100644 --- a/clients/python/src/taskbroker_client/metrics.py +++ b/clients/python/src/taskbroker_client/metrics.py @@ -188,6 +188,8 @@ def __init__( disable_telemetry=True, # Use a background thread to send metrics disable_background_sender=False, + # Allow buffering and background delivery + disable_buffering=False, ) # Origin detection is enabled after 0.45 by default. # Disable it since it silently fails.