From 058e233edd014ef5cb0cbf77b1e45d17ecca5b75 Mon Sep 17 00:00:00 2001 From: dconstancy Date: Tue, 16 Jun 2026 15:53:09 +0200 Subject: [PATCH 1/2] fix: add retry with backoff to _update_submission and stop swallowing terminal exceptions --- compute_worker/compute_worker.py | 48 ++++++++++++++++++++++---------- 1 file changed, 34 insertions(+), 14 deletions(-) diff --git a/compute_worker/compute_worker.py b/compute_worker/compute_worker.py index c02abcb0f..db8ac83ae 100644 --- a/compute_worker/compute_worker.py +++ b/compute_worker/compute_worker.py @@ -8,6 +8,7 @@ import signal import socket import tempfile +import random import time import uuid import requests @@ -503,8 +504,10 @@ def __init__(self, run_args): self.requests_session = requests.Session() adapter = requests.adapters.HTTPAdapter( max_retries=Retry( - total=3, - backoff_factor=1, + total=5, + backoff_factor=2, + status_forcelist=[502, 503, 504], + allowed_methods=["PATCH", "GET", "PUT", "POST"], ) ) self.requests_session.mount("http://", adapter) @@ -614,20 +617,36 @@ def _get_stdout_stderr_file_names(self, run_args): ] return [run_args[name] for name in DETAILED_OUTPUT_NAMES] - def _update_submission(self, data): + def _update_submission(self, data, max_retries=5, backoff_base=2): url = f"{self.submissions_api_url}/submissions/{self.submission_id}/" data["secret"] = self.secret - logger.info(f"Updating submission @ {url} with data = {data}") - - resp = self.requests_session.patch(url, data=data, timeout=150) - if resp.status_code == 200: - logger.info("Submission updated successfully!") - else: - logger.error( - f"Submission patch failed with status = {resp.status_code}, and response = \n{resp.content}" - ) - raise SubmissionException("Failure updating submission data.") + for attempt in range(1, max_retries + 1): + logger.info(f"Updating submission @ {url} (attempt {attempt}/{max_retries}) with data = {data}") + try: + resp = self.requests_session.patch(url, data=data, timeout=150) + except requests.exceptions.RequestException as exc: + logger.warning(f"Submission patch request failed (attempt {attempt}/{max_retries}): {exc}") + if attempt == max_retries: + raise SubmissionException(f"Failure updating submission data after {max_retries} attempts.") + sleep_s = backoff_base ** attempt + random.uniform(0, 1) + logger.info(f"Retrying in {sleep_s:.1f}s...") + time.sleep(sleep_s) + continue + + if resp.status_code == 200: + logger.info("Submission updated successfully!") + return + else: + logger.warning( + f"Submission patch failed (attempt {attempt}/{max_retries}) " + f"with status = {resp.status_code}, and response = \n{resp.content}" + ) + if attempt == max_retries: + raise SubmissionException(f"Failure updating submission data after {max_retries} attempts.") + sleep_s = backoff_base ** attempt + random.uniform(0, 1) + logger.info(f"Retrying in {sleep_s:.1f}s...") + time.sleep(sleep_s) def _update_status(self, status, extra_information=None): # Update submission status @@ -639,8 +658,9 @@ def _update_status(self, status, extra_information=None): try: self._update_submission(data) except Exception as e: - # Always catch exception and never raise error logger.exception(f"Failed to update submission status to {status}: {e}") + if status in ("Finished", "Failed"): + raise def _get_container_image(self, image_name): logger.info("Running pull for image: {}".format(image_name)) From bbdbcbc64577ba3391e6792171d28b4645b69ac4 Mon Sep 17 00:00:00 2001 From: dconstancy Date: Fri, 19 Jun 2026 17:53:34 +0200 Subject: [PATCH 2/2] fix: address review comments on _update_submission --- compute_worker/compute_worker.py | 42 ++++++++++---------------------- 1 file changed, 13 insertions(+), 29 deletions(-) diff --git a/compute_worker/compute_worker.py b/compute_worker/compute_worker.py index db8ac83ae..cebc4fe7c 100644 --- a/compute_worker/compute_worker.py +++ b/compute_worker/compute_worker.py @@ -8,7 +8,6 @@ import signal import socket import tempfile -import random import time import uuid import requests @@ -507,7 +506,7 @@ def __init__(self, run_args): total=5, backoff_factor=2, status_forcelist=[502, 503, 504], - allowed_methods=["PATCH", "GET", "PUT", "POST"], + allowed_methods=["PATCH", "GET", "PUT"], ) ) self.requests_session.mount("http://", adapter) @@ -617,36 +616,21 @@ def _get_stdout_stderr_file_names(self, run_args): ] return [run_args[name] for name in DETAILED_OUTPUT_NAMES] - def _update_submission(self, data, max_retries=5, backoff_base=2): + def _update_submission(self, data): url = f"{self.submissions_api_url}/submissions/{self.submission_id}/" data["secret"] = self.secret - for attempt in range(1, max_retries + 1): - logger.info(f"Updating submission @ {url} (attempt {attempt}/{max_retries}) with data = {data}") - try: - resp = self.requests_session.patch(url, data=data, timeout=150) - except requests.exceptions.RequestException as exc: - logger.warning(f"Submission patch request failed (attempt {attempt}/{max_retries}): {exc}") - if attempt == max_retries: - raise SubmissionException(f"Failure updating submission data after {max_retries} attempts.") - sleep_s = backoff_base ** attempt + random.uniform(0, 1) - logger.info(f"Retrying in {sleep_s:.1f}s...") - time.sleep(sleep_s) - continue - - if resp.status_code == 200: - logger.info("Submission updated successfully!") - return - else: - logger.warning( - f"Submission patch failed (attempt {attempt}/{max_retries}) " - f"with status = {resp.status_code}, and response = \n{resp.content}" - ) - if attempt == max_retries: - raise SubmissionException(f"Failure updating submission data after {max_retries} attempts.") - sleep_s = backoff_base ** attempt + random.uniform(0, 1) - logger.info(f"Retrying in {sleep_s:.1f}s...") - time.sleep(sleep_s) + logger.info(f"Updating submission @ {url}") + + resp = self.requests_session.patch(url, data=data, timeout=150) + if resp.status_code == 200: + logger.info("Submission updated successfully!") + else: + logger.error( + f"Submission patch failed with status = {resp.status_code}, " + f"and response = \n{resp.content}" + ) + raise SubmissionException("Failure updating submission data.") def _update_status(self, status, extra_information=None): # Update submission status