1 change: 1 addition & 0 deletions .gitignore
@@ -110,3 +110,4 @@ paperspace-python.zip
bin/
lib64
share/
/tmp
287 changes: 145 additions & 142 deletions gradient/commands/datasets.py
@@ -558,144 +558,146 @@ def update_status():
pool.put(self._get, url=pre_signed.url, path=path)


MULTIPART_CHUNK_SIZE = int(15e6) # 15MB
PUT_TIMEOUT = 300 # 5 minutes


class PutDatasetFilesCommand(BaseDatasetFilesCommand):

# @classmethod
def _put(self, path, url, content_type, dataset_version_id=None, key=None):
def _put(self, session, path, url, content_type, dataset_version_id=None, key=None):
size = os.path.getsize(path)
with requests.Session() as session:
headers = {'Content-Type': content_type}
headers = {'Content-Type': content_type}

try:
if size <= 0:
headers.update({'Content-Size': '0'})
r = session.put(url, data='', headers=headers, timeout=5)
# for files under half a GB
elif size <= (10e8) / 2:
with open(path, 'rb') as f:
r = session.put(
url, data=f, headers=headers, timeout=5)
# # for chonky files, use a multipart upload
else:
# Chunks need to be at least 5MB or AWS throws an
# EntityTooSmall error; we'll arbitrarily choose a
# 15MB chunksize
#
# Note also that AWS limits the max number of chunkc
# in a multipart upload to 10000, so this setting
# currently enforces a hard limit on 150GB per file.
#
# We can dynamically assign a larger part size if needed,
# but for the majority of use cases we should be fine
# as-is
part_minsize = int(15e6)
dataset_id, _, version = dataset_version_id.partition(":")
mpu_url = f'/datasets/{dataset_id}/versions/{version}/s3/preSignedUrls'

api_client = http_client.API(
api_url=config.CONFIG_HOST,
api_key=self.api_key,
ps_client_name=CLI_PS_CLIENT_NAME
)

mpu_create_res = api_client.post(
url=mpu_url,
json={
'datasetId': dataset_id,
'version': version,
'calls': [{
'method': 'createMultipartUpload',
'params': {'Key': key}
}]
}
)
mpu_data = json.loads(mpu_create_res.text)[0]['url']

parts = []
with open(path, 'rb') as f:
# we +2 the number of parts since we're doing floor
# division, which will cut off any trailing part
# less than the part_minsize, AND we want to 1-index
# our range to match what AWS expects for part
# numbers
for part in range(1, (size // part_minsize) + 2):
presigned_url_res = api_client.post(
url=mpu_url,
json={
'datasetId': dataset_id,
'version': version,
'calls': [{
'method': 'uploadPart',
'params': {
'Key': key,
'UploadId': mpu_data['UploadId'],
'PartNumber': part
}
}]
}
)

presigned_url = json.loads(
presigned_url_res.text
)[0]['url']

chunk = f.read(part_minsize)
for attempt in range(0, 5):
part_res = session.put(
presigned_url,
data=chunk,
timeout=5)
if part_res.status_code == 200:
break

if part_res.status_code != 200:
# Why do we silence exceptions that get
# explicitly raised? Mystery for the ages, but
# there you have it I guess...
print(f'\nUnable to complete upload of {path}')
raise ApplicationError(
f'Unable to complete upload of {path}')
etag = part_res.headers['ETag'].replace('"', '')
parts.append({'ETag': etag, 'PartNumber': part})
# This is a pretty jank way to get about multipart
# upload status updates, but we structure the Halo
# spinner to report on the number of completed
# tasks dispatched to the workers in the pool.
# Since it's more of a PITA to properly distribute
# this MPU among all workers than I really want to
# deal with, that means we can't easily plug into
# Halo for these updates. But we can print to
# console! Which again, jank and noisy, but arguably
# better than a task sitting forever, never either
# completing or emitting an error message.
if len(parts) % 7 == 0: # About every 100MB
print(
f'\nUploaded {len(parts) * part_minsize / 10e5}MB '
f'of {int(size / 10e5)}MB for '
f'{path}'
)

r = api_client.post(
url=mpu_url,
json={
'datasetId': dataset_id,
'version': version,
'calls': [{
'method': 'completeMultipartUpload',
'params': {
'Key': key,
'UploadId': mpu_data['UploadId'],
'MultipartUpload': {'Parts': parts}
}
}]
}
)

self.validate_s3_response(r)
except requests.exceptions.ConnectionError as e:
return self.report_connection_error(e)
except Exception as e:
return e
try:
if size <= 0:
headers.update({'Content-Size': '0'})
r = session.put(url, data='', headers=headers, timeout=5)
# for files of 15MB or less
elif size <= MULTIPART_CHUNK_SIZE:
with open(path, 'rb') as f:
r = session.put(
url, data=f, headers=headers, timeout=PUT_TIMEOUT)
# for chonky files, use a multipart upload
else:
# Chunks need to be at least 5MB or AWS throws an
# EntityTooSmall error; we'll arbitrarily choose a
# 15MB chunksize
#
# Note also that AWS limits the max number of chunks
# in a multipart upload to 10000, so this setting
# currently enforces a hard limit on 150GB per file.
#
# We can dynamically assign a larger part size if needed,
# but for the majority of use cases we should be fine
# as-is
part_minsize = MULTIPART_CHUNK_SIZE
dataset_id, _, version = dataset_version_id.partition(":")
mpu_url = f'/datasets/{dataset_id}/versions/{version}/s3/preSignedUrls'
api_client = http_client.API(
api_url=config.CONFIG_HOST,
api_key=self.api_key,
ps_client_name=CLI_PS_CLIENT_NAME
)

mpu_create_res = api_client.post(
url=mpu_url,
json={
'datasetId': dataset_id,
'version': version,
'calls': [{
'method': 'createMultipartUpload',
'params': {'Key': key}
}]
}
)

mpu_data = mpu_create_res.json()[0]['url']

parts = []
with open(path, 'rb') as f:
# we +2 the number of parts since we're doing floor
# division, which will cut off any trailing part
# less than the part_minsize, AND we want to 1-index
# our range to match what AWS expects for part
# numbers
Comment on lines +619 to +623

@ghost Aug 13, 2022

Is this true? Shouldn't you use ceil? I have a gradient version create and gradient files put run that crashes when a file divides evenly into the 15 MB chunk size (75 MB). I suspect it's trying to read an extra chunk that doesn't exist; the progress output says 90 MB / 75 MB when it crashes. Does that make sense? I could be misreading things.

Also, I think you mean to say that you add 1 to the index because part numbers start counting from 1 and you need to correct for range's exclusive upper bound. Not entirely sure. Could you check this out?

@ghost Aug 13, 2022

Looks like this was a known bug in a previous PR but intentionally left in. 🥇 The effect of reading past the end of the file was not predicted though.

#384 (comment)
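
To make the off-by-one concrete, here is a minimal sketch of the loop-bound arithmetic using the 75 MB case reported above; the numbers are illustrative only.

import math

size = 75_000_000          # a file that divides evenly into 15 MB parts
part_minsize = 15_000_000

# Current bound: floor division plus 2 (to 1-index the range). When the
# size divides evenly, this produces one extra part whose read() yields
# nothing.
buggy_parts = list(range(1, (size // part_minsize) + 2))            # [1, 2, 3, 4, 5, 6]

# ceil gives the exact part count; the +1 only corrects for range's
# exclusive upper bound, keeping AWS's 1-based part numbers.
fixed_parts = list(range(1, math.ceil(size / part_minsize) + 1))    # [1, 2, 3, 4, 5]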

for part in range(1, (size // part_minsize) + 2):
presigned_url_res = api_client.post(
url=mpu_url,
json={
'datasetId': dataset_id,
'version': version,
'calls': [{
'method': 'uploadPart',
'params': {
'Key': key,
'UploadId': mpu_data['UploadId'],
'PartNumber': part
}
}]
}
)

presigned_url = presigned_url_res.json()[0]['url']

chunk = f.read(part_minsize)

for attempt in range(0, 5):
part_res = session.put(
presigned_url,
data=chunk,
headers=headers,
Contributor Author: Add headers

timeout=PUT_TIMEOUT)

if part_res.status_code == 200:
break

if part_res.status_code != 200:
# Why do we silence exceptions that get
# explicitly raised? Mystery for the ages, but
# there you have it I guess...
print(f'\nUnable to complete upload of {path}')
raise ApplicationError(
f'Unable to complete upload of {path}')
etag = part_res.headers['ETag'].replace('"', '')
parts.append({'ETag': etag, 'PartNumber': part})
# This is a pretty jank way to go about multipart
# upload status updates, but we structure the Halo
# spinner to report on the number of completed
# tasks dispatched to the workers in the pool.
# Since it's more of a PITA to properly distribute
# this MPU among all workers than I really want to
# deal with, that means we can't easily plug into
# Halo for these updates. But we can print to
# console! Which again, jank and noisy, but arguably
# better than a task sitting forever, never either
# completing or emitting an error message.
print(
Contributor Author: Report every chunk

This isn't too noisy now that we removed the branch around it?

Contributor Author: No, I don't think the previous one was noisy enough, honestly. It feels like nothing is happening. In general, we just need a better progress bar.

f'\nUploaded {len(parts) * part_minsize / 10e5}MB '
f'of {int(size / 10e5)}MB for '
f'{path}'
)

r = api_client.post(
url=mpu_url,
json={
'datasetId': dataset_id,
'version': version,
'calls': [{
'method': 'completeMultipartUpload',
'params': {
'Key': key,
'UploadId': mpu_data['UploadId'],
'MultipartUpload': {'Parts': parts}
}
}]
}
)

except requests.exceptions.ConnectionError as e:
return self.report_connection_error(e)
except Exception as e:
return e
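
For reference, a condensed sketch of the happy path this hunk implements: create the multipart upload, PUT each part to a pre-signed URL, then complete with the collected ETags. post_presign is a hypothetical stand-in for the api_client.post calls against the preSignedUrls endpoint above, and the part count uses math.ceil per the review thread; this is an illustration of the flow, not the shipped code.

import math
import os
import requests

def multipart_upload_sketch(session, path, key, part_size, post_presign):
    # post_presign(method, params) is a hypothetical helper wrapping
    # api_client.post(...); it returns the parsed 'url' payload of the
    # single pre-signed call (a dict for createMultipartUpload, a URL
    # string for uploadPart).
    size = os.path.getsize(path)
    mpu = post_presign('createMultipartUpload', {'Key': key})
    parts = []
    with open(path, 'rb') as f:
        # ceil avoids a trailing empty part; AWS part numbers are 1-based.
        for part_number in range(1, math.ceil(size / part_size) + 1):
            url = post_presign('uploadPart', {
                'Key': key,
                'UploadId': mpu['UploadId'],
                'PartNumber': part_number,
            })
            res = session.put(url, data=f.read(part_size), timeout=300)
            res.raise_for_status()  # surface failures instead of returning them
            parts.append({'ETag': res.headers['ETag'].strip('"'),
                          'PartNumber': part_number})
    post_presign('completeMultipartUpload', {
        'Key': key,
        'UploadId': mpu['UploadId'],
        'MultipartUpload': {'Parts': parts},
    })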

@staticmethod
def _list_files(source_path):
@@ -718,15 +720,16 @@ def _sign_and_put(self, dataset_version_id, pool, results, update_status):
Key=r['key'], ContentType=r['mimetype'])) for r in results],
)

for pre_signed, result in zip(pre_signeds, results):
update_status()
pool.put(
self._put,
url=pre_signed.url,
path=result['path'],
content_type=result['mimetype'],
dataset_version_id=dataset_version_id,
key=result['key'])
with requests.Session() as session:
@jared-paperspace (Contributor, Author) Jun 22, 2022: Use connection pooling from urllib3. Previously we weren't utilizing this feature, only the context manager.
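
To illustrate that point: a single Session shares urllib3's connection pool, so repeated PUTs to the same host reuse TCP/TLS connections instead of reconnecting per file. This is only a sketch: upload_one and work_items are hypothetical placeholders, and requests does not formally guarantee that a Session is thread-safe, so sharing one across pool workers is a pragmatic trade-off rather than a documented contract.

import requests
from concurrent.futures import ThreadPoolExecutor

def upload_one(session, item):
    # Reuses the shared session's pooled connection for each PUT.
    with open(item['path'], 'rb') as f:
        return session.put(item['url'], data=f, timeout=300)

work_items = []  # hypothetical (path, pre-signed URL) records

# One session for the whole batch; workers borrow its connection pool.
with requests.Session() as session:
    with ThreadPoolExecutor(max_workers=4) as executor:
        responses = list(executor.map(
            lambda item: upload_one(session, item), work_items))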

for pre_signed, result in zip(pre_signeds, results):
update_status()
pool.put(self._put,
session,
result['path'],
pre_signed.url,
content_type=result['mimetype'],
dataset_version_id=dataset_version_id,
key=result['key'])

def execute(self, dataset_version_id, source_paths, target_path):
self.assert_supported(dataset_version_id)