[2.7] Fix Swarm deadlock: _executing guard prevents pipe handler replacement mid-transaction #4314

Merged
chesterxgchen merged 12 commits into NVIDIA:2.7 from YuanTingHsieh:fix_task_exchanger_executing_guard
Mar 13, 2026

@YuanTingHsieh
Collaborator

Problem

BEFORE_TASK_EXECUTION is a global event broadcast to all FLComponents. In a CCWF Swarm aggregator node, receiving swarm_report_learn_result aux tasks from other sites while the local subprocess is training fires this event concurrently. This causes TaskExchanger.handle_event() to stop and recreate the PipeHandler while execute() is blocked in its polling loop — the old handler (that the subprocess is writing to) is orphaned, and the polling loop reads from the new empty handler forever. Silent deadlock, no error, no PEER_GONE, no timeout.

Affects all pipe types (FilePipe, CellPipe). Independent of the FilePipe TOCTOU fix in #4296.
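
The failure mode described above can be reproduced in miniature. This is a toy sketch, not NVFlare code: `ToyHandler`, `ToyExchanger`, `deliver`, and `poll` are hypothetical names, and a plain `queue.Queue` stands in for the pipe. It only illustrates how a result written to the old handler becomes invisible once the handler reference is swapped.

```python
import queue


class ToyHandler:
    """Hypothetical stand-in for a pipe handler: a queue of received messages."""

    def __init__(self):
        self._messages = queue.Queue()

    def deliver(self, msg):
        # Called by the "subprocess" side when it writes a result.
        self._messages.put(msg)

    def get_next(self):
        # Non-blocking read; returns None when nothing has arrived.
        try:
            return self._messages.get_nowait()
        except queue.Empty:
            return None


class ToyExchanger:
    def __init__(self):
        self.handler = ToyHandler()

    def on_before_task_execution(self):
        # Unguarded: always swaps in a fresh, empty handler.
        self.handler = ToyHandler()

    def poll(self, attempts):
        # Simplified polling loop: always reads the current handler.
        for _ in range(attempts):
            reply = self.handler.get_next()
            if reply is not None:
                return reply
        return None


exchanger = ToyExchanger()
subprocess_side = exchanger.handler        # the writer still holds the old handler
exchanger.on_before_task_execution()       # a concurrent aux task fires the event
subprocess_side.deliver("trained model")   # the result lands in the orphaned handler
lost = exchanger.poll(attempts=5)          # the loop only sees the new, empty handler
```

In the real executor the loop has no attempt limit, which is why the symptom is a silent hang rather than a `None` result.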

Fix

  • Add threading.Event _executing as a guard flag:

    handle_event(BEFORE_TASK_EXECUTION) skips handler replacement when _executing.is_set()

  • TaskExchanger.execute() uses an ownership pattern so super().execute() from LauncherExecutor does not prematurely clear the flag

  • LauncherExecutor.execute() sets the flag at the very top — before _initialize_external_execution() — covering the up-to-60 s _wait_external_setup() window
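
The three bullets above can be sketched as follows. Only `_executing` and the `acquired = not is_set()` idea come from this PR; the class names, `handler_generation`, `_poll_for_result`, and the simulated events are hypothetical, and the real executors do far more work. This version is deliberately the Event-only form; a later commit in this thread adds a lock because the check-then-set shown here is not atomic.

```python
import threading


class ToyTaskExchanger:
    def __init__(self):
        self._executing = threading.Event()
        self.handler_generation = 0  # bumped on every simulated handler replacement

    def handle_before_task_execution(self):
        # Guard: skip handler replacement while an execute() is in flight.
        if self._executing.is_set():
            return False
        self.handler_generation += 1
        return True

    def execute(self):
        # Ownership pattern: clear the flag only if this call set it.
        acquired = not self._executing.is_set()
        if acquired:
            self._executing.set()
        try:
            return self._poll_for_result()
        finally:
            if acquired:
                self._executing.clear()

    def _poll_for_result(self):
        # Simulate a concurrent event arriving in the middle of polling.
        return self.handle_before_task_execution()


class ToyLauncherExecutor(ToyTaskExchanger):
    def execute(self):
        # Set the flag first so the external setup window is covered too.
        self._executing.set()
        try:
            # Simulate an event arriving during external setup.
            self.replaced_during_setup = self.handle_before_task_execution()
            replaced_during_poll = super().execute()
            # The base class must not have cleared the flag it did not set.
            self.flag_survived_super = self._executing.is_set()
            return replaced_during_poll
        finally:
            self._executing.clear()


executor = ToyLauncherExecutor()
replaced_before = executor.handle_before_task_execution()  # idle: replacement allowed
replaced_during = executor.execute()                        # busy: replacement skipped
```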

Types of changes

  • Non-breaking change (fix or new feature that would not break existing functionality).
  • Breaking change (fix or new feature that would cause existing functionality to change).
  • New tests added to cover the changes.
  • Quick tests passed locally by running ./runtest.sh.
  • In-line docstrings updated.
  • Documentation updated.

…acement mid-transaction

BEFORE_TASK_EXECUTION is a global broadcast to all FLComponents. In a CCWF
Swarm aggregator node, receiving swarm_report_learn_result aux tasks from other
sites while the local subprocess is training fires this event concurrently,
causing TaskExchanger.handle_event() to stop and recreate the PipeHandler while
execute() is blocked in its polling loop. The new handler's queue is empty so
get_next() returns None forever — a silent deadlock with no error or timeout.

Fix: add threading.Event _executing as a guard flag. handle_event skips pipe
handler replacement when the flag is set. TaskExchanger.execute() uses an
ownership pattern (acquired = not is_set()) so it only clears the flag if it
set it, preserving the flag across the super().execute() call from
LauncherExecutor. LauncherExecutor.execute() sets the flag unconditionally at
the top — before _initialize_external_execution() — covering the up-to-60 s
_wait_external_setup() window that the base-class guard would miss.

Affects all pipe types (FilePipe, CellPipe); independent of the FilePipe TOCTOU
fix in NVIDIA#4296.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Copilot AI review requested due to automatic review settings March 13, 2026 18:21
Contributor

Copilot AI left a comment

Pull request overview

Fixes a Swarm/CCWF deadlock where concurrent BEFORE_TASK_EXECUTION events could reset TaskExchanger’s PipeHandler mid-transaction, orphaning the handler the subprocess is writing to and causing the executor to poll an empty handler forever.

Changes:

  • Add a _executing (threading.Event) guard to TaskExchanger to skip pipe-handler reset while execute() is in progress, and refactor execute() to use an ownership pattern.
  • Update LauncherExecutor.execute() to set _executing at the very start (covering external init/setup) and clear it in a finally.
  • Add unit tests covering _executing lifecycle, handler-reset suppression, and initialization-time concurrency behavior.

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.

| File | Description |
| --- | --- |
| nvflare/app_common/executors/task_exchanger.py | Introduces _executing guard + execute() ownership refactor to prevent handler reset during execution. |
| nvflare/app_common/executors/launcher_executor.py | Sets _executing at the start of execute() (before external init) and clears it at the end. |
| tests/unit_test/app_common/executors/client_api_launcher_executor_test.py | Adds regression/unit tests for _executing behavior and concurrent BEFORE_TASK_EXECUTION handling. |

@greptile-apps
Contributor

greptile-apps bot commented Mar 13, 2026

Greptile Summary

This PR fixes a silent Swarm deadlock where BEFORE_TASK_EXECUTION global broadcast events from concurrent aux tasks (e.g., swarm_report_learn_result) were stopping and recreating the PipeHandler mid-execution, orphaning the pipe that the local subprocess was actively writing to. The polling loop in _do_execute() would then read from the new, permanently empty handler — no error, no PEER_GONE, no timeout.

Key changes:

  • task_exchanger.py: Adds _executing (threading.Event) + _executing_lock (threading.Lock). handle_event(BEFORE_TASK_EXECUTION) checks the flag under the lock and skips handler replacement when set. execute() is split into a lock-protected acquire/release wrapper and _do_execute(), implementing an ownership pattern so that super().execute() from LauncherExecutor does not prematurely clear the flag.
  • launcher_executor.py: Calls _executing.set() as the very first statement in execute() (before _initialize_external_execution()), covering the entire _wait_external_setup() window, which can last up to 60 seconds. Clears the flag unconditionally in a finally block.
  • pipe_handler.py: Two companion fixes — an additional asked_to_stop check after time.sleep() in _try_read() reduces stop latency, and _read() now suppresses spurious error logs and PEER_GONE messages when the handler is intentionally stopped during replacement.
  • Tests: 9 new unit tests covering flag lifecycle, ownership semantics, and the concurrent BEFORE_TASK_EXECUTION scenario. The thread-alive assertion after join() is correctly included.
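
The first pipe_handler.py companion fix in the list above (re-checking the stop flag after the sleep) can be illustrated with a toy reader loop. This is a sketch under assumptions: `ToyReader`, its `reads` counter, and the loop shape are hypothetical and are not taken from the real `_try_read()`; only the `asked_to_stop` and `read_interval` names appear in this PR. The snippet also uses the join-then-check-liveness pattern that the tests are said to include.

```python
import threading
import time


class ToyReader:
    """Hypothetical reader loop; the real PipeHandler is more involved."""

    def __init__(self, read_interval):
        self.read_interval = read_interval
        self.asked_to_stop = False
        self.reads = 0

    def stop(self):
        self.asked_to_stop = True

    def _try_read(self):
        while not self.asked_to_stop:
            time.sleep(self.read_interval)
            # Extra check: a stop requested during the sleep takes effect
            # now instead of after one more read and one more sleep.
            if self.asked_to_stop:
                break
            self.reads += 1  # stands in for reading one message from the pipe


reader = ToyReader(read_interval=0.5)
worker = threading.Thread(target=reader._try_read, daemon=True)
worker.start()
time.sleep(0.05)          # the worker is now inside its first sleep
reader.stop()
worker.join(timeout=3.0)  # join() with a timeout never raises on expiry...
stopped_cleanly = not worker.is_alive()  # ...so liveness is checked explicitly
```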

One style observation: In pipe_handler._read(), exceptions from _try_read() are completely silenced (no logging at any level) when asked_to_stop=True. A debug-level log would preserve diagnosability for unexpected exceptions without reintroducing the spurious ERROR noise the fix is designed to eliminate.

Confidence Score: 4/5

  • Safe to merge — the fix correctly addresses the described Swarm deadlock and all previously flagged review concerns have been resolved in this iteration.
  • The core _executing guard logic is sound: the lock-protected check-and-set in TaskExchanger.execute() is atomic, the ownership pattern ensures super().execute() from LauncherExecutor does not prematurely clear the flag, and the PipeHandler companion fixes eliminate spurious PEER_GONE noise. Previous round's review comments (lock placement, log_debug outside lock, unused variable captures, thread-alive assertion) have all been addressed. One point off: LauncherExecutor.execute() calls _executing.set() and _executing.clear() without the lock, leaving a narrow theoretical race window between BEFORE_TASK_EXECUTION's lock-release and execute()'s first statement — acceptable in practice since LauncherExecutor sets the flag immediately as the very first statement, and threading.Event operations are themselves thread-safe.
  • No files require special attention; nvflare/app_common/executors/task_exchanger.py and launcher_executor.py carry the core concurrency logic and should be reviewed together when making future changes to the event handling or execute() flow.

Important Files Changed

| Filename | Overview |
| --- | --- |
| nvflare/app_common/executors/task_exchanger.py | Adds _executing threading.Event + _executing_lock guard: handle_event(BEFORE_TASK_EXECUTION) checks the flag under the lock and skips handler replacement when execution is in progress; execute() is split into a lock-protected acquire/release wrapper and _do_execute() for ownership semantics. Log-debug message placed outside the lock. Logic is correct for the primary Swarm deadlock scenario. |
| nvflare/app_common/executors/launcher_executor.py | Wraps the execute() body in a try/finally that calls _executing.set() before _initialize_external_execution() and _executing.clear() in finally, so the guard covers the entire 60-s _wait_external_setup() window. The set/clear are called without _executing_lock (intentional: they are atomic threading.Event operations and _executing_lock is only needed for the check-and-set atomic sequence in the base class). |
| nvflare/fuel/utils/pipe/pipe_handler.py | Two companion fixes: (1) redundant asked_to_stop check after time.sleep() in _try_read() reduces worst-case stop latency from 2× to 1× read_interval; (2) _read() now suppresses error log and PEER_GONE message when asked_to_stop is already True, preventing spurious error noise when an orphaned handler is intentionally stopped during BEFORE_TASK_EXECUTION replacement. Both changes are correct. |
| tests/unit_test/app_common/executors/client_api_launcher_executor_test.py | Adds 9 new tests covering the _executing guard: flag lifecycle (set/clear/exception), ownership pattern (super() doesn't clear flag), and the key concurrent scenario (BEFORE_TASK_EXECUTION blocked during _initialize_external_execution). Thread-alive assertion is correctly included after join(). _create_pipe_handler is monkeypatched where needed to avoid spawning real background threads against a MagicMock pipe. |

Sequence Diagram

sequenceDiagram
    participant FW as FL Framework
    participant LE as LauncherExecutor.execute()
    participant TE as TaskExchanger.execute()
    participant HE as handle_event(BEFORE_TASK_EXECUTION)
    participant PH as PipeHandler

    Note over FW: Local training task assigned
    FW->>HE: fire BEFORE_TASK_EXECUTION (local task)
    HE->>HE: _executing_lock: is_set()=False → skip=False
    HE->>PH: stop(old_handler, close_pipe=False)
    HE->>PH: _create_pipe_handler() → new_handler
    HE->>PH: new_handler.start()

    FW->>LE: execute(train_task)
    LE->>LE: _executing.set()  [no lock needed]
    LE->>LE: _initialize_external_execution() [up to 60s]

    Note over FW: Concurrent aux task from swarm peer
    FW-->>HE: fire BEFORE_TASK_EXECUTION (aux task)
    HE->>HE: _executing_lock: is_set()=True → skip=True
    HE-->>HE: log_debug + return (handler NOT replaced ✓)

    LE->>TE: super().execute()
    TE->>TE: _executing_lock: acquired=False (already set)
    TE->>PH: _do_execute() — polls new_handler
    PH-->>TE: result from subprocess
    TE-->>LE: return result
    LE->>LE: _finalize_external_execution()
    LE->>LE: finally: _executing.clear()

Comments Outside Diff (1)

  1. nvflare/fuel/utils/pipe/pipe_handler.py, lines 328-333

    Silent exception swallowing when stopped

    When asked_to_stop is True, the exception from _try_read() is silently discarded with no logging at any level. In the expected case (read error on an intentionally orphaned handler), this is correct. However, if a programming error inside _try_read() raises an unexpected exception (e.g., a logic bug introduced later), it would be completely invisible whenever the handler happens to be stopped at the same moment.

    Consider adding a debug-level log so that unexpected exceptions remain diagnosable without re-introducing the spurious ERROR noise the fix is trying to eliminate:
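
    A minimal sketch of that suggestion, assuming a much simplified handler: `ToyPipeHandler`, the `peer_gone_sent` flag, and the logger name are hypothetical, and the real `_read()` does more than this. The point is only the branch on `asked_to_stop`.

```python
import logging

logger = logging.getLogger("toy_pipe_handler")  # hypothetical logger name


class ToyPipeHandler:
    """Hypothetical, heavily simplified stand-in for the real handler."""

    def __init__(self, try_read):
        self.asked_to_stop = False
        self._try_read = try_read
        self.peer_gone_sent = False  # stands in for emitting PEER_GONE

    def _read(self):
        try:
            self._try_read()
        except Exception as ex:
            if self.asked_to_stop:
                # Expected when an orphaned handler is stopped on purpose;
                # keep a debug-level trace so real bugs stay diagnosable.
                logger.debug("read error after stop was requested (ignored): %r", ex)
                return
            logger.error("read error: %r", ex)
            self.peer_gone_sent = True
```

    With the logger at its usual level the debug line stays hidden, so the spurious ERROR noise does not come back; enabling DEBUG for this logger surfaces the swallowed exception.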

Last reviewed commit: fa204de

YuanTingHsieh and others added 2 commits March 13, 2026 13:01
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
The bare read-then-write on threading.Event was not atomic: a concurrent
BEFORE_TASK_EXECUTION on the CellNet thread could pass the is_set() check
between steps (1) read and (2) set in TaskExchanger.execute(), allowing the
handler to be replaced in exactly the window the guard was meant to prevent.

Add threading.Lock _executing_lock and hold it for both the check-and-set in
execute() and the guard check in handle_event(BEFORE_TASK_EXECUTION), making
the two operations mutually exclusive.
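
A minimal sketch of the lock-protected check-and-set described in this commit message. `_executing` and `_executing_lock` are the names used in the PR; `ToyGuard`, `try_begin_execute`, `end_execute`, and `replacements` are hypothetical. The sketch also performs the simulated replacement while holding the lock, which is an assumption made here to keep the example race-free, not something this thread confirms about the real code.

```python
import threading


class ToyGuard:
    def __init__(self):
        self._executing = threading.Event()
        self._executing_lock = threading.Lock()
        self.replacements = 0  # counts simulated handler replacements

    def try_begin_execute(self):
        # Check-and-set as one atomic step: no event handler can observe
        # the flag between the read and the write.
        with self._executing_lock:
            acquired = not self._executing.is_set()
            if acquired:
                self._executing.set()
        return acquired

    def end_execute(self, acquired):
        # Only the call that set the flag may clear it.
        if acquired:
            with self._executing_lock:
                self._executing.clear()

    def handle_before_task_execution(self):
        with self._executing_lock:
            if self._executing.is_set():
                return False
            # Assumption: replace while still holding the lock.
            self.replacements += 1
            return True


guard = ToyGuard()
idle_replace = guard.handle_before_task_execution()  # allowed while idle
outer = guard.try_begin_execute()                    # outer execute() owns the flag
busy_replace = guard.handle_before_task_execution()  # skipped while executing
inner = guard.try_begin_execute()                    # nested call does not own it
guard.end_execute(inner)                             # no-op: flag stays set
still_set = guard._executing.is_set()
guard.end_execute(outer)                             # owner clears the flag
```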

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@YuanTingHsieh
Collaborator Author

/build

Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
…tor_test.py

Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
@YuanTingHsieh
Collaborator Author

/build

chesterxgchen
chesterxgchen previously approved these changes Mar 13, 2026
@chesterxgchen
Collaborator

/build

@YuanTingHsieh
Collaborator Author

/build

@YuanTingHsieh
Collaborator Author

/build

@chesterxgchen chesterxgchen merged commit e76d801 into NVIDIA:2.7 Mar 13, 2026
19 checks passed
@YuanTingHsieh YuanTingHsieh deleted the fix_task_exchanger_executing_guard branch March 13, 2026 22:43