[2.7] Avoid self-message deadlock for local swarm result submission#4186
Conversation
|
/build |
Greptile OverviewGreptile SummaryFixes deadlock in swarm learning when a trainer submits results to itself as aggregator with tensor streaming enabled. Key Changes:
Problem Addressed: Solution Approach: Confidence Score: 5/5
Important Files Changed
Sequence DiagramsequenceDiagram
participant Trainer as Trainer (site-1)
participant Aggregator as Aggregator (site-1)
participant Engine as FL Engine
participant Gatherer as Result Gatherer
Note over Trainer,Aggregator: Case: aggr == self.me (Self-Aggregation)
Trainer->>Engine: send_aux_request(request_to_submit)
Note right of Engine: Permission request (safe - no blocking)
Engine->>Aggregator: _process_submission_request
Aggregator->>Gatherer: can_accept_submission()
Gatherer-->>Aggregator: OK
Aggregator-->>Engine: make_reply(OK)
Engine-->>Trainer: Permission granted
alt OLD PATH (Deadlock Risk)
Trainer->>Engine: broadcast_and_wait(result)
Note right of Engine: Synchronous self-message
Engine->>Trainer: _process_learn_result (SAME THREAD)
Note over Trainer: DEADLOCK if TensorStreamer blocks
end
alt NEW PATH (Fixed)
Trainer->>Trainer: Clone fl_ctx + set_peer_context
Trainer->>Trainer: _process_learn_result(result, local_fl_ctx)
Note right of Trainer: Direct local call - NO network
Trainer->>Gatherer: gather(result)
Gatherer-->>Trainer: make_reply(OK)
end
Last reviewed commit: de48932 |
…VIDIA#4186) ## Summary - avoid synchronous self-message path when trainer submits learn result to itself (aggr == self.me) - process local submission via _process_learn_result with local peer context, while keeping remote path unchanged - add unit coverage to verify local self-aggregation submission does not call broadcast_and_wait ## Problem PR NVIDIA#4141 fixed self-message deadlock in _scatter, but result submission in do_learn_task still used broadcast_and_wait(targets=[aggr]). When aggr == self.me with tensor streaming enabled, this can deadlock in synchronous self-message processing. ## Test Plan - added focused unit test in tests/unit_test/app_common/ccwf/test_swarm_self_message_deadlock.py - validated syntax locally for modified files - full pytest not run in this environment (pytest not available)
## Summary - Cherry-pick of #4174: reduce lock scope in `Cacheable._get_item` — `produce_item` now runs outside the lock so concurrent receivers aren't blocked - Cherry-pick of #4186: avoid self-message deadlock when swarm trainer submits learn result to itself — local submission bypasses `broadcast_and_wait`, adds unit test coverage
…ctive client exclusion, and dead-detection debounce (#4209) ## Problem Large-scale hierarchical FL jobs (e.g. BERT NER, 144 clients, 6 relays on Frontier) abort in Round 0 due to a cascading startup failure chain. The root sequence is: 1. F3 streaming HOL stall (PR #4206) delays deployment ACKs from relay-connected clients 2. **`_deploy_job()`** treats `reply=None` (timeout) as `"unknown"` — not a failure — so timed-out clients silently appear to have been deployed 3. **`_start_run()`** tries to start those clients; they again time out, and `check_client_replies()` ignores the `None` reply 4. **`_sync_client_jobs()`** fires dead-job notification on the very first heartbeat with no startup grace period 5. FedAvg requires 144/144 — one or two missing clients → abort 6. A late-starting CJ crashes with `TypeError: 'NoneType' object is not iterable` when `get_job_clients()` receives `None` metadata from an already-aborted job PRs #4206, #4204, #4174, #4172, #4186, #4211, #4210 (all merged in 2.7.2) address the transport layer. This PR addresses the remaining job lifecycle layer. --- ## Fixes Included ### 1 — `_deploy_job()`: Treat deployment timeout as failure (`job_runner.py`) **Root bug**: `reply=None` was logged as `"unknown"` and excluded from `failed_clients`, so timed-out clients counted as "successfully deployed" for the `min_sites` check. **Fix**: Add timed-out clients to `failed_clients` with a `"deployment timeout"` label. The existing `min_sites` / `required_sites` logic then correctly decides whether to abort. ### 2 — `check_client_replies()`: Return timed-out clients instead of raising (`admin.py`) **Root bug**: In strict mode, any timeout raised immediately, aborting the whole job even when the remaining active clients satisfied `min_sites`. **Fix**: In strict mode, collect timed-out clients into a return list rather than raising. Explicit errors (non-OK return code or error body) still raise. Also fixes the non-strict mode to use name-keyed dict lookup instead of fragile positional `zip()`. New signature: `check_client_replies(...) -> List[str]` (timed-out client names; empty = none). ### 3 — `_start_run()`: Selective exclusion with min_sites re-evaluation (`job_runner.py`) **Root bug**: A start-job timeout under strict mode aborted the entire job with no tolerance for stragglers within `min_sites` bounds. **Fix**: Use the returned timed-out list from `check_client_replies()`. If remaining active clients >= `min_sites`, log a warning and proceed. Only abort when below tolerance. ### 4 — `_sync_client_jobs()`: Require-prior-report default changed to `True` (`fed_server.py`) **Root bug**: `SYNC_CLIENT_JOBS_REQUIRE_PREVIOUS_REPORT` defaulted to `False`, meaning the bug fix was opt-in and the unsafe behaviour remained the default. **Fix**: Default changed to `True`. Operators who want the aggressive legacy detection can set it to `False` explicitly. ### 5 — `_sync_client_jobs()`: Move `_reported_clients` out of `job_info` dict (`fed_server.py`) **Root bug**: Positive-observation tracking was stored as `job_info["_reported_clients"]`, injecting algorithm state into a data dict with no corresponding `RunProcessKey` constant. **Fix**: Tracking moved to `self._job_reported_clients: Dict[str, set]` on `FederatedServer`. Stale entries are purged whenever a job is no longer in `run_processes`. ### 6 — `ClientRunManager.get_job_clients()`: Explicit meta validation (`client_run_manager.py`) Raises `RuntimeError` with a descriptive message instead of an opaque `TypeError` when `JOB_CLIENTS` is absent or the wrong type. --- ## Configuration Recommendations (No Code Change Needed) | Setting | Recommended value | Effect | |---|---|---| | `FedAvg(min_clients=...)` | 96-98% of `num_clients` | Tolerates a few startup stragglers | | `runner_sync_timeout` | `120` s | Allows Lustre-backed deployments time to complete | | `strict_start_job_reply_check` | `true` | Start-job timeouts surfaced, straggler clients excluded | | `sync_client_jobs_require_previous_report` | `true` (now the default) | Prevents premature dead-job from startup delay | | `SFM_CLOSE_STALLED_CONNECTION` (PR #4206) | `true` after staging | Disconnects stalled relay connections | --- ## Files Changed - `nvflare/private/fed/server/job_runner.py` — `_deploy_job()` timeout as failure; `_start_run()` selective exclusion - `nvflare/private/fed/server/admin.py` — `check_client_replies()` returns timed-out list; dict-keyed non-strict path - `nvflare/private/fed/server/fed_server.py` — `_sync_client_jobs()` default `True`; `_job_reported_clients` attr; stale cleanup - `nvflare/private/fed/client/client_run_manager.py` — explicit meta validation in `get_job_clients()` --- ## Test Coverage New and updated unit tests with both positive and negative cases: | File | Tests | What they cover | |---|---|---| | `admin_test.py` | 8 | Timeout returned not raised; dict lookup; error still raises; reorder OK | | `job_runner_test.py` | 4 | strict flag wiring; timeout within tolerance → warn; timeout below tolerance → raise | | `job_runner_deploy_test.py` | 9 (new file) | Timeout counted as failure; OK reply not failed; mixed outcomes; detail label; min_sites with timeouts; integration sequence | | `fed_server_test.py` | 5 | Default requires-prior-report; legacy explicit-False still fires; tracking in server attr not job_info; stale cleanup | All 29 targeted unit tests pass. ## Test Plan - [x] Unit tests for each changed function (positive + negative) - [x] New `job_runner_deploy_test.py` covering deployment timeout classification end-to-end - [x] All 29 targeted unit tests pass - [ ] Hierarchical staging run with all flags at default - [ ] Hierarchical staging run with `strict_start_job_reply_check=true` and reduced `min_clients` - [ ] Verify no regression on standard (non-hierarchical) FL jobs --------- Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
…ctive client exclusion, and dead-detection debounce (NVIDIA#4209) ## Problem Large-scale hierarchical FL jobs (e.g. BERT NER, 144 clients, 6 relays on Frontier) abort in Round 0 due to a cascading startup failure chain. The root sequence is: 1. F3 streaming HOL stall (PR NVIDIA#4206) delays deployment ACKs from relay-connected clients 2. **`_deploy_job()`** treats `reply=None` (timeout) as `"unknown"` — not a failure — so timed-out clients silently appear to have been deployed 3. **`_start_run()`** tries to start those clients; they again time out, and `check_client_replies()` ignores the `None` reply 4. **`_sync_client_jobs()`** fires dead-job notification on the very first heartbeat with no startup grace period 5. FedAvg requires 144/144 — one or two missing clients → abort 6. A late-starting CJ crashes with `TypeError: 'NoneType' object is not iterable` when `get_job_clients()` receives `None` metadata from an already-aborted job PRs NVIDIA#4206, NVIDIA#4204, NVIDIA#4174, NVIDIA#4172, NVIDIA#4186, NVIDIA#4211, NVIDIA#4210 (all merged in 2.7.2) address the transport layer. This PR addresses the remaining job lifecycle layer. --- ## Fixes Included ### 1 — `_deploy_job()`: Treat deployment timeout as failure (`job_runner.py`) **Root bug**: `reply=None` was logged as `"unknown"` and excluded from `failed_clients`, so timed-out clients counted as "successfully deployed" for the `min_sites` check. **Fix**: Add timed-out clients to `failed_clients` with a `"deployment timeout"` label. The existing `min_sites` / `required_sites` logic then correctly decides whether to abort. ### 2 — `check_client_replies()`: Return timed-out clients instead of raising (`admin.py`) **Root bug**: In strict mode, any timeout raised immediately, aborting the whole job even when the remaining active clients satisfied `min_sites`. **Fix**: In strict mode, collect timed-out clients into a return list rather than raising. Explicit errors (non-OK return code or error body) still raise. Also fixes the non-strict mode to use name-keyed dict lookup instead of fragile positional `zip()`. New signature: `check_client_replies(...) -> List[str]` (timed-out client names; empty = none). ### 3 — `_start_run()`: Selective exclusion with min_sites re-evaluation (`job_runner.py`) **Root bug**: A start-job timeout under strict mode aborted the entire job with no tolerance for stragglers within `min_sites` bounds. **Fix**: Use the returned timed-out list from `check_client_replies()`. If remaining active clients >= `min_sites`, log a warning and proceed. Only abort when below tolerance. ### 4 — `_sync_client_jobs()`: Require-prior-report default changed to `True` (`fed_server.py`) **Root bug**: `SYNC_CLIENT_JOBS_REQUIRE_PREVIOUS_REPORT` defaulted to `False`, meaning the bug fix was opt-in and the unsafe behaviour remained the default. **Fix**: Default changed to `True`. Operators who want the aggressive legacy detection can set it to `False` explicitly. ### 5 — `_sync_client_jobs()`: Move `_reported_clients` out of `job_info` dict (`fed_server.py`) **Root bug**: Positive-observation tracking was stored as `job_info["_reported_clients"]`, injecting algorithm state into a data dict with no corresponding `RunProcessKey` constant. **Fix**: Tracking moved to `self._job_reported_clients: Dict[str, set]` on `FederatedServer`. Stale entries are purged whenever a job is no longer in `run_processes`. ### 6 — `ClientRunManager.get_job_clients()`: Explicit meta validation (`client_run_manager.py`) Raises `RuntimeError` with a descriptive message instead of an opaque `TypeError` when `JOB_CLIENTS` is absent or the wrong type. --- ## Configuration Recommendations (No Code Change Needed) | Setting | Recommended value | Effect | |---|---|---| | `FedAvg(min_clients=...)` | 96-98% of `num_clients` | Tolerates a few startup stragglers | | `runner_sync_timeout` | `120` s | Allows Lustre-backed deployments time to complete | | `strict_start_job_reply_check` | `true` | Start-job timeouts surfaced, straggler clients excluded | | `sync_client_jobs_require_previous_report` | `true` (now the default) | Prevents premature dead-job from startup delay | | `SFM_CLOSE_STALLED_CONNECTION` (PR NVIDIA#4206) | `true` after staging | Disconnects stalled relay connections | --- ## Files Changed - `nvflare/private/fed/server/job_runner.py` — `_deploy_job()` timeout as failure; `_start_run()` selective exclusion - `nvflare/private/fed/server/admin.py` — `check_client_replies()` returns timed-out list; dict-keyed non-strict path - `nvflare/private/fed/server/fed_server.py` — `_sync_client_jobs()` default `True`; `_job_reported_clients` attr; stale cleanup - `nvflare/private/fed/client/client_run_manager.py` — explicit meta validation in `get_job_clients()` --- ## Test Coverage New and updated unit tests with both positive and negative cases: | File | Tests | What they cover | |---|---|---| | `admin_test.py` | 8 | Timeout returned not raised; dict lookup; error still raises; reorder OK | | `job_runner_test.py` | 4 | strict flag wiring; timeout within tolerance → warn; timeout below tolerance → raise | | `job_runner_deploy_test.py` | 9 (new file) | Timeout counted as failure; OK reply not failed; mixed outcomes; detail label; min_sites with timeouts; integration sequence | | `fed_server_test.py` | 5 | Default requires-prior-report; legacy explicit-False still fires; tracking in server attr not job_info; stale cleanup | All 29 targeted unit tests pass. ## Test Plan - [x] Unit tests for each changed function (positive + negative) - [x] New `job_runner_deploy_test.py` covering deployment timeout classification end-to-end - [x] All 29 targeted unit tests pass - [ ] Hierarchical staging run with all flags at default - [ ] Hierarchical staging run with `strict_start_job_reply_check=true` and reduced `min_clients` - [ ] Verify no regression on standard (non-hierarchical) FL jobs --------- Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
…meouts, selective client exclusion, and dead-detection debounce (#4209) (#4288) ## Problem Large-scale hierarchical FL jobs (e.g. BERT NER, 144 clients, 6 relays on Frontier) abort in Round 0 due to a cascading startup failure chain. The root sequence is: 1. F3 streaming HOL stall (PR #4206) delays deployment ACKs from relay-connected clients 2. **`_deploy_job()`** treats `reply=None` (timeout) as `"unknown"` — not a failure — so timed-out clients silently appear to have been deployed 3. **`_start_run()`** tries to start those clients; they again time out, and `check_client_replies()` ignores the `None` reply 4. **`_sync_client_jobs()`** fires dead-job notification on the very first heartbeat with no startup grace period 5. FedAvg requires 144/144 — one or two missing clients → abort 6. A late-starting CJ crashes with `TypeError: 'NoneType' object is not iterable` when `get_job_clients()` receives `None` metadata from an already-aborted job PRs #4206, #4204, #4174, #4172, #4186, #4211, #4210 (all merged in 2.7.2) address the transport layer. This PR addresses the remaining job lifecycle layer. --- ## Fixes Included ### 1 — `_deploy_job()`: Treat deployment timeout as failure (`job_runner.py`) **Root bug**: `reply=None` was logged as `"unknown"` and excluded from `failed_clients`, so timed-out clients counted as "successfully deployed" for the `min_sites` check. **Fix**: Add timed-out clients to `failed_clients` with a `"deployment timeout"` label. The existing `min_sites` / `required_sites` logic then correctly decides whether to abort. ### 2 — `check_client_replies()`: Return timed-out clients instead of raising (`admin.py`) **Root bug**: In strict mode, any timeout raised immediately, aborting the whole job even when the remaining active clients satisfied `min_sites`. **Fix**: In strict mode, collect timed-out clients into a return list rather than raising. Explicit errors (non-OK return code or error body) still raise. Also fixes the non-strict mode to use name-keyed dict lookup instead of fragile positional `zip()`. New signature: `check_client_replies(...) -> List[str]` (timed-out client names; empty = none). ### 3 — `_start_run()`: Selective exclusion with min_sites re-evaluation (`job_runner.py`) **Root bug**: A start-job timeout under strict mode aborted the entire job with no tolerance for stragglers within `min_sites` bounds. **Fix**: Use the returned timed-out list from `check_client_replies()`. If remaining active clients >= `min_sites`, log a warning and proceed. Only abort when below tolerance. ### 4 — `_sync_client_jobs()`: Require-prior-report default changed to `True` (`fed_server.py`) **Root bug**: `SYNC_CLIENT_JOBS_REQUIRE_PREVIOUS_REPORT` defaulted to `False`, meaning the bug fix was opt-in and the unsafe behaviour remained the default. **Fix**: Default changed to `True`. Operators who want the aggressive legacy detection can set it to `False` explicitly. ### 5 — `_sync_client_jobs()`: Move `_reported_clients` out of `job_info` dict (`fed_server.py`) **Root bug**: Positive-observation tracking was stored as `job_info["_reported_clients"]`, injecting algorithm state into a data dict with no corresponding `RunProcessKey` constant. **Fix**: Tracking moved to `self._job_reported_clients: Dict[str, set]` on `FederatedServer`. Stale entries are purged whenever a job is no longer in `run_processes`. ### 6 — `ClientRunManager.get_job_clients()`: Explicit meta validation (`client_run_manager.py`) Raises `RuntimeError` with a descriptive message instead of an opaque `TypeError` when `JOB_CLIENTS` is absent or the wrong type. --- ## Configuration Recommendations (No Code Change Needed) | Setting | Recommended value | Effect | |---|---|---| | `FedAvg(min_clients=...)` | 96-98% of `num_clients` | Tolerates a few startup stragglers | | `runner_sync_timeout` | `120` s | Allows Lustre-backed deployments time to complete | | `strict_start_job_reply_check` | `true` | Start-job timeouts surfaced, straggler clients excluded | | `sync_client_jobs_require_previous_report` | `true` (now the default) | Prevents premature dead-job from startup delay | | `SFM_CLOSE_STALLED_CONNECTION` (PR #4206) | `true` after staging | Disconnects stalled relay connections | --- ## Files Changed - `nvflare/private/fed/server/job_runner.py` — `_deploy_job()` timeout as failure; `_start_run()` selective exclusion - `nvflare/private/fed/server/admin.py` — `check_client_replies()` returns timed-out list; dict-keyed non-strict path - `nvflare/private/fed/server/fed_server.py` — `_sync_client_jobs()` default `True`; `_job_reported_clients` attr; stale cleanup - `nvflare/private/fed/client/client_run_manager.py` — explicit meta validation in `get_job_clients()` --- ## Test Coverage New and updated unit tests with both positive and negative cases: | File | Tests | What they cover | |---|---|---| | `admin_test.py` | 8 | Timeout returned not raised; dict lookup; error still raises; reorder OK | | `job_runner_test.py` | 4 | strict flag wiring; timeout within tolerance → warn; timeout below tolerance → raise | | `job_runner_deploy_test.py` | 9 (new file) | Timeout counted as failure; OK reply not failed; mixed outcomes; detail label; min_sites with timeouts; integration sequence | | `fed_server_test.py` | 5 | Default requires-prior-report; legacy explicit-False still fires; tracking in server attr not job_info; stale cleanup | All 29 targeted unit tests pass. ## Test Plan - [x] Unit tests for each changed function (positive + negative) - [x] New `job_runner_deploy_test.py` covering deployment timeout classification end-to-end - [x] All 29 targeted unit tests pass - [ ] Hierarchical staging run with all flags at default - [ ] Hierarchical staging run with `strict_start_job_reply_check=true` and reduced `min_clients` - [ ] Verify no regression on standard (non-hierarchical) FL jobs --------- Fixes # . ### Description A few sentences describing the changes proposed in this pull request. ### Types of changes <!--- Put an `x` in all the boxes that apply, and remove the not applicable items --> - [x] Non-breaking change (fix or new feature that would not break existing functionality). - [ ] Breaking change (fix or new feature that would cause existing functionality to change). - [ ] New tests added to cover the changes. - [ ] Quick tests passed locally by running `./runtest.sh`. - [ ] In-line docstrings updated. - [ ] Documentation updated. Co-authored-by: Chester Chen <512707+chesterxgchen@users.noreply.github.com> Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com> Co-authored-by: Peter Cnudde <pcnudde@nvidia.com>
Summary
Problem
PR #4141 fixed self-message deadlock in _scatter, but result submission in do_learn_task still used broadcast_and_wait(targets=[aggr]). When aggr == self.me with tensor streaming enabled, this can deadlock in synchronous self-message processing.
Test Plan