From 593c25fcb3aec8767c722198acf294c22243b15d Mon Sep 17 00:00:00 2001 From: cameroncaci Date: Fri, 14 Mar 2025 14:45:27 -0400 Subject: [PATCH 1/5] swap if statement for readability --- src/db_extractor_full.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/db_extractor_full.py b/src/db_extractor_full.py index 9f11a98..f731df6 100644 --- a/src/db_extractor_full.py +++ b/src/db_extractor_full.py @@ -64,14 +64,13 @@ def fetch_and_upload_cursor_results( map_row_to_columns(row, column_names) for row in batch ) for record in data_with_col_names: - if not first_record: + if first_record: + # Set to False after processing the first record + first_record = False + else: # Add comma before each record except the first buffer.write(b",") buffer_size += 1 - else: - # Set to False after processing the first record - first_record = False - json_line = json.dumps(record, cls=UUIDEncoder, default=str) json_line_bytes = json_line.encode("utf-8") buffer.write(json_line_bytes) From 5434faa883f5a2e59a1b6f44a7532574db81dcde Mon Sep 17 00:00:00 2001 From: cameroncaci Date: Mon, 17 Mar 2025 15:38:51 -0400 Subject: [PATCH 2/5] Initial implementation of table extraction parallel processing support. WARNING: This commit currently thread locks and is not 1-2 processor friendly --- src/db_extractor_full.py | 229 +++++++++++++++++++++++++-------------- 1 file changed, 145 insertions(+), 84 deletions(-) diff --git a/src/db_extractor_full.py b/src/db_extractor_full.py index f731df6..309ce4e 100644 --- a/src/db_extractor_full.py +++ b/src/db_extractor_full.py @@ -17,7 +17,7 @@ current_run_time = datetime.datetime.now() # Define batch size fetching of records for json dumps -batch_size = 500000 +batch_size = 100000 # Lambda global connection for warm starts # This connection is only used to grab the table names @@ -25,6 +25,7 @@ # connection connection = None +max_processes = multiprocessing.cpu_count() # Processor limit # Class to format json UUID's class UUIDEncoder(json.JSONEncoder): @@ -34,6 +35,25 @@ def default(self, obj): return obj.hex return json.JSONEncoder.default(self, obj) +def parallel_worker(worker_conn, batch, column_names): + # Handle process worker -> back to json mapping and + # respond it back via the pipe. We'll convert a batch + # of rows into JSON and then close the pipe with our response + try: + fragment = convert_batch_to_json(batch, column_names) + worker_conn.send(("fragment", fragment)) + except Exception as e: + worker_conn.send(("error", f"ERROR: {e}")) + finally: + worker_conn.close() + +def convert_batch_to_json(batch, column_names): + # Convert a batch of rows to comma delimited JSON fragments (No start/end brackets) + records = [] + for row in batch: + as_dict = map_row_to_columns(row, column_names) + records.append(json.dumps(as_dict, cls=UUIDEncoder, default=str)) + return ",".join(records).encode("utf-8") # Return as bytes # Helper function to batch fetch data and use multipart uploading with S3 def fetch_and_upload_cursor_results( @@ -52,74 +72,25 @@ def fetch_and_upload_cursor_results( buffer = io.BytesIO() buffer_size = 0 min_part_size = 50 * 1024 * 1024 # 50 MB - first_record = True # Track the first record for JSON formatting - - try: - # Write the opening bracket for the JSON array - buffer.write(b"[") - buffer_size += 1 - # Fetch the results of the cursor query - for batch in fetch_batches(cursor): - data_with_col_names = ( - map_row_to_columns(row, column_names) for row in batch - ) - for record in data_with_col_names: - if first_record: - # Set to False after processing the first record - first_record = False - else: - # Add comma before each record except the first - buffer.write(b",") - buffer_size += 1 - json_line = json.dumps(record, cls=UUIDEncoder, default=str) - json_line_bytes = json_line.encode("utf-8") - buffer.write(json_line_bytes) - buffer_size += len(json_line_bytes) - - # Upload part if buffer reaches the minimum part size of 5MB - if buffer_size >= min_part_size: - # This part is now finished being appended too - # Trigger upload, reset buffer, and proceed with the next one - buffer.seek(0) - response = s3_client.upload_part( - Bucket=bucket_name, - Key=key_name, - PartNumber=part_number, - UploadId=upload_id, - Body=buffer, - ) - parts.append({"PartNumber": part_number, "ETag": response["ETag"]}) - part_number += 1 - buffer.close() - buffer = io.BytesIO() - buffer_size = 0 - - gc.collect() - - if first_record: - # No records were written, make a new buffer and write an empty array - buffer = io.BytesIO() + + batches = list(fetch_batches(cursor)) + total_batches = len(batches) + print(f"Data Warehouse Lambda - INFO - {total_batches} batches found for {key_name}") + + # Handle case of no records to convert to JSON + if total_batches == 0: + # No records, make a new buffer and write an empty array for the dump + try: buffer.write(b"[]") - buffer_size = 2 - else: - # Write closing bracket to close the JSON array - buffer.write(b"]") - buffer_size += 1 - - # Upload the final part (Or only part if no records) - buffer.seek(0) - response = s3_client.upload_part( - Bucket=bucket_name, - Key=key_name, - PartNumber=part_number, - UploadId=upload_id, - Body=buffer, - ) - parts.append({"PartNumber": part_number, "ETag": response["ETag"]}) - buffer.close() - - if parts: - # Complete multipart upload + buffer.seek(0) + response = s3_client.upload_part( + Bucket=bucket_name, + Key=key_name, + PartNumber=part_number, + UploadId=upload_id, + Body=buffer, + ) + parts.append({"PartNumber": part_number, "ETag": response["ETag"]}) s3_client.complete_multipart_upload( Bucket=bucket_name, Key=key_name, @@ -129,25 +100,110 @@ def fetch_and_upload_cursor_results( print( f"Data Warehouse Lambda - INFO - DB Extract - Successfully wrote {bucket_name}/{key_name}" ) - else: - # Abort the multipart upload if no parts were uploaded + except Exception as e: + # Abort multipart upload in case of failure s3_client.abort_multipart_upload( Bucket=bucket_name, Key=key_name, UploadId=upload_id ) print( - f"Data Warehouse Lambda - INFO - No data to upload for table {key_name}" + f"Data Warehouse Lambda - ERROR - DB Extract - Error during multipart upload: {e}" ) + raise e + # Early return + return + + # Create a manager -> worker process for each batch in parallel + # This makes a manager and worker combo for every batch that we will await later + workers: List[Tuple[Process, multi_processing_connection, int]] = [] + for i, batch in enumerate(batches): + manager, worker = Pipe() + p = Process(target=parallel_worker, args=(worker, batch, column_names)) + workers.append((p, manager, i)) + p.start() + + # + # Start our JSON output document + # + # Begin with our bracket as we are + # going to format this manually + buffer.write(b"[") + buffer_size += 1 + + # Concurrently wait for the workers to be complete + while workers: + for i in range(len(workers) - 1, -1, -1): # Backwards loop because we pop list entries + process, manager, batch_index = workers[i] + # Loop over the workers infinitely until we are out of workers + # or until we find a worker whose process has completed + if not process.is_alive(): + print( + f"Data Warehouse Lambda - DEBUG - DB Extract - {key_name} worker for batch index {batch_index} complete, polling it now. {len(workers)} workers remain" + ) + is_last_worker = len(workers) == 1 + # Process completed and worker has a letter for us + if manager.poll(): + # Have the manager receive the worker's data + msg_type, data = manager.recv() + if msg_type == "error": + print(f"Data Warehouse Lambda - ERROR - Worker {batch_index} failed: {data}") + raise RuntimeError(f"Worker {batch_index} error: {data}") + print( + f"Data Warehouse Lambda - DEBUG - DB Extract - {key_name} worker for batch index {batch_index} polled successfully. {len(workers)} workers remain" + ) + # Add data fragment to buffer + buffer.write(data) + buffer_size += len(data) + + if not is_last_worker: + # If it's not the last worker, append `,` to support a final + # [${worker_1_json}, ${worker_2_json}, ${worker_3_json}] output + buffer.write(b",") + buffer_size += 1 + + # Upload part if buffer reaches the minimum part size or + # if this is the final worker + if buffer_size >= min_part_size or is_last_worker: + if is_last_worker: + # Last call, close it up! + buffer.write(b"]") + buffer_size += 1 + buffer.seek(0) + response = s3_client.upload_part( + Bucket=bucket_name, + Key=key_name, + PartNumber=part_number, + UploadId=upload_id, + Body=buffer, + ) + parts.append({"PartNumber": part_number, "ETag": response["ETag"]}) + part_number += 1 + buffer.close() + buffer = io.BytesIO() + buffer_size = 0 + + gc.collect() + manager.close() # Free thread + process.join() # Make sure worker is done + workers.pop(i) # Clear the worker + + + print( + f"Data Warehouse Lambda - DEBUG - DB Extract - {key_name} workers complete, announcing multipart completion" + ) - except Exception as e: - # Abort multipart upload in case of failure - s3_client.abort_multipart_upload( - Bucket=bucket_name, Key=key_name, UploadId=upload_id - ) - print( - f"Data Warehouse Lambda - ERROR - DB Extract - Error during multipart upload: {e}" - ) - raise e - + # Workers have completed and there is nothing left to upload to the multi-part + buffer.close() + del buffer + # Announce multipart upload completion + s3_client.complete_multipart_upload( + Bucket=bucket_name, + Key=key_name, + UploadId=upload_id, + MultipartUpload={"Parts": parts}, + ) + print( + f"Data Warehouse Lambda - INFO - DB Extract - Successfully wrote {bucket_name}/{key_name}" + ) # Helper func to yield results rather than return # to boost processing efficiency @@ -324,13 +380,18 @@ def db_extractor(): # for the number of processors we have cursor.close() # Close the current cursor, we are done with it. Child processes make new ones - # Process each table in a separate Process with Pipe - max_processes = multiprocessing.cpu_count() # Retrieve processor limit + # Process each table in a separate Process with Pipe table_index = iter(tables) processes: List[Tuple[Process, multi_processing_connection]] = ( [] ) # Holds the individual processes in tuples with the parent pipes - + + # TODO: Logic to handle only 1-2 processors + # or fail if there are 2 processors or less + print( + f"Data Warehouse Lambda - INFO - {max_processes} processors available for use" + ) + while True: while len(processes) < max_processes: try: From b961ab8a59e1137f184e230e889e89a0e8d7b95c Mon Sep 17 00:00:00 2001 From: cameroncaci Date: Tue, 18 Mar 2025 13:12:45 -0400 Subject: [PATCH 3/5] yield processors for workers --- src/db_extractor_full.py | 40 +++++++++++++++++++++++++--------------- 1 file changed, 25 insertions(+), 15 deletions(-) diff --git a/src/db_extractor_full.py b/src/db_extractor_full.py index 309ce4e..66ad046 100644 --- a/src/db_extractor_full.py +++ b/src/db_extractor_full.py @@ -3,6 +3,7 @@ import boto3 import pg8000 import json +import time import io import db_conn import os @@ -16,12 +17,17 @@ # know for the next run which time to select from current_run_time = datetime.datetime.now() +# Called to sleep the current thread and allow the workers +# more processors +def yield_for_workers(): + time.sleep(15) + # Define batch size fetching of records for json dumps batch_size = 100000 # Lambda global connection for warm starts # This connection is only used to grab the table names -# When multiprocessing, the child processes each create their own +# When multiprocessing, the worker processes each create their own # connection connection = None @@ -133,6 +139,8 @@ def fetch_and_upload_cursor_results( while workers: for i in range(len(workers) - 1, -1, -1): # Backwards loop because we pop list entries process, manager, batch_index = workers[i] + # Temporarily free this processor so the workers can use it + yield_for_workers() # Loop over the workers infinitely until we are out of workers # or until we find a worker whose process has completed if not process.is_alive(): @@ -224,19 +232,19 @@ def table_extractor( table_name, json_parameter_value, bucket_name, - child_conn: multi_processing_connection, + worker: multi_processing_connection, ): try: connection = db_conn.get_connection() if connection is None: error_msg = ( - f"Failed to connect to database during child process for {table_name}" + f"Failed to connect to database during worker process for {table_name}" ) print(f"Data Warehouse Lambda - ERROR - DB Extract - {error_msg}") - child_conn.send(error_msg) + worker.send(error_msg) return print( - f"Data Warehouse Lambda - INFO - Child extraction process created for table {table_name}" + f"Data Warehouse Lambda - INFO - Worker extraction process created for table {table_name}" ) s3_key = f"db_data/{str(json_parameter_value['data']['serialNumber'] + 1).zfill(6)}/{table_name}.json" cursor = connection.cursor() # type: ignore @@ -297,14 +305,14 @@ def table_extractor( + " does not match any criteria for data warehousing" ) - # Main child process complete - child_conn.send(f"Successfully processed {table_name}") + # Worker process complete + worker.send(f"Successfully processed {table_name}") except Exception as e: error_msg = f"Error processing {table_name}: {e}" print(f"Data Warehouse Lambda - ERROR - DB Extract - {error_msg}") - child_conn.send(error_msg) + worker.send(error_msg) finally: - child_conn.close() + worker.close() def db_extractor(): @@ -378,13 +386,13 @@ def db_extractor(): # Create multi processes for the number of tables we have # for the number of processors we have - cursor.close() # Close the current cursor, we are done with it. Child processes make new ones + cursor.close() # Close the current cursor, we are done with it. Worker processes make new ones # Process each table in a separate Process with Pipe table_index = iter(tables) processes: List[Tuple[Process, multi_processing_connection]] = ( [] - ) # Holds the individual processes in tuples with the parent pipes + ) # Holds the individual processes in tuples with the parent/manager pipes # TODO: Logic to handle only 1-2 processors # or fail if there are 2 processors or less @@ -399,19 +407,21 @@ def db_extractor(): except StopIteration: # This exception is thrown when the next function can't find anything break - parent_conn, child_conn = Pipe() + manager, worker = Pipe() process = Process( target=table_extractor, - args=(table_name, json_parameter_value, bucket_name, child_conn), + args=(table_name, json_parameter_value, bucket_name, worker), ) - processes.append((process, parent_conn)) + processes.append((process, manager)) process.start() if not processes: # No remaining processes are active break - + # Manage processes concurrently without sequential waiting for i in range(len(processes) - 1, -1, -1): + # Temporarily free this processor so the workers can use it + yield_for_workers() process, pipe = processes[i] if not process.is_alive(): # Process has finished, poll it and receive its message From 4cd0dea0671219b4d3ed7b4e8bb80ef3acb22741 Mon Sep 17 00:00:00 2001 From: cameroncaci Date: Wed, 19 Mar 2025 10:59:01 -0400 Subject: [PATCH 4/5] proper manager waiting and multi threading --- src/db_extractor_full.py | 188 ++++++++++++++++++--------------------- 1 file changed, 87 insertions(+), 101 deletions(-) diff --git a/src/db_extractor_full.py b/src/db_extractor_full.py index 66ad046..05f1b31 100644 --- a/src/db_extractor_full.py +++ b/src/db_extractor_full.py @@ -1,29 +1,25 @@ import multiprocessing from multiprocessing.connection import Connection as multi_processing_connection +from multiprocessing.connection import wait +from multiprocessing import Process, Pipe +from socket import socket import boto3 import pg8000 import json -import time import io import db_conn import os from uuid import UUID import datetime import gc -from multiprocessing import Process, Pipe from typing import List, Tuple # Storing current time, we will use this to update SSM when finished so that we # know for the next run which time to select from current_run_time = datetime.datetime.now() -# Called to sleep the current thread and allow the workers -# more processors -def yield_for_workers(): - time.sleep(15) - -# Define batch size fetching of records for json dumps -batch_size = 100000 +# How many SQL records a worker will fetch at a time +batch_size = 50000 # Lambda global connection for warm starts # This connection is only used to grab the table names @@ -115,7 +111,7 @@ def fetch_and_upload_cursor_results( f"Data Warehouse Lambda - ERROR - DB Extract - Error during multipart upload: {e}" ) raise e - # Early return + # Early return, mark process to no longer be alive return # Create a manager -> worker process for each batch in parallel @@ -126,7 +122,7 @@ def fetch_and_upload_cursor_results( p = Process(target=parallel_worker, args=(worker, batch, column_names)) workers.append((p, manager, i)) p.start() - + # # Start our JSON output document # @@ -134,69 +130,65 @@ def fetch_and_upload_cursor_results( # going to format this manually buffer.write(b"[") buffer_size += 1 + + print(f'Data Warehouse Lambda - INFO - {key_name} has {len(workers)} workers') # Concurrently wait for the workers to be complete while workers: - for i in range(len(workers) - 1, -1, -1): # Backwards loop because we pop list entries - process, manager, batch_index = workers[i] - # Temporarily free this processor so the workers can use it - yield_for_workers() - # Loop over the workers infinitely until we are out of workers - # or until we find a worker whose process has completed - if not process.is_alive(): - print( - f"Data Warehouse Lambda - DEBUG - DB Extract - {key_name} worker for batch index {batch_index} complete, polling it now. {len(workers)} workers remain" - ) - is_last_worker = len(workers) == 1 - # Process completed and worker has a letter for us - if manager.poll(): - # Have the manager receive the worker's data - msg_type, data = manager.recv() - if msg_type == "error": - print(f"Data Warehouse Lambda - ERROR - Worker {batch_index} failed: {data}") - raise RuntimeError(f"Worker {batch_index} error: {data}") - print( - f"Data Warehouse Lambda - DEBUG - DB Extract - {key_name} worker for batch index {batch_index} polled successfully. {len(workers)} workers remain" - ) - # Add data fragment to buffer - buffer.write(data) - buffer_size += len(data) - - if not is_last_worker: - # If it's not the last worker, append `,` to support a final - # [${worker_1_json}, ${worker_2_json}, ${worker_3_json}] output - buffer.write(b",") - buffer_size += 1 + finished_workers: List[multi_processing_connection | socket | int] = wait([mgr for (proc, mgr, bIndex) in workers], timeout=None) + for finished_worker in finished_workers: + is_last_worker = len(workers) == 1 + for i, (proc, mgr, batch_index) in enumerate(workers): + if mgr is finished_worker: + if mgr.poll(): + msg_type, data = mgr.recv() + mgr.close() # Free thread + proc.join() # Make sure worker is done + workers.pop(i) # Clear the worker + + if msg_type == "error": + print(f"Data Warehouse Lambda - ERROR - Worker {batch_index} failed: {data}") + raise RuntimeError(f"Worker {batch_index} error: {data}") + print( + f"Data Warehouse Lambda - INFO - DB Extract - {key_name} worker for batch index {batch_index} polled successfully. {len(workers)} workers remain" + ) + # Add data fragment to buffer + buffer.write(data) + buffer_size += len(data) - # Upload part if buffer reaches the minimum part size or - # if this is the final worker - if buffer_size >= min_part_size or is_last_worker: - if is_last_worker: - # Last call, close it up! - buffer.write(b"]") + if not is_last_worker: + # If it's not the last worker, append `,` to support a final + # [${worker_1_json}, ${worker_2_json}, ${worker_3_json}] output + buffer.write(b",") buffer_size += 1 - buffer.seek(0) - response = s3_client.upload_part( - Bucket=bucket_name, - Key=key_name, - PartNumber=part_number, - UploadId=upload_id, - Body=buffer, - ) - parts.append({"PartNumber": part_number, "ETag": response["ETag"]}) - part_number += 1 - buffer.close() - buffer = io.BytesIO() - buffer_size = 0 - - gc.collect() - manager.close() # Free thread - process.join() # Make sure worker is done - workers.pop(i) # Clear the worker - + + # Upload part if buffer reaches the minimum part size or + # if this is the final worker + if buffer_size >= min_part_size or is_last_worker: + if is_last_worker: + # Last call, close it up! + buffer.write(b"]") + buffer_size += 1 + buffer.seek(0) + response = s3_client.upload_part( + Bucket=bucket_name, + Key=key_name, + PartNumber=part_number, + UploadId=upload_id, + Body=buffer, + ) + parts.append({"PartNumber": part_number, "ETag": response["ETag"]}) + part_number += 1 + buffer.close() + buffer = io.BytesIO() + buffer_size = 0 + + gc.collect() + + print( - f"Data Warehouse Lambda - DEBUG - DB Extract - {key_name} workers complete, announcing multipart completion" + f"Data Warehouse Lambda - INFO - DB Extract - {key_name} workers complete, announcing multipart completion" ) # Workers have completed and there is nothing left to upload to the multi-part @@ -227,7 +219,6 @@ def fetch_batches(cursor: pg8000.Cursor): def map_row_to_columns(row, column_names): return {column_names[i]: row[i] for i in range(len(column_names))} - def table_extractor( table_name, json_parameter_value, @@ -390,49 +381,44 @@ def db_extractor(): # Process each table in a separate Process with Pipe table_index = iter(tables) - processes: List[Tuple[Process, multi_processing_connection]] = ( + workers: List[Tuple[Process, multi_processing_connection]] = ( [] ) # Holds the individual processes in tuples with the parent/manager pipes - # TODO: Logic to handle only 1-2 processors - # or fail if there are 2 processors or less print( f"Data Warehouse Lambda - INFO - {max_processes} processors available for use" ) while True: - while len(processes) < max_processes: - try: - table_name = next(table_index) - except StopIteration: - # This exception is thrown when the next function can't find anything - break - manager, worker = Pipe() - process = Process( - target=table_extractor, - args=(table_name, json_parameter_value, bucket_name, worker), - ) - processes.append((process, manager)) - process.start() - if not processes: - # No remaining processes are active + try: + table_name = next(table_index) + except StopIteration: + # This exception is thrown when the next function can't find anything break + manager, worker = Pipe() + process = Process( + target=table_extractor, + args=(table_name, json_parameter_value, bucket_name, worker), + ) + workers.append((process, manager)) + process.start() - # Manage processes concurrently without sequential waiting - for i in range(len(processes) - 1, -1, -1): - # Temporarily free this processor so the workers can use it - yield_for_workers() - process, pipe = processes[i] - if not process.is_alive(): - # Process has finished, poll it and receive its message - if pipe.poll(): - message = pipe.recv() - print(f"Data Warehouse Lambda - INFO - {message}") - pipe.close() - process.join() - processes.pop( - i - ) # Remove the finished process to free up processor capacity + if not workers: + # No remaining workers have jobs, db export complete + break + + for i in range(len(workers) - 1, -1, -1): + finished_workers: List[multi_processing_connection | socket | int] = wait([mgr for (proc, mgr) in workers], timeout=None) + for finished_worker in finished_workers: + # find the matching worker + for i, (proc, mgr) in enumerate(workers): + if mgr is finished_worker: + if mgr.poll(): + message = mgr.recv() + print(f"Data Warehouse Lambda - INFO - {message}") + mgr.close() + proc.join() + workers.pop(i) # Create an SSM client try: From 9d65076a585f50b7657e2546be1ad2910ed4b4c7 Mon Sep 17 00:00:00 2001 From: cameroncaci Date: Wed, 19 Mar 2025 13:13:12 -0400 Subject: [PATCH 5/5] pass batch as a generator to workers and improve logging --- src/db_extractor_full.py | 35 ++++++++++++++++++++++++----------- 1 file changed, 24 insertions(+), 11 deletions(-) diff --git a/src/db_extractor_full.py b/src/db_extractor_full.py index 05f1b31..f136fcf 100644 --- a/src/db_extractor_full.py +++ b/src/db_extractor_full.py @@ -12,6 +12,7 @@ from uuid import UUID import datetime import gc +import traceback from typing import List, Tuple # Storing current time, we will use this to update SSM when finished so that we @@ -37,7 +38,7 @@ def default(self, obj): return obj.hex return json.JSONEncoder.default(self, obj) -def parallel_worker(worker_conn, batch, column_names): +def parallel_worker(worker_conn, batch, column_names, key_name): # Handle process worker -> back to json mapping and # respond it back via the pipe. We'll convert a batch # of rows into JSON and then close the pipe with our response @@ -45,7 +46,8 @@ def parallel_worker(worker_conn, batch, column_names): fragment = convert_batch_to_json(batch, column_names) worker_conn.send(("fragment", fragment)) except Exception as e: - worker_conn.send(("error", f"ERROR: {e}")) + tb = traceback.format_exc() + worker_conn.send(("error", f"ERROR: {e}\nKEY: {key_name}\nTrace: {tb}")) finally: worker_conn.close() @@ -75,12 +77,16 @@ def fetch_and_upload_cursor_results( buffer_size = 0 min_part_size = 50 * 1024 * 1024 # 50 MB - batches = list(fetch_batches(cursor)) - total_batches = len(batches) - print(f"Data Warehouse Lambda - INFO - {total_batches} batches found for {key_name}") + batch_generator = fetch_batches(cursor) - # Handle case of no records to convert to JSON - if total_batches == 0: + # Since a generator doesn't return a list, check the first case of if it is empty + try: + first_batch = next(batch_generator) + except StopIteration: + # This exception WILL be thrown if no SQL records return + first_batch = None + + if first_batch is None: # No records, make a new buffer and write an empty array for the dump try: buffer.write(b"[]") @@ -117,10 +123,17 @@ def fetch_and_upload_cursor_results( # Create a manager -> worker process for each batch in parallel # This makes a manager and worker combo for every batch that we will await later workers: List[Tuple[Process, multi_processing_connection, int]] = [] - for i, batch in enumerate(batches): + # Make sure we still process the first batch + # fb = first batch + fbManager, fbWorker = Pipe() + fbp = Process(target=parallel_worker, args=(fbWorker, first_batch, column_names, key_name)) + workers.append((fbp, fbManager, 1)) + fbp.start() + # Process remaining batches from the generator + for i, batch in enumerate(batch_generator): manager, worker = Pipe() - p = Process(target=parallel_worker, args=(worker, batch, column_names)) - workers.append((p, manager, i)) + p = Process(target=parallel_worker, args=(worker, batch, column_names, key_name)) + workers.append((p, manager, i+1)) # +1 because of first batch preceding this p.start() # @@ -299,7 +312,7 @@ def table_extractor( # Worker process complete worker.send(f"Successfully processed {table_name}") except Exception as e: - error_msg = f"Error processing {table_name}: {e}" + error_msg = f"Error processing {table_name}: {repr(e)}" print(f"Data Warehouse Lambda - ERROR - DB Extract - {error_msg}") worker.send(error_msg) finally: