From bf27be63c026dcf2b4ee84e54712e4421293ce7e Mon Sep 17 00:00:00 2001 From: jakeross Date: Sun, 28 Jun 2026 10:50:10 -0600 Subject: [PATCH 1/2] Add source_datastream_link to waterlevel summary + trend (st2) Link each summary/trend feature back to the raw, NON-NORMALIZED source series it was derived from. Scoped to st2 (SensorThings) sources, which expose a Datastream URL. - constants: SOURCE_DATASTREAM_LINK = "source_datastream_link" (source_ prefix, matching source_parameter_name/units = "as provided by source, pre-normalization"). - st2 source: build the Datastream URL ({url}/Datastreams({id})); set it on each observation record and, via the new _summary_extra hook, on the summary record. - source.RecordSummarizer: merge _summary_extra(cleaned) into the summary rec (default {}; only st2 overrides). - SummaryRecord gains the key, so summary features emit it. The trend dumper reads it from the well's observation dicts and adds it to the per-well feature. - NOT added to timeseries features (per scope). Offline tests cover summary + trend (present for st2, absent otherwise). Co-Authored-By: Claude Opus 4.8 --- backend/connectors/st2/source.py | 20 ++++++++++++ backend/constants.py | 7 ++++ backend/persisters/ogc_features.py | 14 ++++++++ backend/record.py | 2 ++ backend/source.py | 7 ++++ tests/test_persisters/test_ogc_features.py | 37 ++++++++++++++++++++++ 6 files changed, 87 insertions(+) diff --git a/backend/connectors/st2/source.py b/backend/connectors/st2/source.py index 3467a6d..b7924ec 100644 --- a/backend/connectors/st2/source.py +++ b/backend/connectors/st2/source.py @@ -46,6 +46,7 @@ PARAMETER_UNITS, SOURCE_PARAMETER_NAME, SOURCE_PARAMETER_UNITS, + SOURCE_DATASTREAM_LINK, ) URL = "https://st2.newmexicowaterdata.org/FROST-Server/v1.1" @@ -113,8 +114,27 @@ def _extract_parameter_record(self, record): record[DT_MEASURED] = record["observation"].phenomenon_time record[SOURCE_PARAMETER_NAME] = record["datastream"].name record[SOURCE_PARAMETER_UNITS] = record["datastream"].unit_of_measurement.symbol + # Link to the raw, non-normalized SensorThings datastream these + # observations came from, so consumers can trace a feature back to the + # provider's original series (before DIE normalization). + record[SOURCE_DATASTREAM_LINK] = self._datastream_link(record["datastream"]) return record + def _datastream_link(self, datastream): + ds_id = getattr(datastream, "id", None) + if ds_id is None: + return None + return f"{self.url}/Datastreams({ds_id})" + + def _summary_extra(self, cleaned: list) -> dict: + # All of a well's observations come from the same datastream; link the + # summary feature back to it. + for r in cleaned: + link = self._datastream_link(r.get("datastream")) + if link: + return {SOURCE_DATASTREAM_LINK: link} + return {} + def _extract_source_parameter_results(self, records): return [r["observation"].result for r in records] diff --git a/backend/constants.py b/backend/constants.py index 889c718..52ab27a 100644 --- a/backend/constants.py +++ b/backend/constants.py @@ -60,6 +60,13 @@ SOURCE_PARAMETER_UNITS = "source_parameter_units" CONVERSION_FACTOR = "conversion_factor" +# Link (e.g. SensorThings Datastream URL) to the raw, NON-NORMALIZED source +# series a record came from — the provider's original observations before DIE +# applies unit/datum normalization. Named with the source_ prefix like +# source_parameter_name/source_parameter_units (same "as provided by source" +# meaning). Set by connectors that expose one (currently st2); None otherwise. +SOURCE_DATASTREAM_LINK = "source_datastream_link" + USGS_PCODE_30210 = "30210" USGS_PCODE_70300 = "70300" USGS_PCODE_70301 = "70301" diff --git a/backend/persisters/ogc_features.py b/backend/persisters/ogc_features.py index 9787650..37486c0 100644 --- a/backend/persisters/ogc_features.py +++ b/backend/persisters/ogc_features.py @@ -356,6 +356,20 @@ def dump_waterlevel_trend_collection( "mk_tau": None if tau is None else round(tau, 4), } + # Link to the raw, non-normalized source datastream used for the + # calculation, when the source provides one (SensorThings/st2). Taken + # from the well's observations; absent for sources without one. + source_datastream_link = next( + ( + o.get("source_datastream_link") + for o in obs_list + if o.get("source_datastream_link") + ), + None, + ) + if source_datastream_link: + props["source_datastream_link"] = source_datastream_link + features.append({ "type": "Feature", "id": _feature_id(props["source"], props["id"]), diff --git a/backend/record.py b/backend/record.py index 43b9424..6befa6f 100644 --- a/backend/record.py +++ b/backend/record.py @@ -20,6 +20,7 @@ SOURCE_PARAMETER_NAME, SOURCE_PARAMETER_UNITS, CONVERSION_FACTOR, + SOURCE_DATASTREAM_LINK, FEET, ) @@ -145,6 +146,7 @@ class SummaryRecord(BaseRecord): "latest_time", "latest_value", "latest_units", + SOURCE_DATASTREAM_LINK, ) defaults: dict = {} diff --git a/backend/source.py b/backend/source.py index cbfe158..0ce0ef2 100644 --- a/backend/source.py +++ b/backend/source.py @@ -142,6 +142,7 @@ def summarize(self, site, cleaned: list): "latest_source_units": latest_result["source_parameter_units"], "latest_source_name": latest_result["source_parameter_name"], } + rec.update(s._summary_extra(cleaned)) return s.transformer.do_transform(rec, site) @@ -428,6 +429,12 @@ def _extract_site_records(self, records: list[dict], site_record) -> list: def _clean_records(self, records: list) -> list: return records + def _summary_extra(self, cleaned: list) -> dict: + """Extra fields to merge into a site's summary record. Default none; + sources with a non-normalized source-series link (e.g. st2 SensorThings) + override to add a source_datastream_link.""" + return {} + def _extract_terminal_record(self, records, position: str): raise NotImplementedError(f"{self.__class__.__name__} Must implement _extract_terminal_record") diff --git a/tests/test_persisters/test_ogc_features.py b/tests/test_persisters/test_ogc_features.py index 81214bb..7f10500 100644 --- a/tests/test_persisters/test_ogc_features.py +++ b/tests/test_persisters/test_ogc_features.py @@ -298,3 +298,40 @@ def test_downsamples_to_daily_min(self, tmp_path): assert props["observation_count"] == 3 assert props["record_count"] == 2 # two distinct days assert props["first_observation_datetime"].startswith("2020-01-01") + + +class TestSourceDatastreamLink: + def test_trend_feature_includes_source_datastream_link(self, tmp_path): + site = _trend_site(source="PVACD", rid="W1") + obs = [ + {**_trend_obs(f"{2010 + i}-01-01", 50.0 + 0.5 * i), + "source_datastream_link": "https://st2/FROST-Server/v1.1/Datastreams(42)"} + for i in range(12) + ] + out = tmp_path / "tr.geojson" + result = dump_waterlevel_trend_collection( + str(out), [site], [obs], {"id": "nm_waterlevel_trends"} + ) + assert ( + result["features"][0]["properties"]["source_datastream_link"] + == "https://st2/FROST-Server/v1.1/Datastreams(42)" + ) + + def test_trend_feature_omits_link_when_absent(self, tmp_path): + site = _trend_site(source="NWIS", rid="W2") + obs = [_trend_obs(f"{2010 + i}-01-01", 50.0) for i in range(12)] + out = tmp_path / "tr.geojson" + result = dump_waterlevel_trend_collection( + str(out), [site], [obs], {"id": "nm_waterlevel_trends"} + ) + assert "source_datastream_link" not in result["features"][0]["properties"] + + def test_summary_feature_includes_source_datastream_link(self, tmp_path): + rec = _make_summary_record(source="PVACD", rid="W1") + rec.update(source_datastream_link="https://st2/Datastreams(9)") + out = tmp_path / "s.geojson" + result = dump_summary_collection(str(out), [rec], {"id": "nm_waterlevels_summary"}) + assert ( + result["features"][0]["properties"]["source_datastream_link"] + == "https://st2/Datastreams(9)" + ) From fb73713066818c2bf0161d7f95d65b77d9e3139f Mon Sep 17 00:00:00 2001 From: jakeross Date: Sun, 28 Jun 2026 10:51:17 -0600 Subject: [PATCH 2/2] Exclude tests/test_sources from default pytest collection MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit test_sources make live calls to provider APIs — slow and flaky for default/CI runs. Add to norecursedirs (like tests/archived). Run explicitly with --override-ini="norecursedirs=" when needed. Co-Authored-By: Claude Opus 4.8 --- pyproject.toml | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 44ea3aa..f6ebde1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -43,7 +43,10 @@ packages = ["frontend", "backend"] [tool.pytest.ini_options] testpaths = ["tests"] -norecursedirs = ["tests/archived"] +# tests/test_sources hit live provider APIs over the network (slow, flaky); +# excluded from default collection. Run them explicitly when needed: +# uv run pytest tests/test_sources --override-ini="norecursedirs=" +norecursedirs = ["tests/archived", "tests/test_sources"] [tool.mypy] ignore_missing_imports = true