Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
239 changes: 147 additions & 92 deletions orchestration/assets/products.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,43 @@
"""Per-source Dagster asset graph for a product.

``build_product_assets(product)`` expands one products.yaml entry into a small
asset graph:

sources/<key> ─┐
sources/<key> ─┼─▶ <product_id> ─▶ <product_id>/geoserver
sources/<key> ─┘ (combine) (publish)

- **source assets** — keyed ``[product_id, "sources", <source_key>]``, one per
data source that provides the product's parameter. Each runs DIE unification
for just that source and emits its records/sites/timeseries.
- **combine asset** — keyed ``[product_id]``. Merges every source's
contribution, writes the OGC GeoJSON collection, and uploads it to GCS.
"""Shared-source Dagster asset graph for the data products.

The graph has two layers wired through the GCS IO manager:

shared source assets per-product pipeline
["sources", param, mode, scope, k] ──▶ <product_id> ──▶ <product_id>/geoserver
(combine) (publish)

- **shared source assets** — keyed ``["sources", <parameter>, <mode>, <scope>,
<source_key>]``. One per *distinct* (parameter, mode, scope, source) tuple
across **all** products. ``mode`` is ``summary`` or ``timeseries`` (the
backend only distinguishes these two unification modes); ``scope`` encodes the
spatial filter (``state_NM`` / ``county_Bernalillo`` / ``all``). Because the
key is product-independent, every product that needs the same source under the
same parameter/mode/scope shares one asset — the unification runs **once** per
run instead of once per product. Each asset unifies a single parameter for a
single source and emits its records/sites/timeseries.
- **combine asset** — keyed ``[product_id]``. Reads its source inputs back
(``ins``), merges them, writes the OGC GeoJSON collection, uploads to GCS.
- **geoserver asset** — keyed ``[product_id, "geoserver"]``. Downloads the
combined GeoJSON, converts it to a GeoPackage, and publishes it as a layer in
GeoServer.
combined GeoJSON, converts to GeoPackage, publishes it as a GeoServer layer.

Job layout (see ``definitions.py``) makes the sharing pay off: a dedicated
``sources_job`` materializes every shared source asset once; each per-product
job selects only its combine + geoserver assets and loads the (already
materialized) source inputs from the GCS IO manager. So a product run never
re-unifies a source another product already produced.

Design notes:
- Source and geoserver assets never hard-fail. They catch their own errors and
report status via an ``AssetCheckResult`` that goes red (WARN) on error or
empty output, so one dead source — or a GeoServer outage — surfaces in the UI
without blocking the rest of the product graph.
without blocking the rest of the graph.
- Records cross the IO manager as plain ``_payload`` dicts (the record classes
use ``__getattr__`` over ``_payload`` which does not survive pickling). The
combine asset rebuilds record objects before dumping.
"""
import tempfile
import traceback
from collections import namedtuple
from collections.abc import Iterator
from datetime import datetime, timezone
from pathlib import Path
Expand Down Expand Up @@ -58,8 +69,9 @@
# truth for the ogc_mcl_exceedance product.
_MCL_KEY = "config/mcl.json"

# Multi-analyte products that gather one summary record per analyte per well.
_MULTI_ANALYTE_OUTPUT_TYPES = ("ogc_major_chemistry", "ogc_mcl_exceedance")
# Output types whose unification runs in summary mode. The backend only
# distinguishes summary vs timeseries; everything else unifies as timeseries.
_SUMMARY_OUTPUT_TYPES = ("ogc_summary", "ogc_major_chemistry", "ogc_mcl_exceedance")

# Classic major-ion suite for the ogc_major_chemistry product. One feature per
# well, with each analyte's latest value/units/date as properties.
Expand All @@ -74,6 +86,11 @@
"sulfate",
]

# A single shared source asset's identity. Two products that produce the same
# SourceSpec share one asset (the namedtuple is hashable, so dedup is just set
# membership). Fields:
#   parameter  - the single parameter the asset unifies
#   mode       - "summary" or "timeseries"
#   scope      - spatial-filter identity ("state_NM", "county_Bernalillo", "all")
#   source_key - the data source being unified
#   group      - asset group name; follows the parameter, not the product, so
#                it never splits two otherwise-identical specs
SourceSpec = namedtuple("SourceSpec", "parameter mode scope source_key group")


def _product_params(product: dict) -> list[str]:
"""The DIE parameter(s) a product unifies. Single-parameter products yield
Expand All @@ -88,14 +105,41 @@ def _product_params(product: dict) -> list[str]:
return [product["parameter"]]


