[2.7] Pass-Through: Zero Tensor Copy at CJ for Large-Model Federated Training#4210
[2.7] Pass-Through: Zero Tensor Copy at CJ for Large-Model Federated Training#4210chesterxgchen merged 15 commits intoNVIDIA:2.7from
Conversation
Greptile SummaryIntroduces pass-through architecture that eliminates tensor materialization at the CJ (Client Job) process for large-model federated learning. Instead of downloading and re-serializing model tensors at the CJ (consuming 140+ GB for a 70B model), the CJ now creates lightweight Key Changes:
Impact:
Confidence Score: 5/5
Important Files Changed
Sequence DiagramsequenceDiagram
participant Server as FL Server
participant CJ as CJ Process
participant Sub as Subprocess Agent
Note over Server,Sub: Before This PR (Tensor Materialization)
Server->>CJ: serialize tensors
Note over CJ: Downloads & materializes<br/>full model (e.g., 140 GB)
CJ->>CJ: Re-serialize for subprocess
CJ->>Sub: New download transaction
Sub->>CJ: Download tensors
Note over CJ: CJ memory = full model size
Note over Server,Sub: After This PR (Pass-Through)
Server->>CJ: serialize tensors with ref
Note over CJ: PASS_THROUGH enabled<br/>Creates LazyDownloadRef<br/>(~100 bytes per tensor)
CJ->>Sub: Forward LazyDownloadRef<br/>(original server FQCN + ref_id)
Sub->>Server: Download tensors directly
Note over CJ: CJ memory ≈ 0 bytes<br/>(no tensor data)
Last reviewed commit: 741b460 |
d6c659a to
5f6958c
Compare
|
/build |
|
/build |
pcnudde
left a comment
There was a problem hiding this comment.
Are there any possible incompatibilities with things like filters or other users code that expects real tensors instead of the the lazy references.
Replace CellPipe with LocalCellPipe as the default pipe transport between
the CJ (Client Job) process and the trainer subprocess when
launch_external_process=True.
## What changes
* **New `nvflare/fuel/utils/pipe/local_cell_pipe.py`**: a `Pipe` subclass
that uses a direct local Cellnet Cell connection (tcp://127.0.0.1:<port>)
instead of routing through the external FL network.
- PASSIVE side (CJ): creates a server-child Cell with an internal listener
on a dynamically assigned localhost port.
- ACTIVE side (subprocess): creates a Cell that connects to the PASSIVE
cell via `parent_url=<bound-port-url>` exported at job start.
- Both task and metric pipes share one Cell per side via a class-level
`_cells_info` cache, multiplexed over distinct channel names.
* **Modified `nvflare/job_config/script_runner.py`**: adds
`_create_local_cell_pipe()` and uses it (instead of `_create_cell_pipe()`)
as the default pipe factory in `add_to_fed_job()`.
* **New `tests/unit_test/fuel/utils/pipe/test_local_cell_pipe.py`**: 33 unit
tests covering FQCN helpers, placeholder guard, cell sharing, listener
binding, bidirectional messaging, two-channel multiplexing, lifecycle /
error cases, and concurrent sends.
## Why
The existing CellPipe requires both the CJ process and the subprocess to
join the external FL cellnet (with auth tokens and TLS certs) just to
exchange model updates on the same host. LocalCellPipe eliminates that
overhead: no auth credentials, no relay round-trips, sub-millisecond local
TCP latency. It also lays the architectural foundation for a future B1
pass-through optimisation (CJ forwards raw FOBS ref-ids directly to
subprocess so tensor download happens once, at the subprocess, reducing
peak memory usage across training rounds).
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
## Problem
With the previous CellPipe / LocalCellPipe transport, the CJ (Client Job)
process was always a full intermediate stop for model tensors:
1. CJ receives task from server → ViaDownloaderDecomposer downloads every
tensor → CJ holds the full model in memory (30–70 GB for large models).
2. CJ re-serialises the model for the subprocess pipe → new download refs
pointing to *CJ's* cell → subprocess downloads from CJ.
Peak CJ memory = model size. For 30–70 GB models this is a hard blocker.
## Solution — B1 pass-through architecture
### FOBS layer (`nvflare/fuel/utils/fobs/`)
* **`FOBSContextKey.PASS_THROUGH`** — new flag. When set in a cell's
fobs_ctx, `ViaDownloaderDecomposer` does **not** download tensors on
receipt. Instead it creates `LazyDownloadRef` placeholders (tiny
Python objects, just FQCN + ref_id + item_id) for every tensor in the
received batch.
* **`LazyDownloadRef`** — new class in `via_downloader.py`. Carries the
*original source* FQCN and batch ref_id.
* **`ViaDownloaderDecomposer.process_datum()`** — PASS_THROUGH branch:
store `(fqcn, ref_id)` tuple instead of downloading.
* **`ViaDownloaderDecomposer.recompose()`** — returns `LazyDownloadRef`
objects when `items_key` holds a tuple (PASS_THROUGH mode).
* **`ViaDownloaderDecomposer.decompose()`** — detects `LazyDownloadRef`
targets and re-emits the *original* server datum verbatim via
`_finalize_lazy_batch()` post-callback, instead of creating a new
download transaction on the CJ cell.
### Executor (`nvflare/app_common/executors/client_api_launcher_executor.py`)
`initialize()` now sets `PASS_THROUGH=True` on the engine's communication
cell so all incoming FL tasks arrive at CJ without tensor downloads.
### Pipe defaults (`nvflare/job_config/script_runner.py`)
* **Task pipe** → `CellPipe` (FL-network connected). The subprocess
agent's pipe cell can reach any FQCN in the FL network, including the
server cell that owns the original download transaction. Each tensor is
downloaded individually by `ViaDownloaderDecomposer` at the subprocess —
no single-message 2 GB cap applies.
* **Metric pipe** → `LocalCellPipe` (direct local TCP). Metric payloads
are scalars/strings; local TCP is faster and needs no FL auth.
## Data flow after this change
```
Server ──(ViaDownloader refs)──► CJ ──(same refs forwarded)──► subprocess
no tensor download at CJ downloads from server directly
```
CJ peak memory: O(number of tensor refs) instead of O(model size).
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
… check - Remove LocalCellPipe and its tests (dead code: CellPipe covers the same use-case with acceptable overhead and no extra Cell infrastructure) - Revert script_runner.py metric pipe back to CellPipe (both pipes now use CellPipe, same as before the LocalCellPipe experiment) - Replace fragile isinstance(items, tuple) guard in ViaDownloaderDecomposer with a named _LazyBatchInfo sentinel class, making the PASS_THROUGH path unambiguous and robust against accidental type collisions Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…accumulation) Tests cover: - LazyDownloadRef and _LazyBatchInfo construction and slot integrity - process_datum() in PASS_THROUGH mode: creates _LazyBatchInfo sentinel, does NOT invoke the network download path - recompose() returns LazyDownloadRef with correct fqcn/ref_id/item_id when _LazyBatchInfo is in fobs_ctx - decompose() on a LazyDownloadRef: re-emits the original server datum via _finalize_lazy_batch post-CB; CB registered only once per batch regardless of item count; emitted datum carries correct fqcn/ref_id/DOT - No memory accumulation: _CtxKey.OBJECTS absent from fobs_ctx after PASS_THROUGH round, no DownloadService transaction created at CJ, 50 repeated cycles do not accumulate state Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…n + E2E tests
Fix a real serialization bug in the PASS_THROUGH path and add end-to-end
tests that validate the full B1 architecture round-trip.
Problem:
When CJ deserialized a task in PASS_THROUGH mode, numpy arrays became
LazyDownloadRef objects. When CJ then re-serialized those objects for
the subprocess via the task pipe, FOBS had no decomposer for LazyDownloadRef
and raised:
RuntimeError: <class 'LazyDownloadRef'> can't be serialized by FOBS
without a decomposer
This made the entire B1 pass-through architecture non-functional.
Changes:
via_downloader.py
• Add `dot` field to LazyDownloadRef and _LazyBatchInfo so each
placeholder carries the Datum Object Type of the originating
ViaDownloaderDecomposer subclass (e.g. NUMPY_DOWNLOAD).
• process_datum() PASS_THROUGH branch: pass datum.dot → _LazyBatchInfo.dot
• recompose() PASS_THROUGH branch: pass items.dot → LazyDownloadRef.dot
• New LazyDownloadRefDecomposer (auto-registered via register_folder):
- decompose(): delegates to get_dot_handler(lazy.dot).decompose(), which
re-emits the original server datum via _finalize_lazy_batch post-CB,
then appends lazy_dot to the returned encoding dict.
- recompose(): uses lazy_dot to look up the handler and delegates to
handler.recompose(), which retrieves the real tensor from fobs_ctx
(populated by process_datum() at the subprocess).
New tests:
tests/unit_test/fuel/f3/streaming/test_pass_through_e2e.py
Five end-to-end tests using real TCP-connected Cell pairs:
1. test_arrays_survive_pass_through_hop — full round-trip correctness
2. test_cj_holds_only_lazy_refs_not_tensor_data — no tensors at CJ
3. test_cj_creates_no_download_transaction — DownloadService unchanged
4. test_forwarded_payload_carries_original_server_ref — subprocess
downloads from server, not CJ
5. test_multiple_array_roundtrip — 8-array batch, bit-exact values
All 27 B1 tests pass (22 unit + 5 E2E).
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Add a new integration test job that exercises the full B1 PASS_THROUGH
stack with a PyTorch model large enough (≈ 8 MB) to trigger the
ViaDownloaderDecomposer streaming path (threshold: 2 MB per array).
Why this test:
Existing pt_client_api_launch_once tests use the CIFAR-10 CNN (~240 KB
of parameters) which falls below the 2 MB streaming threshold and
therefore serialises tensors natively, bypassing the download/PASS_THROUGH
path entirely. The new job forces the real B1 code path:
FL Server ──stream──▶ CJ (PASS_THROUGH, LazyDownloadRef only)
└──forward──▶ Subprocess
└──download──▶ FL Server
New files:
tests/integration_test/data/jobs/pt_large_model_pass_through/
├── meta.conf
└── app/
├── config/
│ ├── config_fed_server.conf (PTFileModelPersistor + ScatterAndGather)
│ └── config_fed_client.conf (PTClientAPILauncherExecutor, launch_once=true)
└── custom/
├── large_model_net.py (LargeNet: 3-layer MLP, ≈ 8 MB float32)
└── large_model_train.py (llm_hf-style while-loop, synthetic data,
CPU-only — no dataset download needed)
Test suite:
tests/integration_test/data/test_configs/standalone_job/client_api.yml
+ "run pt-large-model-pass-through" entry (no setup/teardown needed)
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Remove three unused imports flagged by CI flake8: - test_pass_through_e2e.py: `from nvflare.fuel.utils import fobs` - test_pass_through_e2e.py: `from nvflare.fuel.utils.fobs import get_dot_handler` - test_pass_through.py: `_LAZY_BATCH_CTX_SUFFIX` All 27 B1 tests still pass. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Preserve and restore the prior PASS_THROUGH fobs context value so this executor pass-through mode does not leak process-wide beyond the run, including when initialize fails.
Update memory management documentation to reflect current behavior: jemalloc preload is enabled only when NVFLARE_ENABLE_JEMALLOC_PRELOAD is set true, rather than auto-enabled by default.
1f7674d to
b80c037
Compare
|
/build |
|
|
/build |
Extend the fake FLContext in client_api_launcher_executor tests to provide identity and job context accessors used by FLComponent logging helpers.
|
/build |
Use unique server and subprocess FQCNs per test fixture and clean up CoreCell registry entries on teardown to prevent duplicate cell name failures across test runs.
|
/build |
…ent, hierarchical startup stability Add three major new sections to flare_272.rst covering work merged after the initial 2.7.2 draft: Memory Management (restructured): - Zero Tensor Copy at CJ process via LazyDownloadRef pass-through (PR NVIDIA#4210) - Client-side memory management: malloc_trim, jemalloc, torch.cuda.empty_cache injected after flare.send() without training script changes (PR NVIDIA#4211) - Retain existing TensorDownloader and server-side cleanup content F3 Streaming Reliability and Performance (new section): - HOL stall mitigation: bounded send_frame() timeout, ACK watchdog, stall detection/recovery with recommended env-var settings (PR NVIDIA#4206) - Stream pool starvation fix: blob callbacks dispatched to dedicated thread pool, preventing stream worker exhaustion (PR NVIDIA#4171/NVIDIA#4172) - Streaming download retry with exponential backoff on timeout (PR NVIDIA#4167) - RxTask self-deadlock fix: stop() deferred until after map_lock released (PR NVIDIA#4204) - Lock contention reduction in produce_item() for concurrent model downloads (PR NVIDIA#4174) Hierarchical FL Startup Stability (new section): - Deployment timeout correctly classified as failure; min_sites check applied at deployment phase (PR NVIDIA#4209) - Startup grace period for dead-client detection (debounce default=true) (PR NVIDIA#4209) - Selective client exclusion on start-job timeout instead of full abort (PR NVIDIA#4209) - Hardened job metadata parsing: TypeError replaced with descriptive RuntimeError (PR NVIDIA#4209) - Recommended config snippets for HPC/Lustre environments (Frontier/ORNL scale) Bug Fixes section updated with all streaming and hierarchical startup fixes. Intro paragraph updated to reflect system hardening scope. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ctive client exclusion, and dead-detection debounce (#4209) ## Problem Large-scale hierarchical FL jobs (e.g. BERT NER, 144 clients, 6 relays on Frontier) abort in Round 0 due to a cascading startup failure chain. The root sequence is: 1. F3 streaming HOL stall (PR #4206) delays deployment ACKs from relay-connected clients 2. **`_deploy_job()`** treats `reply=None` (timeout) as `"unknown"` — not a failure — so timed-out clients silently appear to have been deployed 3. **`_start_run()`** tries to start those clients; they again time out, and `check_client_replies()` ignores the `None` reply 4. **`_sync_client_jobs()`** fires dead-job notification on the very first heartbeat with no startup grace period 5. FedAvg requires 144/144 — one or two missing clients → abort 6. A late-starting CJ crashes with `TypeError: 'NoneType' object is not iterable` when `get_job_clients()` receives `None` metadata from an already-aborted job PRs #4206, #4204, #4174, #4172, #4186, #4211, #4210 (all merged in 2.7.2) address the transport layer. This PR addresses the remaining job lifecycle layer. --- ## Fixes Included ### 1 — `_deploy_job()`: Treat deployment timeout as failure (`job_runner.py`) **Root bug**: `reply=None` was logged as `"unknown"` and excluded from `failed_clients`, so timed-out clients counted as "successfully deployed" for the `min_sites` check. **Fix**: Add timed-out clients to `failed_clients` with a `"deployment timeout"` label. The existing `min_sites` / `required_sites` logic then correctly decides whether to abort. ### 2 — `check_client_replies()`: Return timed-out clients instead of raising (`admin.py`) **Root bug**: In strict mode, any timeout raised immediately, aborting the whole job even when the remaining active clients satisfied `min_sites`. **Fix**: In strict mode, collect timed-out clients into a return list rather than raising. Explicit errors (non-OK return code or error body) still raise. Also fixes the non-strict mode to use name-keyed dict lookup instead of fragile positional `zip()`. New signature: `check_client_replies(...) -> List[str]` (timed-out client names; empty = none). ### 3 — `_start_run()`: Selective exclusion with min_sites re-evaluation (`job_runner.py`) **Root bug**: A start-job timeout under strict mode aborted the entire job with no tolerance for stragglers within `min_sites` bounds. **Fix**: Use the returned timed-out list from `check_client_replies()`. If remaining active clients >= `min_sites`, log a warning and proceed. Only abort when below tolerance. ### 4 — `_sync_client_jobs()`: Require-prior-report default changed to `True` (`fed_server.py`) **Root bug**: `SYNC_CLIENT_JOBS_REQUIRE_PREVIOUS_REPORT` defaulted to `False`, meaning the bug fix was opt-in and the unsafe behaviour remained the default. **Fix**: Default changed to `True`. Operators who want the aggressive legacy detection can set it to `False` explicitly. ### 5 — `_sync_client_jobs()`: Move `_reported_clients` out of `job_info` dict (`fed_server.py`) **Root bug**: Positive-observation tracking was stored as `job_info["_reported_clients"]`, injecting algorithm state into a data dict with no corresponding `RunProcessKey` constant. **Fix**: Tracking moved to `self._job_reported_clients: Dict[str, set]` on `FederatedServer`. Stale entries are purged whenever a job is no longer in `run_processes`. ### 6 — `ClientRunManager.get_job_clients()`: Explicit meta validation (`client_run_manager.py`) Raises `RuntimeError` with a descriptive message instead of an opaque `TypeError` when `JOB_CLIENTS` is absent or the wrong type. --- ## Configuration Recommendations (No Code Change Needed) | Setting | Recommended value | Effect | |---|---|---| | `FedAvg(min_clients=...)` | 96-98% of `num_clients` | Tolerates a few startup stragglers | | `runner_sync_timeout` | `120` s | Allows Lustre-backed deployments time to complete | | `strict_start_job_reply_check` | `true` | Start-job timeouts surfaced, straggler clients excluded | | `sync_client_jobs_require_previous_report` | `true` (now the default) | Prevents premature dead-job from startup delay | | `SFM_CLOSE_STALLED_CONNECTION` (PR #4206) | `true` after staging | Disconnects stalled relay connections | --- ## Files Changed - `nvflare/private/fed/server/job_runner.py` — `_deploy_job()` timeout as failure; `_start_run()` selective exclusion - `nvflare/private/fed/server/admin.py` — `check_client_replies()` returns timed-out list; dict-keyed non-strict path - `nvflare/private/fed/server/fed_server.py` — `_sync_client_jobs()` default `True`; `_job_reported_clients` attr; stale cleanup - `nvflare/private/fed/client/client_run_manager.py` — explicit meta validation in `get_job_clients()` --- ## Test Coverage New and updated unit tests with both positive and negative cases: | File | Tests | What they cover | |---|---|---| | `admin_test.py` | 8 | Timeout returned not raised; dict lookup; error still raises; reorder OK | | `job_runner_test.py` | 4 | strict flag wiring; timeout within tolerance → warn; timeout below tolerance → raise | | `job_runner_deploy_test.py` | 9 (new file) | Timeout counted as failure; OK reply not failed; mixed outcomes; detail label; min_sites with timeouts; integration sequence | | `fed_server_test.py` | 5 | Default requires-prior-report; legacy explicit-False still fires; tracking in server attr not job_info; stale cleanup | All 29 targeted unit tests pass. ## Test Plan - [x] Unit tests for each changed function (positive + negative) - [x] New `job_runner_deploy_test.py` covering deployment timeout classification end-to-end - [x] All 29 targeted unit tests pass - [ ] Hierarchical staging run with all flags at default - [ ] Hierarchical staging run with `strict_start_job_reply_check=true` and reduced `min_clients` - [ ] Verify no regression on standard (non-hierarchical) FL jobs --------- Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
…ctive client exclusion, and dead-detection debounce (NVIDIA#4209) ## Problem Large-scale hierarchical FL jobs (e.g. BERT NER, 144 clients, 6 relays on Frontier) abort in Round 0 due to a cascading startup failure chain. The root sequence is: 1. F3 streaming HOL stall (PR NVIDIA#4206) delays deployment ACKs from relay-connected clients 2. **`_deploy_job()`** treats `reply=None` (timeout) as `"unknown"` — not a failure — so timed-out clients silently appear to have been deployed 3. **`_start_run()`** tries to start those clients; they again time out, and `check_client_replies()` ignores the `None` reply 4. **`_sync_client_jobs()`** fires dead-job notification on the very first heartbeat with no startup grace period 5. FedAvg requires 144/144 — one or two missing clients → abort 6. A late-starting CJ crashes with `TypeError: 'NoneType' object is not iterable` when `get_job_clients()` receives `None` metadata from an already-aborted job PRs NVIDIA#4206, NVIDIA#4204, NVIDIA#4174, NVIDIA#4172, NVIDIA#4186, NVIDIA#4211, NVIDIA#4210 (all merged in 2.7.2) address the transport layer. This PR addresses the remaining job lifecycle layer. --- ## Fixes Included ### 1 — `_deploy_job()`: Treat deployment timeout as failure (`job_runner.py`) **Root bug**: `reply=None` was logged as `"unknown"` and excluded from `failed_clients`, so timed-out clients counted as "successfully deployed" for the `min_sites` check. **Fix**: Add timed-out clients to `failed_clients` with a `"deployment timeout"` label. The existing `min_sites` / `required_sites` logic then correctly decides whether to abort. ### 2 — `check_client_replies()`: Return timed-out clients instead of raising (`admin.py`) **Root bug**: In strict mode, any timeout raised immediately, aborting the whole job even when the remaining active clients satisfied `min_sites`. **Fix**: In strict mode, collect timed-out clients into a return list rather than raising. Explicit errors (non-OK return code or error body) still raise. Also fixes the non-strict mode to use name-keyed dict lookup instead of fragile positional `zip()`. New signature: `check_client_replies(...) -> List[str]` (timed-out client names; empty = none). ### 3 — `_start_run()`: Selective exclusion with min_sites re-evaluation (`job_runner.py`) **Root bug**: A start-job timeout under strict mode aborted the entire job with no tolerance for stragglers within `min_sites` bounds. **Fix**: Use the returned timed-out list from `check_client_replies()`. If remaining active clients >= `min_sites`, log a warning and proceed. Only abort when below tolerance. ### 4 — `_sync_client_jobs()`: Require-prior-report default changed to `True` (`fed_server.py`) **Root bug**: `SYNC_CLIENT_JOBS_REQUIRE_PREVIOUS_REPORT` defaulted to `False`, meaning the bug fix was opt-in and the unsafe behaviour remained the default. **Fix**: Default changed to `True`. Operators who want the aggressive legacy detection can set it to `False` explicitly. ### 5 — `_sync_client_jobs()`: Move `_reported_clients` out of `job_info` dict (`fed_server.py`) **Root bug**: Positive-observation tracking was stored as `job_info["_reported_clients"]`, injecting algorithm state into a data dict with no corresponding `RunProcessKey` constant. **Fix**: Tracking moved to `self._job_reported_clients: Dict[str, set]` on `FederatedServer`. Stale entries are purged whenever a job is no longer in `run_processes`. ### 6 — `ClientRunManager.get_job_clients()`: Explicit meta validation (`client_run_manager.py`) Raises `RuntimeError` with a descriptive message instead of an opaque `TypeError` when `JOB_CLIENTS` is absent or the wrong type. --- ## Configuration Recommendations (No Code Change Needed) | Setting | Recommended value | Effect | |---|---|---| | `FedAvg(min_clients=...)` | 96-98% of `num_clients` | Tolerates a few startup stragglers | | `runner_sync_timeout` | `120` s | Allows Lustre-backed deployments time to complete | | `strict_start_job_reply_check` | `true` | Start-job timeouts surfaced, straggler clients excluded | | `sync_client_jobs_require_previous_report` | `true` (now the default) | Prevents premature dead-job from startup delay | | `SFM_CLOSE_STALLED_CONNECTION` (PR NVIDIA#4206) | `true` after staging | Disconnects stalled relay connections | --- ## Files Changed - `nvflare/private/fed/server/job_runner.py` — `_deploy_job()` timeout as failure; `_start_run()` selective exclusion - `nvflare/private/fed/server/admin.py` — `check_client_replies()` returns timed-out list; dict-keyed non-strict path - `nvflare/private/fed/server/fed_server.py` — `_sync_client_jobs()` default `True`; `_job_reported_clients` attr; stale cleanup - `nvflare/private/fed/client/client_run_manager.py` — explicit meta validation in `get_job_clients()` --- ## Test Coverage New and updated unit tests with both positive and negative cases: | File | Tests | What they cover | |---|---|---| | `admin_test.py` | 8 | Timeout returned not raised; dict lookup; error still raises; reorder OK | | `job_runner_test.py` | 4 | strict flag wiring; timeout within tolerance → warn; timeout below tolerance → raise | | `job_runner_deploy_test.py` | 9 (new file) | Timeout counted as failure; OK reply not failed; mixed outcomes; detail label; min_sites with timeouts; integration sequence | | `fed_server_test.py` | 5 | Default requires-prior-report; legacy explicit-False still fires; tracking in server attr not job_info; stale cleanup | All 29 targeted unit tests pass. ## Test Plan - [x] Unit tests for each changed function (positive + negative) - [x] New `job_runner_deploy_test.py` covering deployment timeout classification end-to-end - [x] All 29 targeted unit tests pass - [ ] Hierarchical staging run with all flags at default - [ ] Hierarchical staging run with `strict_start_job_reply_check=true` and reduced `min_clients` - [ ] Verify no regression on standard (non-hierarchical) FL jobs --------- Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
…ent, hierarchical startup stability Add three major new sections to flare_272.rst covering work merged after the initial 2.7.2 draft: Memory Management (restructured): - Zero Tensor Copy at CJ process via LazyDownloadRef pass-through (PR NVIDIA#4210) - Client-side memory management: malloc_trim, jemalloc, torch.cuda.empty_cache injected after flare.send() without training script changes (PR NVIDIA#4211) - Retain existing TensorDownloader and server-side cleanup content F3 Streaming Reliability and Performance (new section): - HOL stall mitigation: bounded send_frame() timeout, ACK watchdog, stall detection/recovery with recommended env-var settings (PR NVIDIA#4206) - Stream pool starvation fix: blob callbacks dispatched to dedicated thread pool, preventing stream worker exhaustion (PR NVIDIA#4171/NVIDIA#4172) - Streaming download retry with exponential backoff on timeout (PR NVIDIA#4167) - RxTask self-deadlock fix: stop() deferred until after map_lock released (PR NVIDIA#4204) - Lock contention reduction in produce_item() for concurrent model downloads (PR NVIDIA#4174) Hierarchical FL Startup Stability (new section): - Deployment timeout correctly classified as failure; min_sites check applied at deployment phase (PR NVIDIA#4209) - Startup grace period for dead-client detection (debounce default=true) (PR NVIDIA#4209) - Selective client exclusion on start-job timeout instead of full abort (PR NVIDIA#4209) - Hardened job metadata parsing: TypeError replaced with descriptive RuntimeError (PR NVIDIA#4209) - Recommended config snippets for HPC/Lustre environments (Frontier/ORNL scale) Bug Fixes section updated with all streaming and hierarchical startup fixes. Intro paragraph updated to reflect system hardening scope. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ent, hierarchical startup stability Add three major new sections to flare_272.rst covering work merged after the initial 2.7.2 draft: Memory Management (restructured): - Zero Tensor Copy at CJ process via LazyDownloadRef pass-through (PR NVIDIA#4210) - Client-side memory management: malloc_trim, jemalloc, torch.cuda.empty_cache injected after flare.send() without training script changes (PR NVIDIA#4211) - Retain existing TensorDownloader and server-side cleanup content F3 Streaming Reliability and Performance (new section): - HOL stall mitigation: bounded send_frame() timeout, ACK watchdog, stall detection/recovery with recommended env-var settings (PR NVIDIA#4206) - Stream pool starvation fix: blob callbacks dispatched to dedicated thread pool, preventing stream worker exhaustion (PR NVIDIA#4171/NVIDIA#4172) - Streaming download retry with exponential backoff on timeout (PR NVIDIA#4167) - RxTask self-deadlock fix: stop() deferred until after map_lock released (PR NVIDIA#4204) - Lock contention reduction in produce_item() for concurrent model downloads (PR NVIDIA#4174) Hierarchical FL Startup Stability (new section): - Deployment timeout correctly classified as failure; min_sites check applied at deployment phase (PR NVIDIA#4209) - Startup grace period for dead-client detection (debounce default=true) (PR NVIDIA#4209) - Selective client exclusion on start-job timeout instead of full abort (PR NVIDIA#4209) - Hardened job metadata parsing: TypeError replaced with descriptive RuntimeError (PR NVIDIA#4209) - Recommended config snippets for HPC/Lustre environments (Frontier/ORNL scale) Bug Fixes section updated with all streaming and hierarchical startup fixes. Intro paragraph updated to reflect system hardening scope. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ent, hierarchical startup stability [skip ci] (#4218) ## Merge Dependency >⚠️ **Depends on #4209** — The *Hierarchical FL Startup Stability* section documents changes introduced by PR #4209 (currently open). **Merge PR #4209 into `2.7` before merging this PR.** All other sections cover already-merged PRs. --- ## Summary This PR updates `docs/release_notes/flare_272.rst` to reflect all major changes merged into the 2.7.x line after the initial 2.7.2 draft, covering three new areas: - **Memory Management** — restructured and expanded with Zero Tensor Copy at CJ (PR #4210) and client-side memory lifecycle management (PR #4211) - **F3 Streaming Reliability and Performance** — new section covering HOL stall mitigation (PR #4206), stream pool starvation fix (PR #4171/#4172), streaming download retry (PR #4167), RxTask self-deadlock fix (PR #4204), and lock contention reduction (PR #4174) - **Hierarchical FL Startup Stability** — new section covering deployment timeout classification, startup grace period, selective client exclusion, and metadata hardening (PR #4209 — pending merge), with recommended config snippets for HPC/Lustre environments The Bug Fixes section and intro paragraph are also updated accordingly. A source-level RST comment has been added above the Hierarchical FL section in the file to alert future maintainers to the merge dependency. ## Merged PRs Documented | PR | Area | Status | |---|---|---| | #4171 / #4172 | Stream pool starvation fix | Merged | | #4174 | Lock contention reduction | Merged | | #4167 | Streaming download retry | Merged | | #4204 | RxTask self-deadlock fix | Merged | | #4206 | HOL stall mitigation | Merged | | #4210 | Zero tensor copy at CJ | Merged | | #4211 | Client-side memory management | Merged | | #4209 | Hierarchical FL startup stability | **Open — merge before this PR** | ## Changes ### Memory Management (restructured) - **Zero Tensor Copy at CJ** (`ClientAPILauncherExecutor`): CJ now holds `LazyDownloadRef` placeholders instead of materializing full tensors, eliminating the CJ as a memory bottleneck for LLM-scale models. - **Client-Side Memory Management**: `gc.collect()` + `malloc_trim(0)` / jemalloc purge / `torch.cuda.empty_cache()` injected after every `flare.send()`, configurable via `client_memory_gc_rounds`. - Existing TensorDownloader and server-side cleanup content retained. ### F3 Streaming Reliability and Performance (new section) - **HOL Stall Mitigation**: Bounded `send_frame()` timeout, ACK-progress watchdog, and stall detection/recovery. Includes recommended environment variable settings for large hierarchical deployments. - **Stream Pool Starvation Fix**: Blob callbacks dispatched to a dedicated `callback_thread_pool`, keeping stream workers free for concurrent downloads. - **Streaming Download Retry**: Exponential-backoff retry (up to 3 attempts, capped at 60 s) on `TIMEOUT` errors; abort-signal aware. - **RxTask Self-Deadlock Fix**: `stop()` deferred until after `map_lock` released, eliminating stream-error-triggered deadlock. - **Lock Contention Reduction**: `produce_item()` runs outside `self.lock`; compare-and-store for cache write. Reduces model-download latency under high client concurrency. ### Hierarchical FL Startup Stability (new section — pending PR #4209) - **Deployment Timeout as Failure**: `reply=None` correctly counted against `min_sites`; timed-out clients excluded before `start_client_job`. - **Startup Grace Period**: Dead-client detection debounced — client must be observed once before absence triggers dead-job notification. Default changed to `True`. - **Selective Client Exclusion**: Stragglers at start-job excluded rather than causing full abort, if remaining count ≥ `min_clients`. - **Metadata Hardening**: `TypeError` on absent job metadata replaced with descriptive `RuntimeError`. - Recommended `config_fed_server.json` / `config_fed_client.json` snippets for HPC (Frontier/ORNL) scale. ## Test plan - [ ] Sphinx build (`make html`) passes without RST warnings on the updated file - [ ] All new cross-references (`.. code-block::`, `.. note::`) render correctly in the docs build - [ ] Verify section hierarchy (underline characters) is consistent throughout the file - [ ] Confirm PR #4209 is merged before this PR is merged 🤖 Generated with [Claude Code](https://claude.com/claude-code) --------- Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
…meouts, selective client exclusion, and dead-detection debounce (#4209) (#4288) ## Problem Large-scale hierarchical FL jobs (e.g. BERT NER, 144 clients, 6 relays on Frontier) abort in Round 0 due to a cascading startup failure chain. The root sequence is: 1. F3 streaming HOL stall (PR #4206) delays deployment ACKs from relay-connected clients 2. **`_deploy_job()`** treats `reply=None` (timeout) as `"unknown"` — not a failure — so timed-out clients silently appear to have been deployed 3. **`_start_run()`** tries to start those clients; they again time out, and `check_client_replies()` ignores the `None` reply 4. **`_sync_client_jobs()`** fires dead-job notification on the very first heartbeat with no startup grace period 5. FedAvg requires 144/144 — one or two missing clients → abort 6. A late-starting CJ crashes with `TypeError: 'NoneType' object is not iterable` when `get_job_clients()` receives `None` metadata from an already-aborted job PRs #4206, #4204, #4174, #4172, #4186, #4211, #4210 (all merged in 2.7.2) address the transport layer. This PR addresses the remaining job lifecycle layer. --- ## Fixes Included ### 1 — `_deploy_job()`: Treat deployment timeout as failure (`job_runner.py`) **Root bug**: `reply=None` was logged as `"unknown"` and excluded from `failed_clients`, so timed-out clients counted as "successfully deployed" for the `min_sites` check. **Fix**: Add timed-out clients to `failed_clients` with a `"deployment timeout"` label. The existing `min_sites` / `required_sites` logic then correctly decides whether to abort. ### 2 — `check_client_replies()`: Return timed-out clients instead of raising (`admin.py`) **Root bug**: In strict mode, any timeout raised immediately, aborting the whole job even when the remaining active clients satisfied `min_sites`. **Fix**: In strict mode, collect timed-out clients into a return list rather than raising. Explicit errors (non-OK return code or error body) still raise. Also fixes the non-strict mode to use name-keyed dict lookup instead of fragile positional `zip()`. New signature: `check_client_replies(...) -> List[str]` (timed-out client names; empty = none). ### 3 — `_start_run()`: Selective exclusion with min_sites re-evaluation (`job_runner.py`) **Root bug**: A start-job timeout under strict mode aborted the entire job with no tolerance for stragglers within `min_sites` bounds. **Fix**: Use the returned timed-out list from `check_client_replies()`. If remaining active clients >= `min_sites`, log a warning and proceed. Only abort when below tolerance. ### 4 — `_sync_client_jobs()`: Require-prior-report default changed to `True` (`fed_server.py`) **Root bug**: `SYNC_CLIENT_JOBS_REQUIRE_PREVIOUS_REPORT` defaulted to `False`, meaning the bug fix was opt-in and the unsafe behaviour remained the default. **Fix**: Default changed to `True`. Operators who want the aggressive legacy detection can set it to `False` explicitly. ### 5 — `_sync_client_jobs()`: Move `_reported_clients` out of `job_info` dict (`fed_server.py`) **Root bug**: Positive-observation tracking was stored as `job_info["_reported_clients"]`, injecting algorithm state into a data dict with no corresponding `RunProcessKey` constant. **Fix**: Tracking moved to `self._job_reported_clients: Dict[str, set]` on `FederatedServer`. Stale entries are purged whenever a job is no longer in `run_processes`. ### 6 — `ClientRunManager.get_job_clients()`: Explicit meta validation (`client_run_manager.py`) Raises `RuntimeError` with a descriptive message instead of an opaque `TypeError` when `JOB_CLIENTS` is absent or the wrong type. --- ## Configuration Recommendations (No Code Change Needed) | Setting | Recommended value | Effect | |---|---|---| | `FedAvg(min_clients=...)` | 96-98% of `num_clients` | Tolerates a few startup stragglers | | `runner_sync_timeout` | `120` s | Allows Lustre-backed deployments time to complete | | `strict_start_job_reply_check` | `true` | Start-job timeouts surfaced, straggler clients excluded | | `sync_client_jobs_require_previous_report` | `true` (now the default) | Prevents premature dead-job from startup delay | | `SFM_CLOSE_STALLED_CONNECTION` (PR #4206) | `true` after staging | Disconnects stalled relay connections | --- ## Files Changed - `nvflare/private/fed/server/job_runner.py` — `_deploy_job()` timeout as failure; `_start_run()` selective exclusion - `nvflare/private/fed/server/admin.py` — `check_client_replies()` returns timed-out list; dict-keyed non-strict path - `nvflare/private/fed/server/fed_server.py` — `_sync_client_jobs()` default `True`; `_job_reported_clients` attr; stale cleanup - `nvflare/private/fed/client/client_run_manager.py` — explicit meta validation in `get_job_clients()` --- ## Test Coverage New and updated unit tests with both positive and negative cases: | File | Tests | What they cover | |---|---|---| | `admin_test.py` | 8 | Timeout returned not raised; dict lookup; error still raises; reorder OK | | `job_runner_test.py` | 4 | strict flag wiring; timeout within tolerance → warn; timeout below tolerance → raise | | `job_runner_deploy_test.py` | 9 (new file) | Timeout counted as failure; OK reply not failed; mixed outcomes; detail label; min_sites with timeouts; integration sequence | | `fed_server_test.py` | 5 | Default requires-prior-report; legacy explicit-False still fires; tracking in server attr not job_info; stale cleanup | All 29 targeted unit tests pass. ## Test Plan - [x] Unit tests for each changed function (positive + negative) - [x] New `job_runner_deploy_test.py` covering deployment timeout classification end-to-end - [x] All 29 targeted unit tests pass - [ ] Hierarchical staging run with all flags at default - [ ] Hierarchical staging run with `strict_start_job_reply_check=true` and reduced `min_clients` - [ ] Verify no regression on standard (non-hierarchical) FL jobs --------- Fixes # . ### Description A few sentences describing the changes proposed in this pull request. ### Types of changes <!--- Put an `x` in all the boxes that apply, and remove the not applicable items --> - [x] Non-breaking change (fix or new feature that would not break existing functionality). - [ ] Breaking change (fix or new feature that would cause existing functionality to change). - [ ] New tests added to cover the changes. - [ ] Quick tests passed locally by running `./runtest.sh`. - [ ] In-line docstrings updated. - [ ] Documentation updated. Co-authored-by: Chester Chen <512707+chesterxgchen@users.noreply.github.com> Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com> Co-authored-by: Peter Cnudde <pcnudde@nvidia.com>
Summary
This PR introduces the pass-through architecture for
ClientAPILauncherExecutor, eliminating tensor materialisation at the CJ (Client Job) process when large models are exchanged between the FL server and a subprocess agent.In large-model federated learning (e.g., 7B–70B LLM fine-tuning), the CJ process today acts as a blind relay that fully deserializes and re-serializes every tensor it receives from the FL server before forwarding to the subprocess. For a 70B float16 model, this consumes ~140 GB of CJ memory and requires two complete network transfers. B1 pass-through removes both costs.
Problem Description
NVFlare's multi-hop execution path for
launch_external_process=Truelooks like:Before this PR
Each tensor in the global model is handled as follows at CJ:
For large models, this means:
This is the reason why workflows are infeasible for large models beyond what the CJ machine can hold.
Proposed Architecture and Solution
Pass-Through Architecture
With
FOBSContextKey.PASS_THROUGHenabled on CJ's cell FOBS context, the data path becomes:CJ holds only lightweight placeholders (< 100 bytes per tensor). The subprocess downloads each tensor directly from the FL server — CJ is never involved in the tensor data path.
Key Components
1.
FOBSContextKey.PASS_THROUGH(nvflare/fuel/utils/fobs/__init__.py)A new context key that signals ViaDownloaderDecomposer to skip the download step and create lazy placeholders instead.
2.
LazyDownloadRef(via_downloader.py)A small sentinel object (four fields:
fqcn,ref_id,item_id,dot) created byrecompose()in PASS_THROUGH mode. It carries the original FL server's FQCN, batch ref_id, intra-batch item ID, and Datum Object Type — everything the subprocess needs to download the tensor directly.3.
_LazyBatchInfo(via_downloader.py)A named sentinel stored in
fobs_ctx[items_key]during PASS_THROUGH receive. Using a typed class (rather than a plain tuple) makes the PASS_THROUGH branch unambiguous and immune to accidental type collisions with real item dicts.4.
LazyDownloadRefDecomposer(via_downloader.py)A new auto-registered FOBS decomposer for
LazyDownloadRef. When CJ re-serializes a task containingLazyDownloadRefobjects:decompose()delegates toget_dot_handler(lazy.dot)— the originalViaDownloaderDecomposersubclass (e.g.,TensorDecomposer,NumpyArrayDecomposer). That handler's_finalize_lazy_batchpost-callback re-emits the original server datum (fqcn + ref_id + DOT) so the subprocess knows exactly where to download from.lazy_dotis appended to the encoding dict for routing on the receive side.recompose()useslazy_dotto look up the handler and delegates tohandler.recompose(), which retrieves the real tensor fromfobs_ctx[handler.items_key](populated byprocess_datum()when the subprocess received the forwarded datum).The
dot(Datum Object Type) field on bothLazyDownloadRefand_LazyBatchInfoensures that numpy arrays stay withNumpyArrayDecomposerand PyTorch tensors stay withTensorDecomposer, preserving type safety through the full pass-through hop.5.
ClientAPILauncherExecutor.initialize()(client_api_launcher_executor.py)On startup, the executor enables PASS_THROUGH on the engine cell's FOBS context:
This single line activates the full B1 architecture for every job that uses
launch_external_process=True— includingllm_hfand any recipe that callsScriptRunner(launch_external_process=True).Analysis
Why not intercept at the pipe level?
The pipe (CellPipe) operates on already-serialized bytes. Intercepting at the pipe level would require parsing FOBS binary format, re-writing datum references, and re-assembling the byte stream — fragile and tightly coupled to the wire format.
Intercepting at the FOBS decomposer level is the natural extension point: decomposers already control exactly when and how data is materialised. PASS_THROUGH simply adds a "don't materialise" branch to that existing mechanism.
Why
dot(Datum Object Type) onLazyDownloadRef?The subprocess must know which
ViaDownloaderDecomposersubclass owns the downloaded data so it can store it in the correctfobs_ctx[items_key]and routerecompose()correctly. Thedotfield, set when the server originally serialized the tensor, carries this type information through the pass-through hop without any type-switching logic.Memory profile at CJ
Benefits
launch_external_process=Trueautomatically activates B1.dotpropagation preserves tensor type (numpy / pytorch) through the hop without any if/elif type switching.Backward Compatibility
launch_external_process=Trueautomatically benefit. No config or script changes required.launch_external_process=False(in-process executor) are completely unaffected —ClientAPILauncherExecutor.initialize()is not called.PASS_THROUGH— behaviour is identical to before.LazyDownloadRefDecomposeris auto-registered via the existingregister_foldermechanism; no explicit registration call is needed by any caller.Changed Files
nvflare/fuel/utils/fobs/__init__.pyFOBSContextKey.PASS_THROUGHnvflare/fuel/utils/fobs/decomposers/via_downloader.pyLazyDownloadRef,_LazyBatchInfo, PASS_THROUGH branches inprocess_datum()/recompose(),LazyDownloadRefDecomposer,_finalize_lazy_batchpost-callbacknvflare/app_common/executors/client_api_launcher_executor.pyinitialize()enables PASS_THROUGH on engine cellTest Strategy
Unit tests —
tests/unit_test/fuel/utils/fobs/test_pass_through.py(22 tests)TestLazyDownloadRef__slots__, per-item distinctnessTestLazyBatchInfo__slots__,isinstancereliability vs plain tupleTestProcessDatumPassThrough_LazyBatchInfo, never calls_download_from_remote_cell; normal mode calls downloadTestRecomposePassThroughLazyDownloadRefwith correctfqcn,ref_id,item_idfrom_LazyBatchInfoTestDecomposeWithLazyDownloadRef_finalize_lazy_batchpost-CB registered once per batch regardless of item count; emitted datum has correct fqcn/ref_id/DOTTestNoMemoryAccumulation_CtxKey.OBJECTSabsent after PASS_THROUGH (no download transaction opened);DownloadService._tx_tableunchanged; 50-cycle repeat produces no state bleedEnd-to-end tests —
tests/unit_test/fuel/f3/streaming/test_pass_through_e2e.py(5 tests, real TCP Cells)test_arrays_survive_pass_through_hoptest_cj_holds_only_lazy_refs_not_tensor_dataLazyDownloadRef, nevernp.ndarraytest_cj_creates_no_download_transactionDownloadService._tx_tableis unchanged during PASS_THROUGH + re-serializationtest_forwarded_payload_carries_original_server_reffqcnandref_id— subprocess downloads from server, not CJtest_multiple_array_roundtripIntegration test —
tests/integration_test/data/jobs/pt_large_model_pass_through/Full-stack integration test using
PTClientAPILauncherExecutorwithlaunch_once=True(the pattern used byllm_hf):LargeNet— 3-layer MLP with ~8 MB of float32 parameters, well above the 2 MB ViaDownloaderDecomposer streaming threshold. This forces the real B1 code path (streaming + PASS_THROUGH) rather than the native inline path used by small models.llm_hf/client.pystructure (while flare.is_running()loop, receive / train / send). Uses CPU-only synthetic data — no dataset download required in CI.client_api.ymlas"run pt-large-model-pass-through".🤖 Generated with Claude Code