Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 44 additions & 5 deletions sdk/src/opendecree/async_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import logging
import random
import re
from collections import deque
from collections.abc import AsyncIterator, Callable
from typing import Any, TypeVar

Expand All @@ -41,6 +42,8 @@

_CONTROL_CHARS_RE = re.compile(r"[^\x20-\x7E]")

_DEFAULT_MAX_QUEUE_SIZE = 1024

T = TypeVar("T")


Expand All @@ -57,15 +60,26 @@ def __init__(
default: T,
*,
on_callback_error: Callable[[Exception], None] | None = None,
max_queue_size: int = _DEFAULT_MAX_QUEUE_SIZE,
) -> None:
super().__init__(path, type_, default, on_callback_error=on_callback_error)
self._change_queue: asyncio.Queue[Change | None] = asyncio.Queue()
self._max_queue_size = max_queue_size
self._dropped_changes = 0
# _change_queue is a deque used as a bounded FIFO. _queue_event gates
# the async changes() iterator when the deque is empty.
self._change_queue: deque[Change | None] = deque()
self._queue_event = asyncio.Event()

@property
def value(self) -> T:
"""The current value — always fresh."""
return self._value

@property
def dropped_changes(self) -> int:
"""Number of changes dropped because the queue was full."""
return self._dropped_changes

def __repr__(self) -> str:
return f"AsyncWatchedField({self._path!r}, value={self._value!r})"

Expand All @@ -75,7 +89,14 @@ async def changes(self) -> AsyncIterator[Change]:
Yields Change objects until the watcher is stopped.
"""
while True:
change = await self._change_queue.get()
await self._queue_event.wait()
if not self._change_queue:
# Spurious wake — clear and re-wait.
self._queue_event.clear()
continue
change = self._change_queue.popleft()
if not self._change_queue:
self._queue_event.clear()
if change is None: # sentinel
return
yield change
Expand All @@ -84,15 +105,27 @@ def _update(self, raw_value: str | None, change: Change) -> None:
"""Update the field value from a raw string. Called by the watcher task."""
old, new = self._apply_raw(raw_value)
self._fire_callbacks(old, new)
self._change_queue.put_nowait(change)
if len(self._change_queue) >= self._max_queue_size:
self._change_queue.popleft()
self._dropped_changes += 1
logger.warning(
"AsyncWatchedField %r: change queue full (max=%d), oldest entry dropped "
"(total dropped: %d)",
self._path,
self._max_queue_size,
self._dropped_changes,
)
self._change_queue.append(change)
self._queue_event.set()

def _load_initial(self, raw_value: str) -> None:
"""Set initial value from snapshot. No callbacks fired."""
self._apply_raw(raw_value)

def _stop(self) -> None:
"""Signal the changes() iterator to stop."""
self._change_queue.put_nowait(None)
self._change_queue.append(None)
self._queue_event.set()


class AsyncConfigWatcher:
Expand Down Expand Up @@ -126,6 +159,7 @@ def field(
*,
default: T,
on_callback_error: Callable[[Exception], None] | None = None,
max_queue_size: int = _DEFAULT_MAX_QUEUE_SIZE,
) -> AsyncWatchedField[T]:
"""Register a field to watch.

