From e09b70c58a35431937b00ce8db6ddf801019861d Mon Sep 17 00:00:00 2001 From: Jesse T Date: Mon, 23 Jan 2023 08:57:16 -0500 Subject: [PATCH] Refactoring Lambda to support full table by table dump --- src/db_conn.py | 2 + src/db_extractor_full.py | 144 ++++++++++++++++++++ src/{db_dump.py => db_extractor_history.py} | 0 src/db_schema_dump.py | 2 +- src/lambda_function.py | 21 +-- 5 files changed, 160 insertions(+), 9 deletions(-) create mode 100644 src/db_extractor_full.py rename src/{db_dump.py => db_extractor_history.py} (100%) diff --git a/src/db_conn.py b/src/db_conn.py index 99d3c71..bd70cc3 100644 --- a/src/db_conn.py +++ b/src/db_conn.py @@ -23,6 +23,8 @@ def get_connection(): database=os.environ['db_name'], password=password, ssl_context=True, + timeout=900, + tcp_keepalive=True, ) return conn except Exception as e: diff --git a/src/db_extractor_full.py b/src/db_extractor_full.py new file mode 100644 index 0000000..1150b4f --- /dev/null +++ b/src/db_extractor_full.py @@ -0,0 +1,144 @@ +import boto3 +import pg8000 +import json +import time +import db_conn +import os +from uuid import UUID +import datetime + + +# Declare the global connection object to use during warm starting +# to reuse connections that were established during a previous invocation. +connection = None + +# Storing current time, we will use this to update SSM when finished so that we +# know for the next run which time to select from +current_run_time = datetime.datetime.now() + +#Class to format json UUID's +class UUIDEncoder(json.JSONEncoder): + def default(self, obj): + if isinstance(obj, UUID): + # if the obj is uuid, we simply return the value of uuid + return obj.hex + return json.JSONEncoder.default(self, obj) + + +def db_extractor(): + # Use the get_parameter method of the SSM client to retrieve the parameter + try: + # Create an SSM client + ssm_client = boto3.client('ssm') + response = ssm_client.get_parameter( + Name=os.environ['parameter_name'], + WithDecryption=True + ) + # The value of the parameter is stored in the 'Value' field of the response + parameter_value = response['Parameter']['Value'] + json_parameter_value = json.loads(parameter_value) + except Exception as e: + print("Data Warehouse Lambda - ERROR - DB Extract - Failed to retrieve values from SSM" + str(e)) + + + global connection + try: + if connection is None: + connection = db_conn.get_connection() + if connection is None: + print("Data Warehouse Lambda - ERROR - DB Extract - Failed to connect to database") + + # Instantiate the cursor object + cursor = connection.cursor() + + # Initialize an empty dictionary to store the data + data = {} + + # Get a list of all tables in the database + cursor.execute("SELECT table_name FROM information_schema.tables WHERE table_schema = 'public'") + tables = cursor.fetchall() + +# table_dump_ignore = ['django_migrations', 'audit_history', 'archived_access_codes', 'schema_migration', 'audit_history_tableslist', 'awsdms_ddl_audit'] + table_dump_ignore = ['zip3_distances','transportation_service_provider_performances','django_migrations', 'audit_history', 'archived_access_codes', 'schema_migration', 'audit_history_tableslist', 'awsdms_ddl_audit'] + + # For each table + for table in tables: + table_name = table[0] + # get a list of the table's columns + cursor.execute("SELECT column_name FROM information_schema.columns WHERE table_schema = 'public' AND table_name=%s", (table_name,)) + column_names = list(cursor.fetchall()) + + # Determine whether we have the timestamp fields created_at and updated_at + found_created_at = False + found_updated_at = False + for column in column_names: + if 'updated_at' in column: + found_updated_at = True + if 'created_at' in column: + found_created_at = True + + # By default we will want to write our data to S3, on error we will skip this + write_to_s3 = True + + # If we ignore the table, we do nothing + if str(table_name) in table_dump_ignore: + print("Data Warehouse Lambda - INFO - DB Extract - Didn't extract data for table(ignore list): " + str(table_name)) + # If we have neither timestamp field, we do a full dump + elif found_updated_at == False and found_created_at == False: + print("Data Warehouse Lambda - INFO - DB Extract - Performing full dump on " + str(table_name)) + cursor.execute("SELECT * FROM " + str(table_name)) + table_data = cursor.fetchall() + column_names = [desc[0] for desc in cursor.description] + data_with_col_names = [{column_names[i]: row[i] for i in range(len(column_names))} for row in table_data] + json_data = json.dumps(data_with_col_names, cls=UUIDEncoder) + # If we have created_at but no updated_at, we dump based only on created_at + elif found_updated_at == False and found_created_at == True: + last_run_time = json_parameter_value['data']['lastRunTime'] + cursor.execute("SELECT * FROM " + str(table_name) + " where (created_at > '" + str(last_run_time) + "') order by created_at;") + table_data = cursor.fetchall() + column_names = [desc[0] for desc in cursor.description] + data_with_col_names = [{column_names[i]: row[i] for i in range(len(column_names))} for row in table_data] + json_data = json.dumps(data_with_col_names, cls=UUIDEncoder, default=str) + # If we have created_at and updated_at, we dump based on both + elif found_updated_at == True and found_created_at == True: + last_run_time = json_parameter_value['data']['lastRunTime'] + cursor.execute("SELECT * FROM " + str(table_name) + " where ((created_at > %s) OR (updated_at > %s)) order by created_at;", (last_run_time, last_run_time)) + table_data = cursor.fetchall() + column_names = [desc[0] for desc in cursor.description] + data_with_col_names = [{column_names[i]: row[i] for i in range(len(column_names))} for row in table_data] + json_data = json.dumps(data_with_col_names, cls=UUIDEncoder, default=str) + else: + print('Data Warehouse Lambda - ERROR - DB Extract - ' + str(table_name) + ' does not match any criteria for data warehousing') + write_to_s3 = False + + # Write the data to S3 + if write_to_s3 == True: + try: + s3 = boto3.client('s3') + s3.put_object(Bucket=os.environ['bucket_name'], Key="db_data" + "/" + str(json_parameter_value['data']['serialNumber'] + 1).zfill(6) + "/" + table_name + ".json", Body=json_data, ServerSideEncryption='AES256') + print('Data Warehouse Lambda - INFO - DB Extract - Successfully wrote ' + os.environ['bucket_name'] + "/" + "db_data/" + "/"+str(json_parameter_value['data']['serialNumber'] + 1).zfill(6)+"/" + table_name + ".json") + except Exception as e: + print("Data Warehouse Lambda - ERROR - DB Extract - Error writing to S3" + str(e)) + + # Create an SSM client + try: + ssm_client = boto3.client('ssm') + json_parameter_value['data']['serialNumber'] += 1 + json_parameter_value['data']['lastRunTime'] = str(current_run_time) + ssm_client.put_parameter( + Name=os.environ['parameter_name'], + Value=json.dumps(json_parameter_value), + Type='SecureString', + Overwrite=True + ) + + #print("Data Warehouse Lambda - INFO - DB Extract - Updated tracking in SSM") + except Exception as e: + print("Data Warehouse Lambda - ERROR - DB Extract - Error writing to SSM" + str(e)) + + except Exception as e: + try: + connection.close() + except Exception as e: + connection = None + print("Data Warehouse Lambda - ERROR - DB Extract - Failed due to :" + str(e)) diff --git a/src/db_dump.py b/src/db_extractor_history.py similarity index 100% rename from src/db_dump.py rename to src/db_extractor_history.py diff --git a/src/db_schema_dump.py b/src/db_schema_dump.py index ff9468c..d206770 100644 --- a/src/db_schema_dump.py +++ b/src/db_schema_dump.py @@ -50,7 +50,7 @@ def db_schema_dump(): # For each table, get a list of its columns for table in tables: table_name = table[0] - cursor.execute("SELECT column_name FROM information_schema.columns WHERE table_name = %s", (table_name,)) + cursor.execute("SELECT column_name, data_type FROM information_schema.columns WHERE table_name = %s", (table_name,)) columns = cursor.fetchall() data[table_name] = columns diff --git a/src/lambda_function.py b/src/lambda_function.py index 863fa45..74f9797 100644 --- a/src/lambda_function.py +++ b/src/lambda_function.py @@ -1,25 +1,30 @@ import db_schema_dump -import db_dump +import db_extractor_full +#import db_extractor_history def lambda_handler(event, context): print("Data Warehouse Lambda - INFO - Export starting") - - # DB Schema export + # DB Schema Export try: db_schema_dump.db_schema_dump() except Exception as e: print("Data Warehouse Lambda - ERROR - DB Schema - Failure: " + str(e)) - # DB Data export + # DB Extractor - Full try: - db_dump.db_data_dump() + db_extractor_full.db_extractor() except Exception as e: - print("Data Warehouse Lambda - ERROR - DB Data - Failure:" + str(e)) - + print("Data Warehouse Lambda - ERROR - DB Extractor - Full - Failure: " + str(e)) print("Data Warehouse Lambda - INFO - Export finished") - + # DB Extractor - History Table +# try: +# db_extractor_history.db_extractor() +# except Exception as e: +# print("Data Warehouse Lambda - ERROR - DB Extractor - History - Failure: " + str(e)) +# +# print("Data Warehouse Lambda - INFO - Export finished")