Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 25 additions & 24 deletions backend/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,25 +76,30 @@
from backend.logger import make_logger


# Which sources report each parameter (empirical availability). A plain
# parameter -> [source_key, ...] map; the waterlevels list mirrors the sources
# with a waterlevel class in the SOURCES registry (asserted by
# tests/test_source_registry.py), while the analyte lists are authored because
# they encode which analytes each agency actually reports.
PARAMETER_SOURCE_MAP = {
WATERLEVELS: {"agencies": ["bernco", "cabq", "ebid", "nmbgmr_amp", "nmose_isc_seven_rivers", "nmose_roswell", "nwis", "pvacd", "wqp"]},
CARBONATE: {"agencies": ["nmbgmr_amp", "wqp"]},
ARSENIC: {"agencies": ["bor", "nmbgmr_amp", "nmed_dwb", "wqp"]},
URANIUM: {"agencies": ["bor", "nmbgmr_amp", "nmed_dwb", "wqp"]},
SPECIFIC_CONDUCTANCE: {"agencies": ["nmbgmr_amp", "nmed_dwb", "nmose_isc_seven_rivers", "wqp"]},
CONDUCTIVITY: {"agencies": ["bor", "nmose_isc_seven_rivers", "wqp"]},
BICARBONATE: {"agencies": ["nmbgmr_amp", "nmed_dwb", "nmose_isc_seven_rivers", "wqp"]},
CALCIUM: {"agencies": ["bor", "nmbgmr_amp", "nmed_dwb", "nmose_isc_seven_rivers", "wqp"]},
CHLORIDE: {"agencies": ["bor", "nmbgmr_amp", "nmed_dwb", "nmose_isc_seven_rivers", "wqp"]},
FLUORIDE: {"agencies": ["bor", "nmbgmr_amp", "nmed_dwb", "nmose_isc_seven_rivers", "wqp"]},
MAGNESIUM: {"agencies": ["bor", "nmbgmr_amp", "nmed_dwb", "nmose_isc_seven_rivers", "wqp"]},
NITRATE: {"agencies": ["bor", "nmbgmr_amp", "nmed_dwb", "nmose_isc_seven_rivers", "wqp"]},
PH: {"agencies": ["bor", "nmbgmr_amp", "nmed_dwb", "nmose_isc_seven_rivers", "wqp"]},
POTASSIUM: {"agencies": ["bor", "nmbgmr_amp", "nmed_dwb", "nmose_isc_seven_rivers", "wqp"]},
SILICA: {"agencies": ["bor", "nmbgmr_amp", "nmed_dwb", "nmose_isc_seven_rivers", "wqp"]},
SODIUM: {"agencies": ["bor", "nmbgmr_amp", "nmed_dwb", "nmose_isc_seven_rivers", "wqp"]},
SULFATE: {"agencies": ["bor", "nmbgmr_amp", "nmed_dwb", "nmose_isc_seven_rivers", "wqp"]},
TDS: {"agencies": ["bor", "nmbgmr_amp", "nmed_dwb", "nmose_isc_seven_rivers", "wqp"]},
WATERLEVELS: ["bernco", "cabq", "ebid", "nmbgmr_amp", "nmose_isc_seven_rivers", "nmose_roswell", "nwis", "pvacd", "wqp"],
CARBONATE: ["nmbgmr_amp", "wqp"],
ARSENIC: ["bor", "nmbgmr_amp", "nmed_dwb", "wqp"],
URANIUM: ["bor", "nmbgmr_amp", "nmed_dwb", "wqp"],
SPECIFIC_CONDUCTANCE: ["nmbgmr_amp", "nmed_dwb", "nmose_isc_seven_rivers", "wqp"],
CONDUCTIVITY: ["bor", "nmose_isc_seven_rivers", "wqp"],
BICARBONATE: ["nmbgmr_amp", "nmed_dwb", "nmose_isc_seven_rivers", "wqp"],
CALCIUM: ["bor", "nmbgmr_amp", "nmed_dwb", "nmose_isc_seven_rivers", "wqp"],
CHLORIDE: ["bor", "nmbgmr_amp", "nmed_dwb", "nmose_isc_seven_rivers", "wqp"],
FLUORIDE: ["bor", "nmbgmr_amp", "nmed_dwb", "nmose_isc_seven_rivers", "wqp"],
MAGNESIUM: ["bor", "nmbgmr_amp", "nmed_dwb", "nmose_isc_seven_rivers", "wqp"],
NITRATE: ["bor", "nmbgmr_amp", "nmed_dwb", "nmose_isc_seven_rivers", "wqp"],
PH: ["bor", "nmbgmr_amp", "nmed_dwb", "nmose_isc_seven_rivers", "wqp"],
POTASSIUM: ["bor", "nmbgmr_amp", "nmed_dwb", "nmose_isc_seven_rivers", "wqp"],
SILICA: ["bor", "nmbgmr_amp", "nmed_dwb", "nmose_isc_seven_rivers", "wqp"],
SODIUM: ["bor", "nmbgmr_amp", "nmed_dwb", "nmose_isc_seven_rivers", "wqp"],
SULFATE: ["bor", "nmbgmr_amp", "nmed_dwb", "nmose_isc_seven_rivers", "wqp"],
TDS: ["bor", "nmbgmr_amp", "nmed_dwb", "nmose_isc_seven_rivers", "wqp"],
}

