Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
chore: avoid stream.read() stuck by reading and exposing it as a Byte…
…sIO.
  • Loading branch information
gcavalcante8808 committed Apr 4, 2022
commit 6381e5c17627832c6297d801bde7b865a2022365
19 changes: 13 additions & 6 deletions tc_aws/aws/bucket.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# Copyright (c) 2015, thumbor-community
# Use of this source code is governed by the MIT license that can be
# found in the LICENSE file.
import io

import aiobotocore.session
from botocore.client import Config
Expand All @@ -26,6 +27,7 @@ def __new__(cls, bucket, region, endpoint, *args, **kwargs):
"""
This handles all communication with AWS API
"""

def __init__(self, bucket, region, endpoint, max_retry=None):
"""
Constructor
Expand All @@ -46,6 +48,7 @@ def __init__(self, bucket, region, endpoint, max_retry=None):

self.region_name = region
self.endpoint_url = endpoint
self.session = aiobotocore.session.get_session()

async def exists(self, path):
"""
Expand All @@ -68,12 +71,16 @@ async def get(self, path):
Returns object at given path
:param string path: Path or 'key' to retrieve AWS object
"""
async with aiobotocore.session.get_session().create_client('s3', region_name=self.region_name,
endpoint_url=self.endpoint_url) as s3_client:
return await s3_client.get_object(
Bucket=self._bucket,
Key=self._clean_key(path),
)
async with self.session.create_client('s3', region_name=self.region_name,
endpoint_url=self.endpoint_url) as s3_client:
response = await s3_client.get_object(Bucket=self._bucket, Key=self._clean_key(path))
# TODO: Verify if it is possible to restore the original behavior were response['Body'] was a coroutine.
# Thumbor was getting stuck when response['Body'].read() was being called by s3_loader.load.
# To Avoid this, we read the Body content and expose it as a BytesIO to maintain the interface.
content = await response['Body'].read()
response['Body'] = io.BytesIO(content)

return response

async def get_url(self, path, method='GET', expiry=3600):
"""
Expand Down
5 changes: 3 additions & 2 deletions tc_aws/loaders/s3_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,9 @@ async def load(context, url):
return result

result.successful = True
async with file_key['Body'] as stream:
result.buffer = await stream.read()
with file_key['Body'] as stream:
result.buffer = stream.read()
del stream

result.metadata.update(
size=file_key['ContentLength'],
Expand Down
6 changes: 4 additions & 2 deletions tc_aws/result_storages/s3_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,10 @@ async def get(self, path = None):
return None

result = ResultStorageResult()
async with key['Body'] as stream:
result.buffer = await stream.read()
with key['Body'] as stream:
result.buffer = stream.read()
del stream

result.successful = True

result.metadata = {
Expand Down
13 changes: 7 additions & 6 deletions tc_aws/storages/s3_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,9 @@ async def get_crypto(self, path):
logger.warn("[STORAGE] s3 key not found at %s" % crypto_path)
return None

async with file_key['Body'] as stream:
file_key = await stream.read()
with file_key['Body'] as stream:
file_key = stream.read()
del stream

return file_key.decode('utf-8')

Expand All @@ -130,8 +131,8 @@ async def get_detector_data(self, path):
if not file_key or self.is_expired(file_key) or 'Body' not in file_key:
return None

async with file_key['Body'] as stream:
return loads(await stream.read())
with file_key['Body'] as stream:
return loads(stream.read())

async def get(self, path):
"""
Expand All @@ -144,8 +145,8 @@ async def get(self, path):
except BotoCoreError:
return None

async with file['Body'] as stream:
return await stream.read()
with file['Body'] as stream:
return stream.read()

async def exists(self, path):
"""
Expand Down