diff --git a/CHANGELOG.md b/CHANGELOG.md index 821e7f9..02a2c26 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,29 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [2.6.0] - 04/2026 + +### Added + +- **`SpanMqttClient.register_connection_callback(cb)`** — subscribe to broker connection state transitions. Callback fires with `False` on broker disconnect and `True` on reconnect; returns an idempotent unregister function. Added to + `SpanPanelClientProtocol` so any transport that claims the protocol must implement it. +- **`SpanPanelStaleDataError`** exception — raised by `get_snapshot()` when the client is not fully live. Derives from `SpanPanelError` (not from `SpanPanelConnectionError`), because "never connected" and "running but data not currently live" are + semantically distinct states. + +### Changed + +- **`get_snapshot()` contract** — now raises `SpanPanelStaleDataError` when the bridge is not connected or the Homie device has not reached ready state. Previously, the method silently returned a snapshot built from whatever the in-memory accumulator + happened to hold, which made offline panels indistinguishable from online ones. This is the primary reason the span integration could not detect panel-offline transitions. + +### Fixed + +- **Stale snapshot dispatch after bridge disconnect** — a pending snapshot-debounce timer scheduled just before a bridge disconnect could fire afterwards, delivering a snapshot built from the still-`ready()` accumulator to subscribers. + `_on_connection_change(False)` now cancels the pending timer, and `_dispatch_snapshot` is now guarded by the same liveness predicate as `get_snapshot()`, so push consumers never receive a post-disconnect stale snapshot. + +### Breaking + +- Consumers of `get_snapshot()` must now handle `SpanPanelStaleDataError`. Any consumer with a broad `except Exception` (or `except SpanPanelError`) branch already handles this correctly. + ## [2.5.4] - 04/2026 ### Reverted diff --git a/README.md b/README.md index 521b40a..eb77735 100644 --- a/README.md +++ b/README.md @@ -71,12 +71,12 @@ This ensures that the first `get_snapshot()` after connect returns human-readabl The library defines three structural subtyping protocols (PEP 544) that both the MQTT transport and the simulation engine implement: -| Protocol | Purpose | -| -------------------------- | ------------------------------------------------------------------------------------- | -| `SpanPanelClientProtocol` | Core lifecycle: `connect`, `close`, `ping`, `get_snapshot` | -| `CircuitControlProtocol` | Relay and shed-priority control: `set_circuit_relay`, `set_circuit_priority` | -| `PanelControlProtocol` | Panel-level control: `set_dominant_power_source` | -| `StreamingCapableProtocol` | Push-based updates: `register_snapshot_callback`, `start_streaming`, `stop_streaming` | +| Protocol | Purpose | +| -------------------------- | ------------------------------------------------------------------------------------------ | +| `SpanPanelClientProtocol` | Core lifecycle: `connect`, `close`, `ping`, `get_snapshot`, `register_connection_callback` | +| `CircuitControlProtocol` | Relay and shed-priority control: `set_circuit_relay`, `set_circuit_priority` | +| `PanelControlProtocol` | Panel-level control: `set_dominant_power_source` | +| `StreamingCapableProtocol` | Push-based updates: `register_snapshot_callback`, `start_streaming`, `stop_streaming` | Integration code programs against these protocols, not transport-specific classes. @@ -160,6 +160,29 @@ async def main(): asyncio.run(main()) ``` +### Connection State Monitoring + +Push consumers that need to react to broker disconnect/reconnect events — for example, to mark downstream entities offline within a second of a dropped connection rather than waiting on a fallback poll — can register a connection callback. The callback +fires `False` on disconnect and `True` on reconnect, edge-only (no synthetic call at registration time): + +```python +def on_connection_change(connected: bool) -> None: + if connected: + print("Broker connection restored") + else: + print("Broker connection lost") + +unsubscribe_connection = client.register_connection_callback(on_connection_change) + +# Later, during teardown: +unsubscribe_connection() +``` + +To check the current connection state on demand (for example, just after registering), call `await client.ping()`. + +When the client is not fully live (broker disconnected, or Homie device not yet ready), `await client.get_snapshot()` raises `SpanPanelStaleDataError` instead of returning cached data. Treat that exception as the canonical "panel currently unreachable" +signal — see [Error Handling](#error-handling) below. + ### Pre-Built Config Pattern If you already have MQTT broker credentials (e.g., stored from a previous registration): @@ -295,17 +318,25 @@ await delete_fqdn("192.168.1.100", token=auth.access_token) All exceptions inherit from `SpanPanelError`: -| Exception | Cause | -| -------------------------- | --------------------------------------------------------- | -| `SpanPanelAuthError` | Invalid passphrase, expired token, or missing credentials | -| `SpanPanelConnectionError` | Cannot reach the panel (network/DNS) | -| `SpanPanelTimeoutError` | Request or connection timed out | -| `SpanPanelValidationError` | Data validation failure | -| `SpanPanelAPIError` | Unexpected HTTP response from v2 endpoints | -| `SpanPanelServerError` | Panel returned HTTP 500 | +| Exception | Cause | +| -------------------------- | -------------------------------------------------------------------------------------------------- | +| `SpanPanelAuthError` | Invalid passphrase, expired token, or missing credentials | +| `SpanPanelConnectionError` | Cannot reach the panel (network/DNS) during initial connect | +| `SpanPanelStaleDataError` | `get_snapshot()` called while the broker is disconnected or the Homie device has not reached ready | +| `SpanPanelTimeoutError` | Request or connection timed out | +| `SpanPanelValidationError` | Data validation failure | +| `SpanPanelAPIError` | Unexpected HTTP response from v2 endpoints | +| `SpanPanelServerError` | Panel returned HTTP 500 | + +`SpanPanelStaleDataError` is distinct from `SpanPanelConnectionError`: the former means the client is running but data cannot be trusted right now (transient disconnect, or panel-declared not-ready); the latter means the initial connect failed and the +client cannot be used at all. ```python -from span_panel_api import SpanPanelAuthError, SpanPanelConnectionError +from span_panel_api import ( + SpanPanelAuthError, + SpanPanelConnectionError, + SpanPanelStaleDataError, +) try: client = await create_span_client(host="192.168.1.100", passphrase="wrong") @@ -313,6 +344,14 @@ except SpanPanelAuthError: print("Invalid passphrase") except SpanPanelConnectionError: print("Cannot reach panel") + +# Later, during normal operation: +try: + snapshot = await client.get_snapshot() +except SpanPanelStaleDataError as err: + # Broker dropped or panel declared not-ready — fall back to last-known + # data, a grace-period value, or mark downstream state unavailable. + print(f"Snapshot unavailable: {err}") ``` ## Capabilities diff --git a/pyproject.toml b/pyproject.toml index 8d03d97..9e97a58 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "span-panel-api" -version = "2.5.4" +version = "2.6.0" description = "A client library for SPAN Panel API" authors = [ {name = "SpanPanel"} diff --git a/src/span_panel_api/__init__.py b/src/span_panel_api/__init__.py index a9d0c68..62ab74f 100644 --- a/src/span_panel_api/__init__.py +++ b/src/span_panel_api/__init__.py @@ -23,6 +23,7 @@ SpanPanelConnectionError, SpanPanelError, SpanPanelServerError, + SpanPanelStaleDataError, SpanPanelTimeoutError, SpanPanelValidationError, ) @@ -109,6 +110,7 @@ "SpanPanelConnectionError", "SpanPanelError", "SpanPanelServerError", + "SpanPanelStaleDataError", "SpanPanelTimeoutError", "SpanPanelValidationError", ] diff --git a/src/span_panel_api/exceptions.py b/src/span_panel_api/exceptions.py index d730121..6681ef4 100644 --- a/src/span_panel_api/exceptions.py +++ b/src/span_panel_api/exceptions.py @@ -34,3 +34,12 @@ def __str__(self) -> str: class SpanPanelServerError(SpanPanelAPIError): """Server error (500).""" + + +class SpanPanelStaleDataError(SpanPanelError): + """Raised when get_snapshot() is called while the client isn't live. + + Distinct from SpanPanelConnectionError: this means the client is running + but data cannot be trusted right now (broker disconnected, or the Homie + device has declared $state=disconnected/lost). + """ diff --git a/src/span_panel_api/mqtt/client.py b/src/span_panel_api/mqtt/client.py index 81c940b..a412087 100644 --- a/src/span_panel_api/mqtt/client.py +++ b/src/span_panel_api/mqtt/client.py @@ -9,10 +9,11 @@ import asyncio from collections.abc import Awaitable, Callable +import contextlib import logging from ..auth import get_homie_schema -from ..exceptions import SpanPanelConnectionError, SpanPanelServerError +from ..exceptions import SpanPanelConnectionError, SpanPanelServerError, SpanPanelStaleDataError from ..models import FieldMetadata, HomieSchemaTypes, SpanPanelSnapshot from ..protocol import PanelCapability from .accumulator import HomiePropertyAccumulator @@ -52,6 +53,8 @@ def __init__( self._homie: HomieDeviceConsumer | None = None self._streaming = False self._snapshot_callbacks: list[Callable[[SpanPanelSnapshot], Awaitable[None]]] = [] + self._connection_callbacks: list[Callable[[bool], None]] = [] + self._live = False self._ready_event: asyncio.Event | None = None self._loop: asyncio.AbstractEventLoop | None = None self._background_tasks: set[asyncio.Task[None]] = set() @@ -190,6 +193,7 @@ async def close(self) -> None: await self._bridge.disconnect() self._bridge = None self._accumulator = None + self._live = False async def ping(self) -> bool: """Check if MQTT connection is alive and device is ready.""" @@ -197,12 +201,43 @@ async def ping(self) -> bool: return False return self._bridge.is_connected() and self._homie.is_ready() + def register_connection_callback(self, callback: Callable[[bool], None]) -> Callable[[], None]: + """Subscribe to broker connection state transitions. + + Callback fires with False on broker disconnect and True on reconnect. + No synthetic call is made at registration time — callbacks only fire + on real state edges. To check current connection state on registration, + await ping(). + + Returns an unregister function that removes the callback from the + dispatch list. Calling unregister twice is safe. + """ + self._connection_callbacks.append(callback) + + def unregister() -> None: + with contextlib.suppress(ValueError): + self._connection_callbacks.remove(callback) + + return unregister + async def get_snapshot(self) -> SpanPanelSnapshot: """Return current snapshot from accumulated MQTT state. - No network call — snapshot is built from in-memory property values. + Raises SpanPanelStaleDataError if the client is not fully live. + "Live" means: the bridge is connected AND the Homie accumulator + has reached ready state. Callers can treat SpanPanelStaleDataError + as the canonical "panel currently unreachable" signal. + + No network call — snapshot is built from in-memory property values + when the liveness checks pass. """ - return self._require_homie().build_snapshot() + if self._bridge is None or self._homie is None: + raise SpanPanelStaleDataError("Client not connected — call connect() first") + if not self._bridge.is_connected(): + raise SpanPanelStaleDataError("MQTT broker disconnected") + if not self._homie.is_ready(): + raise SpanPanelStaleDataError("Homie device not ready") + return self._homie.build_snapshot() # -- CircuitControlProtocol -------------------------------------------- @@ -296,15 +331,42 @@ def _on_message(self, topic: str, payload: str) -> None: self._snapshot_timer = self._loop.call_later(self._snapshot_interval, self._fire_snapshot) def _on_connection_change(self, connected: bool) -> None: - """Handle MQTT connection state change (called from asyncio loop).""" + """Handle MQTT connection state change (called from asyncio loop). + + Re-subscribes to the wildcard topic on reconnect (pre-existing + behavior), then fans out an edge-only notification to registered + connection callbacks. Duplicate state transitions are suppressed + so subscribers only see real edges. + + On disconnect, any pending snapshot-debounce timer is cancelled + so a stale timer cannot dispatch a post-disconnect snapshot. + """ + # Re-subscribe runs on every connected=True, including duplicates — + # paho may re-emit connected events after session restoration, and + # re-subscribing is broker-benign. Callback fan-out below is + # edge-only (see the guard after this block). if connected: _LOGGER.debug("MQTT connection established") - # Re-subscribe on reconnect if self._bridge is not None: wildcard = WILDCARD_TOPIC_FMT.format(serial=self._serial_number) self._bridge.subscribe(wildcard, qos=0) else: _LOGGER.debug("MQTT connection lost") + # Cancel any pending snapshot-debounce timer so it cannot + # fire post-disconnect with a stale snapshot. + self._cancel_snapshot_timer() + + # Edge-only dispatch + if connected == self._live: + return + self._live = connected + + # Iterate a copy — subscribers may unregister during their callback + for cb in list(self._connection_callbacks): + try: + cb(connected) + except Exception: # pylint: disable=broad-exception-caught + _LOGGER.exception("Connection callback raised") async def _wait_for_circuit_names(self, timeout: float) -> None: """Wait for all circuit-like nodes to have a ``name`` property. @@ -364,8 +426,24 @@ def set_snapshot_interval(self, interval: float) -> None: self._cancel_snapshot_timer() async def _dispatch_snapshot(self) -> None: - """Build snapshot and send to all registered callbacks.""" - snapshot = self._require_homie().build_snapshot() + """Build snapshot and send to all registered callbacks. + + Guarded by the same liveness predicate as get_snapshot() — if the + bridge has disconnected or the Homie device is not ready, no + dispatch occurs. This prevents a pending debounce timer that was + scheduled just before a disconnect from delivering a stale + snapshot to subscribers after the fact. + """ + bridge = self._bridge + homie = self._homie + if bridge is None or not bridge.is_connected() or homie is None or not homie.is_ready(): + _LOGGER.debug( + "Skipping stale snapshot dispatch (bridge_connected=%s, homie_ready=%s)", + bridge is not None and bridge.is_connected(), + homie is not None and homie.is_ready(), + ) + return + snapshot = homie.build_snapshot() for cb in list(self._snapshot_callbacks): try: await cb(snapshot) diff --git a/src/span_panel_api/protocol.py b/src/span_panel_api/protocol.py index cebe672..90ced1e 100644 --- a/src/span_panel_api/protocol.py +++ b/src/span_panel_api/protocol.py @@ -46,6 +46,8 @@ async def ping(self) -> bool: ... async def get_snapshot(self) -> SpanPanelSnapshot: ... + def register_connection_callback(self, callback: Callable[[bool], None]) -> Callable[[], None]: ... + @runtime_checkable class CircuitControlProtocol(Protocol): diff --git a/tests/test_exceptions.py b/tests/test_exceptions.py new file mode 100644 index 0000000..7d2a969 --- /dev/null +++ b/tests/test_exceptions.py @@ -0,0 +1,23 @@ +"""Tests for the span_panel_api exception hierarchy.""" + +from __future__ import annotations + +from span_panel_api.exceptions import SpanPanelError + + +def test_stale_data_error_derives_from_span_panel_error() -> None: + from span_panel_api.exceptions import SpanPanelStaleDataError + + err = SpanPanelStaleDataError("example") + assert isinstance(err, SpanPanelError) + assert str(err) == "example" + + +def test_stale_data_error_is_distinct_from_connection_error() -> None: + from span_panel_api.exceptions import ( + SpanPanelConnectionError, + SpanPanelStaleDataError, + ) + + err = SpanPanelStaleDataError("example") + assert not isinstance(err, SpanPanelConnectionError) diff --git a/tests/test_mqtt_client_connection.py b/tests/test_mqtt_client_connection.py new file mode 100644 index 0000000..4dc1b0e --- /dev/null +++ b/tests/test_mqtt_client_connection.py @@ -0,0 +1,431 @@ +"""Tests for SpanMqttClient connection callbacks and get_snapshot() liveness guards.""" + +from __future__ import annotations + +import logging + +import pytest + +from span_panel_api.exceptions import SpanPanelError, SpanPanelStaleDataError +from span_panel_api.models import SpanPanelSnapshot +from span_panel_api.mqtt.client import SpanMqttClient +from span_panel_api.mqtt.connection import AsyncMqttBridge +from span_panel_api.mqtt.const import WILDCARD_TOPIC_FMT +from span_panel_api.mqtt.homie import HomieDeviceConsumer +from span_panel_api.mqtt.models import MqttClientConfig + + +def _make_client() -> SpanMqttClient: + """Build a SpanMqttClient without I/O for unit testing.""" + config = MqttClientConfig( + broker_host="127.0.0.1", + username="test", + password="test", + ) + return SpanMqttClient("127.0.0.1", "test-serial", config) + + +class _FakeBridge(AsyncMqttBridge): + """Minimal bridge stub for get_snapshot() and fan-out tests. + + Bypasses AsyncMqttBridge.__init__ (which does TLS/CA/network setup) — + only is_connected() and subscribe() are invoked on this stub. + """ + + def __init__(self, connected: bool = True) -> None: + # Intentionally do not call super().__init__ — avoids I/O setup. + self._connected = connected + self.subscribed_topics: list[tuple[str, int]] = [] + + def is_connected(self) -> bool: + return self._connected + + def subscribe(self, topic: str, qos: int = 0) -> None: + self.subscribed_topics.append((topic, qos)) + + +class _FakeHomie(HomieDeviceConsumer): + """Minimal Homie stub for get_snapshot() tests. + + Bypasses HomieDeviceConsumer.__init__ — only is_ready() and + build_snapshot() are invoked on this stub. + """ + + def __init__(self, ready: bool = True, snapshot: SpanPanelSnapshot | None = None) -> None: + # Intentionally do not call super().__init__ — avoids accumulator setup. + self._ready_flag = ready + self._snapshot = snapshot + + def is_ready(self) -> bool: + return self._ready_flag + + def build_snapshot(self) -> SpanPanelSnapshot: + if self._snapshot is None: + raise RuntimeError("_FakeHomie: no snapshot configured") + return self._snapshot + + +class TestRegisterConnectionCallback: + """Callback subscription API — structural only (fan-out is tested in Task 4).""" + + def test_register_returns_unregister_function(self) -> None: + client = _make_client() + unregister = client.register_connection_callback(lambda _c: None) + assert callable(unregister) + + def test_register_appends_to_callback_list(self) -> None: + client = _make_client() + cb = lambda _c: None # noqa: E731 + client.register_connection_callback(cb) + assert cb in client._connection_callbacks + + def test_unregister_removes_from_callback_list(self) -> None: + client = _make_client() + cb = lambda _c: None # noqa: E731 + unregister = client.register_connection_callback(cb) + unregister() + assert cb not in client._connection_callbacks + + def test_double_unregister_is_noop(self) -> None: + client = _make_client() + unregister = client.register_connection_callback(lambda _c: None) + unregister() + unregister() # must not raise + + +class TestConnectionEventDispatch: + """Edge-only fan-out in _on_connection_change.""" + + def test_multiple_callbacks_all_fire(self) -> None: + client = _make_client() + client._bridge = _FakeBridge(connected=True) + calls_a: list[bool] = [] + calls_b: list[bool] = [] + client.register_connection_callback(calls_a.append) + client.register_connection_callback(calls_b.append) + + client._on_connection_change(True) + + assert calls_a == [True] + assert calls_b == [True] + + def test_unregister_prevents_future_calls(self) -> None: + client = _make_client() + client._bridge = _FakeBridge(connected=True) + calls: list[bool] = [] + unregister = client.register_connection_callback(calls.append) + unregister() + + client._on_connection_change(True) + + assert calls == [] + + def test_initial_false_to_true_fires_online(self) -> None: + client = _make_client() + client._bridge = _FakeBridge(connected=True) + calls: list[bool] = [] + client.register_connection_callback(calls.append) + + client._on_connection_change(True) + + assert calls == [True] + assert client._live is True + + def test_true_to_false_fires_offline(self) -> None: + client = _make_client() + client._bridge = _FakeBridge(connected=True) + client._live = True + calls: list[bool] = [] + client.register_connection_callback(calls.append) + + client._on_connection_change(False) + + assert calls == [False] + assert client._live is False + + def test_duplicate_true_suppressed(self) -> None: + client = _make_client() + client._bridge = _FakeBridge(connected=True) + client._live = True + calls: list[bool] = [] + client.register_connection_callback(calls.append) + + client._on_connection_change(True) + + assert calls == [] + + def test_duplicate_false_suppressed(self) -> None: + client = _make_client() + client._bridge = _FakeBridge(connected=False) + client._live = False + calls: list[bool] = [] + client.register_connection_callback(calls.append) + + client._on_connection_change(False) + + assert calls == [] + + def test_callback_exception_does_not_break_fanout(self, caplog: pytest.LogCaptureFixture) -> None: + client = _make_client() + client._bridge = _FakeBridge(connected=True) + + def bad(_connected: bool) -> None: + raise RuntimeError("intentional") + + good_calls: list[bool] = [] + client.register_connection_callback(bad) + client.register_connection_callback(good_calls.append) + + with caplog.at_level(logging.ERROR): + client._on_connection_change(True) + + assert good_calls == [True] + assert any("Connection callback raised" in r.message for r in caplog.records) + + def test_unregister_during_fanout_safe(self) -> None: + client = _make_client() + client._bridge = _FakeBridge(connected=True) + + order: list[str] = [] + unregister_holder: dict[str, object] = {} + + def first(_connected: bool) -> None: + order.append("first") + unregister_fn = unregister_holder["unregister"] + assert callable(unregister_fn) + unregister_fn() + + def second(_connected: bool) -> None: + order.append("second") + + unregister_holder["unregister"] = client.register_connection_callback(first) + client.register_connection_callback(second) + + client._on_connection_change(True) + + assert order == ["first", "second"] + assert first not in client._connection_callbacks + + def test_reconnect_triggers_resubscribe_and_callback(self) -> None: + client = _make_client() + bridge = _FakeBridge(connected=True) + client._bridge = bridge + client._live = False # was offline + calls: list[bool] = [] + client.register_connection_callback(calls.append) + + client._on_connection_change(True) + + expected_topic = WILDCARD_TOPIC_FMT.format(serial="test-serial") + assert len(bridge.subscribed_topics) == 1 + assert bridge.subscribed_topics[0] == (expected_topic, 0) + assert calls == [True] + + def test_resubscribe_fires_even_on_duplicate_true(self) -> None: + """Duplicate connected=True still triggers re-subscribe (intentional). + + Paho may re-emit connected events; re-subscribing is broker-benign + and ensures subscriptions survive session restoration. Callback + fan-out is separately edge-only. + """ + client = _make_client() + bridge = _FakeBridge(connected=True) + client._bridge = bridge + client._live = True # already online + calls: list[bool] = [] + client.register_connection_callback(calls.append) + + client._on_connection_change(True) + + # Re-subscribe fires (side effect preserved) but no callback edge + assert len(bridge.subscribed_topics) == 1 + assert calls == [] + + +def _make_sentinel_snapshot() -> SpanPanelSnapshot: + """Build a minimal SpanPanelSnapshot for identity-assertion tests. + + Field values are arbitrary — tests only assert object identity + (`snapshot is sentinel`), not content. If SpanPanelSnapshot grows + a new required field, add it here with a zero/empty default; no + test assertion needs to change. + """ + return SpanPanelSnapshot( + serial_number="test-serial", + firmware_version="0.0.0", + main_relay_state="CLOSED", + instant_grid_power_w=0.0, + feedthrough_power_w=0.0, + main_meter_energy_consumed_wh=0.0, + main_meter_energy_produced_wh=0.0, + feedthrough_energy_consumed_wh=0.0, + feedthrough_energy_produced_wh=0.0, + dsm_state="DSM_GRID_OK", + current_run_config="PANEL_ON_GRID", + door_state="CLOSED", + proximity_proven=True, + uptime_s=0, + eth0_link=False, + wlan_link=False, + wwan_link=False, + panel_size=32, + ) + + +class TestGetSnapshotLiveness: + """get_snapshot() must raise SpanPanelStaleDataError when not live.""" + + async def test_raises_stale_when_bridge_none(self) -> None: + client = _make_client() + client._bridge = None + client._homie = _FakeHomie(ready=True) + + with pytest.raises(SpanPanelStaleDataError) as exc_info: + await client.get_snapshot() + assert "not connected" in str(exc_info.value).lower() + + async def test_raises_stale_when_homie_none(self) -> None: + client = _make_client() + client._bridge = _FakeBridge(connected=True) + client._homie = None + + with pytest.raises(SpanPanelStaleDataError) as exc_info: + await client.get_snapshot() + assert "not connected" in str(exc_info.value).lower() + + async def test_raises_stale_when_broker_disconnected(self) -> None: + client = _make_client() + client._bridge = _FakeBridge(connected=False) + client._homie = _FakeHomie(ready=True) + + with pytest.raises(SpanPanelStaleDataError) as exc_info: + await client.get_snapshot() + assert "broker" in str(exc_info.value).lower() + + async def test_raises_stale_when_homie_not_ready(self) -> None: + client = _make_client() + client._bridge = _FakeBridge(connected=True) + client._homie = _FakeHomie(ready=False) + + with pytest.raises(SpanPanelStaleDataError) as exc_info: + await client.get_snapshot() + assert "not ready" in str(exc_info.value).lower() + + async def test_returns_snapshot_when_fully_live(self) -> None: + sentinel = _make_sentinel_snapshot() + client = _make_client() + client._bridge = _FakeBridge(connected=True) + client._homie = _FakeHomie(ready=True, snapshot=sentinel) + + snapshot = await client.get_snapshot() + assert snapshot is sentinel + + async def test_raised_exception_is_span_panel_error(self) -> None: + client = _make_client() + client._bridge = None + client._homie = None + + with pytest.raises(SpanPanelError): + await client.get_snapshot() + + +class TestCloseBehavior: + """close() must reset internal state for safe re-use.""" + + async def test_close_resets_live_flag(self) -> None: + client = _make_client() + client._live = True # simulate a prior connection + + await client.close() + + assert client._live is False + + +class TestStaleSnapshotDispatchGuard: + """Post-disconnect stale-snapshot dispatch must not reach subscribers. + + The library's debounced snapshot dispatch schedules a timer on every + incoming message. A timer scheduled just before a bridge disconnect + will still fire afterwards unless cancelled. Subscribers must never + receive a snapshot built after disconnect — see connection.py. + """ + + @pytest.mark.asyncio + async def test_dispatch_snapshot_bails_when_bridge_disconnected(self, caplog: pytest.LogCaptureFixture) -> None: + snapshot_sentinel = _make_sentinel_snapshot() + client = _make_client() + client._bridge = _FakeBridge(connected=False) + client._homie = _FakeHomie(ready=True, snapshot=snapshot_sentinel) + + calls: list[SpanPanelSnapshot] = [] + + async def record(snapshot: SpanPanelSnapshot) -> None: + calls.append(snapshot) + + client._snapshot_callbacks.append(record) + + with caplog.at_level(logging.DEBUG, logger="span_panel_api.mqtt.client"): + await client._dispatch_snapshot() + + assert calls == [] + assert any("Skipping stale snapshot dispatch" in r.message for r in caplog.records) + + @pytest.mark.asyncio + async def test_dispatch_snapshot_bails_when_homie_not_ready(self) -> None: + snapshot_sentinel = _make_sentinel_snapshot() + client = _make_client() + client._bridge = _FakeBridge(connected=True) + client._homie = _FakeHomie(ready=False, snapshot=snapshot_sentinel) + + calls: list[SpanPanelSnapshot] = [] + + async def record(snapshot: SpanPanelSnapshot) -> None: + calls.append(snapshot) + + client._snapshot_callbacks.append(record) + + await client._dispatch_snapshot() + + assert calls == [] + + @pytest.mark.asyncio + async def test_dispatch_snapshot_delivers_when_live(self) -> None: + snapshot_sentinel = _make_sentinel_snapshot() + client = _make_client() + client._bridge = _FakeBridge(connected=True) + client._homie = _FakeHomie(ready=True, snapshot=snapshot_sentinel) + + calls: list[SpanPanelSnapshot] = [] + + async def record(snapshot: SpanPanelSnapshot) -> None: + calls.append(snapshot) + + client._snapshot_callbacks.append(record) + + await client._dispatch_snapshot() + + assert calls == [snapshot_sentinel] + + def test_on_connection_change_false_cancels_snapshot_timer(self) -> None: + """Disconnect must cancel any pending debounce timer.""" + client = _make_client() + client._bridge = _FakeBridge(connected=True) + client._live = True # previously connected + + # Simulate a scheduled debounce timer + fired: list[None] = [] + + class _FakeHandle: + cancelled = False + + def cancel(self) -> None: + self.cancelled = True + fired.append(None) + + handle = _FakeHandle() + client._snapshot_timer = handle # type: ignore[assignment] + + client._on_connection_change(False) + + assert handle.cancelled is True + assert client._snapshot_timer is None diff --git a/tests/test_mqtt_homie.py b/tests/test_mqtt_homie.py index 14fd2f5..fa92a0e 100644 --- a/tests/test_mqtt_homie.py +++ b/tests/test_mqtt_homie.py @@ -18,7 +18,7 @@ import json import time -from unittest.mock import AsyncMock, MagicMock, patch +from unittest.mock import AsyncMock, MagicMock import pytest @@ -39,6 +39,7 @@ TYPE_PV, ) from span_panel_api.mqtt.accumulator import HomiePropertyAccumulator +from span_panel_api.mqtt.connection import AsyncMqttBridge from span_panel_api.mqtt.homie import HomieDeviceConsumer from span_panel_api.mqtt.models import MqttClientConfig from span_panel_api.protocol import ( @@ -46,6 +47,17 @@ ) +class _ConnectedBridge(AsyncMqttBridge): + """Minimal bridge stub: always reports connected. No I/O setup.""" + + def __init__(self) -> None: # noqa: D107 + # Bypass AsyncMqttBridge.__init__ — avoids TLS/network setup. + pass + + def is_connected(self) -> bool: # noqa: D102 + return True + + SERIAL = "nj-2316-XXXX" PREFIX = f"{TOPIC_PREFIX}/{SERIAL}" @@ -1091,6 +1103,7 @@ async def test_get_snapshot_returns_homie_state(self): client = SpanMqttClient(host="192.168.1.1", serial_number=SERIAL, broker_config=config) client._accumulator = HomiePropertyAccumulator(SERIAL) client._homie = HomieDeviceConsumer(client._accumulator, panel_size=32) + client._bridge = _ConnectedBridge() # Manually ready the homie consumer client._homie.handle_message(f"{PREFIX}/$state", "ready") diff --git a/uv.lock b/uv.lock index 906a361..95804b8 100644 --- a/uv.lock +++ b/uv.lock @@ -1292,7 +1292,7 @@ wheels = [ [[package]] name = "span-panel-api" -version = "2.5.4" +version = "2.6.0" source = { editable = "." } dependencies = [ { name = "httpx" },