Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
316 changes: 229 additions & 87 deletions backend/persisters/ogc_features.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,72 +11,167 @@
from datetime import datetime, timezone
from typing import Optional

# Seconds per Julian year (365.25 days) — used to express the Theil-Sen slope
# in ft/year.
_SECONDS_PER_YEAR = 31557600.0

# A well is classified only when it has enough data for a meaningful
# Mann-Kendall test: at least 10 measurements, or at least 4 spanning >= 2 years.
_TREND_MIN_RECORDS = 10
_TREND_MIN_RECORDS_WITH_SPAN = 4
_TREND_MIN_SPAN_YEARS = 2.0
_TREND_ALPHA = 0.05 # significance level for the Mann-Kendall test

# Human-readable description of the trend method, embedded in the product so
# consumers know how the classification was derived.
TREND_METHOD_DESCRIPTION = (
"Depth-to-water trend per well. Monotonic trend is tested with the "
"non-parametric Mann-Kendall test (pymannkendall.original_test, alpha=0.05) "
"on depth-to-water-below-ground-surface (feet) ordered by observation time. "
"The rate is the Theil-Sen slope of depth-to-water vs time, in ft/year. A "
"well is classified only when it has at least 10 measurements, or at least 4 "
"measurements spanning at least 2 years; otherwise 'not enough data'. When "
"classified: a statistically significant increasing trend is 'increasing' "
"(water level getting DEEPER, i.e. a declining water table), a significant "
"decreasing trend is 'decreasing' (water level getting SHALLOWER), and no "
"significant trend is 'stable'. Mirrors the intent of the Ocotillo API "
"ogc_depth_to_water_trend_wells materialized view, using Mann-Kendall + "
"Theil-Sen instead of ordinary least squares."
)


