Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 33 additions & 5 deletions backend/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,9 @@ def get_analyte_search_param(parameter: str, mapping: dict) -> str:
# Base source classes
# =============================================================================

_FETCH_UNSET = object() # sentinel: site fetch not yet cached


class BaseSource:
transformer_klass = BaseTransformer # deprecated: pass transformer= to __init__

Expand All @@ -198,6 +201,25 @@ def __init__(self, transformer: Optional[BaseTransformer] = None, http_client: h
self.log = _l.log
self.warn = _l.warn
self.debug = _l.debug
# Opt-in shared-fetch cache. Off by default, so CLI/API behavior is
# unchanged. unify_source_both turns it on so a source unified for both
# summary and timeseries pulls the API only once (see
# backend/unifier.py:unify_source_both). The two passes issue identical
# fetches (same parameter/scope/dates), so the second reuses the first.
self._fetch_cache_enabled = False
self._records_cache: dict = {} # site-id key -> get_records() result
self._sites_cache = _FETCH_UNSET # BaseSiteSource.read() result

def _fetch_records(self, site_record):
    """Return ``get_records(site_record)``, memoised when the cache is on.

    With ``_fetch_cache_enabled`` false this is a plain pass-through to
    ``get_records``. Otherwise the result is stored in ``_records_cache``
    under a key built from the requested sites: each site's ``id`` (or the
    object itself when it has no ``id``) is stringified, and the strings are
    sorted into a tuple, so the same set of sites maps to the same entry
    regardless of order.
    """
    if self._fetch_cache_enabled:
        # Normalise a single site to a one-element list for key building
        # only; get_records() still receives the caller's original argument.
        if isinstance(site_record, list):
            requested = site_record
        else:
            requested = [site_record]
        site_ids = [str(getattr(site, "id", site)) for site in requested]
        site_ids.sort()
        cache_key = tuple(site_ids)
        if cache_key in self._records_cache:
            return self._records_cache[cache_key]
        fetched = self.get_records(site_record)
        self._records_cache[cache_key] = fetched
        return fetched
    return self.get_records(site_record)

@property
def tag(self):
Expand Down Expand Up @@ -296,13 +318,19 @@ def intersects(self, wkt: str) -> bool:
return True

def read(self, *args, **kw) -> List[SiteRecord] | None:
    """Fetch this source's site records and transform them.

    Positional and keyword arguments are accepted but not used here.

    Returns the output of ``_transform_sites`` when ``get_records()``
    returns something truthy; otherwise logs a warning and returns ``None``.

    When ``_fetch_cache_enabled`` is set, the outcome (including ``None``)
    is stored in ``_sites_cache`` and handed back on later calls without
    calling ``get_records()`` again. ``_FETCH_UNSET`` marks "nothing cached
    yet", which keeps a cached ``None`` distinguishable from an empty cache.
    """
    if self._fetch_cache_enabled and self._sites_cache is not _FETCH_UNSET:
        return self._sites_cache
    self.log("Gathering site records")
    records = self.get_records()
    if records:
        self.log(f"total records={len(records)}")
        result: List[SiteRecord] | None = self._transform_sites(records)
    else:
        self.warn("No site records returned")
        result = None
    # Single exit point so both outcomes are cached the same way.
    if self._fetch_cache_enabled:
        self._sites_cache = result
    return result

def _transform_sites(self, records: list) -> List[SiteRecord]:
transformed_records: List[SiteRecord] = []
Expand Down Expand Up @@ -350,7 +378,7 @@ def read_summary(self, site_record: SiteRecord | list, start_ind: int, end_ind:
else:
self.log(f"{site_record.id}: Gathering {self.name} data")

all_records = self.get_records(site_record)
all_records = self._fetch_records(site_record)
if not all_records:
names = [str(r.id) for r in site_record] if isinstance(site_record, list) else [str(site_record.id)]
self.warn(f"{','.join(names)}: No records found")
Expand Down Expand Up @@ -380,7 +408,7 @@ def read_timeseries(self, site_record: SiteRecord | list) -> List[ParameterRecor
else:
self.log(f"{site_record.id}: Gathering {self.name} data")

all_records = self.get_records(site_record)
all_records = self._fetch_records(site_record)
if not all_records:
names = [str(r.id) for r in site_record] if isinstance(site_record, list) else [str(site_record.id)]
self.warn(f"{','.join(names)}: No records found")
Expand Down
53 changes: 53 additions & 0 deletions backend/unifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,59 @@ def unify_source(config, source_key):
return persister


def unify_source_both(config, source_key):
    """Unify a single source for BOTH summary and timeseries outputs while
    fetching the source only once.

    Each connector's ``get_records`` is mode-agnostic — summary and timeseries
    both pull the same raw observations and differ only in how they are
    transformed (see backend/source.py). Running ``unify_source`` twice would
    therefore hit the API twice for identical data. This driver instead enables
    the source's shared-fetch cache and runs the two transform passes over one
    fetch, so a source needed by both a summary and a timeseries product is
    pulled once.

    Output is identical to calling ``unify_source`` twice (once per mode); only
    the underlying fetch is shared. Used by the orchestration shared source
    asset.

    Args:
        config: run configuration; it is validated first and supplies the
            source pair for ``source_key``.
        source_key: key of the source to unify.

    Returns:
        ``(summary_persister, timeseries_persister)``. When the source does
        not provide ``config.parameter`` a warning is emitted and two fresh
        persisters from ``make_persister`` are returned without any fetch.

    Side effects:
        ``config.output_summary`` and ``config._persister`` are reassigned per
        pass and, on success, are left at the summary pass's values. The
        sources' fetch-cache state (enabled flag and cached payloads) is put
        back to what it was before the call, whether or not a pass raised, so
        a source object reused afterwards never serves this call's fetches.
    """
    config.validate()

    pair = config.source_pair(source_key)
    if pair is None:
        config.warn(
            f"Source {source_key!r} does not provide parameter {config.parameter!r}"
        )
        return make_persister(config), make_persister(config)

    site_source, parameter_source = pair

    # Snapshot the cache state so it can be restored afterwards. The cache is
    # opt-in and meant to span only the two passes below.
    saved_cache_state = [
        (
            source,
            source._fetch_cache_enabled,
            source._sites_cache,
            dict(source._records_cache),
        )
        for source in (site_source, parameter_source)
    ]

    # Share the site list and observation fetch across the two passes. The
    # passes issue identical requests (same parameter/scope/dates), so the
    # second reuses the first's cached fetch instead of re-querying.
    site_source._fetch_cache_enabled = True
    parameter_source._fetch_cache_enabled = True

    try:
        # Timeseries pass first so its fetch primes the cache; the summary
        # pass then transforms the same cached observations. output_summary is
        # read live by the transformer, so toggling it here switches the record
        # klass/fields per pass without rebuilding the source.
        config.output_summary = False
        timeseries_persister = make_persister(config)
        config._persister = timeseries_persister
        _site_wrapper(
            site_source, parameter_source, timeseries_persister, config, raise_errors=True
        )

        config.output_summary = True
        summary_persister = make_persister(config)
        config._persister = summary_persister
        _site_wrapper(
            site_source, parameter_source, summary_persister, config, raise_errors=True
        )
    finally:
        # raise_errors=True lets either pass propagate an exception, so the
        # restore must also run on the failure path.
        for source, was_enabled, sites_cache, records_cache in saved_cache_state:
            source._fetch_cache_enabled = was_enabled
            source._sites_cache = sites_cache
            source._records_cache = records_cache

    return summary_persister, timeseries_persister


def get_county_bounds(county):
config = Config()
config.county = county
Expand Down
140 changes: 75 additions & 65 deletions orchestration/assets/products.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,39 @@

The graph has two layers wired through the GCS IO manager:

shared source assets per-product pipeline
["sources", param, mode, scope, k] ──▶ <product_id> ──▶ <product_id>/geoserver
(combine) (publish)

- **shared source assets** — keyed ``["sources", <parameter>, <mode>, <scope>,
<source_key>]``. One per *distinct* (parameter, mode, scope, source) tuple
across **all** products. ``mode`` is ``summary`` or ``timeseries`` (the
backend only distinguishes these two unification modes); ``scope`` encodes the
spatial filter (``state_NM`` / ``county_Bernalillo`` / ``all``). Because the
key is product-independent, every product that needs the same source under the
same parameter/mode/scope shares one asset — the unification runs **once** per
run instead of once per product. Each asset unifies a single parameter for a
single source and emits its records/sites/timeseries.
shared source assets per-product pipeline
["sources", param, scope, k] ──▶ <product_id> ──▶ <product_id>/geoserver
(combine) (publish)

- **shared source assets** — keyed ``["sources", <parameter>, <scope>,
<source_key>]``. One per *distinct* (parameter, scope, source) tuple across
**all** products. ``scope`` encodes the spatial filter (``state_NM`` /
``county_Bernalillo`` / ``all``). The source is fetched **once** and unified
for *both* summary and timeseries (see ``unify_source_both`` — every
connector's fetch is mode-agnostic, so summary and timeseries differ only in
how the same observations are transformed), so the asset carries records
(summary) and sites/timeseries together. Because the key is
product-independent *and* mode-independent, every product that needs the same
source under the same parameter/scope — whether a summary or a timeseries
product — shares one asset and one fetch.
- **combine asset** — keyed ``[product_id]``. Reads its source inputs back
(``ins``), merges them, writes the OGC GeoJSON collection, uploads to GCS.
(``ins``), takes the slice it needs (records for summary-type products,
sites/timeseries for timeseries-type), writes the OGC GeoJSON collection,
uploads to GCS.
- **geoserver asset** — keyed ``[product_id, "geoserver"]``. Downloads the
combined GeoJSON, converts to GeoPackage, publishes it as a GeoServer layer.

Job layout (see ``definitions.py``) makes the sharing pay off **and** keeps each
run's lineage complete. Products are grouped into *cohorts* by
(group, mode, scope) — the products that can share source assets. One job per
cohort materializes that cohort's whole graph in a single run: each shared
source unifies once (it is one asset key, selected once), then every member
combine reads it back through the GCS IO manager and publishes. So a source is
never fetched twice in a run, while the full sources → combine → geoserver
lineage stays visible for every product. (Cross-product dedup requires the
sharing products to run together; that is exactly what a cohort is.)
run's lineage complete. Products are grouped into *cohorts* by (group, scope) —
the products that can share source assets. One job per cohort materializes that
cohort's whole graph in a single run: each shared source is fetched once (it is
one asset key, selected once), then every member combine reads it back through
the GCS IO manager and publishes. So a source is never fetched twice in a run —
not across products and not across summary/timeseries — while the full
sources → combine → geoserver lineage stays visible for every product.
(Cross-product dedup requires the sharing products to run together; that is
exactly what a cohort is, and is why summary + timeseries products now share a
cohort.)

Design notes:
- Source and geoserver assets never hard-fail. They catch their own errors and
Expand All @@ -40,21 +46,21 @@
combine asset rebuilds record objects before dumping.

Known limitation — per-analyte source fetches (potential future optimization):
Sharing is deduped at the ``(parameter, mode, scope, source)`` grain, which
collapses duplication *across products* (e.g. ``sulfate/summary/state_NM/wqp``
is one asset shared by nm_major_chemistry and nm_mcl_exceedance). It does NOT
Sharing is deduped at the ``(parameter, scope, source)`` grain, which
collapses duplication *across products* (e.g. ``sulfate/state_NM/wqp`` is one
asset shared by nm_major_chemistry and nm_mcl_exceedance) and *across modes*
(one ``wqp`` asset serves both summary and timeseries products). It does NOT
collapse *across analytes*: a source appears once per analyte (e.g. ``wqp``
has ~13 summary source assets, one per analyte). This is because the backend
unifies a single parameter per pass (``unify_source`` uses one
``config.parameter``), so each analyte is a separate sweep of the same wells
even though one provider query (WQP/AMP/...) typically returns all analytes at
once. Collapsing this would need a **backend** change — multi-analyte
unification that fetches a source once and emits per-analyte records — after
which the source key could drop ``parameter`` (e.g.
``["sources", "analytes", mode, scope, source]``) and the analyte combines
would each filter the shared multi-analyte payload. That is the bulk of the
remaining redundant API pulls for analyte products; it touches DIE core, not
this asset graph, so it is intentionally out of scope here.
has ~13 source assets, one per analyte). This is because the backend unifies a
single parameter per pass (``unify_source_both`` uses one ``config.parameter``),
so each analyte is a separate sweep of the same wells even though one provider
query (WQP/AMP/...) typically returns all analytes at once. Collapsing this
would need a **backend** change — multi-analyte unification that fetches a
source once and emits per-analyte records — after which the source key could
drop ``parameter`` (e.g. ``["sources", "analytes", scope, source]``) and the
analyte combines would each filter the shared multi-analyte payload. That is
the bulk of the remaining redundant API pulls for analyte products; it touches
DIE core, not this asset graph, so it is intentionally out of scope here.
"""
import tempfile
import traceback
Expand All @@ -77,7 +83,7 @@
dump_trend_collection,
)
from backend.record import ParameterRecord, SiteRecord, SummaryRecord
from backend.unifier import unify_source
from backend.unifier import unify_source_both
from orchestration.logging_bridge import forward_die_logs
from orchestration.resources.die_config import DIEConfigResource
from orchestration.resources.gcs import GCSResource
Expand All @@ -90,10 +96,6 @@
# truth for the ogc_mcl_exceedance product.
_MCL_KEY = "config/mcl.json"

# Output types whose unification runs in summary mode. The backend only
# distinguishes summary vs timeseries; everything else unifies as timeseries.
_SUMMARY_OUTPUT_TYPES = ("ogc_summary", "ogc_major_chemistry", "ogc_mcl_exceedance")

# Classic major-ion suite for the ogc_major_chemistry product. One feature per
# well, with each analyte's latest value/units/date as properties.
_MAJOR_CHEMISTRY = [
Expand All @@ -109,8 +111,12 @@

# A single shared source asset's identity. Two products that produce the same
# SourceSpec share one asset (the namedtuple is hashable, so dedup is just set
# membership). ``group`` follows the parameter, not the product.
SourceSpec = namedtuple("SourceSpec", "parameter mode scope source_key group")
# membership). ``group`` follows the parameter, not the product. There is no
# ``mode`` field: a source is fetched once and unified for *both* summary and
# timeseries (see build_shared_source_asset / unify_source_both), so a summary
# product and a timeseries product over the same (parameter, scope, source)
# share one asset and one fetch.
SourceSpec = namedtuple("SourceSpec", "parameter scope source_key group")


def _product_params(product: dict) -> list[str]:
Expand All @@ -126,11 +132,6 @@ def _product_params(product: dict) -> list[str]:
return [product["parameter"]]


def _product_mode(product: dict) -> str:
"""``summary`` or ``timeseries`` — the unification mode the product needs."""
return "summary" if product.get("output_type") in _SUMMARY_OUTPUT_TYPES else "timeseries"


def _spatial_scope(product: dict) -> str:
"""A stable string identity for the product's spatial filter. Sources with
different spatial extents unify to different results, so the extent is part
Expand Down Expand Up @@ -171,20 +172,19 @@ def _param_source_keys(product: dict, parameter: str) -> list[str]:

def product_source_specs(product: dict) -> list[SourceSpec]:
    """Every shared source asset this product depends on, one per
    (parameter, source) pair. ``scope`` is constant for a product; the parameter
    and source vary. Returned in a stable order.

    Specs carry no unification mode: ``SourceSpec`` is built from
    ``(parameter, scope, source_key, group)`` only. Parameters are visited in
    the order given by ``_product_params`` and, within each, sources in the
    order given by ``_param_source_keys``.
    """
    scope = _spatial_scope(product)
    specs: list[SourceSpec] = []
    for param in _product_params(product):
        # The group is looked up once per parameter and shared by its sources.
        group = _group_for_param(param)
        specs.extend(
            SourceSpec(param, scope, source_key, group)
            for source_key in _param_source_keys(product, param)
        )
    return specs


def shared_source_key(spec: SourceSpec) -> dg.AssetKey:
    """Return the asset key ``["sources", <parameter>, <scope>, <source_key>]``
    for a shared source spec.

    The key has no mode component, so specs that agree on parameter, scope
    and source resolve to the same asset key.
    """
    return dg.AssetKey(["sources", spec.parameter, spec.scope, spec.source_key])


def _in_name(spec: SourceSpec) -> str:
Expand All @@ -196,19 +196,26 @@ def _in_name(spec: SourceSpec) -> str:


def build_shared_source_asset(spec: SourceSpec) -> dg.AssetsDefinition:
"""Build the shared asset that unifies one source for one (parameter, mode,
scope) — keyed product-independently so every product needing it shares it.
"""Build the shared asset that unifies one source for one (parameter, scope)
— keyed product-independently so every product needing it shares it.

The source is fetched once and unified for *both* summary and timeseries
(unify_source_both), so the asset carries records (summary) and
sites/timeseries together; summary and timeseries products over the same
(parameter, scope, source) share this one asset and one fetch.

The asset never raises: on failure it records the traceback and fails its
``returned_data`` check (WARN) instead, so a broken source does not block any
product's combine asset. Output ships as plain ``_payload`` dicts for
IO-manager pickling (see module docstring)."""
src_key = shared_source_key(spec)
# Synthetic product spec driving config: only parameter, mode, and spatial
# filter affect a single source's unification (sources include/exclude only
# selects which sources a product consumes — irrelevant here).
# Synthetic product spec driving config: only parameter and spatial filter
# affect a single source's unification (sources include/exclude only selects
# which sources a product consumes — irrelevant here; mode is handled by
# unify_source_both, which produces both). output_type is nominal — the
# driver toggles summary/timeseries itself.
synth_product = {
"output_type": "ogc_summary" if spec.mode == "summary" else "ogc_timeseries",
"output_type": "ogc_timeseries",
"spatial_filter": _scope_to_spatial_filter(spec.scope),
}

Expand All @@ -226,15 +233,19 @@ def _source_asset(
timeseries: list[list[dict]] = []
try:
# A source that doesn't provide this parameter is skipped by
# unify_source (source_pair → None).
# unify_source_both (source_pair → None).
with forward_die_logs(context):
config = die_config.get_config(synth_product, parameter=spec.parameter)
persister = unify_source(config, spec.source_key)
# One fetch, both modes: summary records + timeseries sites/obs.
summary_persister, timeseries_persister = unify_source_both(
config, spec.source_key
)
# Ship plain dicts across the IO manager; rebuild in combine.
records.extend(r._payload for r in persister.records)
sites.extend(s._payload for s in persister.sites)
records.extend(r._payload for r in summary_persister.records)
sites.extend(s._payload for s in timeseries_persister.sites)
timeseries.extend(
[o._payload for o in site_ts] for site_ts in persister.timeseries
[o._payload for o in site_ts]
for site_ts in timeseries_persister.timeseries
)
except Exception:
error = traceback.format_exc()
Expand All @@ -251,7 +262,6 @@ def _source_asset(
metadata={
"source": spec.source_key,
"parameter": spec.parameter,
"mode": spec.mode,
"scope": spec.scope,
"record_count": len(records),
"site_count": len(sites),
Expand Down
Loading
Loading