Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 57 additions & 54 deletions src/buildstream/downloadablefilesource.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,22 +97,36 @@ def _download_file(opener, url, etag, directory):
if etag is not None:
request.add_header("If-None-Match", etag)

with contextlib.closing(opener.open(request)) as response:
info = response.info()
try:
with contextlib.closing(opener.open(request)) as response:
info = response.info()

# some servers don't honor the 'If-None-Match' header
if etag and info["ETag"] == etag:
return None, None
# some servers don't honor the 'If-None-Match' header
if etag and info["ETag"] == etag:
return None, None, None

etag = info["ETag"]

filename = info.get_filename(default_name)
filename = os.path.basename(filename)
local_file = os.path.join(directory, filename)
with open(local_file, "wb") as dest:
shutil.copyfileobj(response, dest)

etag = info["ETag"]
except urllib.error.HTTPError as e:
if e.code == 304:
# 304 Not Modified.
# Because we use the etag only for matching the ref, the currently specified
# ref is what we would have downloaded.
return None, None, None

filename = info.get_filename(default_name)
filename = os.path.basename(filename)
local_file = os.path.join(directory, filename)
with open(local_file, "wb") as dest:
shutil.copyfileobj(response, dest)
return None, None, str(e)
except (urllib.error.URLError, urllib.error.ContentTooShortError, OSError, ValueError) as e:
# Note that urllib.request.Request in the try block may throw a
# ValueError for unknown url types, so we handle it here.
return None, None, str(e)

return local_file, etag
return local_file, etag, None


class DownloadableFileSource(Source):
Expand Down Expand Up @@ -206,50 +220,39 @@ def _store_etag(self, ref, etag):

def _ensure_mirror(self, activity_name: str):
# Downloads from the url and caches it according to its sha256sum.
try:
with self.tempdir() as td:
# We do not use the etag when what we have in the cache does
# not match the ref, so that we are able to recover from a
# corrupted download.
if self.ref and not self.is_cached():
# Do not re-download the file if the ETag matches.
etag = self._get_etag(self.ref)
else:
etag = None

local_file, new_etag = self.blocking_activity(
_download_file, (self.__get_urlopener(), self.url, etag, td), activity_name
)

if local_file is None:
return self.ref

# Make sure url-specific mirror dir exists.
if not os.path.isdir(self._mirror_dir):
os.makedirs(self._mirror_dir)

# Store by sha256sum
sha256 = utils.sha256sum(local_file)
# Even if the file already exists, move the new file over.
# In case the old file was corrupted somehow.
os.rename(local_file, self._get_mirror_file(sha256))

if new_etag:
self._store_etag(sha256, new_etag)
return sha256

except urllib.error.HTTPError as e:
if e.code == 304:
# 304 Not Modified.
# Because we use the etag only for matching the ref, the currently specified
# ref is what we would have downloaded.
with self.tempdir() as td:
# We do not use the etag when what we have in the cache does
# not match the ref, so that we are able to recover from a
# corrupted download.
if self.ref and not self.is_cached():
# Do not re-download the file if the ETag matches.
etag = self._get_etag(self.ref)
else:
etag = None

local_file, new_etag, error = self.blocking_activity(
_download_file, (self.__get_urlopener(), self.url, etag, td), activity_name
)

if error:
raise SourceError("{}: Error mirroring {}: {}".format(self, self.url, error), temporary=True)

if local_file is None:
return self.ref
raise SourceError("{}: Error mirroring {}: {}".format(self, self.url, e), temporary=True) from e

except (urllib.error.URLError, urllib.error.ContentTooShortError, OSError, ValueError) as e:
# Note that urllib.request.Request in the try block may throw a
# ValueError for unknown url types, so we handle it here.
raise SourceError("{}: Error mirroring {}: {}".format(self, self.url, e), temporary=True) from e
# Make sure url-specific mirror dir exists.
if not os.path.isdir(self._mirror_dir):
os.makedirs(self._mirror_dir)

# Store by sha256sum
sha256 = utils.sha256sum(local_file)
# Even if the file already exists, move the new file over.
# In case the old file was corrupted somehow.
os.rename(local_file, self._get_mirror_file(sha256))

if new_etag:
self._store_etag(sha256, new_etag)
return sha256

def _get_mirror_file(self, sha=None):
if sha is not None:
Expand Down
20 changes: 5 additions & 15 deletions src/buildstream/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,6 @@
import multiprocessing.popen_forkserver # type: ignore

import os
import pickle
import queue
import signal
import subprocess
Expand Down Expand Up @@ -184,13 +183,8 @@ def _background_job_wrapper(result_queue: multiprocessing.Queue, target: Callabl
try:
result = target(*args)
result_queue.put((None, result))
except Exception as exc: # pylint: disable=broad-except
try:
# Here we send the result again, just in case it was a PickleError
# in which case the same exception would be thrown down
result_queue.put((exc, result))
except pickle.PickleError:
result_queue.put((traceback.format_exc(), None))
except Exception: # pylint: disable=broad-except
result_queue.put((traceback.format_exc(), None))


class Plugin:
Expand Down Expand Up @@ -606,7 +600,8 @@ def blocking_activity(
in order to avoid starving the scheduler.

The function, its arguments and return value must all be pickleable,
as it will be run in another process.
as it will be run in another process. The function should not raise
an exception.

This should be used whenever there is a potential for a blocking
syscall to not return in a reasonable (<1s) amount of time.
Expand Down Expand Up @@ -676,12 +671,7 @@ def resume_proc():
raise PluginError("Background process didn't exit after 15 seconds and got killed.")

if err is not None:
if isinstance(err, str):
# This was a pickle error, this is a bug
raise PluginError(
"An error happened while returning the result from a blocking activity", detail=err
)
raise err
raise PluginError("An error happened while running a blocking activity", detail=err)

return result

Expand Down