diff --git a/orchestration/assets/products.py b/orchestration/assets/products.py index cce7165..d296b68 100644 --- a/orchestration/assets/products.py +++ b/orchestration/assets/products.py @@ -20,11 +20,15 @@ - **geoserver asset** — keyed ``[product_id, "geoserver"]``. Downloads the combined GeoJSON, converts to GeoPackage, publishes it as a GeoServer layer. -Job layout (see ``definitions.py``) makes the sharing pay off: a dedicated -``sources_job`` materializes every shared source asset once; each per-product -job selects only its combine + geoserver assets and loads the (already -materialized) source inputs from the GCS IO manager. So a product run never -re-unifies a source another product already produced. +Job layout (see ``definitions.py``) makes the sharing pay off **and** keeps each +run's lineage complete. Products are grouped into *cohorts* by +(group, mode, scope) — the products that can share source assets. One job per +cohort materializes that cohort's whole graph in a single run: each shared +source unifies once (it is one asset key, selected once), then every member +combine reads it back through the GCS IO manager and publishes. So a source is +never fetched twice in a run, while the full sources → combine → geoserver +lineage stays visible for every product. (Cross-product dedup requires the +sharing products to run together; that is exactly what a cohort is.) Design notes: - Source and geoserver assets never hard-fail. They catch their own errors and @@ -34,6 +38,23 @@ - Records cross the IO manager as plain ``_payload`` dicts (the record classes use ``__getattr__`` over ``_payload`` which does not survive pickling). The combine asset rebuilds record objects before dumping. + +Known limitation — per-analyte source fetches (potential future optimization): + Sharing is deduped at the ``(parameter, mode, scope, source)`` grain, which + collapses duplication *across products* (e.g. ``sulfate/summary/state_NM/wqp`` + is one asset shared by nm_major_chemistry and nm_mcl_exceedance). It does NOT + collapse *across analytes*: a source appears once per analyte (e.g. ``wqp`` + has ~13 summary source assets, one per analyte). This is because the backend + unifies a single parameter per pass (``unify_source`` uses one + ``config.parameter``), so each analyte is a separate sweep of the same wells + even though one provider query (WQP/AMP/...) typically returns all analytes at + once. Collapsing this would need a **backend** change — multi-analyte + unification that fetches a source once and emits per-analyte records — after + which the source key could drop ``parameter`` (e.g. + ``["sources", "analytes", mode, scope, source]``) and the analyte combines + would each filter the shared multi-analyte payload. That is the bulk of the + remaining redundant API pulls for analyte products; it touches DIE core, not + this asset graph, so it is intentionally out of scope here. """ import tempfile import traceback diff --git a/orchestration/definitions.py b/orchestration/definitions.py index ee52f55..ba5fd2e 100644 --- a/orchestration/definitions.py +++ b/orchestration/definitions.py @@ -22,10 +22,11 @@ class _TolerantGCSPickleIOManager(GCSPickleIOManager): Combine assets load their shared source inputs via ``ins``, and that load happens *before* the asset body runs — so a missing pickle can't be caught - inside the combine. The source assets are materialized by ``sources_job``; - a product job loads their output from GCS without re-running them. On first - deploy — or if a newly added shared source has never been materialized — - the blob is absent and the stock manager would fail the combine hard. + inside the combine. A cohort job materializes a combine's sources in the + same run, so the blob is normally present; but an **ad-hoc** materialization + of just a combine (e.g. re-publish from the Assets UI without re-running the + upstream sources), or a brand-new shared source never yet materialized, + leaves the blob absent and the stock manager would fail the combine hard. Returning an empty payload instead degrades that product to an empty collection (the geoserver asset already soft-fails on 0 features) rather @@ -104,82 +105,111 @@ def _build_graph(products_config: dict): return source_assets, pipeline_assets, specs_by_pid, all_specs -def _product_selection(pid: str) -> dg.AssetSelection: - # Only the product's own assets — combine + geoserver. The shared source - # inputs are NOT selected: the sources job materializes them, and the - # combine loads them from the GCS IO manager. This is what keeps a product - # run from re-unifying a source another product already produced. - return dg.AssetSelection.keys(dg.AssetKey(pid), dg.AssetKey([pid, "geoserver"])) +def _cohort_key(specs) -> tuple[str, str, str]: + """A cohort bundles the products that *can* share source assets — same + parameter group, unification mode, and spatial scope. Materializing a cohort + in one run is what lets each shared source unify once while every member's + full lineage (sources → combine → geoserver) stays visible in that run. + These three fields are constant within a product (all of a product's specs + carry the same mode/scope, and its parameters are all one group), so any + spec is representative.""" + s = specs[0] + return (s.group, s.mode, s.scope) -def _build_product_jobs( - products_config: dict, -) -> dict[str, "UnresolvedAssetJobDefinition"]: - """One asset job per product, selecting only that product's combine + - geoserver assets. Returns {product_id: job} so schedules can target it. The - shared source inputs are loaded from the GCS IO manager, not re-materialized - (see :func:`_product_selection`).""" - jobs = {} + +def _cohort_name(key: tuple[str, str, str]) -> str: + group, mode, scope = key + return f"{group}_{mode}_{scope}" + + +def _cron_sort_key(cron: str) -> tuple[int, int]: + # "min hour * * *" -> (hour, minute) for picking a cohort's earliest member. + parts = cron.split() + try: + return (int(parts[1]), int(parts[0])) + except (IndexError, ValueError): + return (6, 0) + + +def _build_cohorts(products_config: dict, specs_by_pid: dict) -> dict: + """Group products into cohorts keyed by (group, mode, scope). Returns + ``{cohort_name: {"members": [pid, ...], "cron": str}}``; the cohort cron is + the earliest member schedule (members run together, so they share one).""" + cohorts: dict = {} for product in _products(products_config): pid = product["id"] - jobs[pid] = dg.define_asset_job( - name=f"{pid}_job", - selection=_product_selection(pid), - description=f"Publish the {pid} data product (combine → geoserver) from materialized sources.", + specs = specs_by_pid[pid] + if not specs: + continue + name = _cohort_name(_cohort_key(specs)) + cohort = cohorts.setdefault(name, {"members": [], "cron": None}) + cohort["members"].append(pid) + cron = product.get("schedule", "0 6 * * *") + if cohort["cron"] is None or _cron_sort_key(cron) < _cron_sort_key(cohort["cron"]): + cohort["cron"] = cron + return cohorts + + +def _cohort_selection(members: list[str], specs_by_pid: dict) -> dg.AssetSelection: + """The full graph for a cohort: every member's shared source assets (deduped + across members — a shared source resolves to one key, so it is selected once + and materialized once per run), plus every member's combine and geoserver + asset. The result is a complete sources → combine → geoserver lineage with no + duplicated source fetch.""" + keys: set = set() + for pid in members: + for spec in specs_by_pid[pid]: + keys.add(shared_source_key(spec)) + keys.add(dg.AssetKey(pid)) + keys.add(dg.AssetKey([pid, "geoserver"])) + return dg.AssetSelection.keys(*sorted(keys, key=lambda k: k.to_user_string())) + + +def _build_cohort_jobs( + cohorts: dict, specs_by_pid: dict +) -> dict[str, "UnresolvedAssetJobDefinition"]: + """One job per cohort, selecting that cohort's full graph (see + :func:`_cohort_selection`). Returns ``{cohort_name: job}``.""" + jobs = {} + for name, cohort in cohorts.items(): + members = cohort["members"] + jobs[name] = dg.define_asset_job( + name=f"{name}_job", + selection=_cohort_selection(members, specs_by_pid), + description=( + f"Materialize the {name} cohort in one run — shared sources " + f"(each unified once) → combines → geoserver for: " + f"{', '.join(members)}." + ), ) return jobs -def _build_sources_job(all_specs) -> "UnresolvedAssetJobDefinition": - """A single job that materializes every shared source asset once. Product - jobs read these results from the IO manager rather than re-unifying.""" - keys = [shared_source_key(spec) for spec in all_specs] - return dg.define_asset_job( - name="sources_job", - selection=dg.AssetSelection.keys(*keys), - description="Unify every shared source once; product jobs consume the cached results.", - ) - - def _build_schedules( - products_config: dict, - product_jobs: dict[str, "UnresolvedAssetJobDefinition"], - sources_job: "UnresolvedAssetJobDefinition", + cohorts: dict, cohort_jobs: dict[str, "UnresolvedAssetJobDefinition"] ) -> list[dg.ScheduleDefinition]: - # Sources run first (default 05:00), ahead of the product schedules (06:00+), - # so each product publishes from same-day source data. A product that runs - # before the sources job simply reads the prior run's cached source IO. - schedules = [ + return [ dg.ScheduleDefinition( - name="schedule_sources", - job=sources_job, - cron_schedule=products_config.get("sources_schedule", "0 5 * * *"), + name=f"schedule_{name}", + job=cohort_jobs[name], + cron_schedule=cohort["cron"], execution_timezone="America/Denver", ) + for name, cohort in cohorts.items() ] - for product in _products(products_config): - pid = product["id"] - schedules.append( - dg.ScheduleDefinition( - name=f"schedule_{pid}", - job=product_jobs[pid], - cron_schedule=product.get("schedule", "0 6 * * *"), - execution_timezone="America/Denver", - ) - ) - return schedules _products_config = _load_products() _source_assets, _pipeline_assets, _specs_by_pid, _all_specs = _build_graph(_products_config) _assets = _source_assets + _pipeline_assets -_product_jobs = _build_product_jobs(_products_config) -_sources_job = _build_sources_job(_all_specs) -_schedules = _build_schedules(_products_config, _product_jobs, _sources_job) +_cohorts = _build_cohorts(_products_config, _specs_by_pid) +_cohort_jobs = _build_cohort_jobs(_cohorts, _specs_by_pid) +_schedules = _build_schedules(_cohorts, _cohort_jobs) defs = dg.Definitions( assets=_assets, - jobs=[_sources_job, *_product_jobs.values()], + jobs=list(_cohort_jobs.values()), schedules=_schedules, resources={ # USGS_API_KEY is a Dagster+ secret; EnvVar resolves it at run time and @@ -193,11 +223,12 @@ def _build_schedules( ), "geoserver": GeoServerResource(), # Persist asset I/O to GCS instead of the serverless run's ephemeral - # /tmp. This is what lets a product job load its shared source inputs - # (materialized by sources_job) without re-running them. The tolerant - # subclass returns an empty payload when a source's blob is absent, so a - # combine never hard-fails on a not-yet-materialized source (e.g. on a - # fresh deploy — run sources_job once before the product jobs). + # /tmp. A cohort run materializes a shared source once and every member + # combine reads it back through this manager (so the source unifies once + # even though multiple combines consume it). The tolerant subclass + # returns an empty payload when a source's blob is absent, so an ad-hoc + # combine-only materialization (or a never-yet-run new source) degrades + # to an empty collection instead of hard-failing. "io_manager": _TolerantGCSPickleIOManager( gcs=AuthedGCSResource(), gcs_bucket=_products_config.get("gcs_bucket", "dataservices-die-products"),