Fetch each source once for both summary and timeseries unification #95
Merged
A DIE source was unified separately per mode (summary vs timeseries), but every connector's get_records is mode-agnostic: both modes pull the same raw observations and differ only in how they are transformed. So a source needed by both a summary and a timeseries product was fetched from the API twice for identical data.

Backend: add unify_source_both(config, source_key), which fetches a source once and unifies it for both modes. It enables an opt-in shared-fetch cache on the source (BaseSource._fetch_records / _sites_cache, off by default so the CLI/API path is byte-identical) and runs _site_wrapper twice with config.output_summary toggled; the second pass reuses the first pass's cached site list and observations instead of re-querying. Output is identical to running unify_source twice; only the underlying fetch is shared.

Orchestration: drop `mode` from the shared source key and the cohort key. A source asset now calls unify_source_both and carries records (summary) + sites/timeseries together, so a summary product and a timeseries product over the same (parameter, scope, source) share one asset and one fetch. Summary and timeseries products consequently share a cohort (cohorts keyed by group+scope), which is what lets them run together and dedupe the fetch.

Effect: shared source assets 86 -> 68 (-18 redundant fetches: waterlevels 9, arsenic 4, nitrate 5); cohort jobs 4 -> 2 (waterlevels_state_NM, analytes_state_NM). Per-analyte fetch multiplication is unchanged and remains the documented next-step backend optimization.

Adds tests/test_unify_dual.py: proves unify_source_both fetches each source once and yields output identical to two separate unify_source runs, plus the fetch-cache invariants. dg check defs clean; 284 offline tests pass.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
What
A DIE source was unified separately per mode (summary vs timeseries). But every connector's `get_records` is mode-agnostic: both modes pull the same raw observations and differ only in how they're transformed. So a source needed by both a summary and a timeseries product hit the API twice for identical data.

This change fetches each source once and unifies it for both modes.
How
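Backend: `unify_source_both(config, source_key)` (`backend/unifier.py`):
- Enables an opt-in shared-fetch cache on the source (`BaseSource._fetch_records` / `_sites_cache`, off by default, so the CLI/API path is byte-identical).
- Runs `_site_wrapper` twice with `config.output_summary` toggled; the second pass reuses the first pass's cached site list and observations instead of re-querying.
- Output is identical to running `unify_source` twice; only the underlying fetch is shared.

Orchestration: drop `mode` from the shared source key and the cohort key:
- A source asset now calls `unify_source_both` and carries records (summary) + sites/timeseries together.
- A summary product and a timeseries product over the same `(parameter, scope, source)` share one asset and one fetch.
- Summary and timeseries products consequently share a cohort (cohorts keyed by `group+scope`), which is what lets them run together and dedupe the fetch.

Effect
- Shared source assets: 86 -> 68 (-18 redundant fetches: waterlevels 9, arsenic 4, nitrate 5).
- Cohort jobs: 4 -> 2 (`waterlevels_state_NM`, `analytes_state_NM`).

Per-analyte fetch multiplication is unchanged and remains the documented next-step backend optimization.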
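To illustrate why dropping `mode` from the keys collapses assets and cohorts, here is a minimal sketch. The `Product` tuple, the four key functions, the tuple layouts, and the source name `source_a` are all hypothetical stand-ins; the real key construction in the orchestration code may look different.

```python
from typing import NamedTuple


class Product(NamedTuple):
    # Hypothetical product description; field names are assumptions.
    group: str      # e.g. "waterlevels" or "analytes"
    parameter: str  # e.g. "waterlevels" or "arsenic"
    scope: str      # e.g. "state_NM"
    source: str     # connector name (placeholder values below)
    mode: str       # "summary" or "timeseries"


def source_key_old(p: Product) -> tuple:
    # Before: mode is part of the key, so each mode gets its own asset.
    return (p.parameter, p.scope, p.source, p.mode)


def source_key_new(p: Product) -> tuple:
    # After: both modes over the same (parameter, scope, source) share a key.
    return (p.parameter, p.scope, p.source)


def cohort_key_old(p: Product) -> tuple:
    return (p.group, p.scope, p.mode)


def cohort_key_new(p: Product) -> tuple:
    # After: cohorts are keyed by group + scope only.
    return (p.group, p.scope)


products = [
    Product("waterlevels", "waterlevels", "state_NM", "source_a", "summary"),
    Product("waterlevels", "waterlevels", "state_NM", "source_a", "timeseries"),
    Product("analytes", "arsenic", "state_NM", "source_a", "summary"),
    Product("analytes", "arsenic", "state_NM", "source_a", "timeseries"),
]

old_assets = {source_key_old(p) for p in products}
new_assets = {source_key_new(p) for p in products}
old_cohorts = {cohort_key_old(p) for p in products}
new_cohorts = {cohort_key_new(p) for p in products}
```

With these four toy products, the mode-aware keys give four assets and four cohorts, while the mode-free keys give two of each, because every summary/timeseries pair lands on the same key.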
Verification
- `tests/test_unify_dual.py` (new): proves `unify_source_both` fetches each source once and yields output identical to two separate `unify_source` runs, plus the fetch-cache invariants.
- `dg check defs` clean; 284 offline tests pass.

🤖 Generated with Claude Code
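The property being verified can be sketched with a toy stand-in. This is not the code in `backend/unifier.py` or `tests/test_unify_dual.py`: `FakeSource`, `Config`, the `cache_enabled` flag, and the averaging "summary" transform are assumptions, and the stand-in `unify_source_both` takes a source object rather than a `source_key` and copies the config instead of toggling it in place.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class Config:
    # Stand-in for the real config; only the mode flag is modelled.
    output_summary: bool = True


class FakeSource:
    """Toy connector whose get_records ignores the mode entirely."""

    def __init__(self):
        self.fetch_calls = 0
        self.cache_enabled = False  # opt-in; off by default
        self._cache = None

    def get_records(self):
        # Pretend this is the expensive API call.
        self.fetch_calls += 1
        return [("site-1", 10.0), ("site-1", 14.0), ("site-2", 7.0)]

    def fetch_records(self):
        # With the cache off this is a plain pass-through.
        if not self.cache_enabled:
            return self.get_records()
        if self._cache is None:
            self._cache = self.get_records()
        return self._cache


def unify_source(config, source):
    records = source.fetch_records()
    if config.output_summary:
        # Summary mode: one mean value per site.
        by_site = {}
        for site, value in records:
            by_site.setdefault(site, []).append(value)
        return {site: sum(v) / len(v) for site, v in by_site.items()}
    # Timeseries mode: keep every observation.
    return list(records)


def unify_source_both(config, source):
    # Turn the shared-fetch cache on, then run both modes over one fetch.
    source.cache_enabled = True
    summary = unify_source(replace(config, output_summary=True), source)
    timeseries = unify_source(replace(config, output_summary=False), source)
    return summary, timeseries


# Two separate runs: one fetch per mode.
separate = FakeSource()
expected = (
    unify_source(Config(output_summary=True), separate),
    unify_source(Config(output_summary=False), separate),
)

# Shared run: a single fetch feeds both modes.
shared = FakeSource()
both = unify_source_both(Config(), shared)
```

In this sketch `separate.fetch_calls` ends at 2 and `shared.fetch_calls` at 1, while `both` equals `expected`, which is the shape of the "fetch once, identical output" check described above.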