Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 87 additions & 34 deletions transfers/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,48 +272,58 @@ def transfer_all(metrics, limit=100, profile_waterlevels: bool = True):
flags = {"TRANSFER_ALL_WELLS": True, "LIMIT": limit}

profile_artifacts: list[ProfileArtifact] = []
water_levels_only = get_bool_env("CONTINUOUS_WATER_LEVELS", False)

# =========================================================================
# PHASE 1: Foundation (Parallel - these are independent of each other)
# =========================================================================
message("PHASE 1: FOUNDATIONAL TRANSFERS (PARALLEL)")
foundational_tasks = [
("AquiferSystems", transfer_aquifer_systems),
("GeologicFormations", transfer_geologic_formations),
]

with ThreadPoolExecutor(max_workers=2) as executor:
futures = {
executor.submit(
_execute_foundational_transfer_with_timing, name, func, limit
): name
for name, func in foundational_tasks
}

for future in as_completed(futures):
name = futures[future]
try:
result_name, result, elapsed = future.result()
logger.info(
f"Foundational transfer {result_name} completed in {elapsed:.2f}s"
)
except Exception as e:
logger.critical(f"Foundational transfer {name} failed: {e}")
raise # Fail fast - foundational transfers must succeed

message("TRANSFERRING WELLS")
use_parallel_wells = get_bool_env("TRANSFER_PARALLEL_WELLS", False)
if use_parallel_wells:
logger.info("Using PARALLEL wells transfer")
transferer = WellTransferer(flags=flags)
transferer.transfer_parallel()
results = (transferer.input_df, transferer.cleaned_df, transferer.errors)
if water_levels_only:
logger.info("CONTINUOUS_WATER_LEVELS set; running only continuous transfers")
_run_continuous_water_level_transfers(
metrics, flags, profile_waterlevels, profile_artifacts
)
return profile_artifacts
else:
results = _execute_transfer(WellTransferer, flags=flags)
metrics.well_metrics(*results)
message("PHASE 1: FOUNDATIONAL TRANSFERS (PARALLEL)")
foundational_tasks = [
("AquiferSystems", transfer_aquifer_systems),
("GeologicFormations", transfer_geologic_formations),
]

with ThreadPoolExecutor(max_workers=2) as executor:
futures = {
executor.submit(
_execute_foundational_transfer_with_timing, name, func, limit
): name
for name, func in foundational_tasks
}

for future in as_completed(futures):
name = futures[future]
try:
result_name, result, elapsed = future.result()
logger.info(
f"Foundational transfer {result_name} completed in {elapsed:.2f}s"
)
except Exception as e:
logger.critical(f"Foundational transfer {name} failed: {e}")
raise # Fail fast - foundational transfers must succeed

message("TRANSFERRING WELLS")
use_parallel_wells = get_bool_env("TRANSFER_PARALLEL_WELLS", True)
if use_parallel_wells:
logger.info("Using PARALLEL wells transfer")
transferer = WellTransferer(flags=flags)
transferer.transfer_parallel()
results = (transferer.input_df, transferer.cleaned_df, transferer.errors)
else:
results = _execute_transfer(WellTransferer, flags=flags)
metrics.well_metrics(*results)

# Get transfer flags
transfer_options = load_transfer_options()
transfer_options.transfer_pressure = False
transfer_options.transfer_acoustic = False
Comment on lines +325 to +326

Copilot AI Jan 30, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hardcoded overrides of transfer_options may cause confusion since these values are already set via environment variables in load_transfer_options(). Consider using environment variables consistently or documenting why these overrides are necessary.

Suggested change
transfer_options.transfer_pressure = False
transfer_options.transfer_acoustic = False

Copilot uses AI. Check for mistakes.
Comment thread
jirhiker marked this conversation as resolved.
use_parallel = get_bool_env("TRANSFER_PARALLEL", True)

