diff --git a/backend/config.py b/backend/config.py index d9f209f..5ff5430 100644 --- a/backend/config.py +++ b/backend/config.py @@ -76,25 +76,30 @@ from backend.logger import make_logger +# Which sources report each parameter (empirical availability). A plain +# parameter -> [source_key, ...] map; the waterlevels list mirrors the sources +# with a waterlevel class in the SOURCES registry (asserted by +# tests/test_source_registry.py), while the analyte lists are authored because +# they encode which analytes each agency actually reports. PARAMETER_SOURCE_MAP = { - WATERLEVELS: {"agencies": ["bernco", "cabq", "ebid", "nmbgmr_amp", "nmose_isc_seven_rivers", "nmose_roswell", "nwis", "pvacd", "wqp"]}, - CARBONATE: {"agencies": ["nmbgmr_amp", "wqp"]}, - ARSENIC: {"agencies": ["bor", "nmbgmr_amp", "nmed_dwb", "wqp"]}, - URANIUM: {"agencies": ["bor", "nmbgmr_amp", "nmed_dwb", "wqp"]}, - SPECIFIC_CONDUCTANCE: {"agencies": ["nmbgmr_amp", "nmed_dwb", "nmose_isc_seven_rivers", "wqp"]}, - CONDUCTIVITY: {"agencies": ["bor", "nmose_isc_seven_rivers", "wqp"]}, - BICARBONATE: {"agencies": ["nmbgmr_amp", "nmed_dwb", "nmose_isc_seven_rivers", "wqp"]}, - CALCIUM: {"agencies": ["bor", "nmbgmr_amp", "nmed_dwb", "nmose_isc_seven_rivers", "wqp"]}, - CHLORIDE: {"agencies": ["bor", "nmbgmr_amp", "nmed_dwb", "nmose_isc_seven_rivers", "wqp"]}, - FLUORIDE: {"agencies": ["bor", "nmbgmr_amp", "nmed_dwb", "nmose_isc_seven_rivers", "wqp"]}, - MAGNESIUM: {"agencies": ["bor", "nmbgmr_amp", "nmed_dwb", "nmose_isc_seven_rivers", "wqp"]}, - NITRATE: {"agencies": ["bor", "nmbgmr_amp", "nmed_dwb", "nmose_isc_seven_rivers", "wqp"]}, - PH: {"agencies": ["bor", "nmbgmr_amp", "nmed_dwb", "nmose_isc_seven_rivers", "wqp"]}, - POTASSIUM: {"agencies": ["bor", "nmbgmr_amp", "nmed_dwb", "nmose_isc_seven_rivers", "wqp"]}, - SILICA: {"agencies": ["bor", "nmbgmr_amp", "nmed_dwb", "nmose_isc_seven_rivers", "wqp"]}, - SODIUM: {"agencies": ["bor", "nmbgmr_amp", "nmed_dwb", "nmose_isc_seven_rivers", "wqp"]}, - SULFATE: {"agencies": ["bor", "nmbgmr_amp", "nmed_dwb", "nmose_isc_seven_rivers", "wqp"]}, - TDS: {"agencies": ["bor", "nmbgmr_amp", "nmed_dwb", "nmose_isc_seven_rivers", "wqp"]}, + WATERLEVELS: ["bernco", "cabq", "ebid", "nmbgmr_amp", "nmose_isc_seven_rivers", "nmose_roswell", "nwis", "pvacd", "wqp"], + CARBONATE: ["nmbgmr_amp", "wqp"], + ARSENIC: ["bor", "nmbgmr_amp", "nmed_dwb", "wqp"], + URANIUM: ["bor", "nmbgmr_amp", "nmed_dwb", "wqp"], + SPECIFIC_CONDUCTANCE: ["nmbgmr_amp", "nmed_dwb", "nmose_isc_seven_rivers", "wqp"], + CONDUCTIVITY: ["bor", "nmose_isc_seven_rivers", "wqp"], + BICARBONATE: ["nmbgmr_amp", "nmed_dwb", "nmose_isc_seven_rivers", "wqp"], + CALCIUM: ["bor", "nmbgmr_amp", "nmed_dwb", "nmose_isc_seven_rivers", "wqp"], + CHLORIDE: ["bor", "nmbgmr_amp", "nmed_dwb", "nmose_isc_seven_rivers", "wqp"], + FLUORIDE: ["bor", "nmbgmr_amp", "nmed_dwb", "nmose_isc_seven_rivers", "wqp"], + MAGNESIUM: ["bor", "nmbgmr_amp", "nmed_dwb", "nmose_isc_seven_rivers", "wqp"], + NITRATE: ["bor", "nmbgmr_amp", "nmed_dwb", "nmose_isc_seven_rivers", "wqp"], + PH: ["bor", "nmbgmr_amp", "nmed_dwb", "nmose_isc_seven_rivers", "wqp"], + POTASSIUM: ["bor", "nmbgmr_amp", "nmed_dwb", "nmose_isc_seven_rivers", "wqp"], + SILICA: ["bor", "nmbgmr_amp", "nmed_dwb", "nmose_isc_seven_rivers", "wqp"], + SODIUM: ["bor", "nmbgmr_amp", "nmed_dwb", "nmose_isc_seven_rivers", "wqp"], + SULFATE: ["bor", "nmbgmr_amp", "nmed_dwb", "nmose_isc_seven_rivers", "wqp"], + TDS: ["bor", "nmbgmr_amp", "nmed_dwb", "nmose_isc_seven_rivers", "wqp"], } @dataclass(frozen=True) @@ -280,10 +285,9 @@ def _load_from_yaml(self, path): self.warn(f"Config file {path} not found") def get_config_and_false_agencies(self): - entry = PARAMETER_SOURCE_MAP.get(self.parameter) - if entry is None: + config_agencies = PARAMETER_SOURCE_MAP.get(self.parameter) + if config_agencies is None: raise ValueError(f"Unknown parameter {self.parameter!r}. Valid parameters: {sorted(PARAMETER_SOURCE_MAP)}") - config_agencies = entry["agencies"] false_agencies = [a for a in SOURCE_KEYS if a not in config_agencies] return config_agencies, false_agencies @@ -309,9 +313,6 @@ def all_site_sources(self): source.set_config(self) sources.append((source, None)) - # pods = NMOSEPODSiteSource() - # pods.set_config(self) - # sources.append((pods, None)) return sources def analyte_sources(self): diff --git a/backend/logger.py b/backend/logger.py index ae23cf0..313559b 100644 --- a/backend/logger.py +++ b/backend/logger.py @@ -72,7 +72,6 @@ def setup_logging(level=None, log_format=None, path=None): else: path = os.path.join(path, "die.log") - # shandler = logging.StreamHandler() rhandler = RotatingFileHandler(path, maxBytes=1e8, backupCount=50) _managed_handlers.append(rhandler) diff --git a/backend/record.py b/backend/record.py index 6befa6f..4489a99 100644 --- a/backend/record.py +++ b/backend/record.py @@ -50,9 +50,6 @@ def update(self, **kw): self._payload.update(kw) def _get_sigfig_formatted_value(self, attr): - # v = self._payload.get(attr) - # if v is None and self.defaults: - # v = self.defaults.get(attr) v = self.__getattr__(attr) field_sigfigs = [ diff --git a/backend/unifier.py b/backend/unifier.py index 8c3c4cf..1800b41 100644 --- a/backend/unifier.py +++ b/backend/unifier.py @@ -81,44 +81,6 @@ def unify_sites(config): return True -# def _perister_factory(config): -# """ -# Determines the type of persister to use based on the configuration. The -# persister types are: - -# - CSVPersister -# - CloudStoragePersister -# - GeoJSONPersister - -# Parameters -# ------- -# config: Config -# The configuration object - -# Returns -# ------- -# Persister -# The persister object to use -# """ -# persister_klass = CSVPersister -# if config.use_cloud_storage: -# persister_klass = CloudStoragePersister -# elif config.output_format == OutputFormat.CSV: -# persister_klass = CSVPersister -# elif config.output_format == OutputFormat.GEOJSON: -# persister_klass = GeoJSONPersister -# elif config.output_format == OutputFormat.GEOSERVER: -# persister_klass = GeoServerPersister - -# return persister_klass(config) - - -# def _unify_wrapper(config, func): -# persister = _perister_factory(config) -# func(persister) -# persister.save(config.output_path) - - def _site_wrapper(site_source, parameter_source, persister, config, raise_errors=False): try: @@ -162,7 +124,7 @@ def _site_wrapper(site_source, parameter_source, persister, config, raise_errors persister.sites.extend(sites) else: for site_records in site_source.chunks(sites): - if type(site_records) == list: + if isinstance(site_records, list): n = len(site_records) if first_flag: first_flag = False diff --git a/backend/worker.py b/backend/worker.py index 049c0ee..ed629a5 100644 --- a/backend/worker.py +++ b/backend/worker.py @@ -88,15 +88,6 @@ def county_bounds_handler(): return make_cors_response({"wkt": bounds}) -# @app.route("/sources_in_polygon") -# def sources_in_polygon_handler(): -# from backend.unifier import get_sources_in_polygon -# polygon = request.args.get("wkt") -# sources = get_sources_in_polygon(polygon) -# -# return make_cors_response({"sources": sources}) - - @app.route("/unify_analytes", methods=["POST"]) def unify_analytes_handler(): from backend.unifier import unify_analytes diff --git a/docs/cleanup-todo.md b/docs/cleanup-todo.md new file mode 100644 index 0000000..fd460cb --- /dev/null +++ b/docs/cleanup-todo.md @@ -0,0 +1,72 @@ +# DIE Cleanup TODO + +> **Status:** Tier 1 (all), plus 2.1 and 2.2, are **DONE** (this PR). Remaining: +> Tier 2 items 2.3–2.7, all of Tier 3, all of Tier 4. + +Prioritized cleanup backlog from a code-analysis sweep (backend + frontend + +orchestration). Each item: location, effort (S/M/L), risk, and whether it +changes behavior. Items marked ✓ were confirmed against current `main` +(post #95 dual-fetch, #99 descriptions, #100 new products, #101/#102 config +registry). Tiers are ordered by safety — Tier 1 is batchable into one no-risk PR. + +--- + +## Tier 1 — Dead / commented-out code (no behavior change, safe) — ✅ DONE + +| # | Location | Action | Effort | ✓ | +|---|----------|--------|--------|---| +| 1.1 | `backend/unifier.py:84-113` | Delete commented `_perister_factory()` block (~30 lines) | S | ✓ | +| 1.2 | `backend/unifier.py:116-120` | Delete commented `_unify_wrapper()` | S | ✓ | +| 1.3 | `backend/config.py` `all_site_sources()` | Delete commented `pods` wiring (3 lines) | S | ✓ | +| 1.4 | `frontend/api/app.py:60-82` | Delete commented `create_queue()` (~23 lines) | S | ✓ | +| 1.5 | `frontend/api/app.py:101-103, 139-147` | Delete commented cache-check + alt Cloud Tasks payload | S | ✓ | +| 1.6 | `frontend/api/app.py:87` | Remove `print("unify waterlevels", item)` debug | S | ✓ | +| 1.7 | `frontend/api/app.py:114-119` | Remove dead `task_id is not None` path (never set) | S | ✓ | +| 1.8 | `frontend/api/app.py` `ConfigModel.sources` | Remove field — accepted on POST but never read | S | ✓ | +| 1.9 | `orchestration/definitions.py:211` | Drop `_all_specs` from `_build_graph` return + unpack — unused | S | ✓ | +| 1.10 | `orchestration/resources/die_config.py:44-56` | Drop `is_summary`/`output_format` mapping — **now dead**: source asset always passes synth `output_type="ogc_timeseries"` and `unify_source_both` ignores both. (Also resolves the missing `ogc_hardness`/`ogc_water_type` in the `is_summary` tuple — harmless today, but the whole block goes.) | S | ✓ | +| 1.11 | `backend/worker.py:91-97` | Delete commented `sources_in_polygon_handler()` | S | | +| 1.12 | `backend/record.py:52-55`, `frontend/cli.py:28`, `backend/logger.py:75` | Delete misc commented lines | S | | + +--- + +## Tier 2 — Consistency / small tidy (low risk) + +| # | Location | Action | Effort | Behavior | ✓ | +|---|----------|--------|--------|----------|---| +| **2.1 (requested) ✅** | `backend/config.py:79-97` + reads at `config.py`, `orchestration/assets/products.py:169`, 2 tests | **Flatten `PARAMETER_SOURCE_MAP`**: `param: {"agencies":[...]}` → `param: [...]`. `"agencies"` is the only key ever present; every read is `["agencies"]`. | S | none | ✓ | +| 2.2 ✅ | `backend/unifier.py` | `type(site_records) == list` → `isinstance(...)` | S | none | ✓ | +| 2.3 | `backend/config.py` (`parameter == "ph"`) | Use `PH` constant from `constants.py` | S | none | | +| 2.4 | `backend/transformer.py:272`, `backend/record.py:70` | `record_type == "analytes"/"waterlevels"` string compares → constants | S | none | | +| 2.5 | `frontend/api/app.py` | Hoist magic strings to constants: bucket `"die_cache"` (×3), queue `"die-queue"`, header casing | S | none | | +| 2.6 | `frontend/api/app.py` `router_parameters()` | Derive parameter list from `PARAMETER_SOURCE_MAP` instead of hardcoded 2-item list | M | none | | +| 2.7 | `orchestration/definitions.py:155,204` | Hoist default cron `"0 6 * * *"` + timezone `"America/Denver"` to constants | S | none | | + +--- + +## Tier 3 — De-duplicate knowledge across layers (medium) + +| # | Location | Action | Effort | ✓ | +|---|----------|--------|--------|---| +| 3.1 | `frontend/cli.py:36-121` (`--no-X` flags) + `cli.py:385-398` (hardcoded agency list in `sites()`) | Derive from `backend.config.SOURCE_KEYS` — the remaining hardcoded source list after #101/#102. Click flags are harder to generate dynamically; at minimum dedup the `sites()` list. | M | ✓ | +| 3.2 | `orchestration/definitions.py:60-68` (`_SUPPORTED_OUTPUT_TYPES`) vs `products.py` combine `if/elif` chain | Single registry mapping `output_type → (dumper, is_summary)`; both the supported-set and the dispatch derive from it. Adding an output type → one entry. | M | | +| 3.3 | `backend/config.py:79` `PARAMETER_SOURCE_MAP[WATERLEVELS]` | Derive the waterlevels agency list from the `SOURCES` registry (== sources with a waterlevel class; the desync test already asserts equality). Keep analyte entries authored. | S | ✓ | + +--- + +## Tier 4 — Larger refactors (separate efforts, not quick wins) + +- **`OutputMode` enum** — replace the 3 output bools (`output_summary` / `output_timeseries_unified` / `output_timeseries_separated`) + stringly-typed dispatch in `cli.py` and `die_config.py`, and make `output_format` an `OutputFormat` enum rather than a bare string. (Touches CLI/API/orchestration; the validation in #101 is a stopgap.) +- **Merge `read_summary` / `read_timeseries`** (`backend/source.py:375-433`) — ~85% identical; extract the shared fetch/clean/iterate skeleton. +- **`config.validate()` `sys.exit(2)` → raise** — library code shouldn't exit the process; raise a `ConfigError` and let CLI translate to an exit code. (Affects callers; needs care.) +- **Hoist connector `_extract_*` duplication** — `_extract_source_parameter_results/dates/units` repeat dict/list access across connectors; lift common shapes to a base. +- **Spatial-filter precedence** — `bbox_bounding_points` (bbox first) vs `bounding_wkt` (wkt first) resolve multiple filters differently; #101 warns, but unify the precedence. Consider a small `Scope` value object and drop the `wkt=None`-means-statewide magic in `die_config`. +- **Remove deprecated shims** — `transformer_klass`, `_SubclassValidatorShim`, `_validate_record` (`backend/source.py`). Verify no connector still relies on the override path first. + +--- + +### Suggested execution order +1. Tier 1 as one PR (pure deletions; tests + `dg check` are the gate). +2. Tier 2 incl. the requested flatten (2.1) as one small PR. +3. Tier 3 piecemeal. +4. Tier 4 each as its own scoped PR. diff --git a/frontend/api/app.py b/frontend/api/app.py index 5b7a06e..7746122 100644 --- a/frontend/api/app.py +++ b/frontend/api/app.py @@ -50,42 +50,14 @@ class ConfigModel(BaseModel): wkt: str = "" site_limit: int = 0 force: bool = False - sources: list = [] output_name: str = "" output_summary: bool = True start_date: str = "" end_date: str = "" -# def create_queue(project: str, location: str, queue_id: str) -> tasks_v2.Queue: -# """Create a queue. -# Args: -# project: The project ID to create the queue in. -# location: The location to create the queue in. -# queue_id: The ID to use for the new queue. -# -# Returns: -# The newly created queue. -# """ -# -# # Create a client. -# client = tasks_v2.CloudTasksClient() -# queue_path = client.queue_path(project, location, queue_id) -# queue = client.get_queue(name=queue_path) -# if not queue: -# # Use the client to send a CreateQueueRequest. -# client.create_queue( -# tasks_v2.CreateQueueRequest( -# parent=client.common_location_path(project, location), -# queue=tasks_v2.Queue(name=queue_path), -# ) -# ) - - @app.post("/trigger_unify_waterlevels") def router_unify_waterlevels(item: ConfigModel): - print("unify waterlevels", item) - exists = False cfgobj = item.model_dump() @@ -98,9 +70,6 @@ def router_unify_waterlevels(item: ConfigModel): exists = bucket.blob(f"{itemhash}.csv").exists() else: bucket = storage_client.bucket("die_cache") - # combined_exists = bucket.blob(f"{itemhash}.combined.csv").exists() - # timeseries_exists = bucket.blob(f"{itemhash}_timeseries/sites.csv").exists() - # exists = combined_exists or timeseries_exists exists = bucket.blob(f"{itemhash}.zip").exists() response = None @@ -111,14 +80,7 @@ def router_unify_waterlevels(item: ConfigModel): url = os.getenv("WORKER_URL") queue = "die-queue" - task_id = None - cfgobj["output_name"] = itemhash - # Construct the task. - name = None - if task_id is not None: - name = client.task_path(project, location, queue, task_id) - task = tasks_v2.Task( http_request=tasks_v2.HttpRequest( http_method=tasks_v2.HttpMethod.POST, @@ -126,7 +88,6 @@ def router_unify_waterlevels(item: ConfigModel): headers={"Content-type": "application/json"}, body=json.dumps(cfgobj).encode(), ), - name=name, ) response = client.create_task( tasks_v2.CreateTaskRequest( @@ -136,15 +97,6 @@ def router_unify_waterlevels(item: ConfigModel): task=task, ) ) - # parent = client.queue_path(project, location, queue) - # task = { - # 'app_engine_http_request': { - # 'http_method': 'POST', - # 'relative_uri': f'{url}/unify_waterlevels', - # 'body': jcfg - # } - # } - # response = client.create_task(parent=parent, task=task) response = {"name": response.name, "dispatch_count": response.dispatch_count} diff --git a/frontend/cli.py b/frontend/cli.py index be226cf..75ad7a5 100644 --- a/frontend/cli.py +++ b/frontend/cli.py @@ -25,9 +25,6 @@ from backend.logger import setup_logging -# setup_logging() - - @click.group() def cli(): pass diff --git a/orchestration/assets/products.py b/orchestration/assets/products.py index 23c7a54..02fddd1 100644 --- a/orchestration/assets/products.py +++ b/orchestration/assets/products.py @@ -166,7 +166,7 @@ def _group_for_param(parameter: str) -> str: def _param_source_keys(product: dict, parameter: str) -> list[str]: """Source keys that apply to *parameter* for this product: the parameter's agencies filtered by the product's include/exclude list.""" - agencies = list(PARAMETER_SOURCE_MAP[parameter]["agencies"]) + agencies = list(PARAMETER_SOURCE_MAP[parameter]) spec = product.get("sources", {}) or {} if spec.get("include"): return [a for a in agencies if a in spec["include"]] diff --git a/orchestration/definitions.py b/orchestration/definitions.py index 9bca2db..9c9da36 100644 --- a/orchestration/definitions.py +++ b/orchestration/definitions.py @@ -88,9 +88,9 @@ def _build_graph(products_config: dict): unified for both summary and timeseries, so a source two products both need (in either mode) is fetched once, not twice. - Returns ``(source_assets, pipeline_assets, specs_by_pid, all_specs)`` where - ``specs_by_pid`` maps a product id to the source specs its combine consumes - and ``all_specs`` is the deduped set of source specs.""" + Returns ``(source_assets, pipeline_assets, specs_by_pid)`` where + ``specs_by_pid`` maps a product id to the source specs its combine + consumes.""" specs_by_pid: dict[str, list] = {} all_specs: dict = {} # SourceSpec -> SourceSpec (dedup; namedtuple is hashable) for product in _products(products_config): @@ -107,7 +107,7 @@ def _build_graph(products_config: dict): build_product_pipeline_assets(product, specs_by_pid[product["id"]]) ) - return source_assets, pipeline_assets, specs_by_pid, all_specs + return source_assets, pipeline_assets, specs_by_pid def _cohort_key(specs) -> tuple[str, str]: @@ -208,7 +208,7 @@ def _build_schedules( _products_config = _load_products() -_source_assets, _pipeline_assets, _specs_by_pid, _all_specs = _build_graph(_products_config) +_source_assets, _pipeline_assets, _specs_by_pid = _build_graph(_products_config) _assets = _source_assets + _pipeline_assets _cohorts = _build_cohorts(_products_config, _specs_by_pid) _cohort_jobs = _build_cohort_jobs(_cohorts, _specs_by_pid) diff --git a/orchestration/resources/die_config.py b/orchestration/resources/die_config.py index 6d43d01..ebacacd 100644 --- a/orchestration/resources/die_config.py +++ b/orchestration/resources/die_config.py @@ -18,15 +18,13 @@ def get_config(self, product: dict, parameter: Optional[str] = None) -> Config: """Translate a products.yaml entry into a finalized DIE ``Config``. Mapping: - - ``output_type`` → ``output_summary`` / ``output_format``. Both - ``ogc_summary`` and ``ogc_major_chemistry`` run in summary mode (the - latter pivots per-analyte summaries into one feature per well). - ``spatial_filter.county`` → ``county``. ``spatial_filter.state`` sets ``wkt = None`` (statewide; DIE applies the NM extent downstream). - ``sources.include`` → enable only those sources (all others off). ``sources.exclude`` → disable those, leave the rest at their defaults. - - ``parameter`` is set on the Config, then ``finalize()`` validates and - resolves output units/paths. + - ``parameter`` is set on the Config, then ``finalize()`` resolves output + units/paths. No output mode is set: the source asset unifies both + summary and timeseries from one fetch (see unify_source_both). *parameter* overrides ``product["parameter"]`` — used by the major-chemistry product, which has no single parameter and calls this @@ -41,20 +39,11 @@ def get_config(self, product: dict, parameter: Optional[str] = None) -> Config: spatial = product.get("spatial_filter", {}) sources_spec = product.get("sources", {}) - output_type = product.get("output_type", "ogc_summary") - is_summary = output_type in ( - "ogc_summary", - "ogc_major_chemistry", - "ogc_mcl_exceedance", - ) - - payload: dict = { - "yes": True, - "output_summary": is_summary, - # backend only distinguishes summary vs timeseries; major-chemistry - # is a summary variant as far as unification is concerned. - "output_format": "ogc_summary" if is_summary else output_type, - } + # The source asset fetches a source once and unifies it for BOTH summary + # and timeseries (unify_source_both toggles output_summary itself), so + # the config need not carry an output mode — only the parameter and + # spatial filter, which select what a single source unifies. + payload: dict = {"yes": True} if spatial.get("county"): payload["county"] = spatial["county"] diff --git a/tests/test_config_validation.py b/tests/test_config_validation.py index 2be251d..717fdc9 100644 --- a/tests/test_config_validation.py +++ b/tests/test_config_validation.py @@ -55,6 +55,6 @@ def test_validate_passes_with_one_mode(self): class TestSourceKeysCanonical: def test_source_keys_cover_parameter_map(self): # every agency referenced by the parameter map is a real source key - for entry in PARAMETER_SOURCE_MAP.values(): - for agency in entry["agencies"]: + for agencies in PARAMETER_SOURCE_MAP.values(): + for agency in agencies: assert agency in SOURCE_KEYS diff --git a/tests/test_source_registry.py b/tests/test_source_registry.py index c7fd536..94319fb 100644 --- a/tests/test_source_registry.py +++ b/tests/test_source_registry.py @@ -35,7 +35,7 @@ def test_waterlevel_agencies_match_registry(): # Every source the parameter map lists for waterlevels must have a # waterlevel source class — and vice versa. registry_wl = {s.key for s in SOURCES if s.waterlevel} - map_wl = set(PARAMETER_SOURCE_MAP[WATERLEVELS]["agencies"]) + map_wl = set(PARAMETER_SOURCE_MAP[WATERLEVELS]) assert map_wl == registry_wl @@ -44,14 +44,14 @@ def test_analyte_agencies_have_analyte_source(): # class in the registry (the map is a subset per analyte; the registry is # the universe of analyte-capable sources). analyte_keys = {s.key for s in SOURCES if s.analyte} - for parameter, entry in PARAMETER_SOURCE_MAP.items(): + for parameter, agencies in PARAMETER_SOURCE_MAP.items(): if parameter == WATERLEVELS: continue - missing = set(entry["agencies"]) - analyte_keys + missing = set(agencies) - analyte_keys assert not missing, f"{parameter}: agencies without an analyte source: {missing}" def test_every_map_agency_is_a_known_source(): - for entry in PARAMETER_SOURCE_MAP.values(): - for agency in entry["agencies"]: + for agencies in PARAMETER_SOURCE_MAP.values(): + for agency in agencies: assert agency in SOURCE_DICT