From 02a80eabfb94658e5c143e82f4656cc9ca46790d Mon Sep 17 00:00:00 2001 From: josiahzimmerman-caci Date: Thu, 24 Oct 2024 08:28:53 -0500 Subject: [PATCH 01/13] isolate TAC for processing and validation --- src/db_extractor_full.py | 41 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/src/db_extractor_full.py b/src/db_extractor_full.py index 988f7e2..888d85d 100644 --- a/src/db_extractor_full.py +++ b/src/db_extractor_full.py @@ -74,6 +74,7 @@ def db_extractor(): # table_dump_ignore = ['django_migrations', 'audit_history', 'archived_access_codes', 'schema_migration', 'audit_history_tableslist', 'awsdms_ddl_audit'] table_dump_ignore = ['zip3_distances', 'transportation_service_provider_performances','move' ,'move_to_gbloc' , 'archived_access_codes', 'schema_migration', 'audit_history_tableslist'] + table_large = ['transportation_accounting_codes'] # For each table for table in tables: @@ -101,6 +102,46 @@ def db_extractor(): if str(table_name) in table_dump_ignore: print("Data Warehouse Lambda - INFO - DB Extract - Didn't extract data for table(ignore list): " + str(table_name)) # If we have neither timestamp field, we do a full dump + elif str(table_name) in table_large: + # Handle these table differently + print("Data Warehouse Lambda - INFO - DB Extract - Performing full dump on " + str(table_name)) + cursor.execute("SELECT * FROM " + str(table_name)) + chunk_size = 800000 + file_increment = 0 + filelist = [] + while True: + results = cursor.fetchmany(chunk_size) + if not results: + break + column_names = [desc[0] for desc in cursor.description] + data_with_col_names = [{column_names[i]: row[i] for i in range(len(column_names))} for row in results] + json_data = json.dumps(data_with_col_names, cls=UUIDEncoder, default=str) + if write_to_s3 == True: + try: + s3 = boto3.client('s3') + s3.put_object(Bucket=os.environ['bucket_name'], Key="db_data" + "/" + str(json_parameter_value['data']['serialNumber'] + 1).zfill(6) + "/" + table_name + "_" + str(file_increment) + ".json", Body=json_data, ServerSideEncryption='AES256') + filelist.append("db_data" + "/" + str(json_parameter_value['data']['serialNumber'] + 1).zfill(6) + "/" + table_name + "_" + str(file_increment) + ".json") + file_increment += 1 + print('Data Warehouse Lambda - INFO - DB Extract - Successfully wrote ' + os.environ['bucket_name'] + "/" + "db_data/" + "/"+str(json_parameter_value['data']['serialNumber'] + 1).zfill(6)+"/" + table_name + "_" + str(file_increment) + ".json") + except Exception as e: + print("Data Warehouse Lambda - ERROR - DB Extract - Error writing to S3" + str(e)) + # #create single file + # combinedfile = [] + # #get contents of multifile due to memory + # fileliststring = ','.join(str(x) for x in filelist) + # print('Data Warehouse Lambda - INFO - DB Extract - fileliststring ' + fileliststring) + # # Read and append data from each file + # for file in filelist: + # print('Data Warehouse Lambda - INFO - DB Extract - file ' + str(file)) + # data = s3.get_object(Bucket=os.environ['bucket_name'], Key=file) + # print('Data Warehouse Lambda - INFO - DB Extract - data ' + str(data)) + # content = json.loads(data['Body'].read().decode("utf-8")) + # combinedfile.append(content) + # #finally write the new file + # s3.put_object(Bucket=os.environ['bucket_name'], Key="db_data" + "/" + str(json_parameter_value['data']['serialNumber'] + 1).zfill(6) + "/" + table_name + ".json", Body=combinedfile, ServerSideEncryption='AES256') + #clean up partial files + #for file in filelist: + #s3.Object(os.environ['bucket_name'], file).delete() elif found_updated_at == False and found_created_at == False: print("Data Warehouse Lambda - INFO - DB Extract - Performing full dump on " + str(table_name)) cursor.execute("SELECT * FROM " + str(table_name)) From 40cfd8748a35abf97507cc5d5e5beee9ef4855d7 Mon Sep 17 00:00:00 2001 From: josiahzimmerman-caci Date: Mon, 28 Oct 2024 10:23:07 -0500 Subject: [PATCH 02/13] convert full to use fetch many --- src/db_extractor_full.py | 47 ++++++++++------------------------------ 1 file changed, 12 insertions(+), 35 deletions(-) diff --git a/src/db_extractor_full.py b/src/db_extractor_full.py index 888d85d..5a92163 100644 --- a/src/db_extractor_full.py +++ b/src/db_extractor_full.py @@ -74,7 +74,6 @@ def db_extractor(): # table_dump_ignore = ['django_migrations', 'audit_history', 'archived_access_codes', 'schema_migration', 'audit_history_tableslist', 'awsdms_ddl_audit'] table_dump_ignore = ['zip3_distances', 'transportation_service_provider_performances','move' ,'move_to_gbloc' , 'archived_access_codes', 'schema_migration', 'audit_history_tableslist'] - table_large = ['transportation_accounting_codes'] # For each table for table in tables: @@ -101,14 +100,12 @@ def db_extractor(): # If we ignore the table, we do nothing if str(table_name) in table_dump_ignore: print("Data Warehouse Lambda - INFO - DB Extract - Didn't extract data for table(ignore list): " + str(table_name)) - # If we have neither timestamp field, we do a full dump - elif str(table_name) in table_large: - # Handle these table differently + # If we have neither timestamp field, we do a full dump + elif found_updated_at == False and found_created_at == False: print("Data Warehouse Lambda - INFO - DB Extract - Performing full dump on " + str(table_name)) cursor.execute("SELECT * FROM " + str(table_name)) chunk_size = 800000 - file_increment = 0 - filelist = [] + file_increment = 1 while True: results = cursor.fetchmany(chunk_size) if not results: @@ -119,36 +116,16 @@ def db_extractor(): if write_to_s3 == True: try: s3 = boto3.client('s3') - s3.put_object(Bucket=os.environ['bucket_name'], Key="db_data" + "/" + str(json_parameter_value['data']['serialNumber'] + 1).zfill(6) + "/" + table_name + "_" + str(file_increment) + ".json", Body=json_data, ServerSideEncryption='AES256') - filelist.append("db_data" + "/" + str(json_parameter_value['data']['serialNumber'] + 1).zfill(6) + "/" + table_name + "_" + str(file_increment) + ".json") - file_increment += 1 - print('Data Warehouse Lambda - INFO - DB Extract - Successfully wrote ' + os.environ['bucket_name'] + "/" + "db_data/" + "/"+str(json_parameter_value['data']['serialNumber'] + 1).zfill(6)+"/" + table_name + "_" + str(file_increment) + ".json") + if file_increment == 1: + s3.put_object(Bucket=os.environ['bucket_name'], Key="db_data" + "/" + str(json_parameter_value['data']['serialNumber'] + 1).zfill(6) + "/" + table_name + ".json", Body=json_data, ServerSideEncryption='AES256') + print('Data Warehouse Lambda - INFO - DB Extract - Successfully wrote ' + os.environ['bucket_name'] + "/" + "db_data/" + "/"+str(json_parameter_value['data']['serialNumber'] + 1).zfill(6)+"/" + table_name + ".json") + else: + s3.put_object(Bucket=os.environ['bucket_name'], Key="db_data" + "/" + str(json_parameter_value['data']['serialNumber'] + 1).zfill(6) + "/" + table_name + "_" + str(file_increment) + ".json", Body=json_data, ServerSideEncryption='AES256') + print('Data Warehouse Lambda - INFO - DB Extract - Successfully wrote ' + os.environ['bucket_name'] + "/" + "db_data/" + "/"+str(json_parameter_value['data']['serialNumber'] + 1).zfill(6)+"/" + table_name + "_" + str(file_increment) + ".json") + file_increment += 1 except Exception as e: - print("Data Warehouse Lambda - ERROR - DB Extract - Error writing to S3" + str(e)) - # #create single file - # combinedfile = [] - # #get contents of multifile due to memory - # fileliststring = ','.join(str(x) for x in filelist) - # print('Data Warehouse Lambda - INFO - DB Extract - fileliststring ' + fileliststring) - # # Read and append data from each file - # for file in filelist: - # print('Data Warehouse Lambda - INFO - DB Extract - file ' + str(file)) - # data = s3.get_object(Bucket=os.environ['bucket_name'], Key=file) - # print('Data Warehouse Lambda - INFO - DB Extract - data ' + str(data)) - # content = json.loads(data['Body'].read().decode("utf-8")) - # combinedfile.append(content) - # #finally write the new file - # s3.put_object(Bucket=os.environ['bucket_name'], Key="db_data" + "/" + str(json_parameter_value['data']['serialNumber'] + 1).zfill(6) + "/" + table_name + ".json", Body=combinedfile, ServerSideEncryption='AES256') - #clean up partial files - #for file in filelist: - #s3.Object(os.environ['bucket_name'], file).delete() - elif found_updated_at == False and found_created_at == False: - print("Data Warehouse Lambda - INFO - DB Extract - Performing full dump on " + str(table_name)) - cursor.execute("SELECT * FROM " + str(table_name)) - table_data = cursor.fetchall() - column_names = [desc[0] for desc in cursor.description] - data_with_col_names = [{column_names[i]: row[i] for i in range(len(column_names))} for row in table_data] - json_data = json.dumps(data_with_col_names, cls=UUIDEncoder, default=str) + print("Data Warehouse Lambda - ERROR - DB Extract - Error writing to S3" + str(e)) + write_to_s3 = False # If we have created_at but no updated_at, we dump based only on created_at elif found_updated_at == False and found_created_at == True: last_run_time = json_parameter_value['data']['lastRunTime'] From e339ffdef0d57b5d22cd5bd811e8a9b3ac9fb7c3 Mon Sep 17 00:00:00 2001 From: josiahzimmerman-caci Date: Mon, 28 Oct 2024 10:40:35 -0500 Subject: [PATCH 03/13] add break in ex --- src/db_extractor_full.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/db_extractor_full.py b/src/db_extractor_full.py index 5a92163..4ba182d 100644 --- a/src/db_extractor_full.py +++ b/src/db_extractor_full.py @@ -125,6 +125,8 @@ def db_extractor(): file_increment += 1 except Exception as e: print("Data Warehouse Lambda - ERROR - DB Extract - Error writing to S3" + str(e)) + write_to_s3 = False + break write_to_s3 = False # If we have created_at but no updated_at, we dump based only on created_at elif found_updated_at == False and found_created_at == True: From 8fdaac34306f62b1ff2d07f70bbd1b963cf3f2e5 Mon Sep 17 00:00:00 2001 From: josiahzimmerman-caci Date: Mon, 28 Oct 2024 15:24:37 -0500 Subject: [PATCH 04/13] cleanup for release --- src/db_extractor_full.py | 48 ++++++++++++++++++++-------------------- 1 file changed, 24 insertions(+), 24 deletions(-) diff --git a/src/db_extractor_full.py b/src/db_extractor_full.py index 4ba182d..8dd0d0a 100644 --- a/src/db_extractor_full.py +++ b/src/db_extractor_full.py @@ -104,30 +104,30 @@ def db_extractor(): elif found_updated_at == False and found_created_at == False: print("Data Warehouse Lambda - INFO - DB Extract - Performing full dump on " + str(table_name)) cursor.execute("SELECT * FROM " + str(table_name)) - chunk_size = 800000 - file_increment = 1 - while True: - results = cursor.fetchmany(chunk_size) - if not results: - break - column_names = [desc[0] for desc in cursor.description] - data_with_col_names = [{column_names[i]: row[i] for i in range(len(column_names))} for row in results] - json_data = json.dumps(data_with_col_names, cls=UUIDEncoder, default=str) - if write_to_s3 == True: - try: - s3 = boto3.client('s3') - if file_increment == 1: - s3.put_object(Bucket=os.environ['bucket_name'], Key="db_data" + "/" + str(json_parameter_value['data']['serialNumber'] + 1).zfill(6) + "/" + table_name + ".json", Body=json_data, ServerSideEncryption='AES256') - print('Data Warehouse Lambda - INFO - DB Extract - Successfully wrote ' + os.environ['bucket_name'] + "/" + "db_data/" + "/"+str(json_parameter_value['data']['serialNumber'] + 1).zfill(6)+"/" + table_name + ".json") - else: - s3.put_object(Bucket=os.environ['bucket_name'], Key="db_data" + "/" + str(json_parameter_value['data']['serialNumber'] + 1).zfill(6) + "/" + table_name + "_" + str(file_increment) + ".json", Body=json_data, ServerSideEncryption='AES256') - print('Data Warehouse Lambda - INFO - DB Extract - Successfully wrote ' + os.environ['bucket_name'] + "/" + "db_data/" + "/"+str(json_parameter_value['data']['serialNumber'] + 1).zfill(6)+"/" + table_name + "_" + str(file_increment) + ".json") - file_increment += 1 - except Exception as e: - print("Data Warehouse Lambda - ERROR - DB Extract - Error writing to S3" + str(e)) - write_to_s3 = False - break - write_to_s3 = False + chunk_size = 800000 + file_increment = 1 + while True: + results = cursor.fetchmany(chunk_size) + if not results: + break + column_names = [desc[0] for desc in cursor.description] + data_with_col_names = [{column_names[i]: row[i] for i in range(len(column_names))} for row in results] + json_data = json.dumps(data_with_col_names, cls=UUIDEncoder, default=str) + if write_to_s3 == True: + try: + s3 = boto3.client('s3') + if file_increment == 1: + s3.put_object(Bucket=os.environ['bucket_name'], Key="db_data" + "/" + str(json_parameter_value['data']['serialNumber'] + 1).zfill(6) + "/" + table_name + ".json", Body=json_data, ServerSideEncryption='AES256') + print('Data Warehouse Lambda - INFO - DB Extract - Successfully wrote ' + os.environ['bucket_name'] + "/" + "db_data/" + "/"+str(json_parameter_value['data']['serialNumber'] + 1).zfill(6)+"/" + table_name + ".json") + else: + s3.put_object(Bucket=os.environ['bucket_name'], Key="db_data" + "/" + str(json_parameter_value['data']['serialNumber'] + 1).zfill(6) + "/" + table_name + "_" + str(file_increment) + ".json", Body=json_data, ServerSideEncryption='AES256') + print('Data Warehouse Lambda - INFO - DB Extract - Successfully wrote ' + os.environ['bucket_name'] + "/" + "db_data/" + "/"+str(json_parameter_value['data']['serialNumber'] + 1).zfill(6)+"/" + table_name + "_" + str(file_increment) + ".json") + file_increment += 1 + except Exception as e: + print("Data Warehouse Lambda - ERROR - DB Extract - Error writing to S3" + str(e)) + write_to_s3 = False + break + write_to_s3 = False # If we have created_at but no updated_at, we dump based only on created_at elif found_updated_at == False and found_created_at == True: last_run_time = json_parameter_value['data']['lastRunTime'] From 7a70255032628850177ab2b8493f41e51da69663 Mon Sep 17 00:00:00 2001 From: josiahzimmerman-caci Date: Mon, 28 Oct 2024 15:47:03 -0500 Subject: [PATCH 05/13] cleanup for release --- src/db_extractor_full.py | 40 ++++++++++++++++++++-------------------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/src/db_extractor_full.py b/src/db_extractor_full.py index 8dd0d0a..1f6a5e8 100644 --- a/src/db_extractor_full.py +++ b/src/db_extractor_full.py @@ -107,26 +107,26 @@ def db_extractor(): chunk_size = 800000 file_increment = 1 while True: - results = cursor.fetchmany(chunk_size) - if not results: - break - column_names = [desc[0] for desc in cursor.description] - data_with_col_names = [{column_names[i]: row[i] for i in range(len(column_names))} for row in results] - json_data = json.dumps(data_with_col_names, cls=UUIDEncoder, default=str) - if write_to_s3 == True: - try: - s3 = boto3.client('s3') - if file_increment == 1: - s3.put_object(Bucket=os.environ['bucket_name'], Key="db_data" + "/" + str(json_parameter_value['data']['serialNumber'] + 1).zfill(6) + "/" + table_name + ".json", Body=json_data, ServerSideEncryption='AES256') - print('Data Warehouse Lambda - INFO - DB Extract - Successfully wrote ' + os.environ['bucket_name'] + "/" + "db_data/" + "/"+str(json_parameter_value['data']['serialNumber'] + 1).zfill(6)+"/" + table_name + ".json") - else: - s3.put_object(Bucket=os.environ['bucket_name'], Key="db_data" + "/" + str(json_parameter_value['data']['serialNumber'] + 1).zfill(6) + "/" + table_name + "_" + str(file_increment) + ".json", Body=json_data, ServerSideEncryption='AES256') - print('Data Warehouse Lambda - INFO - DB Extract - Successfully wrote ' + os.environ['bucket_name'] + "/" + "db_data/" + "/"+str(json_parameter_value['data']['serialNumber'] + 1).zfill(6)+"/" + table_name + "_" + str(file_increment) + ".json") - file_increment += 1 - except Exception as e: - print("Data Warehouse Lambda - ERROR - DB Extract - Error writing to S3" + str(e)) - write_to_s3 = False - break + results = cursor.fetchmany(chunk_size) + if not results: + break + column_names = [desc[0] for desc in cursor.description] + data_with_col_names = [{column_names[i]: row[i] for i in range(len(column_names))} for row in results] + json_data = json.dumps(data_with_col_names, cls=UUIDEncoder, default=str) + if write_to_s3 == True: + try: + s3 = boto3.client('s3') + if file_increment == 1: + s3.put_object(Bucket=os.environ['bucket_name'], Key="db_data" + "/" + str(json_parameter_value['data']['serialNumber'] + 1).zfill(6) + "/" + table_name + ".json", Body=json_data, ServerSideEncryption='AES256') + print('Data Warehouse Lambda - INFO - DB Extract - Successfully wrote ' + os.environ['bucket_name'] + "/" + "db_data/" + "/"+str(json_parameter_value['data']['serialNumber'] + 1).zfill(6)+"/" + table_name + ".json") + else: + s3.put_object(Bucket=os.environ['bucket_name'], Key="db_data" + "/" + str(json_parameter_value['data']['serialNumber'] + 1).zfill(6) + "/" + table_name + "_" + str(file_increment) + ".json", Body=json_data, ServerSideEncryption='AES256') + print('Data Warehouse Lambda - INFO - DB Extract - Successfully wrote ' + os.environ['bucket_name'] + "/" + "db_data/" + "/"+str(json_parameter_value['data']['serialNumber'] + 1).zfill(6)+"/" + table_name + "_" + str(file_increment) + ".json") + file_increment += 1 + except Exception as e: + print("Data Warehouse Lambda - ERROR - DB Extract - Error writing to S3" + str(e)) + write_to_s3 = False + break write_to_s3 = False # If we have created_at but no updated_at, we dump based only on created_at elif found_updated_at == False and found_created_at == True: From 54cc7b79f2ae140e10d9e91225b1004bed0a7628 Mon Sep 17 00:00:00 2001 From: josiahzimmerman-caci Date: Tue, 29 Oct 2024 12:48:27 -0500 Subject: [PATCH 06/13] change condition --- src/db_extractor_full.py | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/src/db_extractor_full.py b/src/db_extractor_full.py index 1f6a5e8..202f4a9 100644 --- a/src/db_extractor_full.py +++ b/src/db_extractor_full.py @@ -74,6 +74,8 @@ def db_extractor(): # table_dump_ignore = ['django_migrations', 'audit_history', 'archived_access_codes', 'schema_migration', 'audit_history_tableslist', 'awsdms_ddl_audit'] table_dump_ignore = ['zip3_distances', 'transportation_service_provider_performances','move' ,'move_to_gbloc' , 'archived_access_codes', 'schema_migration', 'audit_history_tableslist'] + #larger tables handle differently + table_size_large = ['audit_history','transportation_accounting_codes'] # For each table for table in tables: @@ -101,12 +103,12 @@ def db_extractor(): if str(table_name) in table_dump_ignore: print("Data Warehouse Lambda - INFO - DB Extract - Didn't extract data for table(ignore list): " + str(table_name)) # If we have neither timestamp field, we do a full dump - elif found_updated_at == False and found_created_at == False: + elif str(table_name) in table_size_large: print("Data Warehouse Lambda - INFO - DB Extract - Performing full dump on " + str(table_name)) cursor.execute("SELECT * FROM " + str(table_name)) - chunk_size = 800000 - file_increment = 1 - while True: + chunk_size = 800000 + file_increment = 1 + while True: results = cursor.fetchmany(chunk_size) if not results: break @@ -118,16 +120,23 @@ def db_extractor(): s3 = boto3.client('s3') if file_increment == 1: s3.put_object(Bucket=os.environ['bucket_name'], Key="db_data" + "/" + str(json_parameter_value['data']['serialNumber'] + 1).zfill(6) + "/" + table_name + ".json", Body=json_data, ServerSideEncryption='AES256') - print('Data Warehouse Lambda - INFO - DB Extract - Successfully wrote ' + os.environ['bucket_name'] + "/" + "db_data/" + "/"+str(json_parameter_value['data']['serialNumber'] + 1).zfill(6)+"/" + table_name + ".json") + print('Data Warehouse Lambda - INFO - DB Large Extract - Successfully wrote ' + os.environ['bucket_name'] + "/" + "db_data/" + "/"+str(json_parameter_value['data']['serialNumber'] + 1).zfill(6)+"/" + table_name + ".json") else: s3.put_object(Bucket=os.environ['bucket_name'], Key="db_data" + "/" + str(json_parameter_value['data']['serialNumber'] + 1).zfill(6) + "/" + table_name + "_" + str(file_increment) + ".json", Body=json_data, ServerSideEncryption='AES256') - print('Data Warehouse Lambda - INFO - DB Extract - Successfully wrote ' + os.environ['bucket_name'] + "/" + "db_data/" + "/"+str(json_parameter_value['data']['serialNumber'] + 1).zfill(6)+"/" + table_name + "_" + str(file_increment) + ".json") + print('Data Warehouse Lambda - INFO - DB Large Extract - Successfully wrote ' + os.environ['bucket_name'] + "/" + "db_data/" + "/"+str(json_parameter_value['data']['serialNumber'] + 1).zfill(6)+"/" + table_name + "_" + str(file_increment) + ".json") file_increment += 1 except Exception as e: - print("Data Warehouse Lambda - ERROR - DB Extract - Error writing to S3" + str(e)) + print("Data Warehouse Lambda - ERROR - DB Large Extract - Error writing to S3" + str(e)) write_to_s3 = False break - write_to_s3 = False + write_to_s3 = False + elif found_updated_at == False and found_created_at == False: + print("Data Warehouse Lambda - INFO - DB Extract - Performing full dump on " + str(table_name)) + cursor.execute("SELECT * FROM " + str(table_name)) + table_data = cursor.fetchall() + column_names = [desc[0] for desc in cursor.description] + data_with_col_names = [{column_names[i]: row[i] for i in range(len(column_names))} for row in table_data] + json_data = json.dumps(data_with_col_names, cls=UUIDEncoder, default=str) # If we have created_at but no updated_at, we dump based only on created_at elif found_updated_at == False and found_created_at == True: last_run_time = json_parameter_value['data']['lastRunTime'] From 0b2b579582f9cae027e90a4ed3f5366921232481 Mon Sep 17 00:00:00 2001 From: cameroncaci Date: Wed, 30 Oct 2024 16:13:19 -0400 Subject: [PATCH 07/13] initial refactor for multi part uploading --- src/db_extractor_full.py | 311 +++++++++++++++++++++++++++------------ src/requirements.txt | 2 + 2 files changed, 215 insertions(+), 98 deletions(-) diff --git a/src/db_extractor_full.py b/src/db_extractor_full.py index 202f4a9..e112a98 100644 --- a/src/db_extractor_full.py +++ b/src/db_extractor_full.py @@ -1,12 +1,12 @@ import boto3 import pg8000 import json -import time +import io import db_conn import os from uuid import UUID import datetime - +import gc # Declare the global connection object to use during warm starting # to reuse connections that were established during a previous invocation. @@ -16,7 +16,11 @@ # know for the next run which time to select from current_run_time = datetime.datetime.now() -#Class to format json UUID's +# Define batch size fetching of records for json dumps +batch_size = 100000 + + +# Class to format json UUID's class UUIDEncoder(json.JSONEncoder): def default(self, obj): if isinstance(obj, UUID): @@ -25,38 +29,150 @@ def default(self, obj): return json.JSONEncoder.default(self, obj) +# Helper function to batch fetch data and use multipart uploading with S3 +def fetch_and_upload_cursor_results( + cursor: pg8000.Cursor, bucket_name, key_name, column_names +): + s3_client = boto3.client("s3") + # Initiate multipart upload + multipart_upload = s3_client.create_multipart_upload( + Bucket=bucket_name, + Key=key_name, + ServerSideEncryption="AES256", + ) + upload_id = multipart_upload["UploadId"] + parts = [] + part_number = 1 + buffer = io.BytesIO() + buffer_size = 0 + min_part_size = 5 * 1024 * 1024 # 5 MB + + try: + # Fetch the results of the cursor query + for batch in fetch_batches(cursor): + data_with_col_names = ( + map_row_to_columns(row, column_names) for row in batch + ) + for record in data_with_col_names: + json_line = json.dumps(record, cls=UUIDEncoder, default=str) + "\n" + json_line_bytes = json_line.encode("utf-8") + buffer.write(json_line_bytes) + buffer_size += len(json_line_bytes) + + # Upload part if buffer reaches the minimum part size of 5MB + if buffer_size >= min_part_size: + # This part is now finished being appended too + # Trigger upload, reset buffer, and proceed with the next one + buffer.seek(0) + response = s3_client.upload_part( + Bucket=bucket_name, + Key=key_name, + PartNumber=part_number, + UploadId=upload_id, + Body=buffer, + ) + parts.append({"PartNumber": part_number, "ETag": response["ETag"]}) + part_number += 1 + buffer.close() + buffer = io.BytesIO() + buffer_size = 0 + + gc.collect() + + # Upload any remaining data in the buffer (Eg, the last batch finished with a part < 5MB) + if buffer_size > 0: + # Final part upload + buffer.seek(0) + response = s3_client.upload_part( + Bucket=bucket_name, + Key=key_name, + PartNumber=part_number, + UploadId=upload_id, + Body=buffer, + ) + parts.append({"PartNumber": part_number, "ETag": response["ETag"]}) + buffer.close() + + if parts: + # Complete multipart upload + s3_client.complete_multipart_upload( + Bucket=bucket_name, + Key=key_name, + UploadId=upload_id, + MultipartUpload={"Parts": parts}, + ) + print( + f"Data Warehouse Lambda - INFO - DB Extract - Successfully wrote {bucket_name}/{key_name}" + ) + else: + # Abort the multipart upload if no parts were uploaded + s3_client.abort_multipart_upload( + Bucket=bucket_name, Key=key_name, UploadId=upload_id + ) + print( + f"Data Warehouse Lambda - INFO - No data to upload for table {key_name}" + ) + + except Exception as e: + # Abort multipart upload in case of failure + s3_client.abort_multipart_upload( + Bucket=bucket_name, Key=key_name, UploadId=upload_id + ) + print( + f"Data Warehouse Lambda - ERROR - DB Extract - Error during multipart upload: {e}" + ) + raise e + + +# Helper func to yield results rather than return +# to boost processing efficiency +def fetch_batches(cursor: pg8000.Cursor): + while True: + batch = cursor.fetchmany(batch_size) + if not batch: + # No more batch results + break + yield batch + + +def map_row_to_columns(row, column_names): + return {column_names[i]: row[i] for i in range(len(column_names))} + + def db_extractor(): # Use the get_parameter method of the SSM client to retrieve the parameter try: # Create an SSM client - ssm_client = boto3.client('ssm') + ssm_client = boto3.client("ssm") response = ssm_client.get_parameter( - Name=os.environ['parameter_name'], - WithDecryption=True + Name=os.environ["parameter_name"], WithDecryption=True ) # The value of the parameter is stored in the 'Value' field of the response - parameter_value = response['Parameter']['Value'] + parameter_value = response["Parameter"]["Value"] json_parameter_value = json.loads(parameter_value) except Exception as e: - print("Data Warehouse Lambda - ERROR - DB Extract - Failed to retrieve values from SSM" + str(e)) - + print( + "Data Warehouse Lambda - ERROR - DB Extract - Failed to retrieve values from SSM" + + str(e) + ) global connection try: if connection is None: connection = db_conn.get_connection() if connection is None: - print("Data Warehouse Lambda - ERROR - DB Extract - Failed to connect to database") + print( + "Data Warehouse Lambda - ERROR - DB Extract - Failed to connect to database" + ) + return # Instantiate the cursor object - cursor = connection.cursor() - - # Initialize an empty dictionary to store the data - data = {} + cursor = connection.cursor() # type: ignore (Type none is handled) # Get a list of all tables in the database - cursor.execute("SELECT table_name FROM information_schema.tables WHERE table_schema = 'public'") - #tmp_tables = cursor.fetchall() + cursor.execute( + "SELECT table_name FROM information_schema.tables WHERE table_schema = 'public'" + ) tables_tmp = cursor.fetchall() # Sanitize the table name's @@ -64,127 +180,126 @@ def db_extractor(): for tmp_table_name in tables_tmp: tmp_table_name = tmp_table_name[0] # remove any special characters or whitespaces - tmp_table_name = ''.join(e for e in tmp_table_name if e.isalnum() or e == '_') + tmp_table_name = "".join( + e for e in tmp_table_name if e.isalnum() or e == "_" + ) # make sure the table name is lowercase tmp_table_name = tmp_table_name.lower() # add the sanitized table name to the list tables_list.append(tmp_table_name) tables = tuple(tables_list) + # List tables that should be excluded from the dump + table_dump_ignore = [ + "zip3_distances", + "transportation_service_provider_performances", + "move", + "move_to_gbloc", + "archived_access_codes", + "schema_migration", + "audit_history_tableslist", + ] -# table_dump_ignore = ['django_migrations', 'audit_history', 'archived_access_codes', 'schema_migration', 'audit_history_tableslist', 'awsdms_ddl_audit'] - table_dump_ignore = ['zip3_distances', 'transportation_service_provider_performances','move' ,'move_to_gbloc' , 'archived_access_codes', 'schema_migration', 'audit_history_tableslist'] - #larger tables handle differently - table_size_large = ['audit_history','transportation_accounting_codes'] + # S3 bucket upload location + bucket_name = os.environ["bucket_name"] - # For each table + # Loop over each table and dump its data. Handling initial dump and incremental dumps on reruns for table in tables: + s3_key = f"db_data/{str(json_parameter_value['data']['serialNumber'] + 1).zfill(6)}/{table_name}.json" + table_name = table # get a list of the table's columns - cursor.execute("SELECT column_name FROM information_schema.columns WHERE table_schema = 'public' AND table_name=%s", (table_name,)) + cursor.execute( + "SELECT column_name FROM information_schema.columns WHERE table_schema = 'public' AND table_name=%s", + (table_name,), + ) column_names = list(cursor.fetchall()) # Determine whether we have the timestamp fields created_at and updated_at found_created_at = False found_updated_at = False for column in column_names: - if 'updated_at' in column: + if "updated_at" in column: found_updated_at = True - if 'created_at' in column: + if "created_at" in column: found_created_at = True - # By default we will want to write our data to S3, on error we will skip this - write_to_s3 = True - # Set the statement timeout to 600 seconds for this session cursor.execute("SET statement_timeout = '600s'") # If we ignore the table, we do nothing if str(table_name) in table_dump_ignore: - print("Data Warehouse Lambda - INFO - DB Extract - Didn't extract data for table(ignore list): " + str(table_name)) - # If we have neither timestamp field, we do a full dump - elif str(table_name) in table_size_large: - print("Data Warehouse Lambda - INFO - DB Extract - Performing full dump on " + str(table_name)) - cursor.execute("SELECT * FROM " + str(table_name)) - chunk_size = 800000 - file_increment = 1 - while True: - results = cursor.fetchmany(chunk_size) - if not results: - break - column_names = [desc[0] for desc in cursor.description] - data_with_col_names = [{column_names[i]: row[i] for i in range(len(column_names))} for row in results] - json_data = json.dumps(data_with_col_names, cls=UUIDEncoder, default=str) - if write_to_s3 == True: - try: - s3 = boto3.client('s3') - if file_increment == 1: - s3.put_object(Bucket=os.environ['bucket_name'], Key="db_data" + "/" + str(json_parameter_value['data']['serialNumber'] + 1).zfill(6) + "/" + table_name + ".json", Body=json_data, ServerSideEncryption='AES256') - print('Data Warehouse Lambda - INFO - DB Large Extract - Successfully wrote ' + os.environ['bucket_name'] + "/" + "db_data/" + "/"+str(json_parameter_value['data']['serialNumber'] + 1).zfill(6)+"/" + table_name + ".json") - else: - s3.put_object(Bucket=os.environ['bucket_name'], Key="db_data" + "/" + str(json_parameter_value['data']['serialNumber'] + 1).zfill(6) + "/" + table_name + "_" + str(file_increment) + ".json", Body=json_data, ServerSideEncryption='AES256') - print('Data Warehouse Lambda - INFO - DB Large Extract - Successfully wrote ' + os.environ['bucket_name'] + "/" + "db_data/" + "/"+str(json_parameter_value['data']['serialNumber'] + 1).zfill(6)+"/" + table_name + "_" + str(file_increment) + ".json") - file_increment += 1 - except Exception as e: - print("Data Warehouse Lambda - ERROR - DB Large Extract - Error writing to S3" + str(e)) - write_to_s3 = False - break - write_to_s3 = False + print( + "Data Warehouse Lambda - INFO - DB Extract - Didn't extract data for table(ignore list): " + + str(table_name) + ) + # Handle if the table being iterated on does not have updated_at or created_at + # Since we do not have timestamps to compare to, we must full dump the table without updated or created at elif found_updated_at == False and found_created_at == False: - print("Data Warehouse Lambda - INFO - DB Extract - Performing full dump on " + str(table_name)) + print( + "Data Warehouse Lambda - INFO - DB Extract - Performing full dump on " + + str(table_name) + ) + # Tell the database to execute this query, we will ingest it in chunks cursor.execute("SELECT * FROM " + str(table_name)) - table_data = cursor.fetchall() - column_names = [desc[0] for desc in cursor.description] - data_with_col_names = [{column_names[i]: row[i] for i in range(len(column_names))} for row in table_data] - json_data = json.dumps(data_with_col_names, cls=UUIDEncoder, default=str) + # Fetch cursor results and upload to S3 + fetch_and_upload_cursor_results( + cursor, bucket_name, s3_key, column_names + ) # If we have created_at but no updated_at, we dump based only on created_at elif found_updated_at == False and found_created_at == True: - last_run_time = json_parameter_value['data']['lastRunTime'] - cursor.execute("SELECT * FROM " + str(table_name) + " where (created_at > '" + str(last_run_time) + "') order by created_at;") - table_data = cursor.fetchall() - column_names = [desc[0] for desc in cursor.description] - data_with_col_names = [{column_names[i]: row[i] for i in range(len(column_names))} for row in table_data] - json_data = json.dumps(data_with_col_names, cls=UUIDEncoder, default=str) + last_run_time = json_parameter_value["data"]["lastRunTime"] + cursor.execute( + "SELECT * FROM " + + str(table_name) + + " where (created_at > '" + + str(last_run_time) + + "') order by created_at;" + ) + fetch_and_upload_cursor_results( + cursor, bucket_name, s3_key, column_names + ) # If we have created_at and updated_at, we dump based on both elif found_updated_at == True and found_created_at == True: - last_run_time = json_parameter_value['data']['lastRunTime'] - cursor.execute("SELECT * FROM " + str(table_name) + " where ((created_at > %s) OR (updated_at > %s)) order by created_at;", (last_run_time, last_run_time)) - table_data = cursor.fetchall() - column_names = [desc[0] for desc in cursor.description] - data_with_col_names = [{column_names[i]: row[i] for i in range(len(column_names))} for row in table_data] - json_data = json.dumps(data_with_col_names, cls=UUIDEncoder, default=str) + last_run_time = json_parameter_value["data"]["lastRunTime"] + cursor.execute( + "SELECT * FROM " + + str(table_name) + + " where ((created_at > %s) OR (updated_at > %s)) order by created_at;", + (last_run_time, last_run_time), + ) + fetch_and_upload_cursor_results( + cursor, bucket_name, s3_key, column_names + ) else: - print('Data Warehouse Lambda - ERROR - DB Extract - ' + str(table_name) + ' does not match any criteria for data warehousing') - write_to_s3 = False - - # Write the data to S3 - if write_to_s3 == True: - try: - s3 = boto3.client('s3') - s3.put_object(Bucket=os.environ['bucket_name'], Key="db_data" + "/" + str(json_parameter_value['data']['serialNumber'] + 1).zfill(6) + "/" + table_name + ".json", Body=json_data, ServerSideEncryption='AES256') - print('Data Warehouse Lambda - INFO - DB Extract - Successfully wrote ' + os.environ['bucket_name'] + "/" + "db_data/" + "/"+str(json_parameter_value['data']['serialNumber'] + 1).zfill(6)+"/" + table_name + ".json") - except Exception as e: - print("Data Warehouse Lambda - ERROR - DB Extract - Error writing to S3" + str(e)) - - # Create an SSM client + print( + "Data Warehouse Lambda - ERROR - DB Extract - " + + str(table_name) + + " does not match any criteria for data warehousing" + ) + + # Create an SSM client try: - ssm_client = boto3.client('ssm') - json_parameter_value['data']['serialNumber'] += 1 - json_parameter_value['data']['lastRunTime'] = str(current_run_time) + ssm_client = boto3.client("ssm") + json_parameter_value["data"]["serialNumber"] += 1 + json_parameter_value["data"]["lastRunTime"] = str(current_run_time) ssm_client.put_parameter( - Name=os.environ['parameter_name'], - Value=json.dumps(json_parameter_value), - Type='SecureString', - Overwrite=True - ) + Name=os.environ["parameter_name"], + Value=json.dumps(json_parameter_value), + Type="SecureString", + Overwrite=True, + ) - #print("Data Warehouse Lambda - INFO - DB Extract - Updated tracking in SSM") + print("Data Warehouse Lambda - INFO - DB Extract - Updated tracking in SSM") except Exception as e: - print("Data Warehouse Lambda - ERROR - DB Extract - Error writing to SSM" + str(e)) + print( + "Data Warehouse Lambda - ERROR - DB Extract - Error writing to SSM" + + str(e) + ) except Exception as e: try: - connection.close() + connection.close() # type: ignore (Type none is handled) except Exception as e: connection = None print("Data Warehouse Lambda - ERROR - DB Extract - Failed due to :" + str(e)) diff --git a/src/requirements.txt b/src/requirements.txt index 8f5d900..8baffd2 100644 --- a/src/requirements.txt +++ b/src/requirements.txt @@ -2,3 +2,5 @@ pg8000==1.29.4 scramp>=1.4.3 six>=1.5 asn1crypto==1.5.1 +boto3>=1.35.51 +boto3-stubs \ No newline at end of file From a28ca554bb44ed3e67ee285386caf01b97071702 Mon Sep 17 00:00:00 2001 From: josiahzimmerman-caci Date: Thu, 31 Oct 2024 12:01:15 -0500 Subject: [PATCH 08/13] move table_name earlier --- src/db_extractor_full.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/db_extractor_full.py b/src/db_extractor_full.py index e112a98..7eed1aa 100644 --- a/src/db_extractor_full.py +++ b/src/db_extractor_full.py @@ -205,9 +205,9 @@ def db_extractor(): # Loop over each table and dump its data. Handling initial dump and incremental dumps on reruns for table in tables: - s3_key = f"db_data/{str(json_parameter_value['data']['serialNumber'] + 1).zfill(6)}/{table_name}.json" - table_name = table + + s3_key = f"db_data/{str(json_parameter_value['data']['serialNumber'] + 1).zfill(6)}/{table_name}.json" # get a list of the table's columns cursor.execute( "SELECT column_name FROM information_schema.columns WHERE table_schema = 'public' AND table_name=%s", From 271b934cea45c2a8b41a528aa66bc7808e3171e0 Mon Sep 17 00:00:00 2001 From: cameroncaci Date: Thu, 31 Oct 2024 13:53:21 -0400 Subject: [PATCH 09/13] convert tuple list to string list --- src/db_extractor_full.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/db_extractor_full.py b/src/db_extractor_full.py index e112a98..75e2963 100644 --- a/src/db_extractor_full.py +++ b/src/db_extractor_full.py @@ -213,7 +213,7 @@ def db_extractor(): "SELECT column_name FROM information_schema.columns WHERE table_schema = 'public' AND table_name=%s", (table_name,), ) - column_names = list(cursor.fetchall()) + column_names = [column[0] for column in cursor.fetchall()] # Determine whether we have the timestamp fields created_at and updated_at found_created_at = False From 99328f085f195c5b853765334788cac217849f91 Mon Sep 17 00:00:00 2001 From: cameroncaci Date: Fri, 1 Nov 2024 11:26:12 -0400 Subject: [PATCH 10/13] initial introduction of multithreading --- src/db_extractor_full.py | 217 +++++++++++++++++++++++++-------------- 1 file changed, 140 insertions(+), 77 deletions(-) diff --git a/src/db_extractor_full.py b/src/db_extractor_full.py index 8347d08..7f55985 100644 --- a/src/db_extractor_full.py +++ b/src/db_extractor_full.py @@ -1,3 +1,5 @@ +import multiprocessing +from multiprocessing.connection import Connection as multi_processing_connection import boto3 import pg8000 import json @@ -7,10 +9,8 @@ from uuid import UUID import datetime import gc - -# Declare the global connection object to use during warm starting -# to reuse connections that were established during a previous invocation. -connection = None +from multiprocessing import Process, Pipe +from typing import List, Tuple # Storing current time, we will use this to update SSM when finished so that we # know for the next run which time to select from @@ -19,6 +19,11 @@ # Define batch size fetching of records for json dumps batch_size = 100000 +# Lambda global connection for warm starts +# This connection is only used to grab the table names +# When multiprocessing, the child processes each create their own +# connection +connection = None # Class to format json UUID's class UUIDEncoder(json.JSONEncoder): @@ -139,6 +144,93 @@ def map_row_to_columns(row, column_names): return {column_names[i]: row[i] for i in range(len(column_names))} +def table_extractor( + table_name, + json_parameter_value, + bucket_name, + child_conn: multi_processing_connection, +): + try: + connection = db_conn.get_connection() + if connection is None: + error_msg = ( + f"Failed to connect to database during child process for {table_name}" + ) + print(f"Data Warehouse Lambda - ERROR - DB Extract - {error_msg}") + child_conn.send(error_msg) + return + print( + f"Data Warehouse Lambda - INFO - Child extraction process created for table {table_name}" + ) + s3_key = f"db_data/{str(json_parameter_value['data']['serialNumber'] + 1).zfill(6)}/{table_name}.json" + cursor = connection.cursor() # type: ignore + cursor.execute( + "SELECT column_name FROM information_schema.columns WHERE table_schema = 'public' AND table_name=%s", + (table_name,), + ) + column_names = [column[0] for column in cursor.fetchall()] + + # Determine whether we have the timestamp fields created_at and updated_at + found_created_at = False + found_updated_at = False + for column in column_names: + if "updated_at" in column: + found_updated_at = True + if "created_at" in column: + found_created_at = True + + # Set the statement timeout to 600 seconds for this session + cursor.execute("SET statement_timeout = '600s'") + + # Handle if the table being iterated on does not have updated_at or created_at + # Since we do not have timestamps to compare to, we must full dump the table without updated or created at + if found_updated_at == False and found_created_at == False: + print( + "Data Warehouse Lambda - INFO - DB Extract - Performing full dump on " + + str(table_name) + ) + # Tell the database to execute this query, we will ingest it in chunks + cursor.execute("SELECT * FROM " + str(table_name)) + # Fetch cursor results and upload to S3 + fetch_and_upload_cursor_results(cursor, bucket_name, s3_key, column_names) + # If we have created_at but no updated_at, we dump based only on created_at + elif found_updated_at == False and found_created_at == True: + last_run_time = json_parameter_value["data"]["lastRunTime"] + cursor.execute( + "SELECT * FROM " + + str(table_name) + + " where (created_at > '" + + str(last_run_time) + + "') order by created_at;" + ) + fetch_and_upload_cursor_results(cursor, bucket_name, s3_key, column_names) + # If we have created_at and updated_at, we dump based on both + elif found_updated_at == True and found_created_at == True: + last_run_time = json_parameter_value["data"]["lastRunTime"] + cursor.execute( + "SELECT * FROM " + + str(table_name) + + " where ((created_at > %s) OR (updated_at > %s)) order by created_at;", + (last_run_time, last_run_time), + ) + fetch_and_upload_cursor_results(cursor, bucket_name, s3_key, column_names) + else: + print( + "Data Warehouse Lambda - ERROR - DB Extract - " + + str(table_name) + + " does not match any criteria for data warehousing" + ) + + # Main child process complete + child_conn.send(f"Successfully processed {table_name}") + except Exception as e: + error_msg = f"Error processing {table_name}: {e}" + print(f"Data Warehouse Lambda - ERROR - DB Extract - {error_msg}") + child_conn.send(error_msg) + finally: + child_conn.close() + + def db_extractor(): # Use the get_parameter method of the SSM client to retrieve the parameter try: @@ -203,80 +295,51 @@ def db_extractor(): # S3 bucket upload location bucket_name = os.environ["bucket_name"] - # Loop over each table and dump its data. Handling initial dump and incremental dumps on reruns - for table in tables: - table_name = table - - s3_key = f"db_data/{str(json_parameter_value['data']['serialNumber'] + 1).zfill(6)}/{table_name}.json" - # get a list of the table's columns - cursor.execute( - "SELECT column_name FROM information_schema.columns WHERE table_schema = 'public' AND table_name=%s", - (table_name,), - ) - column_names = [column[0] for column in cursor.fetchall()] - - # Determine whether we have the timestamp fields created_at and updated_at - found_created_at = False - found_updated_at = False - for column in column_names: - if "updated_at" in column: - found_updated_at = True - if "created_at" in column: - found_created_at = True - - # Set the statement timeout to 600 seconds for this session - cursor.execute("SET statement_timeout = '600s'") - - # If we ignore the table, we do nothing - if str(table_name) in table_dump_ignore: - print( - "Data Warehouse Lambda - INFO - DB Extract - Didn't extract data for table(ignore list): " - + str(table_name) - ) - # Handle if the table being iterated on does not have updated_at or created_at - # Since we do not have timestamps to compare to, we must full dump the table without updated or created at - elif found_updated_at == False and found_created_at == False: - print( - "Data Warehouse Lambda - INFO - DB Extract - Performing full dump on " - + str(table_name) - ) - # Tell the database to execute this query, we will ingest it in chunks - cursor.execute("SELECT * FROM " + str(table_name)) - # Fetch cursor results and upload to S3 - fetch_and_upload_cursor_results( - cursor, bucket_name, s3_key, column_names - ) - # If we have created_at but no updated_at, we dump based only on created_at - elif found_updated_at == False and found_created_at == True: - last_run_time = json_parameter_value["data"]["lastRunTime"] - cursor.execute( - "SELECT * FROM " - + str(table_name) - + " where (created_at > '" - + str(last_run_time) - + "') order by created_at;" - ) - fetch_and_upload_cursor_results( - cursor, bucket_name, s3_key, column_names - ) - # If we have created_at and updated_at, we dump based on both - elif found_updated_at == True and found_created_at == True: - last_run_time = json_parameter_value["data"]["lastRunTime"] - cursor.execute( - "SELECT * FROM " - + str(table_name) - + " where ((created_at > %s) OR (updated_at > %s)) order by created_at;", - (last_run_time, last_run_time), - ) - fetch_and_upload_cursor_results( - cursor, bucket_name, s3_key, column_names - ) - else: - print( - "Data Warehouse Lambda - ERROR - DB Extract - " - + str(table_name) - + " does not match any criteria for data warehousing" + # Filter out ignored tables + tables = [table for table in tables_list if table not in table_dump_ignore] + + # Create multi processes for the number of tables we have + # for the number of processors we have + cursor.close() # Close the current cursor, we are done with it. Child processes make new ones + + # Process each table in a separate Process with Pipe + max_processes = multiprocessing.cpu_count() # Retrieve processor limit + table_index = iter(tables) + processes: List[Tuple[Process, multi_processing_connection]] = ( + [] + ) # Holds the individual processes in tuples with the parent pipes + + while True: + while len(processes) < max_processes: + try: + table_name = next(table_index) + except StopIteration: + # This exception is thrown when the next function can't find anything + break + parent_conn, child_conn = Pipe() + process = Process( + target=table_extractor, + args=(table_name, json_parameter_value, bucket_name, child_conn), ) + processes.append((process, parent_conn)) + process.start() + if not processes: + # No remaining processes are active + break + + # Manage processes concurrently without sequential waiting + for i in range(len(processes) - 1, -1, -1): + process, pipe = processes[i] + if not process.is_alive(): + # Process has finished, poll it and receive its message + if pipe.poll(): + message = pipe.recv() + print(f"Data Warehouse Lambda - INFO - {message}") + pipe.close() + process.join() + processes.pop( + i + ) # Remove the finished process to free up processor capacity # Create an SSM client try: From 69e554c7a1fb2d053557e9db3529d48c29187b51 Mon Sep 17 00:00:00 2001 From: cameroncaci Date: Fri, 1 Nov 2024 12:13:30 -0400 Subject: [PATCH 11/13] json formatting for multithreading --- src/db_extractor_full.py | 49 ++++++++++++++++++++++++++++------------ 1 file changed, 35 insertions(+), 14 deletions(-) diff --git a/src/db_extractor_full.py b/src/db_extractor_full.py index 7f55985..33d56e8 100644 --- a/src/db_extractor_full.py +++ b/src/db_extractor_full.py @@ -51,15 +51,28 @@ def fetch_and_upload_cursor_results( buffer = io.BytesIO() buffer_size = 0 min_part_size = 5 * 1024 * 1024 # 5 MB + first_record = True # Track the first record for JSON formatting + try: + # Write the opening bracket for the JSON array + buffer.write(b'[') + buffer_size += 1 # Fetch the results of the cursor query for batch in fetch_batches(cursor): data_with_col_names = ( map_row_to_columns(row, column_names) for row in batch ) for record in data_with_col_names: - json_line = json.dumps(record, cls=UUIDEncoder, default=str) + "\n" + if not first_record: + # Add comma before each record except the first + buffer.write(b',') + buffer_size += 1 + else: + # Set to False after processing the first record + first_record = False + + json_line = json.dumps(record, cls=UUIDEncoder, default=str) json_line_bytes = json_line.encode("utf-8") buffer.write(json_line_bytes) buffer_size += len(json_line_bytes) @@ -84,19 +97,27 @@ def fetch_and_upload_cursor_results( gc.collect() - # Upload any remaining data in the buffer (Eg, the last batch finished with a part < 5MB) - if buffer_size > 0: - # Final part upload - buffer.seek(0) - response = s3_client.upload_part( - Bucket=bucket_name, - Key=key_name, - PartNumber=part_number, - UploadId=upload_id, - Body=buffer, - ) - parts.append({"PartNumber": part_number, "ETag": response["ETag"]}) - buffer.close() + if first_record: + # No records were written, make a new buffer and write an empty array + buffer = io.BytesIO() + buffer.write(b'[]') + buffer_size = 2 + else: + # Write closing bracket to close the JSON array + buffer.write(b']') + buffer_size += 1 + + # Upload the final part (Or only part if no records) + buffer.seek(0) + response = s3_client.upload_part( + Bucket=bucket_name, + Key=key_name, + PartNumber=part_number, + UploadId=upload_id, + Body=buffer, + ) + parts.append({"PartNumber": part_number, "ETag": response["ETag"]}) + buffer.close() if parts: # Complete multipart upload From b30daf2b8b3322bc4d11531fd680996ff15a0a1d Mon Sep 17 00:00:00 2001 From: cameroncaci Date: Fri, 1 Nov 2024 12:13:48 -0400 Subject: [PATCH 12/13] formatting --- src/db_extractor_full.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/db_extractor_full.py b/src/db_extractor_full.py index 33d56e8..12ada34 100644 --- a/src/db_extractor_full.py +++ b/src/db_extractor_full.py @@ -25,6 +25,7 @@ # connection connection = None + # Class to format json UUID's class UUIDEncoder(json.JSONEncoder): def default(self, obj): @@ -51,12 +52,11 @@ def fetch_and_upload_cursor_results( buffer = io.BytesIO() buffer_size = 0 min_part_size = 5 * 1024 * 1024 # 5 MB - first_record = True # Track the first record for JSON formatting - + first_record = True # Track the first record for JSON formatting try: # Write the opening bracket for the JSON array - buffer.write(b'[') + buffer.write(b"[") buffer_size += 1 # Fetch the results of the cursor query for batch in fetch_batches(cursor): @@ -66,12 +66,12 @@ def fetch_and_upload_cursor_results( for record in data_with_col_names: if not first_record: # Add comma before each record except the first - buffer.write(b',') + buffer.write(b",") buffer_size += 1 else: # Set to False after processing the first record - first_record = False - + first_record = False + json_line = json.dumps(record, cls=UUIDEncoder, default=str) json_line_bytes = json_line.encode("utf-8") buffer.write(json_line_bytes) @@ -100,11 +100,11 @@ def fetch_and_upload_cursor_results( if first_record: # No records were written, make a new buffer and write an empty array buffer = io.BytesIO() - buffer.write(b'[]') + buffer.write(b"[]") buffer_size = 2 else: # Write closing bracket to close the JSON array - buffer.write(b']') + buffer.write(b"]") buffer_size += 1 # Upload the final part (Or only part if no records) From 69dfe861226b0a5318a8f44fa95f103809a278c1 Mon Sep 17 00:00:00 2001 From: cameroncaci Date: Fri, 1 Nov 2024 13:13:39 -0400 Subject: [PATCH 13/13] reduce upload bottleneck --- src/db_extractor_full.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/db_extractor_full.py b/src/db_extractor_full.py index 12ada34..f0aaf6c 100644 --- a/src/db_extractor_full.py +++ b/src/db_extractor_full.py @@ -17,7 +17,7 @@ current_run_time = datetime.datetime.now() # Define batch size fetching of records for json dumps -batch_size = 100000 +batch_size = 500000 # Lambda global connection for warm starts # This connection is only used to grab the table names @@ -51,7 +51,7 @@ def fetch_and_upload_cursor_results( part_number = 1 buffer = io.BytesIO() buffer_size = 0 - min_part_size = 5 * 1024 * 1024 # 5 MB + min_part_size = 50 * 1024 * 1024 # 50 MB first_record = True # Track the first record for JSON formatting try: