feat: Add datadog metrics backend #703
We currently require applications to implement their own metrics backend. This has resulted in metrics from the taskworker runtime having inconsistent tags and metric names, which makes building dashboards and alerting for taskworkers tedious.
Having a more opinionated metrics backend will allow us to make metric names and tags that are important to observability structurally required. I've also included a prefixed metric shim which will allow us to switch metrics providers without gaps in observability.
Refs STREAM-816
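The PR's shim itself is not shown in this conversation, so the following is only an illustration of the general idea of name prefixing, not the actual implementation. `CounterBackend`, `RecordingBackend`, `PrefixedBackend`, and the `incr` method are hypothetical names chosen for this sketch.

```python
from typing import Protocol


class CounterBackend(Protocol):
    """Hypothetical minimal backend interface (an assumption for this sketch)."""

    def incr(self, key: str, value: int = 1) -> None: ...


class RecordingBackend:
    """Stand-in backend that records calls instead of sending them anywhere."""

    def __init__(self) -> None:
        self.calls: list[tuple[str, int]] = []

    def incr(self, key: str, value: int = 1) -> None:
        self.calls.append((key, value))


class PrefixedBackend:
    """Hypothetical shim that prepends a fixed prefix to every metric name."""

    def __init__(self, inner: CounterBackend, prefix: str) -> None:
        self._inner = inner
        self._prefix = prefix

    def incr(self, key: str, value: int = 1) -> None:
        # Forward to the wrapped backend under the prefixed name.
        self._inner.incr(f"{self._prefix}.{key}", value)
```

Wrapping a backend this way keeps call sites unchanged while the emitted names move under a common prefix.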
try:
    yield None
finally:
    self._emit("timing", key, time.monotonic() - start, tags, sample_rate)
Timer sends seconds not milliseconds
High Severity
DatadogMetrics.timer emits elapsed wall time from time.monotonic() directly to DogStatsD timing, but that API expects values in milliseconds. Reported durations for RPC and task timers will be roughly three orders of magnitude too small, breaking latency dashboards and alerts.
Reviewed by Cursor Bugbot for commit fe820a1.
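A minimal sketch of the unit conversion this comment asks for, assuming the timer is a context manager around `time.monotonic()`. `RecordingClient` is a hypothetical stand-in for the real DogStatsD client, and this `timer` signature is simplified for illustration (no tags or sample rate).

```python
import time
from contextlib import contextmanager
from typing import Iterator


class RecordingClient:
    """Hypothetical stand-in for a DogStatsD client; it only records calls."""

    def __init__(self) -> None:
        self.calls: list[tuple[str, float]] = []

    def timing(self, key: str, value: float) -> None:
        self.calls.append((key, value))


@contextmanager
def timer(client: RecordingClient, key: str) -> Iterator[None]:
    start = time.monotonic()
    try:
        yield None
    finally:
        elapsed_seconds = time.monotonic() - start
        # time.monotonic() measures seconds; the timing metric is in
        # milliseconds, so convert before emitting.
        client.timing(key, elapsed_seconds * 1000.0)
```

A block that sleeps for 50 ms should then report a value near 50, not 0.05.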
def _rss_bytes() -> int:
    return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
Memory metric uses peak RSS
Medium Severity
track_memory_usage derives its distribution from the difference in ru_maxrss, which is a process lifetime high-water mark, not current RSS. Memory freed inside the block does not lower the metric, and on Linux ru_maxrss is in kilobytes despite _rss_bytes implying bytes.
Reviewed by Cursor Bugbot for commit fe820a1.
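One possible stdlib-only way to address both points (peak versus current, and kilobytes versus bytes) is sketched below. The function names are hypothetical, and the `/proc/self/statm` path only exists on Linux, so other platforms fall back to the normalized peak value.

```python
import os
import resource
import sys


def peak_rss_bytes() -> int:
    """Peak RSS (process high-water mark), normalized to bytes."""
    peak = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    # ru_maxrss is reported in kilobytes on Linux and in bytes on macOS.
    return peak if sys.platform == "darwin" else peak * 1024


def current_rss_bytes() -> int:
    """Current RSS in bytes on Linux; falls back to the peak elsewhere."""
    try:
        with open("/proc/self/statm") as statm:
            # The second field of statm is the resident set size in pages.
            resident_pages = int(statm.read().split()[1])
    except (OSError, IndexError, ValueError):
        return peak_rss_bytes()
    return resident_pages * os.sysconf("SC_PAGE_SIZE")
```

Taking the difference of two `current_rss_bytes()` readings can be negative when memory is freed inside the block, which is the behaviour the peak-based version cannot express.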
Cursor Bugbot has reviewed your changes and found 1 potential issue.
There are 3 total unresolved issues (including 2 from previous reviews).
Reviewed by Cursor Bugbot for commit 2078b84.
It should be aligned with the other methods.
  instance: str | None = None,
  tags: Tags | None = None,
- sample_rate: float = 1,
+ sample_rate: float | None = None,
This default value has been silly for a while. Now is a good time to change this.
client.distribution.assert_called_once()
args, _ = client.distribution.call_args
assert args[0] == "taskworker.mem"
assert isinstance(args[1], int)
Bug: The track_memory_usage function incorrectly uses ru_maxrss, a high-water mark for memory, to calculate memory deltas. This can cause test failures and makes the production metric unreliable.
Severity: MEDIUM
Suggested Fix
To accurately measure memory usage within the context, replace resource.getrusage with a method that measures current memory usage, not a historical peak. For example, use a library like psutil to get the current resident set size (RSS) at the beginning and end of the block. The difference between these two values will provide a more accurate measurement of the memory consumed within that specific context.
Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent. Verify if this is a real issue. If it is, propose a fix; if not, explain why it's
not valid.
Location: clients/python/tests/test_metrics.py#L140
Potential issue: The function `track_memory_usage` calculates memory usage deltas using
`resource.getrusage(resource.RUSAGE_SELF).ru_maxrss`. This value represents the peak
memory usage (a high-water mark) of the process, not the current usage. In the test
`test_track_memory_usage`, an assertion `args[1] > 0` expects a positive memory delta
after an allocation. However, if previous tests have already set a high memory peak, the
new allocation may not increase it, resulting in a delta of zero and a test failure.
This also affects production, where the metric will likely report zero for most tasks,
making it ineffective for its intended purpose of tracking memory usage per task.
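The test flakiness described above could be avoided by making the memory reading injectable, so the test scripts the values instead of depending on process history. This is a sketch under that assumption. The `track_memory_usage` signature here is hypothetical and differs from the one in the PR, and `run_with_fake_readings` is a helper invented for the example.

```python
from contextlib import contextmanager
from typing import Callable, Iterator


@contextmanager
def track_memory_usage(
    emit: Callable[[str, int], None],
    key: str,
    read_rss: Callable[[], int],
) -> Iterator[None]:
    """Hypothetical variant that takes its memory reader as a parameter."""
    before = read_rss()
    try:
        yield None
    finally:
        # Emit the difference between the reading after and before the block.
        emit(key, read_rss() - before)


def run_with_fake_readings(readings: list[int]) -> list[tuple[str, int]]:
    """Run an empty block with scripted RSS readings; return what was emitted."""
    emitted: list[tuple[str, int]] = []
    scripted = iter(readings)
    with track_memory_usage(
        lambda k, v: emitted.append((k, v)),
        "taskworker.mem",
        lambda: next(scripted),
    ):
        pass
    return emitted
```

With scripted readings the assertion on the delta no longer depends on what earlier tests allocated, and the zero-delta case can be tested explicitly.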

