-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy patharch-mgr.py
More file actions
executable file
·214 lines (165 loc) · 7.99 KB
/
arch-mgr.py
File metadata and controls
executable file
·214 lines (165 loc) · 7.99 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
#!/usr/bin/env python3
import hashlib
from pathlib import Path
import boto3
import fire
import tomli
import cloud_watch_logger
# Module-wide logger used by the archive functions below. It stays None until
# the __main__ block binds it to the active CloudWatchLogger instance.
logger = None
def transition_all_objects_to_archive(ingest_bucket = None,
                                      archive_bucket = None,
                                      archive_storage_class = None,
                                      remove_from_ingest_bucket = None):
    """Transition every eligible object in the ingest bucket to the archive.

    The ingest bucket is listed with a single ``list_objects_v2`` call. Each
    listed object whose user metadata contains an ``md5sum`` entry is handed
    to ``transition_object_to_archive``; objects without one are logged and
    skipped.

    Args:
        ingest_bucket: Bucket to read from. Falls back to
            ``configuration['ingest_bucket']`` when not given.
        archive_bucket: Passed through to ``transition_object_to_archive``.
        archive_storage_class: Passed through to
            ``transition_object_to_archive``.
        remove_from_ingest_bucket: Passed through to
            ``transition_object_to_archive``.

    Raises:
        ValueError: If no ingest bucket is given or configured.
        RuntimeError: If the listing is truncated, i.e. the bucket holds more
            objects than one listing call returns. Nothing is processed in
            that case, rather than archiving only part of the bucket.
    """
    ingest_bucket = ingest_bucket or configuration['ingest_bucket']
    if not ingest_bucket:
        raise ValueError('ingest_bucket not specified')

    logger.log("Entering transition_all_objects_to_archive")
    logger.log("Listing ingest bucket...")
    response = s3.list_objects_v2(Bucket=ingest_bucket)
    if response['IsTruncated']:
        # Raising a plain string is itself a TypeError in Python 3, which hid
        # the real message; raise a proper exception instead.
        raise RuntimeError("Too many objects in ingest bucket")

    # 'Contents' is absent from the response when the bucket is empty.
    keys = [obj['Key'] for obj in response.get('Contents', [])]
    logger.log(str(len(keys)) + " keys retrieved from ingest bucket")

    for key in keys:
        logger.log("Processing key: " + key)
        metadata = s3.head_object(Bucket=ingest_bucket, Key=key)['Metadata']
        if "md5sum" not in metadata:
            logger.log("Skipping key " + key + " because expected md5sum is not found")
            continue
        transition_object_to_archive(key=key,
                                     expected_md5_sum=metadata["md5sum"],
                                     ingest_bucket=ingest_bucket,
                                     archive_bucket=archive_bucket,
                                     archive_storage_class=archive_storage_class,
                                     remove_from_ingest_bucket=remove_from_ingest_bucket)

    logger.log("Exiting transition_all_objects_to_archive")
