diff --git a/orchestration/assets/products.py b/orchestration/assets/products.py index 2c5fc15..9c2f337 100644 --- a/orchestration/assets/products.py +++ b/orchestration/assets/products.py @@ -222,6 +222,14 @@ def build_shared_source_asset(spec: SourceSpec) -> dg.AssetsDefinition: @dg.asset( key=src_key, group_name=spec.group, + description=( + f"Shared source asset: unifies the **{spec.source_key}** source for " + f"**{spec.parameter}** over scope `{spec.scope}`. Fetched once and " + f"unified for both summary and timeseries, so it carries summary " + f"records plus timeseries sites/observations. Shared by every product " + f"that needs this (parameter, scope, source), so the source is pulled " + f"once per run regardless of how many products consume it." + ), check_specs=[dg.AssetCheckSpec(name=_CHECK_NAME, asset=src_key)], ) def _source_asset( @@ -298,7 +306,21 @@ def _build_combine_asset( output_type = product["output_type"] ins = {_in_name(spec): dg.AssetIn(key=shared_source_key(spec)) for spec in specs} - @dg.asset(key=dg.AssetKey(pid), group_name=group, ins=ins) + _combine_description = ( + f"**{product.get('title', pid)}** — product combine asset (`{output_type}`). " + f"{product.get('description', '').rstrip('.')}. Merges its " + f"{len(specs)} shared source asset(s), builds the OGC GeoJSON collection, " + f"and uploads it to GCS. Sources are read back from the GCS IO manager " + f"(materialized upstream in the same cohort run), so the combine never " + f"re-unifies them." + ) + + @dg.asset( + key=dg.AssetKey(pid), + group_name=group, + description=_combine_description, + ins=ins, + ) def _combine_asset( context: dg.AssetExecutionContext, gcs: GCSResource, @@ -427,6 +449,14 @@ def _build_geoserver_asset(product: dict, group: str) -> dg.AssetsDefinition: @dg.asset( key=gs_key, group_name=group, + description=( + f"GeoServer publish asset for **{product.get('title', pid)}**. " + f"Downloads the combined GeoJSON for `{pid}` from GCS, converts it to " + f"a GeoPackage, and publishes it as a GeoServer layer. Depends on the " + f"combine asset for ordering only (no data passed). Never raises — a " + f"publish failure fails the `{_GEOSERVER_CHECK_NAME}` check (WARN) " + f"rather than the run." + ), deps=[dg.AssetKey(pid)], check_specs=[dg.AssetCheckSpec(name=_GEOSERVER_CHECK_NAME, asset=gs_key)], )