-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_cloud_upload.py
More file actions
57 lines (43 loc) · 1.49 KB
/
test_cloud_upload.py
File metadata and controls
57 lines (43 loc) · 1.49 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import boto3
import falcon
import falcon.testing
import moto
import pytest
FORM_MIMETYPE = 'multipart/form-data; boundary=boundary1337'
FORM_FILE_CONTENT = b'PeregrineFalcon\n' * 64 * 1024
FORM = (
b'--boundary1337\r\n'
b'Content-Disposition: form-data; name="myfile"; filename="bytes"\r\n'
b'Content-Type: application/x-falcon\r\n\r\n' +
FORM_FILE_CONTENT +
b'\r\n'
b'--boundary1337--\r\n'
)
class CloudUpload:
def on_post(self, req, resp):
s3 = boto3.client('s3')
for part in req.media:
if part.name == 'myfile':
s3.upload_fileobj(part.stream, 'mybucket', 'mykey')
resp.location = '/files/mybucket/mykey'
resp.status = falcon.HTTP_CREATED
@pytest.fixture
def api_client():
handlers = falcon.media.Handlers({
falcon.MEDIA_JSON: falcon.media.JSONHandler(),
falcon.MEDIA_MULTIPART: falcon.media.MultipartFormHandler(),
})
api = falcon.App()
api.req_options.media_handlers = handlers
api.add_route('/uploads', CloudUpload())
return falcon.testing.TestClient(api)
@moto.mock_s3
def test_upload_to_mock_s3(api_client):
conn = boto3.resource('s3', region_name='eu-north-1')
conn.create_bucket(Bucket='mybucket')
resp = api_client.simulate_post(
'/uploads', headers={'Content-Type': FORM_MIMETYPE}, body=FORM)
assert resp.status_code == 201
s3_obj = conn.Object('mybucket', 'mykey')
Body = s3_obj.get()['Body']
assert Body.read() == FORM_FILE_CONTENT