From 0301940425c5d72b621081c5ebe34b3bdc8bcae3 Mon Sep 17 00:00:00 2001 From: jakeross Date: Fri, 26 Jun 2026 09:40:09 -0600 Subject: [PATCH] Wire USGS_API_KEY for NWIS via Dagster secret MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The USGS/NWIS water data API is heavily rate-limited without an API key. Make the key flow from a Dagster+ secret to the NWIS connector: - definitions.py: DIEConfigResource(usgs_api_key=EnvVar("USGS_API_KEY")) — resolves the secret at run time (None when unset; API still works, just throttled). - die_config.py: get_config exports the key to os.environ so the backend NWIS connector (reads os.environ["USGS_API_KEY"] at request time) picks it up. Only sets when provided, never clobbers an ambient value. - usgs/source.py: DRY the duplicated X-API-Key header logic into a _usgs_headers() helper used by all three request sites. - AGENTS.md: document the Dagster+ secrets, including USGS_API_KEY. Verified defs load with the key set and unset; CLI usgs-api-key tests pass. Co-Authored-By: Claude Opus 4.8 --- backend/connectors/usgs/source.py | 30 +++++++++++++-------------- orchestration/AGENTS.md | 10 +++++++-- orchestration/definitions.py | 7 ++++++- orchestration/resources/die_config.py | 12 +++++++++++ 4 files changed, 41 insertions(+), 18 deletions(-) diff --git a/backend/connectors/usgs/source.py b/backend/connectors/usgs/source.py index 424a7bb..d0e7874 100644 --- a/backend/connectors/usgs/source.py +++ b/backend/connectors/usgs/source.py @@ -40,10 +40,21 @@ get_terminal_record, ) -LIMIT = 50000 +LIMIT = 50000 TIMEOUT=15*60 # 15 minutes, to allow for retries and large requests MAX_RETRIES = 7 + +def _usgs_headers(extra: dict | None = None) -> dict: + """Request headers for the USGS water data API. Adds the X-API-Key header + when a USGS_API_KEY is set (env var, e.g. a Dagster+ secret). 
Without a key + the API is heavily rate-limited.""" + headers = dict(extra or {}) + key = os.environ.get("USGS_API_KEY") + if key: + headers["X-API-Key"] = key + return headers + class NWISSiteSource(BaseSiteSource): chunk_size = 500 @@ -61,15 +72,11 @@ def tag(self): def health(self): try: - if os.environ.get("USGS_API_KEY"): - headers = {"X-API-Key": os.environ["USGS_API_KEY"]} - else: - headers = {} response = self._http_client.get( url=self.sites_url, params={"limit": 1, "parameter_code": "72019", "site_type_code": "GW", "state_code": "35"}, timeout=30, - headers=headers + headers=_usgs_headers() ) response.raise_for_status() return True @@ -105,15 +112,11 @@ def get_records(self): while tries < MAX_RETRIES: try: - if os.environ.get("USGS_API_KEY"): - headers = {"X-API-Key": os.environ["USGS_API_KEY"]} - else: - headers = {} response = self._http_client.get( url=self.sites_url, params=params, timeout=TIMEOUT, - headers=headers + headers=_usgs_headers() ) if response.status_code == 200: @@ -215,10 +218,7 @@ def get_records(self, site_record): while tries < MAX_RETRIES: try: - if os.environ.get("USGS_API_KEY"): - headers = {"X-API-Key": os.environ["USGS_API_KEY"], "Content-Type": "application/query-cql-json"} - else: - headers = {"Content-Type": "application/query-cql-json"} + headers = _usgs_headers({"Content-Type": "application/query-cql-json"}) response = httpx.post( url="https://api.waterdata.usgs.gov/ogcapi/v0/collections/field-measurements/items", json=json_data, diff --git a/orchestration/AGENTS.md b/orchestration/AGENTS.md index e3a88a5..0e9c92f 100644 --- a/orchestration/AGENTS.md +++ b/orchestration/AGENTS.md @@ -65,5 +65,11 @@ uv run python -c "import orchestration.definitions; print('ok')" Dagster+ serverless builds from the repo root per `dagster_cloud.yaml` (`module_name: orchestration.definitions`). Runtime deps come from the root -`requirements.txt`. Secrets (GCS, GeoServer) are set as Dagster+ env vars, not -in code. +`requirements.txt`. 
Secrets are set as Dagster+ env vars, not in code: + +- `GCP_SERVICE_ACCOUNT_KEY` — GCS upload/IO-manager auth (JSON key). +- `GEOSERVER_URL` / `GEOSERVER_USER` / `GEOSERVER_PASSWORD` / `GEOSERVER_WORKSPACE`. +- `USGS_API_KEY` — USGS/NWIS API key. Without it the USGS water data API is + heavily rate-limited. Resolved via `dg.EnvVar` into `DIEConfigResource`, which + exports it to the environment for the NWIS connector. +- `DIE_FORWARD_LOGS_TO_DAGSTER` (optional) — forward DIE logs to the compute log. diff --git a/orchestration/definitions.py b/orchestration/definitions.py index cdc0f36..e9ae0eb 100644 --- a/orchestration/definitions.py +++ b/orchestration/definitions.py @@ -77,7 +77,12 @@ def _build_schedules(products_config: dict, jobs: dict) -> list: jobs=list(_jobs.values()), schedules=_schedules, resources={ - "die_config": DIEConfigResource(), + # USGS_API_KEY is a Dagster+ secret; EnvVar resolves it at run time and + # the resource exports it for the NWIS connector. Meant to be None when + # unset (API still works, just rate-limited) — TODO confirm EnvVar allows it. + "die_config": DIEConfigResource( + usgs_api_key=dg.EnvVar("USGS_API_KEY"), + ), "gcs": GCSResource( bucket_name=_products_config.get("gcs_bucket", "dataservices-die-products"), ), diff --git a/orchestration/resources/die_config.py b/orchestration/resources/die_config.py index 75b914e..869c85e 100644 --- a/orchestration/resources/die_config.py +++ b/orchestration/resources/die_config.py @@ -1,3 +1,4 @@ +import os from typing import Optional import dagster as dg from backend.config import Config @@ -6,6 +7,11 @@ class DIEConfigResource(dg.ConfigurableResource): """Dagster resource that constructs a DIE Config from a product spec dict.""" + # USGS/NWIS API key. Without it the USGS water data API is heavily + # rate-limited. 
Sourced from the USGS_API_KEY env var (a Dagster+ secret) in + # definitions.py; exported back to the environment in get_config so the + # backend NWIS connector — which reads os.environ["USGS_API_KEY"] at request + # time — picks it up. usgs_api_key: Optional[str] = None def get_config(self, product: dict, parameter: Optional[str] = None) -> Config: @@ -26,6 +32,12 @@ def get_config(self, product: dict, parameter: Optional[str] = None) -> Config: major-chemistry product, which has no single parameter and calls this once per analyte. """ + # Make the USGS key visible to the backend NWIS connector (reads it from + # the environment). Only set when provided so we never clobber an + # ambient value with an empty one. + if self.usgs_api_key: + os.environ["USGS_API_KEY"] = self.usgs_api_key + spatial = product.get("spatial_filter", {}) sources_spec = product.get("sources", {})