if use_parallel:
Expand All @@ -338,6 +348,49 @@ def transfer_all(metrics, limit=100, profile_waterlevels: bool = True):
return profile_artifacts


def _run_water_level_transfers(
metrics, flags, profile_waterlevels: bool, profile_artifacts: list[ProfileArtifact]
):

Copilot AI Jan 30, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing docstring for function _run_water_level_transfers. Add a docstring explaining the purpose, parameters, and behavior of this function.

Suggested change
):
):
"""
Run all water levelrelated data transfers and record associated metrics.
This helper executes the discrete water level transfer and both continuous
water level transfers (pressure and acoustic). For each transfer, it
collects results, records metrics on the provided ``metrics`` object, and
optionally profiles the continuous transfers, appending any generated
profile artifacts to ``profile_artifacts``.
Parameters
----------
metrics
Metrics collector instance used to record water level, pressure, and
acoustic transfer statistics via its metric methods.
flags
Configuration or command-line flags passed through to each transferer
to control how the transfer is executed.
profile_waterlevels : bool
If ``True``, run continuous water level transfers under
:class:`TransferProfiler` to collect performance profiles and store
them in ``profile_artifacts``. If ``False``, run the transfers
directly without profiling.
profile_artifacts : list[ProfileArtifact]
Mutable list that will be extended with any profile artifacts produced
while profiling the continuous water level transfers.
Returns
-------
None
This function operates via side effects (running transfers, updating
metrics, and appending profile artifacts) and does not return a value.
"""

Copilot uses AI. Check for mistakes.
message("WATER LEVEL TRANSFERS ONLY")

results = _execute_transfer(WaterLevelTransferer, flags=flags)
metrics.water_level_metrics(*results)

_run_continuous_water_level_transfers(
metrics, flags, profile_waterlevels, profile_artifacts
)


def _run_continuous_water_level_transfers(
metrics, flags, profile_waterlevels: bool, profile_artifacts: list[ProfileArtifact]
):

Copilot AI Jan 30, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing docstring for function _run_continuous_water_levels. Add a docstring explaining the purpose, parameters, and behavior of this function.

Suggested change
):
):
"""
Run continuous water level transfers (pressure and acoustic) and record metrics.
This helper executes the continuous water level transferers for pressure and
acoustic data. When ``profile_waterlevels`` is True, each transfer is wrapped
in a :class:`TransferProfiler` and the resulting :class:`ProfileArtifact`
instances are appended to ``profile_artifacts``. In all cases, the provided
``metrics`` object is updated with the results of each transfer.
:param metrics: Metrics collector used to record transfer results for pressure
and acoustic continuous water levels.
:param flags: Configuration or command-line flags passed through to the
underlying transferer classes.
:param profile_waterlevels: If True, run the transfers under a profiler and
collect profiling artifacts; if False, run the transfers without profiling.
:param profile_artifacts: List that will be extended with any generated
:class:`ProfileArtifact` objects when profiling is enabled.
"""

Copilot uses AI. Check for mistakes.
message("CONTINUOUS WATER LEVEL TRANSFERS")

if profile_waterlevels:
profiler = TransferProfiler("waterlevels_continuous_pressure")
results, artifact = profiler.run(
_execute_transfer, WaterLevelsContinuousPressureTransferer, flags
)
profile_artifacts.append(artifact)
else:
results = _execute_transfer(
WaterLevelsContinuousPressureTransferer, flags=flags
)
metrics.pressure_metrics(*results)

if profile_waterlevels:
profiler = TransferProfiler("waterlevels_continuous_acoustic")
results, artifact = profiler.run(
_execute_transfer, WaterLevelsContinuousAcousticTransferer, flags
)
profile_artifacts.append(artifact)
else:
results = _execute_transfer(
WaterLevelsContinuousAcousticTransferer, flags=flags
)
metrics.acoustic_metrics(*results)


def _transfer_parallel(
metrics,
flags,
Expand Down
Loading