def _product_source_keys(product: dict) -> list[str]:
"""Source keys that apply to this product: the union of its parameters'
agencies, filtered by the product's include/exclude list."""
agencies: list = []
for param in _product_params(product):
for a in PARAMETER_SOURCE_MAP[param]["agencies"]:
if a not in agencies:
agencies.append(a)
def _product_mode(product: dict) -> str:
    """Return the unification mode *product* needs.

    The result is ``"summary"`` when the product's ``output_type`` is listed in
    ``_SUMMARY_OUTPUT_TYPES`` and ``"timeseries"`` for everything else,
    including a product that has no ``output_type`` key.
    """
    if product.get("output_type") in _SUMMARY_OUTPUT_TYPES:
        return "summary"
    return "timeseries"


def _spatial_scope(product: dict) -> str:
"""A stable string identity for the product's spatial filter. Sources with
different spatial extents unify to different results, so the extent is part
of the shared-source key."""
sf = product.get("spatial_filter", {}) or {}
if sf.get("county"):
return f"county_{sf['county']}"
if sf.get("state"):
return f"state_{sf['state']}"
return "all"


def _scope_to_spatial_filter(scope: str) -> dict:
"""Inverse of :func:`_spatial_scope`, for rebuilding a config from a spec."""
kind, _, value = scope.partition("_")
if kind == "county":
return {"county": value}
if kind == "state":
return {"state": value}
return {}


def _group_for_param(parameter: str) -> str:
    """Return the asset group name for *parameter*.

    ``WATERLEVELS`` maps to ``"waterlevels"``; every other parameter maps to
    ``"analytes"``.
    """
    if parameter == WATERLEVELS:
        return "waterlevels"
    return "analytes"


def _param_source_keys(product: dict, parameter: str) -> list[str]:
"""Source keys that apply to *parameter* for this product: the parameter's
agencies filtered by the product's include/exclude list."""
agencies = list(PARAMETER_SOURCE_MAP[parameter]["agencies"])
spec = product.get("sources", {}) or {}
if spec.get("include"):
return [a for a in agencies if a in spec["include"]]
Expand All @@ -104,28 +148,52 @@ def _product_source_keys(product: dict) -> list[str]:
return agencies


def _in_name(source_key: str) -> str:
# Combine-asset input kwargs must be valid Python identifiers; source keys
# may contain hyphens, so sanitize and prefix.
return f"src_{source_key.replace('-', '_')}"


def _build_source_asset(
product: dict, source_key: str, group: str
) -> tuple[dg.AssetsDefinition, dg.AssetKey]:
"""Build the asset that unifies a single source for *product*.

Returns ``(asset_def, asset_key)``. The asset never raises: on failure it
records the traceback and fails its ``returned_data`` check (WARN) instead,
so a broken source does not block the combine asset. Output is shipped as
plain ``_payload`` dicts for IO-manager pickling (see module docstring)."""
pid = product["id"]
src_key = dg.AssetKey([pid, "sources", source_key])
params = _product_params(product)
def product_source_specs(product: dict) -> list[SourceSpec]:
    """List every shared source asset *product* depends on.

    One ``SourceSpec`` is produced per (parameter, source) pair. The mode and
    spatial scope are computed once because they are constant for a product;
    only the parameter and source vary. The order is stable: parameters in the
    order ``_product_params`` yields them, and within each parameter the
    sources in the order ``_param_source_keys`` yields them.
    """
    unify_mode = _product_mode(product)
    extent = _spatial_scope(product)
    return [
        SourceSpec(parameter, unify_mode, extent, source, _group_for_param(parameter))
        for parameter in _product_params(product)
        for source in _param_source_keys(product, parameter)
    ]


def shared_source_key(spec: SourceSpec) -> dg.AssetKey:
    """Return the product-independent asset key for *spec*.

    The key path is ``["sources", parameter, mode, scope, source_key]``;
    ``spec.group`` is not part of the key.
    """
    path = ["sources", spec.parameter, spec.mode, spec.scope, spec.source_key]
    return dg.AssetKey(path)


def _in_name(spec: SourceSpec) -> str:
# Combine-asset input kwargs must be valid Python identifiers; parameter and
# source keys may contain hyphens, so sanitize. (parameter, source) is
# unique within a product, so it disambiguates the multi-analyte combines.
raw = f"src_{spec.parameter}_{spec.source_key}"
return raw.replace("-", "_")


def build_shared_source_asset(spec: SourceSpec) -> dg.AssetsDefinition:
"""Build the shared asset that unifies one source for one (parameter, mode,
scope) — keyed product-independently so every product needing it shares it.

The asset never raises: on failure it records the traceback and fails its
``returned_data`` check (WARN) instead, so a broken source does not block any
product's combine asset. Output ships as plain ``_payload`` dicts for
IO-manager pickling (see module docstring)."""
src_key = shared_source_key(spec)
# Synthetic product spec driving config: only parameter, mode, and spatial
# filter affect a single source's unification (sources include/exclude only
# selects which sources a product consumes — irrelevant here).
synth_product = {
"output_type": "ogc_summary" if spec.mode == "summary" else "ogc_timeseries",
"spatial_filter": _scope_to_spatial_filter(spec.scope),
}

