Adding chunk_size to Blob._initiate_resumable_upload.

This is to avoid monkey-patching the instance when "pure" behavior
will suffice.

Also removed the transport from Blob._get_upload_arguments().

dhermes committed May 4, 2017
commit 4fcbe8a4b536b053027535f345889ea73fd16444
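
In practice the change replaces a save-patch-restore dance with a plain
argument. A minimal sketch of the two caller patterns (the `blob`,
`client`, `stream`, and `size` names here are hypothetical, not from the
diff itself):

    # Before: monkey-patch the instance, then restore it.
    saved = blob.chunk_size
    try:
        blob.chunk_size = blob._CHUNK_SIZE_MULTIPLE
        upload, _ = blob._initiate_resumable_upload(
            client, stream, content_type, size)
    finally:
        blob.chunk_size = saved

    # After: "pure" call; the instance is never mutated.
    upload, _ = blob._initiate_resumable_upload(
        client, stream, content_type, size,
        chunk_size=blob._CHUNK_SIZE_MULTIPLE)
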
54 changes: 26 additions & 28 deletions storage/google/cloud/storage/blob.py
@@ -559,7 +559,7 @@ def _get_writable_metadata(self):

         return object_metadata

-    def _get_upload_arguments(self, client, content_type):
+    def _get_upload_arguments(self, content_type):
         """Get required arguments for performing an upload.

         The content type returned will be determined in order of precedence:
@@ -568,27 +568,20 @@ def _get_upload_arguments(self, client, content_type):
         - The value stored on the current blob
         - The default value ('application/octet-stream')

-        :type client: :class:`~google.cloud.storage.client.Client`
-        :param client: (Optional) The client to use. If not passed, falls back
-                       to the ``client`` stored on the blob's bucket.
-
         :type content_type: str
         :param content_type: Type of content being uploaded (or :data:`None`).

         :rtype: tuple
-        :returns: A quadruple of
+        :returns: A triple of

-                  * An
-                    :class:`~google.auth.transport.requests.AuthorizedSession`
                   * A header dictionary
                   * An object metadata dictionary
                   * The ``content_type`` as a string (according to precedence)
         """
-        transport = self._make_transport(client)
         headers = _get_encryption_headers(self._encryption_key)
         object_metadata = self._get_writable_metadata()
         content_type = self._get_content_type(content_type)
-        return transport, headers, object_metadata, content_type
+        return headers, object_metadata, content_type

     def _do_multipart_upload(self, client, stream, content_type, size):
         """Perform a multipart upload.
@@ -631,8 +624,9 @@ def _do_multipart_upload(self, client, stream, content_type, size):
             msg = _READ_LESS_THAN_SIZE.format(size, len(data))
             raise ValueError(msg)

-        info = self._get_upload_arguments(client, content_type)
-        transport, headers, object_metadata, content_type = info
+        transport = self._make_transport(client)
+        info = self._get_upload_arguments(content_type)
+        headers, object_metadata, content_type = info

         upload_url = _MULTIPART_URL_TEMPLATE.format(
             bucket_path=self.bucket.path)
@@ -643,11 +637,9 @@ def _do_multipart_upload(self, client, stream, content_type, size):
         return response

     def _initiate_resumable_upload(self, client, stream, content_type,
-                                   size, extra_headers=None):
+                                   size, extra_headers=None, chunk_size=None):
         """Initiate a resumable upload.

-        Assumes ``chunk_size`` is not :data:`None` on the current blob.
-
         The content type of the upload will be determined in order
         of precedence:

@@ -674,6 +666,13 @@ def _initiate_resumable_upload(self, client, stream, content_type,
         :param extra_headers: (Optional) Extra headers to add to standard
                               headers.

+        :type chunk_size: int
+        :param chunk_size:
+            (Optional) Chunk size to use when creating a
+            :class:`~google.resumable_media.requests.ResumableUpload`.
+            If not passed, will fall back to the chunk size on the
+            current blob.
+
         :rtype: tuple
         :returns:
             Pair of
@@ -682,14 +681,18 @@ def _initiate_resumable_upload(self, client, stream, content_type,
               that was created
             * The ``transport`` used to initiate the upload.
         """
-        info = self._get_upload_arguments(client, content_type)
-        transport, headers, object_metadata, content_type = info
+        if chunk_size is None:
+            chunk_size = self.chunk_size
+
+        transport = self._make_transport(client)
+        info = self._get_upload_arguments(content_type)
+        headers, object_metadata, content_type = info
         if extra_headers is not None:
             headers.update(extra_headers)

         upload_url = _RESUMABLE_URL_TEMPLATE.format(
             bucket_path=self.bucket.path)
-        upload = ResumableUpload(upload_url, self.chunk_size, headers=headers)
+        upload = ResumableUpload(upload_url, chunk_size, headers=headers)
         upload.initiate(
             transport, stream, object_metadata, content_type,
             total_bytes=size, stream_final=False)
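
A short usage sketch of the new parameter (the `blob`, `client`, and
stream objects are hypothetical; the signature is the one from the diff):

    import io

    stream = io.BytesIO(b'data to upload')
    # Initiate with an explicit 1 MB chunk size rather than blob.chunk_size.
    upload, transport = blob._initiate_resumable_upload(
        client, stream, u'text/plain', None,
        chunk_size=1024 * 1024)
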
@@ -996,24 +999,19 @@ def create_resumable_upload_session(
             # determines the origins allowed for CORS.
             extra_headers['Origin'] = origin

-        curr_chunk_size = self.chunk_size
         try:
-            # Temporarily patch the chunk size. A user should still be able
-            # to initiate an upload session without setting the chunk size.
-            # The chunk size only matters when **sending** bytes to an upload.
-            self.chunk_size = self._CHUNK_SIZE_MULTIPLE
-
             dummy_stream = BytesIO(b'')
+            # Send a fake chunk size which we **know** will be acceptable
+            # to the `ResumableUpload` constructor. The chunk size only
+            # matters when **sending** bytes to an upload.
             upload, _ = self._initiate_resumable_upload(
                 client, dummy_stream, content_type, size,
-                extra_headers=extra_headers)
+                extra_headers=extra_headers,
+                chunk_size=self._CHUNK_SIZE_MULTIPLE)

             return upload.resumable_url
         except resumable_media.InvalidResponse as exc:
             _raise_from_invalid_response(exc)
-        finally:
-            # Put back the original chunk size.
-            self.chunk_size = curr_chunk_size

     def get_iam_policy(self, client=None):
         """Retrieve the IAM policy for the object.
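
The net effect is that starting a session no longer mutates the blob. A
hedged sketch (the bucket and blob names are hypothetical):

    blob = bucket.blob('my-file')
    assert blob.chunk_size is None          # never configured by the user
    url = blob.create_resumable_upload_session(content_type='text/plain')
    assert blob.chunk_size is None          # still untouched afterwards
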
31 changes: 19 additions & 12 deletions storage/tests/unit/test_blob.py
@@ -734,15 +734,12 @@ def test__get_upload_arguments(self):
         name = u'blob-name'
         key = b'[pXw@,p@@AfBfrR3x-2b2SCHR,.?YwRO'
         blob = self._make_one(name, bucket=None, encryption_key=key)
-        blob._make_transport = mock.Mock(spec=[])
         blob.content_disposition = 'inline'

-        client = mock.sentinel.mock
         content_type = u'image/jpeg'
-        info = blob._get_upload_arguments(client, content_type)
+        info = blob._get_upload_arguments(content_type)

-        transport, headers, object_metadata, new_content_type = info
-        self.assertIs(transport, blob._make_transport.return_value)
+        headers, object_metadata, new_content_type = info
         header_key_value = 'W3BYd0AscEBAQWZCZnJSM3gtMmIyU0NIUiwuP1l3Uk8='
         header_key_hash_value = 'G0++dxF4q5rG4o9kE8gvEKn15RH6wLm0wXV1MgAlXOg='
         expected_headers = {
@@ -758,8 +755,6 @@ def test__get_upload_arguments(self):
         self.assertEqual(object_metadata, expected_metadata)
         self.assertEqual(new_content_type, content_type)

-        blob._make_transport.assert_called_once_with(client)
-
     def _mock_transport(self, status_code, headers, content=b''):
         fake_transport = mock.Mock(spec=['request'])
         fake_response = self._mock_requests_response(
@@ -838,7 +833,8 @@ def test__do_multipart_upload_bad_size(self):
             'was specified but the file-like object only had', exc_contents)
         self.assertEqual(stream.tell(), len(data))

-    def _initiate_resumable_helper(self, size=None, extra_headers=None):
+    def _initiate_resumable_helper(self, size=None, extra_headers=None,
+                                   chunk_size=None):
         from google.resumable_media.requests import ResumableUpload

         bucket = mock.Mock(path='/b/whammy', spec=[u'path'])
@@ -866,7 +862,8 @@ def _initiate_resumable_helper(self, size=None, extra_headers=None):
         stream = io.BytesIO(data)
         content_type = u'text/plain'
         upload, transport = blob._initiate_resumable_upload(
-            client, stream, content_type, size, extra_headers=extra_headers)
+            client, stream, content_type, size,
+            extra_headers=extra_headers, chunk_size=chunk_size)

         # Check the returned values.
         self.assertIsInstance(upload, ResumableUpload)
@@ -881,7 +878,11 @@ def _initiate_resumable_helper(self, size=None, extra_headers=None):
             self.assertEqual(upload._headers, extra_headers)
             self.assertIsNot(upload._headers, extra_headers)
         self.assertFalse(upload.finished)
-        self.assertEqual(upload._chunk_size, blob.chunk_size)
+        if chunk_size is None:
+            self.assertEqual(upload._chunk_size, blob.chunk_size)
+        else:
+            self.assertNotEqual(blob.chunk_size, chunk_size)
+            self.assertEqual(upload._chunk_size, chunk_size)
         self.assertIs(upload._stream, stream)
         if size is None:
             self.assertIsNone(upload._total_bytes)
@@ -914,6 +915,10 @@ def test__initiate_resumable_upload_no_size(self):
     def test__initiate_resumable_upload_with_size(self):
         self._initiate_resumable_helper(size=10000)

+    def test__initiate_resumable_upload_with_chunk_size(self):
+        one_mb = 1048576
+        self._initiate_resumable_helper(chunk_size=one_mb)
+
     def test__initiate_resumable_upload_with_extra_headers(self):
         extra_headers = {'origin': 'http://not-in-kansas-anymore.invalid'}
         self._initiate_resumable_helper(extra_headers=extra_headers)
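
The 1 MB test value is not arbitrary: the `ResumableUpload` constructor
only accepts chunk sizes that are a multiple of 256 KiB (the value the
blob exposes as `_CHUNK_SIZE_MULTIPLE`). A quick sanity check, assuming
the 256 * 1024 constant from blob.py:

    _CHUNK_SIZE_MULTIPLE = 256 * 1024   # 262144 bytes, as in blob.py
    one_mb = 1048576
    assert one_mb % _CHUNK_SIZE_MULTIPLE == 0   # 1 MB = 4 * 256 KiB
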
@@ -1222,7 +1227,8 @@ def _create_resumable_upload_session_helper(self, origin=None,
                                                 side_effect=None):
         bucket = mock.Mock(path='/b/alex-trebek', spec=[u'path'])
         blob = self._make_one('blob-name', bucket=bucket)
-        blob.chunk_size = 99 * blob._CHUNK_SIZE_MULTIPLE
+        chunk_size = 99 * blob._CHUNK_SIZE_MULTIPLE
+        blob.chunk_size = chunk_size

         # Create mocks to be checked for doing transport.
         resumable_url = 'http://test.invalid?upload_id=clean-up-everybody'
@@ -1241,8 +1247,9 @@ def _create_resumable_upload_session_helper(self, origin=None,
             content_type=content_type, size=size,
             origin=origin, client=client)

-        # Check the returned value.
+        # Check the returned value and (lack of) side-effect.
         self.assertEqual(new_url, resumable_url)
+        self.assertEqual(blob.chunk_size, chunk_size)

         # Check the mocks.
         blob._make_transport.assert_called_once_with(client)