Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
341 changes: 197 additions & 144 deletions backend/persisters/ogc_features.py

Large diffs are not rendered by default.

143 changes: 143 additions & 0 deletions backend/trend_stats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
# ===============================================================================
# Trend statistics for DIE products.
#
# Pure analysis (no serialization / I/O): daily aggregation, the qualification
# gate, and the Mann-Kendall + Theil-Sen trend test. Kept separate from
# backend/persisters/ogc_features.py (which only builds GeoJSON) because this is
# statistics, is independently testable, and pulls heavier deps (scipy,
# pymannkendall) lazily.
# ===============================================================================
from datetime import datetime, timezone
from typing import Optional

# Seconds per Julian year (365.25 days) — used to express the Theil-Sen slope
# per year.
_SECONDS_PER_YEAR = 31557600.0

# A well is classified only when it has enough data for a meaningful
# Mann-Kendall test: at least 10 daily points, or at least 4 spanning >= 2 years.
_TREND_MIN_RECORDS = 10
_TREND_MIN_RECORDS_WITH_SPAN = 4
_TREND_MIN_SPAN_YEARS = 2.0
_TREND_ALPHA = 0.05 # significance level for the Mann-Kendall test

_MK_COMMON = (
"Monotonic trend is tested with the non-parametric Mann-Kendall test "
"(pymannkendall.original_test, alpha=0.05) on the daily series ordered by "
"time; the rate is the Theil-Sen slope vs time. A well is classified only "
"when it has at least 10 daily points, or at least 4 daily points spanning "
"at least 2 years; otherwise 'not enough data'."
)

# Human-readable description of each trend product's method, embedded in the
# collection so consumers know how the classification was derived.
TREND_METHOD_DESCRIPTION = (
"Depth-to-water trend per well. Observations are downsampled to one point "
"per calendar day, keeping the daily MINIMUM depth-to-water (shallowest "
f"reading). {_MK_COMMON} A significant increasing slope is 'increasing' "
"(water level getting DEEPER, i.e. a declining water table), a significant "
"decreasing slope is 'decreasing' (water level getting SHALLOWER), else "
"'stable'. Mirrors the Ocotillo API ogc_depth_to_water_trend_wells "
"materialized view, using Mann-Kendall + Theil-Sen instead of OLS."
)

ANALYTE_TREND_METHOD_DESCRIPTION = (
"Analyte concentration trend per well. Observations are downsampled to one "
f"point per calendar day, keeping the daily MEAN concentration. {_MK_COMMON} "
"A significant increasing slope is 'increasing' (concentration rising), a "
"significant decreasing slope is 'decreasing', else 'stable'."
)


def parse_epoch_seconds(date, time) -> Optional[float]:
"""Best-effort parse of a DIE date (+optional time) to POSIX seconds (UTC)."""
if not date:
return None
text = f"{date}T{time}" if time else str(date)
text = text.replace("Z", "")
try:
dt = datetime.fromisoformat(text)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt.timestamp()
except (ValueError, TypeError):
pass
try:
dt = datetime.fromisoformat(str(date)).replace(tzinfo=timezone.utc)
return dt.timestamp()
except (ValueError, TypeError):
return None


def daily_series(obs_list: list, reducer: str = "min") -> tuple[int, list]:
"""Reduce a well's observations to one point per calendar day, keyed at the
day's UTC midnight epoch. *reducer* selects the daily aggregate: "min"
(shallowest depth-to-water), "max", or "mean" (e.g. analyte concentration).

*obs_list* is a list of observation payload dicts (parameter_value,
date_measured, time_measured). Operating on dicts avoids rebuilding
ParameterRecord objects for what can be millions of observations.

Downsampling bounds the O(n^2) Mann-Kendall cost for high-frequency wells
(e.g. continuous loggers) and removes within-day sampling noise. Returns
(raw_observation_count, [(day_epoch_seconds, value), ...] sorted by day).
"""
raw_count = 0
buckets: dict = {} # date -> (day_epoch, [values])
for obs in obs_list:
value = obs.get("parameter_value")
epoch = parse_epoch_seconds(obs.get("date_measured"), obs.get("time_measured"))
if value is None or epoch is None:
continue
try:
v = float(value)
except (TypeError, ValueError):
continue
raw_count += 1
day = datetime.fromtimestamp(epoch, tz=timezone.utc).date()
day_epoch = datetime(
day.year, day.month, day.day, tzinfo=timezone.utc
).timestamp()
if day not in buckets:
buckets[day] = (day_epoch, [])
buckets[day][1].append(v)

