From 59a38c1e98e9b6a7b665041c1bba53a4364e03fa Mon Sep 17 00:00:00 2001 From: jakeross Date: Thu, 25 Jun 2026 22:56:09 -0600 Subject: [PATCH 1/4] Improve in-code documentation for product asset graph - products.py: refresh stale module docstring (now covers the geoserver publish asset + GeoPackage flow and the ASCII graph), document all three asset builders and build_product_assets, explain _in_name sanitization, drop an unused is_summary local in the source-asset builder. - die_config.py: document get_config's products.yaml -> Config mapping and flag that the include-list source list must track backend SOURCE_KEYS. Docs/comments only; no behavior change. Co-Authored-By: Claude Opus 4.8 --- orchestration/assets/products.py | 52 ++++++++++++++++++++++----- orchestration/resources/die_config.py | 13 +++++++ 2 files changed, 56 insertions(+), 9 deletions(-) diff --git a/orchestration/assets/products.py b/orchestration/assets/products.py index 4b7ad7d..dc7efd0 100644 --- a/orchestration/assets/products.py +++ b/orchestration/assets/products.py @@ -1,14 +1,26 @@ """Per-source Dagster asset graph for a product. -Each product fans out into one asset per data source (keyed -``[product_id, "sources", <source_key>]``) plus a combine asset (keyed -``[product_id]``) that merges every source's contribution, writes the OGC -GeoJSON collection, and uploads it to GCS. +``build_product_assets(product)`` expands one products.yaml entry into a small +asset graph: + + sources/<source_key> ─┐ + sources/<source_key> ─┼─▶ <product_id> ─▶ <product_id>/geoserver + sources/<source_key> ─┘ (combine) (publish) + +- **source assets** — keyed ``[product_id, "sources", <source_key>]``, one per + data source that provides the product's parameter. Each runs DIE unification + for just that source and emits its records/sites/timeseries. +- **combine asset** — keyed ``[product_id]``. Merges every source's + contribution, writes the OGC GeoJSON collection, and uploads it to GCS. +- **geoserver asset** — keyed ``[product_id, "geoserver"]``. 
Downloads the + combined GeoJSON, converts it to a GeoPackage, and publishes it as a layer in + GeoServer. Design notes: -- Source assets never hard-fail. They emit row-count metadata and an - AssetCheckResult that goes red (WARN) on error or empty output, so one dead - source surfaces in the UI without blocking the product's combine asset. +- Source and geoserver assets never hard-fail. They catch their own errors and + report status via an ``AssetCheckResult`` that goes red (WARN) on error or + empty output, so one dead source — or a GeoServer outage — surfaces in the UI + without blocking the rest of the product graph. - Records cross the IO manager as plain ``_payload`` dicts (the record classes use ``__getattr__`` over ``_payload`` which does not survive pickling). The combine asset rebuilds record objects before dumping. @@ -49,13 +61,20 @@ def _product_source_keys(product: dict) -> list: def _in_name(source_key: str) -> str: + # Combine-asset input kwargs must be valid Python identifiers; source keys + # may contain hyphens, so sanitize and prefix. return f"src_{source_key.replace('-', '_')}" def _build_source_asset(product: dict, source_key: str, group: str): + """Build the asset that unifies a single source for *product*. + + Returns ``(asset_def, asset_key)``. The asset never raises: on failure it + records the traceback and fails its ``returned_data`` check (WARN) instead, + so a broken source does not block the combine asset. Output is shipped as + plain ``_payload`` dicts for IO-manager pickling (see module docstring).""" pid = product["id"] src_key = dg.AssetKey([pid, "sources", source_key]) - is_summary = product["output_type"] == "ogc_summary" @dg.asset( key=src_key, @@ -110,6 +129,11 @@ def _source_asset(context: dg.AssetExecutionContext, die_config: DIEConfigResour def _build_combine_asset(product: dict, source_keys: list, source_asset_keys: list, group: str): + """Build the combine asset (keyed ``[product_id]``) for *product*. 
+ + Depends on every source asset (wired via ``ins``), merges their + records/sites/timeseries, writes the OGC GeoJSON collection — summary or + timeseries depending on ``output_type`` — and uploads it to GCS.""" pid = product["id"] is_summary = product["output_type"] == "ogc_summary" ins = { @@ -188,6 +212,13 @@ def _geojson_to_geopackage(geojson_path: Path, layer_name: str, out_dir: Path): def _build_geoserver_asset(product: dict, group: str): + """Build the GeoServer publish asset (keyed ``[product_id, "geoserver"]``). + + Depends on the combine asset for ordering only (``deps``, no data passed): + it reads the combined GeoJSON back from GCS, converts to GeoPackage, and + publishes it. Never raises — failures fail the ``registered`` check (WARN) + instead, so a GeoServer outage doesn't fail the run after GCS upload + succeeded.""" pid = product["id"] gs_key = dg.AssetKey([pid, "geoserver"]) @@ -241,7 +272,10 @@ def _geoserver_asset( def build_product_assets(product: dict) -> list: - """Return the per-source assets and the combine asset for *product*.""" + """Return the full asset list for *product*: one source asset per applicable + source, the combine asset, and the geoserver publish asset (see module + docstring for the graph shape). Assets are grouped ``waterlevels`` or + ``analytes`` by parameter.""" group = "waterlevels" if product["parameter"] == WATERLEVELS else "analytes" source_keys = _product_source_keys(product) diff --git a/orchestration/resources/die_config.py b/orchestration/resources/die_config.py index ccbf911..cd572a0 100644 --- a/orchestration/resources/die_config.py +++ b/orchestration/resources/die_config.py @@ -9,6 +9,17 @@ class DIEConfigResource(dg.ConfigurableResource): usgs_api_key: Optional[str] = None def get_config(self, product: dict) -> Config: + """Translate a products.yaml entry into a finalized DIE ``Config``. + + Mapping: + - ``output_type`` → ``output_summary`` / ``output_format``. + - ``spatial_filter.county`` → ``county``. 
``spatial_filter.state`` sets + ``wkt = None`` (statewide; DIE applies the NM extent downstream). + - ``sources.include`` → enable only those sources (all others off). + ``sources.exclude`` → disable those, leave the rest at their defaults. + - ``parameter`` is set on the Config, then ``finalize()`` validates and + resolves output units/paths. + """ spatial = product.get("spatial_filter", {}) sources_spec = product.get("sources", {}) @@ -24,6 +35,8 @@ def get_config(self, product: dict) -> Config: payload["wkt"] = None if sources_spec.get("include"): + # NOTE: must stay in sync with backend.config.SOURCE_KEYS — an + # include-list product silently drops any source missing here. all_sources = [ "bernco", "bor", "cabq", "ebid", "nmbgmr_amp", "nmed_dwb", "nmose_isc_seven_rivers", "nmose_pod", From f995c2753b8ae04507842a9d77c9f04e6c33e645 Mon Sep 17 00:00:00 2001 From: jakeross Date: Thu, 25 Jun 2026 23:11:55 -0600 Subject: [PATCH 2/4] Add dg project config to pyproject files Declare the Dagster dg CLI project layout: root pyproject marks the repo as a dg project; orchestration/pyproject sets root_module=orchestration. Co-Authored-By: Claude Opus 4.8 --- orchestration/pyproject.toml | 6 ++++++ pyproject.toml | 3 +++ 2 files changed, 9 insertions(+) diff --git a/orchestration/pyproject.toml b/orchestration/pyproject.toml index 50a6288..3f5adcf 100644 --- a/orchestration/pyproject.toml +++ b/orchestration/pyproject.toml @@ -26,3 +26,9 @@ nmuwd = { path = "../", editable = true } [tool.hatch.build.targets.wheel] packages = ["orchestration"] + +[tool.dg] +directory_type="project" + +[tool.dg.project] +root_module="orchestration" diff --git a/pyproject.toml b/pyproject.toml index a35c373..d48760e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -49,3 +49,6 @@ ignore_missing_imports = true # Archived tests are excluded from pytest (norecursedirs) and reference an old # CLI API; don't type-check them either. 
exclude = ["tests/archived/"] + +[tool.dg] +directory_type = "project" From 0f36b2623a45d0b626e85d529ea98cd20b7c2ae5 Mon Sep 17 00:00:00 2001 From: jakeross Date: Thu, 25 Jun 2026 23:19:00 -0600 Subject: [PATCH 3/4] Add AGENTS.md for orchestration; direct agents to use dg Document the dg-based workflow for Dagster operations (dev, list defs, launch), the product asset-graph architecture, validation via the import smoke test (dg check defs is currently blocked by a root_module layout mismatch), and the Dagster+ serverless deploy setup. Co-Authored-By: Claude Opus 4.8 --- orchestration/AGENTS.md | 60 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 60 insertions(+) create mode 100644 orchestration/AGENTS.md diff --git a/orchestration/AGENTS.md b/orchestration/AGENTS.md new file mode 100644 index 0000000..3ca88cc --- /dev/null +++ b/orchestration/AGENTS.md @@ -0,0 +1,60 @@ +# Agent guide — DIE orchestration + +Dagster code location for the Data Integration Engine. Defines, schedules, and +publishes the DIE data products. This dir is a `dg` (Dagster CLI) project — see +`[tool.dg]` in `pyproject.toml`. + +## Use `dg` for Dagster operations + +Prefer the `dg` CLI over raw `dagster ...` or ad-hoc scripts for anything +Dagster-related (listing, running, validating, dev server). Run it through the +project venv: + +```bash +uv run dg <command> # from the orchestration/ directory +``` + +Common operations: + +| Task | Command | +|------|---------| +| Start the local dev UI | `uv run dg dev` | +| List assets / jobs / schedules / resources | `uv run dg list defs` | +| Materialize asset(s) | `uv run dg launch --assets <selection>` | +| Run a product job | `uv run dg launch --assets '*/geoserver'` | +| Scaffold a new component/asset | `uv run dg scaffold ...` | + +Asset-selection syntax is the standard Dagster one (`key`, `key*`, `*key`, +`group:<name>`). A product's full graph is `sources → <product_id> → geoserver`, +so `<product_id>/geoserver` plus upstream covers the whole product. 
+ +## Validating changes + +`dg check defs` currently fails here due to a `[tool.dg] root_module` layout +mismatch (the code lives at repo-root-level `orchestration/`, not nested). Until +that's resolved, validate that definitions load with the import smoke test: + +```bash +uv run python -c "import orchestration.definitions; print('ok')" +``` + +`uv run dg list defs` also exercises loading and is a good quick check. + +## Architecture (so changes land in the right place) + +- Products are declared in `config/products.yaml`. Each entry expands — via + `assets/products.py:build_product_assets` — into a per-source asset graph: + `sources/<source_key>` → combine (`<product_id>`) → `<product_id>/geoserver`. +- `definitions.py` wires assets, one asset job per product (`<product_id>_job`), + schedules, and resources (`die_config`, `gcs`, `geoserver`, `io_manager`). +- Resources live in `resources/`. The backend DIE engine is the `nmuwd` package + (repo root `backend/`); orchestration only drives it. +- Per-source and geoserver assets soft-fail: errors surface as red asset checks + (WARN), not hard failures, so one dead source doesn't block a product. + +## Deploy + +Dagster+ serverless builds from the repo root per `dagster_cloud.yaml` +(`module_name: orchestration.definitions`). Runtime deps come from the root +`requirements.txt`. Secrets (GCS, GeoServer) are set as Dagster+ env vars, not +in code. From 60f120c13f638e30728a3dc62273604c9132826e Mon Sep 17 00:00:00 2001 From: jakeross Date: Thu, 25 Jun 2026 23:23:28 -0600 Subject: [PATCH 4/4] Fix dg project layout: root project, root_module=orchestration MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit dg resolves a local module as <project_root>/<root_module> on disk. With the dg project at orchestration/ and root_module="orchestration", dg looked for orchestration/orchestration/ (nonexistent), so `dg check defs` failed and dg kept rewriting root_module to ".". 
The orchestration package lives at repo-root level (./orchestration/) — the same place the Dagster+ serverless build imports it from — so make the repo root the dg project with root_module="orchestration", which resolves to ./orchestration/. Remove the competing [tool.dg] block from orchestration/pyproject.toml. Suppress the venv-mismatch warning (dg runs via the orchestration venv through `uv run` while the project root is the repo root). `dg check defs` and `dg list defs` now succeed. Update AGENTS.md accordingly. Co-Authored-By: Claude Opus 4.8 --- orchestration/AGENTS.md | 23 ++++++++++++++++------- orchestration/pyproject.toml | 5 ----- pyproject.toml | 6 ++++++ 3 files changed, 22 insertions(+), 12 deletions(-) diff --git a/orchestration/AGENTS.md b/orchestration/AGENTS.md index 3ca88cc..e3a88a5 100644 --- a/orchestration/AGENTS.md +++ b/orchestration/AGENTS.md @@ -7,13 +7,19 @@ publishes the DIE data products. This dir is a `dg` (Dagster CLI) project — se ## Use `dg` for Dagster operations Prefer the `dg` CLI over raw `dagster ...` or ad-hoc scripts for anything -Dagster-related (listing, running, validating, dev server). Run it through the -project venv: +Dagster-related (listing, running, validating, dev server). Run it from the +`orchestration/` directory so `uv` uses the venv that has dagster + dg: ```bash uv run dg <command> # from the orchestration/ directory ``` +Note: the `dg` project root is the **repo root** (`[tool.dg]` in the top-level +`pyproject.toml`), because the `orchestration` package lives at repo-root level +(`./orchestration/`) — same place the Dagster+ serverless build imports it from. +`dg` discovers that project by walking up from `orchestration/`; just always run +via `uv run` from `orchestration/` and it resolves correctly. + Common operations: | Task | Command | @@ -30,15 +36,18 @@ so `<product_id>/geoserver` plus upstream covers the whole product. 
## Validating changes -`dg check defs` currently fails here due to a `[tool.dg] root_module` layout -mismatch (the code lives at repo-root-level `orchestration/`, not nested). Until -that's resolved, validate that definitions load with the import smoke test: +Validate that all definitions load and component YAML is valid: ```bash -uv run python -c "import orchestration.definitions; print('ok')" +uv run dg check defs # from orchestration/ ``` -`uv run dg list defs` also exercises loading and is a good quick check. +`uv run dg list defs` also exercises loading and is a good quick check. The +plain import smoke test still works as a fallback: + +```bash +uv run python -c "import orchestration.definitions; print('ok')" +``` ## Architecture (so changes land in the right place) diff --git a/orchestration/pyproject.toml b/orchestration/pyproject.toml index 3f5adcf..a700805 100644 --- a/orchestration/pyproject.toml +++ b/orchestration/pyproject.toml @@ -27,8 +27,3 @@ nmuwd = { path = "../", editable = true } [tool.hatch.build.targets.wheel] packages = ["orchestration"] -[tool.dg] -directory_type="project" - -[tool.dg.project] -root_module="orchestration" diff --git a/pyproject.toml b/pyproject.toml index d48760e..3ea690d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -52,3 +52,9 @@ exclude = ["tests/archived/"] [tool.dg] directory_type = "project" + +[tool.dg.cli] +suppress_warnings = ["project_and_activated_venv_mismatch"] + +[tool.dg.project] +root_module = "orchestration"