From 7fdd08655ffe3255a4e2327696ffaae3b8f0b480 Mon Sep 17 00:00:00 2001 From: jakeross Date: Fri, 26 Jun 2026 11:26:56 -0600 Subject: [PATCH 1/3] Add waterlevel-trends data product (one feature per well) New ogc_waterlevel_trend product: nm_waterlevel_trends. One GeoJSON feature per well with a depth-to-water trend computed from the well's water-level timeseries, across all NM sources. Algorithm ported from the Ocotillo API ogc_depth_to_water_trend_wells materialized view: - least-squares slope (REGR_SLOPE) of depth-to-water (ft) vs observation time, scaled to ft/year (365.25-day year); - classify only with >=10 records, or >=4 records spanning >=2 years; slope > 0.25 ft/yr "increasing" (deeper/declining), < -0.25 "decreasing", else "stable"; otherwise "not enough data". The product includes a trend_method description of how the trend was calculated (TREND_METHOD_DESCRIPTION). Co-Authored-By: Claude Opus 4.8 --- backend/persisters/ogc_features.py | 195 +++++++++++++++++++++ orchestration/assets/products.py | 13 +- orchestration/config/products.yaml | 15 ++ orchestration/definitions.py | 7 +- tests/test_persisters/test_ogc_features.py | 50 ++++++ 5 files changed, 277 insertions(+), 3 deletions(-) diff --git a/backend/persisters/ogc_features.py b/backend/persisters/ogc_features.py index 0f8344a..b998278 100644 --- a/backend/persisters/ogc_features.py +++ b/backend/persisters/ogc_features.py @@ -11,11 +11,87 @@ from datetime import datetime, timezone from typing import Optional +# Seconds per Julian year (365.25 days) — matches Ocotillo's trend MV, which +# divides the per-second regression slope by 31557600 to get ft/year. +_SECONDS_PER_YEAR = 31557600.0 + +# Trend classification thresholds, ported verbatim from the Ocotillo +# ogc_depth_to_water_trend_wells materialized view. +_TREND_SLOPE_THRESHOLD = 0.25 # ft/year +_TREND_MIN_RECORDS = 10 +_TREND_MIN_RECORDS_WITH_SPAN = 4 +_TREND_MIN_SPAN_YEARS = 2.0 + +# Human-readable description of the trend method, embedded in the product so +# consumers know how the classification was derived. +TREND_METHOD_DESCRIPTION = ( + "Depth-to-water trend per well. A least-squares linear regression " + "(equivalent to PostgreSQL REGR_SLOPE) is fit to depth-to-water-below-" + "ground-surface (feet) against observation time; the per-second slope is " + "scaled by 31,557,600 s/yr (a 365.25-day year) to slope_ft_per_year. A well " + "is classified only when it has at least 10 measurements, or at least 4 " + "measurements spanning at least 2 years; otherwise 'not enough data'. When " + "classified: slope > 0.25 ft/yr is 'increasing' (water level getting " + "DEEPER, i.e. a declining water table), slope < -0.25 ft/yr is 'decreasing' " + "(water level getting SHALLOWER), otherwise 'stable'. Ported from the " + "Ocotillo API ogc_depth_to_water_trend_wells materialized view." +) + def _timestamp_now() -> str: return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") +def _parse_epoch_seconds(date, time) -> Optional[float]: + """Best-effort parse of a DIE date (+optional time) to POSIX seconds (UTC).""" + if not date: + return None + text = f"{date}T{time}" if time else str(date) + text = text.replace("Z", "") + for parse in (datetime.fromisoformat,): + try: + dt = parse(text) + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + return dt.timestamp() + except (ValueError, TypeError): + pass + try: + dt = datetime.fromisoformat(str(date)).replace(tzinfo=timezone.utc) + return dt.timestamp() + except (ValueError, TypeError): + return None + + +def _regr_slope(xs: list, ys: list) -> Optional[float]: + """Least-squares slope of ys on xs (PostgreSQL REGR_SLOPE). None if x has no + variance (fewer than 2 distinct x values).""" + n = len(xs) + if n < 2: + return None + mx = sum(xs) / n + my = sum(ys) / n + sxx = sum((x - mx) ** 2 for x in xs) + if sxx == 0: + return None + sxy = sum((x - mx) * (y - my) for x, y in zip(xs, ys)) + return sxy / sxx + + +def _classify_trend(slope_ft_per_year, record_count, span_years) -> str: + qualifies = record_count >= _TREND_MIN_RECORDS or ( + record_count >= _TREND_MIN_RECORDS_WITH_SPAN + and span_years >= _TREND_MIN_SPAN_YEARS + ) + if not qualifies or slope_ft_per_year is None: + return "not enough data" + if slope_ft_per_year > _TREND_SLOPE_THRESHOLD: + return "increasing" + if slope_ft_per_year < -_TREND_SLOPE_THRESHOLD: + return "decreasing" + return "stable" + + def _make_feature(record, collection_id: str) -> dict: """Build one OGC-compliant Feature from a SummaryRecord or SiteRecord.""" source = getattr(record, "source", "") @@ -174,6 +250,125 @@ def dump_major_chemistry_collection(path: str, records: list, meta: dict) -> dic return collection +def dump_waterlevel_trend_collection( + path: str, + site_records: list, + timeseries_records: list, + meta: dict, +) -> dict: + """ + Write an OGC FeatureCollection of per-well depth-to-water trends to *path*. + One Feature per well. + + *site_records* and *timeseries_records* are index-aligned: ``site_records[i]`` + is the well and ``timeseries_records[i]`` is its list of ParameterRecord + observations (DIE water-level values are already depth-to-water below ground + surface in feet, so no measuring-point adjustment is applied here). + + Each feature carries: record_count, first/last_observation_datetime, + span_years, slope_ft_per_year, trend_category, well_depth(+units). The + collection carries ``trend_method`` describing the calculation. See + TREND_METHOD_DESCRIPTION. + + §V: MUST include top-level id, type, numberReturned, timeStamp. + §V: Each Feature MUST have top-level id. + """ + collection_id = meta.get("id", "collection") + + features = [] + for site, obs_list in zip(site_records, timeseries_records): + pairs = [] + for obs in obs_list: + value = getattr(obs, "parameter_value", None) + epoch = _parse_epoch_seconds( + getattr(obs, "date_measured", None), getattr(obs, "time_measured", None) + ) + if value is None or epoch is None: + continue + try: + pairs.append((epoch, float(value))) + except (TypeError, ValueError): + continue + + pairs.sort(key=lambda p: p[0]) + record_count = len(pairs) + xs = [p[0] for p in pairs] + ys = [p[1] for p in pairs] + + if record_count >= 2: + span_years = (xs[-1] - xs[0]) / _SECONDS_PER_YEAR + slope = _regr_slope(xs, ys) + slope_ft_per_year = None if slope is None else slope * _SECONDS_PER_YEAR + else: + span_years = 0.0 + slope_ft_per_year = None + + trend_category = _classify_trend(slope_ft_per_year, record_count, span_years) + + source = getattr(site, "source", "") or "" + rid = getattr(site, "id", "") or "" + feature_id = f"{source}:{rid}" if source and rid else str(rid) + + def _iso(epoch): + if epoch is None: + return None + return datetime.fromtimestamp(epoch, tz=timezone.utc).strftime( + "%Y-%m-%dT%H:%M:%SZ" + ) + + props = { + "source": source, + "id": rid, + "name": getattr(site, "name", None), + "well_depth": getattr(site, "well_depth", None), + "well_depth_units": getattr(site, "well_depth_units", None), + "record_count": record_count, + "first_observation_datetime": _iso(xs[0]) if record_count else None, + "last_observation_datetime": _iso(xs[-1]) if record_count else None, + "span_years": round(span_years, 3), + "slope_ft_per_year": ( + None if slope_ft_per_year is None else round(slope_ft_per_year, 4) + ), + "trend_category": trend_category, + } + + lat = getattr(site, "latitude", None) + lon = getattr(site, "longitude", None) + elev = getattr(site, "elevation", None) + coords = [lon, lat] if elev is None else [lon, lat, elev] + + features.append({ + "type": "Feature", + "id": feature_id, + "geometry": {"type": "Point", "coordinates": coords}, + "properties": props, + }) + + collection = { + "type": "FeatureCollection", + "id": collection_id, + "title": meta.get("title", collection_id), + "description": meta.get("description", ""), + "trend_method": TREND_METHOD_DESCRIPTION, + "timeStamp": _timestamp_now(), + "numberMatched": len(features), + "numberReturned": len(features), + "links": [ + { + "href": meta.get("href", ""), + "rel": "self", + "type": "application/geo+json", + } + ], + "features": features, + } + + with open(path, "w", encoding="utf-8") as f: + json.dump(collection, f, indent=2, default=str) + + return collection + + def dump_timeseries_collection( path: str, site_records: list, diff --git a/orchestration/assets/products.py b/orchestration/assets/products.py index d981300..22a3d74 100644 --- a/orchestration/assets/products.py +++ b/orchestration/assets/products.py @@ -37,6 +37,7 @@ dump_major_chemistry_collection, dump_summary_collection, dump_timeseries_collection, + dump_waterlevel_trend_collection, ) from backend.record import ParameterRecord, SiteRecord, SummaryRecord from backend.unifier import unify_source @@ -167,8 +168,8 @@ def _build_combine_asset(product: dict, source_keys: list, source_asset_keys: li Depends on every source asset (wired via ``ins``), merges their records/sites/timeseries, writes the OGC GeoJSON collection — summary, - timeseries, or major-chemistry depending on ``output_type`` — and uploads it - to GCS.""" + timeseries, major-chemistry, or waterlevel-trend depending on + ``output_type`` — and uploads it to GCS.""" pid = product["id"] output_type = product["output_type"] ins = { @@ -204,6 +205,14 @@ def _combine_asset( elif output_type == "ogc_summary": records = [SummaryRecord(p) for p in all_records] dump_summary_collection(str(out), records, meta) + elif output_type == "ogc_waterlevel_trend": + # site_records and the per-site observation lists are index + # aligned (see source asset); compute one trend per well. + site_records = [SiteRecord(p) for p in all_sites] + series = [ + [ParameterRecord(o) for o in site_ts] for site_ts in all_timeseries + ] + dump_waterlevel_trend_collection(str(out), site_records, series, meta) else: site_records = [SiteRecord(p) for p in all_sites] flat = [ diff --git a/orchestration/config/products.yaml b/orchestration/config/products.yaml index bff1be7..c29768f 100644 --- a/orchestration/config/products.yaml +++ b/orchestration/config/products.yaml @@ -68,3 +68,18 @@ products: state: NM sources: exclude: [] + + # One feature per well with a depth-to-water trend (slope_ft_per_year + + # category) computed from each well's water-level timeseries. The collection + # carries a trend_method description. Algorithm ported from the Ocotillo API + # ogc_depth_to_water_trend_wells materialized view (see ogc_features.py). + - id: nm_waterlevel_trends + parameter: waterlevels + output_type: ogc_waterlevel_trend + title: "NM Water Level Trends" + description: "Per-well depth-to-water trend (slope and category), all NM sources" + schedule: "0 12 * * *" + spatial_filter: + state: NM + sources: + exclude: [] diff --git a/orchestration/definitions.py b/orchestration/definitions.py index e9ae0eb..21d05cc 100644 --- a/orchestration/definitions.py +++ b/orchestration/definitions.py @@ -11,7 +11,12 @@ _PRODUCTS_PATH = Path(__file__).parent / "config" / "products.yaml" -_SUPPORTED_OUTPUT_TYPES = {"ogc_summary", "ogc_timeseries", "ogc_major_chemistry"} +_SUPPORTED_OUTPUT_TYPES = { + "ogc_summary", + "ogc_timeseries", + "ogc_major_chemistry", + "ogc_waterlevel_trend", +} def _load_products() -> dict: diff --git a/tests/test_persisters/test_ogc_features.py b/tests/test_persisters/test_ogc_features.py index c25bd69..bc405a6 100644 --- a/tests/test_persisters/test_ogc_features.py +++ b/tests/test_persisters/test_ogc_features.py @@ -228,3 +228,53 @@ def test_geometry_and_required_fields(self, tmp_path): assert "timeStamp" in result feat = result["features"][0] assert feat["geometry"]["coordinates"] == [-106.0, 34.0] + + +from backend.persisters.ogc_features import dump_waterlevel_trend_collection + + +def _trend_site(source="NMBGMR", rid="W1", well_depth=100.0): + return SiteRecord({ + "source": source, "id": rid, "name": f"Well {rid}", + "latitude": 34.0, "longitude": -106.0, "elevation": None, + "well_depth": well_depth, "well_depth_units": "ft", + }) + + +def _trend_obs(date, value): + return ParameterRecord({"parameter_value": value, "date_measured": date, "time_measured": None}) + + +class TestWaterLevelTrendCollection: + def test_classifies_trends_and_carries_method(self, tmp_path): + increasing = [_trend_obs(f"{2010 + i}-01-01", 50.0 + 0.5 * i) for i in range(12)] + stable = [_trend_obs(f"{2010 + i}-01-01", 50.0) for i in range(12)] + sparse = [_trend_obs("2010-01-01", 50.0), _trend_obs("2011-01-01", 51.0), _trend_obs("2012-01-01", 52.0)] + decreasing = [_trend_obs(f"{2010 + i}-01-01", 60.0 - 1.0 * i) for i in range(5)] + + sites = [_trend_site(rid="A"), _trend_site(rid="B"), _trend_site("NWIS", "C"), _trend_site("PVACD", "D")] + series = [increasing, stable, sparse, decreasing] + + out = tmp_path / "tr.geojson" + result = dump_waterlevel_trend_collection(str(out), sites, series, {"id": "nm_waterlevel_trends"}) + + assert result["numberReturned"] == 4 + assert "trend_method" in result and result["trend_method"] + by_id = {f["id"]: f["properties"] for f in result["features"]} + + assert by_id["NMBGMR:A"]["trend_category"] == "increasing" + assert round(by_id["NMBGMR:A"]["slope_ft_per_year"], 2) == 0.5 + assert by_id["NMBGMR:B"]["trend_category"] == "stable" + assert by_id["NWIS:C"]["trend_category"] == "not enough data" # only 3 records + assert by_id["PVACD:D"]["trend_category"] == "decreasing" # 5 records / 4 yr span + + def test_required_fields_and_geometry(self, tmp_path): + sites = [_trend_site(rid="W1")] + series = [[_trend_obs("2010-01-01", 50.0)]] + out = tmp_path / "tr.geojson" + result = dump_waterlevel_trend_collection(str(out), sites, series, {"id": "nm_waterlevel_trends"}) + assert result["type"] == "FeatureCollection" + assert "timeStamp" in result + feat = result["features"][0] + assert feat["geometry"]["coordinates"] == [-106.0, 34.0] + assert feat["properties"]["trend_category"] == "not enough data" # single record From b4acb48528a4c1d7a52669cc3369f26bd6ab8391 Mon Sep 17 00:00:00 2001 From: jakeross Date: Fri, 26 Jun 2026 11:31:20 -0600 Subject: [PATCH 2/3] Use Mann-Kendall + Theil-Sen for waterlevel trend Replace the OLS slope/threshold classification with the non-parametric Mann-Kendall trend test (pymannkendall.original_test, alpha=0.05) for direction/significance, and the Theil-Sen slope (scipy, time-aware) for the ft/year rate. More robust to outliers and irregular sampling than ordinary least squares. - trend_category: significant increasing/decreasing, else 'stable'; 'not enough data' below the record/span gate (unchanged gate). - new per-feature mk_p_value and mk_tau properties. - add pymannkendall dependency (pulls scipy); serverless gets it via the root pyproject backend deps. - updated TREND_METHOD_DESCRIPTION. Co-Authored-By: Claude Opus 4.8 --- backend/persisters/ogc_features.py | 105 +++++++++++++++-------------- pyproject.toml | 1 + 2 files changed, 56 insertions(+), 50 deletions(-) diff --git a/backend/persisters/ogc_features.py b/backend/persisters/ogc_features.py index b998278..4326ed2 100644 --- a/backend/persisters/ogc_features.py +++ b/backend/persisters/ogc_features.py @@ -11,30 +11,32 @@ from datetime import datetime, timezone from typing import Optional -# Seconds per Julian year (365.25 days) — matches Ocotillo's trend MV, which -# divides the per-second regression slope by 31557600 to get ft/year. +# Seconds per Julian year (365.25 days) — used to express the Theil-Sen slope +# in ft/year. _SECONDS_PER_YEAR = 31557600.0 -# Trend classification thresholds, ported verbatim from the Ocotillo -# ogc_depth_to_water_trend_wells materialized view. -_TREND_SLOPE_THRESHOLD = 0.25 # ft/year +# A well is classified only when it has enough data for a meaningful +# Mann-Kendall test: at least 10 measurements, or at least 4 spanning >= 2 years. _TREND_MIN_RECORDS = 10 _TREND_MIN_RECORDS_WITH_SPAN = 4 _TREND_MIN_SPAN_YEARS = 2.0 +_TREND_ALPHA = 0.05 # significance level for the Mann-Kendall test # Human-readable description of the trend method, embedded in the product so # consumers know how the classification was derived. TREND_METHOD_DESCRIPTION = ( - "Depth-to-water trend per well. A least-squares linear regression " - "(equivalent to PostgreSQL REGR_SLOPE) is fit to depth-to-water-below-" - "ground-surface (feet) against observation time; the per-second slope is " - "scaled by 31,557,600 s/yr (a 365.25-day year) to slope_ft_per_year. A well " - "is classified only when it has at least 10 measurements, or at least 4 " + "Depth-to-water trend per well. Monotonic trend is tested with the " + "non-parametric Mann-Kendall test (pymannkendall.original_test, alpha=0.05) " + "on depth-to-water-below-ground-surface (feet) ordered by observation time. " + "The rate is the Theil-Sen slope of depth-to-water vs time, in ft/year. A " + "well is classified only when it has at least 10 measurements, or at least 4 " "measurements spanning at least 2 years; otherwise 'not enough data'. When " - "classified: slope > 0.25 ft/yr is 'increasing' (water level getting " - "DEEPER, i.e. a declining water table), slope < -0.25 ft/yr is 'decreasing' " - "(water level getting SHALLOWER), otherwise 'stable'. Ported from the " - "Ocotillo API ogc_depth_to_water_trend_wells materialized view." + "classified: a statistically significant increasing trend is 'increasing' " + "(water level getting DEEPER, i.e. a declining water table), a significant " + "decreasing trend is 'decreasing' (water level getting SHALLOWER), and no " + "significant trend is 'stable'. Mirrors the intent of the Ocotillo API " + "ogc_depth_to_water_trend_wells materialized view, using Mann-Kendall + " + "Theil-Sen instead of ordinary least squares." ) @@ -63,33 +65,32 @@ def _parse_epoch_seconds(date, time) -> Optional[float]: return None -def _regr_slope(xs: list, ys: list) -> Optional[float]: - """Least-squares slope of ys on xs (PostgreSQL REGR_SLOPE). None if x has no - variance (fewer than 2 distinct x values).""" - n = len(xs) - if n < 2: - return None - mx = sum(xs) / n - my = sum(ys) / n - sxx = sum((x - mx) ** 2 for x in xs) - if sxx == 0: - return None - sxy = sum((x - mx) * (y - my) for x, y in zip(xs, ys)) - return sxy / sxx - - -def _classify_trend(slope_ft_per_year, record_count, span_years) -> str: - qualifies = record_count >= _TREND_MIN_RECORDS or ( +def _qualifies_for_trend(record_count, span_years) -> bool: + return record_count >= _TREND_MIN_RECORDS or ( record_count >= _TREND_MIN_RECORDS_WITH_SPAN and span_years >= _TREND_MIN_SPAN_YEARS ) - if not qualifies or slope_ft_per_year is None: - return "not enough data" - if slope_ft_per_year > _TREND_SLOPE_THRESHOLD: - return "increasing" - if slope_ft_per_year < -_TREND_SLOPE_THRESHOLD: - return "decreasing" - return "stable" + + +def _mann_kendall_trend(years: list, values: list): + """Run the Mann-Kendall trend test + Theil-Sen slope. + + Returns (trend_category, slope_ft_per_year, p_value, tau). *years* are + decimal years, *values* depth-to-water (ft), both ordered by time. + trend_category is one of 'increasing' / 'decreasing' / 'stable'. + """ + import pymannkendall as mk + from scipy.stats import theilslopes + + result = mk.original_test(values, alpha=_TREND_ALPHA) + # Time-aware Theil-Sen slope (ft/year) — robust and correct for the + # irregular sampling typical of water-level records, unlike MK's index-based + # slope which assumes unit spacing. + slope_ft_per_year = float(theilslopes(values, years)[0]) + + # mk trend is 'increasing' / 'decreasing' / 'no trend'. + category = "stable" if result.trend == "no trend" else result.trend + return category, slope_ft_per_year, float(result.p), float(result.Tau) def _make_feature(record, collection_id: str) -> dict: @@ -266,9 +267,9 @@ def dump_waterlevel_trend_collection( surface in feet, so no measuring-point adjustment is applied here). Each feature carries: record_count, first/last_observation_datetime, - span_years, slope_ft_per_year, trend_category, well_depth(+units). The - collection carries ``trend_method`` describing the calculation. See - TREND_METHOD_DESCRIPTION. + span_years, slope_ft_per_year (Theil-Sen), trend_category (Mann-Kendall), + mk_p_value, mk_tau, well_depth(+units). The collection carries + ``trend_method`` describing the calculation. See TREND_METHOD_DESCRIPTION. §V: MUST include top-level id, type, numberReturned, timeStamp. §V: Each Feature MUST have top-level id. @@ -294,16 +295,18 @@ def dump_waterlevel_trend_collection( record_count = len(pairs) xs = [p[0] for p in pairs] ys = [p[1] for p in pairs] - - if record_count >= 2: - span_years = (xs[-1] - xs[0]) / _SECONDS_PER_YEAR - slope = _regr_slope(xs, ys) - slope_ft_per_year = None if slope is None else slope * _SECONDS_PER_YEAR + span_years = (xs[-1] - xs[0]) / _SECONDS_PER_YEAR if record_count >= 2 else 0.0 + + slope_ft_per_year = None + p_value = None + tau = None + if _qualifies_for_trend(record_count, span_years): + years = [x / _SECONDS_PER_YEAR for x in xs] + trend_category, slope_ft_per_year, p_value, tau = _mann_kendall_trend( + years, ys + ) else: - span_years = 0.0 - slope_ft_per_year = None - - trend_category = _classify_trend(slope_ft_per_year, record_count, span_years) + trend_category = "not enough data" source = getattr(site, "source", "") or "" rid = getattr(site, "id", "") or "" @@ -330,6 +333,8 @@ def _iso(epoch): None if slope_ft_per_year is None else round(slope_ft_per_year, 4) ), "trend_category": trend_category, + "mk_p_value": None if p_value is None else round(p_value, 4), + "mk_tau": None if tau is None else round(tau, 4), } lat = getattr(site, "latitude", None) diff --git a/pyproject.toml b/pyproject.toml index 58766bb..44ea3aa 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,6 +21,7 @@ dependencies = [ "geopandas", "httpx", "pandas", + "pymannkendall", "pyyaml", "types-pyyaml", "urllib3>=2.2.0,<3.0.0", From 4df06e9f6ab994b9caa2c80b5f2140064a2d9571 Mon Sep 17 00:00:00 2001 From: jakeross Date: Fri, 26 Jun 2026 11:35:44 -0600 Subject: [PATCH 3/3] DRY ogc_features: shared collection/geometry/id helpers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The four dump_* functions each repeated the FeatureCollection envelope + file write, the coords/geometry build, the feature-id rule, and ISO-UTC formatting. Extract: - _dump_collection(path, id, features, meta, extra) — envelope + write, with extra for collection-level keys (trend_method). - _point_geometry(lat, lon, elev) — Point geometry. - _feature_id(source, rid) — "source:id" rule. - _iso_utc(epoch) — shared ISO-8601 UTC formatting. Behavior-identical (-58 lines); all 13 persister tests pass. Co-Authored-By: Claude Opus 4.8 --- backend/persisters/ogc_features.py | 210 +++++++++++------------------ 1 file changed, 76 insertions(+), 134 deletions(-) diff --git a/backend/persisters/ogc_features.py b/backend/persisters/ogc_features.py index 4326ed2..e6d1cb4 100644 --- a/backend/persisters/ogc_features.py +++ b/backend/persisters/ogc_features.py @@ -44,6 +44,52 @@ def _timestamp_now() -> str: return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") +def _iso_utc(epoch) -> Optional[str]: + """Format POSIX seconds as an ISO-8601 UTC string (None passes through).""" + if epoch is None: + return None + return datetime.fromtimestamp(epoch, tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + + +def _feature_id(source, rid) -> str: + source = source or "" + rid = rid or "" + return f"{source}:{rid}" if source and rid else str(rid) + + +def _point_geometry(lat, lon, elev=None) -> dict: + coords = [lon, lat] if elev is None else [lon, lat, elev] + return {"type": "Point", "coordinates": coords} + + +def _dump_collection( + path: str, collection_id: str, features: list, meta: dict, extra: Optional[dict] = None +) -> dict: + """Build the OGC FeatureCollection envelope around *features*, write it to + *path*, and return it. *extra* injects collection-level keys (e.g. + trend_method) after description. + + §V: MUST include top-level id, type, numberReturned, timeStamp. + """ + collection = { + "type": "FeatureCollection", + "id": collection_id, + "title": meta.get("title", collection_id), + "description": meta.get("description", ""), + **(extra or {}), + "timeStamp": _timestamp_now(), + "numberMatched": len(features), + "numberReturned": len(features), + "links": [ + {"href": meta.get("href", ""), "rel": "self", "type": "application/geo+json"} + ], + "features": features, + } + with open(path, "w", encoding="utf-8") as f: + json.dump(collection, f, indent=2, default=str) + return collection + + def _parse_epoch_seconds(date, time) -> Optional[float]: """Best-effort parse of a DIE date (+optional time) to POSIX seconds (UTC).""" if not date: @@ -95,23 +141,17 @@ def _mann_kendall_trend(years: list, values: list): def _make_feature(record, collection_id: str) -> dict: """Build one OGC-compliant Feature from a SummaryRecord or SiteRecord.""" - source = getattr(record, "source", "") - rid = getattr(record, "id", "") - feature_id = f"{source}:{rid}" if source and rid else str(rid) - props = {k: getattr(record, k) for k in record.keys if k not in ("latitude", "longitude", "elevation")} - lat = getattr(record, "latitude", None) - lon = getattr(record, "longitude", None) - elev = getattr(record, "elevation", None) - - coords = [lon, lat] if elev is None else [lon, lat, elev] - return { "type": "Feature", - "id": feature_id, - "geometry": {"type": "Point", "coordinates": coords}, + "id": _feature_id(getattr(record, "source", ""), getattr(record, "id", "")), + "geometry": _point_geometry( + getattr(record, "latitude", None), + getattr(record, "longitude", None), + getattr(record, "elevation", None), + ), "properties": props, } @@ -129,29 +169,7 @@ def dump_summary_collection(path: str, records: list, meta: dict) -> dict: """ collection_id = meta.get("id", "collection") features = [_make_feature(r, collection_id) for r in records] - - collection = { - "type": "FeatureCollection", - "id": collection_id, - "title": meta.get("title", collection_id), - "description": meta.get("description", ""), - "timeStamp": _timestamp_now(), - "numberMatched": len(features), - "numberReturned": len(features), - "links": [ - { - "href": meta.get("href", ""), - "rel": "self", - "type": "application/geo+json", - } - ], - "features": features, - } - - with open(path, "w", encoding="utf-8") as f: - json.dump(collection, f, indent=2, default=str) - - return collection + return _dump_collection(path, collection_id, features, meta) def dump_major_chemistry_collection(path: str, records: list, meta: dict) -> dict: @@ -205,7 +223,6 @@ def dump_major_chemistry_collection(path: str, records: list, meta: dict) -> dic features = [] for (source, rid), well in wells.items(): - feature_id = f"{source}:{rid}" if source and rid else str(rid) props = { "source": source, "id": rid, @@ -218,37 +235,14 @@ def dump_major_chemistry_collection(path: str, records: list, meta: dict) -> dic props[f"{analyte}_units"] = vals["units"] props[f"{analyte}_date"] = vals["date"] - lat, lon, elev = well["latitude"], well["longitude"], well["elevation"] - coords = [lon, lat] if elev is None else [lon, lat, elev] features.append({ "type": "Feature", - "id": feature_id, - "geometry": {"type": "Point", "coordinates": coords}, + "id": _feature_id(source, rid), + "geometry": _point_geometry(well["latitude"], well["longitude"], well["elevation"]), "properties": props, }) - collection = { - "type": "FeatureCollection", - "id": collection_id, - "title": meta.get("title", collection_id), - "description": meta.get("description", ""), - "timeStamp": _timestamp_now(), - "numberMatched": len(features), - "numberReturned": len(features), - "links": [ - { - "href": meta.get("href", ""), - "rel": "self", - "type": "application/geo+json", - } - ], - "features": features, - } - - with open(path, "w", encoding="utf-8") as f: - json.dump(collection, f, indent=2, default=str) - - return collection + return _dump_collection(path, collection_id, features, meta) def dump_waterlevel_trend_collection( @@ -308,26 +302,15 @@ def dump_waterlevel_trend_collection( else: trend_category = "not enough data" - source = getattr(site, "source", "") or "" - rid = getattr(site, "id", "") or "" - feature_id = f"{source}:{rid}" if source and rid else str(rid) - - def _iso(epoch): - if epoch is None: - return None - return datetime.fromtimestamp(epoch, tz=timezone.utc).strftime( - "%Y-%m-%dT%H:%M:%SZ" - ) - props = { - "source": source, - "id": rid, + "source": getattr(site, "source", "") or "", + "id": getattr(site, "id", "") or "", "name": getattr(site, "name", None), "well_depth": getattr(site, "well_depth", None), "well_depth_units": getattr(site, "well_depth_units", None), "record_count": record_count, - "first_observation_datetime": _iso(xs[0]) if record_count else None, - "last_observation_datetime": _iso(xs[-1]) if record_count else None, + "first_observation_datetime": _iso_utc(xs[0]) if record_count else None, + "last_observation_datetime": _iso_utc(xs[-1]) if record_count else None, "span_years": round(span_years, 3), "slope_ft_per_year": ( None if slope_ft_per_year is None else round(slope_ft_per_year, 4) @@ -337,41 +320,21 @@ def _iso(epoch): "mk_tau": None if tau is None else round(tau, 4), } - lat = getattr(site, "latitude", None) - lon = getattr(site, "longitude", None) - elev = getattr(site, "elevation", None) - coords = [lon, lat] if elev is None else [lon, lat, elev] - features.append({ "type": "Feature", - "id": feature_id, - "geometry": {"type": "Point", "coordinates": coords}, + "id": _feature_id(props["source"], props["id"]), + "geometry": _point_geometry( + getattr(site, "latitude", None), + getattr(site, "longitude", None), + getattr(site, "elevation", None), + ), "properties": props, }) - collection = { - "type": "FeatureCollection", - "id": collection_id, - "title": meta.get("title", collection_id), - "description": meta.get("description", ""), - "trend_method": TREND_METHOD_DESCRIPTION, - "timeStamp": _timestamp_now(), - "numberMatched": len(features), - "numberReturned": len(features), - "links": [ - { - "href": meta.get("href", ""), - "rel": "self", - "type": "application/geo+json", - } - ], - "features": features, - } - - with open(path, "w", encoding="utf-8") as f: - json.dump(collection, f, indent=2, default=str) - - return collection + return _dump_collection( + path, collection_id, features, meta, + extra={"trend_method": TREND_METHOD_DESCRIPTION}, + ) def dump_timeseries_collection( @@ -407,11 +370,11 @@ def dump_timeseries_collection( site = site_lookup.get(site_id) if site: - lat = getattr(site, "latitude", None) - lon = getattr(site, "longitude", None) - elev = getattr(site, "elevation", None) - coords = [lon, lat] if elev is None else [lon, lat, elev] - geometry = {"type": "Point", "coordinates": coords} + geometry = _point_geometry( + getattr(site, "latitude", None), + getattr(site, "longitude", None), + getattr(site, "elevation", None), + ) else: geometry = None @@ -437,25 +400,4 @@ def dump_timeseries_collection( "properties": props, }) - collection = { - "type": "FeatureCollection", - "id": collection_id, - "title": meta.get("title", collection_id), - "description": meta.get("description", ""), - "timeStamp": _timestamp_now(), - "numberMatched": len(features), - "numberReturned": len(features), - "links": [ - { - "href": meta.get("href", ""), - "rel": "self", - "type": "application/geo+json", - } - ], - "features": features, - } - - with open(path, "w", encoding="utf-8") as f: - json.dump(collection, f, indent=2, default=str) - - return collection + return _dump_collection(path, collection_id, features, meta)