Skip to content

Fan-io#2364

Open
bogwi wants to merge 28 commits into
mainfrom
danvi/feat/fan-io
Open

Fan-io#2364
bogwi wants to merge 28 commits into
mainfrom
danvi/feat/fan-io

Conversation

@bogwi

@bogwi bogwi commented Jun 5, 2026

Copy link
Copy Markdown
Member

Fan-io

Problem

StreamModule only supports 1 input and 1 output.

Solution

  • We want to support multiple inputs (which can be aligned on time).
  • And multiple outputs emitted at once.
  • The first declared In regulates all emission times. The pipeline must construct a Bundle which is used to publish messages to multiple outputs.

Proof the platform works

  • memory2/test_module.py fusion tests (literal N:M)
  • MarkerModule migration (clearest before/after)
  • Detection3DModule migration (Rx -> pipeline, compute-once multi-out)

Out of scope

module-as-transform in arbitrary outer chains (will be new mem2-native-modules PR).

What this branch is (the core deliverable)

Piece File What it adds
Generalized StreamModule memory2/module.py N in / M out; ingest() seam; self.streams.<port>; no 1:1 gate
Fan-out primitives memory2/fanio.py Bundle, scatter_to_ports, normalize_to_bundle
Cross-stream pairing memory2/stream.py Stream.align, optional interpolator=
Stock interpolators memory2/interpolators.py lerp_pose, interp_odom
Contract tests memory2/test_module.py TwoInputFusion, ChainedFusion, multi-out compute-once, interpolation

Why the design

Before fan-I/O:

  • MarkerModule copied 24 lines of start() for one ingestion change
  • Detection3DModule was not a mem2 pipeline at all - Rx alignment in start()
  • Multi-out meant manual subscribe/publish or duplicate compute
  • Every new fusion module picked its own private wiring idiom

After fan-I/O:

  • Port count is data: declare N in, M out
  • One authoring model: align -> transform -> Bundle ingest() for ingress enrichment without copying start()
  • One subscribe, one compute, scatter to all outs
  • Migrated modules share the same mental model as offline/replay streams

Summary

Fan-I/O is the wrapper layer: mem2 fusion inside deployed StreamModules. That works today - and an important step toward modules that are just mem2 transforms: the same pipeline() you deploy on a robot should run in any mem2 chain, on a recorded SqliteStore, faster than realtime.

What survives from fan-I/O into that follow-up will be the core - every input as a mem2 stream, .align() / interpolator=, Bundle scatter, ingest() - but not the legacy port/transport wrapper around it.

@greptile-apps

greptile-apps Bot commented Jun 5, 2026

Copy link
Copy Markdown
Contributor

Greptile Summary

This PR introduces a fan-I/O abstraction layer for StreamModule, expanding support from a strict 1-in/1-out shape to N inputs and M outputs. The core deliverables are Bundle/scatter_to_ports for fan-out, Stream.align(interpolator=) for time-synthesized secondary alignment, stock lerp_pose/interp_odom interpolators, and an ingest() seam so subclasses can enrich or drop messages without copying start().

  • memory2/module.py replaces the old 1:1 wiring with _wire_input (routes every In port through ingest() into a NullStore-backed Stream) and scatter_to_ports (one subscribe, fan-out to all Out ports via a Bundle); the streams accessor exposes sibling In ports for .align() inside pipeline().
  • memory2/fanio.py + stream.py add Bundle, normalize_to_bundle, scatter_to_ports, and the extended _nearest_align_iter that dispatches to either nearest-neighbor or interpolated pairing; memory2/interpolators.py ships reusable lerp_pose and interp_odom helpers with slerp for orientation.
  • Detection3DModule and MarkerModule are fully migrated: the old Rx align_timestamped wiring in start() is replaced by Stream.align(..., tolerance=0.25) inside pipeline(), and the single 2D-detect pass now scatters four output types atomically; ObjectDBModule (which depended on the removed Rx stream) is deleted.

Confidence Score: 5/5

Safe to merge — the fan-I/O wiring is correct, the bundle-scatter contract is well-tested, and the migrated modules preserve existing runtime behavior.

The core algorithmic changes (interpolated align, scatter, normalize_to_bundle) are all covered by contract tests that pin boundary conditions, exact-match short-circuits, compute-once guarantees, and the tolerance gate. The migration of Detection3DModule and MarkerModule eliminates the old Rx alignment path and replaces it with a simpler, more testable pipeline. Transport keys in the blueprints are correctly updated to (port_name, message_type). The one finding is a private-symbol import used only for a type annotation that is never evaluated at runtime.

dimos/perception/detection/module3D.py imports the private _pair_class symbol from memory2.stream; worth cleaning up if the annotation proves unnecessary.

Important Files Changed

