Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 32 additions & 10 deletions orchestration/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,40 @@ def _build_assets(products_config: dict) -> list:
return assets


def _build_schedules(products_config: dict) -> list:
schedules = []
def _product_selection(pid: str) -> dg.AssetSelection:
# The full per-product graph: the geoserver leaf plus everything upstream of
# it (combine asset + source assets).
return dg.AssetSelection.keys(dg.AssetKey([pid, "geoserver"])).upstream()


def _products(products_config: dict):
for product in products_config["products"]:
if product.get("output_type") not in _SUPPORTED_OUTPUT_TYPES:
continue
if product.get("output_type") in _SUPPORTED_OUTPUT_TYPES:
yield product


def _build_jobs(products_config: dict) -> dict:
"""One asset job per product, selecting that product's whole graph.
Returns {product_id: job} so schedules can target the job."""
jobs = {}
for product in _products(products_config):
pid = product["id"]
jobs[pid] = dg.define_asset_job(
name=f"{pid}_job",
selection=_product_selection(pid),
description=f"Materialize the {pid} data product (sources → combine → geoserver).",
)
return jobs


def _build_schedules(products_config: dict, jobs: dict) -> list:
schedules = []
for product in _products(products_config):
pid = product["id"]
schedules.append(
dg.ScheduleDefinition(
name=f"schedule_{pid}",
# Materialize the full per-product graph: the geoserver leaf plus
# everything upstream of it (combine asset + source assets).
target=dg.AssetSelection.keys(
dg.AssetKey([pid, "geoserver"])
).upstream(),
job=jobs[pid],
cron_schedule=product.get("schedule", "0 6 * * *"),
execution_timezone="America/Denver",
)
Expand All @@ -49,10 +69,12 @@ def _build_schedules(products_config: dict) -> list:

_products_config = _load_products()
_assets = _build_assets(_products_config)
_schedules = _build_schedules(_products_config)
_jobs = _build_jobs(_products_config)
_schedules = _build_schedules(_products_config, _jobs)

defs = dg.Definitions(
assets=_assets,
jobs=list(_jobs.values()),
schedules=_schedules,
resources={
"die_config": DIEConfigResource(),
Expand Down
Loading