reduce_fn = {
"min": min,
"max": max,
"mean": lambda vs: sum(vs) / len(vs),
}[reducer]

pairs = sorted(
((day_epoch, reduce_fn(vals)) for day_epoch, vals in buckets.values()),
key=lambda p: p[0],
)
return raw_count, pairs


def qualifies_for_trend(record_count, span_years) -> bool:
return record_count >= _TREND_MIN_RECORDS or (
record_count >= _TREND_MIN_RECORDS_WITH_SPAN
and span_years >= _TREND_MIN_SPAN_YEARS
)


def mann_kendall_trend(years: list, values: list):
"""Run the Mann-Kendall trend test + Theil-Sen slope.

Returns (trend_category, slope_per_year, p_value, tau). *years* are decimal
years, *values* the measured quantity, both ordered by time. trend_category
is one of 'increasing' / 'decreasing' / 'stable'.
"""
import pymannkendall as mk
from scipy.stats import theilslopes

result = mk.original_test(values, alpha=_TREND_ALPHA)
# Time-aware Theil-Sen slope (per year) — robust and correct for the
# irregular sampling typical of these records, unlike MK's index-based slope
# which assumes unit spacing.
slope_per_year = float(theilslopes(values, years)[0])

# mk trend is 'increasing' / 'decreasing' / 'no trend'.
category = "stable" if result.trend == "no trend" else result.trend
return category, slope_per_year, float(result.p), float(result.Tau)
53 changes: 44 additions & 9 deletions orchestration/assets/products.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,21 @@
import tempfile
import traceback
from collections.abc import Iterator
from datetime import datetime, timezone
from pathlib import Path

import dagster as dg
import geopandas as gpd

from backend.config import PARAMETER_SOURCE_MAP, WATERLEVELS
from backend.persisters.ogc_features import (
ANALYTE_TREND_METHOD_DESCRIPTION,
dump_major_chemistry_collection,
dump_mcl_exceedance_collection,
dump_monitoring_recency_collection,
dump_summary_collection,
dump_timeseries_collection,
dump_waterlevel_trend_collection,
dump_trend_collection,
)
from backend.record import ParameterRecord, SiteRecord, SummaryRecord
from backend.unifier import unify_source
Expand All @@ -50,6 +54,13 @@
_CHECK_NAME = "returned_data"
_GEOSERVER_CHECK_NAME = "registered"

# GCS key (within the products bucket) of the MCL threshold file — the source of
# truth for the ogc_mcl_exceedance product.
_MCL_KEY = "config/mcl.json"

# Multi-analyte products that gather one summary record per analyte per well.
_MULTI_ANALYTE_OUTPUT_TYPES = ("ogc_major_chemistry", "ogc_mcl_exceedance")