@dataclass(frozen=True)
Expand Down Expand Up @@ -280,10 +285,9 @@ def _load_from_yaml(self, path):
self.warn(f"Config file {path} not found")

def get_config_and_false_agencies(self):
entry = PARAMETER_SOURCE_MAP.get(self.parameter)
if entry is None:
config_agencies = PARAMETER_SOURCE_MAP.get(self.parameter)
if config_agencies is None:
raise ValueError(f"Unknown parameter {self.parameter!r}. Valid parameters: {sorted(PARAMETER_SOURCE_MAP)}")
config_agencies = entry["agencies"]
false_agencies = [a for a in SOURCE_KEYS if a not in config_agencies]
return config_agencies, false_agencies

Expand All @@ -309,9 +313,6 @@ def all_site_sources(self):
source.set_config(self)
sources.append((source, None))

# pods = NMOSEPODSiteSource()
# pods.set_config(self)
# sources.append((pods, None))
return sources

def analyte_sources(self):
Expand Down
1 change: 0 additions & 1 deletion backend/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ def setup_logging(level=None, log_format=None, path=None):
else:
path = os.path.join(path, "die.log")

# shandler = logging.StreamHandler()
rhandler = RotatingFileHandler(path, maxBytes=1e8, backupCount=50)
_managed_handlers.append(rhandler)

Expand Down
3 changes: 0 additions & 3 deletions backend/record.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,6 @@ def update(self, **kw):
self._payload.update(kw)

def _get_sigfig_formatted_value(self, attr):
# v = self._payload.get(attr)
# if v is None and self.defaults:
# v = self.defaults.get(attr)
v = self.__getattr__(attr)

