Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 26 additions & 5 deletions orchestration/assets/products.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,15 @@
- **geoserver asset** — keyed ``[product_id, "geoserver"]``. Downloads the
combined GeoJSON, converts to GeoPackage, publishes it as a GeoServer layer.

Job layout (see ``definitions.py``) makes the sharing pay off: a dedicated
``sources_job`` materializes every shared source asset once; each per-product
job selects only its combine + geoserver assets and loads the (already
materialized) source inputs from the GCS IO manager. So a product run never
re-unifies a source another product already produced.
Job layout (see ``definitions.py``) makes the sharing pay off **and** keeps each
run's lineage complete. Products are grouped into *cohorts* by
(group, mode, scope) — the products that can share source assets. One job per
cohort materializes that cohort's whole graph in a single run: each shared
source unifies once (it is one asset key, selected once), then every member
combine reads it back through the GCS IO manager and publishes. So a source is
never fetched twice in a run, while the full sources → combine → geoserver
lineage stays visible for every product. (Cross-product dedup requires the
sharing products to run together; that is exactly what a cohort is.)

Design notes:
- Source and geoserver assets never hard-fail. They catch their own errors and
Expand All @@ -34,6 +38,23 @@
- Records cross the IO manager as plain ``_payload`` dicts (the record classes
use ``__getattr__`` over ``_payload`` which does not survive pickling). The
combine asset rebuilds record objects before dumping.

Known limitation — per-analyte source fetches (potential future optimization):
Sharing is deduped at the ``(parameter, mode, scope, source)`` grain, which
collapses duplication *across products* (e.g. ``sulfate/summary/state_NM/wqp``
is one asset shared by nm_major_chemistry and nm_mcl_exceedance). It does NOT
collapse *across analytes*: a source appears once per analyte (e.g. ``wqp``
has ~13 summary source assets, one per analyte). This is because the backend
unifies a single parameter per pass (``unify_source`` uses one
``config.parameter``), so each analyte is a separate sweep of the same wells
even though one provider query (WQP/AMP/...) typically returns all analytes at
once. Collapsing this would need a **backend** change — multi-analyte
unification that fetches a source once and emits per-analyte records — after
which the source key could drop ``parameter`` (e.g.
``["sources", "analytes", mode, scope, source]``) and the analyte combines
would each filter the shared multi-analyte payload. That is the bulk of the
remaining redundant API pulls for analyte products; it touches DIE core, not
this asset graph, so it is intentionally out of scope here.
"""
import tempfile
import traceback
Expand Down
157 changes: 94 additions & 63 deletions orchestration/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ class _TolerantGCSPickleIOManager(GCSPickleIOManager):

Combine assets load their shared source inputs via ``ins``, and that load
happens *before* the asset body runs — so a missing pickle can't be caught
inside the combine. The source assets are materialized by ``sources_job``;
a product job loads their output from GCS without re-running them. On first
deploy — or if a newly added shared source has never been materialized —
the blob is absent and the stock manager would fail the combine hard.
inside the combine. A cohort job materializes a combine's sources in the
same run, so the blob is normally present; but an **ad-hoc** materialization
of just a combine (e.g. re-publish from the Assets UI without re-running the
upstream sources), or a brand-new shared source never yet materialized,
leaves the blob absent and the stock manager would fail the combine hard.

Returning an empty payload instead degrades that product to an empty
collection (the geoserver asset already soft-fails on 0 features) rather
Expand Down Expand Up @@ -104,82 +105,111 @@ def _build_graph(products_config: dict):
return source_assets, pipeline_assets, specs_by_pid, all_specs


def _product_selection(pid: str) -> dg.AssetSelection:
# Only the product's own assets — combine + geoserver. The shared source
# inputs are NOT selected: the sources job materializes them, and the
# combine loads them from the GCS IO manager. This is what keeps a product
# run from re-unifying a source another product already produced.
return dg.AssetSelection.keys(dg.AssetKey(pid), dg.AssetKey([pid, "geoserver"]))
def _cohort_key(specs) -> tuple[str, str, str]:
"""A cohort bundles the products that *can* share source assets — same
parameter group, unification mode, and spatial scope. Materializing a cohort
in one run is what lets each shared source unify once while every member's
full lineage (sources → combine → geoserver) stays visible in that run.

