Skip to content

Integrate Zenoh#2362

Open
paul-nechifor wants to merge 2 commits into
mainfrom
paul/feat-integrate-zenoh
Open

Integrate Zenoh#2362
paul-nechifor wants to merge 2 commits into
mainfrom
paul/feat-integrate-zenoh

Conversation

@paul-nechifor

Copy link
Copy Markdown
Contributor

Problem

Closes DIM-XXX

Solution

How to Test

Contributor License Agreement

  • I have read and approved the CLA.

@paul-nechifor paul-nechifor marked this pull request as draft June 5, 2026 02:33
@codecov

codecov Bot commented Jun 5, 2026

Copy link
Copy Markdown

❌ 1 Tests Failed:

Tests completed Failed Passed Skipped
2040 1 2039 29
View the top 1 failed test(s) by shortest run time
dimos.protocol.pubsub.impl.test_zenohpubsub.TestZenohPubSubBase::test_publish_and_subscribe
Stack Traces | 0.506s run time
request = <SubRequest 'monitor_threads' for <Function test_publish_and_subscribe>>

    @pytest.fixture(autouse=True)
    def monitor_threads(request):
        # Capture threads before test runs
        test_name = request.node.nodeid
        with _seen_threads_lock:
            _before_test_threads[test_name] = {
                t.ident for t in threading.enumerate() if t.ident is not None
            }
    
        yield
    
        with _seen_threads_lock:
            before = _before_test_threads.get(test_name, set())
            current = {t.ident for t in threading.enumerate() if t.ident is not None}
    
            # New threads are ones that exist now but didn't exist before this test
            new_thread_ids = current - before
    
            if not new_thread_ids:
                return
    
            # Get the actual thread objects for new threads
            new_threads = [
                t for t in threading.enumerate() if t.ident in new_thread_ids and t.name != "MainThread"
            ]
    
            # Filter out expected persistent threads that are shared globally
            # These threads are intentionally left running and cleaned up on process exit
            expected_persistent_thread_prefixes = [
                "Dask-Offload",
                # HuggingFace safetensors conversion thread - no user cleanup API
                # https://github..../transformers/issues/29513
                "Thread-auto_conversion",
            ]
            new_threads = [
                t
                for t in new_threads
                if not any(t.name.startswith(prefix) for prefix in expected_persistent_thread_prefixes)
            ]
    
            # Filter out threads we've already seen (from previous tests)
            truly_new = [t for t in new_threads if t.ident not in _seen_threads]
    
            # Mark all new threads as seen
            for t in new_threads:
                if t.ident is not None:
                    _seen_threads.add(t.ident)
    
            if not truly_new:
                return
    
            thread_names = [t.name for t in truly_new]
    
>           pytest.fail(
                f"Non-closed threads created during this test. Thread names: {thread_names}. "
                "Please look at the first test that fails and fix that."
            )
E           Failed: Non-closed threads created during this test. Thread names: ['Thread-368 (pyo3-closure)']. Please look at the first test that fails and fix that.

before     = {140441454372544, 140451535245632}
current    = {140441454372544, 140441471153856, 140451535245632}
expected_persistent_thread_prefixes = ['Dask-Offload', 'Thread-auto_conversion']
new_thread_ids = {140441471153856}
new_threads = [<Thread(Thread-368 (pyo3-closure), stopped 140441471153856)>]
request    = <SubRequest 'monitor_threads' for <Function test_publish_and_subscribe>>
t          = <Thread(Thread-368 (pyo3-closure), stopped 140441471153856)>
test_name  = '.../pubsub/impl/test_zenohpubsub.py::TestZenohPubSubBase::test_publish_and_subscribe'
thread_names = ['Thread-368 (pyo3-closure)']
truly_new  = [<Thread(Thread-368 (pyo3-closure), stopped 140441471153856)>]

dimos/conftest.py:269: Failed

To view more test analytics, go to the Test Analytics Dashboard
📋 Got 3 mins? Take this short survey to help us improve Test Analytics.

@paul-nechifor paul-nechifor force-pushed the paul/feat-integrate-zenoh branch from b66e833 to f8d2d42 Compare June 5, 2026 02:34
@greptile-apps

greptile-apps Bot commented Jun 5, 2026

Copy link
Copy Markdown
Contributor

Greptile Summary

This PR integrates Zenoh as a first-class transport backend alongside LCM, selectable via DIMOS_TRANSPORT env var or .env (defaulting to Zenoh on macOS, LCM elsewhere). It adds ZenohPubSubBase, typed Zenoh/PickleZenoh pub-sub classes, a shared ZenohSessionPool, a ZenohService base, ZenohRPC, ZenohTF, and a transport_factory.py that routes pub-sub/RPC/TF construction to the active backend.

  • New ZenohPubSubBase with per-key QoS rules, wildcard subscribe_all with a drain thread for best-effort delivery, and cached publisher declarations backed by a shared session pool.
  • transport_factory.make_transport / rpc_backend / tf_backend replace all direct LCMTransport/pLCMTransport construction in CLIs, the module coordinator, and the Rerun bridge, with a transport-coercion path so explicitly-mapped blueprint transports also follow the global switch.
  • RerunBridgeModule now listens on both Zenoh and LCM when the active transport is Zenoh (to capture TF frames still on LCM), resolved lazily via _resolve_pubsubs.