def transition_object_to_archive(key,
                                 expected_md5_sum = None,
                                 ingest_bucket = None,
                                 archive_bucket = None,
                                 archive_storage_class = None,
                                 remove_from_ingest_bucket = None):
    """Verify one ingest object against its MD5 sum and copy it to the archive.

    The object's MD5 digest is computed with ``compute_object_md5_sum`` and
    compared with the expected value. On a match the object is copied to the
    archive bucket under the same key, with its metadata replaced by a single
    ``md5sum`` entry and the requested storage class. Optionally the object is
    then deleted from the ingest bucket.

    Args:
        key: Key of the object in the ingest bucket (also used as the key in
            the archive bucket).
        expected_md5_sum: Expected hex MD5 digest. When not given it is read
            from the ``md5sum`` entry of the ingest object's metadata.
        ingest_bucket: Source bucket. Falls back to
            ``configuration['ingest_bucket']``.
        archive_bucket: Destination bucket. Falls back to
            ``configuration['archive_bucket']``.
        archive_storage_class: Storage class for the archived copy. Falls back
            to ``configuration['archive_storage_class']``.
        remove_from_ingest_bucket: Whether to delete the ingest object after
            the copy. ``None`` means "use
            ``configuration['remove_from_ingest_bucket']``"; an explicit
            ``False`` is honoured even when the configuration says ``True``.

    Raises:
        ValueError: If a required setting is missing, if no expected MD5 sum
            can be determined, or if the computed digest does not match it.
    """

    class CopyCallbackManager():
        """Accumulates copy progress and records it roughly once per GiB."""

        def __init__(self):
            self.byte_counter = 0
            self.last_logged_byte_count = 0

        def increment_byte_counter(self, byte_count):
            self.byte_counter += byte_count
            if (self.byte_counter - self.last_logged_byte_count) >= 1024 * 1024 * 1024:
                self.flush()

        def flush(self):
            if self.byte_counter > self.last_logged_byte_count:
                # It seems that the copy callbacks are asynchronous and re-entrant.
                # This can create problems with the AWS CloudWatch logger and the use of the
                # serially issued sequence tokens.
                # As a workaround, we only accumulate log entries while in a copy operation,
                # and post them all to the logger when we are done; so use add_event() and not
                # log() here.
                logger.add_event(str(self.byte_counter) + " bytes copied so far.")
                self.last_logged_byte_count = self.byte_counter

        def reset(self):
            self.flush()
            self.byte_counter = 0
            self.last_logged_byte_count = 0
            logger.flush()

    ingest_bucket = ingest_bucket or configuration['ingest_bucket']
    archive_bucket = archive_bucket or configuration['archive_bucket']
    archive_storage_class = archive_storage_class or configuration['archive_storage_class']
    # Only fall back to the configuration when the caller gave no value at all.
    # Using `or` here would let a configured True override an explicit False
    # and delete objects the caller asked to keep.
    if remove_from_ingest_bucket is None:
        remove_from_ingest_bucket = configuration['remove_from_ingest_bucket']

    if not key:
        raise ValueError('key not specified')
    if not ingest_bucket:
        raise ValueError('ingest_bucket not specified')
    if not archive_bucket:
        raise ValueError('archive_bucket not specified')
    if not archive_storage_class:
        raise ValueError('archive_storage_class not specified')
    if remove_from_ingest_bucket is None:
        raise ValueError('remove_from_ingest_bucket not specified')

    logger.log("Entering transition_object_to_archive")
    logger.log("KEY: " + key)
    logger.log("INGEST_BUCKET: " + ingest_bucket)
    logger.log("ARCHIVE_BUCKET: " + archive_bucket)
    logger.log("ARCHIVE_STORAGE_CLASS: " + archive_storage_class)
    logger.log("REMOVE_FROM_INGEST_BUCKET: " + str(remove_from_ingest_bucket))

    if not expected_md5_sum:
        logger.log("Checking ingested object metadata for md5sum")
        metadata = s3.head_object(Bucket=ingest_bucket, Key=key)['Metadata']
        if 'md5sum' in metadata:
            expected_md5_sum = metadata['md5sum']

    if expected_md5_sum:
        logger.log("EXPECTED_MD5_SUM: " + expected_md5_sum)
    else:
        raise ValueError('expected_md5_sum not specified (must be specified or read from ingest object metadata)')

    # Nothing is copied or deleted unless the ingest object's content matches.
    hexdigest = compute_object_md5_sum(key = key, bucket = ingest_bucket)
    if hexdigest != expected_md5_sum:
        raise ValueError('object does not have expected md5 checksum')

    logger.log("Starting copy to archive bucket")
    copy_callback_manager = CopyCallbackManager()
    s3.copy({'Bucket': ingest_bucket, 'Key': key},
            archive_bucket, key,
            ExtraArgs = {'Metadata': {'md5sum': hexdigest},
                         'MetadataDirective': 'REPLACE',
                         'StorageClass': archive_storage_class},
            Callback = copy_callback_manager.increment_byte_counter
            )
    # Record whatever progress was made since the last 1 GiB threshold.
    copy_callback_manager.flush()
    logger.log("Ending copy to archive bucket")

    if remove_from_ingest_bucket:
        logger.log("Starting remove from ingest bucket")
        s3.delete_object(Bucket=ingest_bucket, Key=key)
        logger.log("Ending remove from ingest bucket")

    logger.log("Exiting transition_object_to_archive")
