maru mp mode adapter #18
Open
seohui-XCENA wants to merge 14 commits into
Remove "Option B" / "Phase 1.X" / integration.md references from docstrings and comments. The doc-tree pointer rots with future restructuring, and the phase labels are meaningless outside the planning context they were written in.
Split the maru allocator's two-phase startup so the MaruHandler connection and CxlMemoryAdapter pool are built on the first register_kv_layout call (triggered by the first register_kv_cache RPC) rather than at allocator construction. This matches the MP server lifecycle, where StorageManager is built before any vLLM worker has registered its KV cache tensors. Strips shapes/dtypes/fmt/full_chunk_size_bytes/chunk_size_in_tokens out of MaruL1Config (they are derived from metadata at init_layout time) and surfaces the single-model constraint + TODO(maru-multi-model) in the class docstring. L1MemoryManager picks up register_kv_layout as a no-op for default backends and a forwarder for the maru backend; the get_memory_usage maru branch short-circuits to (0, 0) before init_layout to keep eviction-controller polls quiet on a cold startup.
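A minimal sketch of the deferred bring-up described in this commit, not the actual MaruMemoryAllocator. `KVLayout` and `LazyPoolAllocator` are hypothetical names, and two behaviours are assumptions made for illustration: a repeated identical layout is treated as a no-op, and a different second layout raises to reflect the single-model constraint.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass
class KVLayout:
    """Hypothetical stand-in for the metadata carried by register_kv_cache."""
    chunk_size_bytes: int


class LazyPoolAllocator:
    """Toy allocator whose pool is built on the first register_kv_layout call."""

    def __init__(self, pool_size_bytes: int) -> None:
        self.pool_size_bytes = pool_size_bytes
        self._layout: Optional[KVLayout] = None  # nothing is connected yet
        self._used_bytes = 0

    def register_kv_layout(self, layout: KVLayout) -> bool:
        """Return True if this call performed the one-time bring-up."""
        if self._layout is not None:
            if layout != self._layout:
                # Assumption: the single-model constraint rejects a second layout.
                raise ValueError("only one KV layout is supported")
            return False
        # The real code would connect the handler and build the pool here.
        self._layout = layout
        return True

    def get_memory_usage(self) -> Tuple[int, int]:
        """(used, total); (0, 0) until the layout is known."""
        if self._layout is None:
            return (0, 0)
        return (self._used_bytes, self.pool_size_bytes)
```

Returning (0, 0) before the layout arrives means a poller never sees a total it could compare usage against, which is the quiet cold-start behaviour the commit message describes.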
Pull the maru-specific pass-through helpers out of L1Manager into a dedicated MaruL1Dispatcher class in maru_l1_dispatch.py. L1Manager now constructs a dispatcher when it detects a MaruMemoryAllocator and forwards reserve_read / unsafe_read / finish_read / reserve_write / finish_write / finish_write_and_reserve_read / delete / clear / report_status to it. The dispatcher owns the read-side channel and the maru-only allocator / memory_manager references; L1Manager is left with the default-backend state machine plus a thin dispatch shim. Also moves the shared L1OperationResult type alias to internal_api so both modules can import it without creating a circular dependency.
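The dispatch shim can be pictured roughly as follows. This is an illustrative sketch only: `ToyDispatcher` and `ToyL1Manager` are hypothetical, only `delete` is shown, and the returned strings merely indicate which path handled the call.

```python
from typing import Dict, List, Optional


class ToyDispatcher:
    """Hypothetical stand-in for the maru-only code path."""

    def delete(self, keys: List[str]) -> Dict[str, str]:
        return {k: "maru" for k in keys}


class ToyL1Manager:
    """Keeps the default state machine; forwards when a dispatcher is present."""

    def __init__(self, use_maru: bool) -> None:
        self._dispatcher: Optional[ToyDispatcher] = (
            ToyDispatcher() if use_maru else None
        )
        self._entries: Dict[str, bytes] = {}

    def delete(self, keys: List[str]) -> Dict[str, str]:
        if self._dispatcher is not None:
            # Thin shim: no default-backend state is touched on this path.
            return self._dispatcher.delete(keys)
        for k in keys:
            self._entries.pop(k, None)
        return {k: "default" for k in keys}
```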
Phase 1.D plumbing: thread the maru L1 backend end-to-end through the MP server.
- StorageManager: detect maru mode at construction (via ``L1MemoryManagerConfig.maru_config``) and skip the L1/L2 eviction / store / prefetch controllers plus L2 adapter creation, because the CXL pool is the L2 tier and is owned by MaruServer. Add register_kv_layout (forwards to L1Manager) and thread ``memory_objs`` through finish_write so the maru branch can issue MaruHandler.batch_store.
- MPCacheEngine: on register_kv_cache, derive the KV layout from the registering vLLM worker's gpu_context and call storage_manager.register_kv_layout; this is what triggers MaruMemoryAllocator.init_layout to bring up the CXL pool. On store, snapshot reserved keys / memory_objs and pass them to storage_manager.finish_write via the host-func closure.
- config: add --maru-server-url / --maru-pool-size-gb / --maru-instance-id MP CLI flags. parse_args_to_config builds a MaruL1Config when these are set; no new top-level engine fields.
- docs/design/v1/distributed/maru_memory_allocator.md: design rationale, lifecycle, single-model constraint + TODO, Phase 2 evolution path.
- tests/v1/distributed/test_storage_manager_maru.py: coverage for maru-mode StorageManager construction (skipped controllers, empty L2 adapters), register_kv_layout chain, memory_objs threading, close/report_status without controllers.
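As a rough illustration of the flag-to-config step, the sketch below wires the three flag names from this commit into argparse. Everything else is an assumption: `ToyMaruL1Config`, the field types, and the rule that the config is built only when `--maru-server-url` is given are hypothetical, and the URL in the usage note is a made-up placeholder.

```python
import argparse
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class ToyMaruL1Config:
    """Hypothetical config holder; not the real MaruL1Config."""
    server_url: str
    pool_size_gb: Optional[float]
    instance_id: Optional[str]


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser()
    parser.add_argument("--maru-server-url", default=None)
    parser.add_argument("--maru-pool-size-gb", type=float, default=None)
    parser.add_argument("--maru-instance-id", default=None)
    return parser


def parse_maru_config(argv: List[str]) -> Optional[ToyMaruL1Config]:
    """Return a config when the maru flags are set, else None (default backend)."""
    args = build_parser().parse_args(argv)
    if args.maru_server_url is None:
        return None
    return ToyMaruL1Config(
        server_url=args.maru_server_url,
        pool_size_gb=args.maru_pool_size_gb,
        instance_id=args.maru_instance_id,
    )
```

For example, `parse_maru_config(["--maru-server-url", "tcp://localhost:5555", "--maru-pool-size-gb", "4"])` yields a config with `pool_size_gb == 4.0`, while an empty argument list yields `None`.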
Per follow-up review: the class-level docstring already covers lifecycle, single-model constraint, and the TODO marker, so the external design doc duplicates that without adding signal.
Registers under ``--l2-adapter '{"type":"maru",...}'`` via
``register_l2_adapter_factory``. The adapter keeps L1 as the default
DRAM allocator and routes L2 traffic to a ``MaruServer``-managed CXL
pool: engine hot path is GPU ↔ DRAM ↔ CXL with the DRAM-side memcpy
running on the adapter's worker pool.
- ``MaruL2AdapterConfig`` with ``from_dict`` / ``help`` and RPC
tunables matching ``MaruL1Config`` (server_url, pool_size_gb,
chunk_size_bytes, instance_id, timeout_ms, etc.).
- ``submit_store_task``: ``MaruHandler.alloc`` →
``ctypes.memmove`` DRAM→CXL → ``batch_store``.
- ``submit_lookup_and_lock_task``: ``batch_pin`` → prefix-bitmap.
- ``submit_load_task``: ``batch_retrieve`` → memmove CXL→DRAM per
``MemoryInfo.view`` → completion bitmap.
- ``submit_unlock`` / ``delete``: ``batch_unpin`` / per-key
``delete``.
- Three ``ThreadPoolExecutor`` pools + three ``EventNotifier`` event
fds wired to the controller-facing surface.
- Factory + type registration so the auto-discovery in
``l2_adapters/__init__.py`` picks the adapter up.
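The ``batch_pin`` → prefix-bitmap step in the list above can be sketched as below. `prefix_hits` is a hypothetical helper and a plain list of booleans stands in for the bitmap type; the sketch assumes the bitmap marks hits only up to the first miss.

```python
from typing import List, Sequence


def prefix_hits(pin_results: Sequence[bool]) -> List[bool]:
    """Mark hits up to (not including) the first miss; the rest are misses."""
    hits = [False] * len(pin_results)
    for i, ok in enumerate(pin_results):
        if not ok:
            break  # prefix-stop: later successes are not reported as hits
        hits[i] = True
    return hits
```

With pin results `[True, True, False, True]` this returns `[True, True, False, False]`: the fourth key is not reported as a hit even though its pin succeeded.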
37 tests covering MaruL2AdapterConfig parsing, factory registration, the store/lookup/load task lifecycle (happy path, partial failure, exception swallowing), submit_unlock / delete edge cases, close() idempotence, and the key-encoding / memoryview-address helpers. MaruHandler is monkey-patched via MaruL2Adapter._create_handler so no MaruServer connection is needed; worker pools are swapped for an inline _SyncExecutor so submit_* returns deterministically before assertions run. Numpy uint8 buffers stand in for L1 DRAM MemoryObj backing so byte-level memmove can be verified end-to-end.
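An inline executor of the kind described here might look like the following. This is a sketch of the pattern, not the ``_SyncExecutor`` in the test suite; the class name and the choice to store exceptions on the returned future are assumptions.

```python
from concurrent.futures import Future
from typing import Any, Callable


class SyncExecutor:
    """Runs submitted callables inline and returns an already-finished Future."""

    def submit(self, fn: Callable[..., Any], *args: Any, **kwargs: Any) -> Future:
        fut: Future = Future()
        try:
            fut.set_result(fn(*args, **kwargs))
        except Exception as exc:
            # Like a thread pool, surface the error through the future.
            fut.set_exception(exc)
        return fut

    def shutdown(self, wait: bool = True) -> None:
        """No threads to stop; present so it can replace a ThreadPoolExecutor."""
```

Because `submit` returns only after the callable has run, assertions placed right after a `submit_*` call see the completed state without sleeping or polling.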
T7 builds a MaruL2Adapter against the running MaruServer and checks the three event fds are distinct. T8 then runs a single-key store → lookup-and-lock → load round-trip on a deterministic 64KB payload and asserts the loaded bytes match the source, exercising the full DRAM→CXL memcpy + batch_store and batch_retrieve + CXL→DRAM memcpy paths end-to-end. Closes the adapter at the end to leave the server clean. Numpy uint8 buffers stand in for L1 DRAM MemoryObj backing (matching the unit-test pattern); MagicMock wraps them so the adapter receives proper data_ptr / get_size() values.
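The byte-level round trip that T8 checks can be imitated without a server. The sketch below uses plain `bytearray` buffers instead of numpy arrays and a third buffer as a stand-in for the CXL page; `copy_bytes` and `round_trip` are hypothetical helpers, and only the `ctypes.memmove` call mirrors what the adapter is described as doing.

```python
import ctypes


def copy_bytes(dst: bytearray, src: bytearray, size: int) -> None:
    """memmove `size` bytes from src to dst through raw addresses."""
    if size > len(dst) or size > len(src):
        raise ValueError("size exceeds a buffer")
    dst_c = (ctypes.c_char * len(dst)).from_buffer(dst)
    src_c = (ctypes.c_char * len(src)).from_buffer(src)
    ctypes.memmove(ctypes.addressof(dst_c), ctypes.addressof(src_c), size)


def round_trip(payload: bytes) -> bytes:
    """Copy payload 'DRAM' -> 'pool page' -> 'DRAM' and return the result."""
    dram_src = bytearray(payload)
    pool_page = bytearray(len(payload))  # stands in for a CXL page
    dram_dst = bytearray(len(payload))
    copy_bytes(pool_page, dram_src, len(payload))  # store direction
    copy_bytes(dram_dst, pool_page, len(payload))  # load direction
    return bytes(dram_dst)
```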
Defer ``MaruHandler.connect()`` to the first store / load so the adapter can derive ``chunk_size_bytes`` from the inbound ``MemoryObj.get_physical_size()`` instead of forcing the user to spell it out in CLI / yaml. The config field becomes Optional; explicit values still win and remain useful when lookup or load must run before any store.
- ``MaruL2AdapterConfig.chunk_size_bytes``: ``Optional[int] = None``, validated only when present. ``help()`` text updated.
- ``MaruL2Adapter.__init__``: handler stays ``None``; the event fds and worker pools come up as before.
- New ``_ensure_connected(hint_size=None)``: thread-safe lazy bring-up. ``_create_handler`` becomes ``_connect_handler(config, chunk_size_bytes)`` taking the resolved size as an arg.
- ``submit_store_task`` / ``submit_load_task`` pass the first ``MemoryObj``'s physical size as the hint. Lookup / unlock / delete short-circuit when the handler is still ``None`` (lookup ends up with an all-miss bitmap; unlock and delete no-op since there can be no live pins / entries).
Tests: ``adapter`` fixture pre-pins the handler so existing store/load assertions are unchanged; new ``lazy_adapter`` fixture plus ``TestLazyConnect`` exercise the lazy contract (no eager connect, store hint, explicit-size precedence, lookup-first yielding empty bitmap, unlock/delete skipping, single connect across stores, optional ``chunk_size_bytes`` in ``from_dict``).
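A minimal sketch of a thread-safe lazy bring-up with explicit-size precedence, in the spirit of ``_ensure_connected``. `LazyConnector` and its `connect` callable are hypothetical, and raising when neither a configured size nor a hint is available is an assumption rather than documented behaviour.

```python
import threading
from typing import Callable, Optional


class LazyConnector:
    """Connects once, on first use, with a chunk size resolved at that moment."""

    def __init__(
        self,
        connect: Callable[[int], object],
        chunk_size_bytes: Optional[int] = None,
    ) -> None:
        self._connect = connect  # hypothetical factory: chunk size -> handler
        self._configured = chunk_size_bytes
        self._handler: Optional[object] = None
        self._lock = threading.Lock()

    def ensure_connected(self, hint_size: Optional[int] = None) -> object:
        with self._lock:
            if self._handler is None:
                # An explicitly configured size wins over the runtime hint.
                size = (
                    self._configured if self._configured is not None else hint_size
                )
                if size is None:
                    raise RuntimeError("chunk size unknown: no config and no hint")
                self._handler = self._connect(size)
            return self._handler
```

Holding the lock across the check and the connect keeps two concurrent first stores from both connecting; later calls pay only for an uncontended lock acquisition.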
T7 now reports "constructed; handler=None" instead of "connected", since the lazy ``MaruL2Adapter`` defers ``MaruHandler.connect()`` until the first store / load. T7 also asserts ``_handler is None`` so the lazy contract is positively verified (not just inferred from log silence). T8's header makes it explicit that the first store is what fires the lazy connect. No behavioural change.
User-facing field stays ``pool_size_gb`` (matches the L1 path's ``--maru-pool-size-gb`` flag); convert to bytes once in ``__init__`` and expose the result as ``pool_size_bytes`` so downstream code (``super().__init__``, ``MaruConfig(pool_size=...)``, logs) reads the cached attribute instead of re-multiplying. Aligns the internal attribute name with ``MaruL1Config.pool_size_bytes``.
Maru as MP-mode L1 backend
Architecture
Data path — store
The two write phases:
- reserve_write
- finish_write: batch_store
From the caller's point of view, a single store flow is broken down inside the dispatcher into two phases covering allocate → fill data → register.
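The sequence diagram for this path attributes three phases to batch_store: a dup check, client-side cleanup of duplicates, and registration of new entries. The toy model below illustrates that shape with an in-memory dictionary. `ToyServer`, the `owned_pages` set, and reporting duplicates as success are assumptions; the real MaruHandler and MaruServer interfaces are not reproduced here.

```python
from typing import Dict, List, Set


class ToyServer:
    """In-memory stand-in for the server-side key index."""

    def __init__(self) -> None:
        self.index: Dict[str, int] = {}

    def batch_exists(self, keys: List[str]) -> List[bool]:
        return [k in self.index for k in keys]

    def batch_register(self, entries: Dict[str, int]) -> None:
        for key, page in entries.items():
            self.index.setdefault(key, page)  # existing entry: idempotent skip


def batch_store(
    server: ToyServer, owned_pages: Set[int], keys: List[str], pages: List[int]
) -> List[bool]:
    exists = server.batch_exists(keys)  # phase 1: dup check
    fresh: Dict[str, int] = {}
    for key, page, dup in zip(keys, pages, exists):
        if dup or key in fresh:
            owned_pages.discard(page)  # phase 2: free the page client-side
        else:
            fresh[key] = page
    server.batch_register(fresh)  # phase 3: register new entries
    return [True] * len(keys)  # assumption: a duplicate counts as success
```

Storing key "a" a second time with a freshly allocated page leaves the server index pointing at the original page and returns the new page to the client's free set.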
sequenceDiagram
    autonumber
    participant vLLM as vLLM worker
    participant Engine as MPCacheEngine
    participant SM as StorageManager
    participant L1 as L1Manager
    participant Disp as MaruL1Dispatcher
    participant Alloc as MaruMemoryAllocator
    participant Maru as MaruHandler
    participant Srv as MaruServer
    vLLM->>Engine: store(keys, kv_tensors) [ZMQ RPC]
    Engine->>SM: reserve_write(keys, ...)
    SM->>L1: reserve_write(keys, ...)
    L1->>Disp: reserve_write(keys, ...)
    Disp->>Alloc: batched_allocate()
    Alloc-->>Disp: CXL pages (TensorMemoryObj)
    Disp-->>L1: MemoryObj per key
    L1-->>SM: MemoryObj per key
    SM-->>Engine: MemoryObj per key
    Note over Engine,vLLM: cudaMemcpy: GPU → CXL page (1 hop)
    Engine->>SM: finish_write(keys, memory_objs)
    SM->>L1: finish_write(keys, memory_objs)
    L1->>Disp: finish_write(keys, memory_objs)
    Note over Disp: keys → object_key_to_string()<br/>extract alloc handle
    Disp->>Maru: batch_store(key_strs, handles)
    Note over Maru: Phase 1 — dup check
    Maru->>Srv: batch_exists_kv RPC
    Srv-->>Maru: exists bitmap per key
    Note over Maru: Phase 2 — client-side cleanup of dups
    loop dup key (local cache or server has it)
        Maru->>Maru: self._owned.free(region_id, page_idx)<br/>(client-side free, server is unaware)
    end
    Note over Maru: Phase 3 — register new entries
    Maru->>Srv: batch_register_kv RPC<br/>(key, region_id, offset, length)
    Note over Srv: kv_manager.register:<br/>new entry → store + region refcount++<br/>already present → idempotent skip
    Srv-->>Maru: results
    Maru-->>Disp: success bool per key
    Disp-->>L1: dict[key, L1Error]
    L1-->>SM: ...
    SM-->>Engine: ...

Data path — retrieve
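The read is split into three phases:
- reserve_read: … MemoryInfo) is fetched and converted to a MemoryObj, which is kept in the side channel _pending_read_memobjs. Calls batch_pin + batch_retrieve (batch_unpin on failure / tail).
- unsafe_read: only takes the MemoryObj out of the side channel that reserve_read filled; no RPC. The caller (engine) uses it to cudaMemcpy to the GPU.
- finish_read: batch_unpin.
From the caller's point of view, a single lookup → load → finish flow is broken down into 3 phases inside the dispatcher.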
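The three read phases above can be modelled with a small in-memory toy that counts round trips. `ToyHandler` and `ToyReadPath` are hypothetical; the sketch assumes the pin stops at the first miss, that unsafe_read leaves entries in the side channel, and that finish_read drains them before unpinning.

```python
from typing import Dict, List


class ToyHandler:
    """In-memory stand-in that counts round trips and tracks pins."""

    def __init__(self, store: Dict[str, bytes]) -> None:
        self.store = dict(store)
        self.pinned: List[str] = []
        self.rpc_calls = 0

    def batch_pin(self, keys: List[str]) -> List[bool]:
        self.rpc_calls += 1
        results = [False] * len(keys)
        for i, key in enumerate(keys):
            if key not in self.store:
                break  # prefix-stop: nothing after the first miss is pinned
            self.pinned.append(key)
            results[i] = True
        return results

    def batch_retrieve(self, keys: List[str]) -> List[bytes]:
        self.rpc_calls += 1
        return [self.store[k] for k in keys]

    def batch_unpin(self, keys: List[str]) -> None:
        self.rpc_calls += 1
        for key in keys:
            self.pinned.remove(key)


class ToyReadPath:
    def __init__(self, handler: ToyHandler) -> None:
        self._handler = handler
        self._pending: Dict[str, bytes] = {}  # side channel filled by reserve_read

    def reserve_read(self, keys: List[str]) -> Dict[str, bool]:
        num_pinned = sum(self._handler.batch_pin(keys))
        hits = keys[:num_pinned]
        if hits:
            for key, value in zip(hits, self._handler.batch_retrieve(hits)):
                self._pending[key] = value
        return {k: i < num_pinned for i, k in enumerate(keys)}

    def unsafe_read(self, keys: List[str]) -> List[bytes]:
        return [self._pending[k] for k in keys]  # no handler call on this path

    def finish_read(self, keys: List[str]) -> None:
        to_unpin = [k for k in keys if self._pending.pop(k, None) is not None]
        if to_unpin:
            self._handler.batch_unpin(to_unpin)
```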
The key point is that reserve_read builds the MemoryObj ahead of time so that unsafe_read can finish quickly without an RPC.

sequenceDiagram
    autonumber
    participant vLLM as vLLM worker
    participant Engine as MPCacheEngine
    participant SM as StorageManager
    participant L1 as L1Manager
    participant Disp as MaruL1Dispatcher
    participant Alloc as MaruMemoryAllocator
    participant Maru as MaruHandler
    participant Srv as MaruServer
    rect rgb(235, 245, 255)
        Note over vLLM,Srv: reserve_read — pin + acquire metadata
        vLLM->>Engine: lookup(keys) [ZMQ RPC]
        Engine->>SM: reserve_read(keys)
        SM->>L1: reserve_read(keys)
        L1->>Disp: reserve_read(keys)
        Note over Disp: keys → object_key_to_string()
        Disp->>Maru: batch_pin(key_strs)
        Maru->>Srv: pin RPC
        Srv-->>Maru: pin bitmap (prefix-stop)
        Maru-->>Disp: pin_results
        Disp->>Maru: batch_retrieve(key_strs[:num_pinned])
        Maru->>Srv: retrieve RPC
        Srv-->>Maru: MemoryInfo per key
        Maru-->>Disp: mem_infos
        alt retrieve itself fails
            Disp->>Maru: batch_unpin(key_strs[:num_pinned]) (rollback)
            Maru->>Srv: unpin RPC
        end
        Note over Disp,Alloc: each MemoryInfo →<br/>allocator.get_by_location() →<br/>_pending_read_memobjs[key] = mem_obj
        alt some tail entries fail to resolve
            Disp->>Maru: batch_unpin(extras) (reconciliation)
            Maru->>Srv: unpin RPC
        end
        Disp-->>L1: dict[key, L1OperationResult]
        L1-->>SM: ...
        SM-->>Engine: results
    end
    rect rgb(245, 255, 235)
        Note over vLLM,Srv: unsafe_read — side channel only, no RPC
        vLLM->>Engine: load(keys, dst_kv_tensors) [ZMQ RPC]
        Engine->>L1: unsafe_read(keys)
        L1->>Disp: unsafe_read(keys)
        Note over Disp: _pending_read_memobjs[k]<br/>take out and return
        Disp-->>L1: MemoryObj per key
        L1-->>Engine: ...
        Note over Engine,vLLM: cudaMemcpy: CXL page → GPU (1 hop)
    end
    rect rgb(255, 245, 235)
        Note over vLLM,Srv: finish_read — unpin
        Engine->>SM: finish_read(keys)
        SM->>L1: finish_read(keys)
        L1->>Disp: finish_read(keys)
        Note over Disp: side-channel drain
        Disp->>Maru: batch_unpin(to_unpin)
        Maru->>Srv: unpin RPC
        Srv-->>Maru: ok
        Maru-->>Disp: ok
        Disp-->>L1: dict[key, L1Error]
    end

Data path — delete
sequenceDiagram
    autonumber
    participant Trigger as explicit delete RPC<br/>or shutdown
    participant Engine as MPCacheEngine
    participant SM as StorageManager
    participant L1 as L1Manager
    participant Disp as MaruL1Dispatcher
    participant Maru as MaruHandler
    participant Srv as MaruServer
    Trigger->>Engine: delete(keys)
    Engine->>SM: ...
    SM->>L1: delete(keys)
    L1->>Disp: delete(keys)
    loop for k in keys (one at a time, no batch_delete)
        Note over Disp: key_str = object_key_to_string(k)
        Disp->>Maru: handler.delete(key_str)
        Maru->>Srv: delete RPC
        alt entry exists AND unpinned
            Note over Srv: remove KV entry + free page
            Srv-->>Maru: True
            Maru-->>Disp: True → SUCCESS
        else missing or pinned (conflated)
            Srv-->>Maru: False
            Maru-->>Disp: False → KEY_NOT_EXIST
        end
    end
    Disp-->>L1: dict[key, L1Error]
    L1-->>SM: ...

When is it called? In maru-L1 mode, unlike the default DRAM mode, all of the following triggers are bypassed:
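- EvictionController (l1_mgr.delete(evict_keys)): the lifecycle of an L1 entry is owned by MaruServer, so there is no L1-side eviction at all.
- Cleanup delete in StoreController / PrefetchController: neither controller is instantiated in maru mode.
- serde_wrapper cleanup: the L2 cascade is bypassed, so this path is never reached.
So the cases in which it is actually called are the triggers named in the diagram: an explicit delete RPC, or shutdown.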
Capacity-driven eviction inside the pool is handled by MaruServer's own LRU and is independent of L1Manager.

Maru as MP-mode L2 adapter
Architecture
Data path — store
The two write phases:
- submit_store_task: handler.alloc(size) × N + batch_store(key_strs, handles)
- pop_completed_store_tasks
This path is called when KV data held in DRAM L1 spills to L2 through cascade overflow. The cudaMemcpy already finished in the GPU→DRAM step; what L2 sees is a byte copy from a DRAM page to a CXL page.
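The submit / pop pair above amounts to task bookkeeping around a worker pool. The sketch below shows one way to arrange it; `ToyStoreTasks` and its `store_fn` callable are hypothetical, a `threading.Event` stands in for the store event fd, and treating an exception as a failed task is an assumption.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, List


class ToyStoreTasks:
    def __init__(self, store_fn: Callable[[List[str]], List[bool]]) -> None:
        self._store_fn = store_fn  # hypothetical: returns per-key success
        self._pool = ThreadPoolExecutor(max_workers=1)
        self._lock = threading.Lock()
        self._next_id = 0
        self._inflight = 0
        self._completed: Dict[int, bool] = {}
        self.done = threading.Event()  # stands in for the store event fd

    def submit_store_task(self, keys: List[str]) -> int:
        with self._lock:
            task_id = self._next_id
            self._next_id += 1
            self._inflight += 1
        self._pool.submit(self._run, task_id, keys)
        return task_id  # returned before the copy has necessarily finished

    def _run(self, task_id: int, keys: List[str]) -> None:
        try:
            ok = all(self._store_fn(keys))
        except Exception:
            ok = False  # assumption: a raising task is reported as failed
        with self._lock:
            self._completed[task_id] = ok
            self._inflight -= 1
        self.done.set()

    def pop_completed_store_tasks(self) -> Dict[int, bool]:
        with self._lock:
            out, self._completed = self._completed, {}
        return out

    def close(self) -> None:
        self._pool.shutdown(wait=True)
```

A task is reported as successful only if every key in it stored successfully, matching the `all(results)` note in the sequence diagram for this path.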
sequenceDiagram
    autonumber
    participant Ctrl as StoreController
    participant Adapter as MaruL2Adapter
    participant Exec as store_executor
    participant Maru as MaruHandler
    participant Srv as MaruServer
    Ctrl->>Adapter: submit_store_task(keys, dram_objs)
    Note over Adapter: lock-protect:<br/>issue task_id<br/>_inflight_store_tasks++
    Adapter->>Exec: submit(_execute_store_task, ...)
    Adapter-->>Ctrl: task_id
    Note over Exec: hint = objs[0].get_physical_size()<br/>handler = _ensure_connected(hint)
    loop per object
        Exec->>Maru: handler.alloc(size)
        Maru-->>Exec: AllocHandle (CXL page mapped)
        Note over Exec: ctypes.memmove(handle.buf, obj.data_ptr, size)<br/>(DRAM L1 page → CXL page)
    end
    Exec->>Maru: batch_store(key_strs, handles)
    Note over Maru: Phase 1 — dup check
    Maru->>Srv: batch_exists_kv RPC
    Srv-->>Maru: exists bitmap
    Note over Maru: Phase 2 — client-side cleanup of dups
    loop dup key
        Maru->>Maru: self._owned.free(region_id, page_idx)
    end
    Note over Maru: Phase 3 — register new entries
    Maru->>Srv: batch_register_kv RPC
    Srv-->>Maru: results
    Maru-->>Exec: success bool per key
    Note over Exec: lock-protect:<br/>_completed_store_tasks[task_id] = all(results)<br/>_inflight_store_tasks--
    Exec->>Adapter: signal _store_efd
    Ctrl->>Adapter: pop_completed_store_tasks()
    Adapter-->>Ctrl: dict[task_id, bool]

Data path — retrieve
The read has three phases:
- submit_lookup_and_lock_task: batch_pin (there is no separate batch_exists; the pin itself is the existence check)
- submit_load_task: batch_retrieve + memcpy CXL→DRAM
- submit_unlock: batch_unpin
The L2 cascade's PrefetchController / LookupResultPromote drives this flow.

sequenceDiagram
    autonumber
    participant Ctrl as PrefetchController
    participant Adapter as MaruL2Adapter
    participant LE as lookup_executor
    participant LD as load_executor
    participant Maru as MaruHandler
    participant Srv as MaruServer
    rect rgb(235, 245, 255)
        Note over Ctrl,Srv: submit_lookup_and_lock_task — confirm existence via pin + acquire access
        Ctrl->>Adapter: submit_lookup_and_lock_task(keys)
        Adapter->>LE: submit(_execute_lookup_task, ...)
        Adapter-->>Ctrl: task_id
        LE->>Maru: handler.batch_pin(key_strs)
        Maru->>Srv: pin RPC
        Srv-->>Maru: per-key ok/fail
        Maru-->>LE: pin_results
        Note over LE: prefix-stop:<br/>bitmap.set(i) up to the first miss
        LE->>Adapter: signal _lookup_efd
        Ctrl->>Adapter: query_lookup_and_lock_result(task_id)
        Adapter-->>Ctrl: Bitmap (prefix-stop)
    end
    rect rgb(245, 255, 235)
        Note over Ctrl,Srv: submit_load_task — CXL → DRAM copy
        Ctrl->>Adapter: submit_load_task(keys, dst_dram_objs)
        Adapter->>LD: submit(_execute_load_task, ...)
        Adapter-->>Ctrl: task_id
        LD->>Maru: handler.batch_retrieve(key_strs)
        Maru->>Srv: retrieve RPC
        Srv-->>Maru: MemoryInfo per key
        Maru-->>LD: mem_infos
        loop per (info, dst)
            Note over LD: ctypes.memmove(dst.data_ptr,<br/>info.view, len(info.view))<br/>(CXL page → DRAM L1 page)
        end
        LD->>Adapter: signal _load_efd
        Ctrl->>Adapter: query_load_result(task_id)
        Adapter-->>Ctrl: Bitmap (hit per index)
        Note over Ctrl: (the upper layer does the DRAM → GPU cudaMemcpy)<br/>(LookupResultPromote can promote to DRAM L1)
    end
    rect rgb(255, 245, 235)
        Note over Ctrl,Srv: submit_unlock — release pin (synchronous, fire-and-forget)
        Ctrl->>Adapter: submit_unlock(keys)
        alt _handler is None
            Note over Adapter: short-circuit (no live pins)
        else handler present
            Adapter->>Maru: batch_unpin(key_strs)
            Maru->>Srv: unpin RPC
            Srv-->>Maru: ok
        end
        Adapter-->>Ctrl: (return)
    end

Data path — delete
sequenceDiagram
    autonumber
    participant Trigger as EvictionController<br/>or explicit delete
    participant Adapter as MaruL2Adapter
    participant Maru as MaruHandler
    participant Srv as MaruServer
    Trigger->>Adapter: delete(keys)
    alt _handler is None
        Note over Adapter: short-circuit (no live entries)
    else
        loop for k in keys (one at a time, no batch_delete)
            Note over Adapter: key_str = _object_key_to_string(k)
            Adapter->>Maru: handler.delete(key_str)
            Maru->>Srv: delete RPC
            alt entry exists AND unpinned
                Note over Srv: remove from index + region refcount--
                Srv-->>Maru: True
            else missing or pinned
                Srv-->>Maru: False
                Note over Adapter: log only and continue (best-effort)
            end
        end
    end
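The best-effort delete loop in this diagram can be sketched as a plain function. `best_effort_delete` and the `DeleteHandler` protocol are hypothetical, and returning the number of removed entries is an addition for illustration; the diagram itself only shows logging and continuing.

```python
import logging
from typing import Iterable, Optional, Protocol

logger = logging.getLogger(__name__)


class DeleteHandler(Protocol):
    def delete(self, key: str) -> bool: ...


def best_effort_delete(handler: Optional[DeleteHandler], keys: Iterable[str]) -> int:
    """Delete keys one at a time; return how many were actually removed."""
    if handler is None:
        return 0  # never connected, so there can be no live entries
    removed = 0
    for key in keys:
        if handler.delete(key):
            removed += 1
        else:
            # Missing and pinned are indistinguishable here; log and continue.
            logger.debug("delete skipped for %s (missing or pinned)", key)
    return removed
```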