69 changes: 69 additions & 0 deletions orchestration/AGENTS.md
@@ -0,0 +1,69 @@
# Agent guide — DIE orchestration

Dagster code location for the Data Integration Engine. Defines, schedules, and
publishes the DIE data products. This package belongs to a `dg` (Dagster CLI)
project rooted at the repo root; see `[tool.dg]` in the top-level `pyproject.toml`.

## Use `dg` for Dagster operations

Prefer the `dg` CLI over raw `dagster ...` or ad-hoc scripts for anything
Dagster-related (listing, running, validating, dev server). Run it from the
`orchestration/` directory so `uv` uses the venv that has dagster + dg:

```bash
uv run dg <command> # from the orchestration/ directory
```

Note: the `dg` project root is the **repo root** (`[tool.dg]` in the top-level
`pyproject.toml`), because the `orchestration` package lives at repo-root level
(`./orchestration/`), the same place the Dagster+ serverless build imports it from.
`dg` discovers that project by walking up from `orchestration/`, so always run it
via `uv run` from `orchestration/` and it resolves correctly.

Common operations:

| Task | Command |
|------|---------|
| Start the local dev UI | `uv run dg dev` |
| List assets / jobs / schedules / resources | `uv run dg list defs` |
| Materialize asset(s) | `uv run dg launch --assets <selection>` |
| Materialize a product's full graph | `uv run dg launch --assets '*<product_id>/geoserver'` |
| Scaffold a new component/asset | `uv run dg scaffold ...` |

Asset-selection syntax is the standard Dagster one (`key`, `key*`, `*key`,
`group:<name>`). A product's full graph is `sources → <product_id> → geoserver`,
so `<product_id>/geoserver` plus upstream covers the whole product.
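
As a rough illustration of how the keys and the selection line up, the sketch below builds the slash-joined asset keys for one product and the selection string that covers its whole graph. The helper names, the product id `waterlevels_statewide`, and the source keys are made up for the example; the key layout follows the `[product_id, "sources", <source_key>]` convention described in `assets/products.py`.

```python
def product_asset_keys(product_id: str, source_keys: list) -> dict:
    """Return slash-joined asset keys for one product (illustrative helper, not in the repo)."""
    return {
        "sources": [f"{product_id}/sources/{key}" for key in source_keys],
        "combine": [product_id],
        "geoserver": [f"{product_id}/geoserver"],
    }


def full_graph_selection(product_id: str) -> str:
    """Selection covering the geoserver asset plus everything upstream of it."""
    return f"*{product_id}/geoserver"


# Hypothetical product id and source keys, for illustration only.
keys = product_asset_keys("waterlevels_statewide", ["nmbgmr_amp", "nmose_pod"])
selection = full_graph_selection("waterlevels_statewide")
print(keys["sources"])
print(selection)
```

The resulting string is what would be passed to `uv run dg launch --assets '<selection>'`.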

## Validating changes

Validate that all definitions load and component YAML is valid:

```bash
uv run dg check defs # from orchestration/
```

`uv run dg list defs` also exercises loading and is a good quick check. The
plain import smoke test still works as a fallback:

```bash
uv run python -c "import orchestration.definitions; print('ok')"
```

## Architecture (so changes land in the right place)

- Products are declared in `config/products.yaml`. Each entry expands, via
`assets/products.py:build_product_assets`, into a per-source asset graph:
`<product_id>/sources/<key>` → combine (`<product_id>`) → `<product_id>/geoserver`.
- `definitions.py` wires assets, one asset job per product (`<product_id>_job`),
schedules, and resources (`die_config`, `gcs`, `geoserver`, `io_manager`).
- Resources live in `resources/`. The backend DIE engine is the `nmuwd` package
(repo root `backend/`); orchestration only drives it.
- Per-source and geoserver assets soft-fail: errors surface as red asset checks
(WARN), not hard failures, so one dead source doesn't block a product.
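
The soft-fail behaviour in the last bullet can be sketched in plain Python. This is a simplified model, not the code in `assets/products.py`: `CheckOutcome` stands in for Dagster's `AssetCheckResult`, and `run_source`, `dead_source`, and the sample row are hypothetical.

```python
import traceback
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class CheckOutcome:
    """Stand-in for an asset check result: passed flag, severity, metadata."""
    passed: bool
    severity: str = "WARN"
    metadata: dict = field(default_factory=dict)


def run_source(fetch: Callable[[], list]) -> tuple:
    """Run one source; never raise, report problems through the check instead."""
    try:
        rows = fetch()
    except Exception:
        # Keep the traceback for the UI and hand an empty result downstream.
        return [], CheckOutcome(passed=False, metadata={"error": traceback.format_exc()})
    # An empty result also fails the check, but still does not raise.
    return rows, CheckOutcome(passed=bool(rows), metadata={"row_count": len(rows)})


def dead_source() -> list:
    raise ConnectionError("source unreachable")


ok_rows, ok_check = run_source(lambda: [{"site": "A", "value": 1.0}])
bad_rows, bad_check = run_source(dead_source)

# The combine step still runs with whatever the healthy sources produced.
combined = ok_rows + bad_rows
print(len(combined), ok_check.passed, bad_check.passed)
```

The point of the design is that the failure is visible (a failed check with the traceback attached) while the downstream combine step still receives a valid, possibly empty, input.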

## Deploy

Dagster+ serverless builds from the repo root per `dagster_cloud.yaml`
(`module_name: orchestration.definitions`). Runtime deps come from the root
`requirements.txt`. Secrets (GCS, GeoServer) are set as Dagster+ env vars, not
in code.
52 changes: 43 additions & 9 deletions orchestration/assets/products.py
@@ -1,14 +1,26 @@
"""Per-source Dagster asset graph for a product.

Each product fans out into one asset per data source (keyed
``[product_id, "sources", <source_key>]``) plus a combine asset (keyed
``[product_id]``) that merges every source's contribution, writes the OGC
GeoJSON collection, and uploads it to GCS.
``build_product_assets(product)`` expands one products.yaml entry into a small
asset graph:

sources/<key> ─┐
sources/<key> ─┼─▶ <product_id> ─▶ <product_id>/geoserver
sources/<key> ─┘ (combine) (publish)

- **source assets** — keyed ``[product_id, "sources", <source_key>]``, one per
data source that provides the product's parameter. Each runs DIE unification
for just that source and emits its records/sites/timeseries.
- **combine asset** — keyed ``[product_id]``. Merges every source's
contribution, writes the OGC GeoJSON collection, and uploads it to GCS.
- **geoserver asset** — keyed ``[product_id, "geoserver"]``. Downloads the
combined GeoJSON, converts it to a GeoPackage, and publishes it as a layer in
GeoServer.

Design notes:
- Source assets never hard-fail. They emit row-count metadata and an
AssetCheckResult that goes red (WARN) on error or empty output, so one dead
source surfaces in the UI without blocking the product's combine asset.
- Source and geoserver assets never hard-fail. They catch their own errors and
report status via an ``AssetCheckResult`` that goes red (WARN) on error or
empty output, so one dead source — or a GeoServer outage — surfaces in the UI
without blocking the rest of the product graph.
- Records cross the IO manager as plain ``_payload`` dicts (the record classes
use ``__getattr__`` over ``_payload`` which does not survive pickling). The
combine asset rebuilds record objects before dumping.
@@ -49,13 +61,20 @@ def _product_source_keys(product: dict) -> list:


def _in_name(source_key: str) -> str:
# Combine-asset input kwargs must be valid Python identifiers; source keys
# may contain hyphens, so sanitize and prefix.
return f"src_{source_key.replace('-', '_')}"


def _build_source_asset(product: dict, source_key: str, group: str):
"""Build the asset that unifies a single source for *product*.

Returns ``(asset_def, asset_key)``. The asset never raises: on failure it
records the traceback and fails its ``returned_data`` check (WARN) instead,
so a broken source does not block the combine asset. Output is shipped as
plain ``_payload`` dicts for IO-manager pickling (see module docstring)."""
pid = product["id"]
src_key = dg.AssetKey([pid, "sources", source_key])
is_summary = product["output_type"] == "ogc_summary"

@dg.asset(
key=src_key,
@@ -110,6 +129,11 @@ def _source_asset(context: dg.AssetExecutionContext, die_config: DIEConfigResour


def _build_combine_asset(product: dict, source_keys: list, source_asset_keys: list, group: str):
"""Build the combine asset (keyed ``[product_id]``) for *product*.

Depends on every source asset (wired via ``ins``), merges their
records/sites/timeseries, writes the OGC GeoJSON collection — summary or
timeseries depending on ``output_type`` — and uploads it to GCS."""
pid = product["id"]
is_summary = product["output_type"] == "ogc_summary"
ins = {
@@ -188,6 +212,13 @@ def _geojson_to_geopackage(geojson_path: Path, layer_name: str, out_dir: Path):


def _build_geoserver_asset(product: dict, group: str):
"""Build the GeoServer publish asset (keyed ``[product_id, "geoserver"]``).

Depends on the combine asset for ordering only (``deps``, no data passed):
it reads the combined GeoJSON back from GCS, converts to GeoPackage, and
publishes it. Never raises — failures fail the ``registered`` check (WARN)
instead, so a GeoServer outage doesn't fail the run after GCS upload
succeeded."""
pid = product["id"]
gs_key = dg.AssetKey([pid, "geoserver"])

@@ -241,7 +272,10 @@ def _geoserver_asset(


def build_product_assets(product: dict) -> list:
"""Return the per-source assets and the combine asset for *product*."""
"""Return the full asset list for *product*: one source asset per applicable
source, the combine asset, and the geoserver publish asset (see module
docstring for the graph shape). Assets are grouped ``waterlevels`` or
``analytes`` by parameter."""
group = "waterlevels" if product["parameter"] == WATERLEVELS else "analytes"
source_keys = _product_source_keys(product)

1 change: 1 addition & 0 deletions orchestration/pyproject.toml
@@ -26,3 +26,4 @@ nmuwd = { path = "../", editable = true }

[tool.hatch.build.targets.wheel]
packages = ["orchestration"]

13 changes: 13 additions & 0 deletions orchestration/resources/die_config.py
@@ -9,6 +9,17 @@ class DIEConfigResource(dg.ConfigurableResource):
usgs_api_key: Optional[str] = None

def get_config(self, product: dict) -> Config:
"""Translate a products.yaml entry into a finalized DIE ``Config``.

Mapping:
- ``output_type`` → ``output_summary`` / ``output_format``.
- ``spatial_filter.county`` → ``county``. ``spatial_filter.state`` sets
``wkt = None`` (statewide; DIE applies the NM extent downstream).
- ``sources.include`` → enable only those sources (all others off).
``sources.exclude`` → disable those, leave the rest at their defaults.
- ``parameter`` is set on the Config, then ``finalize()`` validates and
resolves output units/paths.
"""
spatial = product.get("spatial_filter", {})
sources_spec = product.get("sources", {})

@@ -24,6 +35,8 @@ def get_config(self, product: dict) -> Config:
payload["wkt"] = None

if sources_spec.get("include"):
# NOTE: must stay in sync with backend.config.SOURCE_KEYS — an
# include-list product silently drops any source missing here.
all_sources = [
"bernco", "bor", "cabq", "ebid", "nmbgmr_amp",
"nmed_dwb", "nmose_isc_seven_rivers", "nmose_pod",
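
The include/exclude rule from the `get_config` docstring, and the drift risk called out in the NOTE, can be sketched roughly as follows. The function name, the shortened `ALL_SOURCES` list, and the `use_source_<key>` flag naming are assumptions made for illustration, not the actual `get_config` implementation.

```python
# Hypothetical subset of source keys, for illustration only.
ALL_SOURCES = ["bernco", "bor", "cabq", "nmbgmr_amp", "nmose_pod"]


def resolve_source_flags(sources_spec: dict, all_sources: list = ALL_SOURCES) -> dict:
    """Map a ``sources`` spec to per-source on/off flags (assumed flag naming)."""
    include = sources_spec.get("include")
    if include:
        # Include list: every known source gets an explicit flag, on only if listed.
        # A key that is absent from all_sources never gets a flag at all, which
        # is how a stale list silently drops a source.
        return {f"use_source_{key}": key in include for key in all_sources}
    # Exclude list: only the excluded keys are set; the rest keep their defaults.
    return {f"use_source_{key}": False for key in sources_spec.get("exclude") or []}


include_flags = resolve_source_flags({"include": ["bor", "nmose_pod"]})
exclude_flags = resolve_source_flags({"exclude": ["cabq"]})
dropped = resolve_source_flags({"include": ["bor", "brand_new_source"]})
print(include_flags)
print(exclude_flags)
print("use_source_brand_new_source" in dropped)
```

In the last call, `brand_new_source` is requested but produces no flag because it is missing from the known-sources list, which is the failure mode the in-code comment asks maintainers to guard against.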
9 changes: 9 additions & 0 deletions pyproject.toml
@@ -49,3 +49,12 @@ ignore_missing_imports = true
# Archived tests are excluded from pytest (norecursedirs) and reference an old
# CLI API; don't type-check them either.
exclude = ["tests/archived/"]

[tool.dg]
directory_type = "project"

[tool.dg.cli]
suppress_warnings = ["project_and_activated_venv_mismatch"]

[tool.dg.project]
root_module = "orchestration"