
feat(transport): WebRTC DataChannel transport (cloudflare SFU)#2048

Open
spomichter wants to merge 15 commits into ruthwik/hosted-teleop from feat/webrtc-transport

Conversation

@spomichter spomichter commented May 11, 2026

Contributor

Stacked on #2411 (ruthwik/hosted-teleop) — merge that first. This PR turns its session plumbing into a first-class transport.

Problem

DimOS has LCM, SHM, DDS, and ROS transports for local/LAN communication but no transport for internet-scale real-time data (NAT traversal, global edge routing). The hosted teleop system currently requires a dedicated HostedTeleopModule to bridge WebRTC DataChannels to DimOS streams.

Solution

A WebRTC DataChannel transport backed by Cloudflare Realtime, following the same pubsub abstraction as LCMTransport / SHMTransport.

Architecture (dimos/protocol/pubsub/impl/webrtc/)

  • providers/spec.py: Provider protocol (Cloudflare, broker, LiveKit, …), AsyncProviderBase (shared loop-thread lifecycle), and ProviderConfig, a picklable, hashable factory that resolves to a per-process singleton provider. Transports survive pickling into module worker processes, and all transports in a process share one PeerConnection.
  • providers/broker.py: broker-mediated CF Realtime (hosted teleop), with a heartbeat-driven channel lifecycle including operator rejoin (SCTP id changes); receive-only until the broker bridges robot→operator channels.
  • providers/cloudflare.py: direct CF access (pub+sub session loopback pair); used by integration tests and the benchmark.
  • webrtcpubsub.py: WebRTCPubSub(AllPubSub[str, bytes]); passes the standard grid tests in pubsub/test_spec.py.
  • WebRTCTransport[M: DimosMsg] (core/transport.py): typed LCM encode/decode plus wire-fingerprint demux on a multiplexed channel. The CloudflareTransport subclass binds BrokerConfig for blueprints:
teleop_hosted_go2_transport = unitree_go2_basic.transports(
    {("cmd_vel", Twist): CloudflareTransport("cmd_unreliable", TwistStamped)}
).global_config(viewer="none")   # registered as: dimos run teleop-hosted-go2-transport

Fingerprints are derived from the wire format, not _get_packed_fingerprint(): TwistStamped inherits Twist's fingerprint but encodes as LCM TwistStamped, so class fingerprints would drop every real message (regression-tested).
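
The wire-fingerprint filter described above can be sketched as follows. This is a minimal illustration, not the PR's implementation: the two fingerprint constants and the `make_demux` helper are hypothetical, and the only detail taken from the thread is the 8-byte prefix comparison that appears in the reviewed `_typed_cb`.

```python
import struct
from typing import Callable, List

# Hypothetical 8-byte fingerprints; real ones come from the LCM type hash.
TWIST_FP = struct.pack(">Q", 0x1111111111111111)
TWIST_STAMPED_FP = struct.pack(">Q", 0x2222222222222222)


def make_demux(
    fp: bytes,
    decode: Callable[[bytes], object],
    callback: Callable[[object], None],
) -> Callable[[bytes, str], None]:
    """Return a raw-bytes handler that forwards only payloads whose prefix equals fp."""

    def _typed_cb(data: bytes, _topic: str) -> None:
        # Same check as the reviewed code: compare the first 8 bytes on the wire.
        if len(data) >= 8 and data[:8] == fp:
            callback(decode(data))

    return _typed_cb


received: List[object] = []
# The "decoder" here just strips the prefix; a real one would be msg_type.lcm_decode.
handler = make_demux(TWIST_STAMPED_FP, lambda d: d[8:], received.append)

handler(TWIST_STAMPED_FP + b"stamped-body", "cmd_unreliable")  # matches, delivered
handler(TWIST_FP + b"plain-body", "cmd_unreliable")            # other type, ignored
handler(b"short", "cmd_unreliable")                            # too short, ignored
```

Because the filter looks at the bytes actually sent, several typed transports can share one multiplexed channel and each sees only its own message type.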

Benchmark (standard harness, sustained 1s windows, this machine → CF edge)

CF_TELEOP_APP_ID=… CF_TELEOP_APP_SECRET=… DIMOS_BENCH_RECEIVE_TIMEOUT_S=5 \
  pytest -m tool dimos/protocol/pubsub/benchmark/test_benchmark.py -k webrtc
| Size | Sustained rate | Loss |
| --- | --- | --- |
| 256 B | ~995 msg/s | 0.0% |
| 1 KiB | ~994 msg/s | 0.1% |
| 4 KiB (~4 MB/s) | ~994 msg/s | 0.2% |
| 16 KiB | flooded | 93% (unreliable channel, offered load ≫ path) |
| > 16 KiB | dropped by CF | 100% |

CF paces DataChannel forwarding at ~1k msg/s per channel — plenty for teleop command planes (50–100 Hz). Same-host loopback RTT through the CF edge: median 15.6 ms. Benchmark knobs are env-overridable (DIMOS_BENCH_DURATION_S, DIMOS_BENCH_MAX_MESSAGES, DIMOS_BENCH_RECEIVE_TIMEOUT_S) for networked runs.
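
As a quick sanity check on the figures above, the arithmetic behind "~4 MB/s" and "plenty for teleop" can be worked through directly. The numbers are the ones reported in the table; the 100 Hz command rate is the upper end of the range quoted in the paragraph.

```python
KIB = 1024
rate_msg_s = 994            # sustained rate reported for 4 KiB payloads
payload_bytes = 4 * KIB

throughput_mb_s = rate_msg_s * payload_bytes / 1e6  # decimal megabytes per second
teleop_share = 100 / 1000                           # 100 Hz commands vs ~1k msg/s pacing

print(f"{throughput_mb_s:.2f} MB/s")
print(f"{teleop_share:.0%} of the per-channel message budget")
```

So a 100 Hz command plane uses about a tenth of the per-channel pacing limit observed in this run.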

Breaking changes

None — new optional transport (pip install dimos[webrtc], included in all). dimos/teleop/hosted/ (duplicate twist scaler) is deleted; the hosted blueprints from #2411 are untouched.

How to test

Fast (CI, no credentials, <2s):

pytest dimos/protocol/pubsub/impl/webrtc/test_transport.py dimos/protocol/pubsub/test_spec.py

Covers typed/raw modes, fingerprint demux, pickle round-trip, per-process provider sharing, broker credential validation.

Cloudflare integration (tool marker, ~25s):

pytest -m tool dimos/protocol/pubsub/impl/webrtc/test_webrtcpubsub.py

Blueprint e2e (tool marker, ~17s): deploys two modules through a real ModuleCoordinator, transport pickled into the worker, TwistStamped over live CF:

pytest -m tool dimos/protocol/pubsub/impl/webrtc/test_blueprint_e2e.py

Follow-ups (next phases)

Video track + state channels (clock sync, telemetry) move into the broker provider, then the hosted blueprints switch to transport-only and HostedTeleopModule slims to engagement logic.

Contributor License Agreement

  • I have read and approved the CLA

@spomichter spomichter force-pushed the feat/webrtc-transport branch from e56fd1f to 212450b Compare May 11, 2026 22:44
…e SFU

Implements a new pubsub transport backed by WebRTC DataChannels over
Cloudflare's Realtime SFU. Two new classes in
dimos/protocol/pubsub/impl/webrtcpubsub.py:

- CloudflareSession: manages the WebRTC PeerConnection lifecycle.
  Opens two CF sessions (publisher + subscriber) so a single process
  can do loopback pubsub. Runs aiortc on a dedicated background
  asyncio thread with its own ThreadPoolExecutor (so we don't leak
  asyncio_N worker threads). Uses negotiated=True placeholder DCs
  with id=100 during transport establishment to avoid stream-id
  collisions with CF-assigned ids.

- WebRTCPubSub: bytes-on-the-wire pubsub facade matching the
  LCMPubSubBase / BytesSharedMemory interface (string topics, bytes
  payloads). Lazily creates pub/sub DataChannel pairs on first
  publish/subscribe per topic.

Also adds:
- WebRTCTransport in dimos/core/transport.py (mirrors LCMTransport
  pattern, no encoding - bytes only).
- WebRTC benchmark testcase in
  dimos/protocol/pubsub/benchmark/testdata.py, gated on aiortc +
  CF_TELEOP_APP_ID / CF_TELEOP_APP_SECRET env vars.
- Integration test in
  dimos/protocol/pubsub/impl/test_webrtcpubsub.py covering basic
  pub/sub, latency, and throughput (all live tests skip without CF
  credentials).
- aiortc + httpx as new 'webrtc' optional extra in pyproject.toml.

Live benchmark (us-east-2 -> CF edge):
- 64-256B:  ~10K msgs/s, 0% loss
- 1KiB:     ~7K msgs/s, 0% loss
- >= 64KiB: dropped (above SCTP message size)
- Median single-RTT: ~2.5 ms
…sport

- Add BrokerProvider: DataChannelProvider that works through the hosted
  teleop broker (dimensional-teleop) instead of directly with CF credentials.
  Handles session registration, heartbeat loop, and DataChannel creation
  when an operator joins via the broker's bridge-datachannel API.

- Extend WebRTCTransport with optional msg_type parameter for typed LCM
  encode/decode with fingerprint-based filtering. Multiple transports can
  share a single multiplexed DataChannel and each receives only its type.

- Add hosted teleop blueprints (dimos/teleop/hosted/) demonstrating the
  module-free architecture: make_teleop_hosted_go2() uses pure transport
  (zero modules), make_teleop_hosted_go2_scaled() adds a thin
  TeleopScalerModule for speed scaling only.

- Add unit tests for typed mode, fingerprint filtering, multiplexed
  dispatch, and BrokerProvider credential validation.
- Rebase on main and regenerate uv.lock (resolve conflict)
- Add _LoopbackProvider (in-process, no network) to benchmark testdata
- Enables local WebRTC transport benchmarking without CF credentials
- All 12 message sizes pass locally (2.78s total)
@spomichter spomichter force-pushed the feat/webrtc-transport branch from 212450b to 2c263eb Compare May 19, 2026 23:30
@codecov

codecov Bot commented May 19, 2026


❌ 1 Tests Failed:

| Tests completed | Failed | Passed | Skipped |
| --- | --- | --- | --- |
| 1849 | 1 | 1848 | 151 |
View the top 1 failed test(s) by shortest run time
dimos.project.test_no_init_files::test_no_init_files
Stack Traces | 0.017s run time
def test_no_init_files():
        dimos_dir = DIMOS_PROJECT_ROOT / "dimos"
        init_files = sorted(dimos_dir.rglob("__init__.py"))
        # The root dimos/__init__.py is allowed for the porcelain lazy import.
        init_files = [f for f in init_files if f != dimos_dir / "__init__.py"]
        if init_files:
            listing = "\n".join(f"  - {f.relative_to(dimos_dir)}" for f in init_files)
>           raise AssertionError(
                f"Found __init__.py files in dimos/:\n{listing}\n\n"
                "__init__.py files are not allowed because they lead to unnecessary "
                "extraneous imports. Everything should be imported straight from the "
                "source module."
            )
E           AssertionError: Found __init__.py files in dimos/:
E             - .../impl/webrtc/__init__.py
E             - .../webrtc/providers/__init__.py
E           
E           __init__.py files are not allowed because they lead to unnecessary extraneous imports. Everything should be imported straight from the source module.

dimos_dir  = PosixPath('.../dimos/dimos/dimos')
init_files = [PosixPath('.../dimos/dimos/dimos/.../impl/webrtc/__init__.py'), PosixPath('.../dimos/dimos/dimos/.../webrtc/providers/__init__.py')]
listing    = '  - .../impl/webrtc/__init__.py\n  - .../webrtc/providers/__init__.py'

dimos/project/test_no_init_files.py:25: AssertionError


@spomichter spomichter marked this pull request as ready for review May 19, 2026 23:51
@greptile-apps

greptile-apps Bot commented May 19, 2026

Contributor

Greptile Summary

This PR introduces a WebRTC DataChannel transport (WebRTCTransport) backed by Cloudflare Realtime, following the same pubsub abstraction as LCMTransport/SHMTransport. It fixes several issues flagged in the prior review round (pickling, stop() lifecycle race, channel-name collisions, zombie threads, publish() silence).

  • WebRTCTransport/CloudflareTransport (core/transport.py): typed LCM encode/decode with wire-fingerprint demux; __reduce__ now correctly delegates reconstruction to _rebuild_webrtc_transport so pickling into worker processes preserves msg_type and config without double-constructing the provider.
  • AsyncProviderBase (providers/spec.py): shared daemon-loop lifecycle; start() now calls _teardown() on any _connect() failure, clearing _thread/_loop/_stop_ev so retries start clean; _started is set to False at the top of stop(), eliminating the publish/stop race.
  • providers/cloudflare.py and providers/broker.py: _dc_name adds a SHA1 suffix to sanitized names, preventing topic-collision; _ensure_pub/_ensure_sub are both serialised behind _channel_lock; BrokerProvider.publish() now raises NotImplementedError immediately instead of silently logging a warning.

Confidence Score: 5/5

Safe to merge; all the blocking issues from the prior review round have been addressed in this revision.

The previous round surfaced a cluster of defects — double-constructing provider on pickle, silent publish no-op, lifecycle races in start/stop, zombie loop threads, and channel-name collisions. The current code resolves each one: _rebuild_webrtc_transport prevents double construction, BrokerProvider.publish() raises NotImplementedError, AsyncProviderBase sets _started=False at the top of stop() and tears down the loop thread on _connect() failure, and _dc_name appends a SHA1 suffix to prevent collisions. The only new finding is a narrow first-call race in WebRTCTransport.start() that can orphan subscribe_all callbacks — an edge case since subscribe_all is rarely used and the underlying provider is a singleton.

dimos/core/transport.py (WebRTCTransport.start() lazy-init guard) and dimos/protocol/pubsub/impl/webrtc/webrtcpubsub.py (subscribe_all N×M delivery, noted in prior review).

Important Files Changed

| Filename | Overview |
| --- | --- |
| dimos/core/transport.py | Adds WebRTCTransport and CloudflareTransport with typed LCM encode/decode and wire-fingerprint demux; pickling fixed via _rebuild_webrtc_transport; lazy start() has a narrow thread-safety gap. |
| dimos/protocol/pubsub/impl/webrtc/providers/spec.py | AsyncProviderBase correctly tears down the event-loop thread on _connect() failure and sets _started=False at the top of stop(); per-process singleton registry is thread-safe. |
| dimos/protocol/pubsub/impl/webrtc/providers/cloudflare.py | _dc_name adds SHA1 suffix to prevent topic-name collisions; _ensure_pub/_ensure_sub both serialised under _channel_lock; publish/stop lifecycle race fixed by base class. |
| dimos/protocol/pubsub/impl/webrtc/providers/broker.py | publish() now raises NotImplementedError immediately; heartbeat lifecycle is correct; _open_cmd_channel is synchronous so the previous wait_open timeout/duplicate-channel issue no longer applies. |
| dimos/protocol/pubsub/impl/webrtc/webrtcpubsub.py | subscribe_all fires N times per message when N topic subscriptions exist (previously flagged); otherwise sound AllPubSub wrapper over any Provider. |
| dimos/teleop/quest_hosted/blueprints.py | New teleop_hosted_go2_transport blueprint using CloudflareTransport; receive-only usage is correct given BrokerProvider.publish() raises. |

Sequence Diagram

sequenceDiagram
    participant MT as Module Thread
    participant WRT as WebRTCTransport
    participant WPS as WebRTCPubSub
    participant PC as ProviderConfig
    participant P as Provider (singleton)
    participant CF as Cloudflare Realtime

    MT->>WRT: subscribe(callback)
    WRT->>WRT: start() [lazy]
    WRT->>PC: config.provider()
    PC-->>WRT: Provider singleton (per-process)
    WRT->>WPS: WebRTCPubSub(provider)
    WRT->>WPS: start()
    WPS->>P: start()
    P->>CF: POST /sessions/new (pub + sub)
    P->>CF: WebRTC ICE/DTLS handshake
    CF-->>P: connected
    WRT->>WPS: subscribe(topic, _typed_cb)
    WPS->>P: subscribe(topic, _wrapped)
    P->>CF: POST /datachannels/new (sub)
    CF-->>P: DataChannel open

    MT->>WRT: broadcast(msg)
    WRT->>WPS: publish(topic, lcm_encoded)
    WPS->>P: publish(topic, data)
    P->>CF: DataChannel.send(data)
    CF-->>P: message event (loopback/broker)
    P->>WPS: _wrapped(data, topic)
    WPS->>WRT: _typed_cb(data, topic)
    WRT->>WRT: fingerprint check → lcm_decode
    WRT->>MT: callback(typed_msg)

Reviews (13): Last reviewed commit: "docs(webrtc): mark CloudflareProvider as..."

Comment thread dimos/teleop/hosted/blueprints.py Outdated
Comment thread dimos/protocol/pubsub/impl/webrtc_providers/cloudflare.py Outdated
Comment thread dimos/protocol/pubsub/impl/webrtc_providers/broker.py Outdated
Comment thread dimos/protocol/pubsub/impl/webrtc_providers/cloudflare.py Outdated
The previous lock regen dropped the `exclude-newer-span` marker, leaving
only the frozen `exclude-newer` timestamp. uv then treats every resolve as
"cooldown was newly added" and forces a re-resolve against today minus 7
days — which currently excludes md-babel-py 1.2.0 (published 2026-05-15)
and breaks `uv sync --extra all` / `uv lock`.

Re-adding the span line tells uv the lock was generated with P7D
semantics, so the existing pinned versions are honored.
Comment thread dimos/core/transport.py Outdated
- Remove __init__.py files (project policy: no init files)
- Remove section markers from test_webrtcpubsub.py
- Regenerate all_blueprints.py (adds TeleopScalerModule)
- Fix WebRTCTransport.__reduce__ to preserve msg_type across pickle
- Fix CloudflareProvider.publish() race: snapshot loop ref before use
- Fix CloudflareProvider.subscribe() race: check sub_channels inside lock
- Add comment clarifying TwistStamped→Twist type safety in blueprint
Comment thread dimos/core/transport.py Outdated
Comment on lines +382 to +390
def __reduce__(self):  # type: ignore[no-untyped-def]
    # Provider cannot be pickled (holds sockets/threads); on unpickle
    # a new provider is created from env vars. Preserve msg_type so
    # typed fingerprint filtering survives multiprocessing.
    return (WebRTCTransport, (self.topic,), {"msg_type": self._msg_type})

def __setstate__(self, state: dict) -> None:  # type: ignore[no-untyped-def]
    msg_type = state.get("msg_type")
    self.__init__(self.topic, msg_type=msg_type)  # type: ignore[misc]

Contributor

P1 __reduce__ double-construction breaks pickling when env vars are absent

Python's pickle protocol for __reduce__ returning a 3-tuple (callable, args, state) first calls callable(*args) — i.e. WebRTCTransport(self.topic) with no provider — before calling __setstate__. That intermediate constructor call immediately tries to build a CloudflareProvider from env vars. If CF_TELEOP_APP_ID / CF_TELEOP_APP_SECRET are not set in the worker process (e.g. the transport was originally built with an explicit BrokerProvider), the reconstruction raises RuntimeError("CF_TELEOP_APP_ID and CF_TELEOP_APP_SECRET required") before __setstate__ is ever reached. When the env vars ARE set, two CloudflareProvider instances are constructed: the first (from the (callable, args) step) is immediately orphaned when __setstate__ creates a second one. The fix is to delegate reconstruction to a module-level helper so __init__ is called exactly once.

Contributor

This is correct. This line calls __init__ a second time, so it constructs two CloudflareProvider instances.
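
The double construction discussed in this thread can be reproduced with a small stand-alone sketch. The class and helper names below are hypothetical stand-ins, not the PR's code. `copy.copy` is used instead of a pickle round-trip only so the sketch does not depend on import paths; it drives the same `__reduce__` protocol (call `callable(*args)`, then `__setstate__` if state is present).

```python
import copy
from typing import List, Optional

init_calls: List[str] = []  # records every __init__ invocation by class name


class NaiveTransport:
    """Mirrors the reviewed pattern: 3-tuple __reduce__ plus __setstate__ calling __init__."""

    def __init__(self, topic: str, msg_type: Optional[type] = None) -> None:
        init_calls.append(type(self).__name__)
        self.topic = topic
        self._msg_type = msg_type

    def __reduce__(self):
        return (NaiveTransport, (self.topic,), {"msg_type": self._msg_type})

    def __setstate__(self, state: dict) -> None:
        self.__init__(self.topic, msg_type=state.get("msg_type"))


def _rebuild_transport(cls, topic, msg_type):
    """Module-level helper: the only constructor call made during reconstruction."""
    return cls(topic, msg_type=msg_type)


class FixedTransport(NaiveTransport):
    def __reduce__(self):
        # 2-tuple: no state, so __setstate__ is never invoked.
        return (_rebuild_transport, (type(self), self.topic, self._msg_type))


init_calls.clear()
naive_copy = copy.copy(NaiveTransport("cmd_vel", bytes))
naive_calls = init_calls.count("NaiveTransport")  # original + callable(*args) + __setstate__

init_calls.clear()
fixed_copy = copy.copy(FixedTransport("cmd_vel", bytes))
fixed_calls = init_calls.count("FixedTransport")  # original + one rebuild
```

In the naive version the intermediate `callable(*args)` call runs the constructor without `msg_type`, which is the step that would try to build a provider from env vars in the reviewed code.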

@spomichter spomichter changed the title feat(transport): WebRTC DataChannel pubsub via Cloudflare Realtime SFU feat(transport): WebRTC DataChannel transport (support for cloudflare SFU) May 20, 2026
- Add return type annotations to CloudflareProvider event handlers
- Fix type: ignore codes to match actual mypy errors (attr-defined)
- Add type annotation to __setstate__ dict parameter
- Add type: ignore[arg-type] for WebRTCPubSub→PubSub duck typing in benchmark
- Remove TwistStamped subclass comment from blueprint
@spomichter spomichter changed the title feat(transport): WebRTC DataChannel transport (support for cloudflare SFU) feat(transport): WebRTC DataChannel transport (cloudflare SFU) May 20, 2026
@spomichter spomichter force-pushed the feat/webrtc-transport branch from 46028dc to 3baba45 Compare May 20, 2026 05:57
Comment on lines +391 to +393
ch = self._pub_channels.get(topic)
if ch is None:
ch = self._run_sync(self._ensure_pub(topic))

Contributor

P1 publish() checks _pub_channels and lazily creates the pub DataChannel with no lock, so two concurrent first-publish calls to the same new topic both see ch = None, dispatch two _ensure_pub coroutines to the event loop, and both pass _ensure_pub's own if topic in self._pub_channels guard before either coroutine stores the channel (they interleave during the HTTP await). The result is two CF REST calls for the same channel name, a race to overwrite _pub_channels[topic], and one thread holding a stale/orphaned channel reference that routes messages to the wrong (or duplicate) CF DataChannel. subscribe() already applies a partial guard by reading topic not in self._sub_channels inside self._lock — the same pattern should be used here.

Suggested change
-        ch = self._pub_channels.get(topic)
-        if ch is None:
-            ch = self._run_sync(self._ensure_pub(topic))
+        with self._lock:
+            ch = self._pub_channels.get(topic)
+            needs_pub = ch is None
+        if needs_pub:
+            ch = self._run_sync(self._ensure_pub(topic))
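
For illustration, one way to make lazy channel creation race-free is to keep the lookup and the creation under the same lock. The sketch below is a thread-based simplification under that assumption (the PR's providers create channels on an asyncio loop, and `ChannelCache` and `slow_create` are hypothetical names).

```python
import threading
import time
from typing import Callable, Dict, List


class ChannelCache:
    """Create-once cache: the lookup and the creation happen under one lock."""

    def __init__(self, create: Callable[[str], object]) -> None:
        self._create = create
        self._lock = threading.Lock()
        self._channels: Dict[str, object] = {}

    def get_or_create(self, topic: str) -> object:
        with self._lock:
            ch = self._channels.get(topic)
            if ch is None:
                ch = self._create(topic)  # lock still held, so no second creator can run
                self._channels[topic] = ch
            return ch


created: List[str] = []


def slow_create(topic: str) -> object:
    time.sleep(0.01)  # widen the window in which a race would show up
    created.append(topic)
    return object()


cache = ChannelCache(slow_create)
results: List[object] = []
threads = [
    threading.Thread(target=lambda: results.append(cache.get_or_create("cmd_vel")))
    for _ in range(8)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Holding the lock across creation serialises first-time publishers, which trades a little latency on the first message for a single channel per topic.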

The throughput benchmark blasts 5000 msgs/sec which overflows WebRTC
DataChannel SCTP buffers. WebRTC is a low-rate control transport
(50-80Hz teleop), not a bulk pipe.

WebRTC transport is still tested by:
- test_webrtc_transport.py (unit tests, mock provider, <1s)
- test_webrtcpubsub.py (CF integration, skipped without creds)
test_webrtcpubsub.py talks to live Cloudflare SFU (~68s) and requires
CF_TELEOP_APP_ID + CF_TELEOP_APP_SECRET. Tag with pytestmark = tool
so they're excluded by CI's default -m 'not (tool or self_hosted ...)'
filter. Run locally with: pytest -m tool
Comment on lines +104 to +107
self._external_pub_id = publisher_session_id
self._ordered = ordered
self._max_retransmits = max_retransmits

Contributor

P1 DataChannel name collision via _sanitize_topic

_sanitize_topic replaces every non-alphanumeric/underscore/dash character with _. This means two distinct topic names that differ only in those characters — e.g., "cmd/vel" and "cmd_vel" — both produce "pub_cmd_vel" as the CF DataChannel name. When _ensure_pub is then called for the second topic, it issues a second POST /datachannels/new with the same dataChannelName in the same CF session. CF either rejects this with an error (the assert fires) or creates a duplicate channel, silently mixing or dropping messages between the two topics. The collision is invisible at the Python dict level because _pub_channels is keyed by the original topic string, not the sanitized name — so neither guard in _ensure_pub catches it.
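
The collision and the hash-suffix remedy mentioned in the Greptile summary can be sketched as below. The regular expression, the 8-character digest length, and the function names are assumptions made for illustration; only the idea of appending a SHA1-derived suffix comes from the thread.

```python
import hashlib
import re


def sanitize(topic: str) -> str:
    """Replace every character outside [A-Za-z0-9_-] with an underscore."""
    return re.sub(r"[^A-Za-z0-9_-]", "_", topic)


def dc_name(topic: str, prefix: str = "pub") -> str:
    """Sanitized name plus a short digest of the original topic string."""
    digest = hashlib.sha1(topic.encode("utf-8")).hexdigest()[:8]
    return f"{prefix}_{sanitize(topic)}_{digest}"


same_after_sanitize = sanitize("cmd/vel") == sanitize("cmd_vel")
distinct_names = dc_name("cmd/vel") != dc_name("cmd_vel")
```

The readable part of the name is kept for debugging, while the digest of the unsanitized topic keeps two distinct topics from mapping to one channel name.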

Delete test_import and test_sanitize_dc_name (no useful coverage).
Move pytestmark from module-level to individual test functions.
@spomichter

Contributor Author

@greptile review

Comment on lines +286 to +297
await self._http.delete(
f"{self._broker_url}/api/v1/sessions/{self._session_id}",
headers=self._headers,
)
except Exception:
pass # Best-effort cleanup

if self._pc:
await self._pc.close()
self._pc = None
if self._http:
await self._http.aclose()

Contributor

P1 Zombie event-loop thread on start() failure

If _run_sync(self._connect()) raises (broker unreachable, ICE timeout, network error), self._started stays False but self._thread is still alive — blocked forever on _stop_ev.wait(). A subsequent start() call sees _started == False, creates a fresh thread, and overwrites self._loop/self._stop_ev, orphaning the old thread. Every failed start leaks one daemon thread and its event loop. The same pattern exists in CloudflareProvider.start() (~line 693–702).

@dimensionalOS dimensionalOS deleted a comment from greptile-apps Bot May 20, 2026
Comment on lines +138 to +147
def start(self) -> None:
    with self._lock:
        if self._started:
            return
        self._thread = threading.Thread(target=self._run_loop, daemon=True, name="cf-webrtc")
        self._thread.start()
        if not self._ready.wait(timeout=5.0):
            raise RuntimeError("CF event loop failed to start")
        self._run_sync(self._connect())
        self._started = True

Contributor

P1 State corruption on start() retry after a failed _connect()

If _run_sync(self._connect()) raises (e.g., network error, CF 5xx, ICE timeout), self._started stays False but self._thread, self._loop, and self._ready remain in a partially-initialized state. The next start() call then:

  1. Creates a new Thread2 and sets self._thread.
  2. self._ready.wait(5.0) returns immediately because the event was already set by Thread1 — it is never cleared on a failed start.
  3. _run_sync(self._connect()) fires before Thread2 has had a chance to run, so self._loop might still point to loop1 (Thread1's loop, which is still alive). If _connect() succeeds this time, self._started = True, but when Thread2 eventually overwrites self._loop = loop2, every subsequent publish() / subscribe() routes coroutines to loop2 while all channel state was built on loop1. This is silent state corruption.

The fix is to reset self._ready, self._loop, and self._thread in the exception path of start(), or (better) to call stop() in the exception handler so the cleanup path is unified.

The same pattern exists in BrokerProvider.start() at line 166 for identical reasons.
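
A minimal sketch of the unified cleanup path suggested above: `start()` tears down the loop thread on any failure, and each attempt gets a fresh readiness event so a stale "set" cannot leak into a retry. `LoopThreadProvider` and `FlakyProvider` are hypothetical names, and this is not the PR's AsyncProviderBase.

```python
import asyncio
import threading
from typing import Optional


class LoopThreadProvider:
    """Hypothetical base: owns one event-loop thread and cleans it up on failure."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._started = False
        self._thread: Optional[threading.Thread] = None
        self._loop: Optional[asyncio.AbstractEventLoop] = None

    async def _connect(self) -> None:
        raise NotImplementedError

    @staticmethod
    def _run_loop(loop: asyncio.AbstractEventLoop, ready: threading.Event) -> None:
        asyncio.set_event_loop(loop)
        loop.call_soon(ready.set)  # signal readiness once the loop is actually running
        loop.run_forever()
        loop.close()

    def _teardown(self) -> None:
        loop, thread = self._loop, self._thread
        self._loop = self._thread = None
        if loop is not None:
            loop.call_soon_threadsafe(loop.stop)
        if thread is not None:
            thread.join(timeout=5.0)

    def start(self) -> None:
        with self._lock:
            if self._started:
                return
            loop = asyncio.new_event_loop()
            ready = threading.Event()  # fresh per attempt
            self._loop = loop
            self._thread = threading.Thread(
                target=self._run_loop, args=(loop, ready), daemon=True
            )
            self._thread.start()
            try:
                if not ready.wait(timeout=5.0):
                    raise RuntimeError("event loop failed to start")
                asyncio.run_coroutine_threadsafe(self._connect(), loop).result(timeout=10.0)
            except Exception:
                self._teardown()  # no orphaned thread or loop survives a failed start
                raise
            self._started = True

    def stop(self) -> None:
        with self._lock:
            self._started = False  # flip first so callers stop using the loop
            self._teardown()


class FlakyProvider(LoopThreadProvider):
    """Fails on the first connect attempt, succeeds on the second."""

    def __init__(self) -> None:
        super().__init__()
        self.attempts = 0

    async def _connect(self) -> None:
        self.attempts += 1
        if self.attempts == 1:
            raise ConnectionError("simulated ICE timeout")


provider = FlakyProvider()
try:
    provider.start()
except ConnectionError:
    pass
state_after_failure = (provider._started, provider._thread, provider._loop)
provider.start()  # the retry begins from a clean slate
started_after_retry = provider._started
provider.stop()
```

Passing the loop and the event into the thread as arguments, instead of reading them from `self`, avoids the window in which a second thread could overwrite shared attributes.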

Comment thread dimos/core/transport.py Outdated
def broadcast(self, _, msg) -> None:  # type: ignore[no-untyped-def]
    if not self._started:
        self.start()
    if self._msg_type is not None and hasattr(msg, "lcm_encode"):

@paul-nechifor paul-nechifor May 20, 2026

Contributor

This suggests that you shouldn't use msg_type: type[T] | None, but a bound type var. If that's done, then there's no need for hasattr(msg, "lcm_encode") because we'll know at static analysis time that a particular type cannot be used.

from dimos.msgs.protocol import DimosMsg
M = TypeVar("M", bound=DimosMsg)

And then change __init__ to take msg_type: type[M] | None.

Comment thread dimos/core/transport.py Outdated

def _typed_cb(data: bytes, _topic: str) -> None:
    if len(data) >= 8 and data[:8] == fp:
        callback(msg_type.lcm_decode(data))  # type: ignore[attr-defined]

Contributor

Suggested change
-        callback(msg_type.lcm_decode(data))  # type: ignore[attr-defined]
+        callback(msg_type.lcm_decode(data))

Not needed once you do the DimosMsg comment from above.

Comment thread dimos/core/transport.py Outdated

def __setstate__(self, state: dict[str, Any]) -> None:
    msg_type = state.get("msg_type")
    self.__init__(self.topic, msg_type=msg_type)  # type: ignore[misc]

@paul-nechifor paul-nechifor May 20, 2026

Contributor

Calling __init__ is quite odd, especially since by this point it has already been called. I guess the reason you have to do this is because __reduce__ only allows restoring through the args and not kwargs.
The better way is to use a factory function. No need for __setstate__ at all:

    @classmethod
    def _from_pickle(cls, topic: str, msg_type: type[M] | None) -> "WebRTCTransport[M]":
        return cls(topic, msg_type=msg_type)

    def __reduce__(
        self,
    ) -> tuple[Callable[[str, type[M] | None], "WebRTCTransport[M]"], tuple[str, type[M] | None]]:
        return (self.__class__._from_pickle, (self.topic, self._msg_type))

Comment thread dimos/core/transport.py Outdated
return self.dds.subscribe(self.topic, lambda msg, topic: callback(msg))


class WebRTCTransport(PubSubTransport[T]):

Contributor

How is this intended to be used?

What concerns me is that by having one Transport per field, we're wasting a lot of resources. It might make sense for LCM which is more lightweight, but CloudflareProvider is quite a heavy thing, intended to be used to transport multiple channels.

Is this just meant to be used for 1 or 2 In/Out fields? Because if it's used for whole modules, then it would be quite wasteful.

(I mentioned this before. The root issue is that we don't usually maintain relations between objects in memory. If we had a root TransportProvider object, that object could maintain active transports and share common resources (so different transports could share the same CloudflareProvider), and it could also gracefully shut things down when they're no longer needed.)

@leshy leshy May 21, 2026

Member

IDK right now and can think more on this, but basically individual pubsub implementations can maintain their own global registry if needed and more efficient

we had an LCM implementation which shared the single cpp LCM instance previously. implementations can own this, so we don't have to worry on user end


@property
def is_connected(self) -> bool:
    return self._started

Contributor

Needs with self._lock because it's also updated with the lock.

def _on_msg(payload: Any) -> None:
    if isinstance(payload, str):
        payload = payload.encode()
    for cb in list(cbs.get(topic, ())):

@paul-nechifor paul-nechifor May 21, 2026

Contributor

cbs is modified under a lock so you need to use a lock to read the topics here too.

You're doing list(topics) here to avoid looping over a list that could be altered from a different thread. Although this works, it relies on the fact that list() is C code that holds the GIL. In some versions of Python the GIL is removed and this could fail. So both the .get and the list duplication need to be done under a lock.
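
A minimal sketch of the pattern described above: take the snapshot of the callback list while holding the lock, then invoke the callbacks after releasing it. `CallbackRegistry` is a hypothetical helper, not code from the PR.

```python
import threading
from collections import defaultdict
from typing import Callable, DefaultDict, List, Tuple

Callback = Callable[[bytes, str], None]


class CallbackRegistry:
    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._cbs: DefaultDict[str, List[Callback]] = defaultdict(list)

    def subscribe(self, topic: str, cb: Callback) -> Callable[[], None]:
        with self._lock:
            self._cbs[topic].append(cb)

        def unsubscribe() -> None:
            with self._lock:
                if cb in self._cbs[topic]:
                    self._cbs[topic].remove(cb)

        return unsubscribe

    def dispatch(self, topic: str, payload) -> None:
        if isinstance(payload, str):
            payload = payload.encode()
        with self._lock:
            snapshot = list(self._cbs.get(topic, ()))  # lookup and copy under the lock
        for cb in snapshot:                            # invoke outside the lock
            cb(payload, topic)


registry = CallbackRegistry()
seen: List[Tuple[str, bytes]] = []


def once(data: bytes, topic: str) -> None:
    seen.append((topic, data))
    unsubscribe()  # safe: dispatch no longer holds the lock here


unsubscribe = registry.subscribe("cmd_vel", once)
registry.dispatch("cmd_vel", "hello")
registry.dispatch("cmd_vel", b"again")  # nobody is subscribed any more
```

Calling the callbacks outside the lock also lets a callback subscribe or unsubscribe without deadlocking on a non-reentrant lock.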

# ─── Public API (DataChannelProvider) ────────────────────────────

def publish(self, topic: str, data: bytes) -> None:
if not self._started:

Contributor

Need thread-safe access to self._started.

logger = setup_logger()


class DataChannelProvider(ABC):

Contributor

Since all the methods are abstract, DataChannelProvider could work better as a Protocol.
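
To illustrate the suggestion, here is a sketch of a structural provider interface using `typing.Protocol`. The method set and the unsubscribe-callable return type are assumptions inferred from snippets in this thread, and `InMemoryProvider` is a hypothetical implementation used only to show that no base class is needed.

```python
from typing import Callable, Dict, List, Protocol, runtime_checkable

Callback = Callable[[bytes, str], None]


@runtime_checkable
class Provider(Protocol):
    def start(self) -> None: ...
    def stop(self) -> None: ...
    def publish(self, topic: str, data: bytes) -> None: ...
    def subscribe(self, topic: str, callback: Callback) -> Callable[[], None]: ...


class InMemoryProvider:
    """Satisfies Provider structurally; note there is no base class."""

    def __init__(self) -> None:
        self._cbs: Dict[str, List[Callback]] = {}

    def start(self) -> None:
        pass

    def stop(self) -> None:
        self._cbs.clear()

    def publish(self, topic: str, data: bytes) -> None:
        for cb in list(self._cbs.get(topic, ())):
            cb(data, topic)

    def subscribe(self, topic: str, callback: Callback) -> Callable[[], None]:
        self._cbs.setdefault(topic, []).append(callback)
        return lambda: self._cbs[topic].remove(callback)


def roundtrip(provider: Provider) -> List[bytes]:
    """Accepts anything with the right methods, checked statically by a type checker."""
    got: List[bytes] = []
    provider.start()
    provider.subscribe("cmd_vel", lambda data, _topic: got.append(data))
    provider.publish("cmd_vel", b"\x01\x02")
    provider.stop()
    return got


result = roundtrip(InMemoryProvider())
```

`runtime_checkable` only verifies that the methods exist, not their signatures, so static checking remains the main benefit.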

Comment thread dimos/teleop/hosted/blueprints.py Outdated
Comment thread dimos/teleop/hosted/blueprints.py Outdated
Comment thread dimos/teleop/hosted/blueprints.py Outdated
Comment on lines +50 to +53
skip_unless_cf = pytest.mark.skipif(
not (WEBRTC_AVAILABLE and CF_CREDS_PRESENT),
reason="Requires aiortc + CF_TELEOP_APP_ID/CF_TELEOP_APP_SECRET",
)

Contributor

I don't think tests should check imports.

Checking imports makes sense for users so that we support smaller install times.

But for developers, tests should fail, not "silently pass" if you're missing dependencies.

There's the potential that a person is writing a feature, breaks these tests, but he doesn't know because he doesn't have the necessary packages installed. The same thing can happen in CI, where tests "pass" only because we forgot to install the dependencies. (If I'm not mistaken, this is already the case since the webrtc group has not been included in all.)

Collaborator

Kinda, but we'll also need to help downstream packagers with running tests etc. Many of these packagers will not be able to use API keys and may run the tests with no network access. So, having a simple way to skip tests that have such dependencies makes sense.

I think it makes sense for a default pytest run to pass without additional setup needed. Then we need a way to ensure our team is running the extra tests.

Comment thread pyproject.toml
"cyclonedds>=0.10.5",
]

webrtc = [

Contributor

Needs to be included in all, below.

Contributor Author

done

import threading
from typing import Any

from dimos.protocol.pubsub.impl.webrtcpubsub import DataChannelProvider

@leshy leshy May 21, 2026

Member

why not put this DataChannelProvider in webrtc_providers/spec.py

@pytest.mark.tool
@skip_unless_cf
@pytest.mark.timeout(60)
def test_basic_pub_sub(pubsub: WebRTCPubSub) -> None:

Member

The main tests a pubsub needs to pass are the standardized grid tests in pubsub/test_spec.py. These check that you actually behave like the other pubsubs.

You also have a benchmark, which is good for comparison, at pubsub/benchmark:
python -m pytest -svk "not bytes and not udp" -m tool dimos/protocol/pubsub/benchmark/test_benchmark.py

@pytest.mark.tool
@skip_unless_cf
@pytest.mark.timeout(60)
def test_latency(pubsub: WebRTCPubSub) -> None:

Member

Definitely a job for the benchmark; no need to DIY this.


logger = setup_logger()


Member

Why not name this Provider and move it to webrtc_providers/spec.py?

Even better, host your stuff in its own directory (because WebRTC is complex):

protocol/pubsub/impl/webrtc
protocol/pubsub/impl/webrtc/webrtcpubsub.py
protocol/pubsub/impl/webrtc/test_webrtcpubsub.py
protocol/pubsub/impl/webrtc/providers
protocol/pubsub/impl/webrtc/providers/spec.py
protocol/pubsub/impl/webrtc/providers/cloudflare

Contributor Author

Done, moved into this layout.

Comment thread dimos/protocol/pubsub/impl/webrtcpubsub.py Outdated

# Start heartbeat loop
assert self._loop is not None
self._loop.create_task(self._heartbeat_loop())

Collaborator

We should really hold onto this task and on shutdown do:

t.cancel()
with suppress(asyncio.CancelledError):
    await t

Or use aiojobs (in which case the exception log inside the loop shouldn't be needed anymore).
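
For reference, the hold-and-cancel pattern described above could look roughly like this. `HeartbeatOwner` and its attribute names are hypothetical stand-ins for the provider; only the task handling is the point.

```python
import asyncio
from contextlib import suppress


class HeartbeatOwner:
    """Hypothetical stand-in for the provider; only the task handling matters."""

    def __init__(self) -> None:
        self._heartbeat_task: asyncio.Task[None] | None = None
        self.beats = 0

    async def _heartbeat_loop(self) -> None:
        while True:
            self.beats += 1
            await asyncio.sleep(0.01)

    async def start(self) -> None:
        # Keep a reference so the task can be cancelled later and is not
        # garbage collected while still running.
        loop = asyncio.get_running_loop()
        self._heartbeat_task = loop.create_task(self._heartbeat_loop())

    async def stop(self) -> None:
        task, self._heartbeat_task = self._heartbeat_task, None
        if task is not None:
            task.cancel()
            # Awaiting the cancelled task lets it finish unwinding.
            with suppress(asyncio.CancelledError):
                await task


async def demo() -> tuple[int, bool]:
    owner = HeartbeatOwner()
    await owner.start()
    task = owner._heartbeat_task
    await asyncio.sleep(0.05)
    await owner.stop()
    return owner.beats, task.cancelled()
```

Swapping the reference out before cancelling makes `stop()` safe to call twice.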



@pytest.fixture
def pubsub() -> Generator[WebRTCPubSub, None, None]:

Collaborator

Really need to put this in CLAUDE.md...

Suggested change
-def pubsub() -> Generator[WebRTCPubSub, None, None]:
+def pubsub() -> Iterator[WebRTCPubSub]:

- WebRTCPubSub now extends AllPubSub[str, bytes] from the pubsub spec,
  making it a first-class DimOS pubsub (same as LCM, SHM, Redis)
- DataChannelProvider changed from ABC to Protocol (per review feedback)
- Implements subscribe_all via fan-out on per-topic subscriptions
- Added WebRTCPubSub to the standardized grid tests in test_spec.py
  using MockProvider (no network, runs in CI)
- Enables encoder mixin composition (LCMEncoderMixin, PickleEncoderMixin)
- Gains free sugar methods: sub(), aiter(), queue()
…ew/webrtc-transport

# Conflicts:
#	dimos/robot/all_blueprints.py
#	pyproject.toml
#	uv.lock
…le provider configs

- Move webrtcpubsub + providers into protocol/pubsub/impl/webrtc/ with
  providers/spec.py (Provider protocol, ProviderConfig, AsyncProviderBase)
- ProviderConfig: picklable, hashable factory resolving to a per-process
  singleton provider — transports survive pickling into module workers and
  share one PeerConnection per process
- WebRTCTransport rebuilt on DimosMsg-bound typevar; CloudflareTransport
  subclass binds BrokerConfig for blueprint use
- Fingerprint filter now derives from the wire format (TwistStamped inherits
  Twist's fingerprint but encodes as LCM TwistStamped)
- BrokerProvider: operator rejoin via SCTP id tracking, heartbeat task held
  and cancelled on disconnect, X-Robot-API-Key auth, id=0 throwaway channel,
  publish() raises (broker is receive-only for now)
- CloudflareProvider: locking discipline, asyncio channel-creation lock,
  collision-safe DC names
- Benchmark: WebRTC case in the standard harness, env-overridable knobs
  (DIMOS_BENCH_DURATION_S / _MAX_MESSAGES / _RECEIVE_TIMEOUT_S)
- teleop-hosted-go2-transport: transport-only go2 blueprint (3 lines)
- Delete dimos/teleop/hosted (duplicate scaler), add webrtc extra to all
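
A rough illustration of the "picklable, hashable factory resolving to a per-process singleton" idea from the commit message above. This is a sketch under assumptions, not the PR's actual `ProviderConfig`: `FakeConfig`, `FakeProvider`, their fields, and `resolve()` are hypothetical names.

```python
import pickle
import threading
from dataclasses import dataclass
from typing import ClassVar


class FakeProvider:
    """Placeholder for a real provider that would own the PeerConnection."""

    def __init__(self, config: "FakeConfig") -> None:
        self.config = config


@dataclass(frozen=True)
class FakeConfig:
    """Frozen dataclass: hashable by value and picklable by default."""

    app_id: str
    region: str = "auto"

    # Process-local registry; ClassVar keeps it out of the fields and the pickle.
    _instances: ClassVar[dict["FakeConfig", FakeProvider]] = {}
    _lock: ClassVar[threading.Lock] = threading.Lock()

    def resolve(self) -> FakeProvider:
        # Equal configs resolve to the same provider within one process.
        with self._lock:
            provider = self._instances.get(self)
            if provider is None:
                provider = self._instances[self] = FakeProvider(self)
            return provider


def clone_via_pickle(config: FakeConfig) -> FakeConfig:
    """Simulate shipping the config to a worker and back."""
    return pickle.loads(pickle.dumps(config))
```

Only the small config crosses the process boundary. A worker process starts with an empty registry, so it builds its own provider on first `resolve()`, and every transport in that process that carries an equal config then shares it.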
@spomichter spomichter requested a review from mustafab0 as a code owner June 10, 2026 10:00
@spomichter spomichter changed the base branch from main to ruthwik/hosted-teleop June 10, 2026 10:00
Comment on lines +70 to +82
def subscribe(self, topic: str, callback: Callable[[bytes, str], None]) -> Callable[[], None]:
    if not self._started:
        self.start()

    def _wrapped(data: bytes, t: str) -> None:
        callback(data, t)
        for all_cb in list(self._all_callbacks):
            try:
                all_cb(data, t)
            except Exception:
                logger.exception("subscribe_all callback error")

    return self._provider.subscribe(topic, _wrapped)

Contributor

P1 subscribe_all fires N times per message when N topic subscriptions exist

Each subscribe() call wraps the callback in _wrapped, which calls every entry in _all_callbacks on delivery. With N subscriptions on the same topic (e.g., two typed transports both subscribing to "cmd_unreliable" on the same WebRTCPubSub instance, or the test_multiple_subscribers spec test), each inbound message triggers N _wrapped closures, and each one fires all _all_callbacks — so a single message causes every subscribe_all callback to execute N times. The AllPubSub contract requires each subscribe_all callback to receive each message exactly once.
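
One possible shape for a fix is to run the `subscribe_all` fan-out from a single per-message delivery point instead of inside each per-subscriber wrapper. The sketch below is hypothetical (`FanOutSketch` and `deliver` are illustrative names) and assumes the provider can hand each inbound message to the pubsub exactly once.

```python
from collections import defaultdict
from typing import Callable

Callback = Callable[[bytes, str], None]


class FanOutSketch:
    """Hypothetical dispatcher: one inbound hook per message, not per subscriber."""

    def __init__(self) -> None:
        self._topic_callbacks: dict[str, list[Callback]] = defaultdict(list)
        self._all_callbacks: list[Callback] = []

    def subscribe(self, topic: str, callback: Callback) -> Callable[[], None]:
        self._topic_callbacks[topic].append(callback)

        def unsubscribe() -> None:
            if callback in self._topic_callbacks[topic]:
                self._topic_callbacks[topic].remove(callback)

        return unsubscribe

    def subscribe_all(self, callback: Callback) -> Callable[[], None]:
        self._all_callbacks.append(callback)
        return lambda: self._all_callbacks.remove(callback)

    def deliver(self, topic: str, data: bytes) -> None:
        """Called once per inbound message, e.g. from a single provider hook."""
        for cb in list(self._topic_callbacks[topic]):
            cb(data, topic)
        # Runs once per message, however many topic subscribers exist.
        for cb in list(self._all_callbacks):
            cb(data, topic)
```

With two subscribers on the same topic and one `subscribe_all` callback, a single `deliver("cmd", b"x")` reaches each topic subscriber once and the `subscribe_all` callback once.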

return CloudflareProvider(self)


class CloudflareProvider(AsyncProviderBase):

Contributor Author

To clarify, this is for TEST AND BENCHMARK only. In production, this Cloudflare provider sits on EC2 in the teleop server, and broker.py communicates with that server.
