diff --git a/compute_worker/compute_worker.py b/compute_worker/compute_worker.py index a592604f0..cb9bcfdce 100644 --- a/compute_worker/compute_worker.py +++ b/compute_worker/compute_worker.py @@ -449,9 +449,10 @@ async def send_detailed_results(self, file_path): ) ) except Exception as e: - logger.error("This error might result in a Execution Time Exceeded error" + e) + logger.error(f"This error might result in a Execution Time Exceeded error: {e}") if os.environ.get("LOG_LEVEL", "info").lower() == "debug": logger.exception(e) + raise SubmissionException("Could not connect to instance to update detailed result") def _get_stdout_stderr_file_names(self, run_args): # run_args should be the run_args argument passed to __init__ from the run_wrapper. @@ -628,10 +629,10 @@ def _get_bundle(self, url, destination, cache=True): except BadZipFile: retries += 1 if retries >= max_retries: - raise # Re-raise the last caught BadZipFile exception + raise SubmissionException("Bad or empty zip file") else: - logger.warning("Failed. Retrying in 60 seconds...") - time.sleep(60) # Wait 60 seconds before retrying + logger.warning("Failed. Retrying in 20 seconds...") + time.sleep(20) # Wait 20 seconds before retrying # Return the zip file path for other uses, e.g. for creating a MD5 hash to identify it return bundle_file @@ -668,8 +669,7 @@ async def _run_container_engine_cmd(self, container, kind): ) except Exception as e: logger.error( - "There was an error trying to connect to the websocket on the codabench instance" - + e + f"There was an error trying to connect to the websocket on the codabench instance: {e}" ) if os.environ.get("LOG_LEVEL", "info").lower() == "debug": logger.exception(e) @@ -718,8 +718,7 @@ async def _run_container_engine_cmd(self, container, kind): logger.error(e) except Exception as e: logger.error( - "There was an error while starting the container and getting the logs" - + e + f"There was an error while starting the container and getting the logs: {e}" ) if os.environ.get("LOG_LEVEL", "info").lower() == "debug": logger.exception(e) @@ -748,7 +747,7 @@ async def _run_container_engine_cmd(self, container, kind): Exception, ) as e: logger.error(e) - return_Code = {"StatusCode": e} + return_Code = {"StatusCode": 1} self.logs[kind] = { "returncode": return_Code["StatusCode"], @@ -1066,6 +1065,15 @@ def _prep_cache_dir(self, max_size=MAX_CACHE_DIR_SIZE_GB): logger.info("Cache directory does not need to be pruned!") def prepare(self): + hostname = utils.nodenames.gethostname() + if self.is_scoring: + self._update_status( + STATUS_RUNNING, extra_information=f"scoring_hostname-{hostname}" + ) + else: + self._update_status( + STATUS_RUNNING, extra_information=f"ingestion_hostname-{hostname}" + ) if not self.is_scoring: # Only during prediction step do we want to announce "preparing" self._update_status(STATUS_PREPARING) @@ -1110,15 +1118,6 @@ def prepare(self): self._get_container_image(self.container_image) def start(self): - hostname = utils.nodenames.gethostname() - if self.is_scoring: - self._update_status( - STATUS_RUNNING, extra_information=f"scoring_hostname-{hostname}" - ) - else: - self._update_status( - STATUS_RUNNING, extra_information=f"ingestion_hostname-{hostname}" - ) program_dir = os.path.join(self.root_dir, "program") ingestion_program_dir = os.path.join(self.root_dir, "ingestion_program") @@ -1129,12 +1128,16 @@ def start(self): self._run_program_directory(ingestion_program_dir, kind="ingestion"), self.watch_detailed_results(), loop=loop, + return_exceptions=True, ) + task_results = [] # will store results/exceptions from gather signal.signal(signal.SIGALRM, alarm_handler) signal.alarm(self.execution_time_limit) try: - loop.run_until_complete(gathered_tasks) + # run tasks + # keep what gather returned so we can detect async errors later + task_results = loop.run_until_complete(gathered_tasks) or [] except ExecutionTimeLimitExceeded: error_message = f"Execution Time Limit exceeded. Limit was {self.execution_time_limit} seconds" logger.error(error_message) @@ -1159,7 +1162,7 @@ def start(self): logger.error(e) except Exception as e: logger.error( - "There was a problem killing " + str(containers_to_kill) + e + f"There was a problem killing {containers_to_kill}: {e}" ) if os.environ.get("LOG_LEVEL", "info").lower() == "debug": logger.exception(e) @@ -1175,7 +1178,12 @@ def start(self): elapsed_time = logs["end"] - logs["start"] else: elapsed_time = self.execution_time_limit - return_code = logs["returncode"] + # Normalize the return_code + return_code = ( + logs["returncode"] + if logs["returncode"] is None or isinstance(logs["returncode"], int) + else 1 + ) if return_code is None: logger.warning("No return code from Process. Killing it") if kind == "ingestion": @@ -1189,7 +1197,7 @@ def start(self): logger.error(e) except Exception as e: logger.error( - "There was a problem killing " + str(containers_to_kill) + e + f"There was a problem killing {containers_to_kill}: {e}" ) if os.environ.get("LOG_LEVEL", "info").lower() == "debug": logger.exception(e) @@ -1212,6 +1220,16 @@ def start(self): signal.alarm(0) if self.is_scoring: + # Check if scoring program failed + program_results, _, _ = task_results + # Gather returns either normal values or exception instances when return_exceptions=True + had_async_exc = isinstance(program_results, BaseException) and not isinstance(program_results, asyncio.CancelledError) + program_rc = getattr(self, "program_exit_code", None) + failed_rc = program_rc not in (0, None) + if had_async_exc or failed_rc: + self._update_status(STATUS_FAILED, extra_information=f"program_rc={program_rc}, async={task_results}") + # Raise so upstream marks failed immediately + raise SubmissionException("Child task failed or non-zero return code") self._update_status(STATUS_FINISHED) else: self._update_status(STATUS_SCORING) @@ -1273,9 +1291,12 @@ def push_output(self): "Error, the output directory already contains a metadata file. This file is used " "to store exitCode and other data, do not write to this file manually." ) - - with open(metadata_path, "w") as f: - f.write(yaml.dump(prog_status, default_flow_style=False)) + try: + with open(metadata_path, "w") as f: + f.write(yaml.dump(prog_status, default_flow_style=False)) + except Exception as e: + logger.error(e) + raise SubmissionException("Metadata file not found") if not self.is_scoring: self._put_dir(self.prediction_result, self.output_dir)