Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions clients/python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,16 @@ dev = [
"mypy>=1.17.1",
"time-machine>=2.16.0",
"types-protobuf>=5.27.0.20240626,<6.0.0",
"datadog>=0.49",
]
[project.optional-dependencies]
examples = [
"click>=8.3",
"setuptools>=80.0",
]
datadog = [
"datadog>=0.49",
]

[build-system]
requires = ["uv_build>=0.8.2"]
Expand Down
171 changes: 168 additions & 3 deletions clients/python/src/taskbroker_client/metrics.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
from __future__ import annotations

import atexit
import resource
import time
from abc import abstractmethod
from collections.abc import Mapping
from contextlib import contextmanager
from typing import Generator, Protocol, runtime_checkable
from typing import TYPE_CHECKING, Generator, Protocol, runtime_checkable

if TYPE_CHECKING:
from datadog.dogstatsd.base import DogStatsd

Tags = Mapping[str, str | int | float]

Expand All @@ -21,7 +27,7 @@ def gauge(
value: float,
instance: str | None = None,
tags: Tags | None = None,
sample_rate: float = 1,
sample_rate: float | None = None,

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This default value has been silly for a while. Now is a good time to change this.

unit: str | None = None,
stacklevel: int = 0,
) -> None:
Expand Down Expand Up @@ -93,7 +99,7 @@ def gauge(
value: float,
instance: str | None = None,
tags: Tags | None = None,
sample_rate: float = 1,
sample_rate: float | None = None,
unit: str | None = None,
stacklevel: int = 0,
) -> None:
Expand Down Expand Up @@ -139,3 +145,162 @@ def track_memory_usage(
of rss_usage between the context manager opening and closing.
"""
yield None


class DatadogMetrics(MetricsBackend):
"""
An opinionated metrics backend that emits to Datadog via DogStatsD.

All metrics are tagged with ``application`` and ``processing_pool`` so that
dashboards and alerts can be built once and shared across every application
using taskbroker-client, without depending on a host-application metrics
prefix.
Comment thread
sentry[bot] marked this conversation as resolved.

When ``enable_prefixed_metrics`` is enabled each metric is emitted twice: once
prefix-free with ``application`` as a tag, and once with ``application``
as a metric prefix (and not included in tags). This eases migrating existing
alerts and dashboards from the prefixed form to the prefix-free form.

The ``datadog`` package is an optional dependency. Install it with
``pip install taskbroker-client[datadog]``.
"""

def __init__(
self,
application: str,
processing_pool: str | None = None,
statsd_host: str | None = None,
statsd_port: str | int | None = None,
sample_rate: float = 1.0,
enable_prefixed_metrics: bool = False,
client: DogStatsd | None = None,
) -> None:
"""
Store the backend settings and the DogStatsD client used for emission.

``processing_pool`` is stored as ``"unknown"`` when it is empty or omitted.
``sample_rate`` and ``enable_prefixed_metrics`` are stored unchanged.
When no ``client`` is passed, one is built from ``statsd_host`` (default
``"localhost"``) and ``statsd_port`` (default ``8125``; any other value is
converted with ``int()``).
"""
self.application = application
# A falsy pool name (None or "") is replaced so the tag always has a value.
self.processing_pool = processing_pool or "unknown"
self.sample_rate = sample_rate
self.enable_prefixed_metrics = enable_prefixed_metrics
if client is None:
# Imported here rather than at module level: ``datadog`` is an optional
# dependency and is otherwise only imported for type checking.
from datadog.dogstatsd.base import DogStatsd

client = DogStatsd(
host=statsd_host or "localhost",
port=int(statsd_port) if statsd_port is not None else 8125,
disable_telemetry=True,
# Use a background thread to send metrics
disable_background_sender=False,
# Allow buffering and background delivery
disable_buffering=False,
)
# Origin detection is enabled after 0.45 by default.
# Disable it since it silently fails.
# Ref: https://github.com/DataDog/datadogpy/issues/764
client._container_id = None

# Call wait_for_pending() before exiting to make sure all pending metrics are sent.
atexit.register(client.wait_for_pending)

self.client = client

def _build_tag_list(self, tags: Tags | None, *, with_application: bool) -> list[str]:
merged: dict[str, str | int | float] = {"processing_pool": self.processing_pool}
if with_application:
merged["application"] = self.application
if tags:
# Per-call tags win so call sites can override the structural defaults.
merged.update(tags)
return [f"{key}:{value}" for key, value in merged.items()]

def _emit(
self,
method: str,
name: str,
value: float,
tags: Tags | None,
sample_rate: float | None,
) -> None:
rate = self.sample_rate if sample_rate is None else sample_rate
emit = getattr(self.client, method)

# Prefix-free form: application is carried as a tag.
emit(
name,
value,
tags=self._build_tag_list(tags, with_application=True),
sample_rate=rate,
)

# Prefixed form: application is in the metric name and removed from the tags.
if self.enable_prefixed_metrics:
emit(
f"{self.application}.{name}",
value,
tags=self._build_tag_list(tags, with_application=False),
sample_rate=rate,
)

def gauge(
self,
key: str,
value: float,
instance: str | None = None,
tags: Tags | None = None,
sample_rate: float | None = None,
unit: str | None = None,
stacklevel: int = 0,
) -> None:
# instance, unit and stacklevel have no DogStatsD equivalent and are ignored.
self._emit("gauge", key, value, tags, sample_rate)
Comment thread
cursor[bot] marked this conversation as resolved.

def incr(
self,
name: str,
value: int | float = 1,
tags: Tags | None = None,
sample_rate: float | None = None,
) -> None:
self._emit("increment", name, value, tags, sample_rate)

def distribution(
self,
name: str,
value: int | float,
tags: Tags | None = None,
unit: str | None = None,
sample_rate: float | None = None,
) -> None:
# unit has no DogStatsD equivalent and is ignored.
self._emit("distribution", name, value, tags, sample_rate)

@contextmanager
def timer(
self,
key: str,
tags: Tags | None = None,
sample_rate: float | None = None,
stacklevel: int = 0,
) -> Generator[None]:
start = time.monotonic()
try:
yield None
finally:
self._emit("timing", key, time.monotonic() - start, tags, sample_rate)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Timer sends seconds not milliseconds

High Severity

DatadogMetrics.timer emits elapsed wall time from time.monotonic() directly to DogStatsD timing, but that API expects values in milliseconds. Reported durations for RPC and task timers will be roughly three orders of magnitude too small, breaking latency dashboards and alerts.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit fe820a1. Configure here.

Comment thread
sentry[bot] marked this conversation as resolved.

@contextmanager
def track_memory_usage(
    self,
    key: str,
    tags: Tags | None = None,
) -> Generator[None]:
    """
    Report, as a distribution, how much the ``_rss_bytes()`` reading changed
    while the wrapped block ran.

    The reading is taken on entry and again on exit; the difference is
    reported whether the block finishes or raises, using the backend's
    configured sample rate.
    """
    rss_before = _rss_bytes()
    try:
        yield None
    finally:
        rss_delta = _rss_bytes() - rss_before
        self._emit("distribution", key, rss_delta, tags, None)


def _rss_bytes() -> int:
    """
    Return this process's peak resident set size, in bytes.

    ``ru_maxrss`` is reported in bytes on macOS but in kilobytes on Linux and
    the BSDs, so the raw value is scaled where necessary to match the unit
    promised by this function's name.

    NOTE: ``ru_maxrss`` is a lifetime high-water mark, not the current RSS.
    The value never decreases, so memory released by the process is not
    reflected in later readings.
    """
    # Function-scope import so this helper does not depend on a module-level ``sys``.
    import sys

    peak = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    if sys.platform == "darwin":
        return peak
    return peak * 1024

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Memory metric uses peak RSS

Medium Severity

track_memory_usage derives its distribution from the difference in ru_maxrss, which is a process lifetime high-water mark, not current RSS. Memory freed inside the block does not lower the metric, and on Linux ru_maxrss is in kilobytes despite _rss_bytes implying bytes.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit fe820a1. Configure here.

Comment thread
sentry[bot] marked this conversation as resolved.
Comment thread
markstory marked this conversation as resolved.
150 changes: 150 additions & 0 deletions clients/python/tests/test_metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
from __future__ import annotations

from unittest.mock import Mock

import pytest

from taskbroker_client.metrics import DatadogMetrics


def make_metrics(
    *,
    enable_prefixed_metrics: bool = False,
    processing_pool: str | None = "ingest-errors",
    sample_rate: float = 1.0,
    client: Mock | None = None,
) -> tuple[DatadogMetrics, Mock]:
    """
    Build a ``DatadogMetrics`` for application ``"sentry"`` backed by a mock
    client, and return the backend together with that client.
    """
    stub = client or Mock()
    backend = DatadogMetrics(
        application="sentry",
        processing_pool=processing_pool,
        sample_rate=sample_rate,
        enable_prefixed_metrics=enable_prefixed_metrics,
        client=stub,
    )
    return backend, stub


def test_incr_prefix_off() -> None:
    """Without prefixing, ``incr`` sends one increment with structural tags and the default rate."""
    metrics, client = make_metrics(sample_rate=0.5)
    metrics.incr("taskworker.x", tags={"namespace": "n"})

    client.increment.assert_called_once()
    sent = client.increment.call_args
    assert sent.args[0] == "taskworker.x"
    assert sent.args[1] == 1
    expected_tags = {
        "application:sentry",
        "processing_pool:ingest-errors",
        "namespace:n",
    }
    assert set(sent.kwargs["tags"]) == expected_tags
    assert sent.kwargs["sample_rate"] == 0.5


def test_incr_prefix_on() -> None:
    """With prefixing on, ``incr`` sends a prefix-free metric and then a prefixed one."""
    metrics, client = make_metrics(enable_prefixed_metrics=True)
    metrics.incr("taskworker.x")

    assert client.increment.call_count == 2
    plain, prefixed = client.increment.call_args_list

    # First emission: bare name, application carried as a tag.
    assert plain.args[0] == "taskworker.x"
    assert "application:sentry" in plain.kwargs["tags"]

    # Second emission: application in the name and absent from the tags.
    assert prefixed.args[0] == "sentry.taskworker.x"
    assert all(not tag.startswith("application:") for tag in prefixed.kwargs["tags"])
    assert "processing_pool:ingest-errors" in prefixed.kwargs["tags"]


def test_gauge_ignores_unsupported_params() -> None:
    """``gauge`` forwards only tags and sample_rate as keyword arguments."""
    metrics, client = make_metrics()
    metrics.gauge("taskworker.size", 12.0, instance="i", unit="bytes", stacklevel=3)

    client.gauge.assert_called_once()
    sent = client.gauge.call_args
    assert sent.args[0] == "taskworker.size"
    assert sent.args[1] == 12.0
    assert set(sent.kwargs) == {"tags", "sample_rate"}


def test_distribution_ignores_unit() -> None:
    """``distribution`` does not pass ``unit`` through to the client."""
    metrics, client = make_metrics()
    metrics.distribution("taskworker.duration", 0.25, unit="seconds")

    client.distribution.assert_called_once()
    sent = client.distribution.call_args
    assert sent.args[0] == "taskworker.duration"
    assert sent.args[1] == 0.25
    assert "unit" not in sent.kwargs


def test_tag_precedence() -> None:
    """A per-call ``processing_pool`` tag replaces the structural default."""
    metrics, client = make_metrics()
    metrics.incr("taskworker.x", tags={"processing_pool": "override", "namespace": "n"})

    sent_tags = client.increment.call_args.kwargs["tags"]
    assert "processing_pool:override" in sent_tags
    assert "processing_pool:ingest-errors" not in sent_tags


def test_none_tags_still_emit_structural_tags() -> None:
    """With no per-call tags, the structural tags are still sent."""
    metrics, client = make_metrics()
    metrics.incr("taskworker.x")

    sent_tags = client.increment.call_args.kwargs["tags"]
    assert set(sent_tags) == {"application:sentry", "processing_pool:ingest-errors"}


def test_sample_rate_defaulting() -> None:
    """The backend's rate is used unless a call supplies its own."""
    metrics, client = make_metrics(sample_rate=0.1)

    # No explicit rate: fall back to the configured one.
    metrics.incr("taskworker.x")
    _, default_kwargs = client.increment.call_args
    assert default_kwargs["sample_rate"] == 0.1

    # Explicit rate: it takes precedence.
    metrics.incr("taskworker.x", sample_rate=0.5)
    _, explicit_kwargs = client.increment.call_args
    assert explicit_kwargs["sample_rate"] == 0.5


def test_timer_happy_path() -> None:
    """``timer`` sends a float timing in both forms when prefixing is on."""
    metrics, client = make_metrics(enable_prefixed_metrics=True)
    with metrics.timer("taskworker.duration", tags={"host": "h"}):
        pass

    assert client.timing.call_count == 2
    first = client.timing.call_args_list[0]
    assert first.args[0] == "taskworker.duration"
    assert isinstance(first.args[1], float)
    assert "host:h" in first.kwargs["tags"]


def test_timer_emits_on_exception() -> None:
    """``timer`` still sends its timing when the wrapped block raises."""
    metrics, client = make_metrics()
    with pytest.raises(ValueError), metrics.timer("taskworker.dur"):
        raise ValueError("boom")

    client.timing.assert_called_once()
    elapsed = client.timing.call_args.args[1]
    assert isinstance(elapsed, float)


def test_track_memory_usage() -> None:
metrics, client = make_metrics()
with metrics.track_memory_usage("taskworker.mem"):
var = "a" * 1000000
var += "b"

client.distribution.assert_called_once()
args, _ = client.distribution.call_args
assert args[0] == "taskworker.mem"
assert isinstance(args[1], int)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: The track_memory_usage function incorrectly uses ru_maxrss, a high-water mark for memory, to calculate memory deltas. This can cause test failures and makes the production metric unreliable.
Severity: MEDIUM

Suggested Fix

To accurately measure memory usage within the context, replace resource.getrusage with a method that measures current memory usage, not a historical peak. For example, use a library like psutil to get the current resident set size (RSS) at the beginning and end of the block. The difference between these two values will provide a more accurate measurement of the memory consumed within that specific context.

Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent. Verify if this is a real issue. If it is, propose a fix; if not, explain why it's
not valid.

Location: clients/python/tests/test_metrics.py#L140

Potential issue: The function `track_memory_usage` calculates memory usage deltas using
`resource.getrusage(resource.RUSAGE_SELF).ru_maxrss`. This value represents the peak
memory usage (a high-water mark) of the process, not the current usage. In the test
`test_track_memory_usage`, an assertion `args[1] > 0` expects a positive memory delta
after an allocation. However, if previous tests have already set a high memory peak, the
new allocation may not increase it, resulting in a delta of zero and a test failure.
This also affects production, where the metric will likely report zero for most tasks,
making it ineffective for its intended purpose of tracking memory usage per task.

assert args[1] > 0


def test_track_memory_usage_prefixed() -> None:
    """With prefixing on, the memory distribution is also sent under the prefixed name."""
    metrics, client = make_metrics(enable_prefixed_metrics=True)
    with metrics.track_memory_usage("taskworker.mem"):
        pass

    assert client.distribution.call_count == 2
    prefixed = client.distribution.call_args_list[1]
    assert prefixed.args[0] == "sentry.taskworker.mem"
Loading
Loading