Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion backend/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,7 @@ def make_output_path(self):

def _update_output_units(self):
parameter = self.parameter.lower()
if parameter == "ph":
if parameter == PH:
self.analyte_output_units = ""
elif parameter in [CONDUCTIVITY, SPECIFIC_CONDUCTANCE]:
self.analyte_output_units = MICROSIEMENS_PER_CENTIMETER
Expand Down
2 changes: 2 additions & 0 deletions backend/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@


WATERLEVELS = "waterlevels"
# Record-type / parameter-group label for the non-waterlevel parameters.
ANALYTES = "analytes"
ARSENIC = "arsenic"
BICARBONATE = "bicarbonate"
CALCIUM = "calcium"
Expand Down
3 changes: 2 additions & 1 deletion backend/record.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
CONVERSION_FACTOR,
SOURCE_DATASTREAM_LINK,
FEET,
WATERLEVELS,
)


Expand Down Expand Up @@ -64,7 +65,7 @@ def _get_sigfig_formatted_value(self, attr):

# both analyte and water level tables have the same fields, but the
# rounding should only occur for water level tables
if self._payload.get("record_type") == "waterlevels":
if self._payload.get("record_type") == WATERLEVELS:
field_sigfigs.append((PARAMETER_VALUE, 2))

for field, sigfigs in field_sigfigs:
Expand Down
10 changes: 6 additions & 4 deletions backend/transformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
DTW,
EARLIEST,
LATEST,
WATERLEVELS,
ANALYTES,
)
from backend.geo_utils import datum_transform, ALLOWED_DATUMS
from backend.logger import make_logger
Expand Down Expand Up @@ -209,7 +211,7 @@ def do_transform(
return None
klassed_record = self._apply_elevation_transform(klassed_record)
klassed_record = self._apply_well_depth_transform(klassed_record)
elif klassed_record.record_type in ("analytes", "waterlevels"):
elif klassed_record.record_type in (ANALYTES, WATERLEVELS):
klassed_record = self._apply_unit_conversion(klassed_record)
return klassed_record

Expand Down Expand Up @@ -269,7 +271,7 @@ def _apply_well_depth_transform(self, klassed_record):
def _apply_unit_conversion(self, klassed_record):
output_units = (
self.config.analyte_output_units
if klassed_record.record_type == "analytes"
if klassed_record.record_type == ANALYTES
else self.config.waterlevel_output_units
)
source_result = klassed_record.parameter_value
Expand Down Expand Up @@ -538,7 +540,7 @@ def _get_record_klass(self) -> type[ParameterRecord] | type[SummaryRecord]:
return SummaryRecord if self.config.output_summary else ParameterRecord

def _get_record_type(self) -> str:
return "waterlevels"
return WATERLEVELS

def _get_parameter_name_and_units(self) -> tuple:
"""
Expand All @@ -557,7 +559,7 @@ def _get_record_klass(self) -> type[ParameterRecord] | type[SummaryRecord]:
return SummaryRecord if self.config.output_summary else ParameterRecord

def _get_record_type(self) -> str:
return "analytes"
return ANALYTES

def _get_parameter_name_and_units(self) -> tuple:
"""
Expand Down
14 changes: 7 additions & 7 deletions docs/cleanup-todo.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# DIE Cleanup TODO

> **Status:** Tier 1 (all), plus 2.1 and 2.2, are **DONE** (this PR). Remaining:
> Tier 2 items 2.3–2.7, all of Tier 3, all of Tier 4.
> **Status:** Tier 1 (all) and **all of Tier 2 except 2.6** are **DONE**. 2.6 is
> **deferred** (see note below). Remaining: 2.6, all of Tier 3, all of Tier 4.

Prioritized cleanup backlog from a code-analysis sweep (backend + frontend +
orchestration). Each item: location, effort (S/M/L), risk, and whether it
Expand Down Expand Up @@ -36,11 +36,11 @@ registry). Tiers are ordered by safety — Tier 1 is batchable into one no-risk
|---|----------|--------|--------|----------|---|
| **2.1 (requested) ✅** | `backend/config.py:79-97` + reads at `config.py`, `orchestration/assets/products.py:169`, 2 tests | **Flatten `PARAMETER_SOURCE_MAP`**: `param: {"agencies":[...]}` → `param: [...]`. `"agencies"` is the only key ever present; every read is `["agencies"]`. | S | none | ✓ |
| 2.2 ✅ | `backend/unifier.py` | `type(site_records) == list` → `isinstance(...)` | S | none | ✓ |
| 2.3 | `backend/config.py` (`parameter == "ph"`) | Use `PH` constant from `constants.py` | S | none | |
| 2.4 | `backend/transformer.py:272`, `backend/record.py:70` | `record_type == "analytes"/"waterlevels"` string compares → constants | S | none | |
| 2.5 | `frontend/api/app.py` | Hoist magic strings to constants: bucket `"die_cache"` (×3), queue `"die-queue"`, header casing | S | none | |
| 2.6 | `frontend/api/app.py` `router_parameters()` | Derive parameter list from `PARAMETER_SOURCE_MAP` instead of hardcoded 2-item list | M | none | |
| 2.7 | `orchestration/definitions.py:155,204` | Hoist default cron `"0 6 * * *"` + timezone `"America/Denver"` to constants | S | none | |
| 2.3 | `backend/config.py` (`parameter == "ph"`) | Use `PH` constant from `constants.py` | S | none | |
| 2.4 | `backend/transformer.py`, `backend/record.py` | `record_type == "analytes"/"waterlevels"` → `ANALYTES`/`WATERLEVELS` constants (added `ANALYTES` to `constants.py`) | S | none | |
| 2.5 | `frontend/api/app.py` | Bucket `"die_cache"` (×3) + queue `"die-queue"` → `_CACHE_BUCKET` / `_TASK_QUEUE` module constants | S | none | |
| 2.6 ⏸ DEFERRED | `frontend/api/app.py` `router_parameters()` | Derive parameter list from `PARAMETER_SOURCE_MAP`. **Deferred**: would force the lean API service to import the whole `backend.config` (all connectors + shapely) just for a display list, and changes the endpoint's response shape (`dtw`/`tds` → param keys). Needs a lightweight parameter registry or coordination with the frontend that consumes `/parameters`. | M | yes | |
| 2.7 | `orchestration/definitions.py` | Default cron `"0 6 * * *"` + timezone `"America/Denver"` → `_DEFAULT_CRON` / `_SCHEDULE_TIMEZONE` constants | S | none | |

