diff --git a/orchestration/definitions.py b/orchestration/definitions.py
index 1cea661..c404ac9 100644
--- a/orchestration/definitions.py
+++ b/orchestration/definitions.py
@@ -26,20 +26,40 @@ def _build_assets(products_config: dict) -> list:
     return assets
 
 
-def _build_schedules(products_config: dict) -> list:
-    schedules = []
+def _product_selection(pid: str) -> dg.AssetSelection:
+    # The full per-product graph: the geoserver leaf plus everything upstream of
+    # it (combine asset + source assets).
+    return dg.AssetSelection.keys(dg.AssetKey([pid, "geoserver"])).upstream()
+
+
+def _products(products_config: dict):
     for product in products_config["products"]:
-        if product.get("output_type") not in _SUPPORTED_OUTPUT_TYPES:
-            continue
+        if product.get("output_type") in _SUPPORTED_OUTPUT_TYPES:
+            yield product
+
+
+def _build_jobs(products_config: dict) -> dict:
+    """One asset job per product, selecting that product's whole graph.
+    Returns {product_id: job} so schedules can target the job."""
+    jobs = {}
+    for product in _products(products_config):
+        pid = product["id"]
+        jobs[pid] = dg.define_asset_job(
+            name=f"{pid}_job",
+            selection=_product_selection(pid),
+            description=f"Materialize the {pid} data product (sources → combine → geoserver).",
+        )
+    return jobs
+
+
+def _build_schedules(products_config: dict, jobs: dict) -> list:
+    schedules = []
+    for product in _products(products_config):
         pid = product["id"]
         schedules.append(
             dg.ScheduleDefinition(
                 name=f"schedule_{pid}",
-                # Materialize the full per-product graph: the geoserver leaf plus
-                # everything upstream of it (combine asset + source assets).
-                target=dg.AssetSelection.keys(
-                    dg.AssetKey([pid, "geoserver"])
-                ).upstream(),
+                job=jobs[pid],
                 cron_schedule=product.get("schedule", "0 6 * * *"),
                 execution_timezone="America/Denver",
             )
@@ -49,10 +69,12 @@ def _build_schedules(products_config: dict) -> list:
 
 _products_config = _load_products()
 _assets = _build_assets(_products_config)
-_schedules = _build_schedules(_products_config)
+_jobs = _build_jobs(_products_config)
+_schedules = _build_schedules(_products_config, _jobs)
 
 defs = dg.Definitions(
     assets=_assets,
+    jobs=list(_jobs.values()),
     schedules=_schedules,
     resources={
         "die_config": DIEConfigResource(),