diff --git a/backend/persisters/ogc_features.py b/backend/persisters/ogc_features.py index d3ff34e..d9f36ea 100644 --- a/backend/persisters/ogc_features.py +++ b/backend/persisters/ogc_features.py @@ -92,6 +92,79 @@ def _make_feature(record, collection_id: str) -> dict: } +def _num(value) -> Optional[float]: + """Coerce a value to float, or None when missing/unparseable.""" + if value is None: + return None + try: + return float(value) + except (TypeError, ValueError): + return None + + +def _pivot_by_well(records: list) -> dict: + """Pivot a flat list of per-(well, analyte) SummaryRecords into one dict per + well, keyed ``(source, id)``. Each well carries its geometry/depth plus an + ``analytes`` map ``{name: {"value", "units", "date"}}``. Same identity and + well_depth-carry rules as :func:`dump_major_chemistry_collection`.""" + wells: dict = {} + for r in records: + source = getattr(r, "source", "") or "" + rid = getattr(r, "id", "") or "" + key = (source, rid) + well = wells.get(key) + if well is None: + well = { + "source": source, + "id": rid, + "name": getattr(r, "name", None), + "latitude": getattr(r, "latitude", None), + "longitude": getattr(r, "longitude", None), + "elevation": getattr(r, "elevation", None), + "well_depth": getattr(r, "well_depth", None), + "well_depth_units": getattr(r, "well_depth_units", None), + "analytes": {}, + } + wells[key] = well + # well_depth can be absent on some analyte records; keep first non-null. + if well["well_depth"] is None and getattr(r, "well_depth", None) is not None: + well["well_depth"] = getattr(r, "well_depth", None) + well["well_depth_units"] = getattr(r, "well_depth_units", None) + + analyte = getattr(r, "parameter_name", None) + if analyte: + well["analytes"][analyte] = { + "value": getattr(r, "latest_value", None), + "units": getattr(r, "latest_units", None), + "date": getattr(r, "latest_date", None), + } + return wells + + +def _well_feature(source, rid, well: dict, props: dict) -> dict: + """Build a Feature for a pivoted well (geometry from the well dict).""" + return { + "type": "Feature", + "id": _feature_id(source, rid), + "geometry": _point_geometry( + well["latitude"], well["longitude"], well["elevation"] + ), + "properties": props, + } + + +def _site_feature(site: dict, props: dict) -> dict: + """Build a Feature for a site payload dict (geometry from the site dict).""" + return { + "type": "Feature", + "id": _feature_id(site.get("source") or "", site.get("id") or ""), + "geometry": _point_geometry( + site.get("latitude"), site.get("longitude"), site.get("elevation") + ), + "properties": props, + } + + def dump_summary_collection(path: str, records: list, meta: dict) -> dict: """ Write an OGC FeatureCollection of summary/site records to *path*. @@ -504,3 +577,368 @@ def dump_timeseries_collection( }) return _dump_collection(path, collection_id, features, meta) + + +# Water hardness as CaCO3 (mg/L) = 2.497*Ca + 4.118*Mg, with Ca, Mg in mg/L +# (standard USGS factors: each ion's CaCO3 equivalent weight / its own). +_HARDNESS_CA_FACTOR = 2.497 +_HARDNESS_MG_FACTOR = 4.118 + +HARDNESS_METHOD_DESCRIPTION = ( + "Total hardness as CaCO3 (mg/L) = 2.497*calcium + 4.118*magnesium, using each " + "well's latest calcium and magnesium (both in mg/L). Class (USGS): soft " + "(<60), moderate (60-120), hard (120-180), very hard (>=180); 'insufficient' " + "when calcium or magnesium is missing. Calcium and magnesium are each the " + "latest value reported and may carry different dates." +) + + +def _hardness_class(hardness: Optional[float]) -> str: + if hardness is None: + return "insufficient" + if hardness < 60: + return "soft" + if hardness < 120: + return "moderate" + if hardness < 180: + return "hard" + return "very hard" + + +def dump_hardness_collection(path: str, records: list, meta: dict) -> dict: + """ + Write an OGC FeatureCollection of per-well water hardness, one Feature per + well. *records* is a flat list of SummaryRecord (one per well+analyte), + pivoted by well; only calcium and magnesium are used. + + Per well: ``calcium``, ``calcium_date``, ``magnesium``, ``magnesium_date``, + ``hardness_caco3`` (mg/L as CaCO3, None when either ion is missing), + ``hardness_units``, and ``hardness_class``. The collection carries + ``hardness_method``. + """ + collection_id = meta.get("id", "collection") + wells = _pivot_by_well(records) + + features = [] + for (source, rid), well in wells.items(): + analytes = well["analytes"] + ca = _num(analytes.get("calcium", {}).get("value")) + mg = _num(analytes.get("magnesium", {}).get("value")) + if ca is None or mg is None: + hardness = None + else: + hardness = round( + _HARDNESS_CA_FACTOR * ca + _HARDNESS_MG_FACTOR * mg, 1 + ) + + props = { + "source": source, + "id": rid, + "name": well["name"], + "well_depth": well["well_depth"], + "well_depth_units": well["well_depth_units"], + "calcium": analytes.get("calcium", {}).get("value"), + "calcium_date": analytes.get("calcium", {}).get("date"), + "magnesium": analytes.get("magnesium", {}).get("value"), + "magnesium_date": analytes.get("magnesium", {}).get("date"), + "hardness_caco3": hardness, + "hardness_units": "mg/L as CaCO3", + "hardness_class": _hardness_class(hardness), + } + features.append(_well_feature(source, rid, well, props)) + + return _dump_collection( + path, collection_id, features, meta, + extra={"hardness_method": HARDNESS_METHOD_DESCRIPTION}, + ) + + +# Equivalent weights (mg per milliequivalent) for the major ions, used to convert +# mg/L concentrations to meq/L before computing milliequivalent percentages. +_EQUIVALENT_WEIGHTS = { + "calcium": 20.04, + "magnesium": 12.15, + "sodium": 22.99, + "potassium": 39.10, + "bicarbonate": 61.02, + "carbonate": 30.00, + "chloride": 35.45, + "sulfate": 48.03, +} + +WATER_TYPE_METHOD_DESCRIPTION = ( + "Hydrochemical (Piper) water type from the latest major-ion chemistry. Each " + "ion (mg/L) is converted to meq/L by its equivalent weight; cations are Ca, " + "Mg, and Na+K, anions are HCO3+CO3, Cl, and SO4. The dominant cation and " + "anion are those exceeding 50% of their group's meq total, else 'mixed'; " + "water_type is 'dominantCation-dominantAnion'. 'insufficient' when no cations " + "or no anions are reported. charge_balance_pct = 100*(cations-anions)/" + "(cations+anions); magnitudes above ~10% indicate suspect/incomplete analyses." +) + + +def _meq(analytes: dict, name: str) -> float: + """meq/L for one ion from the pivoted analyte map (0.0 when absent).""" + value = _num(analytes.get(name, {}).get("value")) + if value is None: + return 0.0 + return value / _EQUIVALENT_WEIGHTS[name] + + +def _dominant(percentages: dict) -> str: + """Label of the >50% member, else 'mixed'.""" + label, pct = max(percentages.items(), key=lambda kv: kv[1]) + return label if pct > 50 else "mixed" + + +def dump_water_type_collection(path: str, records: list, meta: dict) -> dict: + """ + Write an OGC FeatureCollection of per-well hydrochemical (Piper) water type, + one Feature per well. *records* is a flat list of SummaryRecord (one per + well+analyte), pivoted by well; the eight major ions are used. + + Per well: ``water_type``, ``dominant_cation``, ``dominant_anion``, the + milliequivalent percentages ``ca_pct``/``mg_pct``/``na_k_pct`` (cations) and + ``hco3_pct``/``cl_pct``/``so4_pct`` (anions), ``cation_meq_total``, + ``anion_meq_total``, and ``charge_balance_pct``. Wells with no cations or no + anions get ``water_type='insufficient'`` and null percentages. The collection + carries ``water_type_method``. + """ + collection_id = meta.get("id", "collection") + wells = _pivot_by_well(records) + + features = [] + for (source, rid), well in wells.items(): + a = well["analytes"] + ca = _meq(a, "calcium") + mg = _meq(a, "magnesium") + na_k = _meq(a, "sodium") + _meq(a, "potassium") + hco3_co3 = _meq(a, "bicarbonate") + _meq(a, "carbonate") + cl = _meq(a, "chloride") + so4 = _meq(a, "sulfate") + + cation_total = ca + mg + na_k + anion_total = hco3_co3 + cl + so4 + + props = { + "source": source, + "id": rid, + "name": well["name"], + "well_depth": well["well_depth"], + "well_depth_units": well["well_depth_units"], + "cation_meq_total": round(cation_total, 3), + "anion_meq_total": round(anion_total, 3), + } + if cation_total <= 0 or anion_total <= 0: + props.update({ + "water_type": "insufficient", + "dominant_cation": None, + "dominant_anion": None, + "ca_pct": None, "mg_pct": None, "na_k_pct": None, + "hco3_pct": None, "cl_pct": None, "so4_pct": None, + "charge_balance_pct": None, + }) + else: + ca_pct = 100 * ca / cation_total + mg_pct = 100 * mg / cation_total + na_k_pct = 100 * na_k / cation_total + hco3_pct = 100 * hco3_co3 / anion_total + cl_pct = 100 * cl / anion_total + so4_pct = 100 * so4 / anion_total + dom_cation = _dominant( + {"Ca": ca_pct, "Mg": mg_pct, "Na+K": na_k_pct} + ) + dom_anion = _dominant({"HCO3": hco3_pct, "Cl": cl_pct, "SO4": so4_pct}) + props.update({ + "water_type": f"{dom_cation}-{dom_anion}", + "dominant_cation": dom_cation, + "dominant_anion": dom_anion, + "ca_pct": round(ca_pct, 1), + "mg_pct": round(mg_pct, 1), + "na_k_pct": round(na_k_pct, 1), + "hco3_pct": round(hco3_pct, 1), + "cl_pct": round(cl_pct, 1), + "so4_pct": round(so4_pct, 1), + "charge_balance_pct": round( + 100 * (cation_total - anion_total) + / (cation_total + anion_total), + 1, + ), + }) + features.append(_well_feature(source, rid, well, props)) + + return _dump_collection( + path, collection_id, features, meta, + extra={"water_type_method": WATER_TYPE_METHOD_DESCRIPTION}, + ) + + +DATA_DENSITY_METHOD_DESCRIPTION = ( + "Per-well measurement coverage. observation_count is the raw count of valid " + "numeric readings; record_count is the number of distinct calendar days with " + "a reading; span_years is the time from first to last day. mean_interval_days " + "is the average gap between distinct days; observations_per_year is " + "observation_count / span_years." +) + + +def dump_data_density_collection( + path: str, + site_records: list, + timeseries_records: list, + meta: dict, + *, + parameter_name: Optional[str] = None, +) -> dict: + """ + Write an OGC FeatureCollection of per-well data density / coverage, one + Feature per well. *site_records* and *timeseries_records* are index-aligned + payload dicts (as produced by the source assets). + + Per well: ``observation_count`` (raw valid readings), ``record_count`` + (distinct days), ``first_observation_datetime``, ``last_observation_datetime``, + ``span_years``, ``mean_interval_days``, and ``observations_per_year``. The + collection carries ``data_density_method``. + """ + collection_id = meta.get("id", "collection") + + features = [] + for site, obs_list in zip(site_records, timeseries_records): + observation_count, pairs = _daily_series(obs_list, "min") + record_count = len(pairs) + if record_count: + first_e, last_e = pairs[0][0], pairs[-1][0] + span_years = (last_e - first_e) / _SECONDS_PER_YEAR + else: + first_e = last_e = None + span_years = 0.0 + + mean_interval_days = ( + round((span_years * 365.25) / (record_count - 1), 1) + if record_count > 1 else None + ) + observations_per_year = ( + round(observation_count / span_years, 2) if span_years > 0 else None + ) + + props = { + "source": site.get("source") or "", + "id": site.get("id") or "", + "name": site.get("name"), + "parameter_name": parameter_name, + "well_depth": site.get("well_depth"), + "well_depth_units": site.get("well_depth_units"), + "observation_count": observation_count, + "record_count": record_count, + "first_observation_datetime": _iso_utc(first_e), + "last_observation_datetime": _iso_utc(last_e), + "span_years": round(span_years, 3), + "mean_interval_days": mean_interval_days, + "observations_per_year": observations_per_year, + } + features.append(_site_feature(site, props)) + + return _dump_collection( + path, collection_id, features, meta, + extra={"data_density_method": DATA_DENSITY_METHOD_DESCRIPTION}, + ) + + +WATERLEVEL_CHANGE_METHOD_TEMPLATE = ( + "Depth-to-water change over the most recent {window} years of each well's " + "record. Observations are downsampled to one point per calendar day (daily " + "MINIMUM depth-to-water, the shallowest reading). The end point is the latest " + "daily value; the start point is the daily value closest to {window} years " + "before it, accepted only when within half that window of the target " + "(else status='insufficient'). change_ft = end - start. A POSITIVE change " + "means depth-to-water INCREASED (water table DECLINED); negative means the " + "water table ROSE." +) + + +def dump_waterlevel_change_collection( + path: str, + site_records: list, + timeseries_records: list, + meta: dict, + *, + window_years: float, + reducer: str = "min", +) -> dict: + """ + Write an OGC FeatureCollection of per-well depth-to-water change over a + trailing window, one Feature per well. *site_records* and + *timeseries_records* are index-aligned payload dicts. + + Per well: ``window_years`` (requested), ``actual_window_years``, + ``dtw_start``/``dtw_end`` (ft), ``start_date``/``end_date``, ``change_ft`` + (end - start; positive = water table declined), ``direction`` + (declining/rising/stable), ``n_observations_in_window`` (distinct days from + start to end), ``observation_count`` (raw), and ``status`` (ok/insufficient). + The collection carries ``change_method``. + """ + collection_id = meta.get("id", "collection") + target_span = window_years * _SECONDS_PER_YEAR + tolerance = 0.5 * target_span + + features = [] + for site, obs_list in zip(site_records, timeseries_records): + observation_count, pairs = _daily_series(obs_list, reducer) + + status = "insufficient" + start_e = end_e = None + dtw_start = dtw_end = change_ft = None + actual_window_years = direction = None + n_in_window = 0 + + if len(pairs) >= 2: + end_e, dtw_end = pairs[-1] + dtw_end = round(dtw_end, 3) + target = end_e - target_span + # Closest daily point to the window-start target, excluding the end. + cand_epoch, cand_val = min( + pairs[:-1], key=lambda p: abs(p[0] - target) + ) + if abs(cand_epoch - target) <= tolerance: + start_e, dtw_start = cand_epoch, round(cand_val, 3) + change_ft = round(dtw_end - dtw_start, 3) + actual_window_years = round( + (end_e - start_e) / _SECONDS_PER_YEAR, 3 + ) + n_in_window = sum(1 for p in pairs if start_e <= p[0] <= end_e) + direction = ( + "declining" if change_ft > 0 + else "rising" if change_ft < 0 else "stable" + ) + status = "ok" + + props = { + "source": site.get("source") or "", + "id": site.get("id") or "", + "name": site.get("name"), + "parameter_name": "waterlevels", + "well_depth": site.get("well_depth"), + "well_depth_units": site.get("well_depth_units"), + "window_years": window_years, + "actual_window_years": actual_window_years, + "dtw_start": dtw_start, + "dtw_end": dtw_end, + "start_date": _iso_utc(start_e), + "end_date": _iso_utc(end_e), + "change_ft": change_ft, + "change_units": "ft", + "direction": direction, + "n_observations_in_window": n_in_window, + "observation_count": observation_count, + "status": status, + } + features.append(_site_feature(site, props)) + + return _dump_collection( + path, collection_id, features, meta, + extra={ + "change_method": WATERLEVEL_CHANGE_METHOD_TEMPLATE.format( + window=window_years + ) + }, + ) diff --git a/orchestration/assets/products.py b/orchestration/assets/products.py index 2c5fc15..ce2fc70 100644 --- a/orchestration/assets/products.py +++ b/orchestration/assets/products.py @@ -75,12 +75,16 @@ from backend.config import PARAMETER_SOURCE_MAP, WATERLEVELS from backend.persisters.ogc_features import ( ANALYTE_TREND_METHOD_DESCRIPTION, + dump_data_density_collection, + dump_hardness_collection, dump_major_chemistry_collection, dump_mcl_exceedance_collection, dump_monitoring_recency_collection, dump_summary_collection, dump_timeseries_collection, dump_trend_collection, + dump_water_type_collection, + dump_waterlevel_change_collection, ) from backend.record import ParameterRecord, SiteRecord, SummaryRecord from backend.unifier import unify_source_both @@ -125,9 +129,10 @@ def _product_params(product: dict) -> list[str]: output_type = product.get("output_type") if output_type == "ogc_major_chemistry": return list(_MAJOR_CHEMISTRY) - if output_type == "ogc_mcl_exceedance": - # Candidate analytes for the static asset graph; the MCL JSON is the - # source of truth for which actually have thresholds. + if output_type in ("ogc_mcl_exceedance", "ogc_hardness", "ogc_water_type"): + # Multi-analyte products with an explicit `analytes` list. (For MCL the + # candidate set for the static graph; the MCL JSON is the source of truth + # for which actually have thresholds.) return list(product["analytes"]) return [product["parameter"]] @@ -329,9 +334,29 @@ def _combine_asset( thresholds = gcs.read_json(_MCL_KEY) records = [SummaryRecord(p) for p in all_records] dump_mcl_exceedance_collection(str(out), records, meta, thresholds) + elif output_type == "ogc_hardness": + # Pivots per-analyte summaries to one feature per well, using + # calcium + magnesium to compute total hardness as CaCO3. + records = [SummaryRecord(p) for p in all_records] + dump_hardness_collection(str(out), records, meta) + elif output_type == "ogc_water_type": + # Pivots the major-ion summaries to one feature per well and + # classifies the hydrochemical (Piper) water type. + records = [SummaryRecord(p) for p in all_records] + dump_water_type_collection(str(out), records, meta) elif output_type == "ogc_summary": records = [SummaryRecord(p) for p in all_records] dump_summary_collection(str(out), records, meta) + elif output_type == "ogc_data_density": + dump_data_density_collection( + str(out), all_sites, all_timeseries, meta, + parameter_name=product.get("parameter"), + ) + elif output_type == "ogc_waterlevel_change": + dump_waterlevel_change_collection( + str(out), all_sites, all_timeseries, meta, + window_years=float(product.get("window_years", 5)), + ) elif output_type == "ogc_waterlevel_trend": # all_sites/all_timeseries are index-aligned payload dicts (see # source asset); consumed as dicts to keep memory bounded. diff --git a/orchestration/config/products.yaml b/orchestration/config/products.yaml index c6f7cf0..3ea5c76 100644 --- a/orchestration/config/products.yaml +++ b/orchestration/config/products.yaml @@ -124,3 +124,60 @@ products: state: NM sources: exclude: [] + + # One feature per well: total hardness as CaCO3 from latest calcium + + # magnesium, plus a soft/moderate/hard/very-hard class. Multi-analyte (no + # `parameter`); shares the analyte cohort's calcium/magnesium source fetches. + - id: nm_hardness + output_type: ogc_hardness + title: "NM Water Hardness" + description: "Per-well total hardness as CaCO3 (calcium + magnesium), all NM sources" + schedule: "0 11 * * *" + analytes: [calcium, magnesium] + spatial_filter: + state: NM + sources: + exclude: [] + + # One feature per well: hydrochemical (Piper) water type from the latest + # major-ion chemistry (meq% of cations Ca/Mg/Na+K and anions HCO3+CO3/Cl/SO4). + # Multi-analyte; shares the analyte cohort's major-ion source fetches. + - id: nm_water_type + output_type: ogc_water_type + title: "NM Water Type" + description: "Per-well hydrochemical (Piper) water type from major-ion chemistry, all NM sources" + schedule: "0 11 * * *" + analytes: [calcium, magnesium, sodium, potassium, bicarbonate, carbonate, chloride, sulfate] + spatial_filter: + state: NM + sources: + exclude: [] + + # One feature per well: measurement coverage (observation/day counts, span, + # mean interval, observations per year) for water levels. Timeseries-mode; + # shares the water-level cohort's source fetches. + - id: nm_waterlevel_data_density + parameter: waterlevels + output_type: ogc_data_density + title: "NM Water Level Data Density" + description: "Per-well water-level measurement coverage and frequency, all NM sources" + schedule: "0 7 * * *" + spatial_filter: + state: NM + sources: + exclude: [] + + # One feature per well: depth-to-water change over the most recent + # `window_years` years (end minus start; positive = water table declined). + # Timeseries-mode; shares the water-level cohort's source fetches. + - id: nm_waterlevel_change + parameter: waterlevels + output_type: ogc_waterlevel_change + title: "NM Water Level Change" + description: "Per-well depth-to-water change over a trailing window, all NM sources" + schedule: "0 7 * * *" + window_years: 5 + spatial_filter: + state: NM + sources: + exclude: [] diff --git a/orchestration/definitions.py b/orchestration/definitions.py index df02dfe..9bca2db 100644 --- a/orchestration/definitions.py +++ b/orchestration/definitions.py @@ -65,6 +65,10 @@ def load_input(self, context: dg.InputContext): "ogc_analyte_trend", "ogc_mcl_exceedance", "ogc_monitoring_recency", + "ogc_hardness", + "ogc_water_type", + "ogc_data_density", + "ogc_waterlevel_change", } diff --git a/tests/test_persisters/test_ogc_features.py b/tests/test_persisters/test_ogc_features.py index d5547cd..7cce477 100644 --- a/tests/test_persisters/test_ogc_features.py +++ b/tests/test_persisters/test_ogc_features.py @@ -419,3 +419,195 @@ def test_daily_mean_and_units(self, tmp_path): assert props["parameter_name"] == "arsenic" assert props["slope_units"] == "mg/L/year" assert props["trend_category"] == "increasing" + + +from backend.persisters.ogc_features import ( + dump_hardness_collection, + dump_water_type_collection, + dump_data_density_collection, + dump_waterlevel_change_collection, +) + + +class TestHardnessCollection: + def test_computes_hardness_and_class(self, tmp_path): + recs = [ + _make_chem_record("WQP", "W1", "calcium", 80.0, well_depth=120.0), + _make_chem_record("WQP", "W1", "magnesium", 30.0), + ] + out = tmp_path / "h.geojson" + result = dump_hardness_collection(str(out), recs, {"id": "nm_hardness"}) + props = result["features"][0]["properties"] + # 2.497*80 + 4.118*30 = 199.76 + 123.54 = 323.3 + assert props["hardness_caco3"] == 323.3 + assert props["hardness_class"] == "very hard" + assert props["calcium"] == 80.0 + assert props["magnesium"] == 30.0 + assert props["well_depth"] == 120.0 + assert "hardness_method" in result + + def test_class_boundaries(self, tmp_path): + # soft: Ca=10, Mg=0 -> 24.97; moderate: Ca=30 -> 74.91; + # hard: Ca=60 -> 149.82 + cases = [ + ("S", 10.0, "soft"), + ("M", 30.0, "moderate"), + ("H", 60.0, "hard"), + ] + recs = [] + for rid, ca, _ in cases: + recs.append(_make_chem_record("WQP", rid, "calcium", ca)) + recs.append(_make_chem_record("WQP", rid, "magnesium", 0.0)) + out = tmp_path / "h.geojson" + result = dump_hardness_collection(str(out), recs, {"id": "nm_hardness"}) + by_id = {f["id"]: f["properties"] for f in result["features"]} + for rid, _, expected in cases: + assert by_id[f"WQP:{rid}"]["hardness_class"] == expected + + def test_missing_ion_is_insufficient(self, tmp_path): + recs = [_make_chem_record("WQP", "W1", "calcium", 80.0)] # no magnesium + out = tmp_path / "h.geojson" + result = dump_hardness_collection(str(out), recs, {"id": "nm_hardness"}) + props = result["features"][0]["properties"] + assert props["hardness_caco3"] is None + assert props["hardness_class"] == "insufficient" + + +class TestWaterTypeCollection: + def test_classifies_ca_hco3(self, tmp_path): + # Ca-dominant cation, HCO3-dominant anion. + recs = [ + _make_chem_record("WQP", "W1", "calcium", 100.0), # 4.99 meq + _make_chem_record("WQP", "W1", "magnesium", 1.0), # 0.08 meq + _make_chem_record("WQP", "W1", "sodium", 1.0), # 0.04 meq + _make_chem_record("WQP", "W1", "bicarbonate", 300.0), # 4.92 meq + _make_chem_record("WQP", "W1", "chloride", 1.0), # 0.03 meq + _make_chem_record("WQP", "W1", "sulfate", 1.0), # 0.02 meq + ] + out = tmp_path / "wt.geojson" + result = dump_water_type_collection(str(out), recs, {"id": "nm_water_type"}) + props = result["features"][0]["properties"] + assert props["dominant_cation"] == "Ca" + assert props["dominant_anion"] == "HCO3" + assert props["water_type"] == "Ca-HCO3" + assert props["ca_pct"] > 50 + assert "water_type_method" in result + + def test_mixed_when_no_majority(self, tmp_path): + # Cations split roughly evenly across Ca / Mg / Na+K -> mixed cation. + recs = [ + _make_chem_record("WQP", "W1", "calcium", 20.04), # 1.0 meq + _make_chem_record("WQP", "W1", "magnesium", 12.15), # 1.0 meq + _make_chem_record("WQP", "W1", "sodium", 22.99), # 1.0 meq + _make_chem_record("WQP", "W1", "bicarbonate", 305.1), # ~5 meq -> HCO3 + ] + out = tmp_path / "wt.geojson" + result = dump_water_type_collection(str(out), recs, {"id": "nm_water_type"}) + props = result["features"][0]["properties"] + assert props["dominant_cation"] == "mixed" + assert props["dominant_anion"] == "HCO3" + assert props["water_type"] == "mixed-HCO3" + + def test_insufficient_when_no_anions(self, tmp_path): + recs = [_make_chem_record("WQP", "W1", "calcium", 100.0)] + out = tmp_path / "wt.geojson" + result = dump_water_type_collection(str(out), recs, {"id": "nm_water_type"}) + props = result["features"][0]["properties"] + assert props["water_type"] == "insufficient" + assert props["ca_pct"] is None + + def test_charge_balance_reported(self, tmp_path): + recs = [ + _make_chem_record("WQP", "W1", "calcium", 20.04), # 1.0 meq cation + _make_chem_record("WQP", "W1", "chloride", 35.45), # 1.0 meq anion + ] + out = tmp_path / "wt.geojson" + result = dump_water_type_collection(str(out), recs, {"id": "nm_water_type"}) + props = result["features"][0]["properties"] + assert props["charge_balance_pct"] == 0.0 + + +class TestDataDensityCollection: + def test_counts_and_span(self, tmp_path): + site = _trend_site(rid="W1") + # 3 distinct days, one day with two readings -> 4 raw obs, 3 days. + obs = [ + _trend_obs("2010-01-01", 50.0), + _trend_obs("2010-01-01", 51.0), + _trend_obs("2012-01-01", 52.0), + _trend_obs("2014-01-01", 53.0), + ] + out = tmp_path / "dd.geojson" + result = dump_data_density_collection( + str(out), [site], [obs], {"id": "nm_dd"}, parameter_name="waterlevels" + ) + props = result["features"][0]["properties"] + assert props["observation_count"] == 4 + assert props["record_count"] == 3 + assert props["parameter_name"] == "waterlevels" + assert round(props["span_years"]) == 4 + assert props["mean_interval_days"] is not None + assert "data_density_method" in result + + def test_empty_well(self, tmp_path): + site = _trend_site(rid="W1") + out = tmp_path / "dd.geojson" + result = dump_data_density_collection(str(out), [site], [[]], {"id": "nm_dd"}) + props = result["features"][0]["properties"] + assert props["observation_count"] == 0 + assert props["record_count"] == 0 + assert props["mean_interval_days"] is None + assert props["observations_per_year"] is None + + +class TestWaterLevelChangeCollection: + def test_change_over_window(self, tmp_path): + site = _trend_site(rid="W1") + # Yearly readings 2010-2020; window 5yr -> start near 2015, end 2020. + obs = [_trend_obs(f"{2010 + i}-01-01", 50.0 + i) for i in range(11)] + out = tmp_path / "ch.geojson" + result = dump_waterlevel_change_collection( + str(out), [site], [obs], {"id": "nm_change"}, window_years=5 + ) + props = result["features"][0]["properties"] + assert props["status"] == "ok" + assert props["dtw_end"] == 60.0 # 2020 value + assert props["dtw_start"] == 55.0 # 2015 value + assert props["change_ft"] == 5.0 # deeper -> declining + assert props["direction"] == "declining" + assert round(props["actual_window_years"]) == 5 + assert "change_method" in result + + def test_rising_water_table(self, tmp_path): + site = _trend_site(rid="W1") + obs = [_trend_obs(f"{2010 + i}-01-01", 60.0 - i) for i in range(11)] + out = tmp_path / "ch.geojson" + result = dump_waterlevel_change_collection( + str(out), [site], [obs], {"id": "nm_change"}, window_years=5 + ) + props = result["features"][0]["properties"] + assert props["change_ft"] == -5.0 + assert props["direction"] == "rising" + + def test_insufficient_when_no_start_in_window(self, tmp_path): + site = _trend_site(rid="W1") + # Only two readings 10 years apart; for a 5yr window the nearest start + # candidate (2010) is >half-window from the 2015 target -> insufficient. + obs = [_trend_obs("2010-01-01", 50.0), _trend_obs("2020-01-01", 60.0)] + out = tmp_path / "ch.geojson" + result = dump_waterlevel_change_collection( + str(out), [site], [obs], {"id": "nm_change"}, window_years=5 + ) + props = result["features"][0]["properties"] + assert props["status"] == "insufficient" + assert props["change_ft"] is None + + def test_single_reading_insufficient(self, tmp_path): + site = _trend_site(rid="W1") + out = tmp_path / "ch.geojson" + result = dump_waterlevel_change_collection( + str(out), [site], [[_trend_obs("2020-01-01", 50.0)]], + {"id": "nm_change"}, window_years=5, + ) + props = result["features"][0]["properties"] + assert props["status"] == "insufficient"