diff --git a/backend/persisters/ogc_features.py b/backend/persisters/ogc_features.py index 0f8344a..e6d1cb4 100644 --- a/backend/persisters/ogc_features.py +++ b/backend/persisters/ogc_features.py @@ -11,72 +11,167 @@ from datetime import datetime, timezone from typing import Optional +# Seconds per Julian year (365.25 days) — used to express the Theil-Sen slope +# in ft/year. +_SECONDS_PER_YEAR = 31557600.0 + +# A well is classified only when it has enough data for a meaningful +# Mann-Kendall test: at least 10 measurements, or at least 4 spanning >= 2 years. +_TREND_MIN_RECORDS = 10 +_TREND_MIN_RECORDS_WITH_SPAN = 4 +_TREND_MIN_SPAN_YEARS = 2.0 +_TREND_ALPHA = 0.05 # significance level for the Mann-Kendall test + +# Human-readable description of the trend method, embedded in the product so +# consumers know how the classification was derived. +TREND_METHOD_DESCRIPTION = ( + "Depth-to-water trend per well. Monotonic trend is tested with the " + "non-parametric Mann-Kendall test (pymannkendall.original_test, alpha=0.05) " + "on depth-to-water-below-ground-surface (feet) ordered by observation time. " + "The rate is the Theil-Sen slope of depth-to-water vs time, in ft/year. A " + "well is classified only when it has at least 10 measurements, or at least 4 " + "measurements spanning at least 2 years; otherwise 'not enough data'. When " + "classified: a statistically significant increasing trend is 'increasing' " + "(water level getting DEEPER, i.e. a declining water table), a significant " + "decreasing trend is 'decreasing' (water level getting SHALLOWER), and no " + "significant trend is 'stable'. Mirrors the intent of the Ocotillo API " + "ogc_depth_to_water_trend_wells materialized view, using Mann-Kendall + " + "Theil-Sen instead of ordinary least squares." +) + def _timestamp_now() -> str: return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") -def _make_feature(record, collection_id: str) -> dict: - """Build one OGC-compliant Feature from a SummaryRecord or SiteRecord.""" - source = getattr(record, "source", "") - rid = getattr(record, "id", "") - feature_id = f"{source}:{rid}" if source and rid else str(rid) +def _iso_utc(epoch) -> Optional[str]: + """Format POSIX seconds as an ISO-8601 UTC string (None passes through).""" + if epoch is None: + return None + return datetime.fromtimestamp(epoch, tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") - props = {k: getattr(record, k) for k in record.keys - if k not in ("latitude", "longitude", "elevation")} - lat = getattr(record, "latitude", None) - lon = getattr(record, "longitude", None) - elev = getattr(record, "elevation", None) +def _feature_id(source, rid) -> str: + source = source or "" + rid = rid or "" + return f"{source}:{rid}" if source and rid else str(rid) - coords = [lon, lat] if elev is None else [lon, lat, elev] - - return { - "type": "Feature", - "id": feature_id, - "geometry": {"type": "Point", "coordinates": coords}, - "properties": props, - } +def _point_geometry(lat, lon, elev=None) -> dict: + coords = [lon, lat] if elev is None else [lon, lat, elev] + return {"type": "Point", "coordinates": coords} -def dump_summary_collection(path: str, records: list, meta: dict) -> dict: - """ - Write an OGC FeatureCollection of summary/site records to *path*. - meta keys (all optional): - id, title, description — collection metadata +def _dump_collection( + path: str, collection_id: str, features: list, meta: dict, extra: Optional[dict] = None +) -> dict: + """Build the OGC FeatureCollection envelope around *features*, write it to + *path*, and return it. *extra* injects collection-level keys (e.g. + trend_method) after description. - Returns the collection dict (for testing). §V: MUST include top-level id, type, numberReturned, timeStamp. - §V: Each Feature MUST have top-level id. """ - collection_id = meta.get("id", "collection") - features = [_make_feature(r, collection_id) for r in records] - collection = { "type": "FeatureCollection", "id": collection_id, "title": meta.get("title", collection_id), "description": meta.get("description", ""), + **(extra or {}), "timeStamp": _timestamp_now(), "numberMatched": len(features), "numberReturned": len(features), "links": [ - { - "href": meta.get("href", ""), - "rel": "self", - "type": "application/geo+json", - } + {"href": meta.get("href", ""), "rel": "self", "type": "application/geo+json"} ], "features": features, } - with open(path, "w", encoding="utf-8") as f: json.dump(collection, f, indent=2, default=str) - return collection +def _parse_epoch_seconds(date, time) -> Optional[float]: + """Best-effort parse of a DIE date (+optional time) to POSIX seconds (UTC).""" + if not date: + return None + text = f"{date}T{time}" if time else str(date) + text = text.replace("Z", "") + for parse in (datetime.fromisoformat,): + try: + dt = parse(text) + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + return dt.timestamp() + except (ValueError, TypeError): + pass + try: + dt = datetime.fromisoformat(str(date)).replace(tzinfo=timezone.utc) + return dt.timestamp() + except (ValueError, TypeError): + return None + + +def _qualifies_for_trend(record_count, span_years) -> bool: + return record_count >= _TREND_MIN_RECORDS or ( + record_count >= _TREND_MIN_RECORDS_WITH_SPAN + and span_years >= _TREND_MIN_SPAN_YEARS + ) + + +def _mann_kendall_trend(years: list, values: list): + """Run the Mann-Kendall trend test + Theil-Sen slope. + + Returns (trend_category, slope_ft_per_year, p_value, tau). *years* are + decimal years, *values* depth-to-water (ft), both ordered by time. + trend_category is one of 'increasing' / 'decreasing' / 'stable'. + """ + import pymannkendall as mk + from scipy.stats import theilslopes + + result = mk.original_test(values, alpha=_TREND_ALPHA) + # Time-aware Theil-Sen slope (ft/year) — robust and correct for the + # irregular sampling typical of water-level records, unlike MK's index-based + # slope which assumes unit spacing. + slope_ft_per_year = float(theilslopes(values, years)[0]) + + # mk trend is 'increasing' / 'decreasing' / 'no trend'. + category = "stable" if result.trend == "no trend" else result.trend + return category, slope_ft_per_year, float(result.p), float(result.Tau) + + +def _make_feature(record, collection_id: str) -> dict: + """Build one OGC-compliant Feature from a SummaryRecord or SiteRecord.""" + props = {k: getattr(record, k) for k in record.keys + if k not in ("latitude", "longitude", "elevation")} + + return { + "type": "Feature", + "id": _feature_id(getattr(record, "source", ""), getattr(record, "id", "")), + "geometry": _point_geometry( + getattr(record, "latitude", None), + getattr(record, "longitude", None), + getattr(record, "elevation", None), + ), + "properties": props, + } + + +def dump_summary_collection(path: str, records: list, meta: dict) -> dict: + """ + Write an OGC FeatureCollection of summary/site records to *path*. + + meta keys (all optional): + id, title, description — collection metadata + + Returns the collection dict (for testing). + §V: MUST include top-level id, type, numberReturned, timeStamp. + §V: Each Feature MUST have top-level id. + """ + collection_id = meta.get("id", "collection") + features = [_make_feature(r, collection_id) for r in records] + return _dump_collection(path, collection_id, features, meta) + + def dump_major_chemistry_collection(path: str, records: list, meta: dict) -> dict: """ Write an OGC FeatureCollection of wells with major-chemistry analytes as @@ -128,7 +223,6 @@ def dump_major_chemistry_collection(path: str, records: list, meta: dict) -> dic features = [] for (source, rid), well in wells.items(): - feature_id = f"{source}:{rid}" if source and rid else str(rid) props = { "source": source, "id": rid, @@ -141,37 +235,106 @@ def dump_major_chemistry_collection(path: str, records: list, meta: dict) -> dic props[f"{analyte}_units"] = vals["units"] props[f"{analyte}_date"] = vals["date"] - lat, lon, elev = well["latitude"], well["longitude"], well["elevation"] - coords = [lon, lat] if elev is None else [lon, lat, elev] features.append({ "type": "Feature", - "id": feature_id, - "geometry": {"type": "Point", "coordinates": coords}, + "id": _feature_id(source, rid), + "geometry": _point_geometry(well["latitude"], well["longitude"], well["elevation"]), "properties": props, }) - collection = { - "type": "FeatureCollection", - "id": collection_id, - "title": meta.get("title", collection_id), - "description": meta.get("description", ""), - "timeStamp": _timestamp_now(), - "numberMatched": len(features), - "numberReturned": len(features), - "links": [ - { - "href": meta.get("href", ""), - "rel": "self", - "type": "application/geo+json", - } - ], - "features": features, - } + return _dump_collection(path, collection_id, features, meta) - with open(path, "w", encoding="utf-8") as f: - json.dump(collection, f, indent=2, default=str) - return collection +def dump_waterlevel_trend_collection( + path: str, + site_records: list, + timeseries_records: list, + meta: dict, +) -> dict: + """ + Write an OGC FeatureCollection of per-well depth-to-water trends to *path*. + One Feature per well. + + *site_records* and *timeseries_records* are index-aligned: ``site_records[i]`` + is the well and ``timeseries_records[i]`` is its list of ParameterRecord + observations (DIE water-level values are already depth-to-water below ground + surface in feet, so no measuring-point adjustment is applied here). + + Each feature carries: record_count, first/last_observation_datetime, + span_years, slope_ft_per_year (Theil-Sen), trend_category (Mann-Kendall), + mk_p_value, mk_tau, well_depth(+units). The collection carries + ``trend_method`` describing the calculation. See TREND_METHOD_DESCRIPTION. + + §V: MUST include top-level id, type, numberReturned, timeStamp. + §V: Each Feature MUST have top-level id. + """ + collection_id = meta.get("id", "collection") + + features = [] + for site, obs_list in zip(site_records, timeseries_records): + pairs = [] + for obs in obs_list: + value = getattr(obs, "parameter_value", None) + epoch = _parse_epoch_seconds( + getattr(obs, "date_measured", None), getattr(obs, "time_measured", None) + ) + if value is None or epoch is None: + continue + try: + pairs.append((epoch, float(value))) + except (TypeError, ValueError): + continue + + pairs.sort(key=lambda p: p[0]) + record_count = len(pairs) + xs = [p[0] for p in pairs] + ys = [p[1] for p in pairs] + span_years = (xs[-1] - xs[0]) / _SECONDS_PER_YEAR if record_count >= 2 else 0.0 + + slope_ft_per_year = None + p_value = None + tau = None + if _qualifies_for_trend(record_count, span_years): + years = [x / _SECONDS_PER_YEAR for x in xs] + trend_category, slope_ft_per_year, p_value, tau = _mann_kendall_trend( + years, ys + ) + else: + trend_category = "not enough data" + + props = { + "source": getattr(site, "source", "") or "", + "id": getattr(site, "id", "") or "", + "name": getattr(site, "name", None), + "well_depth": getattr(site, "well_depth", None), + "well_depth_units": getattr(site, "well_depth_units", None), + "record_count": record_count, + "first_observation_datetime": _iso_utc(xs[0]) if record_count else None, + "last_observation_datetime": _iso_utc(xs[-1]) if record_count else None, + "span_years": round(span_years, 3), + "slope_ft_per_year": ( + None if slope_ft_per_year is None else round(slope_ft_per_year, 4) + ), + "trend_category": trend_category, + "mk_p_value": None if p_value is None else round(p_value, 4), + "mk_tau": None if tau is None else round(tau, 4), + } + + features.append({ + "type": "Feature", + "id": _feature_id(props["source"], props["id"]), + "geometry": _point_geometry( + getattr(site, "latitude", None), + getattr(site, "longitude", None), + getattr(site, "elevation", None), + ), + "properties": props, + }) + + return _dump_collection( + path, collection_id, features, meta, + extra={"trend_method": TREND_METHOD_DESCRIPTION}, + ) def dump_timeseries_collection( @@ -207,11 +370,11 @@ def dump_timeseries_collection( site = site_lookup.get(site_id) if site: - lat = getattr(site, "latitude", None) - lon = getattr(site, "longitude", None) - elev = getattr(site, "elevation", None) - coords = [lon, lat] if elev is None else [lon, lat, elev] - geometry = {"type": "Point", "coordinates": coords} + geometry = _point_geometry( + getattr(site, "latitude", None), + getattr(site, "longitude", None), + getattr(site, "elevation", None), + ) else: geometry = None @@ -237,25 +400,4 @@ def dump_timeseries_collection( "properties": props, }) - collection = { - "type": "FeatureCollection", - "id": collection_id, - "title": meta.get("title", collection_id), - "description": meta.get("description", ""), - "timeStamp": _timestamp_now(), - "numberMatched": len(features), - "numberReturned": len(features), - "links": [ - { - "href": meta.get("href", ""), - "rel": "self", - "type": "application/geo+json", - } - ], - "features": features, - } - - with open(path, "w", encoding="utf-8") as f: - json.dump(collection, f, indent=2, default=str) - - return collection + return _dump_collection(path, collection_id, features, meta) diff --git a/orchestration/assets/products.py b/orchestration/assets/products.py index d981300..22a3d74 100644 --- a/orchestration/assets/products.py +++ b/orchestration/assets/products.py @@ -37,6 +37,7 @@ dump_major_chemistry_collection, dump_summary_collection, dump_timeseries_collection, + dump_waterlevel_trend_collection, ) from backend.record import ParameterRecord, SiteRecord, SummaryRecord from backend.unifier import unify_source @@ -167,8 +168,8 @@ def _build_combine_asset(product: dict, source_keys: list, source_asset_keys: li Depends on every source asset (wired via ``ins``), merges their records/sites/timeseries, writes the OGC GeoJSON collection — summary, - timeseries, or major-chemistry depending on ``output_type`` — and uploads it - to GCS.""" + timeseries, major-chemistry, or waterlevel-trend depending on + ``output_type`` — and uploads it to GCS.""" pid = product["id"] output_type = product["output_type"] ins = { @@ -204,6 +205,14 @@ def _combine_asset( elif output_type == "ogc_summary": records = [SummaryRecord(p) for p in all_records] dump_summary_collection(str(out), records, meta) + elif output_type == "ogc_waterlevel_trend": + # site_records and the per-site observation lists are index + # aligned (see source asset); compute one trend per well. + site_records = [SiteRecord(p) for p in all_sites] + series = [ + [ParameterRecord(o) for o in site_ts] for site_ts in all_timeseries + ] + dump_waterlevel_trend_collection(str(out), site_records, series, meta) else: site_records = [SiteRecord(p) for p in all_sites] flat = [ diff --git a/orchestration/config/products.yaml b/orchestration/config/products.yaml index bff1be7..c29768f 100644 --- a/orchestration/config/products.yaml +++ b/orchestration/config/products.yaml @@ -68,3 +68,18 @@ products: state: NM sources: exclude: [] + + # One feature per well with a depth-to-water trend (slope_ft_per_year + + # category) computed from each well's water-level timeseries. The collection + # carries a trend_method description. Algorithm ported from the Ocotillo API + # ogc_depth_to_water_trend_wells materialized view (see ogc_features.py). + - id: nm_waterlevel_trends + parameter: waterlevels + output_type: ogc_waterlevel_trend + title: "NM Water Level Trends" + description: "Per-well depth-to-water trend (slope and category), all NM sources" + schedule: "0 12 * * *" + spatial_filter: + state: NM + sources: + exclude: [] diff --git a/orchestration/definitions.py b/orchestration/definitions.py index e9ae0eb..21d05cc 100644 --- a/orchestration/definitions.py +++ b/orchestration/definitions.py @@ -11,7 +11,12 @@ _PRODUCTS_PATH = Path(__file__).parent / "config" / "products.yaml" -_SUPPORTED_OUTPUT_TYPES = {"ogc_summary", "ogc_timeseries", "ogc_major_chemistry"} +_SUPPORTED_OUTPUT_TYPES = { + "ogc_summary", + "ogc_timeseries", + "ogc_major_chemistry", + "ogc_waterlevel_trend", +} def _load_products() -> dict: diff --git a/pyproject.toml b/pyproject.toml index 58766bb..44ea3aa 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,6 +21,7 @@ dependencies = [ "geopandas", "httpx", "pandas", + "pymannkendall", "pyyaml", "types-pyyaml", "urllib3>=2.2.0,<3.0.0", diff --git a/tests/test_persisters/test_ogc_features.py b/tests/test_persisters/test_ogc_features.py index c25bd69..bc405a6 100644 --- a/tests/test_persisters/test_ogc_features.py +++ b/tests/test_persisters/test_ogc_features.py @@ -228,3 +228,53 @@ def test_geometry_and_required_fields(self, tmp_path): assert "timeStamp" in result feat = result["features"][0] assert feat["geometry"]["coordinates"] == [-106.0, 34.0] + + +from backend.persisters.ogc_features import dump_waterlevel_trend_collection + + +def _trend_site(source="NMBGMR", rid="W1", well_depth=100.0): + return SiteRecord({ + "source": source, "id": rid, "name": f"Well {rid}", + "latitude": 34.0, "longitude": -106.0, "elevation": None, + "well_depth": well_depth, "well_depth_units": "ft", + }) + + +def _trend_obs(date, value): + return ParameterRecord({"parameter_value": value, "date_measured": date, "time_measured": None}) + + +class TestWaterLevelTrendCollection: + def test_classifies_trends_and_carries_method(self, tmp_path): + increasing = [_trend_obs(f"{2010 + i}-01-01", 50.0 + 0.5 * i) for i in range(12)] + stable = [_trend_obs(f"{2010 + i}-01-01", 50.0) for i in range(12)] + sparse = [_trend_obs("2010-01-01", 50.0), _trend_obs("2011-01-01", 51.0), _trend_obs("2012-01-01", 52.0)] + decreasing = [_trend_obs(f"{2010 + i}-01-01", 60.0 - 1.0 * i) for i in range(5)] + + sites = [_trend_site(rid="A"), _trend_site(rid="B"), _trend_site("NWIS", "C"), _trend_site("PVACD", "D")] + series = [increasing, stable, sparse, decreasing] + + out = tmp_path / "tr.geojson" + result = dump_waterlevel_trend_collection(str(out), sites, series, {"id": "nm_waterlevel_trends"}) + + assert result["numberReturned"] == 4 + assert "trend_method" in result and result["trend_method"] + by_id = {f["id"]: f["properties"] for f in result["features"]} + + assert by_id["NMBGMR:A"]["trend_category"] == "increasing" + assert round(by_id["NMBGMR:A"]["slope_ft_per_year"], 2) == 0.5 + assert by_id["NMBGMR:B"]["trend_category"] == "stable" + assert by_id["NWIS:C"]["trend_category"] == "not enough data" # only 3 records + assert by_id["PVACD:D"]["trend_category"] == "decreasing" # 5 records / 4 yr span + + def test_required_fields_and_geometry(self, tmp_path): + sites = [_trend_site(rid="W1")] + series = [[_trend_obs("2010-01-01", 50.0)]] + out = tmp_path / "tr.geojson" + result = dump_waterlevel_trend_collection(str(out), sites, series, {"id": "nm_waterlevel_trends"}) + assert result["type"] == "FeatureCollection" + assert "timeStamp" in result + feat = result["features"][0] + assert feat["geometry"]["coordinates"] == [-106.0, 34.0] + assert feat["properties"]["trend_category"] == "not enough data" # single record