Confidence Score: 4/5

Safe to merge for Zenoh-transport users; the DDS transport path has a lock that was accidentally removed.

The new Zenoh stack is well-designed and thoroughly tested. One concern stands out: DDSTransport.broadcast() and DDSTransport.subscribe() had their _start_lock guard removed in this PR to match the new ZenohTransport pattern, but DDS and Zenoh don't share the same session-lifetime invariant — dds.stop() can tear down the connection while a concurrent broadcast is mid-publish, which was exactly what the original lock prevented. All other changes are clean.

dimos/core/transport.py — the DDSTransport broadcast/subscribe lock removal deserves a second look before merging.

Important Files Changed

Filename Overview
dimos/core/transport.py Adds ZenohTransport and pZenohTransport; incidentally removes _start_lock from DDSTransport.broadcast/subscribe, breaking the atomic start-and-publish guarantee against concurrent stop().
dimos/core/transport_factory.py New backend-agnostic factory for pub-sub, RPC, and TF construction; transport_topic() strips leading '/' from LCM paths before prepending 'dimos/', which produces a double prefix for topics already named '/dimos/...'.
dimos/protocol/pubsub/impl/zenohpubsub.py Correct Zenoh pub-sub with QoS resolution, typed/pickle variants, and a drain-thread subscribe_all; logic is sound and well-tested.
dimos/protocol/service/zenohservice.py ZenohSessionPool with close_all() and ZenohService base; a previous review noted no atexit cleanup for long-running processes.
dimos/core/global_config.py Adds transport field (platform-default), zenoh_qos tuple, and validate_assignment=True; clean addition with alias support for env vars and CLI.
dimos/visualization/rerun/bridge.py Adds dual-backend subscribe_all resolution and Zenoh key-expr stripping for entity paths; a prior review flagged the [LCM()] legacy sentinel ambiguity.
dimos/core/coordination/module_coordinator.py Routes _get_transport_for through make_transport and adds _coerce_transport_to_backend to rewrite blueprint-pinned LCM/Zenoh transports when the global backend differs.
dimos/protocol/rpc/pubsubrpc.py Adds ZenohRPC with correct dimos/rpc/ prefixed topic generation and no leading-slash guard; matches ZenohQoS rule for reliable+block delivery.
dimos/protocol/pubsub/impl/zenohqos.py Clean QoS model with sensible defaults: reliable+block for RPC/agent channels, best-effort+drop for high-rate sensor streams.

Sequence Diagram

sequenceDiagram
    participant CLI as dimos CLI / humancli / dtop
    participant TF as transport_factory
    participant GC as GlobalConfig (DIMOS_TRANSPORT)
    participant ZT as ZenohTransport / pZenohTransport
    participant ZSP as ZenohSessionPool
    participant ZPS as ZenohPubSubBase
    participant LCMT as LCMTransport / pLCMTransport

    CLI->>GC: apply_transport_arg(argv)
    CLI->>TF: make_transport(name, msg_type)
    TF->>GC: read transport (zenoh or lcm)
    alt "transport == zenoh"
        TF->>ZT: ZenohTransport(topic, type)
        ZT->>ZPS: "Zenoh(**kwargs)"
        ZPS->>ZSP: acquire(ZenohConfig)
        ZSP-->>ZPS: shared zenoh.Session
    else "transport == lcm"
        TF->>LCMT: LCMTransport(topic, type)
    end

    CLI->>ZT: broadcast(msg)
    ZT->>ZPS: publish(topic, lcm_encoded_bytes)
    ZPS->>ZPS: _get_publisher(key_expr) cached
    ZPS-->>CLI: published

    CLI->>ZT: subscribe(callback)
    ZT->>ZPS: subscribe(topic, wrapper)
    ZPS->>ZSP: session.declare_subscriber(key_expr)
    ZSP-->>CLI: unsubscribe fn
Loading

Reviews (2): Last reviewed commit: "Merge branch 'main' into paul/feat-integ..." | Re-trigger Greptile

Comment thread dimos/protocol/service/zenohservice.py Outdated
Comment on lines +70 to +72
def stop(self) -> None:
"""Stop the Zenoh service — does NOT close the shared session."""
super().stop()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 No production path to close stale sessions

stop() deliberately skips closing the shared session, but this means entries in _sessions accumulate for the lifetime of the process with no cleanup hook. In a long-running robot process that restarts the bridge or changes the Zenoh config mid-run, each distinct ZenohConfig.session_key opens a new session that is never closed. Test fixtures work around this by calling session.close() + _sessions.clear() manually, but there is no equivalent atexit/finalizer for production paths. Consider registering an atexit handler or exposing a close_all_sessions() utility so callers have a clean teardown path without needing to reach into module internals.

Comment on lines +150 to +161
undeclared = False

