diff --git a/backend/connectors/st2/source.py b/backend/connectors/st2/source.py index 3467a6d..b7924ec 100644 --- a/backend/connectors/st2/source.py +++ b/backend/connectors/st2/source.py @@ -46,6 +46,7 @@ PARAMETER_UNITS, SOURCE_PARAMETER_NAME, SOURCE_PARAMETER_UNITS, + SOURCE_DATASTREAM_LINK, ) URL = "https://st2.newmexicowaterdata.org/FROST-Server/v1.1" @@ -113,8 +114,27 @@ def _extract_parameter_record(self, record): record[DT_MEASURED] = record["observation"].phenomenon_time record[SOURCE_PARAMETER_NAME] = record["datastream"].name record[SOURCE_PARAMETER_UNITS] = record["datastream"].unit_of_measurement.symbol + # Link to the raw, non-normalized SensorThings datastream these + # observations came from, so consumers can trace a feature back to the + # provider's original series (before DIE normalization). + record[SOURCE_DATASTREAM_LINK] = self._datastream_link(record["datastream"]) return record + def _datastream_link(self, datastream): + ds_id = getattr(datastream, "id", None) + if ds_id is None: + return None + return f"{self.url}/Datastreams({ds_id})" + + def _summary_extra(self, cleaned: list) -> dict: + # All of a well's observations come from the same datastream; link the + # summary feature back to it. + for r in cleaned: + link = self._datastream_link(r.get("datastream")) + if link: + return {SOURCE_DATASTREAM_LINK: link} + return {} + def _extract_source_parameter_results(self, records): return [r["observation"].result for r in records] diff --git a/backend/constants.py b/backend/constants.py index 889c718..52ab27a 100644 --- a/backend/constants.py +++ b/backend/constants.py @@ -60,6 +60,13 @@ SOURCE_PARAMETER_UNITS = "source_parameter_units" CONVERSION_FACTOR = "conversion_factor" +# Link (e.g. SensorThings Datastream URL) to the raw, NON-NORMALIZED source +# series a record came from — the provider's original observations before DIE +# applies unit/datum normalization. Named with the source_ prefix like +# source_parameter_name/source_parameter_units (same "as provided by source" +# meaning). Set by connectors that expose one (currently st2); None otherwise. +SOURCE_DATASTREAM_LINK = "source_datastream_link" + USGS_PCODE_30210 = "30210" USGS_PCODE_70300 = "70300" USGS_PCODE_70301 = "70301" diff --git a/backend/persisters/ogc_features.py b/backend/persisters/ogc_features.py index 9787650..37486c0 100644 --- a/backend/persisters/ogc_features.py +++ b/backend/persisters/ogc_features.py @@ -356,6 +356,20 @@ def dump_waterlevel_trend_collection( "mk_tau": None if tau is None else round(tau, 4), } + # Link to the raw, non-normalized source datastream used for the + # calculation, when the source provides one (SensorThings/st2). Taken + # from the well's observations; absent for sources without one. + source_datastream_link = next( + ( + o.get("source_datastream_link") + for o in obs_list + if o.get("source_datastream_link") + ), + None, + ) + if source_datastream_link: + props["source_datastream_link"] = source_datastream_link + features.append({ "type": "Feature", "id": _feature_id(props["source"], props["id"]), diff --git a/backend/record.py b/backend/record.py index 43b9424..6befa6f 100644 --- a/backend/record.py +++ b/backend/record.py @@ -20,6 +20,7 @@ SOURCE_PARAMETER_NAME, SOURCE_PARAMETER_UNITS, CONVERSION_FACTOR, + SOURCE_DATASTREAM_LINK, FEET, ) @@ -145,6 +146,7 @@ class SummaryRecord(BaseRecord): "latest_time", "latest_value", "latest_units", + SOURCE_DATASTREAM_LINK, ) defaults: dict = {} diff --git a/backend/source.py b/backend/source.py index cbfe158..0ce0ef2 100644 --- a/backend/source.py +++ b/backend/source.py @@ -142,6 +142,7 @@ def summarize(self, site, cleaned: list): "latest_source_units": latest_result["source_parameter_units"], "latest_source_name": latest_result["source_parameter_name"], } + rec.update(s._summary_extra(cleaned)) return s.transformer.do_transform(rec, site) @@ -428,6 +429,12 @@ def _extract_site_records(self, records: list[dict], site_record) -> list: def _clean_records(self, records: list) -> list: return records + def _summary_extra(self, cleaned: list) -> dict: + """Extra fields to merge into a site's summary record. Default none; + sources with a non-normalized source-series link (e.g. st2 SensorThings) + override to add a source_datastream_link.""" + return {} + def _extract_terminal_record(self, records, position: str): raise NotImplementedError(f"{self.__class__.__name__} Must implement _extract_terminal_record") diff --git a/pyproject.toml b/pyproject.toml index 44ea3aa..f6ebde1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -43,7 +43,10 @@ packages = ["frontend", "backend"] [tool.pytest.ini_options] testpaths = ["tests"] -norecursedirs = ["tests/archived"] +# tests/test_sources hit live provider APIs over the network (slow, flaky); +# excluded from default collection. Run them explicitly when needed: +# uv run pytest tests/test_sources --override-ini="norecursedirs=" +norecursedirs = ["tests/archived", "tests/test_sources"] [tool.mypy] ignore_missing_imports = true diff --git a/tests/test_persisters/test_ogc_features.py b/tests/test_persisters/test_ogc_features.py index 81214bb..7f10500 100644 --- a/tests/test_persisters/test_ogc_features.py +++ b/tests/test_persisters/test_ogc_features.py @@ -298,3 +298,40 @@ def test_downsamples_to_daily_min(self, tmp_path): assert props["observation_count"] == 3 assert props["record_count"] == 2 # two distinct days assert props["first_observation_datetime"].startswith("2020-01-01") + + +class TestSourceDatastreamLink: + def test_trend_feature_includes_source_datastream_link(self, tmp_path): + site = _trend_site(source="PVACD", rid="W1") + obs = [ + {**_trend_obs(f"{2010 + i}-01-01", 50.0 + 0.5 * i), + "source_datastream_link": "https://st2/FROST-Server/v1.1/Datastreams(42)"} + for i in range(12) + ] + out = tmp_path / "tr.geojson" + result = dump_waterlevel_trend_collection( + str(out), [site], [obs], {"id": "nm_waterlevel_trends"} + ) + assert ( + result["features"][0]["properties"]["source_datastream_link"] + == "https://st2/FROST-Server/v1.1/Datastreams(42)" + ) + + def test_trend_feature_omits_link_when_absent(self, tmp_path): + site = _trend_site(source="NWIS", rid="W2") + obs = [_trend_obs(f"{2010 + i}-01-01", 50.0) for i in range(12)] + out = tmp_path / "tr.geojson" + result = dump_waterlevel_trend_collection( + str(out), [site], [obs], {"id": "nm_waterlevel_trends"} + ) + assert "source_datastream_link" not in result["features"][0]["properties"] + + def test_summary_feature_includes_source_datastream_link(self, tmp_path): + rec = _make_summary_record(source="PVACD", rid="W1") + rec.update(source_datastream_link="https://st2/Datastreams(9)") + out = tmp_path / "s.geojson" + result = dump_summary_collection(str(out), [rec], {"id": "nm_waterlevels_summary"}) + assert ( + result["features"][0]["properties"]["source_datastream_link"] + == "https://st2/Datastreams(9)" + )