From d5620c95471628427fee4a03a28eeb414f81e9c9 Mon Sep 17 00:00:00 2001 From: didayolo Date: Fri, 1 Sep 2023 18:46:21 +0200 Subject: [PATCH 1/4] Add retries and comments --- compute_worker/compute_worker.py | 66 +++++++++++++++++++++++--------- 1 file changed, 48 insertions(+), 18 deletions(-) diff --git a/compute_worker/compute_worker.py b/compute_worker/compute_worker.py index 6ac63c9d6..768d1f2df 100644 --- a/compute_worker/compute_worker.py +++ b/compute_worker/compute_worker.py @@ -16,7 +16,7 @@ from urllib.error import HTTPError from urllib.parse import urlparse from urllib.request import urlretrieve -from zipfile import ZipFile +from zipfile import ZipFile, BadZipFile import requests import websockets @@ -343,6 +343,7 @@ def _get_bundle(self, url, destination, cache=True): logger.info(f"Getting bundle {url} to unpack @ {destination}") download_needed = True + # Try to find the bundle in the cache of the worker if cache: # Hash url and download it if it doesn't exist url_without_params = url.split("?")[0] @@ -354,17 +355,27 @@ def _get_bundle(self, url, destination, cache=True): os.mkdir(self.bundle_dir) bundle_file = tempfile.NamedTemporaryFile(dir=self.bundle_dir, delete=False).name - if download_needed: + # Fetch and extract + retries, max_retries = (0, 3) + while retries < max_retries: + if download_needed: + try: + # Download the bundle + urlretrieve(url, bundle_file) + except HTTPError: + raise SubmissionException(f"Problem fetching {url} to put in {destination}") try: - urlretrieve(url, bundle_file) - except HTTPError: - raise SubmissionException(f"Problem fetching {url} to put in {destination}") - - # Extract the contents to destination directory - with ZipFile(bundle_file, 'r') as z: - z.extractall(os.path.join(self.root_dir, destination)) - - # Give back zip file path for other uses, i.e. md5'ing the zip to ID it + # Extract the contents to destination directory + with ZipFile(bundle_file, 'r') as z: + z.extractall(os.path.join(self.root_dir, destination)) + break # Break if the loop is successful + except BadZipFile: + retries += 1 + if retries >= max_retries: + raise # Re-raise the last caught BadZipFile exception + else: + time.sleep(30) # Wait 30 seconds before retrying + # Return the zip file path for other uses, e.g. for creating a MD5 hash to identify it return bundle_file async def _run_container_engine_cmd(self, engine_cmd, kind): @@ -579,26 +590,45 @@ async def _run_program_directory(self, program_dir, kind, can_be_output=False): # This runs the container engine command and asynchronously passes data back via websocket return await self._run_container_engine_cmd(engine_cmd, kind=kind) + def is_valid_zip(zip_path): + # Check zip integrity + try: + with ZipFile(zip_path, 'r') as zf: + return zf.testzip() is None + except BadZipFile: + return False + def _put_dir(self, url, directory): + """ Zip the directory and send it to the given URL using _put_file. + """ logger.info("Putting dir %s in %s" % (directory, url)) - - zip_path = make_archive(os.path.join(self.root_dir, str(uuid.uuid4())), 'zip', directory) - self._put_file(url, file=zip_path) + retries, max_retries = (0, 3) + while retries < max_retries: + # Zip the directory + zip_path = make_archive(os.path.join(self.root_dir, str(uuid.uuid4())), 'zip', directory) + if is_valid_zip(zip_path): # Check zip integrity + self._put_file(url, file=zip_path) # Send the file + break # Leave the loop in case of success + else: + retries += 1 + if retries >= max_retries: + raise Exception("ZIP file is corrupted or incomplete.") + else: + time.sleep(30) # Wait 30 seconds before retrying def _put_file(self, url, file=None, raw_data=None, content_type='application/zip'): - + """ Send the file in the storage. + """ if file and raw_data: raise Exception("Cannot put both a file and raw_data") - + headers = { # For Azure only, other systems ignore these headers 'x-ms-blob-type': 'BlockBlob', 'x-ms-version': '2018-03-28', } - if content_type: headers['Content-Type'] = content_type - if file: logger.info("Putting file %s in %s" % (file, url)) data = open(file, 'rb') From 8d1374dc8afd7ba1877689de6cbc2e78af434b16 Mon Sep 17 00:00:00 2001 From: didayolo Date: Fri, 1 Sep 2023 18:51:24 +0200 Subject: [PATCH 2/4] Add logger messages --- compute_worker/compute_worker.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/compute_worker/compute_worker.py b/compute_worker/compute_worker.py index 768d1f2df..c90de1b7d 100644 --- a/compute_worker/compute_worker.py +++ b/compute_worker/compute_worker.py @@ -374,6 +374,7 @@ def _get_bundle(self, url, destination, cache=True): if retries >= max_retries: raise # Re-raise the last caught BadZipFile exception else: + logger.info("Failed. Retrying in 30 seconds...") time.sleep(30) # Wait 30 seconds before retrying # Return the zip file path for other uses, e.g. for creating a MD5 hash to identify it return bundle_file @@ -614,6 +615,7 @@ def _put_dir(self, url, directory): if retries >= max_retries: raise Exception("ZIP file is corrupted or incomplete.") else: + logger.info("Failed. Retrying in 30 seconds...") time.sleep(30) # Wait 30 seconds before retrying def _put_file(self, url, file=None, raw_data=None, content_type='application/zip'): From 9faa17dcb489ef1e4de71ec35febcb0c94c2accc Mon Sep 17 00:00:00 2001 From: didayolo Date: Fri, 1 Sep 2023 18:59:32 +0200 Subject: [PATCH 3/4] Minor fixes --- compute_worker/compute_worker.py | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/compute_worker/compute_worker.py b/compute_worker/compute_worker.py index c90de1b7d..37301d022 100644 --- a/compute_worker/compute_worker.py +++ b/compute_worker/compute_worker.py @@ -151,6 +151,15 @@ def delete_files_in_folder(folder): shutil.rmtree(file_path) +def is_valid_zip(zip_path): + # Check zip integrity + try: + with ZipFile(zip_path, 'r') as zf: + return zf.testzip() is None + except BadZipFile: + return False + + class ExecutionTimeLimitExceeded(Exception): pass @@ -356,7 +365,7 @@ def _get_bundle(self, url, destination, cache=True): bundle_file = tempfile.NamedTemporaryFile(dir=self.bundle_dir, delete=False).name # Fetch and extract - retries, max_retries = (0, 3) + retries, max_retries = (0, 10) while retries < max_retries: if download_needed: try: @@ -375,7 +384,7 @@ def _get_bundle(self, url, destination, cache=True): raise # Re-raise the last caught BadZipFile exception else: logger.info("Failed. Retrying in 30 seconds...") - time.sleep(30) # Wait 30 seconds before retrying + time.sleep(60) # Wait 60 seconds before retrying # Return the zip file path for other uses, e.g. for creating a MD5 hash to identify it return bundle_file @@ -591,14 +600,6 @@ async def _run_program_directory(self, program_dir, kind, can_be_output=False): # This runs the container engine command and asynchronously passes data back via websocket return await self._run_container_engine_cmd(engine_cmd, kind=kind) - def is_valid_zip(zip_path): - # Check zip integrity - try: - with ZipFile(zip_path, 'r') as zf: - return zf.testzip() is None - except BadZipFile: - return False - def _put_dir(self, url, directory): """ Zip the directory and send it to the given URL using _put_file. """ @@ -606,7 +607,10 @@ def _put_dir(self, url, directory): retries, max_retries = (0, 3) while retries < max_retries: # Zip the directory + start_time = time.time() zip_path = make_archive(os.path.join(self.root_dir, str(uuid.uuid4())), 'zip', directory) + duration = time.time() - start_time + logger.info("Time needed to zip archive: {duration} seconds.") if is_valid_zip(zip_path): # Check zip integrity self._put_file(url, file=zip_path) # Send the file break # Leave the loop in case of success From a2bbb74cfd9950d81c938734114a65f0d3523fdf Mon Sep 17 00:00:00 2001 From: didayolo Date: Fri, 1 Sep 2023 19:12:50 +0200 Subject: [PATCH 4/4] Correct typo --- compute_worker/compute_worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/compute_worker/compute_worker.py b/compute_worker/compute_worker.py index 37301d022..875051090 100644 --- a/compute_worker/compute_worker.py +++ b/compute_worker/compute_worker.py @@ -383,7 +383,7 @@ def _get_bundle(self, url, destination, cache=True): if retries >= max_retries: raise # Re-raise the last caught BadZipFile exception else: - logger.info("Failed. Retrying in 30 seconds...") + logger.info("Failed. Retrying in 60 seconds...") time.sleep(60) # Wait 60 seconds before retrying # Return the zip file path for other uses, e.g. for creating a MD5 hash to identify it return bundle_file