
Commit f8efaeb

Authored by YuanTingHsieh, chesterxgchen, claude, and pcnudde
Cherry-pick [2.7] Fix hierarchical FL startup failures: deployment timeouts, selective client exclusion, and dead-detection debounce (#4209) (#4288)
## Problem

Large-scale hierarchical FL jobs (e.g. BERT NER, 144 clients, 6 relays on Frontier) abort in Round 0 due to a cascading startup failure chain. The root sequence is:

1. F3 streaming HOL stall (PR #4206) delays deployment ACKs from relay-connected clients
2. **`_deploy_job()`** treats `reply=None` (timeout) as `"unknown"` — not a failure — so timed-out clients silently appear to have been deployed
3. **`_start_run()`** tries to start those clients; they again time out, and `check_client_replies()` ignores the `None` reply
4. **`_sync_client_jobs()`** fires dead-job notification on the very first heartbeat, with no startup grace period
5. FedAvg requires 144/144 — one or two missing clients → abort
6. A late-starting CJ crashes with `TypeError: 'NoneType' object is not iterable` when `get_job_clients()` receives `None` metadata from an already-aborted job

PRs #4206, #4204, #4174, #4172, #4186, #4211, #4210 (all merged in 2.7.2) address the transport layer. This PR addresses the remaining job lifecycle layer.

---

## Fixes Included

### 1 — `_deploy_job()`: Treat deployment timeout as failure (`job_runner.py`)

**Root bug**: `reply=None` was logged as `"unknown"` and excluded from `failed_clients`, so timed-out clients counted as "successfully deployed" for the `min_sites` check.

**Fix**: Add timed-out clients to `failed_clients` with a `"deployment timeout"` label. The existing `min_sites` / `required_sites` logic then correctly decides whether to abort.

### 2 — `check_client_replies()`: Return timed-out clients instead of raising (`admin.py`)

**Root bug**: In strict mode, any timeout raised immediately, aborting the whole job even when the remaining active clients satisfied `min_sites`.

**Fix**: In strict mode, collect timed-out clients into a return list rather than raising. Explicit errors (non-OK return code or error body) still raise. Also fixes the non-strict mode to use name-keyed dict lookup instead of fragile positional `zip()`.
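To make the rule behind fixes 1 and 2 concrete, here is a minimal, self-contained sketch of the reply classification. The function name and the plain-dict reply representation are illustrative, not the NVFLARE API: the point is only that a missing reply (`None`, i.e. a timeout) now lands in the failure list instead of being logged as `"unknown"` and silently counted as deployed.

```python
def classify_deploy_replies(replies):
    """Split client replies into (ok_clients, failed_clients).

    `replies` maps client_name -> reply body, where None means the client
    timed out and never answered the deploy request. This mirrors the new
    behavior described above; it is a model, not the real _deploy_job().
    """
    ok_clients = []
    failed_clients = []
    for client_name, reply in replies.items():
        if reply is None:
            # Old behavior: logged as "unknown", excluded from failures.
            # New behavior: a timeout is a deployment failure with a label.
            failed_clients.append((client_name, "deployment timeout"))
        elif reply == "OK":
            ok_clients.append(client_name)
        else:
            failed_clients.append((client_name, reply))
    return ok_clients, failed_clients
```

With the timed-out client in `failed_clients`, the existing `min_sites` / `required_sites` check downstream can decide whether the job proceeds or aborts.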
New signature: `check_client_replies(...) -> List[str]` (timed-out client names; empty = none).

### 3 — `_start_run()`: Selective exclusion with min_sites re-evaluation (`job_runner.py`)

**Root bug**: A start-job timeout under strict mode aborted the entire job, with no tolerance for stragglers within `min_sites` bounds.

**Fix**: Use the returned timed-out list from `check_client_replies()`. If the remaining active clients >= `min_sites`, log a warning and proceed. Only abort when below tolerance.

### 4 — `_sync_client_jobs()`: Require-prior-report default changed to `True` (`fed_server.py`)

**Root bug**: `SYNC_CLIENT_JOBS_REQUIRE_PREVIOUS_REPORT` defaulted to `False`, meaning the bug fix was opt-in and the unsafe behavior remained the default.

**Fix**: Default changed to `True`. Operators who want the aggressive legacy detection can set it to `False` explicitly.

### 5 — `_sync_client_jobs()`: Move `_reported_clients` out of `job_info` dict (`fed_server.py`)

**Root bug**: Positive-observation tracking was stored as `job_info["_reported_clients"]`, injecting algorithm state into a data dict with no corresponding `RunProcessKey` constant.

**Fix**: Tracking moved to `self._job_reported_clients: Dict[str, set]` on `FederatedServer`. Stale entries are purged whenever a job is no longer in `run_processes`.

### 6 — `ClientRunManager.get_job_clients()`: Explicit meta validation (`client_run_manager.py`)

Raises `RuntimeError` with a descriptive message instead of an opaque `TypeError` when `JOB_CLIENTS` is absent or the wrong type.
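The debounce rule behind fixes 4 and 5 can be modeled in isolation. The class below is a hypothetical, simplified stand-in for the tracking that `_sync_client_jobs()` keeps in `self._job_reported_clients`; the class and method names are illustrative, not NVFLARE code. The invariant it demonstrates: "missing job on client" only counts as a dead-job signal after at least one prior positive heartbeat report (unless the legacy behavior is explicitly requested).

```python
class DeadJobDetector:
    """Toy model of the require-prior-report debounce (fixes 4-5)."""

    def __init__(self, require_previous_report=True):
        self.require_previous_report = require_previous_report
        # job_id -> set of client tokens positively observed running the job
        self._job_reported_clients = {}

    def record_heartbeat(self, job_id, client_token):
        # Positive observation: the client reported it is running this job.
        self._job_reported_clients.setdefault(job_id, set()).add(client_token)

    def is_dead(self, job_id, client_token):
        # A missing-job heartbeat before any positive report is likely a
        # startup race, not a dead job -- unless legacy mode is requested.
        if not self.require_previous_report:
            return True
        return client_token in self._job_reported_clients.get(job_id, set())

    def purge(self, running_jobs):
        # Drop tracking for jobs no longer in run_processes (stale cleanup).
        for job_id in list(self._job_reported_clients):
            if job_id not in running_jobs:
                del self._job_reported_clients[job_id]
```

Keeping this state on the server object (rather than inside the per-job `job_info` data dict) is what fix 5 does in the real code.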
---

## Configuration Recommendations (No Code Change Needed)

| Setting | Recommended value | Effect |
|---|---|---|
| `FedAvg(min_clients=...)` | 96-98% of `num_clients` | Tolerates a few startup stragglers |
| `runner_sync_timeout` | `120` s | Allows Lustre-backed deployments time to complete |
| `strict_start_job_reply_check` | `true` | Start-job timeouts surfaced, straggler clients excluded |
| `sync_client_jobs_require_previous_report` | `true` (now the default) | Prevents premature dead-job detection from startup delay |
| `SFM_CLOSE_STALLED_CONNECTION` (PR #4206) | `true` after staging | Disconnects stalled relay connections |

---

## Files Changed

- `nvflare/private/fed/server/job_runner.py` — `_deploy_job()` timeout as failure; `_start_run()` selective exclusion
- `nvflare/private/fed/server/admin.py` — `check_client_replies()` returns timed-out list; dict-keyed non-strict path
- `nvflare/private/fed/server/fed_server.py` — `_sync_client_jobs()` default `True`; `_job_reported_clients` attr; stale cleanup
- `nvflare/private/fed/client/client_run_manager.py` — explicit meta validation in `get_job_clients()`

---

## Test Coverage

New and updated unit tests with both positive and negative cases:

| File | Tests | What they cover |
|---|---|---|
| `admin_test.py` | 8 | Timeout returned, not raised; dict lookup; error still raises; reorder OK |
| `job_runner_test.py` | 4 | Strict flag wiring; timeout within tolerance → warn; timeout below tolerance → raise |
| `job_runner_deploy_test.py` | 9 (new file) | Timeout counted as failure; OK reply not failed; mixed outcomes; detail label; min_sites with timeouts; integration sequence |
| `fed_server_test.py` | 5 | Default requires prior report; legacy explicit-False still fires; tracking in server attr, not job_info; stale cleanup |

All 29 targeted unit tests pass.
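Taken together, the server-side rows of the table above amount to a config fragment like the following. This is an illustrative sketch: the key names come from this PR's docs, but the exact file location and surrounding syntax depend on your startup kit layout.

```
# application.conf (server) -- startup safety settings recommended above
runner_sync_timeout = 120
strict_start_job_reply_check = true
sync_client_jobs_require_previous_report = true
```

For the `FedAvg(min_clients=...)` row: with 144 clients, 96-98% works out to roughly 138-141, which tolerates a few startup stragglers while still requiring near-full participation.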
## Test Plan

- [x] Unit tests for each changed function (positive + negative)
- [x] New `job_runner_deploy_test.py` covering deployment timeout classification end-to-end
- [x] All 29 targeted unit tests pass
- [ ] Hierarchical staging run with all flags at default
- [ ] Hierarchical staging run with `strict_start_job_reply_check=true` and reduced `min_clients`
- [ ] Verify no regression on standard (non-hierarchical) FL jobs

---------

Fixes # .

### Description

A few sentences describing the changes proposed in this pull request.

### Types of changes

- [x] Non-breaking change (fix or new feature that would not break existing functionality).
- [ ] Breaking change (fix or new feature that would cause existing functionality to change).
- [ ] New tests added to cover the changes.
- [ ] Quick tests passed locally by running `./runtest.sh`.
- [ ] In-line docstrings updated.
- [ ] Documentation updated.

Co-authored-by: Chester Chen <512707+chesterxgchen@users.noreply.github.com>
Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-authored-by: Peter Cnudde <pcnudde@nvidia.com>
1 parent 60c917a commit f8efaeb

File tree: 13 files changed (+1261, -25 lines)


docs/programming_guide/timeouts.rst

Lines changed: 39 additions & 0 deletions
```diff
@@ -2519,6 +2519,45 @@ application.conf Settings
     # Shutdown
     end_run_readiness_timeout = 10.0
+
+    # Server startup/dead-job safety flags
+    strict_start_job_reply_check = false
+    sync_client_jobs_require_previous_report = true
+
+
+.. _server_startup_dead_job_safety_flags:
+
+Server Startup and Dead-Job Safety Flags
+----------------------------------------
+
+These ``application.conf`` flags are server-side safety controls used during job startup
+and client heartbeat synchronization:
+
+.. list-table::
+   :header-rows: 1
+   :widths: 36 12 52
+
+   * - Parameter
+     - Default
+     - Purpose
+   * - strict_start_job_reply_check
+     - false
+     - Enables strict START_JOB reply validation (detects missing/timeout replies and non-OK return codes).
+   * - sync_client_jobs_require_previous_report
+     - true
+     - Requires a prior positive heartbeat report before treating "missing job on client" as a dead-job signal.
+
+Recommended usage:
+
+- ``strict_start_job_reply_check`` defaults to ``false`` for backward compatibility.
+  Enable it (``true``) for large-scale or hierarchical deployments where startup timeouts
+  are expected and you want the server to proceed with the subset of clients that responded,
+  rather than failing the entire job. With ``false``, a timed-out reply is treated as a
+  silent success, which can mask startup problems.
+- Keep ``sync_client_jobs_require_previous_report=true`` (default) to prevent false
+  dead-job reports during startup races and transient heartbeat delays.
+- Set ``sync_client_jobs_require_previous_report=false`` only to restore legacy behavior
+  where the first missing-job heartbeat immediately triggers dead-job detection.
 
 Admin Client Session (Python API)
 ---------------------------------
```

docs/user_guide/timeout_troubleshooting.rst

Lines changed: 11 additions & 0 deletions
```diff
@@ -214,6 +214,17 @@ Via Configuration Files
     get_task_timeout = 300.0
     submit_task_result_timeout = 300.0
+
+    # Server startup/dead-job safety flags
+    strict_start_job_reply_check = false
+    sync_client_jobs_require_previous_report = true
+
+Server-side safety flags guidance (see :ref:`server_startup_dead_job_safety_flags` for full details):
+
+- ``strict_start_job_reply_check`` (default ``false``): keep default for backward-compatible startup behavior;
+  set to ``true`` to enforce stricter START_JOB reply checks.
+- ``sync_client_jobs_require_previous_report`` (default ``true``): keep enabled to avoid false dead-job reports
+  caused by transient startup or sync races.
 
 **comm_config.json** (system-level, in startup kit):
 
 .. code-block:: json
```

nvflare/apis/fl_constant.py

Lines changed: 6 additions & 0 deletions
```diff
@@ -535,6 +535,12 @@ class ConfigVarName:
     # server: wait this long since job schedule time before starting to check dead/disconnected clients
     DEAD_CLIENT_CHECK_LEAD_TIME = "dead_client_check_lead_time"
 
+    # server: require all start-job replies to be non-timeout and OK before considering the run started
+    STRICT_START_JOB_REPLY_CHECK = "strict_start_job_reply_check"
+
+    # server: require prior positive job observation before reporting "missing job on client" as dead-job
+    SYNC_CLIENT_JOBS_REQUIRE_PREVIOUS_REPORT = "sync_client_jobs_require_previous_report"
+
     # customized nvflare decomposers module name
     DECOMPOSER_MODULE = "nvflare_decomposers"
```

nvflare/private/fed/client/client_run_manager.py

Lines changed: 8 additions & 0 deletions
```diff
@@ -320,7 +320,15 @@ def get_job_clients(self, fl_ctx: FLContext):
 
         """
         job_meta = fl_ctx.get_prop(FLContextKey.JOB_META)
+        if not isinstance(job_meta, dict):
+            raise RuntimeError(f"invalid job meta type: expected dict but got {type(job_meta)}")
+
         job_clients = job_meta.get(JobMetaKey.JOB_CLIENTS)
+        if job_clients is None:
+            raise RuntimeError(f"missing {JobMetaKey.JOB_CLIENTS} in job meta")
+        if not isinstance(job_clients, list):
+            raise RuntimeError(f"invalid {JobMetaKey.JOB_CLIENTS} type: expected list but got {type(job_clients)}")
+
         self.all_clients = [from_dict(d) for d in job_clients]
         for c in self.all_clients:
             self.name_to_clients[c.name] = c
```

nvflare/private/fed/server/admin.py

Lines changed: 56 additions & 6 deletions
```diff
@@ -32,7 +32,7 @@
 from nvflare.fuel.hci.server.hci import AdminServer
 from nvflare.fuel.hci.server.login import LoginModule, SessionManager
 from nvflare.fuel.sec.audit import Auditor, AuditService
-from nvflare.private.admin_defs import Message
+from nvflare.private.admin_defs import Message, MsgHeader, ReturnCode
 from nvflare.private.defs import ERROR_MSG_PREFIX, RequestHeader
 from nvflare.private.fed.server.message_send import ClientReply, send_requests
 
@@ -77,20 +77,70 @@ def __init__(self, client, req: Message):
         self.req = req
 
 
-def check_client_replies(replies: List[ClientReply], client_sites: List[str], command: str):
+def check_client_replies(
+    replies: List[ClientReply], client_sites: List[str], command: str, strict: bool = False
+) -> List[str]:
+    """Check client replies for errors.
+
+    Args:
+        replies: list of client replies
+        client_sites: list of expected client names
+        command: command description for error messages
+        strict: if True, detect timed-out clients (reply=None) and return them as a list
+            rather than raising. Explicit errors (non-OK return code or error body)
+            always raise regardless of this flag.
+
+    Returns:
+        List of client names whose reply was None (timed out). Only populated when
+        strict=True; always empty when strict=False.
+
+    Raises:
+        RuntimeError: if no replies were received, reply count mismatches, structurally
+            missing replies (strict mode), or any client returned an explicit error.
+    """
     display_sites = ", ".join(client_sites)
     if not replies:
         raise RuntimeError(f"Failed to {command} to the clients {display_sites}: no replies.")
     if len(replies) != len(client_sites):
         raise RuntimeError(f"Failed to {command} to the clients {display_sites}: not enough replies.")
 
     error_msg = ""
-    for r, client_name in zip(replies, client_sites):
-        if r.reply and ERROR_MSG_PREFIX in r.reply.body:
-            error_msg += f"\t{client_name}: {r.reply.body}\n"
-    if error_msg != "":
+    timed_out_clients = []
+    replies_by_client = {r.client_name: r for r in replies}
+
+    if strict:
+        missing_clients = [c for c in client_sites if c not in replies_by_client]
+        if missing_clients:
+            raise RuntimeError(
+                f"Failed to {command} to the clients {display_sites}: missing replies from {missing_clients}."
+            )
+
+        for client_name in client_sites:
+            r = replies_by_client[client_name]
+            if not r.reply:
+                # Timeout: record and continue — caller decides whether to exclude or abort.
+                timed_out_clients.append(client_name)
+                continue
+
+            return_code = r.reply.get_header(MsgHeader.RETURN_CODE, ReturnCode.OK)
+            if return_code != ReturnCode.OK:
+                detail = r.reply.body if r.reply.body else f"return code {return_code}"
+                error_msg += f"\t{client_name}: {detail}\n"
+                continue
+
+            if isinstance(r.reply.body, str) and r.reply.body.startswith(ERROR_MSG_PREFIX):
+                error_msg += f"\t{client_name}: {r.reply.body}\n"
+    else:
+        for client_name in client_sites:
+            r = replies_by_client.get(client_name)
+            if r and r.reply and isinstance(r.reply.body, str) and r.reply.body.startswith(ERROR_MSG_PREFIX):
+                error_msg += f"\t{client_name}: {r.reply.body}\n"
+
+    if error_msg:
         raise RuntimeError(f"Failed to {command} to the following clients: \n{error_msg}")
 
+    return timed_out_clients
+
 
 class FedAdminServer(AdminServer):
     def __init__(
```

nvflare/private/fed/server/fed_server.py

Lines changed: 61 additions & 12 deletions
```diff
@@ -343,6 +343,13 @@ def __init__(
         self.name_to_reg = {}
         self.cred_keeper = CredKeeper()
 
+        # Tracks per-job which client tokens have been positively observed running the job.
+        # Keyed by job_id -> set of client tokens. Used by _sync_client_jobs() to require
+        # a prior positive heartbeat before classifying a client's missing job as "dead".
+        # Entries are cleaned up as soon as the job is no longer in run_processes.
+        self._job_reported_clients: Dict[str, set] = {}
+        self._job_reported_clients_lock = threading.Lock()
+
         # these are used when the server sends a message to itself.
         self.my_own_auth_client_name = "server"
         self.my_own_token = "server"
@@ -795,21 +802,63 @@ def client_heartbeat(self, request: Message) -> Message:
     def _sync_client_jobs(self, request, client_token):
         # jobs that are running on client but not on server need to be aborted!
         client_jobs = request.get_header(CellMessageHeaderKeys.JOB_IDS)
-        server_jobs = self.engine.run_processes.keys()
-        jobs_need_abort = list(set(client_jobs).difference(server_jobs))
-
-        # also check jobs that are running on server but not on the client
-        jobs_on_server_but_not_on_client = list(set(server_jobs).difference(client_jobs))
-        if jobs_on_server_but_not_on_client:
-            # notify all the participating clients these jobs are not running on server anymore
-            for job_id in jobs_on_server_but_not_on_client:
-                job_info = self.engine.run_processes[job_id]
+        if not isinstance(client_jobs, (list, tuple, set)):
+            client_jobs = []
+
+        client_jobs = set(client_jobs)
+        server_jobs = set(self.engine.run_processes.keys())
+        jobs_need_abort = list(client_jobs.difference(server_jobs))
+
+        require_previous_report = ConfigService.get_bool_var(
+            name=ConfigVarName.SYNC_CLIENT_JOBS_REQUIRE_PREVIOUS_REPORT,
+            conf=SystemConfigs.APPLICATION_CONF,
+            default=True,
+        )
+
+        with self._job_reported_clients_lock:
+            # Remove stale tracking entries for jobs that are no longer running.
+            for stale_job_id in list(self._job_reported_clients.keys()):
+                if stale_job_id not in server_jobs:
+                    del self._job_reported_clients[stale_job_id]
+
+            # Record jobs that this client has reported at least once.
+            # If require_previous_report is enabled, we only treat "missing job on client"
+            # as dead-job after first positive observation.
+            for job_id in server_jobs.intersection(client_jobs):
+                job_info = self.engine.run_processes.get(job_id)
+                if not job_info:
+                    continue
+
                 participating_clients = job_info.get(RunProcessKey.PARTICIPANTS, None)
-                if participating_clients:
+                if not participating_clients or client_token not in participating_clients:
+                    continue
+
+                self._job_reported_clients.setdefault(job_id, set()).add(client_token)
+
+            # Also check jobs that are running on server but not on the client.
+            jobs_on_server_but_not_on_client = list(server_jobs.difference(client_jobs))
+            dead_job_notifications = []
+            if jobs_on_server_but_not_on_client:
+                for job_id in jobs_on_server_but_not_on_client:
+                    job_info = self.engine.run_processes.get(job_id)
+                    if not job_info:
+                        continue
+
+                    participating_clients = job_info.get(RunProcessKey.PARTICIPANTS, None)
+                    if not participating_clients:
+                        continue
+
                     # this is a dict: token => nvflare.apis.client.Client
                     client = participating_clients.get(client_token, None)
-                    if client:
-                        self._notify_dead_job(client, job_id, "missing job on client")
+                    if not client:
+                        continue
+
+                    reported_clients = self._job_reported_clients.get(job_id, set())
+                    if (not require_previous_report) or (client_token in reported_clients):
+                        dead_job_notifications.append((client, job_id))
+
+        for client, job_id in dead_job_notifications:
+            self._notify_dead_job(client, job_id, "missing job on client")
 
         return jobs_need_abort
```
nvflare/private/fed/server/job_runner.py

Lines changed: 67 additions & 5 deletions
```diff
@@ -21,12 +21,21 @@
 from nvflare.apis.client import Client
 from nvflare.apis.event_type import EventType
 from nvflare.apis.fl_component import FLComponent
-from nvflare.apis.fl_constant import AdminCommandNames, FLContextKey, RunProcessKey, SiteType, SystemComponents
+from nvflare.apis.fl_constant import (
+    AdminCommandNames,
+    ConfigVarName,
+    FLContextKey,
+    RunProcessKey,
+    SiteType,
+    SystemComponents,
+    SystemConfigs,
+)
 from nvflare.apis.fl_context import FLContext
 from nvflare.apis.job_def import ALL_SITES, Job, JobMetaKey, RunStatus
 from nvflare.apis.job_scheduler_spec import DispatchInfo
 from nvflare.apis.workspace import Workspace
 from nvflare.fuel.utils.argument_utils import parse_vars
+from nvflare.fuel.utils.config_service import ConfigService
 from nvflare.lighter.utils import verify_folder_signature
 from nvflare.private.admin_defs import Message, MsgHeader, ReturnCode
 from nvflare.private.defs import RequestHeader, TrainingTopic
@@ -214,7 +223,12 @@ def _deploy_job(self, job: Job, sites: dict, fl_ctx: FLContext) -> Tuple[str, li
             else:
                 deploy_detail.append(f"{client_name}: OK")
         else:
-            deploy_detail.append(f"{client_name}: unknown")
+            # No reply means the client timed out during deployment.
+            # Count this as a failure so the min_sites / required_sites check
+            # can decide whether to abort, rather than silently treating a
+            # timed-out client as successfully deployed.
+            failed_clients.append(client_name)
+            deploy_detail.append(f"{client_name}: no reply (deployment timeout)")
 
         # see whether any of the failed clients are required
         if failed_clients:
@@ -248,15 +262,63 @@ def _start_run(self, job_id: str, job: Job, client_sites: Dict[str, DispatchInfo
         # job_clients is a dict of: token => Client
         assert isinstance(job_clients, dict)
         participating_clients = [c.to_dict() for c in job_clients.values()]
+        # start_client_job serializes job.meta into request headers; make sure
+        # JOB_CLIENTS is available before client startup.
         job.meta[JobMetaKey.JOB_CLIENTS] = participating_clients
         err = engine.start_app_on_server(fl_ctx, job=job, job_clients=job_clients)
         if err:
             raise RuntimeError(f"Could not start the server App for job: {job_id}.")
 
         replies = engine.start_client_job(job, client_sites, fl_ctx)
-        client_sites_names = list(client_sites.keys())
-        check_client_replies(replies=replies, client_sites=client_sites_names, command=f"start job ({job_id})")
-        display_sites = ",".join(client_sites_names)
+        all_client_sites = list(client_sites.keys())
+        active_client_sites = list(all_client_sites)
+        strict_start_reply_check = ConfigService.get_bool_var(
+            name=ConfigVarName.STRICT_START_JOB_REPLY_CHECK,
+            conf=SystemConfigs.APPLICATION_CONF,
+            default=False,
+        )
+        timed_out = check_client_replies(
+            replies=replies,
+            client_sites=all_client_sites,
+            command=f"start job ({job_id})",
+            strict=strict_start_reply_check,
+        )
+        if timed_out:
+            active_count = len(all_client_sites) - len(timed_out)
+
+            # A required site timing out is fatal regardless of min_sites, same as deploy phase.
+            if job.required_sites:
+                for c in timed_out:
+                    if c in job.required_sites:
+                        raise RuntimeError(f"start job ({job_id}): required client {c} timed out")
+
+            if job.min_sites and active_count < job.min_sites:
+                raise RuntimeError(
+                    f"start job ({job_id}): {len(timed_out)} client(s) timed out and remaining "
+                    f"{active_count} < min_sites {job.min_sites}: {timed_out}"
+                )
+            self.log_warning(
+                fl_ctx,
+                f"start job ({job_id}): {len(timed_out)} client(s) timed out at start-job: {timed_out}; "
+                f"{active_count} of {len(all_client_sites)} clients started successfully.",
+            )
+            active_client_sites = [c for c in all_client_sites if c not in timed_out]
+
+        if not strict_start_reply_check:
+            # In non-strict mode, check_client_replies() does not return timed-out clients.
+            # Build active clients directly from actual replies so JOB_CLIENTS stays accurate.
+            replies_by_client = {r.client_name: r for r in replies}
+            active_client_sites = []
+            for client_name in all_client_sites:
+                client_reply = replies_by_client.get(client_name)
+                if client_reply and client_reply.reply:
+                    active_client_sites.append(client_name)
+
+        # Set metadata once, after any timeout exclusion, so it always reflects active participants.
+        active_sites = set(active_client_sites)
+        participating_clients = [c.to_dict() for c in job_clients.values() if c.name in active_sites]
+        job.meta[JobMetaKey.JOB_CLIENTS] = participating_clients
+        display_sites = ",".join(active_client_sites)
 
         self.log_info(fl_ctx, f"Started run: {job_id} for clients: {display_sites}")
         self.fire_event(EventType.JOB_STARTED, fl_ctx)
```
Lines changed: 13 additions & 0 deletions
```diff
@@ -0,0 +1,13 @@
+# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
```
