From f11989b153cf3cca29334003b47b7025b26018cb Mon Sep 17 00:00:00 2001 From: jakeross Date: Mon, 29 Jun 2026 18:55:18 -0600 Subject: [PATCH] Cleanup Tier 2.3-2.7: replace magic strings/values with constants MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 2.3: config.py `parameter == "ph"` → `PH` constant. - 2.4: record_type string compares (`"analytes"`/`"waterlevels"`) → `ANALYTES`/`WATERLEVELS` constants in transformer.py + record.py; added `ANALYTES = "analytes"` to constants.py. - 2.5: api/app.py bucket `"die_cache"` (×3) and queue `"die-queue"` → module constants `_CACHE_BUCKET` / `_TASK_QUEUE`. - 2.7: definitions.py default cron `"0 6 * * *"` and timezone `"America/Denver"` → `_DEFAULT_CRON` / `_SCHEDULE_TIMEZONE` constants. 2.6 (router_parameters from PARAMETER_SOURCE_MAP) deferred: it would force the lean API service to import all of backend.config (every connector + shapely) for a display list, and changes the endpoint's response shape. Documented in docs/cleanup-todo.md. No behavior change. Full suite (311) + dg check defs clean. Co-Authored-By: Claude Opus 4.8 --- backend/config.py | 2 +- backend/constants.py | 2 ++ backend/record.py | 3 ++- backend/transformer.py | 10 ++++++---- docs/cleanup-todo.md | 14 +++++++------- frontend/api/app.py | 13 +++++++++---- orchestration/definitions.py | 9 +++++++-- 7 files changed, 34 insertions(+), 19 deletions(-) diff --git a/backend/config.py b/backend/config.py index 5ff5430..7ca139a 100644 --- a/backend/config.py +++ b/backend/config.py @@ -587,7 +587,7 @@ def make_output_path(self): def _update_output_units(self): parameter = self.parameter.lower() - if parameter == "ph": + if parameter == PH: self.analyte_output_units = "" elif parameter in [CONDUCTIVITY, SPECIFIC_CONDUCTANCE]: self.analyte_output_units = MICROSIEMENS_PER_CENTIMETER diff --git a/backend/constants.py b/backend/constants.py index 52ab27a..a87806f 100644 --- a/backend/constants.py +++ b/backend/constants.py @@ -18,6 +18,8 @@ WATERLEVELS = "waterlevels" +# Record-type / parameter-group label for the non-waterlevel parameters. +ANALYTES = "analytes" ARSENIC = "arsenic" BICARBONATE = "bicarbonate" CALCIUM = "calcium" diff --git a/backend/record.py b/backend/record.py index 4489a99..9b7ee5e 100644 --- a/backend/record.py +++ b/backend/record.py @@ -22,6 +22,7 @@ CONVERSION_FACTOR, SOURCE_DATASTREAM_LINK, FEET, + WATERLEVELS, ) @@ -64,7 +65,7 @@ def _get_sigfig_formatted_value(self, attr): # both analyte and water level tables have the same fields, but the # rounding should only occur for water level tables - if self._payload.get("record_type") == "waterlevels": + if self._payload.get("record_type") == WATERLEVELS: field_sigfigs.append((PARAMETER_VALUE, 2)) for field, sigfigs in field_sigfigs: diff --git a/backend/transformer.py b/backend/transformer.py index 32c89c5..78368a2 100644 --- a/backend/transformer.py +++ b/backend/transformer.py @@ -26,6 +26,8 @@ DTW, EARLIEST, LATEST, + WATERLEVELS, + ANALYTES, ) from backend.geo_utils import datum_transform, ALLOWED_DATUMS from backend.logger import make_logger @@ -209,7 +211,7 @@ def do_transform( return None klassed_record = self._apply_elevation_transform(klassed_record) klassed_record = self._apply_well_depth_transform(klassed_record) - elif klassed_record.record_type in ("analytes", "waterlevels"): + elif klassed_record.record_type in (ANALYTES, WATERLEVELS): klassed_record = self._apply_unit_conversion(klassed_record) return klassed_record @@ -269,7 +271,7 @@ def _apply_well_depth_transform(self, klassed_record): def _apply_unit_conversion(self, klassed_record): output_units = ( self.config.analyte_output_units - if klassed_record.record_type == "analytes" + if klassed_record.record_type == ANALYTES else self.config.waterlevel_output_units ) source_result = klassed_record.parameter_value @@ -538,7 +540,7 @@ def _get_record_klass(self) -> type[ParameterRecord] | type[SummaryRecord]: return SummaryRecord if self.config.output_summary else ParameterRecord def _get_record_type(self) -> str: - return "waterlevels" + return WATERLEVELS def _get_parameter_name_and_units(self) -> tuple: """ @@ -557,7 +559,7 @@ def _get_record_klass(self) -> type[ParameterRecord] | type[SummaryRecord]: return SummaryRecord if self.config.output_summary else ParameterRecord def _get_record_type(self) -> str: - return "analytes" + return ANALYTES def _get_parameter_name_and_units(self) -> tuple: """ diff --git a/docs/cleanup-todo.md b/docs/cleanup-todo.md index fd460cb..01f1853 100644 --- a/docs/cleanup-todo.md +++ b/docs/cleanup-todo.md @@ -1,7 +1,7 @@ # DIE Cleanup TODO -> **Status:** Tier 1 (all), plus 2.1 and 2.2, are **DONE** (this PR). Remaining: -> Tier 2 items 2.3–2.7, all of Tier 3, all of Tier 4. +> **Status:** Tier 1 (all) and **all of Tier 2 except 2.6** are **DONE**. 2.6 is +> **deferred** (see note below). Remaining: 2.6, all of Tier 3, all of Tier 4. Prioritized cleanup backlog from a code-analysis sweep (backend + frontend + orchestration). Each item: location, effort (S/M/L), risk, and whether it @@ -36,11 +36,11 @@ registry). Tiers are ordered by safety — Tier 1 is batchable into one no-risk |---|----------|--------|--------|----------|---| | **2.1 (requested) ✅** | `backend/config.py:79-97` + reads at `config.py`, `orchestration/assets/products.py:169`, 2 tests | **Flatten `PARAMETER_SOURCE_MAP`**: `param: {"agencies":[...]}` → `param: [...]`. `"agencies"` is the only key ever present; every read is `["agencies"]`. | S | none | ✓ | | 2.2 ✅ | `backend/unifier.py` | `type(site_records) == list` → `isinstance(...)` | S | none | ✓ | -| 2.3 | `backend/config.py` (`parameter == "ph"`) | Use `PH` constant from `constants.py` | S | none | | -| 2.4 | `backend/transformer.py:272`, `backend/record.py:70` | `record_type == "analytes"/"waterlevels"` string compares → constants | S | none | | -| 2.5 | `frontend/api/app.py` | Hoist magic strings to constants: bucket `"die_cache"` (×3), queue `"die-queue"`, header casing | S | none | | -| 2.6 | `frontend/api/app.py` `router_parameters()` | Derive parameter list from `PARAMETER_SOURCE_MAP` instead of hardcoded 2-item list | M | none | | -| 2.7 | `orchestration/definitions.py:155,204` | Hoist default cron `"0 6 * * *"` + timezone `"America/Denver"` to constants | S | none | | +| 2.3 ✅ | `backend/config.py` (`parameter == "ph"`) | Use `PH` constant from `constants.py` | S | none | | +| 2.4 ✅ | `backend/transformer.py`, `backend/record.py` | `record_type == "analytes"/"waterlevels"` → `ANALYTES`/`WATERLEVELS` constants (added `ANALYTES` to `constants.py`) | S | none | | +| 2.5 ✅ | `frontend/api/app.py` | Bucket `"die_cache"` (×3) + queue `"die-queue"` → `_CACHE_BUCKET` / `_TASK_QUEUE` module constants | S | none | | +| 2.6 ⏸ DEFERRED | `frontend/api/app.py` `router_parameters()` | Derive parameter list from `PARAMETER_SOURCE_MAP`. **Deferred**: would force the lean API service to import the whole `backend.config` (all connectors + shapely) just for a display list, and changes the endpoint's response shape (`dtw`/`tds` → param keys). Needs a lightweight parameter registry or coordination with the frontend that consumes `/parameters`. | M | yes | | +| 2.7 ✅ | `orchestration/definitions.py` | Default cron `"0 6 * * *"` + timezone `"America/Denver"` → `_DEFAULT_CRON` / `_SCHEDULE_TIMEZONE` constants | S | none | | --- diff --git a/frontend/api/app.py b/frontend/api/app.py index 7746122..c14d002 100644 --- a/frontend/api/app.py +++ b/frontend/api/app.py @@ -36,6 +36,11 @@ allow_headers=["*"], ) +# GCS bucket holding cached API outputs; Cloud Tasks queue the trigger endpoints +# enqueue worker jobs onto. +_CACHE_BUCKET = "die_cache" +_TASK_QUEUE = "die-queue" + class BboxModel(BaseModel): minLat: float @@ -66,10 +71,10 @@ def router_unify_waterlevels(item: ConfigModel): if not item.force: storage_client = storage.Client() if item.output_summary: - bucket = storage_client.bucket("die_cache") + bucket = storage_client.bucket(_CACHE_BUCKET) exists = bucket.blob(f"{itemhash}.csv").exists() else: - bucket = storage_client.bucket("die_cache") + bucket = storage_client.bucket(_CACHE_BUCKET) exists = bucket.blob(f"{itemhash}.zip").exists() response = None @@ -78,7 +83,7 @@ def router_unify_waterlevels(item: ConfigModel): project = os.getenv("PROJECT_ID") location = os.getenv("LOCATION") url = os.getenv("WORKER_URL") - queue = "die-queue" + queue = _TASK_QUEUE cfgobj["output_name"] = itemhash task = tasks_v2.Task( @@ -132,7 +137,7 @@ def router_status(task_id: str): def router_download_unified_waterlevels(downloadhash: str, output_summary: bool): storage_client = storage.Client() - bucket = storage_client.bucket("die_cache") + bucket = storage_client.bucket(_CACHE_BUCKET) if output_summary: blob = bucket.blob(f"{downloadhash}.csv") diff --git a/orchestration/definitions.py b/orchestration/definitions.py index 9c9da36..299da18 100644 --- a/orchestration/definitions.py +++ b/orchestration/definitions.py @@ -71,6 +71,11 @@ def load_input(self, context: dg.InputContext): "ogc_waterlevel_change", } +# Cohort scheduling defaults: cron used when a product has no `schedule`, and the +# timezone all cohort schedules run in. +_DEFAULT_CRON = "0 6 * * *" +_SCHEDULE_TIMEZONE = "America/Denver" + def _load_products() -> dict: return yaml.safe_load(_PRODUCTS_PATH.read_text()) @@ -152,7 +157,7 @@ def _build_cohorts(products_config: dict, specs_by_pid: dict) -> dict: name = _cohort_name(_cohort_key(specs)) cohort = cohorts.setdefault(name, {"members": [], "cron": None}) cohort["members"].append(pid) - cron = product.get("schedule", "0 6 * * *") + cron = product.get("schedule", _DEFAULT_CRON) if cohort["cron"] is None or _cron_sort_key(cron) < _cron_sort_key(cohort["cron"]): cohort["cron"] = cron return cohorts @@ -201,7 +206,7 @@ def _build_schedules( name=f"schedule_{name}", job=cohort_jobs[name], cron_schedule=cohort["cron"], - execution_timezone="America/Denver", + execution_timezone=_SCHEDULE_TIMEZONE, ) for name, cohort in cohorts.items() ]