From f91042637de2ee037fe365e0ac5e775723fd0b4b Mon Sep 17 00:00:00 2001 From: didayolo Date: Sat, 11 Oct 2025 15:28:27 +0200 Subject: [PATCH 1/5] Update compute worker to FAILED when needed --- compute_worker/compute_worker.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/compute_worker/compute_worker.py b/compute_worker/compute_worker.py index a592604f0..311add58b 100644 --- a/compute_worker/compute_worker.py +++ b/compute_worker/compute_worker.py @@ -1129,12 +1129,16 @@ def start(self): self._run_program_directory(ingestion_program_dir, kind="ingestion"), self.watch_detailed_results(), loop=loop, + return_exceptions=True, ) + task_results = [] # will store results/exceptions from gather signal.signal(signal.SIGALRM, alarm_handler) signal.alarm(self.execution_time_limit) try: loop.run_until_complete(gathered_tasks) + # keep what gather returned so we can detect async errors later + task_results = list(gathered_tasks.result() or []) except ExecutionTimeLimitExceeded: error_message = f"Execution Time Limit exceeded. Limit was {self.execution_time_limit} seconds" logger.error(error_message) @@ -1211,6 +1215,18 @@ def start(self): logger.info("Program finished") signal.alarm(0) + # Failure "gate" BEFORE changing status + # An async task error? + had_async_exc = any(isinstance(r, BaseException) for r in task_results) + # Non-zero exit from either container counts as failure for this phase + program_rc = getattr(self, "program_exit_code", None) + ingestion_rc = getattr(self, "ingestion_program_exit_code", None) + failed_rc = any(rc not in (0, None) for rc in (program_rc, ingestion_rc)) + if had_async_exc or failed_rc: + self._update_status(STATUS_FAILED, extra_information=f"program_rc={program_rc}, ingestion_rc={ingestion_rc}") + # Raise so upstream marks failed immediately + raise SubmissionException("Child task failed or non-zero return code") + if self.is_scoring: self._update_status(STATUS_FINISHED) else: From 1ffe72caec220fef8f367f9b73573d02b93dd542 Mon Sep 17 00:00:00 2001 From: didayolo Date: Fri, 6 Feb 2026 11:54:41 +0100 Subject: [PATCH 2/5] Catch only real errors, improve formatting of Exception --- compute_worker/compute_worker.py | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/compute_worker/compute_worker.py b/compute_worker/compute_worker.py index 311add58b..ae708d12c 100644 --- a/compute_worker/compute_worker.py +++ b/compute_worker/compute_worker.py @@ -449,7 +449,7 @@ async def send_detailed_results(self, file_path): ) ) except Exception as e: - logger.error("This error might result in a Execution Time Exceeded error" + e) + logger.error(f"This error might result in a Execution Time Exceeded error: {e}") if os.environ.get("LOG_LEVEL", "info").lower() == "debug": logger.exception(e) @@ -668,8 +668,7 @@ async def _run_container_engine_cmd(self, container, kind): ) except Exception as e: logger.error( - "There was an error trying to connect to the websocket on the codabench instance" - + e + f"There was an error trying to connect to the websocket on the codabench instance: {e}" ) if os.environ.get("LOG_LEVEL", "info").lower() == "debug": logger.exception(e) @@ -718,8 +717,7 @@ async def _run_container_engine_cmd(self, container, kind): logger.error(e) except Exception as e: logger.error( - "There was an error while starting the container and getting the logs" - + e + f"There was an error while starting the container and getting the logs: {e}" ) if os.environ.get("LOG_LEVEL", "info").lower() == "debug": logger.exception(e) @@ -1136,9 +1134,9 @@ def start(self): signal.signal(signal.SIGALRM, alarm_handler) signal.alarm(self.execution_time_limit) try: - loop.run_until_complete(gathered_tasks) + # run tasks # keep what gather returned so we can detect async errors later - task_results = list(gathered_tasks.result() or []) + task_results = loop.run_until_complete(gathered_tasks) or [] except ExecutionTimeLimitExceeded: error_message = f"Execution Time Limit exceeded. Limit was {self.execution_time_limit} seconds" logger.error(error_message) @@ -1163,7 +1161,7 @@ def start(self): logger.error(e) except Exception as e: logger.error( - "There was a problem killing " + str(containers_to_kill) + e + f"There was a problem killing {containers_to_kill}: {e}" ) if os.environ.get("LOG_LEVEL", "info").lower() == "debug": logger.exception(e) @@ -1193,7 +1191,7 @@ def start(self): logger.error(e) except Exception as e: logger.error( - "There was a problem killing " + str(containers_to_kill) + e + f"There was a problem killing {containers_to_kill}: {e}" ) if os.environ.get("LOG_LEVEL", "info").lower() == "debug": logger.exception(e) @@ -1216,14 +1214,18 @@ def start(self): signal.alarm(0) # Failure "gate" BEFORE changing status - # An async task error? - had_async_exc = any(isinstance(r, BaseException) for r in task_results) - # Non-zero exit from either container counts as failure for this phase + def is_real_async_failure(r): + # gather returns either normal values or exception instances when return_exceptions=True + return isinstance(r, BaseException) and not isinstance(r, asyncio.CancelledError) + had_async_exc = any(is_real_async_failure(r) for r in task_results) program_rc = getattr(self, "program_exit_code", None) ingestion_rc = getattr(self, "ingestion_program_exit_code", None) failed_rc = any(rc not in (0, None) for rc in (program_rc, ingestion_rc)) if had_async_exc or failed_rc: - self._update_status(STATUS_FAILED, extra_information=f"program_rc={program_rc}, ingestion_rc={ingestion_rc}") + self._update_status( + STATUS_FAILED, + extra_information=f"program_rc={program_rc}, ingestion_rc={ingestion_rc}, async={task_results}", + ) # Raise so upstream marks failed immediately raise SubmissionException("Child task failed or non-zero return code") From 63fa6fe062891213420ae6fbf4069d98c61f0913 Mon Sep 17 00:00:00 2001 From: didayolo Date: Fri, 6 Feb 2026 12:26:27 +0100 Subject: [PATCH 3/5] Make status code integers, add logs --- compute_worker/compute_worker.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/compute_worker/compute_worker.py b/compute_worker/compute_worker.py index ae708d12c..28eaa0f8d 100644 --- a/compute_worker/compute_worker.py +++ b/compute_worker/compute_worker.py @@ -746,7 +746,7 @@ async def _run_container_engine_cmd(self, container, kind): Exception, ) as e: logger.error(e) - return_Code = {"StatusCode": e} + return_Code = {"StatusCode": 1} self.logs[kind] = { "returncode": return_Code["StatusCode"], @@ -1177,7 +1177,12 @@ def start(self): elapsed_time = logs["end"] - logs["start"] else: elapsed_time = self.execution_time_limit - return_code = logs["returncode"] + # Normalize the return_code + return_code = ( + logs["returncode"] + if logs["returncode"] is None or isinstance(logs["returncode"], int) + else 1 + ) if return_code is None: logger.warning("No return code from Process. Killing it") if kind == "ingestion": @@ -1229,6 +1234,10 @@ def is_real_async_failure(r): # Raise so upstream marks failed immediately raise SubmissionException("Child task failed or non-zero return code") + logger.info( + "PROGRAM STATUS: is_scoring=%s program_rc=%r ingestion_rc=%r task_results=%r", + self.is_scoring, program_rc, ingestion_rc, task_results + ) if self.is_scoring: self._update_status(STATUS_FINISHED) else: From 6e19ba271760238e9271d14f8be2ae9c9966fe7c Mon Sep 17 00:00:00 2001 From: didayolo Date: Fri, 6 Feb 2026 13:41:55 +0100 Subject: [PATCH 4/5] For scoring program only --- compute_worker/compute_worker.py | 30 ++++++++++-------------------- 1 file changed, 10 insertions(+), 20 deletions(-) diff --git a/compute_worker/compute_worker.py b/compute_worker/compute_worker.py index 28eaa0f8d..14a1b3c59 100644 --- a/compute_worker/compute_worker.py +++ b/compute_worker/compute_worker.py @@ -1218,27 +1218,17 @@ def start(self): logger.info("Program finished") signal.alarm(0) - # Failure "gate" BEFORE changing status - def is_real_async_failure(r): - # gather returns either normal values or exception instances when return_exceptions=True - return isinstance(r, BaseException) and not isinstance(r, asyncio.CancelledError) - had_async_exc = any(is_real_async_failure(r) for r in task_results) - program_rc = getattr(self, "program_exit_code", None) - ingestion_rc = getattr(self, "ingestion_program_exit_code", None) - failed_rc = any(rc not in (0, None) for rc in (program_rc, ingestion_rc)) - if had_async_exc or failed_rc: - self._update_status( - STATUS_FAILED, - extra_information=f"program_rc={program_rc}, ingestion_rc={ingestion_rc}, async={task_results}", - ) - # Raise so upstream marks failed immediately - raise SubmissionException("Child task failed or non-zero return code") - - logger.info( - "PROGRAM STATUS: is_scoring=%s program_rc=%r ingestion_rc=%r task_results=%r", - self.is_scoring, program_rc, ingestion_rc, task_results - ) if self.is_scoring: + # Check if scoring program failed + program_results, _, _ = task_results + # Gather returns either normal values or exception instances when return_exceptions=True + had_async_exc = isinstance(program_results, BaseException) and not isinstance(program_results, asyncio.CancelledError) + program_rc = getattr(self, "program_exit_code", None) + failed_rc = program_rc not in (0, None) + if had_async_exc or failed_rc: + self._update_status(STATUS_FAILED, extra_information=f"program_rc={program_rc}, async={task_results}") + # Raise so upstream marks failed immediately + raise SubmissionException("Child task failed or non-zero return code") self._update_status(STATUS_FINISHED) else: self._update_status(STATUS_SCORING) From fdb5a6b7f79354a5d779f1f9c9f9086a6f7174aa Mon Sep 17 00:00:00 2001 From: Obada Haddad Date: Mon, 16 Feb 2026 13:54:05 +0100 Subject: [PATCH 5/5] rebased branch; added some more try...except and error handling; moved scoring and ingestion update to prepare() from start() --- compute_worker/compute_worker.py | 34 ++++++++++++++++++-------------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/compute_worker/compute_worker.py b/compute_worker/compute_worker.py index 14a1b3c59..cb9bcfdce 100644 --- a/compute_worker/compute_worker.py +++ b/compute_worker/compute_worker.py @@ -452,6 +452,7 @@ async def send_detailed_results(self, file_path): logger.error(f"This error might result in a Execution Time Exceeded error: {e}") if os.environ.get("LOG_LEVEL", "info").lower() == "debug": logger.exception(e) + raise SubmissionException("Could not connect to instance to update detailed result") def _get_stdout_stderr_file_names(self, run_args): # run_args should be the run_args argument passed to __init__ from the run_wrapper. @@ -628,10 +629,10 @@ def _get_bundle(self, url, destination, cache=True): except BadZipFile: retries += 1 if retries >= max_retries: - raise # Re-raise the last caught BadZipFile exception + raise SubmissionException("Bad or empty zip file") else: - logger.warning("Failed. Retrying in 60 seconds...") - time.sleep(60) # Wait 60 seconds before retrying + logger.warning("Failed. Retrying in 20 seconds...") + time.sleep(20) # Wait 20 seconds before retrying # Return the zip file path for other uses, e.g. for creating a MD5 hash to identify it return bundle_file @@ -1064,6 +1065,15 @@ def _prep_cache_dir(self, max_size=MAX_CACHE_DIR_SIZE_GB): logger.info("Cache directory does not need to be pruned!") def prepare(self): + hostname = utils.nodenames.gethostname() + if self.is_scoring: + self._update_status( + STATUS_RUNNING, extra_information=f"scoring_hostname-{hostname}" + ) + else: + self._update_status( + STATUS_RUNNING, extra_information=f"ingestion_hostname-{hostname}" + ) if not self.is_scoring: # Only during prediction step do we want to announce "preparing" self._update_status(STATUS_PREPARING) @@ -1108,15 +1118,6 @@ def prepare(self): self._get_container_image(self.container_image) def start(self): - hostname = utils.nodenames.gethostname() - if self.is_scoring: - self._update_status( - STATUS_RUNNING, extra_information=f"scoring_hostname-{hostname}" - ) - else: - self._update_status( - STATUS_RUNNING, extra_information=f"ingestion_hostname-{hostname}" - ) program_dir = os.path.join(self.root_dir, "program") ingestion_program_dir = os.path.join(self.root_dir, "ingestion_program") @@ -1290,9 +1291,12 @@ def push_output(self): "Error, the output directory already contains a metadata file. This file is used " "to store exitCode and other data, do not write to this file manually." ) - - with open(metadata_path, "w") as f: - f.write(yaml.dump(prog_status, default_flow_style=False)) + try: + with open(metadata_path, "w") as f: + f.write(yaml.dump(prog_status, default_flow_style=False)) + except Exception as e: + logger.error(e) + raise SubmissionException("Metadata file not found") if not self.is_scoring: self._put_dir(self.prediction_result, self.output_dir)