diff --git a/compute_worker/compute_worker.py b/compute_worker/compute_worker.py index 6ac63c9d6..875051090 100644 --- a/compute_worker/compute_worker.py +++ b/compute_worker/compute_worker.py @@ -16,7 +16,7 @@ from urllib.error import HTTPError from urllib.parse import urlparse from urllib.request import urlretrieve -from zipfile import ZipFile +from zipfile import ZipFile, BadZipFile import requests import websockets @@ -151,6 +151,15 @@ def delete_files_in_folder(folder): shutil.rmtree(file_path) +def is_valid_zip(zip_path): + # Check zip integrity + try: + with ZipFile(zip_path, 'r') as zf: + return zf.testzip() is None + except BadZipFile: + return False + + class ExecutionTimeLimitExceeded(Exception): pass @@ -343,6 +352,7 @@ def _get_bundle(self, url, destination, cache=True): logger.info(f"Getting bundle {url} to unpack @ {destination}") download_needed = True + # Try to find the bundle in the cache of the worker if cache: # Hash url and download it if it doesn't exist url_without_params = url.split("?")[0] @@ -354,17 +364,28 @@ def _get_bundle(self, url, destination, cache=True): os.mkdir(self.bundle_dir) bundle_file = tempfile.NamedTemporaryFile(dir=self.bundle_dir, delete=False).name - if download_needed: + # Fetch and extract + retries, max_retries = (0, 10) + while retries < max_retries: + if download_needed: + try: + # Download the bundle + urlretrieve(url, bundle_file) + except HTTPError: + raise SubmissionException(f"Problem fetching {url} to put in {destination}") try: - urlretrieve(url, bundle_file) - except HTTPError: - raise SubmissionException(f"Problem fetching {url} to put in {destination}") - - # Extract the contents to destination directory - with ZipFile(bundle_file, 'r') as z: - z.extractall(os.path.join(self.root_dir, destination)) - - # Give back zip file path for other uses, i.e. md5'ing the zip to ID it + # Extract the contents to destination directory + with ZipFile(bundle_file, 'r') as z: + z.extractall(os.path.join(self.root_dir, destination)) + break # Break if the loop is successful + except BadZipFile: + retries += 1 + if retries >= max_retries: + raise # Re-raise the last caught BadZipFile exception + else: + logger.info("Failed. Retrying in 60 seconds...") + time.sleep(60) # Wait 60 seconds before retrying + # Return the zip file path for other uses, e.g. for creating a MD5 hash to identify it return bundle_file async def _run_container_engine_cmd(self, engine_cmd, kind): @@ -580,25 +601,40 @@ async def _run_program_directory(self, program_dir, kind, can_be_output=False): return await self._run_container_engine_cmd(engine_cmd, kind=kind) def _put_dir(self, url, directory): + """ Zip the directory and send it to the given URL using _put_file. + """ logger.info("Putting dir %s in %s" % (directory, url)) - - zip_path = make_archive(os.path.join(self.root_dir, str(uuid.uuid4())), 'zip', directory) - self._put_file(url, file=zip_path) + retries, max_retries = (0, 3) + while retries < max_retries: + # Zip the directory + start_time = time.time() + zip_path = make_archive(os.path.join(self.root_dir, str(uuid.uuid4())), 'zip', directory) + duration = time.time() - start_time + logger.info("Time needed to zip archive: {duration} seconds.") + if is_valid_zip(zip_path): # Check zip integrity + self._put_file(url, file=zip_path) # Send the file + break # Leave the loop in case of success + else: + retries += 1 + if retries >= max_retries: + raise Exception("ZIP file is corrupted or incomplete.") + else: + logger.info("Failed. Retrying in 30 seconds...") + time.sleep(30) # Wait 30 seconds before retrying def _put_file(self, url, file=None, raw_data=None, content_type='application/zip'): - + """ Send the file in the storage. + """ if file and raw_data: raise Exception("Cannot put both a file and raw_data") - + headers = { # For Azure only, other systems ignore these headers 'x-ms-blob-type': 'BlockBlob', 'x-ms-version': '2018-03-28', } - if content_type: headers['Content-Type'] = content_type - if file: logger.info("Putting file %s in %s" % (file, url)) data = open(file, 'rb')