def unsubscribe() -> None:
nonlocal undeclared
if undeclared:
return
undeclared = True
with self._subscriber_lock:
if sub not in self._subscribers:
return # Already removed by stop() — stop() owns the undeclare
self._subscribers.remove(sub)
sub.undeclare() # type: ignore[no-untyped-call]

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Non-atomic idempotency guard on unsubscribe()

The undeclared bool is read and written without a lock, so two threads can both pass if undeclared: before either sets it to True. The behavior is ultimately safe because the subsequent with self._subscriber_lock: check prevents double-undeclare() (the second caller sees sub already removed and returns), but the undeclared flag never achieves its stated purpose of avoiding the lock contention — both callers still acquire the lock. Replacing the flag with a threading.Lock acquired-once pattern (or threading.Event) would make the idempotency guarantee explicit and lock-free for subsequent callers.

Comment on lines +205 to +209
if "pubsubs" in fields_set and pubsubs is not None:
is_legacy_default = len(pubsubs) == 1 and isinstance(pubsubs[0], LCM)
if not is_legacy_default:
return pubsubs
return _default_pubsubs(getattr(config, "g", config))

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 [LCM()] explicit override silently falls through to transport default

_resolve_pubsubs treats pubsubs=[LCM()] as the legacy implicit default even when the caller explicitly set the field ("pubsubs" in fields_set). A developer who wants to force LCM-only visualization on a zenoh transport — e.g., to suppress the second Zenoh subscription for debugging — cannot express that intent: their value is indistinguishable from the old default and will be silently replaced by [Zenoh(), LCM()]. The companion test TestBridgePubsubResolution only covers explicit non-LCM overrides, leaving this case untested. Consider a sentinel (e.g., None) to mean "use transport default" rather than overloading the [LCM()] value.

Comment thread pyproject.toml
@paul-nechifor paul-nechifor force-pushed the paul/feat-integrate-zenoh branch 6 times, most recently from 0afc3a9 to 4472fc9 Compare June 9, 2026 00:56
def make_transport(
name: str, msg_type: type | None = None, *, g: GlobalConfig = global_config
) -> PubSubTransport[Any]:
"""Construct the active-backend pub/sub transport for a logical channel.

@leshy leshy Jun 9, 2026

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Few things, Transport isn't neccessarily a PubSubTransport - we can use TCP and IP address as a setting here etc (in theory, haven't implemented)

in case of PubSubTransport, a string doesn't define a full topic, there is a reason why Topic for LCM and Zenoh is a different object. Zenoh offers QoS settings etc per channel. maybe specific router config etc.

So I'm thinking when global switch zenoh or LCM is used, for lcm that can literally be just LCM(topic_string) but zenoh probably wants reliable delivery for RPC specifically or specific per topic configuration (Image can be unreliable, but not agent messages)

I'm not sure right now what to suggest here - if we can normalize transport requirements across topics "this is reliable", "this is unreliable" or if we need per transport global blueprint config overlay that this global config switch just applies? global overlay seems better to me

return Topic(topic=topic)


class ZenohRPC(PubSubRPCMixin[Topic, Any], PickleZenoh):

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can do this initially, but Zenoh actually supports RPC on their protocol level, so we dont need to piggyback to pubsub here.

@paul-nechifor paul-nechifor force-pushed the paul/feat-integrate-zenoh branch from 4472fc9 to 2eaac55 Compare June 10, 2026 01:33
@paul-nechifor paul-nechifor force-pushed the paul/feat-integrate-zenoh branch from eab7ba7 to ee8cb1f Compare June 10, 2026 05:51
@paul-nechifor paul-nechifor changed the title WIP: integrate zenoh Integrate Zenoh Jun 10, 2026
@paul-nechifor paul-nechifor marked this pull request as ready for review June 10, 2026 11:57
Comment thread dimos/core/transport.py
Comment on lines 321 to +332
self._started = False

def broadcast(self, _, msg) -> None: # type: ignore[no-untyped-def]
with self._start_lock:
if not self._started:
self.start()
self.dds.publish(self.topic, msg)
if not self._started:
self.start()
self.dds.publish(self.topic, msg)

def subscribe(
self, callback: Callable[[T], None], selfstream: Stream[T] | None = None
) -> Callable[[], None]:
with self._start_lock:
if not self._started:
self.start()
return self.dds.subscribe(self.topic, lambda msg, topic: callback(msg))
if not self._started:
self.start()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 DDSTransport loses atomic start-and-publish guarantee

The PR removed _start_lock from broadcast() and subscribe() in DDSTransport. The original code held the lock around both the start check and the dds.publish() / dds.subscribe() call, serializing those operations against stop(). Without the lock, a concurrent stop() can call dds.stop() while broadcast() is mid-publish — whether that crashes or silently corrupts depends on the underlying DDS implementation. The new ZenohTransport intentionally skips this lock because ZenohSessionPool never closes the session, so a post-stop publish safely re-declares the publisher on the live session. DDS doesn't have the same invariant: dds.stop() can genuinely tear down the connection. The pre-existing _start_lock field is still allocated in __init__, so restoring the lock in broadcast and subscribe is a one-line change per method.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants