diff --git a/orchestration/assets/products.py b/orchestration/assets/products.py index b268e82..cce7165 100644 --- a/orchestration/assets/products.py +++ b/orchestration/assets/products.py @@ -1,32 +1,43 @@ -"""Per-source Dagster asset graph for a product. - -``build_product_assets(product)`` expands one products.yaml entry into a small -asset graph: - - sources/ ─┐ - sources/ ─┼─▶ ─▶ /geoserver - sources/ ─┘ (combine) (publish) - -- **source assets** — keyed ``[product_id, "sources", ]``, one per - data source that provides the product's parameter. Each runs DIE unification - for just that source and emits its records/sites/timeseries. -- **combine asset** — keyed ``[product_id]``. Merges every source's - contribution, writes the OGC GeoJSON collection, and uploads it to GCS. +"""Shared-source Dagster asset graph for the data products. + +The graph has two layers wired through the GCS IO manager: + + shared source assets per-product pipeline + ["sources", param, mode, scope, k] ──▶ ──▶ /geoserver + (combine) (publish) + +- **shared source assets** — keyed ``["sources", , , , + ]``. One per *distinct* (parameter, mode, scope, source) tuple + across **all** products. ``mode`` is ``summary`` or ``timeseries`` (the + backend only distinguishes these two unification modes); ``scope`` encodes the + spatial filter (``state_NM`` / ``county_Bernalillo`` / ``all``). Because the + key is product-independent, every product that needs the same source under the + same parameter/mode/scope shares one asset — the unification runs **once** per + run instead of once per product. Each asset unifies a single parameter for a + single source and emits its records/sites/timeseries. +- **combine asset** — keyed ``[product_id]``. Reads its source inputs back + (``ins``), merges them, writes the OGC GeoJSON collection, uploads to GCS. - **geoserver asset** — keyed ``[product_id, "geoserver"]``. Downloads the - combined GeoJSON, converts it to a GeoPackage, and publishes it as a layer in - GeoServer. + combined GeoJSON, converts to GeoPackage, publishes it as a GeoServer layer. + +Job layout (see ``definitions.py``) makes the sharing pay off: a dedicated +``sources_job`` materializes every shared source asset once; each per-product +job selects only its combine + geoserver assets and loads the (already +materialized) source inputs from the GCS IO manager. So a product run never +re-unifies a source another product already produced. Design notes: - Source and geoserver assets never hard-fail. They catch their own errors and report status via an ``AssetCheckResult`` that goes red (WARN) on error or empty output, so one dead source — or a GeoServer outage — surfaces in the UI - without blocking the rest of the product graph. + without blocking the rest of the graph. - Records cross the IO manager as plain ``_payload`` dicts (the record classes use ``__getattr__`` over ``_payload`` which does not survive pickling). The combine asset rebuilds record objects before dumping. """ import tempfile import traceback +from collections import namedtuple from collections.abc import Iterator from datetime import datetime, timezone from pathlib import Path @@ -58,8 +69,9 @@ # truth for the ogc_mcl_exceedance product. _MCL_KEY = "config/mcl.json" -# Multi-analyte products that gather one summary record per analyte per well. -_MULTI_ANALYTE_OUTPUT_TYPES = ("ogc_major_chemistry", "ogc_mcl_exceedance") +# Output types whose unification runs in summary mode. The backend only +# distinguishes summary vs timeseries; everything else unifies as timeseries. +_SUMMARY_OUTPUT_TYPES = ("ogc_summary", "ogc_major_chemistry", "ogc_mcl_exceedance") # Classic major-ion suite for the ogc_major_chemistry product. One feature per # well, with each analyte's latest value/units/date as properties. @@ -74,6 +86,11 @@ "sulfate", ] +# A single shared source asset's identity. Two products that produce the same +# SourceSpec share one asset (the namedtuple is hashable, so dedup is just set +# membership). ``group`` follows the parameter, not the product. +SourceSpec = namedtuple("SourceSpec", "parameter mode scope source_key group") + def _product_params(product: dict) -> list[str]: """The DIE parameter(s) a product unifies. Single-parameter products yield @@ -88,14 +105,41 @@ def _product_params(product: dict) -> list[str]: return [product["parameter"]] -def _product_source_keys(product: dict) -> list[str]: - """Source keys that apply to this product: the union of its parameters' - agencies, filtered by the product's include/exclude list.""" - agencies: list = [] - for param in _product_params(product): - for a in PARAMETER_SOURCE_MAP[param]["agencies"]: - if a not in agencies: - agencies.append(a) +def _product_mode(product: dict) -> str: + """``summary`` or ``timeseries`` — the unification mode the product needs.""" + return "summary" if product.get("output_type") in _SUMMARY_OUTPUT_TYPES else "timeseries" + + +def _spatial_scope(product: dict) -> str: + """A stable string identity for the product's spatial filter. Sources with + different spatial extents unify to different results, so the extent is part + of the shared-source key.""" + sf = product.get("spatial_filter", {}) or {} + if sf.get("county"): + return f"county_{sf['county']}" + if sf.get("state"): + return f"state_{sf['state']}" + return "all" + + +def _scope_to_spatial_filter(scope: str) -> dict: + """Inverse of :func:`_spatial_scope`, for rebuilding a config from a spec.""" + kind, _, value = scope.partition("_") + if kind == "county": + return {"county": value} + if kind == "state": + return {"state": value} + return {} + + +def _group_for_param(parameter: str) -> str: + return "waterlevels" if parameter == WATERLEVELS else "analytes" + + +def _param_source_keys(product: dict, parameter: str) -> list[str]: + """Source keys that apply to *parameter* for this product: the parameter's + agencies filtered by the product's include/exclude list.""" + agencies = list(PARAMETER_SOURCE_MAP[parameter]["agencies"]) spec = product.get("sources", {}) or {} if spec.get("include"): return [a for a in agencies if a in spec["include"]] @@ -104,28 +148,52 @@ def _product_source_keys(product: dict) -> list[str]: return agencies -def _in_name(source_key: str) -> str: - # Combine-asset input kwargs must be valid Python identifiers; source keys - # may contain hyphens, so sanitize and prefix. - return f"src_{source_key.replace('-', '_')}" - - -def _build_source_asset( - product: dict, source_key: str, group: str -) -> tuple[dg.AssetsDefinition, dg.AssetKey]: - """Build the asset that unifies a single source for *product*. - - Returns ``(asset_def, asset_key)``. The asset never raises: on failure it - records the traceback and fails its ``returned_data`` check (WARN) instead, - so a broken source does not block the combine asset. Output is shipped as - plain ``_payload`` dicts for IO-manager pickling (see module docstring).""" - pid = product["id"] - src_key = dg.AssetKey([pid, "sources", source_key]) - params = _product_params(product) +def product_source_specs(product: dict) -> list[SourceSpec]: + """Every shared source asset this product depends on, one per + (parameter, source) pair. ``mode`` and ``scope`` are constant for a product; + the parameter and source vary. Returned in a stable order.""" + mode = _product_mode(product) + scope = _spatial_scope(product) + specs: list[SourceSpec] = [] + for param in _product_params(product): + group = _group_for_param(param) + for source_key in _param_source_keys(product, param): + specs.append(SourceSpec(param, mode, scope, source_key, group)) + return specs + + +def shared_source_key(spec: SourceSpec) -> dg.AssetKey: + return dg.AssetKey(["sources", spec.parameter, spec.mode, spec.scope, spec.source_key]) + + +def _in_name(spec: SourceSpec) -> str: + # Combine-asset input kwargs must be valid Python identifiers; parameter and + # source keys may contain hyphens, so sanitize. (parameter, source) is + # unique within a product, so it disambiguates the multi-analyte combines. + raw = f"src_{spec.parameter}_{spec.source_key}" + return raw.replace("-", "_") + + +def build_shared_source_asset(spec: SourceSpec) -> dg.AssetsDefinition: + """Build the shared asset that unifies one source for one (parameter, mode, + scope) — keyed product-independently so every product needing it shares it. + + The asset never raises: on failure it records the traceback and fails its + ``returned_data`` check (WARN) instead, so a broken source does not block any + product's combine asset. Output ships as plain ``_payload`` dicts for + IO-manager pickling (see module docstring).""" + src_key = shared_source_key(spec) + # Synthetic product spec driving config: only parameter, mode, and spatial + # filter affect a single source's unification (sources include/exclude only + # selects which sources a product consumes — irrelevant here). + synth_product = { + "output_type": "ogc_summary" if spec.mode == "summary" else "ogc_timeseries", + "spatial_filter": _scope_to_spatial_filter(spec.scope), + } @dg.asset( key=src_key, - group_name=group, + group_name=spec.group, check_specs=[dg.AssetCheckSpec(name=_CHECK_NAME, asset=src_key)], ) def _source_asset( @@ -136,24 +204,20 @@ def _source_asset( sites: list[dict] = [] timeseries: list[list[dict]] = [] try: - # One unification pass per parameter. Single-parameter products run - # once; the major-chemistry product runs once per analyte and - # accumulates summary records (analyte identity lives in each - # record's parameter_name). A source that doesn't provide a given - # parameter is skipped by unify_source (source_pair → None). + # A source that doesn't provide this parameter is skipped by + # unify_source (source_pair → None). with forward_die_logs(context): - for param in params: - config = die_config.get_config(product, parameter=param) - persister = unify_source(config, source_key) - # Ship plain dicts across the IO manager; rebuild in combine. - records.extend(r._payload for r in persister.records) - sites.extend(s._payload for s in persister.sites) - timeseries.extend( - [o._payload for o in site_ts] for site_ts in persister.timeseries - ) + config = die_config.get_config(synth_product, parameter=spec.parameter) + persister = unify_source(config, spec.source_key) + # Ship plain dicts across the IO manager; rebuild in combine. + records.extend(r._payload for r in persister.records) + sites.extend(s._payload for s in persister.sites) + timeseries.extend( + [o._payload for o in site_ts] for site_ts in persister.timeseries + ) except Exception: error = traceback.format_exc() - context.log.error(f"Source {source_key} failed:\n{error}") + context.log.error(f"Source {spec.source_key} failed:\n{error}") obs_count = sum(len(t) for t in timeseries) payload = {"records": records, "sites": sites, "timeseries": timeseries} @@ -164,7 +228,10 @@ def _source_asset( yield dg.Output( payload, metadata={ - "source": source_key, + "source": spec.source_key, + "parameter": spec.parameter, + "mode": spec.mode, + "scope": spec.scope, "record_count": len(records), "site_count": len(sites), "observation_count": obs_count, @@ -183,27 +250,22 @@ def _source_asset( }, ) - return _source_asset, src_key + return _source_asset def _build_combine_asset( - product: dict, - source_keys: list[str], - source_asset_keys: list[dg.AssetKey], - group: str, + product: dict, specs: list[SourceSpec], group: str ) -> dg.AssetsDefinition: """Build the combine asset (keyed ``[product_id]``) for *product*. - Depends on every source asset (wired via ``ins``), merges their - records/sites/timeseries, writes the OGC GeoJSON collection — summary, - timeseries, major-chemistry, or waterlevel-trend depending on - ``output_type`` — and uploads it to GCS.""" + Depends on every shared source asset it needs (wired via ``ins``), merges + their records/sites/timeseries, writes the OGC GeoJSON collection — summary, + timeseries, major-chemistry, or trend depending on ``output_type`` — and + uploads it to GCS. The source inputs are loaded from the GCS IO manager + (materialized by the sources job), so the combine never re-unifies them.""" pid = product["id"] output_type = product["output_type"] - ins = { - _in_name(k): dg.AssetIn(key=ak) - for k, ak in zip(source_keys, source_asset_keys) - } + ins = {_in_name(spec): dg.AssetIn(key=shared_source_key(spec)) for spec in specs} @dg.asset(key=dg.AssetKey(pid), group_name=group, ins=ins) def _combine_asset( @@ -380,22 +442,15 @@ def _geoserver_asset( return _geoserver_asset -def build_product_assets(product: dict) -> list[dg.AssetsDefinition]: - """Return the full asset list for *product*: one source asset per applicable - source, the combine asset, and the geoserver publish asset (see module - docstring for the graph shape). Assets are grouped ``waterlevels`` or - ``analytes`` by parameter (major-chemistry products group under - ``analytes``).""" +def build_product_pipeline_assets( + product: dict, specs: list[SourceSpec] +) -> list[dg.AssetsDefinition]: + """Return the product's own assets — the combine asset and the geoserver + publish asset. The shared source assets it consumes (``specs``) are built + once by :func:`build_shared_source_asset` in ``definitions.py``, not here, so + products sharing a source share one asset. The combine's group follows its + parameter family (waterlevels vs analytes).""" group = "waterlevels" if product.get("parameter") == WATERLEVELS else "analytes" - source_keys = _product_source_keys(product) - - source_assets = [] - source_asset_keys = [] - for sk in source_keys: - asset, key = _build_source_asset(product, sk, group) - source_assets.append(asset) - source_asset_keys.append(key) - - combine = _build_combine_asset(product, source_keys, source_asset_keys, group) + combine = _build_combine_asset(product, specs, group) geoserver = _build_geoserver_asset(product, group) - return source_assets + [combine, geoserver] + return [combine, geoserver] diff --git a/orchestration/definitions.py b/orchestration/definitions.py index 38de0fc..ee52f55 100644 --- a/orchestration/definitions.py +++ b/orchestration/definitions.py @@ -5,11 +5,48 @@ import dagster as dg import yaml from dagster_gcp.gcs import GCSPickleIOManager +from google.api_core.exceptions import NotFound from orchestration.resources.die_config import DIEConfigResource from orchestration.resources.gcs import GCSResource, AuthedGCSResource from orchestration.resources.geoserver import GeoServerResource -from orchestration.assets.products import build_product_assets +from orchestration.assets.products import ( + build_product_pipeline_assets, + build_shared_source_asset, + product_source_specs, + shared_source_key, +) + +class _TolerantGCSPickleIOManager(GCSPickleIOManager): + """GCS pickle IO manager that tolerates a missing source input. + + Combine assets load their shared source inputs via ``ins``, and that load + happens *before* the asset body runs — so a missing pickle can't be caught + inside the combine. The source assets are materialized by ``sources_job``; + a product job loads their output from GCS without re-running them. On first + deploy — or if a newly added shared source has never been materialized — + the blob is absent and the stock manager would fail the combine hard. + + Returning an empty payload instead degrades that product to an empty + collection (the geoserver asset already soft-fails on 0 features) rather + than crashing the run. A source asset always writes a payload when it + actually runs (it soft-fails to an empty payload, never to *no* output), so + a missing blob only ever means "never materialized", never "ran and lost + its data" — which is why swallowing NotFound here can't mask a real error. + """ + + def load_input(self, context: dg.InputContext): + try: + return super().load_input(context) + except (NotFound, FileNotFoundError): + context.log.warning( + f"Source input {context.asset_key.to_user_string()!r} not found " + "in GCS; treating as empty. Run sources_job before the product " + "jobs (it is scheduled ahead of them, but must run once on a " + "fresh deploy)." + ) + return {} + if TYPE_CHECKING: # define_asset_job returns this; it isn't a public dagster export. @@ -34,50 +71,98 @@ def _load_products() -> dict: return yaml.safe_load(_PRODUCTS_PATH.read_text()) -def _build_assets(products_config: dict) -> list[dg.AssetsDefinition]: - assets = [] +def _products(products_config: dict) -> Iterator[dict]: for product in products_config["products"]: if product.get("output_type") in _SUPPORTED_OUTPUT_TYPES: - assets.extend(build_product_assets(product)) - return assets + yield product -def _product_selection(pid: str) -> dg.AssetSelection: - # The full per-product graph: the geoserver leaf plus everything upstream of - # it (combine asset + source assets). - return dg.AssetSelection.keys(dg.AssetKey([pid, "geoserver"])).upstream() +def _build_graph(products_config: dict): + """Build every asset once. Source assets are shared across products: each + distinct (parameter, mode, scope, source) tuple becomes one asset, so a + source two products both need is unified once, not twice. + Returns ``(source_assets, pipeline_assets, specs_by_pid, all_specs)`` where + ``specs_by_pid`` maps a product id to the source specs its combine consumes + and ``all_specs`` is the deduped set of source specs.""" + specs_by_pid: dict[str, list] = {} + all_specs: dict = {} # SourceSpec -> SourceSpec (dedup; namedtuple is hashable) + for product in _products(products_config): + specs = product_source_specs(product) + specs_by_pid[product["id"]] = specs + for spec in specs: + all_specs.setdefault(spec, spec) -def _products(products_config: dict) -> Iterator[dict]: - for product in products_config["products"]: - if product.get("output_type") in _SUPPORTED_OUTPUT_TYPES: - yield product + source_assets = [build_shared_source_asset(spec) for spec in all_specs] + + pipeline_assets: list[dg.AssetsDefinition] = [] + for product in _products(products_config): + pipeline_assets.extend( + build_product_pipeline_assets(product, specs_by_pid[product["id"]]) + ) + + return source_assets, pipeline_assets, specs_by_pid, all_specs -def _build_jobs(products_config: dict) -> dict[str, "UnresolvedAssetJobDefinition"]: - """One asset job per product, selecting that product's whole graph. - Returns {product_id: job} so schedules can target the job.""" +def _product_selection(pid: str) -> dg.AssetSelection: + # Only the product's own assets — combine + geoserver. The shared source + # inputs are NOT selected: the sources job materializes them, and the + # combine loads them from the GCS IO manager. This is what keeps a product + # run from re-unifying a source another product already produced. + return dg.AssetSelection.keys(dg.AssetKey(pid), dg.AssetKey([pid, "geoserver"])) + + +def _build_product_jobs( + products_config: dict, +) -> dict[str, "UnresolvedAssetJobDefinition"]: + """One asset job per product, selecting only that product's combine + + geoserver assets. Returns {product_id: job} so schedules can target it. The + shared source inputs are loaded from the GCS IO manager, not re-materialized + (see :func:`_product_selection`).""" jobs = {} for product in _products(products_config): pid = product["id"] jobs[pid] = dg.define_asset_job( name=f"{pid}_job", selection=_product_selection(pid), - description=f"Materialize the {pid} data product (sources → combine → geoserver).", + description=f"Publish the {pid} data product (combine → geoserver) from materialized sources.", ) return jobs +def _build_sources_job(all_specs) -> "UnresolvedAssetJobDefinition": + """A single job that materializes every shared source asset once. Product + jobs read these results from the IO manager rather than re-unifying.""" + keys = [shared_source_key(spec) for spec in all_specs] + return dg.define_asset_job( + name="sources_job", + selection=dg.AssetSelection.keys(*keys), + description="Unify every shared source once; product jobs consume the cached results.", + ) + + def _build_schedules( - products_config: dict, jobs: dict[str, "UnresolvedAssetJobDefinition"] + products_config: dict, + product_jobs: dict[str, "UnresolvedAssetJobDefinition"], + sources_job: "UnresolvedAssetJobDefinition", ) -> list[dg.ScheduleDefinition]: - schedules = [] + # Sources run first (default 05:00), ahead of the product schedules (06:00+), + # so each product publishes from same-day source data. A product that runs + # before the sources job simply reads the prior run's cached source IO. + schedules = [ + dg.ScheduleDefinition( + name="schedule_sources", + job=sources_job, + cron_schedule=products_config.get("sources_schedule", "0 5 * * *"), + execution_timezone="America/Denver", + ) + ] for product in _products(products_config): pid = product["id"] schedules.append( dg.ScheduleDefinition( name=f"schedule_{pid}", - job=jobs[pid], + job=product_jobs[pid], cron_schedule=product.get("schedule", "0 6 * * *"), execution_timezone="America/Denver", ) @@ -86,13 +171,15 @@ def _build_schedules( _products_config = _load_products() -_assets = _build_assets(_products_config) -_jobs = _build_jobs(_products_config) -_schedules = _build_schedules(_products_config, _jobs) +_source_assets, _pipeline_assets, _specs_by_pid, _all_specs = _build_graph(_products_config) +_assets = _source_assets + _pipeline_assets +_product_jobs = _build_product_jobs(_products_config) +_sources_job = _build_sources_job(_all_specs) +_schedules = _build_schedules(_products_config, _product_jobs, _sources_job) defs = dg.Definitions( assets=_assets, - jobs=list(_jobs.values()), + jobs=[_sources_job, *_product_jobs.values()], schedules=_schedules, resources={ # USGS_API_KEY is a Dagster+ secret; EnvVar resolves it at run time and @@ -106,10 +193,12 @@ def _build_schedules( ), "geoserver": GeoServerResource(), # Persist asset I/O to GCS instead of the serverless run's ephemeral - # /tmp. Without this, materializing a downstream asset (combine / - # geoserver) on its own can't load its source inputs from a prior run - # and fails with FileNotFoundError. - "io_manager": GCSPickleIOManager( + # /tmp. This is what lets a product job load its shared source inputs + # (materialized by sources_job) without re-running them. The tolerant + # subclass returns an empty payload when a source's blob is absent, so a + # combine never hard-fails on a not-yet-materialized source (e.g. on a + # fresh deploy — run sources_job once before the product jobs). + "io_manager": _TolerantGCSPickleIOManager( gcs=AuthedGCSResource(), gcs_bucket=_products_config.get("gcs_bucket", "dataservices-die-products"), gcs_prefix="dagster-io",