Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions backend/connectors/st2/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
PARAMETER_UNITS,
SOURCE_PARAMETER_NAME,
SOURCE_PARAMETER_UNITS,
SOURCE_DATASTREAM_LINK,
)

URL = "https://st2.newmexicowaterdata.org/FROST-Server/v1.1"
Expand Down Expand Up @@ -113,8 +114,27 @@ def _extract_parameter_record(self, record):
record[DT_MEASURED] = record["observation"].phenomenon_time
record[SOURCE_PARAMETER_NAME] = record["datastream"].name
record[SOURCE_PARAMETER_UNITS] = record["datastream"].unit_of_measurement.symbol
# Link to the raw, non-normalized SensorThings datastream these
# observations came from, so consumers can trace a feature back to the
# provider's original series (before DIE normalization).
record[SOURCE_DATASTREAM_LINK] = self._datastream_link(record["datastream"])
return record

def _datastream_link(self, datastream):
ds_id = getattr(datastream, "id", None)
if ds_id is None:
return None
return f"{self.url}/Datastreams({ds_id})"

def _summary_extra(self, cleaned: list) -> dict:
# All of a well's observations come from the same datastream; link the
# summary feature back to it.
for r in cleaned:
link = self._datastream_link(r.get("datastream"))
if link:
return {SOURCE_DATASTREAM_LINK: link}
return {}

def _extract_source_parameter_results(self, records):
return [r["observation"].result for r in records]

Expand Down
7 changes: 7 additions & 0 deletions backend/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,13 @@
SOURCE_PARAMETER_UNITS = "source_parameter_units"
CONVERSION_FACTOR = "conversion_factor"

# Link (e.g. SensorThings Datastream URL) to the raw, NON-NORMALIZED source
# series a record came from — the provider's original observations before DIE
# applies unit/datum normalization. Named with the source_ prefix like
# source_parameter_name/source_parameter_units (same "as provided by source"
# meaning). Set by connectors that expose one (currently st2); None otherwise.
SOURCE_DATASTREAM_LINK = "source_datastream_link"

USGS_PCODE_30210 = "30210"
USGS_PCODE_70300 = "70300"
USGS_PCODE_70301 = "70301"
Expand Down
14 changes: 14 additions & 0 deletions backend/persisters/ogc_features.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,20 @@ def dump_waterlevel_trend_collection(
"mk_tau": None if tau is None else round(tau, 4),
}

# Link to the raw, non-normalized source datastream used for the
# calculation, when the source provides one (SensorThings/st2). Taken
# from the well's observations; absent for sources without one.
source_datastream_link = next(
(
o.get("source_datastream_link")
for o in obs_list
if o.get("source_datastream_link")
),
None,
)
if source_datastream_link:
props["source_datastream_link"] = source_datastream_link

features.append({
"type": "Feature",
"id": _feature_id(props["source"], props["id"]),
Expand Down
2 changes: 2 additions & 0 deletions backend/record.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
SOURCE_PARAMETER_NAME,
SOURCE_PARAMETER_UNITS,
CONVERSION_FACTOR,
SOURCE_DATASTREAM_LINK,
FEET,
)

Expand Down Expand Up @@ -145,6 +146,7 @@ class SummaryRecord(BaseRecord):
"latest_time",
"latest_value",
"latest_units",
SOURCE_DATASTREAM_LINK,
)
defaults: dict = {}

Expand Down
7 changes: 7 additions & 0 deletions backend/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ def summarize(self, site, cleaned: list):
"latest_source_units": latest_result["source_parameter_units"],
"latest_source_name": latest_result["source_parameter_name"],
}
rec.update(s._summary_extra(cleaned))
return s.transformer.do_transform(rec, site)


Expand Down Expand Up @@ -428,6 +429,12 @@ def _extract_site_records(self, records: list[dict], site_record) -> list:
def _clean_records(self, records: list) -> list:
return records

def _summary_extra(self, cleaned: list) -> dict:
"""Extra fields to merge into a site's summary record. Default none;
sources with a non-normalized source-series link (e.g. st2 SensorThings)
override to add a source_datastream_link."""
return {}

def _extract_terminal_record(self, records, position: str):
raise NotImplementedError(f"{self.__class__.__name__} Must implement _extract_terminal_record")

Expand Down
5 changes: 4 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,10 @@ packages = ["frontend", "backend"]

[tool.pytest.ini_options]
testpaths = ["tests"]
norecursedirs = ["tests/archived"]
# tests/test_sources hit live provider APIs over the network (slow, flaky);
# excluded from default collection. Run them explicitly when needed:
# uv run pytest tests/test_sources --override-ini="norecursedirs="
norecursedirs = ["tests/archived", "tests/test_sources"]

[tool.mypy]
ignore_missing_imports = true
Expand Down
37 changes: 37 additions & 0 deletions tests/test_persisters/test_ogc_features.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,3 +298,40 @@ def test_downsamples_to_daily_min(self, tmp_path):
assert props["observation_count"] == 3
assert props["record_count"] == 2 # two distinct days
assert props["first_observation_datetime"].startswith("2020-01-01")


class TestSourceDatastreamLink:
def test_trend_feature_includes_source_datastream_link(self, tmp_path):
site = _trend_site(source="PVACD", rid="W1")
obs = [
{**_trend_obs(f"{2010 + i}-01-01", 50.0 + 0.5 * i),
"source_datastream_link": "https://st2/FROST-Server/v1.1/Datastreams(42)"}
for i in range(12)
]
out = tmp_path / "tr.geojson"
result = dump_waterlevel_trend_collection(
str(out), [site], [obs], {"id": "nm_waterlevel_trends"}
)
assert (
result["features"][0]["properties"]["source_datastream_link"]
== "https://st2/FROST-Server/v1.1/Datastreams(42)"
)

def test_trend_feature_omits_link_when_absent(self, tmp_path):
site = _trend_site(source="NWIS", rid="W2")
obs = [_trend_obs(f"{2010 + i}-01-01", 50.0) for i in range(12)]
out = tmp_path / "tr.geojson"
result = dump_waterlevel_trend_collection(
str(out), [site], [obs], {"id": "nm_waterlevel_trends"}
)
assert "source_datastream_link" not in result["features"][0]["properties"]

def test_summary_feature_includes_source_datastream_link(self, tmp_path):
rec = _make_summary_record(source="PVACD", rid="W1")
rec.update(source_datastream_link="https://st2/Datastreams(9)")
out = tmp_path / "s.geojson"
result = dump_summary_collection(str(out), [rec], {"id": "nm_waterlevels_summary"})
assert (
result["features"][0]["properties"]["source_datastream_link"]
== "https://st2/Datastreams(9)"
)
Loading