diff --git a/backend/persisters/ogc_features.py b/backend/persisters/ogc_features.py index 37486c0..d3ff34e 100644 --- a/backend/persisters/ogc_features.py +++ b/backend/persisters/ogc_features.py @@ -11,34 +11,17 @@ from datetime import datetime, timezone from typing import Optional -# Seconds per Julian year (365.25 days) — used to express the Theil-Sen slope -# in ft/year. -_SECONDS_PER_YEAR = 31557600.0 - -# A well is classified only when it has enough data for a meaningful -# Mann-Kendall test: at least 10 measurements, or at least 4 spanning >= 2 years. -_TREND_MIN_RECORDS = 10 -_TREND_MIN_RECORDS_WITH_SPAN = 4 -_TREND_MIN_SPAN_YEARS = 2.0 -_TREND_ALPHA = 0.05 # significance level for the Mann-Kendall test - -# Human-readable description of the trend method, embedded in the product so -# consumers know how the classification was derived. -TREND_METHOD_DESCRIPTION = ( - "Depth-to-water trend per well. Observations are first downsampled to one " - "point per calendar day, keeping the daily minimum depth-to-water (the " - "shallowest reading). Monotonic trend is then tested with the non-parametric " - "Mann-Kendall test (pymannkendall.original_test, alpha=0.05) on the daily " - "depth-to-water-below-ground-surface (feet) series ordered by time. The rate " - "is the Theil-Sen slope of depth-to-water vs time, in ft/year. A well is " - "classified only when it has at least 10 daily points, or at least 4 daily " - "points spanning at least 2 years; otherwise 'not enough data'. When " - "classified: a statistically significant increasing trend is 'increasing' " - "(water level getting DEEPER, i.e. a declining water table), a significant " - "decreasing trend is 'decreasing' (water level getting SHALLOWER), and no " - "significant trend is 'stable'. Mirrors the intent of the Ocotillo API " - "ogc_depth_to_water_trend_wells materialized view, using Mann-Kendall + " - "Theil-Sen instead of ordinary least squares." +# Trend statistics live in backend/trend_stats.py (analysis, not serialization). +# Re-exported here so existing importers (and dump_trend_collection's default +# method arg) keep working. +from backend.trend_stats import ( + _SECONDS_PER_YEAR, + ANALYTE_TREND_METHOD_DESCRIPTION, + TREND_METHOD_DESCRIPTION, + daily_series as _daily_series, + mann_kendall_trend as _mann_kendall_trend, + parse_epoch_seconds as _parse_epoch_seconds, + qualifies_for_trend as _qualifies_for_trend, ) @@ -92,94 +75,6 @@ def _dump_collection( return collection -def _parse_epoch_seconds(date, time) -> Optional[float]: - """Best-effort parse of a DIE date (+optional time) to POSIX seconds (UTC).""" - if not date: - return None - text = f"{date}T{time}" if time else str(date) - text = text.replace("Z", "") - for parse in (datetime.fromisoformat,): - try: - dt = parse(text) - if dt.tzinfo is None: - dt = dt.replace(tzinfo=timezone.utc) - return dt.timestamp() - except (ValueError, TypeError): - pass - try: - dt = datetime.fromisoformat(str(date)).replace(tzinfo=timezone.utc) - return dt.timestamp() - except (ValueError, TypeError): - return None - - -def _daily_min_series(obs_list: list) -> tuple[int, list]: - """Reduce a well's observations to one point per calendar day, keeping the - minimum depth-to-water for each day (the shallowest reading), keyed at the - day's UTC midnight epoch. - - *obs_list* is a list of observation payload dicts (parameter_value, - date_measured, time_measured). Operating on dicts avoids rebuilding - ParameterRecord objects for what can be millions of observations. - - Downsampling bounds the O(n^2) Mann-Kendall cost for high-frequency wells - (e.g. continuous loggers) and removes within-day sampling noise. Returns - (raw_observation_count, [(day_epoch_seconds, min_value), ...] sorted by day). - """ - raw_count = 0 - daily: dict = {} # date -> (day_epoch, min_value) - for obs in obs_list: - value = obs.get("parameter_value") - epoch = _parse_epoch_seconds( - obs.get("date_measured"), obs.get("time_measured") - ) - if value is None or epoch is None: - continue - try: - v = float(value) - except (TypeError, ValueError): - continue - raw_count += 1 - day = datetime.fromtimestamp(epoch, tz=timezone.utc).date() - day_epoch = datetime( - day.year, day.month, day.day, tzinfo=timezone.utc - ).timestamp() - existing = daily.get(day) - if existing is None or v < existing[1]: - daily[day] = (day_epoch, v) - - pairs = sorted(daily.values(), key=lambda p: p[0]) - return raw_count, pairs - - -def _qualifies_for_trend(record_count, span_years) -> bool: - return record_count >= _TREND_MIN_RECORDS or ( - record_count >= _TREND_MIN_RECORDS_WITH_SPAN - and span_years >= _TREND_MIN_SPAN_YEARS - ) - - -def _mann_kendall_trend(years: list, values: list): - """Run the Mann-Kendall trend test + Theil-Sen slope. - - Returns (trend_category, slope_ft_per_year, p_value, tau). *years* are - decimal years, *values* depth-to-water (ft), both ordered by time. - trend_category is one of 'increasing' / 'decreasing' / 'stable'. - """ - import pymannkendall as mk - from scipy.stats import theilslopes - - result = mk.original_test(values, alpha=_TREND_ALPHA) - # Time-aware Theil-Sen slope (ft/year) — robust and correct for the - # irregular sampling typical of water-level records, unlike MK's index-based - # slope which assumes unit spacing. - slope_ft_per_year = float(theilslopes(values, years)[0]) - - # mk trend is 'increasing' / 'decreasing' / 'no trend'. - category = "stable" if result.trend == "no trend" else result.trend - return category, slope_ft_per_year, float(result.p), float(result.Tau) - - def _make_feature(record, collection_id: str) -> dict: """Build one OGC-compliant Feature from a SummaryRecord or SiteRecord.""" props = {k: getattr(record, k) for k in record.keys @@ -286,29 +181,33 @@ def dump_major_chemistry_collection(path: str, records: list, meta: dict) -> dic return _dump_collection(path, collection_id, features, meta) -def dump_waterlevel_trend_collection( +def dump_trend_collection( path: str, site_records: list, timeseries_records: list, meta: dict, + *, + slope_units: str, + reducer: str = "min", + method: str = TREND_METHOD_DESCRIPTION, + parameter_name: Optional[str] = None, ) -> dict: """ - Write an OGC FeatureCollection of per-well depth-to-water trends to *path*. - One Feature per well. - - *site_records* and *timeseries_records* are index-aligned **payload dicts**: - ``site_records[i]`` is the well's site dict and ``timeseries_records[i]`` is - its list of observation dicts. They are consumed as dicts (not rebuilt into - record objects) to keep memory bounded for statewide, high-frequency data. - DIE water-level values are already depth-to-water below ground surface in - feet, so no measuring-point adjustment is applied here. - - Observations are downsampled to the daily minimum depth-to-water before the - trend test (see _daily_min_series). Each feature carries: record_count - (daily points used), observation_count (raw readings), - first/last_observation_datetime, span_years, slope_ft_per_year (Theil-Sen), - trend_category (Mann-Kendall), mk_p_value, mk_tau, well_depth(+units). The - collection carries ``trend_method``. See TREND_METHOD_DESCRIPTION. + Write an OGC FeatureCollection of per-well Mann-Kendall trends to *path*. + One Feature per well. Used for both water-level and analyte trends. + + *site_records* and *timeseries_records* are index-aligned **payload dicts** + (consumed as dicts, not rebuilt into record objects, to keep memory bounded). + Each well's observations are downsampled to one point per calendar day using + *reducer* ("min" for depth-to-water, "mean" for analytes), then tested with + Mann-Kendall + Theil-Sen. + + Each feature carries: parameter_name, record_count (daily points used), + observation_count (raw readings), first/last_observation_datetime, + span_years, slope_per_year (Theil-Sen) + slope_units, trend_category + (Mann-Kendall), mk_p_value, mk_tau, well_depth(+units), and + source_datastream_link when available. The collection carries + ``trend_method`` (*method*). §V: MUST include top-level id, type, numberReturned, timeStamp. §V: Each Feature MUST have top-level id. @@ -317,21 +216,18 @@ def dump_waterlevel_trend_collection( features = [] for site, obs_list in zip(site_records, timeseries_records): - # Downsample to one min-depth-to-water point per day before the trend - # test (see _daily_min_series). record_count is the daily-point count - # used for the trend; observation_count is the raw reading count. - observation_count, pairs = _daily_min_series(obs_list) + observation_count, pairs = _daily_series(obs_list, reducer) record_count = len(pairs) xs = [p[0] for p in pairs] ys = [p[1] for p in pairs] span_years = (xs[-1] - xs[0]) / _SECONDS_PER_YEAR if record_count >= 2 else 0.0 - slope_ft_per_year = None + slope_per_year = None p_value = None tau = None if _qualifies_for_trend(record_count, span_years): years = [x / _SECONDS_PER_YEAR for x in xs] - trend_category, slope_ft_per_year, p_value, tau = _mann_kendall_trend( + trend_category, slope_per_year, p_value, tau = _mann_kendall_trend( years, ys ) else: @@ -341,6 +237,7 @@ def dump_waterlevel_trend_collection( "source": site.get("source") or "", "id": site.get("id") or "", "name": site.get("name"), + "parameter_name": parameter_name, "well_depth": site.get("well_depth"), "well_depth_units": site.get("well_depth_units"), "record_count": record_count, @@ -348,17 +245,17 @@ def dump_waterlevel_trend_collection( "first_observation_datetime": _iso_utc(xs[0]) if record_count else None, "last_observation_datetime": _iso_utc(xs[-1]) if record_count else None, "span_years": round(span_years, 3), - "slope_ft_per_year": ( - None if slope_ft_per_year is None else round(slope_ft_per_year, 4) + "slope_per_year": ( + None if slope_per_year is None else round(slope_per_year, 4) ), + "slope_units": slope_units, "trend_category": trend_category, "mk_p_value": None if p_value is None else round(p_value, 4), "mk_tau": None if tau is None else round(tau, 4), } # Link to the raw, non-normalized source datastream used for the - # calculation, when the source provides one (SensorThings/st2). Taken - # from the well's observations; absent for sources without one. + # calculation, when the source provides one (SensorThings/st2). source_datastream_link = next( ( o.get("source_datastream_link") @@ -381,9 +278,165 @@ def dump_waterlevel_trend_collection( "properties": props, }) + return _dump_collection( + path, collection_id, features, meta, extra={"trend_method": method} + ) + + +def dump_mcl_exceedance_collection( + path: str, records: list, meta: dict, thresholds: dict +) -> dict: + """ + Write an OGC FeatureCollection flagging drinking-water MCL exceedances, one + Feature per well. + + *records* is a flat list of SummaryRecord — one per (well, analyte) — pivoted + by well (like the major-chemistry product). *thresholds* maps analyte -> + {"mcl": , "type": "primary"|"secondary"} + and is the source of truth for which analytes have an MCL. + + Per analyte present on a well: ```` (latest value), ``_mcl``, + ``_mcl_type``, and ``_exceeds`` (value > mcl). Plus + ``any_exceedance`` (bool), ``exceedance_count`` (int), and + ``exceeded_analytes`` (list). The collection carries ``mcl_thresholds``. + """ + collection_id = meta.get("id", "collection") + + wells: dict = {} + for r in records: + source = getattr(r, "source", "") or "" + rid = getattr(r, "id", "") or "" + key = (source, rid) + well = wells.get(key) + if well is None: + well = { + "source": source, "id": rid, "name": getattr(r, "name", None), + "latitude": getattr(r, "latitude", None), + "longitude": getattr(r, "longitude", None), + "elevation": getattr(r, "elevation", None), + "well_depth": getattr(r, "well_depth", None), + "well_depth_units": getattr(r, "well_depth_units", None), + "values": {}, + } + wells[key] = well + analyte = getattr(r, "parameter_name", None) + if analyte: + well["values"][analyte] = getattr(r, "latest_value", None) + + features = [] + for (source, rid), well in wells.items(): + props = { + "source": source, "id": rid, "name": well["name"], + "well_depth": well["well_depth"], + "well_depth_units": well["well_depth_units"], + } + exceeded = [] + for analyte, value in well["values"].items(): + props[analyte] = value + limit = thresholds.get(analyte) + if not limit: + continue + mcl = limit.get("mcl") + props[f"{analyte}_mcl"] = mcl + props[f"{analyte}_mcl_type"] = limit.get("type") + # Direct magnitude comparison: the MCL and the value MUST share + # units AND basis. NOTE (nitrate): the EPA MCL is 10 mg/L measured + # "as N", but providers/DIE may report nitrate "as NO3" (~4.43x + # larger). If the data basis is NO3, this flag is wrong unless the + # mcl in config/mcl.json is the as-NO3 value (~44.3 mg/L). Confirm + # DIE's normalized nitrate basis before trusting nitrate exceedances. + exceeds = value is not None and mcl is not None and value > mcl + props[f"{analyte}_exceeds"] = exceeds + if exceeds: + exceeded.append(analyte) + + props["any_exceedance"] = bool(exceeded) + props["exceedance_count"] = len(exceeded) + props["exceeded_analytes"] = sorted(exceeded) + + features.append({ + "type": "Feature", + "id": _feature_id(source, rid), + "geometry": _point_geometry( + well["latitude"], well["longitude"], well["elevation"] + ), + "properties": props, + }) + + return _dump_collection( + path, collection_id, features, meta, extra={"mcl_thresholds": thresholds} + ) + + +def dump_monitoring_recency_collection( + path: str, + site_records: list, + timeseries_records: list, + meta: dict, + *, + run_date: str, + stale_days: int = 365, +) -> dict: + """ + Write an OGC FeatureCollection of monitoring recency, one Feature per well. + + *site_records* and *timeseries_records* are index-aligned payload dicts. Per + well: ``record_count``, ``first_observation_datetime``, + ``last_observation_datetime``, ``days_since_last`` (relative to *run_date*), + and ``status`` ("active" if days_since_last <= *stale_days*, else "stale"; + "no data" when the well has no observations). Surfaces dead/lagging + monitoring points. + """ + collection_id = meta.get("id", "collection") + run_epoch = _parse_epoch_seconds(run_date, None) + + features = [] + for site, obs_list in zip(site_records, timeseries_records): + epochs = [ + e for e in ( + _parse_epoch_seconds(o.get("date_measured"), o.get("time_measured")) + for o in obs_list + ) if e is not None + ] + record_count = len(epochs) + if record_count: + first_e, last_e = min(epochs), max(epochs) + days_since_last = ( + int((run_epoch - last_e) // 86400) if run_epoch is not None else None + ) + if days_since_last is None: + status = "unknown" + else: + status = "active" if days_since_last <= stale_days else "stale" + else: + first_e = last_e = None + days_since_last = None + status = "no data" + + props = { + "source": site.get("source") or "", + "id": site.get("id") or "", + "name": site.get("name"), + "well_depth": site.get("well_depth"), + "well_depth_units": site.get("well_depth_units"), + "record_count": record_count, + "first_observation_datetime": _iso_utc(first_e), + "last_observation_datetime": _iso_utc(last_e), + "days_since_last": days_since_last, + "status": status, + } + features.append({ + "type": "Feature", + "id": _feature_id(props["source"], props["id"]), + "geometry": _point_geometry( + site.get("latitude"), site.get("longitude"), site.get("elevation") + ), + "properties": props, + }) + return _dump_collection( path, collection_id, features, meta, - extra={"trend_method": TREND_METHOD_DESCRIPTION}, + extra={"stale_threshold_days": stale_days}, ) diff --git a/backend/trend_stats.py b/backend/trend_stats.py new file mode 100644 index 0000000..8c14cbf --- /dev/null +++ b/backend/trend_stats.py @@ -0,0 +1,143 @@ +# =============================================================================== +# Trend statistics for DIE products. +# +# Pure analysis (no serialization / I/O): daily aggregation, the qualification +# gate, and the Mann-Kendall + Theil-Sen trend test. Kept separate from +# backend/persisters/ogc_features.py (which only builds GeoJSON) because this is +# statistics, is independently testable, and pulls heavier deps (scipy, +# pymannkendall) lazily. +# =============================================================================== +from datetime import datetime, timezone +from typing import Optional + +# Seconds per Julian year (365.25 days) — used to express the Theil-Sen slope +# per year. +_SECONDS_PER_YEAR = 31557600.0 + +# A well is classified only when it has enough data for a meaningful +# Mann-Kendall test: at least 10 daily points, or at least 4 spanning >= 2 years. +_TREND_MIN_RECORDS = 10 +_TREND_MIN_RECORDS_WITH_SPAN = 4 +_TREND_MIN_SPAN_YEARS = 2.0 +_TREND_ALPHA = 0.05 # significance level for the Mann-Kendall test + +_MK_COMMON = ( + "Monotonic trend is tested with the non-parametric Mann-Kendall test " + "(pymannkendall.original_test, alpha=0.05) on the daily series ordered by " + "time; the rate is the Theil-Sen slope vs time. A well is classified only " + "when it has at least 10 daily points, or at least 4 daily points spanning " + "at least 2 years; otherwise 'not enough data'." +) + +# Human-readable description of each trend product's method, embedded in the +# collection so consumers know how the classification was derived. +TREND_METHOD_DESCRIPTION = ( + "Depth-to-water trend per well. Observations are downsampled to one point " + "per calendar day, keeping the daily MINIMUM depth-to-water (shallowest " + f"reading). {_MK_COMMON} A significant increasing slope is 'increasing' " + "(water level getting DEEPER, i.e. a declining water table), a significant " + "decreasing slope is 'decreasing' (water level getting SHALLOWER), else " + "'stable'. Mirrors the Ocotillo API ogc_depth_to_water_trend_wells " + "materialized view, using Mann-Kendall + Theil-Sen instead of OLS." +) + +ANALYTE_TREND_METHOD_DESCRIPTION = ( + "Analyte concentration trend per well. Observations are downsampled to one " + f"point per calendar day, keeping the daily MEAN concentration. {_MK_COMMON} " + "A significant increasing slope is 'increasing' (concentration rising), a " + "significant decreasing slope is 'decreasing', else 'stable'." +) + + +def parse_epoch_seconds(date, time) -> Optional[float]: + """Best-effort parse of a DIE date (+optional time) to POSIX seconds (UTC).""" + if not date: + return None + text = f"{date}T{time}" if time else str(date) + text = text.replace("Z", "") + try: + dt = datetime.fromisoformat(text) + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + return dt.timestamp() + except (ValueError, TypeError): + pass + try: + dt = datetime.fromisoformat(str(date)).replace(tzinfo=timezone.utc) + return dt.timestamp() + except (ValueError, TypeError): + return None + + +def daily_series(obs_list: list, reducer: str = "min") -> tuple[int, list]: + """Reduce a well's observations to one point per calendar day, keyed at the + day's UTC midnight epoch. *reducer* selects the daily aggregate: "min" + (shallowest depth-to-water), "max", or "mean" (e.g. analyte concentration). + + *obs_list* is a list of observation payload dicts (parameter_value, + date_measured, time_measured). Operating on dicts avoids rebuilding + ParameterRecord objects for what can be millions of observations. + + Downsampling bounds the O(n^2) Mann-Kendall cost for high-frequency wells + (e.g. continuous loggers) and removes within-day sampling noise. Returns + (raw_observation_count, [(day_epoch_seconds, value), ...] sorted by day). + """ + raw_count = 0 + buckets: dict = {} # date -> (day_epoch, [values]) + for obs in obs_list: + value = obs.get("parameter_value") + epoch = parse_epoch_seconds(obs.get("date_measured"), obs.get("time_measured")) + if value is None or epoch is None: + continue + try: + v = float(value) + except (TypeError, ValueError): + continue + raw_count += 1 + day = datetime.fromtimestamp(epoch, tz=timezone.utc).date() + day_epoch = datetime( + day.year, day.month, day.day, tzinfo=timezone.utc + ).timestamp() + if day not in buckets: + buckets[day] = (day_epoch, []) + buckets[day][1].append(v) + + reduce_fn = { + "min": min, + "max": max, + "mean": lambda vs: sum(vs) / len(vs), + }[reducer] + + pairs = sorted( + ((day_epoch, reduce_fn(vals)) for day_epoch, vals in buckets.values()), + key=lambda p: p[0], + ) + return raw_count, pairs + + +def qualifies_for_trend(record_count, span_years) -> bool: + return record_count >= _TREND_MIN_RECORDS or ( + record_count >= _TREND_MIN_RECORDS_WITH_SPAN + and span_years >= _TREND_MIN_SPAN_YEARS + ) + + +def mann_kendall_trend(years: list, values: list): + """Run the Mann-Kendall trend test + Theil-Sen slope. + + Returns (trend_category, slope_per_year, p_value, tau). *years* are decimal + years, *values* the measured quantity, both ordered by time. trend_category + is one of 'increasing' / 'decreasing' / 'stable'. + """ + import pymannkendall as mk + from scipy.stats import theilslopes + + result = mk.original_test(values, alpha=_TREND_ALPHA) + # Time-aware Theil-Sen slope (per year) — robust and correct for the + # irregular sampling typical of these records, unlike MK's index-based slope + # which assumes unit spacing. + slope_per_year = float(theilslopes(values, years)[0]) + + # mk trend is 'increasing' / 'decreasing' / 'no trend'. + category = "stable" if result.trend == "no trend" else result.trend + return category, slope_per_year, float(result.p), float(result.Tau) diff --git a/orchestration/assets/products.py b/orchestration/assets/products.py index b1d43ca..b268e82 100644 --- a/orchestration/assets/products.py +++ b/orchestration/assets/products.py @@ -28,6 +28,7 @@ import tempfile import traceback from collections.abc import Iterator +from datetime import datetime, timezone from pathlib import Path import dagster as dg @@ -35,10 +36,13 @@ from backend.config import PARAMETER_SOURCE_MAP, WATERLEVELS from backend.persisters.ogc_features import ( + ANALYTE_TREND_METHOD_DESCRIPTION, dump_major_chemistry_collection, + dump_mcl_exceedance_collection, + dump_monitoring_recency_collection, dump_summary_collection, dump_timeseries_collection, - dump_waterlevel_trend_collection, + dump_trend_collection, ) from backend.record import ParameterRecord, SiteRecord, SummaryRecord from backend.unifier import unify_source @@ -50,6 +54,13 @@ _CHECK_NAME = "returned_data" _GEOSERVER_CHECK_NAME = "registered" +# GCS key (within the products bucket) of the MCL threshold file — the source of +# truth for the ogc_mcl_exceedance product. +_MCL_KEY = "config/mcl.json" + +# Multi-analyte products that gather one summary record per analyte per well. +_MULTI_ANALYTE_OUTPUT_TYPES = ("ogc_major_chemistry", "ogc_mcl_exceedance") + # Classic major-ion suite for the ogc_major_chemistry product. One feature per # well, with each analyte's latest value/units/date as properties. _MAJOR_CHEMISTRY = [ @@ -66,9 +77,14 @@ def _product_params(product: dict) -> list[str]: """The DIE parameter(s) a product unifies. Single-parameter products yield - one; the major-chemistry product yields the major-ion suite.""" - if product.get("output_type") == "ogc_major_chemistry": + one; multi-analyte products yield their analyte list.""" + output_type = product.get("output_type") + if output_type == "ogc_major_chemistry": return list(_MAJOR_CHEMISTRY) + if output_type == "ogc_mcl_exceedance": + # Candidate analytes for the static asset graph; the MCL JSON is the + # source of truth for which actually have thresholds. + return list(product["analytes"]) return [product["parameter"]] @@ -214,16 +230,35 @@ def _combine_asset( # to one feature per well with analytes as properties. records = [SummaryRecord(p) for p in all_records] dump_major_chemistry_collection(str(out), records, meta) + elif output_type == "ogc_mcl_exceedance": + # Compare each well's latest analyte values to the MCL JSON + # (source of truth) read from GCS. + thresholds = gcs.read_json(_MCL_KEY) + records = [SummaryRecord(p) for p in all_records] + dump_mcl_exceedance_collection(str(out), records, meta, thresholds) elif output_type == "ogc_summary": records = [SummaryRecord(p) for p in all_records] dump_summary_collection(str(out), records, meta) elif output_type == "ogc_waterlevel_trend": - # all_sites and all_timeseries are index-aligned payload dicts - # (see source asset). The trend dumper consumes dicts directly — - # no ParameterRecord/SiteRecord rebuild — to keep memory bounded - # for statewide, high-frequency water-level data. - dump_waterlevel_trend_collection( - str(out), all_sites, all_timeseries, meta + # all_sites/all_timeseries are index-aligned payload dicts (see + # source asset); consumed as dicts to keep memory bounded. + dump_trend_collection( + str(out), all_sites, all_timeseries, meta, + slope_units="ft/year", reducer="min", + ) + elif output_type == "ogc_analyte_trend": + dump_trend_collection( + str(out), all_sites, all_timeseries, meta, + slope_units="mg/L/year", reducer="mean", + method=ANALYTE_TREND_METHOD_DESCRIPTION, + parameter_name=product.get("parameter"), + ) + elif output_type == "ogc_monitoring_recency": + run_date = datetime.now(timezone.utc).strftime("%Y-%m-%d") + dump_monitoring_recency_collection( + str(out), all_sites, all_timeseries, meta, + run_date=run_date, + stale_days=int(product.get("stale_days", 365)), ) else: site_records = [SiteRecord(p) for p in all_sites] diff --git a/orchestration/config/mcl.json b/orchestration/config/mcl.json new file mode 100644 index 0000000..d5823e7 --- /dev/null +++ b/orchestration/config/mcl.json @@ -0,0 +1,69 @@ +{ + "_readme": "Drinking-water Maximum Contaminant Levels (MCLs) for the DIE nm_mcl_exceedance product. This file is the source of truth; upload it to gs:///config/mcl.json. Each top-level key (except those starting with '_') is a DIE analyte name; the product compares each well's latest value to that analyte's 'mcl' and flags value > mcl. Analytes absent here are reported without a flag.", + "_schema": { + "mcl": "number — the threshold, in 'units'. Used by the product (value > mcl => exceedance).", + "units": "string — units of mcl; MUST match DIE's normalized output units for the analyte (mg/L).", + "type": "string — 'primary' (enforceable health-based NPDWR) or 'secondary' (non-mandatory aesthetic SMCL). Used by the product.", + "basis": "string — what the value is measured as (e.g. nitrate 'as N'); the data MUST be on the same basis for the comparison to be valid.", + "label": "string — human-readable contaminant name.", + "note": "string — clarifications/caveats." + }, + "_source": { + "primary": "EPA National Primary Drinking Water Regulations — https://www.epa.gov/ground-water-and-drinking-water/national-primary-drinking-water-regulations", + "secondary": "EPA Secondary Drinking Water Standards — https://www.epa.gov/sdwa/secondary-drinking-water-standards-guidance-nuisance-chemicals", + "retrieved": "2026-06-28" + }, + "arsenic": { + "mcl": 0.01, + "units": "mg/L", + "type": "primary", + "label": "Arsenic", + "note": "EPA NPDWR MCL 0.010 mg/L (10 ug/L)." + }, + "nitrate": { + "mcl": 10.0, + "units": "mg/L", + "type": "primary", + "basis": "as N", + "label": "Nitrate (as nitrogen)", + "note": "EPA MCL is 10 mg/L measured as nitrogen (N). If DIE reports nitrate as NO3, convert (as-NO3 MCL ~= 44.3 mg/L) or this flag is wrong." + }, + "fluoride": { + "mcl": 4.0, + "units": "mg/L", + "type": "primary", + "label": "Fluoride", + "note": "Primary (health) MCL 4.0 mg/L. A secondary aesthetic SMCL of 2.0 mg/L also exists; this file uses the enforceable primary MCL." + }, + "uranium": { + "mcl": 0.03, + "units": "mg/L", + "type": "primary", + "label": "Uranium", + "note": "EPA MCL 30 ug/L = 0.030 mg/L (effective 2003)." + }, + "chloride": { + "mcl": 250.0, + "units": "mg/L", + "type": "secondary", + "label": "Chloride", + "note": "Secondary (aesthetic) SMCL 250 mg/L." + }, + "sulfate": { + "mcl": 250.0, + "units": "mg/L", + "type": "secondary", + "label": "Sulfate", + "note": "Secondary (aesthetic) SMCL 250 mg/L." + }, + "tds": { + "mcl": 500.0, + "units": "mg/L", + "type": "secondary", + "label": "Total dissolved solids", + "note": "Secondary (aesthetic) SMCL 500 mg/L." + }, + "_omitted": { + "ph": "EPA secondary standard is a RANGE (6.5-8.5), not a single MCL, so it does not fit the value > mcl test and is excluded." + } +} diff --git a/orchestration/config/products.yaml b/orchestration/config/products.yaml index c29768f..b39d1be 100644 --- a/orchestration/config/products.yaml +++ b/orchestration/config/products.yaml @@ -83,3 +83,55 @@ products: state: NM sources: exclude: [] + + # Per-well analyte concentration trend (Mann-Kendall + Theil-Sen, daily mean). + # One product per analyte, driven by `parameter`. + - id: nm_arsenic_trend + parameter: arsenic + output_type: ogc_analyte_trend + title: "NM Arsenic Trends" + description: "Per-well arsenic concentration trend (slope and category), all NM sources" + schedule: "0 13 * * *" + spatial_filter: + state: NM + sources: + exclude: [] + + - id: nm_nitrate_trend + parameter: nitrate + output_type: ogc_analyte_trend + title: "NM Nitrate Trends" + description: "Per-well nitrate concentration trend (slope and category), all NM sources" + schedule: "0 14 * * *" + spatial_filter: + state: NM + sources: + exclude: [] + + # One feature per well flagging drinking-water MCL exceedances. Thresholds are + # read at run time from gs:///config/mcl.json (source of truth); + # `analytes` lists the candidates gathered for comparison. + - id: nm_mcl_exceedance + output_type: ogc_mcl_exceedance + title: "NM MCL Exceedances" + description: "Wells flagged against drinking-water MCLs, all NM sources" + schedule: "0 15 * * *" + analytes: [arsenic, nitrate, fluoride, uranium, sulfate, chloride, tds] + spatial_filter: + state: NM + sources: + exclude: [] + + # One feature per well: how recently it was measured (water levels), to surface + # dead/lagging monitoring points. status = active/stale at stale_days. + - id: nm_monitoring_recency + parameter: waterlevels + output_type: ogc_monitoring_recency + title: "NM Monitoring Recency" + description: "Per-well water-level monitoring recency and status, all NM sources" + schedule: "0 16 * * *" + stale_days: 365 + spatial_filter: + state: NM + sources: + exclude: [] diff --git a/orchestration/definitions.py b/orchestration/definitions.py index 57178a2..38de0fc 100644 --- a/orchestration/definitions.py +++ b/orchestration/definitions.py @@ -24,6 +24,9 @@ "ogc_timeseries", "ogc_major_chemistry", "ogc_waterlevel_trend", + "ogc_analyte_trend", + "ogc_mcl_exceedance", + "ogc_monitoring_recency", } diff --git a/orchestration/pyproject.toml b/orchestration/pyproject.toml index 271ee9d..ebc4b51 100644 --- a/orchestration/pyproject.toml +++ b/orchestration/pyproject.toml @@ -10,6 +10,7 @@ requires-python = ">=3.14" dependencies = [ "dagster>=1.8", "dagster-cloud", + "dagster-dg-cli", "dagster-gcp>=0.24", "dagster-webserver>=1.8", "google-cloud-storage", diff --git a/orchestration/resources/die_config.py b/orchestration/resources/die_config.py index 869c85e..7b60090 100644 --- a/orchestration/resources/die_config.py +++ b/orchestration/resources/die_config.py @@ -42,7 +42,11 @@ def get_config(self, product: dict, parameter: Optional[str] = None) -> Config: sources_spec = product.get("sources", {}) output_type = product.get("output_type", "ogc_summary") - is_summary = output_type in ("ogc_summary", "ogc_major_chemistry") + is_summary = output_type in ( + "ogc_summary", + "ogc_major_chemistry", + "ogc_mcl_exceedance", + ) payload: dict = { "yes": True, diff --git a/orchestration/resources/gcs.py b/orchestration/resources/gcs.py index 119c808..a118ac0 100644 --- a/orchestration/resources/gcs.py +++ b/orchestration/resources/gcs.py @@ -66,6 +66,13 @@ class GCSResource(dg.ConfigurableResource): def _client(self): return _storage_client() + def read_json(self, key: str) -> dict: + """Read and parse a JSON object from the bucket at *key* (e.g. + 'config/mcl.json'). Raises if the object is missing.""" + client = self._client() + bucket = client.bucket(self.bucket_name) + return json.loads(bucket.blob(key).download_as_bytes()) + def download_latest(self, product_id: str, dest_path: str) -> str: """Download a product's latest.geojson to *dest_path*. Returns the path.""" client = self._client() diff --git a/tests/test_persisters/test_ogc_features.py b/tests/test_persisters/test_ogc_features.py index 7f10500..d5547cd 100644 --- a/tests/test_persisters/test_ogc_features.py +++ b/tests/test_persisters/test_ogc_features.py @@ -230,7 +230,7 @@ def test_geometry_and_required_fields(self, tmp_path): assert feat["geometry"]["coordinates"] == [-106.0, 34.0] -from backend.persisters.ogc_features import dump_waterlevel_trend_collection +from backend.persisters.ogc_features import dump_trend_collection # The trend dumper consumes payload dicts directly (no record rebuild). @@ -257,14 +257,14 @@ def test_classifies_trends_and_carries_method(self, tmp_path): series = [increasing, stable, sparse, decreasing] out = tmp_path / "tr.geojson" - result = dump_waterlevel_trend_collection(str(out), sites, series, {"id": "nm_waterlevel_trends"}) + result = dump_trend_collection(str(out), sites, series, {"id": "nm_waterlevel_trends"}, slope_units="ft/year", reducer="min") assert result["numberReturned"] == 4 assert "trend_method" in result and result["trend_method"] by_id = {f["id"]: f["properties"] for f in result["features"]} assert by_id["NMBGMR:A"]["trend_category"] == "increasing" - assert round(by_id["NMBGMR:A"]["slope_ft_per_year"], 2) == 0.5 + assert round(by_id["NMBGMR:A"]["slope_per_year"], 2) == 0.5 assert by_id["NMBGMR:B"]["trend_category"] == "stable" assert by_id["NWIS:C"]["trend_category"] == "not enough data" # only 3 records assert by_id["PVACD:D"]["trend_category"] == "decreasing" # 5 records / 4 yr span @@ -273,7 +273,7 @@ def test_required_fields_and_geometry(self, tmp_path): sites = [_trend_site(rid="W1")] series = [[_trend_obs("2010-01-01", 50.0)]] out = tmp_path / "tr.geojson" - result = dump_waterlevel_trend_collection(str(out), sites, series, {"id": "nm_waterlevel_trends"}) + result = dump_trend_collection(str(out), sites, series, {"id": "nm_waterlevel_trends"}, slope_units="ft/year", reducer="min") assert result["type"] == "FeatureCollection" assert "timeStamp" in result feat = result["features"][0] @@ -291,8 +291,8 @@ def test_downsamples_to_daily_min(self, tmp_path): _trend_obs("2020-01-02", 52.0), ] out = tmp_path / "tr.geojson" - result = dump_waterlevel_trend_collection( - str(out), [_trend_site(rid="W1")], [obs], {"id": "nm_waterlevel_trends"} + result = dump_trend_collection( + str(out), [_trend_site(rid="W1")], [obs], {"id": "nm_waterlevel_trends"}, slope_units="ft/year", reducer="min" ) props = result["features"][0]["properties"] assert props["observation_count"] == 3 @@ -309,8 +309,8 @@ def test_trend_feature_includes_source_datastream_link(self, tmp_path): for i in range(12) ] out = tmp_path / "tr.geojson" - result = dump_waterlevel_trend_collection( - str(out), [site], [obs], {"id": "nm_waterlevel_trends"} + result = dump_trend_collection( + str(out), [site], [obs], {"id": "nm_waterlevel_trends"}, slope_units="ft/year", reducer="min" ) assert ( result["features"][0]["properties"]["source_datastream_link"] @@ -321,8 +321,8 @@ def test_trend_feature_omits_link_when_absent(self, tmp_path): site = _trend_site(source="NWIS", rid="W2") obs = [_trend_obs(f"{2010 + i}-01-01", 50.0) for i in range(12)] out = tmp_path / "tr.geojson" - result = dump_waterlevel_trend_collection( - str(out), [site], [obs], {"id": "nm_waterlevel_trends"} + result = dump_trend_collection( + str(out), [site], [obs], {"id": "nm_waterlevel_trends"}, slope_units="ft/year", reducer="min" ) assert "source_datastream_link" not in result["features"][0]["properties"] @@ -335,3 +335,87 @@ def test_summary_feature_includes_source_datastream_link(self, tmp_path): result["features"][0]["properties"]["source_datastream_link"] == "https://st2/Datastreams(9)" ) + + +from backend.persisters.ogc_features import ( + dump_mcl_exceedance_collection, + dump_monitoring_recency_collection, +) + + +def _mcl_record(source, rid, analyte, value): + return SummaryRecord({ + "source": source, "id": rid, "name": f"Well {rid}", + "latitude": 34.0, "longitude": -106.0, "elevation": None, + "well_depth": None, "well_depth_units": "ft", + "parameter_name": analyte, "latest_value": value, + }) + + +class TestMCLExceedanceCollection: + def test_flags_exceedances(self, tmp_path): + recs = [ + _mcl_record("WQP", "W1", "arsenic", 0.02), # > 0.01 -> exceeds + _mcl_record("WQP", "W1", "nitrate", 5.0), # < 10 -> ok + ] + thresholds = {"arsenic": {"mcl": 0.01, "type": "primary"}, + "nitrate": {"mcl": 10.0, "type": "primary"}} + out = tmp_path / "mcl.geojson" + result = dump_mcl_exceedance_collection(str(out), recs, {"id": "nm_mcl"}, thresholds) + props = result["features"][0]["properties"] + assert props["arsenic_exceeds"] is True + assert props["nitrate_exceeds"] is False + assert props["any_exceedance"] is True + assert props["exceedance_count"] == 1 + assert props["exceeded_analytes"] == ["arsenic"] + assert result["mcl_thresholds"] == thresholds + + def test_analyte_without_threshold_not_flagged(self, tmp_path): + recs = [_mcl_record("WQP", "W1", "calcium", 999.0)] + out = tmp_path / "mcl.geojson" + result = dump_mcl_exceedance_collection(str(out), recs, {"id": "nm_mcl"}, {}) + props = result["features"][0]["properties"] + assert props["calcium"] == 999.0 + assert "calcium_exceeds" not in props + assert props["any_exceedance"] is False + + +class TestMonitoringRecencyCollection: + def test_status_active_and_stale(self, tmp_path): + active = _trend_site(source="PVACD", rid="A") + stale = _trend_site(source="PVACD", rid="B") + nodata = _trend_site(source="PVACD", rid="C") + sites = [active, stale, nodata] + series = [ + [_trend_obs("2024-01-01", 1.0)], + [_trend_obs("2019-01-01", 1.0)], + [], + ] + out = tmp_path / "rec.geojson" + result = dump_monitoring_recency_collection( + str(out), sites, series, {"id": "nm_rec"}, run_date="2024-06-01", stale_days=365 + ) + by_id = {f["id"]: f["properties"] for f in result["features"]} + assert by_id["PVACD:A"]["status"] == "active" + assert by_id["PVACD:B"]["status"] == "stale" + assert by_id["PVACD:C"]["status"] == "no data" + assert by_id["PVACD:C"]["record_count"] == 0 + assert result["stale_threshold_days"] == 365 + + +class TestAnalyteTrend: + def test_daily_mean_and_units(self, tmp_path): + site = _trend_site(source="WQP", rid="W1") + # two readings same day -> mean; rising over years -> increasing + obs = [] + for i in range(12): + obs.append(_trend_obs(f"{2010 + i}-01-01", 0.005 + 0.001 * i)) + out = tmp_path / "at.geojson" + result = dump_trend_collection( + str(out), [site], [obs], {"id": "nm_arsenic_trend"}, + slope_units="mg/L/year", reducer="mean", parameter_name="arsenic", + ) + props = result["features"][0]["properties"] + assert props["parameter_name"] == "arsenic" + assert props["slope_units"] == "mg/L/year" + assert props["trend_category"] == "increasing" diff --git a/tests/test_trend_stats.py b/tests/test_trend_stats.py new file mode 100644 index 0000000..67ee100 --- /dev/null +++ b/tests/test_trend_stats.py @@ -0,0 +1,61 @@ +from backend.trend_stats import ( + daily_series, + qualifies_for_trend, + mann_kendall_trend, + parse_epoch_seconds, + _SECONDS_PER_YEAR, +) + + +def _obs(date, value): + return {"parameter_value": value, "date_measured": date, "time_measured": None} + + +class TestParseEpochSeconds: + def test_date_only_and_none(self): + assert parse_epoch_seconds("2020-01-01", None) is not None + assert parse_epoch_seconds(None, None) is None + assert parse_epoch_seconds("not-a-date", None) is None + + +class TestDailySeries: + def test_collapses_same_day_with_reducer(self): + obs = [_obs("2020-01-01", 5.0), _obs("2020-01-01", 1.0), _obs("2020-01-02", 9.0)] + raw_min, pairs_min = daily_series(obs, "min") + raw_mean, pairs_mean = daily_series(obs, "mean") + assert raw_min == 3 + assert [v for _, v in pairs_min] == [1.0, 9.0] # daily min + assert [v for _, v in pairs_mean] == [3.0, 9.0] # daily mean + # sorted by day ascending + assert pairs_min[0][0] < pairs_min[1][0] + + def test_skips_unparseable(self): + obs = [_obs("2020-01-01", None), _obs(None, 5.0), _obs("2020-01-02", 3.0)] + raw, pairs = daily_series(obs, "min") + assert raw == 1 + assert [v for _, v in pairs] == [3.0] + + +class TestQualifies: + def test_gate(self): + assert qualifies_for_trend(10, 0) is True # >= 10 records + assert qualifies_for_trend(4, 2.0) is True # 4 records, 2yr span + assert qualifies_for_trend(4, 1.0) is False # span too short + assert qualifies_for_trend(3, 50.0) is False # too few records + + +class TestMannKendall: + def test_increasing_decreasing_stable(self): + years = [2010 + i for i in range(12)] + inc = mann_kendall_trend(years, [1.0 + i for i in range(12)]) + dec = mann_kendall_trend(years, [12.0 - i for i in range(12)]) + flat = mann_kendall_trend(years, [5.0] * 12) + assert inc[0] == "increasing" and inc[1] > 0 + assert dec[0] == "decreasing" and dec[1] < 0 + assert flat[0] == "stable" + + def test_slope_is_per_year(self): + # +0.5 per year over 12 years -> Theil-Sen slope ~0.5/yr + years = [2010 + i for i in range(12)] + cat, slope, p, tau = mann_kendall_trend(years, [50.0 + 0.5 * i for i in range(12)]) + assert round(slope, 3) == 0.5