From 1c66db993eb27637e2e9a752c77a77cd96b33465 Mon Sep 17 00:00:00 2001 From: jakeross Date: Fri, 30 Jan 2026 13:04:47 +1100 Subject: [PATCH 1/4] feat: optimize well data processing with bulk inserts and improved payload handling --- transfers/well_transfer.py | 199 +++++++++++++++++++------------------ 1 file changed, 103 insertions(+), 96 deletions(-) diff --git a/transfers/well_transfer.py b/transfers/well_transfer.py index b8eee5d38..40a4547dc 100644 --- a/transfers/well_transfer.py +++ b/transfers/well_transfer.py @@ -17,6 +17,7 @@ import re import threading import time +from collections import defaultdict from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime, UTC from zoneinfo import ZoneInfo @@ -24,7 +25,9 @@ import pandas as pd from pandas import isna, notna from pydantic import ValidationError +from sqlalchemy import insert from sqlalchemy.exc import DatabaseError +from sqlalchemy.inspection import inspect as sa_inspect from sqlalchemy.orm import Session from core.enums import ( @@ -71,6 +74,18 @@ upload_blob_json, ) + +def _model_to_dict(obj): + mapper = sa_inspect(obj.__class__) + data = {} + for column in mapper.columns: + key = column.key + if column.primary_key and column.autoincrement: + continue + data[key] = getattr(obj, key) + return data + + ADDED = [] NMA_MONITORING_FREQUENCY = { @@ -651,15 +666,25 @@ def _after_hook(self, session): def _process_chunk(chunk_index: int, wells_chunk: list[Thing]): step_start_time = time.time() - all_objects = [] + bulk_rows: dict[type, list[dict]] = defaultdict(list) + for well in wells_chunk: - objs = self._after_hook_chunk(well, formations) - if objs: - all_objects.extend(objs) + payload = self._after_hook_chunk(well, formations) + if not payload: + continue + for model, rows in payload.items(): + if rows: + bulk_rows[model].extend(rows) save_time = time.time() + total_rows = 0 try: - session.bulk_save_objects(all_objects, return_defaults=False) + for model, rows in bulk_rows.items(): + if not rows: + continue + total_rows += len(rows) + stmt = insert(model) + session.execute(stmt, rows) session.commit() except DatabaseError as e: session.rollback() @@ -670,7 +695,7 @@ def _process_chunk(chunk_index: int, wells_chunk: list[Thing]): processed_count = chunk_index * chunk_size + len(wells_chunk) logger.info( f"After hook: {processed_count}/{count} took {time.time() - step_start_time:.2f}s, " - f"n_objects={len(all_objects)}, save_time={save_time}" + f"rows_inserted={total_rows}, save_time={save_time}" ) return processed_count @@ -687,70 +712,65 @@ def _after_hook_chunk(self, well, formations): row = self._row_by_pointid.get(well.name) if row is None: - return [] + return {} + + payload: dict[type, list[dict]] = defaultdict(list) + + def _append(obj): + payload[obj.__class__].append(_model_to_dict(obj)) - objs = [] self._add_formation_zone(row, well, formations) if notna(row.Notes): - note = well.add_note(row.Notes, "General") - objs.append(note) + _append(well.add_note(row.Notes, "General")) if row.ConstructionNotes: - note = well.add_note(row.ConstructionNotes, "Construction") - objs.append(note) + _append(well.add_note(row.ConstructionNotes, "Construction")) if row.WaterNotes: - note = well.add_note(row.WaterNotes, "Water") - objs.append(note) + _append(well.add_note(row.WaterNotes, "Water")) location = well.current_location elevation_method, location_notes = self._added_locations[row.PointID] for note_type, note_content in location_notes.items(): if notna(note_content): - location_note = location.add_note(note_content, note_type) - objs.append(location_note) + _append(location.add_note(note_content, note_type)) if self.verbose: logger.info( f"Added note of type {note_type} for current location of well {well.name}" ) - data_provenances = make_location_data_provenance( - row, location, elevation_method - ) - objs.extend(data_provenances) + for dp in make_location_data_provenance(row, location, elevation_method): + _append(dp) - cs = ( - "CompletionSource", - { - "field_name": "well_completion_date", - "origin_type": f"LU_Depth_CompletionSource:{row.CompletionSource}", - }, - ) - ds = ( - "DataSource", - { - "field_name": "well_construction_method", - "origin_source": row.DataSource, - }, - ) - des = ( - "DepthSource", - { - "field_name": "well_depth", - "origin_type": f"LU_Depth_CompletionSource:{row.DepthSource}", - }, - ) - - for row_field, kw in (cs, ds, des): + for row_field, kw in ( + ( + "CompletionSource", + { + "field_name": "well_completion_date", + "origin_type": f"LU_Depth_CompletionSource:{row.CompletionSource}", + }, + ), + ( + "DataSource", + { + "field_name": "well_construction_method", + "origin_source": row.DataSource, + }, + ), + ( + "DepthSource", + { + "field_name": "well_depth", + "origin_type": f"LU_Depth_CompletionSource:{row.DepthSource}", + }, + ), + ): if notna(row[row_field]): if "origin_type" in kw: ot = self._get_lexicon_value(row, kw["origin_type"]) if ot is None: continue - kw["origin_type"] = ot - - dp = DataProvenance(target_id=well.id, target_table="thing", **kw) - objs.append(dp) + _append(DataProvenance(target_id=well.id, target_table="thing", **kw)) start_time = time.time() mphs = self._measuring_point_estimator.estimate_measuring_point_height(row) @@ -759,85 +779,72 @@ def _after_hook_chunk(self, well, formations): f"Estimated measuring point heights for {well.name}: {time.time() - start_time:.2f}s" ) for mph, mph_desc, start_date, end_date in zip(*mphs): - measuring_point_history = MeasuringPointHistory( - thing_id=well.id, - measuring_point_height=mph, - measuring_point_description=mph_desc, - start_date=start_date, - end_date=end_date, + _append( + MeasuringPointHistory( + thing_id=well.id, + measuring_point_height=mph, + measuring_point_description=mph_desc, + start_date=start_date, + end_date=end_date, + ) ) - objs.append(measuring_point_history) - - """ - Developer's notes - - For all status_history records the start_date will be now since that - isn't recorded in NM_Aquifer - """ - # TODO: if row.MonitoringStatus == "Q" is it monitored or not? <-- AMMP review - # TODO: if row.MonitoringStatus == "X" can that change? <-- AMMP review - # TODO: have AMMP review and verify the various MonitoringStatus codes target_id = well.id target_table = "thing" if notna(row.MonitoringStatus): - if ( - "X" in row.MonitoringStatus - or "I" in row.MonitoringStatus - or "C" in row.MonitoringStatus - ): + if any(code in row.MonitoringStatus for code in ("X", "I", "C")): status_value = "Not currently monitored" else: status_value = "Currently monitored" - status_history = StatusHistory( - status_type="Monitoring Status", - status_value=status_value, - reason=row.MonitorStatusReason, - start_date=datetime.now(tz=UTC), - target_id=target_id, - target_table=target_table, + _append( + StatusHistory( + status_type="Monitoring Status", + status_value=status_value, + reason=row.MonitorStatusReason, + start_date=datetime.now(tz=UTC), + target_id=target_id, + target_table=target_table, + ) ) - objs.append(status_history) if self.verbose: logger.info( f" Added monitoring status for well {well.name}: {status_value}" ) - for code in NMA_MONITORING_FREQUENCY.keys(): + for code, monitoring_frequency in NMA_MONITORING_FREQUENCY.items(): if code in row.MonitoringStatus: - monitoring_frequency = NMA_MONITORING_FREQUENCY[code] - monitoring_frequency_history = MonitoringFrequencyHistory( - thing_id=well.id, - monitoring_frequency=monitoring_frequency, - start_date=datetime.now(tz=UTC), - end_date=None, + _append( + MonitoringFrequencyHistory( + thing_id=well.id, + monitoring_frequency=monitoring_frequency, + start_date=datetime.now(tz=UTC), + end_date=None, + ) ) - - objs.append(monitoring_frequency_history) if self.verbose: logger.info( f" Adding '{monitoring_frequency}' monitoring frequency for well {well.name}" ) if notna(row.Status): - status_value = self._get_lexicon_value(row, f"LU_Status:{row.Status}") if status_value is not None: - status_history = StatusHistory( - status_type="Well Status", - status_value=status_value, - reason=row.StatusUserNotes, - start_date=datetime.now(tz=UTC), - target_id=target_id, - target_table=target_table, + _append( + StatusHistory( + status_type="Well Status", + status_value=status_value, + reason=row.StatusUserNotes, + start_date=datetime.now(tz=UTC), + target_id=target_id, + target_table=target_table, + ) ) - objs.append(status_history) if self.verbose: logger.info( f" Added well status for well {well.name}: {status_value}" ) - return objs + return payload def transfer_parallel(self, num_workers: int = None) -> None: """ From 3fbd2bbf30e14c2aa608900a4c181e4347e371df Mon Sep 17 00:00:00 2001 From: jakeross Date: Fri, 30 Jan 2026 17:53:38 +1100 Subject: [PATCH 2/4] feat: improve data coercion functions for sensor transfer processing --- tests/test_sensor_transfer.py | 22 ++++++++++++++++-- transfers/sensor_transfer.py | 44 ++++++++++++++++++++++------------- 2 files changed, 48 insertions(+), 18 deletions(-) diff --git a/tests/test_sensor_transfer.py b/tests/test_sensor_transfer.py index 02ef92db5..d8c1ce4fe 100644 --- a/tests/test_sensor_transfer.py +++ b/tests/test_sensor_transfer.py @@ -1,8 +1,7 @@ +from transfers.sensor_transfer import _coerce_wi_mic_gain, _coerce_wi_int import numpy as np import pandas as pd -from transfers.sensor_transfer import _coerce_wi_mic_gain - def test_coerce_wi_mic_gain_numeric(): assert _coerce_wi_mic_gain(1) is True @@ -22,3 +21,22 @@ def test_coerce_wi_mic_gain_handles_none_like(): assert _coerce_wi_mic_gain(" ") is None assert _coerce_wi_mic_gain(pd.NA) is None assert _coerce_wi_mic_gain(np.nan) is None + + +def test_coerce_wi_int_numeric(): + assert _coerce_wi_int(1) == 1 + assert _coerce_wi_int(1.9) == 1 + assert _coerce_wi_int(0.0) == 0 + + +def test_coerce_wi_int_strings(): + assert _coerce_wi_int("2") == 2 + assert _coerce_wi_int(" 3.0 ") == 3 + assert _coerce_wi_int("true") is None + + +def test_coerce_wi_int_none_like(): + assert _coerce_wi_int(None) is None + assert _coerce_wi_int(" ") is None + assert _coerce_wi_int(pd.NA) is None + assert _coerce_wi_int(np.nan) is None diff --git a/transfers/sensor_transfer.py b/transfers/sensor_transfer.py index 97d3c3583..09dd1ffdb 100644 --- a/transfers/sensor_transfer.py +++ b/transfers/sensor_transfer.py @@ -48,24 +48,36 @@ } +def _coerce_wi_int(value): + if value is None or (isinstance(value, str) and not value.strip()): + return None + if isinstance(value, bool): + return int(value) + try: + if pd.isna(value): + return None + except TypeError: + pass + try: + return int(float(value)) + except (TypeError, ValueError): + return None + + def _coerce_wi_mic_gain(value): if value is None or (isinstance(value, str) and not value.strip()): return None if isinstance(value, str): value = value.strip() - if pd.isna(value): - return None - if isinstance(value, bool): - return value + try: + if pd.isna(value): + return None + except TypeError: + pass try: return bool(int(float(value))) - except (ValueError, TypeError): - lowered = str(value).strip().lower() - if lowered in {"true", "t", "yes", "y"}: - return True - if lowered in {"false", "f", "no", "n"}: - return False - return None + except (TypeError, ValueError): + return None class SensorTransferer(ThingBasedTransferer): @@ -238,12 +250,12 @@ def _group_step(self, session: Session, row: pd.Series, db_item: Base): hanging_cable_length=row.HangingCableLength, hanging_point_height=row.HangingPointHgt, hanging_point_description=row.HangingPointDescription, - nma_WI_Duration=row.WI_Duration, - nma_WI_EndFrequency=row.WI_EndFrequency, - nma_WI_Magnitude=row.WI_Magnitude, + nma_WI_Duration=_coerce_wi_int(row.WI_Duration), + nma_WI_EndFrequency=_coerce_wi_int(row.WI_EndFrequency), + nma_WI_Magnitude=_coerce_wi_int(row.WI_Magnitude), nma_WI_MicGain=_coerce_wi_mic_gain(row.WI_MicGain), - nma_WI_MinSoundDepth=row.WI_MinSoundDepth, - nma_WI_StartFrequency=row.WI_StartFrequency, + nma_WI_MinSoundDepth=_coerce_wi_int(row.WI_MinSoundDepth), + nma_WI_StartFrequency=_coerce_wi_int(row.WI_StartFrequency), ) session.add(deployment) logger.info( From 73f52eba855f6626c551fef84ab4c413f00dddf6 Mon Sep 17 00:00:00 2001 From: jakeross Date: Fri, 30 Jan 2026 18:02:54 +1100 Subject: [PATCH 3/4] address copilot comment --- transfers/well_transfer.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/transfers/well_transfer.py b/transfers/well_transfer.py index 40a4547dc..154be399b 100644 --- a/transfers/well_transfer.py +++ b/transfers/well_transfer.py @@ -82,7 +82,10 @@ def _model_to_dict(obj): key = column.key if column.primary_key and column.autoincrement: continue - data[key] = getattr(obj, key) + value = getattr(obj, key) + if value is None and column.server_default is not None: + continue + data[key] = value return data From 2bed2e4b2d5655fc499e84eec9c5be85de2c7f33 Mon Sep 17 00:00:00 2001 From: jakeross Date: Fri, 30 Jan 2026 18:05:08 +1100 Subject: [PATCH 4/4] refactor: remove unnecessary test cases for _coerce_wi_mic_gain function --- tests/test_sensor_transfer.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/tests/test_sensor_transfer.py b/tests/test_sensor_transfer.py index d8c1ce4fe..08baf094f 100644 --- a/tests/test_sensor_transfer.py +++ b/tests/test_sensor_transfer.py @@ -1,7 +1,8 @@ -from transfers.sensor_transfer import _coerce_wi_mic_gain, _coerce_wi_int import numpy as np import pandas as pd +from transfers.sensor_transfer import _coerce_wi_mic_gain, _coerce_wi_int + def test_coerce_wi_mic_gain_numeric(): assert _coerce_wi_mic_gain(1) is True @@ -12,8 +13,6 @@ def test_coerce_wi_mic_gain_numeric(): def test_coerce_wi_mic_gain_strings(): assert _coerce_wi_mic_gain("1") is True assert _coerce_wi_mic_gain("0") is False - assert _coerce_wi_mic_gain(" true ") is True - assert _coerce_wi_mic_gain("False") is False def test_coerce_wi_mic_gain_handles_none_like():