def _timestamp_now() -> str:
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def _make_feature(record, collection_id: str) -> dict:
"""Build one OGC-compliant Feature from a SummaryRecord or SiteRecord."""
source = getattr(record, "source", "")
rid = getattr(record, "id", "")
feature_id = f"{source}:{rid}" if source and rid else str(rid)
def _iso_utc(epoch) -> Optional[str]:
"""Format POSIX seconds as an ISO-8601 UTC string (None passes through)."""
if epoch is None:
return None
return datetime.fromtimestamp(epoch, tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

props = {k: getattr(record, k) for k in record.keys
if k not in ("latitude", "longitude", "elevation")}

lat = getattr(record, "latitude", None)
lon = getattr(record, "longitude", None)
elev = getattr(record, "elevation", None)
def _feature_id(source, rid) -> str:
source = source or ""
rid = rid or ""
return f"{source}:{rid}" if source and rid else str(rid)

coords = [lon, lat] if elev is None else [lon, lat, elev]

return {
"type": "Feature",
"id": feature_id,
"geometry": {"type": "Point", "coordinates": coords},
"properties": props,
}

def _point_geometry(lat, lon, elev=None) -> dict:
coords = [lon, lat] if elev is None else [lon, lat, elev]
return {"type": "Point", "coordinates": coords}

def dump_summary_collection(path: str, records: list, meta: dict) -> dict:
"""
Write an OGC FeatureCollection of summary/site records to *path*.

meta keys (all optional):
id, title, description — collection metadata
def _dump_collection(
path: str, collection_id: str, features: list, meta: dict, extra: Optional[dict] = None
) -> dict:
"""Build the OGC FeatureCollection envelope around *features*, write it to
*path*, and return it. *extra* injects collection-level keys (e.g.
trend_method) after description.

Returns the collection dict (for testing).
§V: MUST include top-level id, type, numberReturned, timeStamp.
§V: Each Feature MUST have top-level id.
"""
collection_id = meta.get("id", "collection")
features = [_make_feature(r, collection_id) for r in records]

collection = {
"type": "FeatureCollection",
"id": collection_id,
"title": meta.get("title", collection_id),
"description": meta.get("description", ""),
**(extra or {}),
"timeStamp": _timestamp_now(),
"numberMatched": len(features),
"numberReturned": len(features),
"links": [
{
"href": meta.get("href", ""),
"rel": "self",
"type": "application/geo+json",
}
{"href": meta.get("href", ""), "rel": "self", "type": "application/geo+json"}
],
"features": features,
}

with open(path, "w", encoding="utf-8") as f:
json.dump(collection, f, indent=2, default=str)

return collection


def _parse_epoch_seconds(date, time) -> Optional[float]:
"""Best-effort parse of a DIE date (+optional time) to POSIX seconds (UTC)."""
if not date:
return None
text = f"{date}T{time}" if time else str(date)
text = text.replace("Z", "")
for parse in (datetime.fromisoformat,):
try:
dt = parse(text)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt.timestamp()
except (ValueError, TypeError):
pass
try:
dt = datetime.fromisoformat(str(date)).replace(tzinfo=timezone.utc)
return dt.timestamp()
except (ValueError, TypeError):
return None


def _qualifies_for_trend(record_count, span_years) -> bool:
return record_count >= _TREND_MIN_RECORDS or (
record_count >= _TREND_MIN_RECORDS_WITH_SPAN
and span_years >= _TREND_MIN_SPAN_YEARS
)


def _mann_kendall_trend(years: list, values: list):
"""Run the Mann-Kendall trend test + Theil-Sen slope.

Returns (trend_category, slope_ft_per_year, p_value, tau). *years* are
decimal years, *values* depth-to-water (ft), both ordered by time.
trend_category is one of 'increasing' / 'decreasing' / 'stable'.
"""
import pymannkendall as mk
from scipy.stats import theilslopes

result = mk.original_test(values, alpha=_TREND_ALPHA)
# Time-aware Theil-Sen slope (ft/year) — robust and correct for the
# irregular sampling typical of water-level records, unlike MK's index-based
# slope which assumes unit spacing.
slope_ft_per_year = float(theilslopes(values, years)[0])

# mk trend is 'increasing' / 'decreasing' / 'no trend'.
category = "stable" if result.trend == "no trend" else result.trend
return category, slope_ft_per_year, float(result.p), float(result.Tau)


def _make_feature(record, collection_id: str) -> dict:
"""Build one OGC-compliant Feature from a SummaryRecord or SiteRecord."""
props = {k: getattr(record, k) for k in record.keys
if k not in ("latitude", "longitude", "elevation")}

return {
"type": "Feature",
"id": _feature_id(getattr(record, "source", ""), getattr(record, "id", "")),
"geometry": _point_geometry(
getattr(record, "latitude", None),
getattr(record, "longitude", None),
getattr(record, "elevation", None),
),
"properties": props,
}


def dump_summary_collection(path: str, records: list, meta: dict) -> dict:
"""
Write an OGC FeatureCollection of summary/site records to *path*.

meta keys (all optional):
id, title, description — collection metadata

Returns the collection dict (for testing).
§V: MUST include top-level id, type, numberReturned, timeStamp.
§V: Each Feature MUST have top-level id.
"""
collection_id = meta.get("id", "collection")
features = [_make_feature(r, collection_id) for r in records]
return _dump_collection(path, collection_id, features, meta)


def dump_major_chemistry_collection(path: str, records: list, meta: dict) -> dict:
"""
Write an OGC FeatureCollection of wells with major-chemistry analytes as
Expand Down Expand Up @@ -128,7 +223,6 @@ def dump_major_chemistry_collection(path: str, records: list, meta: dict) -> dic

features = []
for (source, rid), well in wells.items():
feature_id = f"{source}:{rid}" if source and rid else str(rid)
props = {
"source": source,
"id": rid,
Expand All @@ -141,37 +235,106 @@ def dump_major_chemistry_collection(path: str, records: list, meta: dict) -> dic
props[f"{analyte}_units"] = vals["units"]
props[f"{analyte}_date"] = vals["date"]

lat, lon, elev = well["latitude"], well["longitude"], well["elevation"]
coords = [lon, lat] if elev is None else [lon, lat, elev]
features.append({
"type": "Feature",
"id": feature_id,
"geometry": {"type": "Point", "coordinates": coords},
"id": _feature_id(source, rid),
"geometry": _point_geometry(well["latitude"], well["longitude"], well["elevation"]),
"properties": props,
})

collection = {
"type": "FeatureCollection",
"id": collection_id,
"title": meta.get("title", collection_id),
"description": meta.get("description", ""),
"timeStamp": _timestamp_now(),
"numberMatched": len(features),
"numberReturned": len(features),
"links": [
{
"href": meta.get("href", ""),
"rel": "self",
"type": "application/geo+json",
}
],
"features": features,
}
return _dump_collection(path, collection_id, features, meta)

with open(path, "w", encoding="utf-8") as f:
json.dump(collection, f, indent=2, default=str)

return collection
def dump_waterlevel_trend_collection(
path: str,
site_records: list,
timeseries_records: list,
meta: dict,
) -> dict:
"""
Write an OGC FeatureCollection of per-well depth-to-water trends to *path*.
One Feature per well.

*site_records* and *timeseries_records* are index-aligned: ``site_records[i]``
is the well and ``timeseries_records[i]`` is its list of ParameterRecord
observations (DIE water-level values are already depth-to-water below ground
surface in feet, so no measuring-point adjustment is applied here).

Each feature carries: record_count, first/last_observation_datetime,
span_years, slope_ft_per_year (Theil-Sen), trend_category (Mann-Kendall),
mk_p_value, mk_tau, well_depth(+units). The collection carries
``trend_method`` describing the calculation. See TREND_METHOD_DESCRIPTION.

§V: MUST include top-level id, type, numberReturned, timeStamp.
§V: Each Feature MUST have top-level id.
"""
collection_id = meta.get("id", "collection")

features = []
for site, obs_list in zip(site_records, timeseries_records):
pairs = []
for obs in obs_list:
value = getattr(obs, "parameter_value", None)
epoch = _parse_epoch_seconds(
getattr(obs, "date_measured", None), getattr(obs, "time_measured", None)
)
if value is None or epoch is None:
continue
try:
pairs.append((epoch, float(value)))
except (TypeError, ValueError):
continue

pairs.sort(key=lambda p: p[0])
record_count = len(pairs)
xs = [p[0] for p in pairs]
ys = [p[1] for p in pairs]
span_years = (xs[-1] - xs[0]) / _SECONDS_PER_YEAR if record_count >= 2 else 0.0

slope_ft_per_year = None
p_value = None
tau = None
if _qualifies_for_trend(record_count, span_years):
years = [x / _SECONDS_PER_YEAR for x in xs]
trend_category, slope_ft_per_year, p_value, tau = _mann_kendall_trend(
years, ys
)
else:
trend_category = "not enough data"

props = {
"source": getattr(site, "source", "") or "",
"id": getattr(site, "id", "") or "",
"name": getattr(site, "name", None),
"well_depth": getattr(site, "well_depth", None),
"well_depth_units": getattr(site, "well_depth_units", None),
"record_count": record_count,
"first_observation_datetime": _iso_utc(xs[0]) if record_count else None,
"last_observation_datetime": _iso_utc(xs[-1]) if record_count else None,
"span_years": round(span_years, 3),
"slope_ft_per_year": (
None if slope_ft_per_year is None else round(slope_ft_per_year, 4)
),
"trend_category": trend_category,
"mk_p_value": None if p_value is None else round(p_value, 4),
"mk_tau": None if tau is None else round(tau, 4),
}

features.append({
"type": "Feature",
"id": _feature_id(props["source"], props["id"]),
"geometry": _point_geometry(
getattr(site, "latitude", None),
getattr(site, "longitude", None),
getattr(site, "elevation", None),
),
"properties": props,
})

return _dump_collection(
path, collection_id, features, meta,
extra={"trend_method": TREND_METHOD_DESCRIPTION},
)


def dump_timeseries_collection(
Expand Down Expand Up @@ -207,11 +370,11 @@ def dump_timeseries_collection(
site = site_lookup.get(site_id)

if site:
lat = getattr(site, "latitude", None)
lon = getattr(site, "longitude", None)
elev = getattr(site, "elevation", None)
coords = [lon, lat] if elev is None else [lon, lat, elev]
geometry = {"type": "Point", "coordinates": coords}
geometry = _point_geometry(
getattr(site, "latitude", None),
getattr(site, "longitude", None),
getattr(site, "elevation", None),
)
else:
geometry = None

Expand All @@ -237,25 +400,4 @@ def dump_timeseries_collection(
"properties": props,
})

collection = {
"type": "FeatureCollection",
"id": collection_id,
"title": meta.get("title", collection_id),
"description": meta.get("description", ""),
"timeStamp": _timestamp_now(),
"numberMatched": len(features),
"numberReturned": len(features),
"links": [
{
"href": meta.get("href", ""),
"rel": "self",
"type": "application/geo+json",
}
],
"features": features,
}

with open(path, "w", encoding="utf-8") as f:
json.dump(collection, f, indent=2, default=str)

return collection
return _dump_collection(path, collection_id, features, meta)
Loading
Loading