From 73a1aff1fc48ac947b2416c587d82b6524fc796c Mon Sep 17 00:00:00 2001 From: jakeross Date: Fri, 30 Jan 2026 22:09:45 +1100 Subject: [PATCH 1/2] feat: add support for continuous water level transfers and refactor transfer logic --- transfers/transfer.py | 141 ++++++++++++++++++++++++++++++++---------- 1 file changed, 107 insertions(+), 34 deletions(-) diff --git a/transfers/transfer.py b/transfers/transfer.py index c4501002a..e84d28f78 100644 --- a/transfers/transfer.py +++ b/transfers/transfer.py @@ -272,48 +272,58 @@ def transfer_all(metrics, limit=100, profile_waterlevels: bool = True): flags = {"TRANSFER_ALL_WELLS": True, "LIMIT": limit} profile_artifacts: list[ProfileArtifact] = [] + water_levels_only = get_bool_env("CONTINOUS_WATER_LEVELS", False) # ========================================================================= # PHASE 1: Foundation (Parallel - these are independent of each other) # ========================================================================= - message("PHASE 1: FOUNDATIONAL TRANSFERS (PARALLEL)") - foundational_tasks = [ - ("AquiferSystems", transfer_aquifer_systems), - ("GeologicFormations", transfer_geologic_formations), - ] - - with ThreadPoolExecutor(max_workers=2) as executor: - futures = { - executor.submit( - _execute_foundational_transfer_with_timing, name, func, limit - ): name - for name, func in foundational_tasks - } - - for future in as_completed(futures): - name = futures[future] - try: - result_name, result, elapsed = future.result() - logger.info( - f"Foundational transfer {result_name} completed in {elapsed:.2f}s" - ) - except Exception as e: - logger.critical(f"Foundational transfer {name} failed: {e}") - raise # Fail fast - foundational transfers must succeed - - message("TRANSFERRING WELLS") - use_parallel_wells = get_bool_env("TRANSFER_PARALLEL_WELLS", False) - if use_parallel_wells: - logger.info("Using PARALLEL wells transfer") - transferer = WellTransferer(flags=flags) - transferer.transfer_parallel() - results = (transferer.input_df, transferer.cleaned_df, transferer.errors) + if water_levels_only: + logger.info("CONTINOUS_WATER_LEVELS set; running only continuous transfers") + _run_continuous_water_levels( + metrics, flags, profile_waterlevels, profile_artifacts + ) + return profile_artifacts else: - results = _execute_transfer(WellTransferer, flags=flags) - metrics.well_metrics(*results) + message("PHASE 1: FOUNDATIONAL TRANSFERS (PARALLEL)") + foundational_tasks = [ + ("AquiferSystems", transfer_aquifer_systems), + ("GeologicFormations", transfer_geologic_formations), + ] + + with ThreadPoolExecutor(max_workers=2) as executor: + futures = { + executor.submit( + _execute_foundational_transfer_with_timing, name, func, limit + ): name + for name, func in foundational_tasks + } + + for future in as_completed(futures): + name = futures[future] + try: + result_name, result, elapsed = future.result() + logger.info( + f"Foundational transfer {result_name} completed in {elapsed:.2f}s" + ) + except Exception as e: + logger.critical(f"Foundational transfer {name} failed: {e}") + raise # Fail fast - foundational transfers must succeed + + message("TRANSFERRING WELLS") + use_parallel_wells = get_bool_env("TRANSFER_PARALLEL_WELLS", True) + if use_parallel_wells: + logger.info("Using PARALLEL wells transfer") + transferer = WellTransferer(flags=flags) + transferer.transfer_parallel() + results = (transferer.input_df, transferer.cleaned_df, transferer.errors) + else: + results = _execute_transfer(WellTransferer, flags=flags) + metrics.well_metrics(*results) # Get transfer flags transfer_options = load_transfer_options() + transfer_options.transfer_pressure = False + transfer_options.transfer_acoustic = False use_parallel = get_bool_env("TRANSFER_PARALLEL", True) if use_parallel: @@ -338,6 +348,69 @@ def transfer_all(metrics, limit=100, profile_waterlevels: bool = True): return profile_artifacts +def _run_water_level_transfers( + metrics, flags, profile_waterlevels: bool, profile_artifacts: list[ProfileArtifact] +): + message("WATER LEVEL TRANSFERS ONLY") + + results = _execute_transfer(WaterLevelTransferer, flags=flags) + metrics.water_level_metrics(*results) + + if profile_waterlevels: + profiler = TransferProfiler("waterlevels_continuous_pressure") + results, artifact = profiler.run( + _execute_transfer, WaterLevelsContinuousPressureTransferer, flags + ) + profile_artifacts.append(artifact) + else: + results = _execute_transfer( + WaterLevelsContinuousPressureTransferer, flags=flags + ) + metrics.pressure_metrics(*results) + + if profile_waterlevels: + profiler = TransferProfiler("waterlevels_continuous_acoustic") + results, artifact = profiler.run( + _execute_transfer, WaterLevelsContinuousAcousticTransferer, flags + ) + profile_artifacts.append(artifact) + else: + results = _execute_transfer( + WaterLevelsContinuousAcousticTransferer, flags=flags + ) + metrics.acoustic_metrics(*results) + + +def _run_continuous_water_levels( + metrics, flags, profile_waterlevels: bool, profile_artifacts: list[ProfileArtifact] +): + message("CONTINUOUS WATER LEVEL TRANSFERS") + + if profile_waterlevels: + profiler = TransferProfiler("waterlevels_continuous_pressure") + results, artifact = profiler.run( + _execute_transfer, WaterLevelsContinuousPressureTransferer, flags + ) + profile_artifacts.append(artifact) + else: + results = _execute_transfer( + WaterLevelsContinuousPressureTransferer, flags=flags + ) + metrics.pressure_metrics(*results) + + if profile_waterlevels: + profiler = TransferProfiler("waterlevels_continuous_acoustic") + results, artifact = profiler.run( + _execute_transfer, WaterLevelsContinuousAcousticTransferer, flags + ) + profile_artifacts.append(artifact) + else: + results = _execute_transfer( + WaterLevelsContinuousAcousticTransferer, flags=flags + ) + metrics.acoustic_metrics(*results) + + def _transfer_parallel( metrics, flags, From e0419e3236c1bf2b000ce80dfbbcb1775ef6a353 Mon Sep 17 00:00:00 2001 From: jakeross Date: Fri, 30 Jan 2026 22:15:04 +1100 Subject: [PATCH 2/2] fix: correct spelling of CONTINUOUS in water level transfer logic --- transfers/transfer.py | 34 +++++++--------------------------- 1 file changed, 7 insertions(+), 27 deletions(-) diff --git a/transfers/transfer.py b/transfers/transfer.py index e84d28f78..2d33176b2 100644 --- a/transfers/transfer.py +++ b/transfers/transfer.py @@ -272,14 +272,14 @@ def transfer_all(metrics, limit=100, profile_waterlevels: bool = True): flags = {"TRANSFER_ALL_WELLS": True, "LIMIT": limit} profile_artifacts: list[ProfileArtifact] = [] - water_levels_only = get_bool_env("CONTINOUS_WATER_LEVELS", False) + water_levels_only = get_bool_env("CONTINUOUS_WATER_LEVELS", False) # ========================================================================= # PHASE 1: Foundation (Parallel - these are independent of each other) # ========================================================================= if water_levels_only: - logger.info("CONTINOUS_WATER_LEVELS set; running only continuous transfers") - _run_continuous_water_levels( + logger.info("CONTINUOUS_WATER_LEVELS set; running only continuous transfers") + _run_continuous_water_level_transfers( metrics, flags, profile_waterlevels, profile_artifacts ) return profile_artifacts @@ -356,32 +356,12 @@ def _run_water_level_transfers( results = _execute_transfer(WaterLevelTransferer, flags=flags) metrics.water_level_metrics(*results) - if profile_waterlevels: - profiler = TransferProfiler("waterlevels_continuous_pressure") - results, artifact = profiler.run( - _execute_transfer, WaterLevelsContinuousPressureTransferer, flags - ) - profile_artifacts.append(artifact) - else: - results = _execute_transfer( - WaterLevelsContinuousPressureTransferer, flags=flags - ) - metrics.pressure_metrics(*results) - - if profile_waterlevels: - profiler = TransferProfiler("waterlevels_continuous_acoustic") - results, artifact = profiler.run( - _execute_transfer, WaterLevelsContinuousAcousticTransferer, flags - ) - profile_artifacts.append(artifact) - else: - results = _execute_transfer( - WaterLevelsContinuousAcousticTransferer, flags=flags - ) - metrics.acoustic_metrics(*results) + _run_continuous_water_level_transfers( + metrics, flags, profile_waterlevels, profile_artifacts + ) -def _run_continuous_water_levels( +def _run_continuous_water_level_transfers( metrics, flags, profile_waterlevels: bool, profile_artifacts: list[ProfileArtifact] ): message("CONTINUOUS WATER LEVEL TRANSFERS")