Merged
15 commits
23 changes: 23 additions & 0 deletions CHANGELOG.md
@@ -4,6 +4,29 @@ All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [2.6.0] - 04/2026

### Added

- **`SpanMqttClient.register_connection_callback(cb)`** — subscribe to broker connection state transitions. Callback fires with `False` on broker disconnect and `True` on reconnect; returns an idempotent unregister function. Added to
`SpanPanelClientProtocol` so any transport that claims the protocol must implement it.
- **`SpanPanelStaleDataError`** exception — raised by `get_snapshot()` when the client is not fully live. Derives from `SpanPanelError` (not from `SpanPanelConnectionError`), because "never connected" and "running but data not currently live" are
semantically distinct states.
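
The registration semantics in the entry above (edge-only delivery, idempotent unregister) can be sketched with a stand-in class. `ConnectionNotifier` and its `notify` method are hypothetical names used only for illustration and are not part of `span_panel_api`; only the callback signature and the unregister behaviour are taken from the entry.

```python
import contextlib
from collections.abc import Callable


class ConnectionNotifier:
    """Hypothetical stand-in for the callback bookkeeping (not the library class)."""

    def __init__(self) -> None:
        self._callbacks: list[Callable[[bool], None]] = []
        self._live = False

    def register_connection_callback(self, callback: Callable[[bool], None]) -> Callable[[], None]:
        self._callbacks.append(callback)

        def unregister() -> None:
            # Removing an already-removed callback raises ValueError; suppress it
            # so that calling unregister twice is harmless.
            with contextlib.suppress(ValueError):
                self._callbacks.remove(callback)

        return unregister

    def notify(self, connected: bool) -> None:
        # Edge-only: a repeated state is dropped before reaching subscribers.
        if connected == self._live:
            return
        self._live = connected
        for cb in list(self._callbacks):
            cb(connected)


events: list[bool] = []
notifier = ConnectionNotifier()
unsubscribe = notifier.register_connection_callback(events.append)

notifier.notify(True)
notifier.notify(True)   # duplicate state, suppressed
notifier.notify(False)
unsubscribe()
unsubscribe()           # second call is a no-op
notifier.notify(True)   # no subscribers left
```

After this sequence `events` holds one entry per real transition seen while subscribed.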

### Changed

- **`get_snapshot()` contract** — now raises `SpanPanelStaleDataError` when the bridge is not connected or the Homie device has not reached ready state. Previously, the method silently returned a snapshot built from whatever the in-memory accumulator
happened to hold, which made offline panels indistinguishable from online ones. That silent fallback is the primary reason the span integration could not detect panel-offline transitions.

### Fixed

- **Stale snapshot dispatch after bridge disconnect** — a pending snapshot-debounce timer scheduled just before a bridge disconnect could fire afterwards, delivering a snapshot built from the still-`ready()` accumulator to subscribers.
`_on_connection_change(False)` now cancels the pending timer, and `_dispatch_snapshot` is now guarded by the same liveness predicate as `get_snapshot()`, so push consumers never receive a post-disconnect stale snapshot.
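
The two-part fix above (cancel the pending timer on disconnect, and guard the dispatch itself) can be sketched as follows. `DebouncedDispatcher` is a hypothetical, simplified model and not the library's implementation; the only real API used is `asyncio`'s `call_later` and `TimerHandle.cancel()`.

```python
from __future__ import annotations

import asyncio


class DebouncedDispatcher:
    """Hypothetical model of a debounced snapshot dispatcher."""

    def __init__(self, interval: float) -> None:
        self._interval = interval
        self._timer: asyncio.TimerHandle | None = None
        self._latest: str | None = None
        self._connected = True
        self.delivered: list[str] = []

    def on_message(self, payload: str) -> None:
        self._latest = payload
        if self._timer is None:
            loop = asyncio.get_running_loop()
            self._timer = loop.call_later(self._interval, self._fire)

    def on_connection_change(self, connected: bool) -> None:
        self._connected = connected
        if not connected and self._timer is not None:
            # First line of defence: drop the pending timer.
            self._timer.cancel()
            self._timer = None

    def _fire(self) -> None:
        self._timer = None
        # Second line of defence: never dispatch while not live.
        if not self._connected or self._latest is None:
            return
        self.delivered.append(self._latest)


async def demo() -> tuple[list[str], list[str]]:
    # Case 1: the disconnect arrives before the timer fires.
    dropped = DebouncedDispatcher(interval=0.01)
    dropped.on_message("reading-1")
    dropped.on_connection_change(False)

    # Case 2: the connection stays up; two messages collapse into one dispatch.
    kept = DebouncedDispatcher(interval=0.01)
    kept.on_message("reading-1")
    kept.on_message("reading-2")

    await asyncio.sleep(0.05)
    return dropped.delivered, kept.delivered


dropped_result, kept_result = asyncio.run(demo())
```

Keeping both the cancel and the guard is deliberate in this sketch: the guard still protects against a timer callback that was already queued when the disconnect arrived.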

### Breaking

- Consumers of `get_snapshot()` must now handle `SpanPanelStaleDataError`. Any consumer with a broad `except Exception` (or `except SpanPanelError`) branch already handles this correctly.

## [2.5.4] - 04/2026

### Reverted
69 changes: 54 additions & 15 deletions README.md
@@ -71,12 +71,12 @@ This ensures that the first `get_snapshot()` after connect returns human-readabl

The library defines three structural subtyping protocols (PEP 544) that both the MQTT transport and the simulation engine implement:

-| Protocol                   | Purpose                                                                               |
-| -------------------------- | ------------------------------------------------------------------------------------- |
-| `SpanPanelClientProtocol`  | Core lifecycle: `connect`, `close`, `ping`, `get_snapshot`                            |
-| `CircuitControlProtocol`   | Relay and shed-priority control: `set_circuit_relay`, `set_circuit_priority`          |
-| `PanelControlProtocol`     | Panel-level control: `set_dominant_power_source`                                      |
-| `StreamingCapableProtocol` | Push-based updates: `register_snapshot_callback`, `start_streaming`, `stop_streaming` |
+| Protocol                   | Purpose                                                                                    |
+| -------------------------- | ------------------------------------------------------------------------------------------ |
+| `SpanPanelClientProtocol`  | Core lifecycle: `connect`, `close`, `ping`, `get_snapshot`, `register_connection_callback` |
+| `CircuitControlProtocol`   | Relay and shed-priority control: `set_circuit_relay`, `set_circuit_priority`               |
+| `PanelControlProtocol`     | Panel-level control: `set_dominant_power_source`                                           |
+| `StreamingCapableProtocol` | Push-based updates: `register_snapshot_callback`, `start_streaming`, `stop_streaming`      |

Integration code programs against these protocols, not transport-specific classes.
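
As an illustration of what structural subtyping means here, the sketch below uses a trimmed-down, hypothetical protocol (`ConnectionAwareProtocol`) rather than the library's real ones. `FakeTransport` never inherits from the protocol; it satisfies it by having matching methods. Note that a `runtime_checkable` `isinstance` check only verifies that the members exist, not that their signatures match.

```python
from collections.abc import Callable
from typing import Protocol, runtime_checkable


@runtime_checkable
class ConnectionAwareProtocol(Protocol):
    """Hypothetical, reduced protocol used only for this example."""

    async def ping(self) -> bool: ...

    def register_connection_callback(self, callback: Callable[[bool], None]) -> Callable[[], None]: ...


class FakeTransport:
    """No inheritance from the protocol; the method set alone makes it conform."""

    async def ping(self) -> bool:
        return True

    def register_connection_callback(self, callback: Callable[[bool], None]) -> Callable[[], None]:
        return lambda: None


class IncompleteTransport:
    """Missing register_connection_callback, so it does not conform."""

    async def ping(self) -> bool:
        return True


def accepts(client: object) -> bool:
    return isinstance(client, ConnectionAwareProtocol)
```

This is why adding a method to a protocol is a contract change: a transport that previously conformed stops conforming until it implements the new member.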

@@ -160,6 +160,29 @@ async def main():
asyncio.run(main())
```

### Connection State Monitoring

Push consumers that need to react to broker disconnect/reconnect events — for example, to mark downstream entities offline within a second of a dropped connection rather than waiting on a fallback poll — can register a connection callback. The callback
fires `False` on disconnect and `True` on reconnect, edge-only (no synthetic call at registration time):

```python
def on_connection_change(connected: bool) -> None:
    if connected:
        print("Broker connection restored")
    else:
        print("Broker connection lost")

unsubscribe_connection = client.register_connection_callback(on_connection_change)

# Later, during teardown:
unsubscribe_connection()
```

To check the current state on demand (for example, just after registering), call `await client.ping()`, which returns `True` only when the broker is connected and the Homie device is ready.

When the client is not fully live (broker disconnected, or Homie device not yet ready), `await client.get_snapshot()` raises `SpanPanelStaleDataError` instead of returning cached data. Treat that exception as the canonical "panel currently unreachable"
signal — see [Error Handling](#error-handling) below.
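
Because the callback is edge-only, a consumer that tracks availability needs an initial value from somewhere. A possible pattern, sketched with hypothetical stand-ins (`FakeClient`, its `simulate` helper, and `AvailabilityTracker` are not library names): register first, then seed the state from `ping()`, so that no transition between the two steps is missed.

```python
import asyncio
from collections.abc import Callable


class FakeClient:
    """Hypothetical client exposing only ping() and the connection callback API."""

    def __init__(self, live: bool) -> None:
        self._live = live
        self._callbacks: list[Callable[[bool], None]] = []

    async def ping(self) -> bool:
        return self._live

    def register_connection_callback(self, callback: Callable[[bool], None]) -> Callable[[], None]:
        self._callbacks.append(callback)

        def unregister() -> None:
            if callback in self._callbacks:
                self._callbacks.remove(callback)

        return unregister

    def simulate(self, connected: bool) -> None:
        """Test helper: pretend the broker connection changed."""
        self._live = connected
        for cb in list(self._callbacks):
            cb(connected)


class AvailabilityTracker:
    def __init__(self) -> None:
        self.available = False
        self.history: list[bool] = []

    def on_change(self, connected: bool) -> None:
        self.available = connected
        self.history.append(connected)


async def demo() -> tuple[bool, bool, bool, list[bool]]:
    client = FakeClient(live=True)
    tracker = AvailabilityTracker()
    unsubscribe = client.register_connection_callback(tracker.on_change)
    tracker.available = await client.ping()  # seed: no callback fires at registration
    seeded = tracker.available
    client.simulate(False)
    after_drop = tracker.available
    unsubscribe()
    client.simulate(True)  # ignored: the tracker is no longer subscribed
    return seeded, after_drop, tracker.available, tracker.history


result = asyncio.run(demo())
```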

### Pre-Built Config Pattern

If you already have MQTT broker credentials (e.g., stored from a previous registration):
@@ -295,24 +318,40 @@ await delete_fqdn("192.168.1.100", token=auth.access_token)

All exceptions inherit from `SpanPanelError`:

-| Exception                  | Cause                                                     |
-| -------------------------- | --------------------------------------------------------- |
-| `SpanPanelAuthError`       | Invalid passphrase, expired token, or missing credentials |
-| `SpanPanelConnectionError` | Cannot reach the panel (network/DNS)                      |
-| `SpanPanelTimeoutError`    | Request or connection timed out                           |
-| `SpanPanelValidationError` | Data validation failure                                   |
-| `SpanPanelAPIError`        | Unexpected HTTP response from v2 endpoints                |
-| `SpanPanelServerError`     | Panel returned HTTP 500                                   |
+| Exception                  | Cause                                                                                              |
+| -------------------------- | -------------------------------------------------------------------------------------------------- |
+| `SpanPanelAuthError`       | Invalid passphrase, expired token, or missing credentials                                          |
+| `SpanPanelConnectionError` | Cannot reach the panel (network/DNS) during initial connect                                        |
+| `SpanPanelStaleDataError`  | `get_snapshot()` called while the broker is disconnected or the Homie device has not reached ready |
+| `SpanPanelTimeoutError`    | Request or connection timed out                                                                    |
+| `SpanPanelValidationError` | Data validation failure                                                                            |
+| `SpanPanelAPIError`        | Unexpected HTTP response from v2 endpoints                                                         |
+| `SpanPanelServerError`     | Panel returned HTTP 500                                                                            |

`SpanPanelStaleDataError` is distinct from `SpanPanelConnectionError`: the former means the client is running but data cannot be trusted right now (transient disconnect, or panel-declared not-ready); the latter means the initial connect failed and the
client cannot be used at all.
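
The practical consequence of the two exceptions being siblings, rather than parent and child, can be sketched with local stand-in classes (the class bodies and the `classify` helper are assumptions for illustration; the real classes may carry extra behaviour). A handler for `SpanPanelConnectionError` does not catch stale-data errors, so each case can be routed separately, with the shared base as a catch-all.

```python
class SpanPanelError(Exception):
    """Local stand-in for the library's root exception."""


class SpanPanelConnectionError(SpanPanelError):
    """Local stand-in: the initial connect failed."""


class SpanPanelStaleDataError(SpanPanelError):
    """Local stand-in: client is running but data is not currently live."""


def classify(err: Exception) -> str:
    """Map an exception to a hypothetical recovery strategy."""
    try:
        raise err
    except SpanPanelStaleDataError:
        return "retry-later"          # transient: keep the client, try again
    except SpanPanelConnectionError:
        return "reconnect"            # client unusable: build a new one
    except SpanPanelError:
        return "other-library-error"  # base-class catch-all
```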

```python
-from span_panel_api import SpanPanelAuthError, SpanPanelConnectionError
+from span_panel_api import (
+    SpanPanelAuthError,
+    SpanPanelConnectionError,
+    SpanPanelStaleDataError,
+)

try:
    client = await create_span_client(host="192.168.1.100", passphrase="wrong")
except SpanPanelAuthError:
    print("Invalid passphrase")
except SpanPanelConnectionError:
    print("Cannot reach panel")

# Later, during normal operation:
try:
    snapshot = await client.get_snapshot()
except SpanPanelStaleDataError as err:
    # Broker dropped or panel declared not-ready — fall back to last-known
    # data, a grace-period value, or mark downstream state unavailable.
    print(f"Snapshot unavailable: {err}")
```

## Capabilities
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[project]
name = "span-panel-api"
-version = "2.5.4"
+version = "2.6.0"
description = "A client library for SPAN Panel API"
authors = [
{name = "SpanPanel"}
2 changes: 2 additions & 0 deletions src/span_panel_api/__init__.py
@@ -23,6 +23,7 @@
     SpanPanelConnectionError,
     SpanPanelError,
     SpanPanelServerError,
+    SpanPanelStaleDataError,
     SpanPanelTimeoutError,
     SpanPanelValidationError,
 )
@@ -109,6 +110,7 @@
     "SpanPanelConnectionError",
     "SpanPanelError",
     "SpanPanelServerError",
+    "SpanPanelStaleDataError",
     "SpanPanelTimeoutError",
     "SpanPanelValidationError",
 ]
9 changes: 9 additions & 0 deletions src/span_panel_api/exceptions.py
@@ -34,3 +34,12 @@ def __str__(self) -> str:

 class SpanPanelServerError(SpanPanelAPIError):
     """Server error (500)."""
+
+
+class SpanPanelStaleDataError(SpanPanelError):
+    """Raised when get_snapshot() is called while the client isn't live.
+
+    Distinct from SpanPanelConnectionError: this means the client is running
+    but data cannot be trusted right now (broker disconnected, or the Homie
+    device has declared $state=disconnected/lost).
+    """
92 changes: 85 additions & 7 deletions src/span_panel_api/mqtt/client.py
@@ -9,10 +9,11 @@

 import asyncio
 from collections.abc import Awaitable, Callable
+import contextlib
 import logging

 from ..auth import get_homie_schema
-from ..exceptions import SpanPanelConnectionError, SpanPanelServerError
+from ..exceptions import SpanPanelConnectionError, SpanPanelServerError, SpanPanelStaleDataError
 from ..models import FieldMetadata, HomieSchemaTypes, SpanPanelSnapshot
 from ..protocol import PanelCapability
 from .accumulator import HomiePropertyAccumulator
@@ -52,6 +53,8 @@ def __init__(
         self._homie: HomieDeviceConsumer | None = None
         self._streaming = False
         self._snapshot_callbacks: list[Callable[[SpanPanelSnapshot], Awaitable[None]]] = []
+        self._connection_callbacks: list[Callable[[bool], None]] = []
+        self._live = False
         self._ready_event: asyncio.Event | None = None
         self._loop: asyncio.AbstractEventLoop | None = None
         self._background_tasks: set[asyncio.Task[None]] = set()
@@ -190,19 +193,51 @@ async def close(self) -> None:
await self._bridge.disconnect()
self._bridge = None
self._accumulator = None
self._live = False

     async def ping(self) -> bool:
         """Check if MQTT connection is alive and device is ready."""
         if self._bridge is None or self._homie is None:
             return False
         return self._bridge.is_connected() and self._homie.is_ready()

+    def register_connection_callback(self, callback: Callable[[bool], None]) -> Callable[[], None]:
+        """Subscribe to broker connection state transitions.
+
+        Callback fires with False on broker disconnect and True on reconnect.
+        No synthetic call is made at registration time — callbacks only fire
+        on real state edges. To check current connection state on registration,
+        await ping().
+
+        Returns an unregister function that removes the callback from the
+        dispatch list. Calling unregister twice is safe.
+        """
+        self._connection_callbacks.append(callback)
+
+        def unregister() -> None:
+            with contextlib.suppress(ValueError):
+                self._connection_callbacks.remove(callback)
+
+        return unregister
+
     async def get_snapshot(self) -> SpanPanelSnapshot:
         """Return current snapshot from accumulated MQTT state.

-        No network call — snapshot is built from in-memory property values.
+        Raises SpanPanelStaleDataError if the client is not fully live.
+        "Live" means: the bridge is connected AND the Homie accumulator
+        has reached ready state. Callers can treat SpanPanelStaleDataError
+        as the canonical "panel currently unreachable" signal.
+
+        No network call — snapshot is built from in-memory property values
+        when the liveness checks pass.
         """
-        return self._require_homie().build_snapshot()
+        if self._bridge is None or self._homie is None:
+            raise SpanPanelStaleDataError("Client not connected — call connect() first")
+        if not self._bridge.is_connected():
+            raise SpanPanelStaleDataError("MQTT broker disconnected")
+        if not self._homie.is_ready():
+            raise SpanPanelStaleDataError("Homie device not ready")
+        return self._homie.build_snapshot()

     # -- CircuitControlProtocol --------------------------------------------

@@ -296,15 +331,42 @@ def _on_message(self, topic: str, payload: str) -> None:
self._snapshot_timer = self._loop.call_later(self._snapshot_interval, self._fire_snapshot)

     def _on_connection_change(self, connected: bool) -> None:
-        """Handle MQTT connection state change (called from asyncio loop)."""
+        """Handle MQTT connection state change (called from asyncio loop).
+
+        Re-subscribes to the wildcard topic on reconnect (pre-existing
+        behavior), then fans out an edge-only notification to registered
+        connection callbacks. Duplicate state transitions are suppressed
+        so subscribers only see real edges.
+
+        On disconnect, any pending snapshot-debounce timer is cancelled
+        so a stale timer cannot dispatch a post-disconnect snapshot.
+        """
+        # Re-subscribe runs on every connected=True, including duplicates —
+        # paho may re-emit connected events after session restoration, and
+        # re-subscribing is broker-benign. Callback fan-out below is
+        # edge-only (see the guard after this block).
         if connected:
             _LOGGER.debug("MQTT connection established")
-            # Re-subscribe on reconnect
             if self._bridge is not None:
                 wildcard = WILDCARD_TOPIC_FMT.format(serial=self._serial_number)
                 self._bridge.subscribe(wildcard, qos=0)
         else:
             _LOGGER.debug("MQTT connection lost")
+            # Cancel any pending snapshot-debounce timer so it cannot
+            # fire post-disconnect with a stale snapshot.
+            self._cancel_snapshot_timer()
+
+        # Edge-only dispatch
+        if connected == self._live:
+            return
+        self._live = connected
+
+        # Iterate a copy — subscribers may unregister during their callback
+        for cb in list(self._connection_callbacks):
+            try:
+                cb(connected)
+            except Exception:  # pylint: disable=broad-exception-caught
+                _LOGGER.exception("Connection callback raised")

async def _wait_for_circuit_names(self, timeout: float) -> None:
"""Wait for all circuit-like nodes to have a ``name`` property.
@@ -364,8 +426,24 @@ def set_snapshot_interval(self, interval: float) -> None:
self._cancel_snapshot_timer()

     async def _dispatch_snapshot(self) -> None:
-        """Build snapshot and send to all registered callbacks."""
-        snapshot = self._require_homie().build_snapshot()
+        """Build snapshot and send to all registered callbacks.
+
+        Guarded by the same liveness predicate as get_snapshot() — if the
+        bridge has disconnected or the Homie device is not ready, no
+        dispatch occurs. This prevents a pending debounce timer that was
+        scheduled just before a disconnect from delivering a stale
+        snapshot to subscribers after the fact.
+        """
+        bridge = self._bridge
+        homie = self._homie
+        if bridge is None or not bridge.is_connected() or homie is None or not homie.is_ready():
+            _LOGGER.debug(
+                "Skipping stale snapshot dispatch (bridge_connected=%s, homie_ready=%s)",
+                bridge is not None and bridge.is_connected(),
+                homie is not None and homie.is_ready(),
+            )
+            return
+        snapshot = homie.build_snapshot()
         for cb in list(self._snapshot_callbacks):
             try:
                 await cb(snapshot)
2 changes: 2 additions & 0 deletions src/span_panel_api/protocol.py
@@ -46,6 +46,8 @@ async def ping(self) -> bool: ...

     async def get_snapshot(self) -> SpanPanelSnapshot: ...

+    def register_connection_callback(self, callback: Callable[[bool], None]) -> Callable[[], None]: ...
+

 @runtime_checkable
 class CircuitControlProtocol(Protocol):
23 changes: 23 additions & 0 deletions tests/test_exceptions.py
@@ -0,0 +1,23 @@
+"""Tests for the span_panel_api exception hierarchy."""
+
+from __future__ import annotations
+
+from span_panel_api.exceptions import SpanPanelError
+
+
+def test_stale_data_error_derives_from_span_panel_error() -> None:
+    from span_panel_api.exceptions import SpanPanelStaleDataError
+
+    err = SpanPanelStaleDataError("example")
+    assert isinstance(err, SpanPanelError)
+    assert str(err) == "example"
+
+
+def test_stale_data_error_is_distinct_from_connection_error() -> None:
+    from span_panel_api.exceptions import (
+        SpanPanelConnectionError,
+        SpanPanelStaleDataError,
+    )
+
+    err = SpanPanelStaleDataError("example")
+    assert not isinstance(err, SpanPanelConnectionError)