Expand All @@ -138,13 +172,18 @@ def field(
on_callback_error: Optional hook called with the exception when an
on_change callback raises. If not set, the exception is logged.
The hook may re-raise to terminate the watcher's background task.
max_queue_size: Maximum number of unread changes buffered. When the
queue is full, the oldest entry is dropped and ``dropped_changes``
is incremented. Default: 1024.

Returns:
An AsyncWatchedField that tracks the live value.
"""
if self._task is not None:
raise RuntimeError("Cannot register fields after watcher has started")
watched = AsyncWatchedField(path, type_, default, on_callback_error=on_callback_error)
watched = AsyncWatchedField(
path, type_, default, on_callback_error=on_callback_error, max_queue_size=max_queue_size
)
self._fields[path] = watched
return watched

Expand Down
52 changes: 43 additions & 9 deletions sdk/src/opendecree/watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@
from __future__ import annotations

import logging
import queue
import random
import re
import threading
import time
from collections import deque
from collections.abc import Callable, Iterator
from typing import Any, TypeVar

Expand All @@ -38,6 +38,8 @@

logger = logging.getLogger("opendecree.watcher")

_DEFAULT_MAX_QUEUE_SIZE = 1024

_CONTROL_CHARS_RE = re.compile(r"[^\x20-\x7E]")

T = TypeVar("T")
Expand All @@ -56,17 +58,29 @@ def __init__(
default: T,
*,
on_callback_error: Callable[[Exception], None] | None = None,
max_queue_size: int = _DEFAULT_MAX_QUEUE_SIZE,
) -> None:
super().__init__(path, type_, default, on_callback_error=on_callback_error)
self._lock = threading.Lock()
self._change_queue: queue.Queue[Change] = queue.Queue()
self._max_queue_size = max_queue_size
self._dropped_changes = 0
# _change_queue is a deque used as a bounded FIFO. _queue_cond is used
# by changes() to block when the deque is empty.
self._change_queue: deque[Change] = deque()
self._queue_cond = threading.Condition(threading.Lock())

@property
def value(self) -> T:
"""The current value — always fresh, thread-safe."""
with self._lock:
return self._value

@property
def dropped_changes(self) -> int:
"""Number of changes dropped because the queue was full."""
with self._queue_cond:
return self._dropped_changes

def __repr__(self) -> str:
return f"WatchedField({self._path!r}, value={self.value!r})"

Expand All @@ -77,10 +91,10 @@ def changes(self) -> Iterator[Change]:
Yields Change objects with old_value and new_value as strings.
"""
while True:
try:
change = self._change_queue.get(timeout=1.0)
except queue.Empty:
continue
with self._queue_cond:
while not self._change_queue:
self._queue_cond.wait(timeout=1.0)
change = self._change_queue.popleft()
if change is _SENTINEL_CHANGE:
return
yield change
Expand All @@ -90,7 +104,19 @@ def _update(self, raw_value: str | None, change: Change) -> None:
with self._lock:
old, new = self._apply_raw(raw_value)
self._fire_callbacks(old, new)
self._change_queue.put(change)
with self._queue_cond:
if len(self._change_queue) >= self._max_queue_size:
self._change_queue.popleft()
self._dropped_changes += 1
logger.warning(
"WatchedField %r: change queue full (max=%d), oldest entry dropped "
"(total dropped: %d)",
self._path,
self._max_queue_size,
self._dropped_changes,
)
self._change_queue.append(change)
self._queue_cond.notify()

def _load_initial(self, raw_value: str) -> None:
"""Set initial value from snapshot. No callbacks fired."""
Expand All @@ -99,7 +125,9 @@ def _load_initial(self, raw_value: str) -> None:

def _stop(self) -> None:
"""Signal the changes() iterator to stop."""
self._change_queue.put(_SENTINEL_CHANGE)
with self._queue_cond:
self._change_queue.append(_SENTINEL_CHANGE)
self._queue_cond.notify()


# Sentinel to signal the changes() iterator to stop.
Expand Down Expand Up @@ -130,6 +158,7 @@ def field(
*,
default: T,
on_callback_error: Callable[[Exception], None] | None = None,
max_queue_size: int = _DEFAULT_MAX_QUEUE_SIZE,
) -> WatchedField[T]:
"""Register a field to watch.

Expand All @@ -142,13 +171,18 @@ def field(
on_callback_error: Optional hook called with the exception when an
on_change callback raises. If not set, the exception is logged.
The hook may re-raise to terminate the watcher's background loop.
max_queue_size: Maximum number of unread changes buffered. When the
queue is full, the oldest entry is dropped and ``dropped_changes``
is incremented. Default: 1024.

Returns:
A WatchedField that tracks the live value.
"""
if self._thread is not None:
raise RuntimeError("Cannot register fields after watcher has started")
watched = WatchedField(path, type_, default, on_callback_error=on_callback_error)
watched = WatchedField(
path, type_, default, on_callback_error=on_callback_error, max_queue_size=max_queue_size
)
self._fields[path] = watched
return watched

Expand Down
74 changes: 71 additions & 3 deletions sdk/tests/test_async_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,15 @@ def test_update_null_resets_to_default(self):
@pytest.mark.asyncio
async def test_changes_iterator(self):
f = AsyncWatchedField("x", str, "")
f._load_initial("a")

c1 = Change(field_path="x", old_value="a", new_value="b", version=1)
c2 = Change(field_path="x", old_value="b", new_value="c", version=2)

f._change_queue.put_nowait(c1)
f._change_queue.put_nowait(c2)
f._change_queue.put_nowait(None) # sentinel
# Populate via the internal helpers (matching the production path).
f._update("b", c1)
f._update("c", c2)
f._stop()

collected = [c async for c in f.changes()]
assert len(collected) == 2
Expand Down Expand Up @@ -140,6 +142,72 @@ def bad_cb(old: int, new: int) -> None:
assert len(errors) == 1
assert isinstance(errors[0], RuntimeError)

# --- Bounded queue tests ---

def test_dropped_changes_starts_at_zero(self):
f = AsyncWatchedField("x", str, "", max_queue_size=5)
assert f.dropped_changes == 0

def test_queue_fills_without_dropping_below_limit(self):
f = AsyncWatchedField("x", str, "", max_queue_size=3)
for i in range(3):
c = Change(field_path="x", old_value=str(i), new_value=str(i + 1), version=i)
f._update(str(i + 1), c)

assert f.dropped_changes == 0
assert len(f._change_queue) == 3

def test_oldest_entry_dropped_when_queue_full(self):
f = AsyncWatchedField("x", str, "", max_queue_size=3)
for i in range(5):
c = Change(field_path="x", old_value=str(i), new_value=str(i + 1), version=i)
f._update(str(i + 1), c)

assert f.dropped_changes == 2
assert len(f._change_queue) == 3
versions = [c.version for c in f._change_queue]
assert versions == [2, 3, 4]

def test_drop_logs_warning(self, caplog):
import logging

f = AsyncWatchedField("payments.fee", str, "", max_queue_size=2)
with caplog.at_level(logging.WARNING, logger="opendecree.async_watcher"):
for i in range(4):
c = Change(
field_path="payments.fee", old_value=str(i), new_value=str(i + 1), version=i
)
f._update(str(i + 1), c)

assert f.dropped_changes == 2
warning_records = [r for r in caplog.records if "dropped" in r.message]
assert len(warning_records) == 2
assert "payments.fee" in warning_records[0].message

def test_max_queue_size_constructor_arg(self):
f = AsyncWatchedField("x", str, "", max_queue_size=10)
assert f._max_queue_size == 10

def test_default_max_queue_size(self):
from opendecree.async_watcher import _DEFAULT_MAX_QUEUE_SIZE

f = AsyncWatchedField("x", str, "")
assert f._max_queue_size == _DEFAULT_MAX_QUEUE_SIZE
assert _DEFAULT_MAX_QUEUE_SIZE == 1024

@pytest.mark.asyncio
async def test_changes_iterator_after_overflow(self):
f = AsyncWatchedField("x", str, "", max_queue_size=2)
for i in range(4):
c = Change(field_path="x", old_value=str(i), new_value=str(i + 1), version=i)
f._update(str(i + 1), c)
f._stop()

collected = [c async for c in f.changes()]
assert len(collected) == 2
assert collected[0].version == 2
assert collected[1].version == 3


# --- AsyncConfigWatcher unit tests ---

Expand Down
Loading