Filename Overview
dimos/memory2/fanio.py New file: Bundle (frozendict subclass), normalize_to_bundle, and scatter_to_ports. Bundle-only, M-agnostic contract is clearly documented and correctly implemented; single subscribe ensures pipeline runs once per tick.
dimos/memory2/module.py N-in/M-out wiring: _wire_input routes each In port through ingest() into a NullStore stream; scatter_to_ports replaces stream_to_port; streams accessor exposes siblings for alignment. Closure safety in _wire_input confirmed (each call is a separate scope).
dimos/memory2/stream.py align() gains optional interpolator= path; _nearest_align_iter refactored into _nearest_secondary / _interpolated_secondary helpers. Tolerance boundary (<=) preserved; exact-ts match fast-path correct; loop invariant (prev.ts < p.ts) properly upheld.
dimos/memory2/interpolators.py New file: lerp_pose and interp_odom. _slerp handles antipodal quaternions and near-parallel fallback (nlerp) correctly. Tests in test_interpolators.py pin midpoint, alpha fraction, short-arc, and parallel-orientation edge cases.
dimos/perception/detection/module3D.py Detection3DModule migrated from Rx align_timestamped to Stream.align; one pipeline pass now scatters to detections_2d, detections_3d, and 6 visualization ports via Bundle. One style issue: imports private _pair_class for a runtime-unreachable type annotation.
dimos/perception/fiducial/marker_module.py New MarkerModule: ingest() seam anchors each frame with its world->camera pose (dropping frames without TF), pipeline scatters detections_3d and detections_2d from one DetectMarkers pass.
dimos/memory2/test_module.py Comprehensive new tests: TwoInputFusion, ChainedFusion, MyFusion (pose-at-scan-time), OdomFusion (3-in/2-out), TestFanOutScatter (scatter semantics), TestIngestSeam. Covers tolerances, bundle-tail 1:1, unrouted key drops, and compute-once guarantee.
dimos/robot/unitree/go2/blueprints/smart/unitree_go2_detection.py Transport keys corrected from (port, module_class) to (port, message_type); detections_3d transport added for the new Out port.
dimos/robot/unitree/g1/blueprints/perceptive/unitree_g1_detection.py Transport keys updated to (port_name, message_type) format; ObjectDBModule and its remappings removed; new detections_3d transport added.

Sequence Diagram

sequenceDiagram
    participant Port as In Port(s)
    participant Ingest as ingest()
    participant Store as NullStore / Stream
    participant Pipeline as pipeline()
    participant Norm as normalize_to_bundle()
    participant Scatter as scatter_to_ports()
    participant OutPorts as Out Port(s)

    Note over Port,Store: start() wires every In port
    Port->>Ingest: message arrives (push)
    Ingest->>Store: "stream.append(msg, ts=msg.ts)"

    Note over Store,Pipeline: primary.live() drives pipeline
    Store->>Pipeline: Observation[T] (pull / live tail)
    Pipeline->>Pipeline: align(self.streams.sibling, tolerance, interpolator?)
    Pipeline->>Pipeline: transform / map_data → FusedDetections
    Pipeline->>Pipeline: "map_data → Bundle{port_name: payload}"

    Pipeline->>Norm: "Stream[Bundle | T]"
    Note over Norm: 1-Out raw payload? wrap in Bundle
    Norm->>Scatter: Stream[Bundle] (one subscribe)

    Scatter->>OutPorts: bundle[port_a] → Out port_a.publish()
    Scatter->>OutPorts: bundle[port_b] → Out port_b.publish()
    Note over Scatter,OutPorts: missing/None keys → port idle this tick
Loading

Reviews (7): Last reviewed commit: "gh-reviews: resolve-2" | Re-trigger Greptile

Comment thread dimos/memory2/module.py
Comment thread dimos/memory2/fanio.py
Comment thread dimos/robot/unitree/go2/blueprints/smart/unitree_go2_detection.py Outdated
Comment thread dimos/perception/detection/moduleDB.py Outdated
Comment thread dimos/perception/detection/module3D.py
@bogwi bogwi added the PlzReview label Jun 5, 2026
Comment thread dimos/robot/unitree/go2/blueprints/smart/test_unitree_go2_markers.py Outdated
Comment thread dimos/robot/unitree/g1/blueprints/perceptive/unitree_g1_detection.py Outdated
Comment thread dimos/robot/unitree/g1/blueprints/perceptive/test_unitree_g1_detection.py Outdated
Comment thread dimos/perception/detection/module3D.py
Comment thread dimos/perception/detection/module3D.py Outdated
Comment thread dimos/memory2/fanio.py Outdated
Comment thread dimos/memory2/fanio.py Outdated
Comment thread dimos/perception/detection/test_module3D.py Outdated
Comment thread dimos/perception/detection/type/detection2d/bbox.py Outdated
Comment thread dimos/perception/fiducial/marker_transformer.py
Comment thread dimos/perception/detection/module3D.py Outdated
Comment thread dimos/perception/detection/module3D.py
Comment thread dimos/perception/fiducial/marker_module.py Outdated
Comment thread dimos/perception/fiducial/marker_transformer.py Outdated
Comment thread dimos/memory2/test_module.py Outdated
Comment thread dimos/memory2/test_module.py Outdated
Comment thread dimos/memory2/test_module.py Outdated
Comment thread dimos/memory2/test_stream.py Outdated
bogwi and others added 28 commits June 8, 2026 01:16
The nav_vlm inline import had no circular/heavy-dep reason; the sibling
geometry msgs are already imported at the top.
Detection2DArray imported Detection3DArray._label_for_detection across
modules; a member used from outside isn't private. Rename to
label_for_detection and update both callers and the bbox.py reference.
Replace # type: ignore[no-untyped-def] on _FakePort.subscribe/emit,
_RecordingOut.publish and the _wait_until pollers with real parameter
and return annotations.
@bogwi bogwi force-pushed the danvi/feat/fan-io branch from 1ac21c6 to fed5000 Compare June 7, 2026 16:23
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants