|
21 | 21 | from nvflare.apis.client import Client |
22 | 22 | from nvflare.apis.event_type import EventType |
23 | 23 | from nvflare.apis.fl_component import FLComponent |
24 | | -from nvflare.apis.fl_constant import AdminCommandNames, FLContextKey, RunProcessKey, SiteType, SystemComponents |
| 24 | +from nvflare.apis.fl_constant import ( |
| 25 | + AdminCommandNames, |
| 26 | + ConfigVarName, |
| 27 | + FLContextKey, |
| 28 | + RunProcessKey, |
| 29 | + SiteType, |
| 30 | + SystemComponents, |
| 31 | + SystemConfigs, |
| 32 | +) |
25 | 33 | from nvflare.apis.fl_context import FLContext |
26 | 34 | from nvflare.apis.job_def import ALL_SITES, Job, JobMetaKey, RunStatus |
27 | 35 | from nvflare.apis.job_scheduler_spec import DispatchInfo |
28 | 36 | from nvflare.apis.workspace import Workspace |
29 | 37 | from nvflare.fuel.utils.argument_utils import parse_vars |
| 38 | +from nvflare.fuel.utils.config_service import ConfigService |
30 | 39 | from nvflare.lighter.utils import verify_folder_signature |
31 | 40 | from nvflare.private.admin_defs import Message, MsgHeader, ReturnCode |
32 | 41 | from nvflare.private.defs import RequestHeader, TrainingTopic |
@@ -214,7 +223,12 @@ def _deploy_job(self, job: Job, sites: dict, fl_ctx: FLContext) -> Tuple[str, li |
214 | 223 | else: |
215 | 224 | deploy_detail.append(f"{client_name}: OK") |
216 | 225 | else: |
217 | | - deploy_detail.append(f"{client_name}: unknown") |
| 226 | + # No reply means the client timed out during deployment. |
| 227 | + # Count this as a failure so the min_sites / required_sites check |
| 228 | + # can decide whether to abort, rather than silently treating a |
| 229 | + # timed-out client as successfully deployed. |
| 230 | + failed_clients.append(client_name) |
| 231 | + deploy_detail.append(f"{client_name}: no reply (deployment timeout)") |
218 | 232 |
|
219 | 233 | # see whether any of the failed clients are required |
220 | 234 | if failed_clients: |
@@ -248,15 +262,63 @@ def _start_run(self, job_id: str, job: Job, client_sites: Dict[str, DispatchInfo |
248 | 262 | # job_clients is a dict of: token => Client |
249 | 263 | assert isinstance(job_clients, dict) |
250 | 264 | participating_clients = [c.to_dict() for c in job_clients.values()] |
| 265 | + # start_client_job serializes job.meta into request headers; make sure |
| 266 | + # JOB_CLIENTS is available before client startup. |
251 | 267 | job.meta[JobMetaKey.JOB_CLIENTS] = participating_clients |
252 | 268 | err = engine.start_app_on_server(fl_ctx, job=job, job_clients=job_clients) |
253 | 269 | if err: |
254 | 270 | raise RuntimeError(f"Could not start the server App for job: {job_id}.") |
255 | 271 |
|
256 | 272 | replies = engine.start_client_job(job, client_sites, fl_ctx) |
257 | | - client_sites_names = list(client_sites.keys()) |
258 | | - check_client_replies(replies=replies, client_sites=client_sites_names, command=f"start job ({job_id})") |
259 | | - display_sites = ",".join(client_sites_names) |
| 273 | + all_client_sites = list(client_sites.keys()) |
| 274 | + active_client_sites = list(all_client_sites) |
| 275 | + strict_start_reply_check = ConfigService.get_bool_var( |
| 276 | + name=ConfigVarName.STRICT_START_JOB_REPLY_CHECK, |
| 277 | + conf=SystemConfigs.APPLICATION_CONF, |
| 278 | + default=False, |
| 279 | + ) |
| 280 | + timed_out = check_client_replies( |
| 281 | + replies=replies, |
| 282 | + client_sites=all_client_sites, |
| 283 | + command=f"start job ({job_id})", |
| 284 | + strict=strict_start_reply_check, |
| 285 | + ) |
| 286 | + if timed_out: |
| 287 | + active_count = len(all_client_sites) - len(timed_out) |
| 288 | + |
| 289 | + # A required site timing out is fatal regardless of min_sites, same as deploy phase. |
| 290 | + if job.required_sites: |
| 291 | + for c in timed_out: |
| 292 | + if c in job.required_sites: |
| 293 | + raise RuntimeError(f"start job ({job_id}): required client {c} timed out") |
| 294 | + |
| 295 | + if job.min_sites and active_count < job.min_sites: |
| 296 | + raise RuntimeError( |
| 297 | + f"start job ({job_id}): {len(timed_out)} client(s) timed out and remaining " |
| 298 | + f"{active_count} < min_sites {job.min_sites}: {timed_out}" |
| 299 | + ) |
| 300 | + self.log_warning( |
| 301 | + fl_ctx, |
| 302 | + f"start job ({job_id}): {len(timed_out)} client(s) timed out at start-job: {timed_out}; " |
| 303 | + f"{active_count} of {len(all_client_sites)} clients started successfully.", |
| 304 | + ) |
| 305 | + active_client_sites = [c for c in all_client_sites if c not in timed_out] |
| 306 | + |
| 307 | + if not strict_start_reply_check: |
| 308 | + # In non-strict mode, check_client_replies() does not return timed-out clients. |
| 309 | + # Build active clients directly from actual replies so JOB_CLIENTS stays accurate. |
| 310 | + replies_by_client = {r.client_name: r for r in replies} |
| 311 | + active_client_sites = [] |
| 312 | + for client_name in all_client_sites: |
| 313 | + client_reply = replies_by_client.get(client_name) |
| 314 | + if client_reply and client_reply.reply: |
| 315 | + active_client_sites.append(client_name) |
| 316 | + |
| 317 | + # Set metadata once, after any timeout exclusion, so it always reflects active participants. |
| 318 | + active_sites = set(active_client_sites) |
| 319 | + participating_clients = [c.to_dict() for c in job_clients.values() if c.name in active_sites] |
| 320 | + job.meta[JobMetaKey.JOB_CLIENTS] = participating_clients |
| 321 | + display_sites = ",".join(active_client_sites) |
260 | 322 |
|
261 | 323 | self.log_info(fl_ctx, f"Started run: {job_id} for clients: {display_sites}") |
262 | 324 | self.fire_event(EventType.JOB_STARTED, fl_ctx) |
|
0 commit comments