def compute_object_md5_sum(key = None,
                           bucket = None):
    """Return the hex MD5 digest of an S3 object's content.

    The object body is read in 16 MiB chunks and fed to an MD5 hash, so the
    whole object is not held in memory at once. Progress is logged every 100
    chunks.

    Args:
        key: Key of the object to hash.
        bucket: Bucket holding the object. Falls back to
            ``configuration['ingest_bucket']`` when not given.

    Returns:
        The MD5 digest as a hexadecimal string.

    Raises:
        ValueError: If no bucket is given or configured, or no key is given.
    """
    logger.log("Entering compute_object_md5_sum")

    if not bucket:
        bucket = configuration['ingest_bucket']
    if not bucket:
        raise ValueError('bucket not specified')
    if not key:
        raise ValueError('key not specified')

    bytes_per_chunk = 16 * 1024 * 1024  # 16 MByte
    logger.log(f"CHUNKSIZE: {bytes_per_chunk / 1024 / 1024} MB")
    logger.log("BUCKET: " + bucket)
    logger.log("KEY: " + key)

    stream = s3.get_object(Bucket=bucket, Key=key)['Body']
    digest = hashlib.md5()

    # Stays 0 when the body yields no chunks at all.
    chunks_seen = 0
    for chunks_seen, piece in enumerate(stream.iter_chunks(chunk_size=bytes_per_chunk), start=1):
        digest.update(piece)
        if chunks_seen % 100 == 0:
            logger.log(f"CHUNK {chunks_seen} PROCESSED")

    md5_hex = digest.hexdigest()
    logger.log(f"CHUNK_COUNT: {chunks_seen}")
    logger.log("MD5: " + md5_hex)
    logger.log("Exiting compute_object_md5_sum")
    return md5_hex
def get_configuration():
    """Build the effective configuration from defaults and config files.

    Starts from built-in defaults and overlays ``arch-mgr.cfg`` from the home
    directory, then ``arch-mgr.cfg`` from the current working directory, so
    the working-directory file wins on conflicts. Files that do not exist are
    ignored.

    Returns:
        A dict of configuration values.
    """
    def augment_config(current_config, new_info_path):
        # A missing file contributes nothing.
        if not new_info_path.is_file():
            return current_config
        with new_info_path.open(mode="rb") as fp:
            overrides = tomli.load(fp)
        # Shallow merge: this will not work if we ever support nested
        # attributes in the config.
        merged = dict(current_config)
        merged.update(overrides)
        return merged

    config = {
        "ingest_bucket": None,
        "archive_bucket": None,
        "archive_storage_class": "STANDARD",
        "remove_from_ingest_bucket": False,
    }
    # Later locations override earlier ones.
    for locate_directory in (Path.home, Path.cwd):
        config = augment_config(config, locate_directory() / "arch-mgr.cfg")
    return config
if __name__ == "__main__":
    # These module-level names are read as globals by the functions above.
    # NOTE(review): fire.Fire() is called without a component, so names bound
    # here are presumably visible to it as well - confirm before renaming any.
    s3 = boto3.client('s3')
    configuration = get_configuration()

    # CloudWatch settings are only used when both are configured; otherwise
    # both are passed to the logger as None.
    if ("cloudwatch_log_group" not in configuration) or ("aws_region" not in configuration):
        aws_region = None
        cloudwatch_log_group = None
    else:
        cloudwatch_log_group = configuration['cloudwatch_log_group']
        aws_region = configuration['aws_region']

    with cloud_watch_logger.CloudWatchLogger(
            log_group_name=cloudwatch_log_group,
            region=aws_region,
            app_name='s3_archive_manager',
            enable_exception_logging = True) as l:
        # Publish the logger for the functions above, then dispatch the
        # command line through python-fire while the logger is active.
        logger = l
        fire.Fire()