Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 15 additions & 15 deletions backend/connectors/usgs/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,21 @@
get_terminal_record,
)

LIMIT = 50000
LIMIT = 50000
TIMEOUT=15*60 # 15 minutes, to allow for retries and large requests
MAX_RETRIES = 7


def _usgs_headers(extra: dict | None = None) -> dict:
"""Request headers for the USGS water data API. Adds the X-API-Key header
when a USGS_API_KEY is set (env var, e.g. a Dagster+ secret). Without a key
the API is heavily rate-limited."""
headers = dict(extra or {})
key = os.environ.get("USGS_API_KEY")
if key:
headers["X-API-Key"] = key
return headers

class NWISSiteSource(BaseSiteSource):
chunk_size = 500

Expand All @@ -61,15 +72,11 @@ def tag(self):

def health(self):
try:
if os.environ.get("USGS_API_KEY"):
headers = {"X-API-Key": os.environ["USGS_API_KEY"]}
else:
headers = {}
response = self._http_client.get(
url=self.sites_url,
params={"limit": 1, "parameter_code": "72019", "site_type_code": "GW", "state_code": "35"},
timeout=30,
headers=headers
headers=_usgs_headers()
)
response.raise_for_status()
return True
Expand Down Expand Up @@ -105,15 +112,11 @@ def get_records(self):

while tries < MAX_RETRIES:
try:
if os.environ.get("USGS_API_KEY"):
headers = {"X-API-Key": os.environ["USGS_API_KEY"]}
else:
headers = {}
response = self._http_client.get(
url=self.sites_url,
params=params,
timeout=TIMEOUT,
headers=headers
headers=_usgs_headers()
)

if response.status_code == 200:
Expand Down Expand Up @@ -215,10 +218,7 @@ def get_records(self, site_record):

while tries < MAX_RETRIES:
try:
if os.environ.get("USGS_API_KEY"):
headers = {"X-API-Key": os.environ["USGS_API_KEY"], "Content-Type": "application/query-cql-json"}
else:
headers = {"Content-Type": "application/query-cql-json"}
headers = _usgs_headers({"Content-Type": "application/query-cql-json"})
response = httpx.post(
url="https://api.waterdata.usgs.gov/ogcapi/v0/collections/field-measurements/items",
json=json_data,
Expand Down
10 changes: 8 additions & 2 deletions orchestration/AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,5 +65,11 @@ uv run python -c "import orchestration.definitions; print('ok')"

Dagster+ serverless builds from the repo root per `dagster_cloud.yaml`
(`module_name: orchestration.definitions`). Runtime deps come from the root
`requirements.txt`. Secrets (GCS, GeoServer) are set as Dagster+ env vars, not
in code.
`requirements.txt`. Secrets are set as Dagster+ env vars, not in code:

- `GCP_SERVICE_ACCOUNT_KEY` — GCS upload/IO-manager auth (JSON key).
- `GEOSERVER_URL` / `GEOSERVER_USER` / `GEOSERVER_PASSWORD` / `GEOSERVER_WORKSPACE`.
- `USGS_API_KEY` — USGS/NWIS API key. Without it the USGS water data API is
heavily rate-limited. Resolved via `dg.EnvVar` into `DIEConfigResource`, which
exports it to the environment for the NWIS connector.
- `DIE_FORWARD_LOGS_TO_DAGSTER` (optional) — forward DIE logs to the compute log.
7 changes: 6 additions & 1 deletion orchestration/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,12 @@ def _build_schedules(products_config: dict, jobs: dict) -> list:
jobs=list(_jobs.values()),
schedules=_schedules,
resources={
"die_config": DIEConfigResource(),
# USGS_API_KEY is a Dagster+ secret; EnvVar resolves it at run time and
# the resource exports it for the NWIS connector. Resolves to None when
# unset (the API still works, just rate-limited).
"die_config": DIEConfigResource(
usgs_api_key=dg.EnvVar("USGS_API_KEY"),
),
"gcs": GCSResource(
bucket_name=_products_config.get("gcs_bucket", "dataservices-die-products"),
),
Expand Down
12 changes: 12 additions & 0 deletions orchestration/resources/die_config.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import os
from typing import Optional
import dagster as dg
from backend.config import Config
Expand All @@ -6,6 +7,11 @@
class DIEConfigResource(dg.ConfigurableResource):
"""Dagster resource that constructs a DIE Config from a product spec dict."""

# USGS/NWIS API key. Without it the USGS water data API is heavily
# rate-limited. Sourced from the USGS_API_KEY env var (a Dagster+ secret) in
# definitions.py; exported back to the environment in get_config so the
# backend NWIS connector — which reads os.environ["USGS_API_KEY"] at request
# time — picks it up.
usgs_api_key: Optional[str] = None

def get_config(self, product: dict, parameter: Optional[str] = None) -> Config:
Expand All @@ -26,6 +32,12 @@ def get_config(self, product: dict, parameter: Optional[str] = None) -> Config:
major-chemistry product, which has no single parameter and calls this
once per analyte.
"""
# Make the USGS key visible to the backend NWIS connector (reads it from
# the environment). Only set when provided so we never clobber an
# ambient value with an empty one.
if self.usgs_api_key:
os.environ["USGS_API_KEY"] = self.usgs_api_key

spatial = product.get("spatial_filter", {})
sources_spec = product.get("sources", {})

Expand Down
Loading