# Classic major-ion suite for the ogc_major_chemistry product. One feature per
# well, with each analyte's latest value/units/date as properties.
_MAJOR_CHEMISTRY = [
Expand All @@ -66,9 +77,14 @@

def _product_params(product: dict) -> list[str]:
"""The DIE parameter(s) a product unifies. Single-parameter products yield
one; the major-chemistry product yields the major-ion suite."""
if product.get("output_type") == "ogc_major_chemistry":
one; multi-analyte products yield their analyte list."""
output_type = product.get("output_type")
if output_type == "ogc_major_chemistry":
return list(_MAJOR_CHEMISTRY)
if output_type == "ogc_mcl_exceedance":
# Candidate analytes for the static asset graph; the MCL JSON is the
# source of truth for which actually have thresholds.
return list(product["analytes"])
return [product["parameter"]]


Expand Down Expand Up @@ -214,16 +230,35 @@ def _combine_asset(
# to one feature per well with analytes as properties.
records = [SummaryRecord(p) for p in all_records]
dump_major_chemistry_collection(str(out), records, meta)
elif output_type == "ogc_mcl_exceedance":
# Compare each well's latest analyte values to the MCL JSON
# (source of truth) read from GCS.
thresholds = gcs.read_json(_MCL_KEY)
records = [SummaryRecord(p) for p in all_records]
dump_mcl_exceedance_collection(str(out), records, meta, thresholds)
elif output_type == "ogc_summary":
records = [SummaryRecord(p) for p in all_records]
dump_summary_collection(str(out), records, meta)
elif output_type == "ogc_waterlevel_trend":
# all_sites and all_timeseries are index-aligned payload dicts
# (see source asset). The trend dumper consumes dicts directly —
# no ParameterRecord/SiteRecord rebuild — to keep memory bounded
# for statewide, high-frequency water-level data.
dump_waterlevel_trend_collection(
str(out), all_sites, all_timeseries, meta
# all_sites/all_timeseries are index-aligned payload dicts (see
# source asset); consumed as dicts to keep memory bounded.
dump_trend_collection(
str(out), all_sites, all_timeseries, meta,
slope_units="ft/year", reducer="min",
)
elif output_type == "ogc_analyte_trend":
dump_trend_collection(
str(out), all_sites, all_timeseries, meta,
slope_units="mg/L/year", reducer="mean",
method=ANALYTE_TREND_METHOD_DESCRIPTION,
parameter_name=product.get("parameter"),
)
elif output_type == "ogc_monitoring_recency":
run_date = datetime.now(timezone.utc).strftime("%Y-%m-%d")
dump_monitoring_recency_collection(
str(out), all_sites, all_timeseries, meta,
run_date=run_date,
stale_days=int(product.get("stale_days", 365)),
)
else:
site_records = [SiteRecord(p) for p in all_sites]
Expand Down
69 changes: 69 additions & 0 deletions orchestration/config/mcl.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
{
"_readme": "Drinking-water Maximum Contaminant Levels (MCLs) for the DIE nm_mcl_exceedance product. This file is the source of truth; upload it to gs://<products_bucket>/config/mcl.json. Each top-level key (except those starting with '_') is a DIE analyte name; the product compares each well's latest value to that analyte's 'mcl' and flags value > mcl. Analytes absent here are reported without a flag.",
"_schema": {
"mcl": "number — the threshold, in 'units'. Used by the product (value > mcl => exceedance).",
"units": "string — units of mcl; MUST match DIE's normalized output units for the analyte (mg/L).",
"type": "string — 'primary' (enforceable health-based NPDWR) or 'secondary' (non-mandatory aesthetic SMCL). Used by the product.",
"basis": "string — what the value is measured as (e.g. nitrate 'as N'); the data MUST be on the same basis for the comparison to be valid.",
"label": "string — human-readable contaminant name.",
"note": "string — clarifications/caveats."
},
"_source": {
"primary": "EPA National Primary Drinking Water Regulations — https://www.epa.gov/ground-water-and-drinking-water/national-primary-drinking-water-regulations",
"secondary": "EPA Secondary Drinking Water Standards — https://www.epa.gov/sdwa/secondary-drinking-water-standards-guidance-nuisance-chemicals",
"retrieved": "2026-06-28"
},
"arsenic": {
"mcl": 0.01,
"units": "mg/L",
"type": "primary",
"label": "Arsenic",
"note": "EPA NPDWR MCL 0.010 mg/L (10 ug/L)."
},
"nitrate": {
"mcl": 10.0,
"units": "mg/L",
"type": "primary",
"basis": "as N",
"label": "Nitrate (as nitrogen)",
"note": "EPA MCL is 10 mg/L measured as nitrogen (N). If DIE reports nitrate as NO3, convert (as-NO3 MCL ~= 44.3 mg/L) or this flag is wrong."
},
"fluoride": {
"mcl": 4.0,
"units": "mg/L",
"type": "primary",
"label": "Fluoride",
"note": "Primary (health) MCL 4.0 mg/L. A secondary aesthetic SMCL of 2.0 mg/L also exists; this file uses the enforceable primary MCL."
},
"uranium": {
"mcl": 0.03,
"units": "mg/L",
"type": "primary",
"label": "Uranium",
"note": "EPA MCL 30 ug/L = 0.030 mg/L (effective 2003)."
},
"chloride": {
"mcl": 250.0,
"units": "mg/L",
"type": "secondary",
"label": "Chloride",
"note": "Secondary (aesthetic) SMCL 250 mg/L."
},
"sulfate": {
"mcl": 250.0,
"units": "mg/L",
"type": "secondary",
"label": "Sulfate",
"note": "Secondary (aesthetic) SMCL 250 mg/L."
},
"tds": {
"mcl": 500.0,
"units": "mg/L",
"type": "secondary",
"label": "Total dissolved solids",
"note": "Secondary (aesthetic) SMCL 500 mg/L."
},
"_omitted": {
"ph": "EPA secondary standard is a RANGE (6.5-8.5), not a single MCL, so it does not fit the value > mcl test and is excluded."
}
}
52 changes: 52 additions & 0 deletions orchestration/config/products.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,55 @@ products:
state: NM
sources:
exclude: []

# Per-well analyte concentration trend (Mann-Kendall + Theil-Sen, daily mean).
# One product per analyte, driven by `parameter`.
- id: nm_arsenic_trend
parameter: arsenic
output_type: ogc_analyte_trend
title: "NM Arsenic Trends"
description: "Per-well arsenic concentration trend (slope and category), all NM sources"
schedule: "0 13 * * *"
spatial_filter:
state: NM
sources:
exclude: []

- id: nm_nitrate_trend
parameter: nitrate
output_type: ogc_analyte_trend
title: "NM Nitrate Trends"
description: "Per-well nitrate concentration trend (slope and category), all NM sources"
schedule: "0 14 * * *"
spatial_filter:
state: NM
sources:
exclude: []

# One feature per well flagging drinking-water MCL exceedances. Thresholds are
# read at run time from gs://<bucket>/config/mcl.json (source of truth);
# `analytes` lists the candidates gathered for comparison.
- id: nm_mcl_exceedance
output_type: ogc_mcl_exceedance
title: "NM MCL Exceedances"
description: "Wells flagged against drinking-water MCLs, all NM sources"
schedule: "0 15 * * *"
analytes: [arsenic, nitrate, fluoride, uranium, sulfate, chloride, tds]
spatial_filter:
state: NM
sources:
exclude: []

# One feature per well: how recently it was measured (water levels), to surface
# dead/lagging monitoring points. status = active/stale at stale_days.
- id: nm_monitoring_recency
parameter: waterlevels
output_type: ogc_monitoring_recency
title: "NM Monitoring Recency"
description: "Per-well water-level monitoring recency and status, all NM sources"
schedule: "0 16 * * *"
stale_days: 365
spatial_filter:
state: NM
sources:
exclude: []
3 changes: 3 additions & 0 deletions orchestration/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
"ogc_timeseries",
"ogc_major_chemistry",
"ogc_waterlevel_trend",
"ogc_analyte_trend",
"ogc_mcl_exceedance",
"ogc_monitoring_recency",
}


Expand Down
1 change: 1 addition & 0 deletions orchestration/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ requires-python = ">=3.14"
dependencies = [
"dagster>=1.8",
"dagster-cloud",
"dagster-dg-cli",
"dagster-gcp>=0.24",
"dagster-webserver>=1.8",
"google-cloud-storage",
Expand Down
6 changes: 5 additions & 1 deletion orchestration/resources/die_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,11 @@ def get_config(self, product: dict, parameter: Optional[str] = None) -> Config:
sources_spec = product.get("sources", {})

output_type = product.get("output_type", "ogc_summary")
is_summary = output_type in ("ogc_summary", "ogc_major_chemistry")
is_summary = output_type in (
"ogc_summary",
"ogc_major_chemistry",
"ogc_mcl_exceedance",
)

payload: dict = {
"yes": True,
Expand Down
Loading
Loading