These three fields are constant within a product (all of a product's specs
carry the same mode/scope, and its parameters are all one group), so any
spec is representative."""
s = specs[0]
return (s.group, s.mode, s.scope)

def _build_product_jobs(
products_config: dict,
) -> dict[str, "UnresolvedAssetJobDefinition"]:
"""One asset job per product, selecting only that product's combine +
geoserver assets. Returns {product_id: job} so schedules can target it. The
shared source inputs are loaded from the GCS IO manager, not re-materialized
(see :func:`_product_selection`)."""
jobs = {}

def _cohort_name(key: tuple[str, str, str]) -> str:
group, mode, scope = key
return f"{group}_{mode}_{scope}"


def _cron_sort_key(cron: str) -> tuple[int, int]:
# "min hour * * *" -> (hour, minute) for picking a cohort's earliest member.
parts = cron.split()
try:
return (int(parts[1]), int(parts[0]))
except (IndexError, ValueError):
return (6, 0)


def _build_cohorts(products_config: dict, specs_by_pid: dict) -> dict:
"""Group products into cohorts keyed by (group, mode, scope). Returns
``{cohort_name: {"members": [pid, ...], "cron": str}}``; the cohort cron is
the earliest member schedule (members run together, so they share one)."""
cohorts: dict = {}
for product in _products(products_config):
pid = product["id"]
jobs[pid] = dg.define_asset_job(
name=f"{pid}_job",
selection=_product_selection(pid),
description=f"Publish the {pid} data product (combine → geoserver) from materialized sources.",
specs = specs_by_pid[pid]
if not specs:
continue
name = _cohort_name(_cohort_key(specs))
cohort = cohorts.setdefault(name, {"members": [], "cron": None})
cohort["members"].append(pid)
cron = product.get("schedule", "0 6 * * *")
if cohort["cron"] is None or _cron_sort_key(cron) < _cron_sort_key(cohort["cron"]):
cohort["cron"] = cron
return cohorts


def _cohort_selection(members: list[str], specs_by_pid: dict) -> dg.AssetSelection:
"""The full graph for a cohort: every member's shared source assets (deduped
across members — a shared source resolves to one key, so it is selected once
and materialized once per run), plus every member's combine and geoserver
asset. The result is a complete sources → combine → geoserver lineage with no
duplicated source fetch."""
keys: set = set()
for pid in members:
for spec in specs_by_pid[pid]:
keys.add(shared_source_key(spec))
keys.add(dg.AssetKey(pid))
keys.add(dg.AssetKey([pid, "geoserver"]))
return dg.AssetSelection.keys(*sorted(keys, key=lambda k: k.to_user_string()))


def _build_cohort_jobs(
cohorts: dict, specs_by_pid: dict
) -> dict[str, "UnresolvedAssetJobDefinition"]:
"""One job per cohort, selecting that cohort's full graph (see
:func:`_cohort_selection`). Returns ``{cohort_name: job}``."""
jobs = {}
for name, cohort in cohorts.items():
members = cohort["members"]
jobs[name] = dg.define_asset_job(
name=f"{name}_job",
selection=_cohort_selection(members, specs_by_pid),
description=(
f"Materialize the {name} cohort in one run — shared sources "
f"(each unified once) → combines → geoserver for: "
f"{', '.join(members)}."
),
)
return jobs


def _build_sources_job(all_specs) -> "UnresolvedAssetJobDefinition":
"""A single job that materializes every shared source asset once. Product
jobs read these results from the IO manager rather than re-unifying."""
keys = [shared_source_key(spec) for spec in all_specs]
return dg.define_asset_job(
name="sources_job",
selection=dg.AssetSelection.keys(*keys),
description="Unify every shared source once; product jobs consume the cached results.",
)


def _build_schedules(
products_config: dict,
product_jobs: dict[str, "UnresolvedAssetJobDefinition"],
sources_job: "UnresolvedAssetJobDefinition",
cohorts: dict, cohort_jobs: dict[str, "UnresolvedAssetJobDefinition"]
) -> list[dg.ScheduleDefinition]:
# Sources run first (default 05:00), ahead of the product schedules (06:00+),
# so each product publishes from same-day source data. A product that runs
# before the sources job simply reads the prior run's cached source IO.
schedules = [
return [
dg.ScheduleDefinition(
name="schedule_sources",
job=sources_job,
cron_schedule=products_config.get("sources_schedule", "0 5 * * *"),
name=f"schedule_{name}",
job=cohort_jobs[name],
cron_schedule=cohort["cron"],
execution_timezone="America/Denver",
)
for name, cohort in cohorts.items()
]
for product in _products(products_config):
pid = product["id"]
schedules.append(
dg.ScheduleDefinition(
name=f"schedule_{pid}",
job=product_jobs[pid],
cron_schedule=product.get("schedule", "0 6 * * *"),
execution_timezone="America/Denver",
)
)
return schedules


_products_config = _load_products()
_source_assets, _pipeline_assets, _specs_by_pid, _all_specs = _build_graph(_products_config)
_assets = _source_assets + _pipeline_assets
_product_jobs = _build_product_jobs(_products_config)
_sources_job = _build_sources_job(_all_specs)
_schedules = _build_schedules(_products_config, _product_jobs, _sources_job)
_cohorts = _build_cohorts(_products_config, _specs_by_pid)
_cohort_jobs = _build_cohort_jobs(_cohorts, _specs_by_pid)
_schedules = _build_schedules(_cohorts, _cohort_jobs)

defs = dg.Definitions(
assets=_assets,
jobs=[_sources_job, *_product_jobs.values()],
jobs=list(_cohort_jobs.values()),
schedules=_schedules,
resources={
# USGS_API_KEY is a Dagster+ secret; EnvVar resolves it at run time and
Expand All @@ -193,11 +223,12 @@ def _build_schedules(
),
"geoserver": GeoServerResource(),
# Persist asset I/O to GCS instead of the serverless run's ephemeral
# /tmp. This is what lets a product job load its shared source inputs
# (materialized by sources_job) without re-running them. The tolerant
# subclass returns an empty payload when a source's blob is absent, so a
# combine never hard-fails on a not-yet-materialized source (e.g. on a
# fresh deploy — run sources_job once before the product jobs).
# /tmp. A cohort run materializes a shared source once and every member
# combine reads it back through this manager (so the source unifies once
# even though multiple combines consume it). The tolerant subclass
# returns an empty payload when a source's blob is absent, so an ad-hoc
# combine-only materialization (or a never-yet-run new source) degrades
# to an empty collection instead of hard-failing.
"io_manager": _TolerantGCSPickleIOManager(
gcs=AuthedGCSResource(),
gcs_bucket=_products_config.get("gcs_bucket", "dataservices-die-products"),
Expand Down
Loading