---

Expand Down
13 changes: 9 additions & 4 deletions frontend/api/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@
allow_headers=["*"],
)

# GCS bucket holding cached API outputs; Cloud Tasks queue the trigger endpoints
# enqueue worker jobs onto.
_CACHE_BUCKET = "die_cache"
_TASK_QUEUE = "die-queue"


class BboxModel(BaseModel):
minLat: float
Expand Down Expand Up @@ -66,10 +71,10 @@ def router_unify_waterlevels(item: ConfigModel):
if not item.force:
storage_client = storage.Client()
if item.output_summary:
bucket = storage_client.bucket("die_cache")
bucket = storage_client.bucket(_CACHE_BUCKET)
exists = bucket.blob(f"{itemhash}.csv").exists()
else:
bucket = storage_client.bucket("die_cache")
bucket = storage_client.bucket(_CACHE_BUCKET)
exists = bucket.blob(f"{itemhash}.zip").exists()

response = None
Expand All @@ -78,7 +83,7 @@ def router_unify_waterlevels(item: ConfigModel):
project = os.getenv("PROJECT_ID")
location = os.getenv("LOCATION")
url = os.getenv("WORKER_URL")
queue = "die-queue"
queue = _TASK_QUEUE

cfgobj["output_name"] = itemhash
task = tasks_v2.Task(
Expand Down Expand Up @@ -132,7 +137,7 @@ def router_status(task_id: str):
def router_download_unified_waterlevels(downloadhash: str, output_summary: bool):

storage_client = storage.Client()
bucket = storage_client.bucket("die_cache")
bucket = storage_client.bucket(_CACHE_BUCKET)

if output_summary:
blob = bucket.blob(f"{downloadhash}.csv")
Expand Down
9 changes: 7 additions & 2 deletions orchestration/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ def load_input(self, context: dg.InputContext):
"ogc_waterlevel_change",
}

# Cohort scheduling defaults: cron used when a product has no `schedule`, and the
# timezone all cohort schedules run in.
_DEFAULT_CRON = "0 6 * * *"
_SCHEDULE_TIMEZONE = "America/Denver"


def _load_products() -> dict:
return yaml.safe_load(_PRODUCTS_PATH.read_text())
Expand Down Expand Up @@ -152,7 +157,7 @@ def _build_cohorts(products_config: dict, specs_by_pid: dict) -> dict:
name = _cohort_name(_cohort_key(specs))
cohort = cohorts.setdefault(name, {"members": [], "cron": None})
cohort["members"].append(pid)
cron = product.get("schedule", "0 6 * * *")
cron = product.get("schedule", _DEFAULT_CRON)
if cohort["cron"] is None or _cron_sort_key(cron) < _cron_sort_key(cohort["cron"]):
cohort["cron"] = cron
return cohorts
Expand Down Expand Up @@ -201,7 +206,7 @@ def _build_schedules(
name=f"schedule_{name}",
job=cohort_jobs[name],
cron_schedule=cohort["cron"],
execution_timezone="America/Denver",
execution_timezone=_SCHEDULE_TIMEZONE,
)
for name, cohort in cohorts.items()
]
Expand Down
Loading