From 86fccac88117a081e179a6c45419d38132a903a2 Mon Sep 17 00:00:00 2001 From: Abderrahim Kitouni Date: Mon, 16 Jan 2023 21:19:54 +0100 Subject: [PATCH 1/2] plugin: don't try to pickle the exceptions thrown by blocking_activity While this is technically an API break, the old promise of raising the error in the calling process never worked. --- src/buildstream/plugin.py | 20 +++++--------------- 1 file changed, 5 insertions(+), 15 deletions(-) diff --git a/src/buildstream/plugin.py b/src/buildstream/plugin.py index e1e7b1fb6..14e35b991 100644 --- a/src/buildstream/plugin.py +++ b/src/buildstream/plugin.py @@ -123,7 +123,6 @@ import multiprocessing.popen_forkserver # type: ignore import os -import pickle import queue import signal import subprocess @@ -184,13 +183,8 @@ def _background_job_wrapper(result_queue: multiprocessing.Queue, target: Callabl try: result = target(*args) result_queue.put((None, result)) - except Exception as exc: # pylint: disable=broad-except - try: - # Here we send the result again, just in case it was a PickleError - # in which case the same exception would be thrown down - result_queue.put((exc, result)) - except pickle.PickleError: - result_queue.put((traceback.format_exc(), None)) + except Exception: # pylint: disable=broad-except + result_queue.put((traceback.format_exc(), None)) class Plugin: @@ -606,7 +600,8 @@ def blocking_activity( in order to avoid starving the scheduler. The function, its arguments and return value must all be pickleable, - as it will be run in another process. + as it will be run in another process. The function should not raise + an exception. This should be used whenever there is a potential for a blocking syscall to not return in a reasonable (<1s) amount of time. @@ -676,12 +671,7 @@ def resume_proc(): raise PluginError("Background process didn't exit after 15 seconds and got killed.") if err is not None: - if isinstance(err, str): - # This was a pickle error, this is a bug - raise PluginError( - "An error happened while returning the result from a blocking activity", detail=err - ) - raise err + raise PluginError("An error happened while running a blocking activity", detail=err) return result From 37785226b6449c52960c2b4895dbb07fb4a42902 Mon Sep 17 00:00:00 2001 From: Abderrahim Kitouni Date: Fri, 20 Jan 2023 14:02:57 +0100 Subject: [PATCH 2/2] downloadablefilesource: handle exceptions in _download_file Don't rely on the ability to get pickled exceptions from blocking_activity as that doesn't seem to work with any supported python version. May help with https://github.com/apache/buildstream/issues/1766 --- src/buildstream/downloadablefilesource.py | 111 +++++++++++----------- 1 file changed, 57 insertions(+), 54 deletions(-) diff --git a/src/buildstream/downloadablefilesource.py b/src/buildstream/downloadablefilesource.py index d36da6749..2e261cb6e 100644 --- a/src/buildstream/downloadablefilesource.py +++ b/src/buildstream/downloadablefilesource.py @@ -97,22 +97,36 @@ def _download_file(opener, url, etag, directory): if etag is not None: request.add_header("If-None-Match", etag) - with contextlib.closing(opener.open(request)) as response: - info = response.info() + try: + with contextlib.closing(opener.open(request)) as response: + info = response.info() - # some servers don't honor the 'If-None-Match' header - if etag and info["ETag"] == etag: - return None, None + # some servers don't honor the 'If-None-Match' header + if etag and info["ETag"] == etag: + return None, None, None + + etag = info["ETag"] + + filename = info.get_filename(default_name) + filename = os.path.basename(filename) + local_file = os.path.join(directory, filename) + with open(local_file, "wb") as dest: + shutil.copyfileobj(response, dest) - etag = info["ETag"] + except urllib.error.HTTPError as e: + if e.code == 304: + # 304 Not Modified. + # Because we use etag only for matching ref, currently specified ref is what + # we would have downloaded. + return None, None, None - filename = info.get_filename(default_name) - filename = os.path.basename(filename) - local_file = os.path.join(directory, filename) - with open(local_file, "wb") as dest: - shutil.copyfileobj(response, dest) + return None, None, str(e) + except (urllib.error.URLError, urllib.error.ContentTooShortError, OSError, ValueError) as e: + # Note that urllib.request.Request in the try block may throw a + # ValueError for unknown url types, so we handle it here. + return None, None, str(e) - return local_file, etag + return local_file, etag, None class DownloadableFileSource(Source): @@ -206,50 +220,39 @@ def _store_etag(self, ref, etag): def _ensure_mirror(self, activity_name: str): # Downloads from the url and caches it according to its sha256sum. - try: - with self.tempdir() as td: - # We do not use etag in case what we have in cache is - # not matching ref in order to be able to recover from - # corrupted download. - if self.ref and not self.is_cached(): - # Do not re-download the file if the ETag matches. - etag = self._get_etag(self.ref) - else: - etag = None - - local_file, new_etag = self.blocking_activity( - _download_file, (self.__get_urlopener(), self.url, etag, td), activity_name - ) - - if local_file is None: - return self.ref - - # Make sure url-specific mirror dir exists. - if not os.path.isdir(self._mirror_dir): - os.makedirs(self._mirror_dir) - - # Store by sha256sum - sha256 = utils.sha256sum(local_file) - # Even if the file already exists, move the new file over. - # In case the old file was corrupted somehow. - os.rename(local_file, self._get_mirror_file(sha256)) - - if new_etag: - self._store_etag(sha256, new_etag) - return sha256 - - except urllib.error.HTTPError as e: - if e.code == 304: - # 304 Not Modified. - # Because we use etag only for matching ref, currently specified ref is what - # we would have downloaded. + with self.tempdir() as td: + # We do not use etag in case what we have in cache is + # not matching ref in order to be able to recover from + # corrupted download. + if self.ref and not self.is_cached(): + # Do not re-download the file if the ETag matches. + etag = self._get_etag(self.ref) + else: + etag = None + + local_file, new_etag, error = self.blocking_activity( + _download_file, (self.__get_urlopener(), self.url, etag, td), activity_name + ) + + if error: + raise SourceError("{}: Error mirroring {}: {}".format(self, self.url, error), temporary=True) + + if local_file is None: return self.ref - raise SourceError("{}: Error mirroring {}: {}".format(self, self.url, e), temporary=True) from e - except (urllib.error.URLError, urllib.error.ContentTooShortError, OSError, ValueError) as e: - # Note that urllib.request.Request in the try block may throw a - # ValueError for unknown url types, so we handle it here. - raise SourceError("{}: Error mirroring {}: {}".format(self, self.url, e), temporary=True) from e + # Make sure url-specific mirror dir exists. + if not os.path.isdir(self._mirror_dir): + os.makedirs(self._mirror_dir) + + # Store by sha256sum + sha256 = utils.sha256sum(local_file) + # Even if the file already exists, move the new file over. + # In case the old file was corrupted somehow. + os.rename(local_file, self._get_mirror_file(sha256)) + + if new_etag: + self._store_etag(sha256, new_etag) + return sha256 def _get_mirror_file(self, sha=None): if sha is not None: