Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions alembic/versions/263109252fb1_add_legacy_equipment_fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,30 @@
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
FIELDS = (
"WI_Duration",
"WI_EndFrequency",
"WI_Magnitude",
"WI_MicGain",
"WI_MinSoundDepth",
"WI_StartFrequency",
("WI_Duration", sa.Integer()),
("WI_EndFrequency", sa.Integer()),
("WI_Magnitude", sa.Integer()),
("WI_MicGain", sa.Boolean()),
("WI_MinSoundDepth", sa.Integer()),
("WI_StartFrequency", sa.Integer()),
)


def upgrade() -> None:
"""Upgrade schema."""

for field in FIELDS:
for field, column_type in FIELDS:
op.add_column(
"deployment",
sa.Column(
f"nma_{field}",
sa.Integer(),
column_type,
nullable=True,
),
)


def downgrade() -> None:
"""Downgrade schema."""
for field in FIELDS:
for field, _ in FIELDS:
op.drop_column("deployment", f"nma_{field}")
46 changes: 46 additions & 0 deletions alembic/versions/e123456789ab_add_observation_data_quality.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
"""add nma_data_quality to observation

Revision ID: e123456789ab
Revises: b12e3919077e
Create Date: 2026-02-05 12:00:00.000000

"""

from typing import Sequence, Union

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision: str = "e123456789ab"
down_revision: Union[str, Sequence[str], None] = "b12e3919077e"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Add the nullable ``nma_data_quality`` column to both observation tables."""
    # "observation" is altered first, then "observation_version".
    for table_name in ("observation", "observation_version"):
        # A new Column/ForeignKey pair is built on each pass so the two
        # tables never share a schema object.
        lexicon_fk = sa.ForeignKey("lexicon_term.term", onupdate="CASCADE")
        quality_column = sa.Column(
            "nma_data_quality",
            sa.String(length=100),
            lexicon_fk,
            nullable=True,
        )
        op.add_column(table_name, quality_column)


def downgrade() -> None:
    """Drop ``nma_data_quality`` from ``observation_version``, then ``observation``."""
    for table_name in ("observation_version", "observation"):
        op.drop_column(table_name, "nma_data_quality")
10 changes: 5 additions & 5 deletions core/lexicon.json
Original file line number Diff line number Diff line change
Expand Up @@ -656,21 +656,21 @@
"data_quality"
],
"term": "Water level accurate to within two hundreths of a foot",
"definition": "Good"
"definition": "Water level accurate to within two hundreths of a foot"
},
{
"categories": [
"data_quality"
],
"term": "Water level accurate to within one foot",
"definition": "Fair"
"definition": "Water level accurate to within one foot"
},
{
"categories": [
"data_quality"
],
"term": "Water level accuracy not to nearest foot or water level not repeatable",
"definition": "Poor"
"definition": "Water level accuracy not to nearest foot or water level not repeatable"
},
{
"categories": [
Expand Down Expand Up @@ -712,7 +712,7 @@
"data_quality"
],
"term": "None",
"definition": "NA"
"definition": "None"
},
{
"categories": [
Expand Down Expand Up @@ -8149,4 +8149,4 @@
"definition": "Data were not field checked but are considered reliable"
}
]
}
}
4 changes: 2 additions & 2 deletions db/deployment.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from typing import TYPE_CHECKING

from sqlalchemy import Integer, ForeignKey, Date, Numeric, Text
from sqlalchemy import Integer, ForeignKey, Date, Numeric, Text, Boolean
from sqlalchemy.orm import relationship, Mapped, mapped_column

from db.base import Base, AutoBaseMixin, ReleaseMixin, lexicon_term
Expand Down Expand Up @@ -49,7 +49,7 @@ class Deployment(Base, AutoBaseMixin, ReleaseMixin):
nma_WI_Duration: Mapped[int] = mapped_column(Integer, nullable=True)
nma_WI_EndFrequency: Mapped[int] = mapped_column(Integer, nullable=True)
nma_WI_Magnitude: Mapped[int] = mapped_column(Integer, nullable=True)
nma_WI_MicGain: Mapped[int] = mapped_column(Integer, nullable=True)
nma_WI_MicGain: Mapped[bool] = mapped_column(Boolean, nullable=True)
Comment thread
jirhiker marked this conversation as resolved.
nma_WI_MinSoundDepth: Mapped[int] = mapped_column(Integer, nullable=True)
nma_WI_StartFrequency: Mapped[int] = mapped_column(Integer, nullable=True)

Expand Down
8 changes: 6 additions & 2 deletions db/observation.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
# limitations under the License.
# ===============================================================================
from datetime import datetime
from typing import TYPE_CHECKING

from sqlalchemy import (
ForeignKey,
DateTime,
Expand All @@ -23,8 +25,6 @@

from db.base import Base, AutoBaseMixin, ReleaseMixin, lexicon_term

from typing import TYPE_CHECKING

if TYPE_CHECKING:
from db.sample import Sample
from db.sensor import Sensor
Expand Down Expand Up @@ -64,6 +64,10 @@ class Observation(Base, AutoBaseMixin, ReleaseMixin):
)
unit: Mapped[str] = lexicon_term(nullable=False)
notes: Mapped[str] = mapped_column(nullable=True)
nma_data_quality: Mapped[str] = lexicon_term(
nullable=True,
comment="Legacy WaterLevels DataQuality mapped to lexicon term",
)

# groundwater
measuring_point_height: Mapped[float] = mapped_column(
Expand Down
3 changes: 2 additions & 1 deletion schemas/observation.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@
)
from typing_extensions import Self

from core.enums import Unit
from schemas import (
BaseCreateModel,
BaseUpdateModel,
BaseResponseModel,
UTCAwareDatetime,
)
from schemas.parameter import ParameterResponse
from core.enums import Unit

# class GeothermalMixin:
# depth: float
Expand Down Expand Up @@ -111,6 +111,7 @@ class BaseObservationResponse(BaseResponseModel):
parameter: ParameterResponse
value: float | None
unit: Unit
nma_data_quality: str | None = None


class GroundwaterLevelObservationResponse(BaseObservationResponse):
Expand Down
24 changes: 24 additions & 0 deletions tests/test_sensor_transfer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import numpy as np
import pandas as pd

from transfers.sensor_transfer import _coerce_wi_mic_gain


def test_coerce_wi_mic_gain_numeric():
    """Integer and float inputs come back as the ``True``/``False`` singletons."""
    expectations = (
        (1, True),
        (0, False),
        (1.0, True),
    )
    for raw, expected in expectations:
        assert _coerce_wi_mic_gain(raw) is expected


def test_coerce_wi_mic_gain_strings():
    """Numeric and keyword strings are coerced, ignoring padding and case."""
    expectations = (
        ("1", True),
        ("0", False),
        (" true ", True),
        ("False", False),
    )
    for raw, expected in expectations:
        assert _coerce_wi_mic_gain(raw) is expected


def test_coerce_wi_mic_gain_handles_none_like():
    """``None``, blank strings, ``pd.NA`` and ``NaN`` all coerce to ``None``."""
    missing_like = (None, " ", pd.NA, np.nan)
    for raw in missing_like:
        assert _coerce_wi_mic_gain(raw) is None
55 changes: 55 additions & 0 deletions tests/test_transfer_legacy_dates.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
import pandas as pd
import pytest

from db import Sample
from transfers.util import make_location
from transfers.waterlevels_transfer import WaterLevelTransferer

# ============================================================================
# FIXTURES
Expand Down Expand Up @@ -173,6 +175,59 @@ def test_make_location_maps_data_reliability_code(mock_lexicon_mapper):
assert location.nma_data_reliability == mock_lexicon_mapper.map_value.return_value


def test_make_observation_maps_data_quality():
    """``_make_observation`` stores the mapped DataQuality value on the observation."""
    # Skip __init__ and set only the attribute this test provides.
    transferer = WaterLevelTransferer.__new__(WaterLevelTransferer)
    transferer.groundwater_parameter_id = 1

    legacy_row = pd.Series(
        {
            "MPHeight": 1.0,
            "DepthToWater": 10.0,
            "DepthToWaterBGS": 9.0,
            "GlobalID": "TEST-GLOBAL",
            "DataQuality": "U2",
        }
    )

    sample_fields = {
        "field_activity_id": 1,
        "sample_date": datetime.datetime.now(datetime.timezone.utc),
        "sample_name": "test-sample",
        "sample_matrix": "water",
        "sample_method": "grab sample",
        "qc_type": "Normal",
    }
    parent_sample = Sample(**sample_fields)

    with patch("transfers.waterlevels_transfer.lexicon_mapper") as mapper:
        mapper.map_value.return_value = "Mapped Quality"
        measured_at = datetime.datetime.now(datetime.timezone.utc)
        result = transferer._make_observation(
            legacy_row, parent_sample, measured_at, "Reason"
        )

    # The mock keeps its call history after the patch context has exited.
    mapper.map_value.assert_any_call("LU_DataQuality:U2")
    assert result.nma_data_quality == "Mapped Quality"


def test_get_dt_utc_respects_time_datum():
    """``_get_dt_utc`` applies the row's TimeDatum: MST gives hour 17, MDT hour 16."""
    # Skip __init__ and set only the attributes this test provides.
    transferer = WaterLevelTransferer.__new__(WaterLevelTransferer)
    transferer.errors = []
    transferer.source_table = "WaterLevels"

    shared_fields = {
        "PointID": "TEST",
        "OBJECTID": 1,
        "DateMeasured": "2025-01-01",
        "TimeMeasured": "10:00:00.000000",
    }

    def utc_for(datum):
        """Run ``_get_dt_utc`` on the shared row with the given TimeDatum."""
        return transferer._get_dt_utc(pd.Series({**shared_fields, "TimeDatum": datum}))

    mst_result = utc_for("MST")
    assert mst_result.tzinfo == datetime.timezone.utc
    assert mst_result.hour == 17

    mdt_result = utc_for("MDT")
    assert mdt_result.hour == 16


def test_make_location_with_very_old_site_date(mock_lexicon_mapper):
"""Test that very old SiteDates (1950s) are preserved correctly"""
row = pd.Series(
Expand Down
22 changes: 21 additions & 1 deletion transfers/sensor_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,26 @@
}


def _coerce_wi_mic_gain(value):
if value is None or (isinstance(value, str) and not value.strip()):
return None
if isinstance(value, str):
value = value.strip()
if pd.isna(value):
return None
if isinstance(value, bool):
return value
try:
return bool(int(float(value)))
except (ValueError, TypeError):
lowered = str(value).strip().lower()
if lowered in {"true", "t", "yes", "y"}:
return True
if lowered in {"false", "f", "no", "n"}:
return False
return None


class SensorTransferer(ThingBasedTransferer):
source_table = "Equipment"

Expand Down Expand Up @@ -221,7 +241,7 @@ def _group_step(self, session: Session, row: pd.Series, db_item: Base):
nma_WI_Duration=row.WI_Duration,
nma_WI_EndFrequency=row.WI_EndFrequency,
nma_WI_Magnitude=row.WI_Magnitude,
nma_WI_MicGain=row.WI_MicGain,
nma_WI_MicGain=_coerce_wi_mic_gain(row.WI_MicGain),
nma_WI_MinSoundDepth=row.WI_MinSoundDepth,
nma_WI_StartFrequency=row.WI_StartFrequency,
)
Expand Down
28 changes: 25 additions & 3 deletions transfers/waterlevels_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# ===============================================================================
import json
import uuid
from datetime import datetime
from datetime import datetime, timezone, timedelta

import pandas as pd
from sqlalchemy.orm import Session
Expand Down Expand Up @@ -197,6 +197,17 @@ def _make_observation(
else:
value = row.DepthToWater

data_quality = None
dq_raw = getattr(row, "DataQuality", None)
if dq_raw and pd.notna(dq_raw):
dq_code = str(dq_raw).strip()
try:
data_quality = lexicon_mapper.map_value(f"LU_DataQuality:{dq_code}")
except KeyError:
logger.warning(
f"{SPACE_6}Unknown DataQuality code '{dq_code}' for WaterLevels record {row.GlobalID}"
)

# TODO: after sensors have been added to the database update sensor_id (or sensor) for waterlevels that come from db sensors (like e probes?)

Copilot AI Jan 30, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Corrected spelling of 'db' to 'DB' for consistency with standard database abbreviation conventions.

Suggested change
# TODO: after sensors have been added to the database update sensor_id (or sensor) for waterlevels that come from db sensors (like e probes?)
# TODO: after sensors have been added to the database update sensor_id (or sensor) for waterlevels that come from DB sensors (like e probes?)

Copilot uses AI. Check for mistakes.
observation = Observation(
nma_pk_waterlevels=row.GlobalID,
Expand All @@ -209,6 +220,7 @@ def _make_observation(
unit="ft",
measuring_point_height=measuring_point_height,
groundwater_level_reason=glv,
nma_data_quality=data_quality,
)
return observation

Expand Down Expand Up @@ -317,13 +329,13 @@ def _get_dt_utc(self, row) -> datetime | None:
t = row.TimeMeasured
# Truncate microseconds to 6 digits if present
if "." in t:
t = t[:-6]
dot_index = t.find(".")
t = t[: dot_index + 7]

dt_measured = f"{row.DateMeasured} {t}"

try:
dt = datetime.strptime(dt_measured, fmt)
return convert_mt_to_utc(dt)
except ValueError as e:
self._capture_error(row.PointID, str(e), "DateMeasured")
logger.critical(
Expand All @@ -332,5 +344,15 @@ def _get_dt_utc(self, row) -> datetime | None:
)
return None

time_datum = getattr(row, "TimeDatum", None)
if time_datum and pd.notna(time_datum):
datum = str(time_datum).strip().upper()
if datum in {"MST", "MDT"}:
offset_hours = -7 if datum == "MST" else -6
tz = timezone(timedelta(hours=offset_hours))
return dt.replace(tzinfo=tz).astimezone(timezone.utc)

return convert_mt_to_utc(dt)


# ============= EOF =============================================
Loading