Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 46 additions & 25 deletions compute_worker/compute_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -449,9 +449,10 @@ async def send_detailed_results(self, file_path):
)
)
except Exception as e:
logger.error("This error might result in a Execution Time Exceeded error" + e)
logger.error(f"This error might result in a Execution Time Exceeded error: {e}")
if os.environ.get("LOG_LEVEL", "info").lower() == "debug":
logger.exception(e)
raise SubmissionException("Could not connect to instance to update detailed result")

def _get_stdout_stderr_file_names(self, run_args):
# run_args should be the run_args argument passed to __init__ from the run_wrapper.
Expand Down Expand Up @@ -628,10 +629,10 @@ def _get_bundle(self, url, destination, cache=True):
except BadZipFile:
retries += 1
if retries >= max_retries:
raise # Re-raise the last caught BadZipFile exception
raise SubmissionException("Bad or empty zip file")
else:
logger.warning("Failed. Retrying in 60 seconds...")
time.sleep(60) # Wait 60 seconds before retrying
logger.warning("Failed. Retrying in 20 seconds...")
time.sleep(20) # Wait 20 seconds before retrying
# Return the zip file path for other uses, e.g. for creating a MD5 hash to identify it
return bundle_file

Expand Down Expand Up @@ -668,8 +669,7 @@ async def _run_container_engine_cmd(self, container, kind):
)
except Exception as e:
logger.error(
"There was an error trying to connect to the websocket on the codabench instance"
+ e
f"There was an error trying to connect to the websocket on the codabench instance: {e}"
)
if os.environ.get("LOG_LEVEL", "info").lower() == "debug":
logger.exception(e)
Expand Down Expand Up @@ -718,8 +718,7 @@ async def _run_container_engine_cmd(self, container, kind):
logger.error(e)
except Exception as e:
logger.error(
"There was an error while starting the container and getting the logs"
+ e
f"There was an error while starting the container and getting the logs: {e}"
)
if os.environ.get("LOG_LEVEL", "info").lower() == "debug":
logger.exception(e)
Expand Down Expand Up @@ -748,7 +747,7 @@ async def _run_container_engine_cmd(self, container, kind):
Exception,
) as e:
logger.error(e)
return_Code = {"StatusCode": e}
return_Code = {"StatusCode": 1}

self.logs[kind] = {
"returncode": return_Code["StatusCode"],
Expand Down Expand Up @@ -1066,6 +1065,15 @@ def _prep_cache_dir(self, max_size=MAX_CACHE_DIR_SIZE_GB):
logger.info("Cache directory does not need to be pruned!")

def prepare(self):
hostname = utils.nodenames.gethostname()
if self.is_scoring:
self._update_status(
STATUS_RUNNING, extra_information=f"scoring_hostname-{hostname}"
)
else:
self._update_status(
STATUS_RUNNING, extra_information=f"ingestion_hostname-{hostname}"
)
if not self.is_scoring:
# Only during prediction step do we want to announce "preparing"
self._update_status(STATUS_PREPARING)
Expand Down Expand Up @@ -1110,15 +1118,6 @@ def prepare(self):
self._get_container_image(self.container_image)

def start(self):
hostname = utils.nodenames.gethostname()
if self.is_scoring:
self._update_status(
STATUS_RUNNING, extra_information=f"scoring_hostname-{hostname}"
)
else:
self._update_status(
STATUS_RUNNING, extra_information=f"ingestion_hostname-{hostname}"
)
program_dir = os.path.join(self.root_dir, "program")
ingestion_program_dir = os.path.join(self.root_dir, "ingestion_program")

Expand All @@ -1129,12 +1128,16 @@ def start(self):
self._run_program_directory(ingestion_program_dir, kind="ingestion"),
self.watch_detailed_results(),
loop=loop,
return_exceptions=True,
)

task_results = [] # will store results/exceptions from gather
signal.signal(signal.SIGALRM, alarm_handler)
signal.alarm(self.execution_time_limit)
try:
loop.run_until_complete(gathered_tasks)
# run tasks
# keep what gather returned so we can detect async errors later
task_results = loop.run_until_complete(gathered_tasks) or []
except ExecutionTimeLimitExceeded:
error_message = f"Execution Time Limit exceeded. Limit was {self.execution_time_limit} seconds"
logger.error(error_message)
Expand All @@ -1159,7 +1162,7 @@ def start(self):
logger.error(e)
except Exception as e:
logger.error(
"There was a problem killing " + str(containers_to_kill) + e
f"There was a problem killing {containers_to_kill}: {e}"
)
if os.environ.get("LOG_LEVEL", "info").lower() == "debug":
logger.exception(e)
Expand All @@ -1175,7 +1178,12 @@ def start(self):
elapsed_time = logs["end"] - logs["start"]
else:
elapsed_time = self.execution_time_limit
return_code = logs["returncode"]
# Normalize the return_code
return_code = (
logs["returncode"]
if logs["returncode"] is None or isinstance(logs["returncode"], int)
else 1
)
if return_code is None:
logger.warning("No return code from Process. Killing it")
if kind == "ingestion":
Expand All @@ -1189,7 +1197,7 @@ def start(self):
logger.error(e)
except Exception as e:
logger.error(
"There was a problem killing " + str(containers_to_kill) + e
f"There was a problem killing {containers_to_kill}: {e}"
)
if os.environ.get("LOG_LEVEL", "info").lower() == "debug":
logger.exception(e)
Expand All @@ -1212,6 +1220,16 @@ def start(self):
signal.alarm(0)

if self.is_scoring:
# Check if scoring program failed
program_results, _, _ = task_results
# Gather returns either normal values or exception instances when return_exceptions=True
had_async_exc = isinstance(program_results, BaseException) and not isinstance(program_results, asyncio.CancelledError)
program_rc = getattr(self, "program_exit_code", None)
failed_rc = program_rc not in (0, None)
if had_async_exc or failed_rc:
self._update_status(STATUS_FAILED, extra_information=f"program_rc={program_rc}, async={task_results}")
# Raise so upstream marks failed immediately
raise SubmissionException("Child task failed or non-zero return code")
self._update_status(STATUS_FINISHED)
else:
self._update_status(STATUS_SCORING)
Expand Down Expand Up @@ -1273,9 +1291,12 @@ def push_output(self):
"Error, the output directory already contains a metadata file. This file is used "
"to store exitCode and other data, do not write to this file manually."
)

with open(metadata_path, "w") as f:
f.write(yaml.dump(prog_status, default_flow_style=False))
try:
with open(metadata_path, "w") as f:
f.write(yaml.dump(prog_status, default_flow_style=False))
except Exception as e:
logger.error(e)
raise SubmissionException("Metadata file not found")

if not self.is_scoring:
self._put_dir(self.prediction_result, self.output_dir)
Expand Down