From f17e3418a21c6be996c683764e79ac4ce8b3716d Mon Sep 17 00:00:00 2001
From: Jared Lunde
Date: Tue, 21 Jun 2022 20:30:48 -0600
Subject: [PATCH 1/2] fix(datasets): increase create version request timeout

---
 .gitignore                    |   1 +
 gradient/commands/datasets.py | 283 +++++++++++++++++-----------------
 2 files changed, 142 insertions(+), 142 deletions(-)

diff --git a/.gitignore b/.gitignore
index 6232bbe0..d9c08895 100644
--- a/.gitignore
+++ b/.gitignore
@@ -110,3 +110,4 @@ paperspace-python.zip
 bin/
 lib64
 share/
+/tmp
\ No newline at end of file

diff --git a/gradient/commands/datasets.py b/gradient/commands/datasets.py
index a296c6ff..ce018e3a 100644
--- a/gradient/commands/datasets.py
+++ b/gradient/commands/datasets.py
@@ -561,141 +561,139 @@ def update_status():
 class PutDatasetFilesCommand(BaseDatasetFilesCommand):
     # @classmethod
-    def _put(self, path, url, content_type, dataset_version_id=None, key=None):
+    def _put(self, session, path, url, content_type, dataset_version_id=None, key=None):
         size = os.path.getsize(path)
-        with requests.Session() as session:
-            headers = {'Content-Type': content_type}
+        headers = {'Content-Type': content_type}
 
-            try:
-                if size <= 0:
-                    headers.update({'Content-Size': '0'})
-                    r = session.put(url, data='', headers=headers, timeout=5)
-                # for files under half a GB
-                elif size <= (10e8) / 2:
-                    with open(path, 'rb') as f:
-                        r = session.put(
-                            url, data=f, headers=headers, timeout=5)
-                # # for chonky files, use a multipart upload
-                else:
-                    # Chunks need to be at least 5MB or AWS throws an
-                    # EntityTooSmall error; we'll arbitrarily choose a
-                    # 15MB chunksize
-                    #
-                    # Note also that AWS limits the max number of chunkc
-                    # in a multipart upload to 10000, so this setting
-                    # currently enforces a hard limit on 150GB per file.
-                    #
-                    # We can dynamically assign a larger part size if needed,
-                    # but for the majority of use cases we should be fine
-                    # as-is
-                    part_minsize = int(15e6)
-                    dataset_id, _, version = dataset_version_id.partition(":")
-                    mpu_url = f'/datasets/{dataset_id}/versions/{version}/s3/preSignedUrls'
-
-                    api_client = http_client.API(
-                        api_url=config.CONFIG_HOST,
-                        api_key=self.api_key,
-                        ps_client_name=CLI_PS_CLIENT_NAME
-                    )
-
-                    mpu_create_res = api_client.post(
-                        url=mpu_url,
-                        json={
-                            'datasetId': dataset_id,
-                            'version': version,
-                            'calls': [{
-                                'method': 'createMultipartUpload',
-                                'params': {'Key': key}
-                            }]
-                        }
-                    )
-                    mpu_data = json.loads(mpu_create_res.text)[0]['url']
-
-                    parts = []
-                    with open(path, 'rb') as f:
-                        # we +2 the number of parts since we're doing floor
-                        # division, which will cut off any trailing part
-                        # less than the part_minsize, AND we want to 1-index
-                        # our range to match what AWS expects for part
-                        # numbers
-                        for part in range(1, (size // part_minsize) + 2):
-                            presigned_url_res = api_client.post(
-                                url=mpu_url,
-                                json={
-                                    'datasetId': dataset_id,
-                                    'version': version,
-                                    'calls': [{
-                                        'method': 'uploadPart',
-                                        'params': {
-                                            'Key': key,
-                                            'UploadId': mpu_data['UploadId'],
-                                            'PartNumber': part
-                                        }
-                                    }]
-                                }
-                            )
-
-                            presigned_url = json.loads(
-                                presigned_url_res.text
-                            )[0]['url']
-
-                            chunk = f.read(part_minsize)
-                            for attempt in range(0, 5):
-                                part_res = session.put(
-                                    presigned_url,
-                                    data=chunk,
-                                    timeout=5)
-                                if part_res.status_code == 200:
-                                    break
-
-                            if part_res.status_code != 200:
-                                # Why do we silence exceptions that get
-                                # explicitly raised? Mystery for the ages, but
-                                # there you have it I guess...
-                                print(f'\nUnable to complete upload of {path}')
-                                raise ApplicationError(
-                                    f'Unable to complete upload of {path}')
-                            etag = part_res.headers['ETag'].replace('"', '')
-                            parts.append({'ETag': etag, 'PartNumber': part})
-                            # This is a pretty jank way to get about multipart
-                            # upload status updates, but we structure the Halo
-                            # spinner to report on the number of completed
-                            # tasks dispatched to the workers in the pool.
-                            # Since it's more of a PITA to properly distribute
-                            # this MPU among all workers than I really want to
-                            # deal with, that means we can't easily plug into
-                            # Halo for these updates. But we can print to
-                            # console! Which again, jank and noisy, but arguably
-                            # better than a task sitting forever, never either
-                            # completing or emitting an error message.
-                            if len(parts) % 7 == 0:  # About every 100MB
-                                print(
-                                    f'\nUploaded {len(parts) * part_minsize / 10e5}MB '
-                                    f'of {int(size / 10e5)}MB for '
-                                    f'{path}'
-                                )
-
-                    r = api_client.post(
-                        url=mpu_url,
-                        json={
-                            'datasetId': dataset_id,
-                            'version': version,
-                            'calls': [{
-                                'method': 'completeMultipartUpload',
-                                'params': {
-                                    'Key': key,
-                                    'UploadId': mpu_data['UploadId'],
-                                    'MultipartUpload': {'Parts': parts}
-                                }
-                            }]
-                        }
-                    )
-
-                self.validate_s3_response(r)
-            except requests.exceptions.ConnectionError as e:
-                return self.report_connection_error(e)
-            except Exception as e:
-                return e
+        try:
+            if size <= 0:
+                headers.update({'Content-Size': '0'})
+                r = session.put(url, data='', headers=headers, timeout=5)
+            # for files under 15MB
+            elif size <= (15e6):
+                with open(path, 'rb') as f:
+                    r = session.put(
+                        url, data=f, headers=headers, timeout=300)
+            # # for chonky files, use a multipart upload
+            else:
+                # Chunks need to be at least 5MB or AWS throws an
+                # EntityTooSmall error; we'll arbitrarily choose a
+                # 15MB chunksize
+                #
+                # Note also that AWS limits the max number of chunks
+                # in a multipart upload to 10000, so this setting
+                # currently enforces a hard limit on 150GB per file.
+                #
+                # We can dynamically assign a larger part size if needed,
+                # but for the majority of use cases we should be fine
+                # as-is
+                part_minsize = int(15e6)
+                dataset_id, _, version = dataset_version_id.partition(":")
+                mpu_url = f'/datasets/{dataset_id}/versions/{version}/s3/preSignedUrls'
+                api_client = http_client.API(
+                    api_url=config.CONFIG_HOST,
+                    api_key=self.api_key,
+                    ps_client_name=CLI_PS_CLIENT_NAME
+                )
+
+                mpu_create_res = api_client.post(
+                    url=mpu_url,
+                    json={
+                        'datasetId': dataset_id,
+                        'version': version,
+                        'calls': [{
+                            'method': 'createMultipartUpload',
+                            'params': {'Key': key}
+                        }]
+                    }
+                )
+
+                mpu_data = mpu_create_res.json()[0]['url']
+
+                parts = []
+                with open(path, 'rb') as f:
+                    # we +2 the number of parts since we're doing floor
+                    # division, which will cut off any trailing part
+                    # less than the part_minsize, AND we want to 1-index
+                    # our range to match what AWS expects for part
+                    # numbers
+                    for part in range(1, (size // part_minsize) + 2):
+                        presigned_url_res = api_client.post(
+                            url=mpu_url,
+                            json={
+                                'datasetId': dataset_id,
+                                'version': version,
+                                'calls': [{
+                                    'method': 'uploadPart',
+                                    'params': {
+                                        'Key': key,
+                                        'UploadId': mpu_data['UploadId'],
+                                        'PartNumber': part
+                                    }
+                                }]
+                            }
+                        )
+
+                        presigned_url = presigned_url_res.json()[0]['url']
+
+                        chunk = f.read(part_minsize)
+
+                        for attempt in range(0, 5):
+                            part_res = session.put(
+                                presigned_url,
+                                data=chunk,
+                                headers=headers,
+                                timeout=300)
+
+                            if part_res.status_code == 200:
+                                break
+
+                        if part_res.status_code != 200:
+                            # Why do we silence exceptions that get
+                            # explicitly raised? Mystery for the ages, but
+                            # there you have it I guess...
+                            print(f'\nUnable to complete upload of {path}')
+                            raise ApplicationError(
+                                f'Unable to complete upload of {path}')
+                        etag = part_res.headers['ETag'].replace('"', '')
+                        parts.append({'ETag': etag, 'PartNumber': part})
+                        # This is a pretty jank way to get about multipart
+                        # upload status updates, but we structure the Halo
+                        # spinner to report on the number of completed
+                        # tasks dispatched to the workers in the pool.
+                        # Since it's more of a PITA to properly distribute
+                        # this MPU among all workers than I really want to
+                        # deal with, that means we can't easily plug into
+                        # Halo for these updates. But we can print to
+                        # console! Which again, jank and noisy, but arguably
+                        # better than a task sitting forever, never either
+                        # completing or emitting an error message.
+                        print(
+                            f'\nUploaded {len(parts) * part_minsize / 10e5}MB '
+                            f'of {int(size / 10e5)}MB for '
+                            f'{path}'
+                        )
+
+                r = api_client.post(
+                    url=mpu_url,
+                    json={
+                        'datasetId': dataset_id,
+                        'version': version,
+                        'calls': [{
+                            'method': 'completeMultipartUpload',
+                            'params': {
+                                'Key': key,
+                                'UploadId': mpu_data['UploadId'],
+                                'MultipartUpload': {'Parts': parts}
+                            }
+                        }]
+                    }
+                )
+
+        except requests.exceptions.ConnectionError as e:
+            return self.report_connection_error(e)
+        except Exception as e:
+            return e
 
     @staticmethod
     def _list_files(source_path):
@@ -718,15 +716,16 @@ def _sign_and_put(self, dataset_version_id, pool, results, update_status):
                 Key=r['key'],
                 ContentType=r['mimetype']))
             for r in results],
         )
-        for pre_signed, result in zip(pre_signeds, results):
-            update_status()
-            pool.put(
-                self._put,
-                url=pre_signed.url,
-                path=result['path'],
-                content_type=result['mimetype'],
-                dataset_version_id=dataset_version_id,
-                key=result['key'])
+        with requests.Session() as session:
+            for pre_signed, result in zip(pre_signeds, results):
+                update_status()
+                pool.put(self._put,
+                         session,
+                         result['path'],
+                         pre_signed.url,
+                         content_type=result['mimetype'],
+                         dataset_version_id=dataset_version_id,
+                         key=result['key'])
 
     def execute(self, dataset_version_id, source_paths, target_path):
         self.assert_supported(dataset_version_id)

From f03c29137f363591b4454284b6d689c5589013dd Mon Sep 17 00:00:00 2001
From: Jared Lunde
Date: Wed, 22 Jun 2022 08:17:00 -0600
Subject: [PATCH 2/2] add constants

---
 gradient/commands/datasets.py | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/gradient/commands/datasets.py b/gradient/commands/datasets.py
index ce018e3a..68d2c2d5 100644
--- a/gradient/commands/datasets.py
+++ b/gradient/commands/datasets.py
@@ -558,6 +558,10 @@ def update_status():
         pool.put(self._get, url=pre_signed.url, path=path)
 
 
+MULTIPART_CHUNK_SIZE = int(15e6)  # 15MB
+PUT_TIMEOUT = 300  # 5 minutes
+
+
 class PutDatasetFilesCommand(BaseDatasetFilesCommand):
     # @classmethod
@@ -570,10 +574,10 @@ def _put(self, session, path, url, content_type, dataset_version_id=None, key=No
                 headers.update({'Content-Size': '0'})
                 r = session.put(url, data='', headers=headers, timeout=5)
             # for files under 15MB
-            elif size <= (15e6):
+            elif size <= (MULTIPART_CHUNK_SIZE):
                 with open(path, 'rb') as f:
                     r = session.put(
-                        url, data=f, headers=headers, timeout=300)
+                        url, data=f, headers=headers, timeout=PUT_TIMEOUT)
             # # for chonky files, use a multipart upload
             else:
                 # Chunks need to be at least 5MB or AWS throws an
@@ -587,7 +591,7 @@ def _put(self, session, path, url, content_type, dataset_version_id=None, key=No
                 # We can dynamically assign a larger part size if needed,
                 # but for the majority of use cases we should be fine
                 # as-is
-                part_minsize = int(15e6)
+                part_minsize = MULTIPART_CHUNK_SIZE
                 dataset_id, _, version = dataset_version_id.partition(":")
                 mpu_url = f'/datasets/{dataset_id}/versions/{version}/s3/preSignedUrls'
                 api_client = http_client.API(
@@ -643,7 +647,7 @@ def _put(self, session, path, url, content_type, dataset_version_id=None, key=No
                                 presigned_url,
                                 data=chunk,
                                 headers=headers,
-                                timeout=300)
+                                timeout=PUT_TIMEOUT)
 
                             if part_res.status_code == 200:
                                 break
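
Note on the part sizing used in these patches (not part of the diffs above; a rough, self-contained Python sketch — S3_MIN_PART_SIZE and S3_MAX_PARTS stand for the AWS limits the inline comments mention, and MULTIPART_CHUNK_SIZE mirrors the constant added in PATCH 2/2):

    # S3 multipart parts must be at least 5MB (except the final part) and an
    # upload can contain at most 10,000 parts, so a fixed 15MB part size
    # caps a single file at roughly 150GB, as the comments note.
    MULTIPART_CHUNK_SIZE = int(15e6)  # 15MB, as in PATCH 2/2
    S3_MIN_PART_SIZE = int(5e6)       # AWS minimum for non-final parts
    S3_MAX_PARTS = 10000              # AWS per-upload part limit

    assert MULTIPART_CHUNK_SIZE >= S3_MIN_PART_SIZE

    # Hard cap implied by the fixed part size: ~150GB per file.
    MAX_FILE_SIZE = MULTIPART_CHUNK_SIZE * S3_MAX_PARTS

    def part_count(size):
        # Mirrors the loop `range(1, (size // part_minsize) + 2)`: floor
        # division plus one trailing part for the remainder, 1-indexed to
        # match the PartNumber values S3 expects.
        return (size // MULTIPART_CHUNK_SIZE) + 1

    print(MAX_FILE_SIZE / 1e9)    # 150.0 (GB)
    print(part_count(int(40e6)))  # 3 parts for a 40MB file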