field_sigfigs = [
Expand Down
40 changes: 1 addition & 39 deletions backend/unifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,44 +81,6 @@ def unify_sites(config):
return True


# def _perister_factory(config):
# """
# Determines the type of persister to use based on the configuration. The
# persister types are:

# - CSVPersister
# - CloudStoragePersister
# - GeoJSONPersister

# Parameters
# -------
# config: Config
# The configuration object

# Returns
# -------
# Persister
# The persister object to use
# """
# persister_klass = CSVPersister
# if config.use_cloud_storage:
# persister_klass = CloudStoragePersister
# elif config.output_format == OutputFormat.CSV:
# persister_klass = CSVPersister
# elif config.output_format == OutputFormat.GEOJSON:
# persister_klass = GeoJSONPersister
# elif config.output_format == OutputFormat.GEOSERVER:
# persister_klass = GeoServerPersister

# return persister_klass(config)


# def _unify_wrapper(config, func):
# persister = _perister_factory(config)
# func(persister)
# persister.save(config.output_path)


def _site_wrapper(site_source, parameter_source, persister, config, raise_errors=False):

try:
Expand Down Expand Up @@ -162,7 +124,7 @@ def _site_wrapper(site_source, parameter_source, persister, config, raise_errors
persister.sites.extend(sites)
else:
for site_records in site_source.chunks(sites):
if type(site_records) == list:
if isinstance(site_records, list):
n = len(site_records)
if first_flag:
first_flag = False
Expand Down
9 changes: 0 additions & 9 deletions backend/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,15 +88,6 @@ def county_bounds_handler():
return make_cors_response({"wkt": bounds})


# @app.route("/sources_in_polygon")
# def sources_in_polygon_handler():
# from backend.unifier import get_sources_in_polygon
# polygon = request.args.get("wkt")
# sources = get_sources_in_polygon(polygon)
#
# return make_cors_response({"sources": sources})


@app.route("/unify_analytes", methods=["POST"])
def unify_analytes_handler():
from backend.unifier import unify_analytes
Expand Down
72 changes: 72 additions & 0 deletions docs/cleanup-todo.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# DIE Cleanup TODO

> **Status:** Tier 1 (all), plus 2.1 and 2.2, are **DONE** (this PR). Remaining:
> Tier 2 items 2.3–2.7, all of Tier 3, all of Tier 4.

Prioritized cleanup backlog from a code-analysis sweep (backend + frontend +
orchestration). Each item: location, effort (S/M/L), risk, and whether it
changes behavior. Items marked ✓ were confirmed against current `main`
(post #95 dual-fetch, #99 descriptions, #100 new products, #101/#102 config
registry). Tiers are ordered by safety — Tier 1 is batchable into one no-risk PR.

---

## Tier 1 — Dead / commented-out code (no behavior change, safe) — ✅ DONE

| # | Location | Action | Effort | ✓ |
|---|----------|--------|--------|---|
| 1.1 | `backend/unifier.py:84-113` | Delete commented `_perister_factory()` block (~30 lines) | S | ✓ |
| 1.2 | `backend/unifier.py:116-120` | Delete commented `_unify_wrapper()` | S | ✓ |
| 1.3 | `backend/config.py` `all_site_sources()` | Delete commented `pods` wiring (3 lines) | S | ✓ |
| 1.4 | `frontend/api/app.py:60-82` | Delete commented `create_queue()` (~23 lines) | S | ✓ |
| 1.5 | `frontend/api/app.py:101-103, 139-147` | Delete commented cache-check + alt Cloud Tasks payload | S | ✓ |
| 1.6 | `frontend/api/app.py:87` | Remove `print("unify waterlevels", item)` debug | S | ✓ |
| 1.7 | `frontend/api/app.py:114-119` | Remove dead `task_id is not None` path (never set) | S | ✓ |
| 1.8 | `frontend/api/app.py` `ConfigModel.sources` | Remove field — accepted on POST but never read | S | ✓ |
| 1.9 | `orchestration/definitions.py:211` | Drop `_all_specs` from `_build_graph` return + unpack — unused | S | ✓ |
| 1.10 | `orchestration/resources/die_config.py:44-56` | Drop `is_summary`/`output_format` mapping — **now dead**: source asset always passes synth `output_type="ogc_timeseries"` and `unify_source_both` ignores both. (Also resolves the missing `ogc_hardness`/`ogc_water_type` in the `is_summary` tuple — harmless today, but the whole block goes.) | S | ✓ |
| 1.11 | `backend/worker.py:91-97` | Delete commented `sources_in_polygon_handler()` | S | |
| 1.12 | `backend/record.py:52-55`, `frontend/cli.py:28`, `backend/logger.py:75` | Delete misc commented lines | S | |

---

## Tier 2 — Consistency / small tidy (low risk)

| # | Location | Action | Effort | Behavior | ✓ |
|---|----------|--------|--------|----------|---|
| **2.1 (requested) ✅** | `backend/config.py:79-97` + reads at `config.py`, `orchestration/assets/products.py:169`, 2 tests | **Flatten `PARAMETER_SOURCE_MAP`**: `param: {"agencies":[...]}` → `param: [...]`. `"agencies"` is the only key ever present; every read is `["agencies"]`. | S | none | ✓ |
| 2.2 ✅ | `backend/unifier.py` | `type(site_records) == list` → `isinstance(...)` | S | none | ✓ |
| 2.3 | `backend/config.py` (`parameter == "ph"`) | Use `PH` constant from `constants.py` | S | none | |
| 2.4 | `backend/transformer.py:272`, `backend/record.py:70` | `record_type == "analytes"/"waterlevels"` string compares → constants | S | none | |
| 2.5 | `frontend/api/app.py` | Hoist magic strings to constants: bucket `"die_cache"` (×3), queue `"die-queue"`, header casing | S | none | |
| 2.6 | `frontend/api/app.py` `router_parameters()` | Derive parameter list from `PARAMETER_SOURCE_MAP` instead of hardcoded 2-item list | M | none | |
| 2.7 | `orchestration/definitions.py:155,204` | Hoist default cron `"0 6 * * *"` + timezone `"America/Denver"` to constants | S | none | |

---

## Tier 3 — De-duplicate knowledge across layers (medium)

| # | Location | Action | Effort | ✓ |
|---|----------|--------|--------|---|
| 3.1 | `frontend/cli.py:36-121` (`--no-X` flags) + `cli.py:385-398` (hardcoded agency list in `sites()`) | Derive from `backend.config.SOURCE_KEYS` — the remaining hardcoded source list after #101/#102. Click flags are harder to generate dynamically; at minimum dedup the `sites()` list. | M | ✓ |
| 3.2 | `orchestration/definitions.py:60-68` (`_SUPPORTED_OUTPUT_TYPES`) vs `products.py` combine `if/elif` chain | Single registry mapping `output_type → (dumper, is_summary)`; both the supported-set and the dispatch derive from it. Adding an output type → one entry. | M | |
| 3.3 | `backend/config.py:79` `PARAMETER_SOURCE_MAP[WATERLEVELS]` | Derive the waterlevels agency list from the `SOURCES` registry (== sources with a waterlevel class; the desync test already asserts equality). Keep analyte entries authored. | S | ✓ |

---

## Tier 4 — Larger refactors (separate efforts, not quick wins)

- **`OutputMode` enum** — replace the 3 output bools (`output_summary` / `output_timeseries_unified` / `output_timeseries_separated`) + stringly-typed dispatch in `cli.py` and `die_config.py`, and make `output_format` an `OutputFormat` enum rather than a bare string. (Touches CLI/API/orchestration; the validation in #101 is a stopgap.)
- **Merge `read_summary` / `read_timeseries`** (`backend/source.py:375-433`) — ~85% identical; extract the shared fetch/clean/iterate skeleton.
- **`config.validate()` `sys.exit(2)` → raise** — library code shouldn't exit the process; raise a `ConfigError` and let CLI translate to an exit code. (Affects callers; needs care.)
- **Hoist connector `_extract_*` duplication** — `_extract_source_parameter_results/dates/units` repeat dict/list access across connectors; lift common shapes to a base.
- **Spatial-filter precedence** — `bbox_bounding_points` (bbox first) vs `bounding_wkt` (wkt first) resolve multiple filters differently; #101 warns, but unify the precedence. Consider a small `Scope` value object and drop the `wkt=None`-means-statewide magic in `die_config`.
- **Remove deprecated shims** — `transformer_klass`, `_SubclassValidatorShim`, `_validate_record` (`backend/source.py`). Verify no connector still relies on the override path first.

---

### Suggested execution order
1. Tier 1 as one PR (pure deletions; tests + `dg check` are the gate).
2. Tier 2 incl. the requested flatten (2.1) as one small PR.
3. Tier 3 piecemeal.
4. Tier 4 each as its own scoped PR.
48 changes: 0 additions & 48 deletions frontend/api/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,42 +50,14 @@ class ConfigModel(BaseModel):
wkt: str = ""
site_limit: int = 0
force: bool = False
sources: list = []
output_name: str = ""
output_summary: bool = True
start_date: str = ""
end_date: str = ""


# def create_queue(project: str, location: str, queue_id: str) -> tasks_v2.Queue:
# """Create a queue.
# Args:
# project: The project ID to create the queue in.
# location: The location to create the queue in.
# queue_id: The ID to use for the new queue.
#
# Returns:
# The newly created queue.
# """
#
# # Create a client.
# client = tasks_v2.CloudTasksClient()
# queue_path = client.queue_path(project, location, queue_id)
# queue = client.get_queue(name=queue_path)
# if not queue:
# # Use the client to send a CreateQueueRequest.
# client.create_queue(
# tasks_v2.CreateQueueRequest(
# parent=client.common_location_path(project, location),
# queue=tasks_v2.Queue(name=queue_path),
# )
# )


@app.post("/trigger_unify_waterlevels")
def router_unify_waterlevels(item: ConfigModel):
print("unify waterlevels", item)

exists = False

cfgobj = item.model_dump()
Expand All @@ -98,9 +70,6 @@ def router_unify_waterlevels(item: ConfigModel):
exists = bucket.blob(f"{itemhash}.csv").exists()
else:
bucket = storage_client.bucket("die_cache")
# combined_exists = bucket.blob(f"{itemhash}.combined.csv").exists()
# timeseries_exists = bucket.blob(f"{itemhash}_timeseries/sites.csv").exists()
# exists = combined_exists or timeseries_exists
exists = bucket.blob(f"{itemhash}.zip").exists()

response = None
Expand All @@ -111,22 +80,14 @@ def router_unify_waterlevels(item: ConfigModel):
url = os.getenv("WORKER_URL")
queue = "die-queue"

task_id = None

cfgobj["output_name"] = itemhash
# Construct the task.
name = None
if task_id is not None:
name = client.task_path(project, location, queue, task_id)

task = tasks_v2.Task(
http_request=tasks_v2.HttpRequest(
http_method=tasks_v2.HttpMethod.POST,
url=f"{url}/unify_waterlevels",
headers={"Content-type": "application/json"},
body=json.dumps(cfgobj).encode(),
),
name=name,
)
response = client.create_task(
tasks_v2.CreateTaskRequest(
Expand All @@ -136,15 +97,6 @@ def router_unify_waterlevels(item: ConfigModel):
task=task,
)
)
# parent = client.queue_path(project, location, queue)
# task = {
# 'app_engine_http_request': {
# 'http_method': 'POST',
# 'relative_uri': f'{url}/unify_waterlevels',
# 'body': jcfg
# }
# }
# response = client.create_task(parent=parent, task=task)

response = {"name": response.name, "dispatch_count": response.dispatch_count}

Expand Down
3 changes: 0 additions & 3 deletions frontend/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,6 @@
from backend.logger import setup_logging


# setup_logging()


@click.group()
def cli():
pass
Expand Down
2 changes: 1 addition & 1 deletion orchestration/assets/products.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ def _group_for_param(parameter: str) -> str:
def _param_source_keys(product: dict, parameter: str) -> list[str]:
"""Source keys that apply to *parameter* for this product: the parameter's
agencies filtered by the product's include/exclude list."""
agencies = list(PARAMETER_SOURCE_MAP[parameter]["agencies"])
agencies = list(PARAMETER_SOURCE_MAP[parameter])
spec = product.get("sources", {}) or {}
if spec.get("include"):
return [a for a in agencies if a in spec["include"]]
Expand Down
10 changes: 5 additions & 5 deletions orchestration/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,9 @@ def _build_graph(products_config: dict):
unified for both summary and timeseries, so a source two products both need
(in either mode) is fetched once, not twice.

Returns ``(source_assets, pipeline_assets, specs_by_pid, all_specs)`` where
``specs_by_pid`` maps a product id to the source specs its combine consumes
and ``all_specs`` is the deduped set of source specs."""
Returns ``(source_assets, pipeline_assets, specs_by_pid)`` where
``specs_by_pid`` maps a product id to the source specs its combine
consumes."""
specs_by_pid: dict[str, list] = {}
all_specs: dict = {} # SourceSpec -> SourceSpec (dedup; namedtuple is hashable)
for product in _products(products_config):
Expand All @@ -107,7 +107,7 @@ def _build_graph(products_config: dict):
build_product_pipeline_assets(product, specs_by_pid[product["id"]])
)

return source_assets, pipeline_assets, specs_by_pid, all_specs
return source_assets, pipeline_assets, specs_by_pid


def _cohort_key(specs) -> tuple[str, str]:
Expand Down Expand Up @@ -208,7 +208,7 @@ def _build_schedules(


_products_config = _load_products()
_source_assets, _pipeline_assets, _specs_by_pid, _all_specs = _build_graph(_products_config)
_source_assets, _pipeline_assets, _specs_by_pid = _build_graph(_products_config)
_assets = _source_assets + _pipeline_assets
_cohorts = _build_cohorts(_products_config, _specs_by_pid)
_cohort_jobs = _build_cohort_jobs(_cohorts, _specs_by_pid)
Expand Down
Loading
Loading