From 5011cae329c8a58bf4988ebaa856cd7ba85abdc9 Mon Sep 17 00:00:00 2001 From: jakeross Date: Sun, 28 Jun 2026 22:51:17 -0600 Subject: [PATCH] Fetch each source once for both summary and timeseries unification MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A DIE source was unified separately per mode (summary vs timeseries), but every connector's get_records is mode-agnostic: both modes pull the same raw observations and differ only in how they are transformed. So a source needed by both a summary and a timeseries product was fetched from the API twice for identical data. Backend: add unify_source_both(config, source_key), which fetches a source once and unifies it for both modes. It enables an opt-in shared-fetch cache on the source (BaseSource._fetch_records / _sites_cache, off by default so the CLI/API path is byte-identical) and runs _site_wrapper twice with config.output_summary toggled — the second pass reuses the first pass's cached site list and observations instead of re-querying. Output is identical to running unify_source twice; only the underlying fetch is shared. Orchestration: drop `mode` from the shared source key and the cohort key. A source asset now calls unify_source_both and carries records (summary) + sites/timeseries together, so a summary product and a timeseries product over the same (parameter, scope, source) share one asset and one fetch. Summary and timeseries products consequently share a cohort (cohorts keyed by group+scope), which is what lets them run together and dedupe the fetch. Effect: shared source assets 86 -> 68 (-18 redundant fetches: waterlevels 9, arsenic 4, nitrate 5); cohort jobs 4 -> 2 (waterlevels_state_NM, analytes_state_NM). Per-analyte fetch multiplication is unchanged and remains the documented next-step backend optimization. 
Adds tests/test_unify_dual.py: proves unify_source_both fetches each source once and yields output identical to two separate unify_source runs, plus the fetch-cache invariants. dg check defs clean; 284 offline tests pass. Co-Authored-By: Claude Opus 4.8 --- backend/source.py | 38 +++++++- backend/unifier.py | 53 +++++++++++ orchestration/assets/products.py | 140 +++++++++++++++------------- orchestration/definitions.py | 33 ++++--- tests/test_unify_dual.py | 151 +++++++++++++++++++++++++++++++ 5 files changed, 330 insertions(+), 85 deletions(-) create mode 100644 tests/test_unify_dual.py diff --git a/backend/source.py b/backend/source.py index 0ce0ef2..534f192 100644 --- a/backend/source.py +++ b/backend/source.py @@ -188,6 +188,9 @@ def get_analyte_search_param(parameter: str, mapping: dict) -> str: # Base source classes # ============================================================================= +_FETCH_UNSET = object() # sentinel: site fetch not yet cached + + class BaseSource: transformer_klass = BaseTransformer # deprecated: pass transformer= to __init__ @@ -198,6 +201,25 @@ def __init__(self, transformer: Optional[BaseTransformer] = None, http_client: h self.log = _l.log self.warn = _l.warn self.debug = _l.debug + # Opt-in shared-fetch cache. Off by default, so CLI/API behavior is + # unchanged. unify_source_both turns it on so a source unified for both + # summary and timeseries pulls the API only once (see + # backend/unifier.py:unify_source_both). The two passes issue identical + # fetches (same parameter/scope/dates), so the second reuses the first. + self._fetch_cache_enabled = False + self._records_cache: dict = {} # site-id key -> get_records() result + self._sites_cache = _FETCH_UNSET # BaseSiteSource.read() result + + def _fetch_records(self, site_record): + """get_records() with optional caching (see _fetch_cache_enabled). 
Keyed + by the site ids requested so repeated chunks reuse the same fetch.""" + if not self._fetch_cache_enabled: + return self.get_records(site_record) + sites = site_record if isinstance(site_record, list) else [site_record] + key = tuple(sorted(str(getattr(s, "id", s)) for s in sites)) + if key not in self._records_cache: + self._records_cache[key] = self.get_records(site_record) + return self._records_cache[key] @property def tag(self): @@ -296,13 +318,19 @@ def intersects(self, wkt: str) -> bool: return True def read(self, *args, **kw) -> List[SiteRecord] | None: + if self._fetch_cache_enabled and self._sites_cache is not _FETCH_UNSET: + return self._sites_cache self.log("Gathering site records") records = self.get_records() if records: self.log(f"total records={len(records)}") - return self._transform_sites(records) - self.warn("No site records returned") - return None + result: List[SiteRecord] | None = self._transform_sites(records) + else: + self.warn("No site records returned") + result = None + if self._fetch_cache_enabled: + self._sites_cache = result + return result def _transform_sites(self, records: list) -> List[SiteRecord]: transformed_records: List[SiteRecord] = [] @@ -350,7 +378,7 @@ def read_summary(self, site_record: SiteRecord | list, start_ind: int, end_ind: else: self.log(f"{site_record.id}: Gathering {self.name} data") - all_records = self.get_records(site_record) + all_records = self._fetch_records(site_record) if not all_records: names = [str(r.id) for r in site_record] if isinstance(site_record, list) else [str(site_record.id)] self.warn(f"{','.join(names)}: No records found") @@ -380,7 +408,7 @@ def read_timeseries(self, site_record: SiteRecord | list) -> List[ParameterRecor else: self.log(f"{site_record.id}: Gathering {self.name} data") - all_records = self.get_records(site_record) + all_records = self._fetch_records(site_record) if not all_records: names = [str(r.id) for r in site_record] if isinstance(site_record, list) else 
[str(site_record.id)] self.warn(f"{','.join(names)}: No records found") diff --git a/backend/unifier.py b/backend/unifier.py index ba9f531..8c3c4cf 100644 --- a/backend/unifier.py +++ b/backend/unifier.py @@ -301,6 +301,59 @@ def unify_source(config, source_key): return persister +def unify_source_both(config, source_key): + """Unify a single source for BOTH summary and timeseries outputs while + fetching the source only once. + + Each connector's ``get_records`` is mode-agnostic — summary and timeseries + both pull the same raw observations and differ only in how they are + transformed (see backend/source.py). Running ``unify_source`` twice would + therefore hit the API twice for identical data. This driver instead enables + the source's shared-fetch cache and runs the two transform passes over one + fetch, so a source needed by both a summary and a timeseries product is + pulled once. + + Output is identical to calling ``unify_source`` twice (once per mode); only + the underlying fetch is shared. Returns ``(summary_persister, + timeseries_persister)``. Used by the orchestration shared source asset. + """ + config.validate() + + pair = config.source_pair(source_key) + if pair is None: + config.warn( + f"Source {source_key!r} does not provide parameter {config.parameter!r}" + ) + return make_persister(config), make_persister(config) + + site_source, parameter_source = pair + # Share the site list and observation fetch across the two passes. The + # passes issue identical requests (same parameter/scope/dates), so the + # second reuses the first's cached fetch instead of re-querying. + site_source._fetch_cache_enabled = True + parameter_source._fetch_cache_enabled = True + + # Timeseries pass first so its fetch primes the cache; the summary pass then + # transforms the same cached observations. output_summary is read live by + # the transformer, so toggling it here switches the record klass/fields per + # pass without rebuilding the source. 
+ config.output_summary = False + timeseries_persister = make_persister(config) + config._persister = timeseries_persister + _site_wrapper( + site_source, parameter_source, timeseries_persister, config, raise_errors=True + ) + + config.output_summary = True + summary_persister = make_persister(config) + config._persister = summary_persister + _site_wrapper( + site_source, parameter_source, summary_persister, config, raise_errors=True + ) + + return summary_persister, timeseries_persister + + def get_county_bounds(county): config = Config() config.county = county diff --git a/orchestration/assets/products.py b/orchestration/assets/products.py index d296b68..2c5fc15 100644 --- a/orchestration/assets/products.py +++ b/orchestration/assets/products.py @@ -2,33 +2,39 @@ The graph has two layers wired through the GCS IO manager: - shared source assets per-product pipeline - ["sources", param, mode, scope, k] ──▶ <product_id> ──▶ <product_id>/geoserver - (combine) (publish) - -- **shared source assets** — keyed ``["sources", <parameter>, <mode>, <scope>, - <source>]``. One per *distinct* (parameter, mode, scope, source) tuple - across **all** products. ``mode`` is ``summary`` or ``timeseries`` (the - backend only distinguishes these two unification modes); ``scope`` encodes the - spatial filter (``state_NM`` / ``county_Bernalillo`` / ``all``). Because the - key is product-independent, every product that needs the same source under the - same parameter/mode/scope shares one asset — the unification runs **once** per - run instead of once per product. Each asset unifies a single parameter for a - single source and emits its records/sites/timeseries. + shared source assets per-product pipeline + ["sources", param, scope, k] ──▶ <product_id> ──▶ <product_id>/geoserver + (combine) (publish) + +- **shared source assets** — keyed ``["sources", <parameter>, <scope>, + <source>]``. One per *distinct* (parameter, scope, source) tuple across + **all** products. ``scope`` encodes the spatial filter (``state_NM`` / + ``county_Bernalillo`` / ``all``). 
The source is fetched **once** and unified + for *both* summary and timeseries (see ``unify_source_both`` — every + connector's fetch is mode-agnostic, so summary and timeseries differ only in + how the same observations are transformed), so the asset carries records + (summary) and sites/timeseries together. Because the key is + product-independent *and* mode-independent, every product that needs the same + source under the same parameter/scope — whether a summary or a timeseries + product — shares one asset and one fetch. - **combine asset** — keyed ``[product_id]``. Reads its source inputs back - (``ins``), merges them, writes the OGC GeoJSON collection, uploads to GCS. + (``ins``), takes the slice it needs (records for summary-type products, + sites/timeseries for timeseries-type), writes the OGC GeoJSON collection, + uploads to GCS. - **geoserver asset** — keyed ``[product_id, "geoserver"]``. Downloads the combined GeoJSON, converts to GeoPackage, publishes it as a GeoServer layer. Job layout (see ``definitions.py``) makes the sharing pay off **and** keeps each -run's lineage complete. Products are grouped into *cohorts* by -(group, mode, scope) — the products that can share source assets. One job per -cohort materializes that cohort's whole graph in a single run: each shared -source unifies once (it is one asset key, selected once), then every member -combine reads it back through the GCS IO manager and publishes. So a source is -never fetched twice in a run, while the full sources → combine → geoserver -lineage stays visible for every product. (Cross-product dedup requires the -sharing products to run together; that is exactly what a cohort is.) +run's lineage complete. Products are grouped into *cohorts* by (group, scope) — +the products that can share source assets. 
One job per cohort materializes that +cohort's whole graph in a single run: each shared source is fetched once (it is +one asset key, selected once), then every member combine reads it back through +the GCS IO manager and publishes. So a source is never fetched twice in a run — +not across products and not across summary/timeseries — while the full +sources → combine → geoserver lineage stays visible for every product. +(Cross-product dedup requires the sharing products to run together; that is +exactly what a cohort is, and is why summary + timeseries products now share a +cohort.) Design notes: - Source and geoserver assets never hard-fail. They catch their own errors and @@ -40,21 +46,21 @@ combine asset rebuilds record objects before dumping. Known limitation — per-analyte source fetches (potential future optimization): - Sharing is deduped at the ``(parameter, mode, scope, source)`` grain, which - collapses duplication *across products* (e.g. ``sulfate/summary/state_NM/wqp`` - is one asset shared by nm_major_chemistry and nm_mcl_exceedance). It does NOT + Sharing is deduped at the ``(parameter, scope, source)`` grain, which + collapses duplication *across products* (e.g. ``sulfate/state_NM/wqp`` is one + asset shared by nm_major_chemistry and nm_mcl_exceedance) and *across modes* + (one ``wqp`` asset serves both summary and timeseries products). It does NOT collapse *across analytes*: a source appears once per analyte (e.g. ``wqp`` - has ~13 summary source assets, one per analyte). This is because the backend - unifies a single parameter per pass (``unify_source`` uses one - ``config.parameter``), so each analyte is a separate sweep of the same wells - even though one provider query (WQP/AMP/...) typically returns all analytes at - once. Collapsing this would need a **backend** change — multi-analyte - unification that fetches a source once and emits per-analyte records — after - which the source key could drop ``parameter`` (e.g. 
- ``["sources", "analytes", mode, scope, source]``) and the analyte combines - would each filter the shared multi-analyte payload. That is the bulk of the - remaining redundant API pulls for analyte products; it touches DIE core, not - this asset graph, so it is intentionally out of scope here. + has ~13 source assets, one per analyte). This is because the backend unifies a + single parameter per pass (``unify_source_both`` uses one ``config.parameter``), + so each analyte is a separate sweep of the same wells even though one provider + query (WQP/AMP/...) typically returns all analytes at once. Collapsing this + would need a **backend** change — multi-analyte unification that fetches a + source once and emits per-analyte records — after which the source key could + drop ``parameter`` (e.g. ``["sources", "analytes", scope, source]``) and the + analyte combines would each filter the shared multi-analyte payload. That is + the bulk of the remaining redundant API pulls for analyte products; it touches + DIE core, not this asset graph, so it is intentionally out of scope here. """ import tempfile import traceback @@ -77,7 +83,7 @@ dump_trend_collection, ) from backend.record import ParameterRecord, SiteRecord, SummaryRecord -from backend.unifier import unify_source +from backend.unifier import unify_source_both from orchestration.logging_bridge import forward_die_logs from orchestration.resources.die_config import DIEConfigResource from orchestration.resources.gcs import GCSResource @@ -90,10 +96,6 @@ # truth for the ogc_mcl_exceedance product. _MCL_KEY = "config/mcl.json" -# Output types whose unification runs in summary mode. The backend only -# distinguishes summary vs timeseries; everything else unifies as timeseries. -_SUMMARY_OUTPUT_TYPES = ("ogc_summary", "ogc_major_chemistry", "ogc_mcl_exceedance") - # Classic major-ion suite for the ogc_major_chemistry product. One feature per # well, with each analyte's latest value/units/date as properties. 
_MAJOR_CHEMISTRY = [ @@ -109,8 +111,12 @@ # A single shared source asset's identity. Two products that produce the same # SourceSpec share one asset (the namedtuple is hashable, so dedup is just set -# membership). ``group`` follows the parameter, not the product. -SourceSpec = namedtuple("SourceSpec", "parameter mode scope source_key group") +# membership). ``group`` follows the parameter, not the product. There is no +# ``mode`` field: a source is fetched once and unified for *both* summary and +# timeseries (see build_shared_source_asset / unify_source_both), so a summary +# product and a timeseries product over the same (parameter, scope, source) +# share one asset and one fetch. +SourceSpec = namedtuple("SourceSpec", "parameter scope source_key group") def _product_params(product: dict) -> list[str]: @@ -126,11 +132,6 @@ def _product_params(product: dict) -> list[str]: return [product["parameter"]] -def _product_mode(product: dict) -> str: - """``summary`` or ``timeseries`` — the unification mode the product needs.""" - return "summary" if product.get("output_type") in _SUMMARY_OUTPUT_TYPES else "timeseries" - - def _spatial_scope(product: dict) -> str: """A stable string identity for the product's spatial filter. Sources with different spatial extents unify to different results, so the extent is part @@ -171,20 +172,19 @@ def _param_source_keys(product: dict, parameter: str) -> list[str]: def product_source_specs(product: dict) -> list[SourceSpec]: """Every shared source asset this product depends on, one per - (parameter, source) pair. ``mode`` and ``scope`` are constant for a product; - the parameter and source vary. Returned in a stable order.""" - mode = _product_mode(product) + (parameter, source) pair. ``scope`` is constant for a product; the parameter + and source vary. 
Returned in a stable order.""" scope = _spatial_scope(product) specs: list[SourceSpec] = [] for param in _product_params(product): group = _group_for_param(param) for source_key in _param_source_keys(product, param): - specs.append(SourceSpec(param, mode, scope, source_key, group)) + specs.append(SourceSpec(param, scope, source_key, group)) return specs def shared_source_key(spec: SourceSpec) -> dg.AssetKey: - return dg.AssetKey(["sources", spec.parameter, spec.mode, spec.scope, spec.source_key]) + return dg.AssetKey(["sources", spec.parameter, spec.scope, spec.source_key]) def _in_name(spec: SourceSpec) -> str: @@ -196,19 +196,26 @@ def _in_name(spec: SourceSpec) -> str: def build_shared_source_asset(spec: SourceSpec) -> dg.AssetsDefinition: - """Build the shared asset that unifies one source for one (parameter, mode, - scope) — keyed product-independently so every product needing it shares it. + """Build the shared asset that unifies one source for one (parameter, scope) + — keyed product-independently so every product needing it shares it. + + The source is fetched once and unified for *both* summary and timeseries + (unify_source_both), so the asset carries records (summary) and + sites/timeseries together; summary and timeseries products over the same + (parameter, scope, source) share this one asset and one fetch. The asset never raises: on failure it records the traceback and fails its ``returned_data`` check (WARN) instead, so a broken source does not block any product's combine asset. Output ships as plain ``_payload`` dicts for IO-manager pickling (see module docstring).""" src_key = shared_source_key(spec) - # Synthetic product spec driving config: only parameter, mode, and spatial - # filter affect a single source's unification (sources include/exclude only - # selects which sources a product consumes — irrelevant here). 
+ # Synthetic product spec driving config: only parameter and spatial filter + # affect a single source's unification (sources include/exclude only selects + # which sources a product consumes — irrelevant here; mode is handled by + # unify_source_both, which produces both). output_type is nominal — the + # driver toggles summary/timeseries itself. synth_product = { - "output_type": "ogc_summary" if spec.mode == "summary" else "ogc_timeseries", + "output_type": "ogc_timeseries", "spatial_filter": _scope_to_spatial_filter(spec.scope), } @@ -226,15 +233,19 @@ def _source_asset( timeseries: list[list[dict]] = [] try: # A source that doesn't provide this parameter is skipped by - # unify_source (source_pair → None). + # unify_source_both (source_pair → None). with forward_die_logs(context): config = die_config.get_config(synth_product, parameter=spec.parameter) - persister = unify_source(config, spec.source_key) + # One fetch, both modes: summary records + timeseries sites/obs. + summary_persister, timeseries_persister = unify_source_both( + config, spec.source_key + ) # Ship plain dicts across the IO manager; rebuild in combine. 
- records.extend(r._payload for r in persister.records) - sites.extend(s._payload for s in persister.sites) + records.extend(r._payload for r in summary_persister.records) + sites.extend(s._payload for s in timeseries_persister.sites) timeseries.extend( - [o._payload for o in site_ts] for site_ts in persister.timeseries + [o._payload for o in site_ts] + for site_ts in timeseries_persister.timeseries ) except Exception: error = traceback.format_exc() @@ -251,7 +262,6 @@ def _source_asset( metadata={ "source": spec.source_key, "parameter": spec.parameter, - "mode": spec.mode, "scope": spec.scope, "record_count": len(records), "site_count": len(sites), diff --git a/orchestration/definitions.py b/orchestration/definitions.py index ba5fd2e..df02dfe 100644 --- a/orchestration/definitions.py +++ b/orchestration/definitions.py @@ -80,8 +80,9 @@ def _products(products_config: dict) -> Iterator[dict]: def _build_graph(products_config: dict): """Build every asset once. Source assets are shared across products: each - distinct (parameter, mode, scope, source) tuple becomes one asset, so a - source two products both need is unified once, not twice. + distinct (parameter, scope, source) tuple becomes one asset, fetched once and + unified for both summary and timeseries, so a source two products both need + (in either mode) is fetched once, not twice. Returns ``(source_assets, pipeline_assets, specs_by_pid, all_specs)`` where ``specs_by_pid`` maps a product id to the source specs its combine consumes @@ -105,22 +106,24 @@ def _build_graph(products_config: dict): return source_assets, pipeline_assets, specs_by_pid, all_specs -def _cohort_key(specs) -> tuple[str, str, str]: +def _cohort_key(specs) -> tuple[str, str]: """A cohort bundles the products that *can* share source assets — same - parameter group, unification mode, and spatial scope. 
Materializing a cohort - in one run is what lets each shared source unify once while every member's - full lineage (sources → combine → geoserver) stays visible in that run. - - These three fields are constant within a product (all of a product's specs - carry the same mode/scope, and its parameters are all one group), so any - spec is representative.""" + parameter group and spatial scope. Materializing a cohort in one run is what + lets each shared source unify once while every member's full lineage + (sources → combine → geoserver) stays visible in that run. + + A shared source is now fetched once for *both* summary and timeseries, so a + summary product and a timeseries product over the same group/scope share + source assets and must run together — hence mode is not part of the key. + Both fields are constant within a product (its scope is fixed and its + parameters are all one group), so any spec is representative.""" s = specs[0] - return (s.group, s.mode, s.scope) + return (s.group, s.scope) -def _cohort_name(key: tuple[str, str, str]) -> str: - group, mode, scope = key - return f"{group}_{mode}_{scope}" +def _cohort_name(key: tuple[str, str]) -> str: + group, scope = key + return f"{group}_{scope}" def _cron_sort_key(cron: str) -> tuple[int, int]: @@ -133,7 +136,7 @@ def _cron_sort_key(cron: str) -> tuple[int, int]: def _build_cohorts(products_config: dict, specs_by_pid: dict) -> dict: - """Group products into cohorts keyed by (group, mode, scope). Returns + """Group products into cohorts keyed by (group, scope). Returns ``{cohort_name: {"members": [pid, ...], "cron": str}}``; the cohort cron is the earliest member schedule (members run together, so they share one).""" cohorts: dict = {} diff --git a/tests/test_unify_dual.py b/tests/test_unify_dual.py new file mode 100644 index 0000000..ceaacca --- /dev/null +++ b/tests/test_unify_dual.py @@ -0,0 +1,151 @@ +"""Gate for the single-fetch dual unification (unify_source_both). 
+ +Proves the new path produces output identical to running unify_source twice +(once per mode) while pulling each source's records only once. Uses lightweight +fake sources so the test is offline and deterministic; the transform internals +are unchanged and covered by the connector tests. +""" +from collections import namedtuple + +import pytest + +from backend.config import Config +from backend.record import ParameterRecord, SiteRecord, SummaryRecord +from backend.source import BaseParameterSource, BaseSiteSource, BaseTransformer +from backend.unifier import unify_source, unify_source_both + +_Site = namedtuple("_Site", "id") + + +class _FakeSiteSource(BaseSiteSource): + chunk_size = 1 + + def __init__(self): + super().__init__(transformer=BaseTransformer()) + self.get_records_calls = 0 + + def get_records(self, *a, **k): + self.get_records_calls += 1 + return [{"id": "W1"}] + + def _transform_sites(self, records): + s = SiteRecord({"source": "fake", "id": "W1", "latitude": 34.0, "longitude": -106.0}) + s.chunk_size = self.chunk_size + return [s] + + +class _FakeParamSource(BaseParameterSource): + def __init__(self): + super().__init__(transformer=BaseTransformer()) + self.get_records_calls = 0 + + def get_records(self, site_record, *a, **k): + # The mode-agnostic API pull. Counting proves it runs once when shared. + self.get_records_calls += 1 + return [{"id": "W1", "value": 1.0, "date": "2020-01-01"}] + + # Mode-specific transforms, kept trivial but driven off the shared fetch so + # the fetch cache is exercised exactly as the real reads exercise it. 
+ def read_summary(self, site_record, start_ind, end_ind): + obs = self._fetch_records(site_record) + return [ + SummaryRecord({"source": "fake", "id": "W1", "nrecords": len(obs), "mean": 1.0}) + ] + + def read_timeseries(self, site_record): + obs = self._fetch_records(site_record) + site = SiteRecord({"source": "fake", "id": "W1"}) + recs = [ParameterRecord({"source": "fake", "id": "W1", "parameter_value": o["value"]}) for o in obs] + return [(site, recs)] + + +def _config(): + cfg = Config(payload={"yes": True}) + cfg.parameter = "waterlevels" + return cfg + + +@pytest.fixture +def patched_pair(monkeypatch): + """Make config.source_pair return fresh fakes, and report the param source + so a test can read its fetch count.""" + holder = {} + + def fake_pair(self, source_key): + site = _FakeSiteSource() + param = _FakeParamSource() + site.set_config(self) + param.set_config(self) + holder["site"] = site + holder["param"] = param + return site, param + + monkeypatch.setattr(Config, "source_pair", fake_pair) + return holder + + +class TestUnifySourceBoth: + def test_fetches_source_once(self, patched_pair): + cfg = _config() + summary_p, ts_p = unify_source_both(cfg, "fake") + # One observation fetch and one site fetch shared across both passes. + assert patched_pair["param"].get_records_calls == 1 + assert patched_pair["site"].get_records_calls == 1 + # Both outputs populated. 
+ assert len(summary_p.records) == 1 + assert len(ts_p.sites) == 1 and len(ts_p.timeseries) == 1 + + def test_output_identical_to_two_single_runs(self, patched_pair): + # dual + dual_summary, dual_ts = unify_source_both(_config(), "fake") + # two separate runs (cache off path) + cfg_s = _config(); cfg_s.output_summary = True + single_summary = unify_source(cfg_s, "fake") + cfg_t = _config(); cfg_t.output_summary = False + single_ts = unify_source(cfg_t, "fake") + + def payloads(recs): + return [r._payload for r in recs] + + assert payloads(dual_summary.records) == payloads(single_summary.records) + assert payloads(dual_ts.sites) == payloads(single_ts.sites) + assert [payloads(t) for t in dual_ts.timeseries] == [ + payloads(t) for t in single_ts.timeseries + ] + + def test_single_run_fetches_each_time(self, patched_pair): + # Control: a normal single unify (cache disabled) fetches on its one pass. + cfg = _config(); cfg.output_summary = True + unify_source(cfg, "fake") + assert patched_pair["param"].get_records_calls == 1 + + +class TestFetchCache: + def test_disabled_calls_get_records_each_time(self): + src = _FakeParamSource() + src._fetch_records([_Site("W1")]) + src._fetch_records([_Site("W1")]) + assert src.get_records_calls == 2 + + def test_enabled_shares_by_site_key(self): + src = _FakeParamSource() + src._fetch_cache_enabled = True + a = src._fetch_records([_Site("W1")]) + b = src._fetch_records([_Site("W1")]) + assert src.get_records_calls == 1 + assert a is b + + def test_enabled_distinct_keys_refetch(self): + src = _FakeParamSource() + src._fetch_cache_enabled = True + src._fetch_records([_Site("W1")]) + src._fetch_records([_Site("W2")]) + assert src.get_records_calls == 2 + + def test_site_source_read_cached(self): + site = _FakeSiteSource() + site._fetch_cache_enabled = True + r1 = site.read() + r2 = site.read() + assert site.get_records_calls == 1 + assert r1 is r2