@dg.asset(
key=src_key,
group_name=group,
group_name=spec.group,
check_specs=[dg.AssetCheckSpec(name=_CHECK_NAME, asset=src_key)],
)
def _source_asset(
Expand All @@ -136,24 +204,20 @@ def _source_asset(
sites: list[dict] = []
timeseries: list[list[dict]] = []
try:
# One unification pass per parameter. Single-parameter products run
# once; the major-chemistry product runs once per analyte and
# accumulates summary records (analyte identity lives in each
# record's parameter_name). A source that doesn't provide a given
# parameter is skipped by unify_source (source_pair → None).
# A source that doesn't provide this parameter is skipped by
# unify_source (source_pair → None).
with forward_die_logs(context):
for param in params:
config = die_config.get_config(product, parameter=param)
persister = unify_source(config, source_key)
# Ship plain dicts across the IO manager; rebuild in combine.
records.extend(r._payload for r in persister.records)
sites.extend(s._payload for s in persister.sites)
timeseries.extend(
[o._payload for o in site_ts] for site_ts in persister.timeseries
)
config = die_config.get_config(synth_product, parameter=spec.parameter)
persister = unify_source(config, spec.source_key)
# Ship plain dicts across the IO manager; rebuild in combine.
records.extend(r._payload for r in persister.records)
sites.extend(s._payload for s in persister.sites)
timeseries.extend(
[o._payload for o in site_ts] for site_ts in persister.timeseries
)
except Exception:
error = traceback.format_exc()
context.log.error(f"Source {source_key} failed:\n{error}")
context.log.error(f"Source {spec.source_key} failed:\n{error}")

obs_count = sum(len(t) for t in timeseries)
payload = {"records": records, "sites": sites, "timeseries": timeseries}
Expand All @@ -164,7 +228,10 @@ def _source_asset(
yield dg.Output(
payload,
metadata={
"source": source_key,
"source": spec.source_key,
"parameter": spec.parameter,
"mode": spec.mode,
"scope": spec.scope,
"record_count": len(records),
"site_count": len(sites),
"observation_count": obs_count,
Expand All @@ -183,27 +250,22 @@ def _source_asset(
},
)

return _source_asset, src_key
return _source_asset


def _build_combine_asset(
product: dict,
source_keys: list[str],
source_asset_keys: list[dg.AssetKey],
group: str,
product: dict, specs: list[SourceSpec], group: str
) -> dg.AssetsDefinition:
"""Build the combine asset (keyed ``[product_id]``) for *product*.

Depends on every source asset (wired via ``ins``), merges their
records/sites/timeseries, writes the OGC GeoJSON collection — summary,
timeseries, major-chemistry, or waterlevel-trend depending on
``output_type`` — and uploads it to GCS."""
Depends on every shared source asset it needs (wired via ``ins``), merges
their records/sites/timeseries, writes the OGC GeoJSON collection — summary,
timeseries, major-chemistry, or trend depending on ``output_type`` — and
uploads it to GCS. The source inputs are loaded from the GCS IO manager
(materialized by the sources job), so the combine never re-unifies them."""
pid = product["id"]
output_type = product["output_type"]
ins = {
_in_name(k): dg.AssetIn(key=ak)
for k, ak in zip(source_keys, source_asset_keys)
}
ins = {_in_name(spec): dg.AssetIn(key=shared_source_key(spec)) for spec in specs}

@dg.asset(key=dg.AssetKey(pid), group_name=group, ins=ins)
def _combine_asset(
Expand Down Expand Up @@ -380,22 +442,15 @@ def _geoserver_asset(
return _geoserver_asset


def build_product_assets(product: dict) -> list[dg.AssetsDefinition]:
"""Return the full asset list for *product*: one source asset per applicable
source, the combine asset, and the geoserver publish asset (see module
docstring for the graph shape). Assets are grouped ``waterlevels`` or
``analytes`` by parameter (major-chemistry products group under
``analytes``)."""
def build_product_pipeline_assets(
product: dict, specs: list[SourceSpec]
) -> list[dg.AssetsDefinition]:
"""Return the product's own assets — the combine asset and the geoserver
publish asset. The shared source assets it consumes (``specs``) are built
once by :func:`build_shared_source_asset` in ``definitions.py``, not here, so
products sharing a source share one asset. The combine's group follows its
parameter family (waterlevels vs analytes)."""
group = "waterlevels" if product.get("parameter") == WATERLEVELS else "analytes"
source_keys = _product_source_keys(product)

source_assets = []
source_asset_keys = []
for sk in source_keys:
asset, key = _build_source_asset(product, sk, group)
source_assets.append(asset)
source_asset_keys.append(key)

combine = _build_combine_asset(product, source_keys, source_asset_keys, group)
combine = _build_combine_asset(product, specs, group)
geoserver = _build_geoserver_asset(product, group)
return source_assets + [combine, geoserver]
return [combine, geoserver]
Loading
Loading