BDMS-520-1-1-Cleanup-2.0#446
Conversation
There was a problem hiding this comment.
Pull request overview
This PR introduces a new mode for running only continuous water level transfers and refactors water level transfer logic into dedicated helper functions. It also modifies default configuration values for parallel processing.
Changes:
- Added a
CONTINOUS_WATER_LEVELSenvironment variable flag to enable a specialized transfer mode that skips all other transfer types - Extracted water level transfer logic into two new helper functions:
_run_water_level_transfersand_run_continuous_water_levels - Changed the default value for parallel wells transfer from
FalsetoTrueand disabled pressure/acoustic transfers by default
| water_levels_only = get_bool_env("CONTINOUS_WATER_LEVELS", False) | ||
|
|
||
| # ========================================================================= | ||
| # PHASE 1: Foundation (Parallel - these are independent of each other) | ||
| # ========================================================================= | ||
| message("PHASE 1: FOUNDATIONAL TRANSFERS (PARALLEL)") | ||
| foundational_tasks = [ | ||
| ("AquiferSystems", transfer_aquifer_systems), | ||
| ("GeologicFormations", transfer_geologic_formations), | ||
| ] | ||
|
|
||
| with ThreadPoolExecutor(max_workers=2) as executor: | ||
| futures = { | ||
| executor.submit( | ||
| _execute_foundational_transfer_with_timing, name, func, limit | ||
| ): name | ||
| for name, func in foundational_tasks | ||
| } | ||
|
|
||
| for future in as_completed(futures): | ||
| name = futures[future] | ||
| try: | ||
| result_name, result, elapsed = future.result() | ||
| logger.info( | ||
| f"Foundational transfer {result_name} completed in {elapsed:.2f}s" | ||
| ) | ||
| except Exception as e: | ||
| logger.critical(f"Foundational transfer {name} failed: {e}") | ||
| raise # Fail fast - foundational transfers must succeed | ||
|
|
||
| message("TRANSFERRING WELLS") | ||
| use_parallel_wells = get_bool_env("TRANSFER_PARALLEL_WELLS", False) | ||
| if use_parallel_wells: | ||
| logger.info("Using PARALLEL wells transfer") | ||
| transferer = WellTransferer(flags=flags) | ||
| transferer.transfer_parallel() | ||
| results = (transferer.input_df, transferer.cleaned_df, transferer.errors) | ||
| if water_levels_only: | ||
| logger.info("CONTINOUS_WATER_LEVELS set; running only continuous transfers") |
There was a problem hiding this comment.
Corrected spelling of 'CONTINOUS_WATER_LEVELS' to 'CONTINUOUS_WATER_LEVELS'.
| water_levels_only = get_bool_env("CONTINOUS_WATER_LEVELS", False) | |
| # ========================================================================= | |
| # PHASE 1: Foundation (Parallel - these are independent of each other) | |
| # ========================================================================= | |
| message("PHASE 1: FOUNDATIONAL TRANSFERS (PARALLEL)") | |
| foundational_tasks = [ | |
| ("AquiferSystems", transfer_aquifer_systems), | |
| ("GeologicFormations", transfer_geologic_formations), | |
| ] | |
| with ThreadPoolExecutor(max_workers=2) as executor: | |
| futures = { | |
| executor.submit( | |
| _execute_foundational_transfer_with_timing, name, func, limit | |
| ): name | |
| for name, func in foundational_tasks | |
| } | |
| for future in as_completed(futures): | |
| name = futures[future] | |
| try: | |
| result_name, result, elapsed = future.result() | |
| logger.info( | |
| f"Foundational transfer {result_name} completed in {elapsed:.2f}s" | |
| ) | |
| except Exception as e: | |
| logger.critical(f"Foundational transfer {name} failed: {e}") | |
| raise # Fail fast - foundational transfers must succeed | |
| message("TRANSFERRING WELLS") | |
| use_parallel_wells = get_bool_env("TRANSFER_PARALLEL_WELLS", False) | |
| if use_parallel_wells: | |
| logger.info("Using PARALLEL wells transfer") | |
| transferer = WellTransferer(flags=flags) | |
| transferer.transfer_parallel() | |
| results = (transferer.input_df, transferer.cleaned_df, transferer.errors) | |
| if water_levels_only: | |
| logger.info("CONTINOUS_WATER_LEVELS set; running only continuous transfers") | |
| water_levels_only = get_bool_env("CONTINUOUS_WATER_LEVELS", False) | |
| # ========================================================================= | |
| # PHASE 1: Foundation (Parallel - these are independent of each other) | |
| # ========================================================================= | |
| if water_levels_only: | |
| logger.info("CONTINUOUS_WATER_LEVELS set; running only continuous transfers") |
| water_levels_only = get_bool_env("CONTINOUS_WATER_LEVELS", False) | ||
|
|
||
| # ========================================================================= | ||
| # PHASE 1: Foundation (Parallel - these are independent of each other) | ||
| # ========================================================================= | ||
| message("PHASE 1: FOUNDATIONAL TRANSFERS (PARALLEL)") | ||
| foundational_tasks = [ | ||
| ("AquiferSystems", transfer_aquifer_systems), | ||
| ("GeologicFormations", transfer_geologic_formations), | ||
| ] | ||
|
|
||
| with ThreadPoolExecutor(max_workers=2) as executor: | ||
| futures = { | ||
| executor.submit( | ||
| _execute_foundational_transfer_with_timing, name, func, limit | ||
| ): name | ||
| for name, func in foundational_tasks | ||
| } | ||
|
|
||
| for future in as_completed(futures): | ||
| name = futures[future] | ||
| try: | ||
| result_name, result, elapsed = future.result() | ||
| logger.info( | ||
| f"Foundational transfer {result_name} completed in {elapsed:.2f}s" | ||
| ) | ||
| except Exception as e: | ||
| logger.critical(f"Foundational transfer {name} failed: {e}") | ||
| raise # Fail fast - foundational transfers must succeed | ||
|
|
||
| message("TRANSFERRING WELLS") | ||
| use_parallel_wells = get_bool_env("TRANSFER_PARALLEL_WELLS", False) | ||
| if use_parallel_wells: | ||
| logger.info("Using PARALLEL wells transfer") | ||
| transferer = WellTransferer(flags=flags) | ||
| transferer.transfer_parallel() | ||
| results = (transferer.input_df, transferer.cleaned_df, transferer.errors) | ||
| if water_levels_only: | ||
| logger.info("CONTINOUS_WATER_LEVELS set; running only continuous transfers") |
There was a problem hiding this comment.
Corrected spelling of 'CONTINOUS_WATER_LEVELS' to 'CONTINUOUS_WATER_LEVELS'.
| water_levels_only = get_bool_env("CONTINOUS_WATER_LEVELS", False) | |
| # ========================================================================= | |
| # PHASE 1: Foundation (Parallel - these are independent of each other) | |
| # ========================================================================= | |
| message("PHASE 1: FOUNDATIONAL TRANSFERS (PARALLEL)") | |
| foundational_tasks = [ | |
| ("AquiferSystems", transfer_aquifer_systems), | |
| ("GeologicFormations", transfer_geologic_formations), | |
| ] | |
| with ThreadPoolExecutor(max_workers=2) as executor: | |
| futures = { | |
| executor.submit( | |
| _execute_foundational_transfer_with_timing, name, func, limit | |
| ): name | |
| for name, func in foundational_tasks | |
| } | |
| for future in as_completed(futures): | |
| name = futures[future] | |
| try: | |
| result_name, result, elapsed = future.result() | |
| logger.info( | |
| f"Foundational transfer {result_name} completed in {elapsed:.2f}s" | |
| ) | |
| except Exception as e: | |
| logger.critical(f"Foundational transfer {name} failed: {e}") | |
| raise # Fail fast - foundational transfers must succeed | |
| message("TRANSFERRING WELLS") | |
| use_parallel_wells = get_bool_env("TRANSFER_PARALLEL_WELLS", False) | |
| if use_parallel_wells: | |
| logger.info("Using PARALLEL wells transfer") | |
| transferer = WellTransferer(flags=flags) | |
| transferer.transfer_parallel() | |
| results = (transferer.input_df, transferer.cleaned_df, transferer.errors) | |
| if water_levels_only: | |
| logger.info("CONTINOUS_WATER_LEVELS set; running only continuous transfers") | |
| water_levels_only = get_bool_env("CONTINUOUS_WATER_LEVELS", False) | |
| # ========================================================================= | |
| # PHASE 1: Foundation (Parallel - these are independent of each other) | |
| # ========================================================================= | |
| if water_levels_only: | |
| logger.info("CONTINUOUS_WATER_LEVELS set; running only continuous transfers") |
| transfer_options.transfer_pressure = False | ||
| transfer_options.transfer_acoustic = False |
There was a problem hiding this comment.
Hardcoded overrides of transfer_options may cause confusion since these values are already set via environment variables in load_transfer_options(). Consider using environment variables consistently or documenting why these overrides are necessary.
| transfer_options.transfer_pressure = False | |
| transfer_options.transfer_acoustic = False |
|
|
||
| def _run_water_level_transfers( | ||
| metrics, flags, profile_waterlevels: bool, profile_artifacts: list[ProfileArtifact] | ||
| ): |
There was a problem hiding this comment.
Missing docstring for function _run_water_level_transfers. Add a docstring explaining the purpose, parameters, and behavior of this function.
| ): | |
| ): | |
| """ | |
| Run all water level–related data transfers and record associated metrics. | |
| This helper executes the discrete water level transfer and both continuous | |
| water level transfers (pressure and acoustic). For each transfer, it | |
| collects results, records metrics on the provided ``metrics`` object, and | |
| optionally profiles the continuous transfers, appending any generated | |
| profile artifacts to ``profile_artifacts``. | |
| Parameters | |
| ---------- | |
| metrics | |
| Metrics collector instance used to record water level, pressure, and | |
| acoustic transfer statistics via its metric methods. | |
| flags | |
| Configuration or command-line flags passed through to each transferer | |
| to control how the transfer is executed. | |
| profile_waterlevels : bool | |
| If ``True``, run continuous water level transfers under | |
| :class:`TransferProfiler` to collect performance profiles and store | |
| them in ``profile_artifacts``. If ``False``, run the transfers | |
| directly without profiling. | |
| profile_artifacts : list[ProfileArtifact] | |
| Mutable list that will be extended with any profile artifacts produced | |
| while profiling the continuous water level transfers. | |
| Returns | |
| ------- | |
| None | |
| This function operates via side effects (running transfers, updating | |
| metrics, and appending profile artifacts) and does not return a value. | |
| """ |
|
|
||
| def _run_continuous_water_levels( | ||
| metrics, flags, profile_waterlevels: bool, profile_artifacts: list[ProfileArtifact] | ||
| ): |
There was a problem hiding this comment.
Missing docstring for function _run_continuous_water_levels. Add a docstring explaining the purpose, parameters, and behavior of this function.
| ): | |
| ): | |
| """ | |
| Run continuous water level transfers (pressure and acoustic) and record metrics. | |
| This helper executes the continuous water level transferers for pressure and | |
| acoustic data. When ``profile_waterlevels`` is True, each transfer is wrapped | |
| in a :class:`TransferProfiler` and the resulting :class:`ProfileArtifact` | |
| instances are appended to ``profile_artifacts``. In all cases, the provided | |
| ``metrics`` object is updated with the results of each transfer. | |
| :param metrics: Metrics collector used to record transfer results for pressure | |
| and acoustic continuous water levels. | |
| :param flags: Configuration or command-line flags passed through to the | |
| underlying transferer classes. | |
| :param profile_waterlevels: If True, run the transfers under a profiler and | |
| collect profiling artifacts; if False, run the transfers without profiling. | |
| :param profile_artifacts: List that will be extended with any generated | |
| :class:`ProfileArtifact` objects when profiling is enabled. | |
| """ |
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 73a1aff1fc
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
Why
This PR addresses the following problem / context:
How
Implementation summary - the following was changed / added / removed:
Notes
Any special considerations, workarounds, or follow-up work to note?