diff --git a/orchestration/AGENTS.md b/orchestration/AGENTS.md new file mode 100644 index 0000000..e3a88a5 --- /dev/null +++ b/orchestration/AGENTS.md @@ -0,0 +1,69 @@ +# Agent guide — DIE orchestration + +Dagster code location for the Data Integration Engine. Defines, schedules, and +publishes the DIE data products. This dir is the root module of a `dg` (Dagster CLI) project — see +`[tool.dg]` in the repo-root `pyproject.toml`. + +## Use `dg` for Dagster operations + +Prefer the `dg` CLI over raw `dagster ...` or ad-hoc scripts for anything +Dagster-related (listing, running, validating, dev server). Run it from the +`orchestration/` directory so `uv` uses the venv that has dagster + dg: + +```bash +uv run dg <command> # from the orchestration/ directory +``` + +Note: the `dg` project root is the **repo root** (`[tool.dg]` in the top-level +`pyproject.toml`), because the `orchestration` package lives at repo-root level +(`./orchestration/`) — same place the Dagster+ serverless build imports it from. +`dg` discovers that project by walking up from `orchestration/`; just always run +via `uv run` from `orchestration/` and it resolves correctly. + +Common operations: + +| Task | Command | +|------|---------| +| Start the local dev UI | `uv run dg dev` | +| List assets / jobs / schedules / resources | `uv run dg list defs` | +| Materialize asset(s) | `uv run dg launch --assets <selection>` | +| Run a product job | `uv run dg launch --assets '*<product_id>/geoserver'` | +| Scaffold a new component/asset | `uv run dg scaffold ...` | + +Asset-selection syntax is the standard Dagster one (`key`, `key*`, `*key`, +`group:<name>`). A product's full graph is `sources → <product_id> → geoserver`, +so `<product_id>/geoserver` plus upstream covers the whole product. + +## Validating changes + +Validate that all definitions load and component YAML is valid: + +```bash +uv run dg check defs # from orchestration/ +``` + +`uv run dg list defs` also exercises loading and is a good quick check. 
The +plain import smoke test still works as a fallback: + +```bash +uv run python -c "import orchestration.definitions; print('ok')" +``` + +## Architecture (so changes land in the right place) + +- Products are declared in `config/products.yaml`. Each entry expands — via + `assets/products.py:build_product_assets` — into a per-source asset graph: + `sources/<source_key>` → combine (`<product_id>`) → `<product_id>/geoserver`. +- `definitions.py` wires assets, one asset job per product (`<product_id>_job`), + schedules, and resources (`die_config`, `gcs`, `geoserver`, `io_manager`). +- Resources live in `resources/`. The backend DIE engine is the `nmuwd` package + (repo root `backend/`); orchestration only drives it. +- Per-source and geoserver assets soft-fail: errors surface as red asset checks + (WARN), not hard failures, so one dead source doesn't block a product. + +## Deploy + +Dagster+ serverless builds from the repo root per `dagster_cloud.yaml` +(`module_name: orchestration.definitions`). Runtime deps come from the root +`requirements.txt`. Secrets (GCS, GeoServer) are set as Dagster+ env vars, not +in code. diff --git a/orchestration/assets/products.py b/orchestration/assets/products.py index 4b7ad7d..dc7efd0 100644 --- a/orchestration/assets/products.py +++ b/orchestration/assets/products.py @@ -1,14 +1,26 @@ """Per-source Dagster asset graph for a product. -Each product fans out into one asset per data source (keyed -``[product_id, "sources", <source_key>]``) plus a combine asset (keyed -``[product_id]``) that merges every source's contribution, writes the OGC -GeoJSON collection, and uploads it to GCS. +``build_product_assets(product)`` expands one products.yaml entry into a small +asset graph: + + sources/<a> ─┐ + sources/<b> ─┼─▶ <product_id> ─▶ <product_id>/geoserver + sources/<c> ─┘ (combine) (publish) + +- **source assets** — keyed ``[product_id, "sources", <source_key>]``, one per + data source that provides the product's parameter. Each runs DIE unification + for just that source and emits its records/sites/timeseries. 
+- **combine asset** — keyed ``[product_id]``. Merges every source's + contribution, writes the OGC GeoJSON collection, and uploads it to GCS. +- **geoserver asset** — keyed ``[product_id, "geoserver"]``. Downloads the + combined GeoJSON, converts it to a GeoPackage, and publishes it as a layer in + GeoServer. Design notes: -- Source assets never hard-fail. They emit row-count metadata and an - AssetCheckResult that goes red (WARN) on error or empty output, so one dead - source surfaces in the UI without blocking the product's combine asset. +- Source and geoserver assets never hard-fail. They catch their own errors and + report status via an ``AssetCheckResult`` that goes red (WARN) on error or + empty output, so one dead source — or a GeoServer outage — surfaces in the UI + without blocking the rest of the product graph. - Records cross the IO manager as plain ``_payload`` dicts (the record classes use ``__getattr__`` over ``_payload`` which does not survive pickling). The combine asset rebuilds record objects before dumping. @@ -49,13 +61,20 @@ def _product_source_keys(product: dict) -> list: def _in_name(source_key: str) -> str: + # Combine-asset input kwargs must be valid Python identifiers; source keys + # may contain hyphens, so sanitize and prefix. return f"src_{source_key.replace('-', '_')}" def _build_source_asset(product: dict, source_key: str, group: str): + """Build the asset that unifies a single source for *product*. + + Returns ``(asset_def, asset_key)``. The asset never raises: on failure it + records the traceback and fails its ``returned_data`` check (WARN) instead, + so a broken source does not block the combine asset. 
Output is shipped as + plain ``_payload`` dicts for IO-manager pickling (see module docstring).""" pid = product["id"] src_key = dg.AssetKey([pid, "sources", source_key]) - is_summary = product["output_type"] == "ogc_summary" @dg.asset( key=src_key, @@ -110,6 +129,11 @@ def _source_asset(context: dg.AssetExecutionContext, die_config: DIEConfigResour def _build_combine_asset(product: dict, source_keys: list, source_asset_keys: list, group: str): + """Build the combine asset (keyed ``[product_id]``) for *product*. + + Depends on every source asset (wired via ``ins``), merges their + records/sites/timeseries, writes the OGC GeoJSON collection — summary or + timeseries depending on ``output_type`` — and uploads it to GCS.""" pid = product["id"] is_summary = product["output_type"] == "ogc_summary" ins = { @@ -188,6 +212,13 @@ def _geojson_to_geopackage(geojson_path: Path, layer_name: str, out_dir: Path): def _build_geoserver_asset(product: dict, group: str): + """Build the GeoServer publish asset (keyed ``[product_id, "geoserver"]``). + + Depends on the combine asset for ordering only (``deps``, no data passed): + it reads the combined GeoJSON back from GCS, converts to GeoPackage, and + publishes it. Never raises — failures fail the ``registered`` check (WARN) + instead, so a GeoServer outage doesn't fail the run after GCS upload + succeeded.""" pid = product["id"] gs_key = dg.AssetKey([pid, "geoserver"]) @@ -241,7 +272,10 @@ def _geoserver_asset( def build_product_assets(product: dict) -> list: - """Return the per-source assets and the combine asset for *product*.""" + """Return the full asset list for *product*: one source asset per applicable + source, the combine asset, and the geoserver publish asset (see module + docstring for the graph shape). 
Assets are grouped ``waterlevels`` or + ``analytes`` by parameter.""" group = "waterlevels" if product["parameter"] == WATERLEVELS else "analytes" source_keys = _product_source_keys(product) diff --git a/orchestration/pyproject.toml b/orchestration/pyproject.toml index 50a6288..a700805 100644 --- a/orchestration/pyproject.toml +++ b/orchestration/pyproject.toml @@ -26,3 +26,4 @@ nmuwd = { path = "../", editable = true } [tool.hatch.build.targets.wheel] packages = ["orchestration"] + diff --git a/orchestration/resources/die_config.py b/orchestration/resources/die_config.py index ccbf911..cd572a0 100644 --- a/orchestration/resources/die_config.py +++ b/orchestration/resources/die_config.py @@ -9,6 +9,17 @@ class DIEConfigResource(dg.ConfigurableResource): usgs_api_key: Optional[str] = None def get_config(self, product: dict) -> Config: + """Translate a products.yaml entry into a finalized DIE ``Config``. + + Mapping: + - ``output_type`` → ``output_summary`` / ``output_format``. + - ``spatial_filter.county`` → ``county``. ``spatial_filter.state`` sets + ``wkt = None`` (statewide; DIE applies the NM extent downstream). + - ``sources.include`` → enable only those sources (all others off). + ``sources.exclude`` → disable those, leave the rest at their defaults. + - ``parameter`` is set on the Config, then ``finalize()`` validates and + resolves output units/paths. + """ spatial = product.get("spatial_filter", {}) sources_spec = product.get("sources", {}) @@ -24,6 +35,8 @@ def get_config(self, product: dict) -> Config: payload["wkt"] = None if sources_spec.get("include"): + # NOTE: must stay in sync with backend.config.SOURCE_KEYS — an + # include-list product silently drops any source missing here. 
all_sources = [ "bernco", "bor", "cabq", "ebid", "nmbgmr_amp", "nmed_dwb", "nmose_isc_seven_rivers", "nmose_pod", diff --git a/pyproject.toml b/pyproject.toml index a35c373..3ea690d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -49,3 +49,12 @@ ignore_missing_imports = true # Archived tests are excluded from pytest (norecursedirs) and reference an old # CLI API; don't type-check them either. exclude = ["tests/archived/"] + +[tool.dg] +directory_type = "project" + +[tool.dg.cli] +suppress_warnings = ["project_and_activated_venv_mismatch"] + +[tool.dg.project] +root_module = "orchestration"