From 83994122ea8a6d7deb5d3bb585c2532281f9212f Mon Sep 17 00:00:00 2001 From: jross Date: Thu, 25 Jun 2026 12:09:58 -0600 Subject: [PATCH 1/3] Publish gpkg featuretype explicitly (layer was not created) For GeoPackage, PUT file.gpkg?configure=all created the datastore but did not publish a layer, leaving a store with no associated layer. Switch to configure=none and POST the featuretype explicitly (name/nativeName = gpkg table name, srs EPSG:4326), folding title/abstract into that call. Co-Authored-By: Claude Opus 4.8 --- orchestration/resources/geoserver.py | 56 ++++++++++++++-------------- 1 file changed, 28 insertions(+), 28 deletions(-) diff --git a/orchestration/resources/geoserver.py b/orchestration/resources/geoserver.py index 5b406be..8411ee6 100644 --- a/orchestration/resources/geoserver.py +++ b/orchestration/resources/geoserver.py @@ -100,12 +100,14 @@ def publish_geopackage( self._raise(r) actions["datastore_reset"] = "deleted" if r.status_code == 200 else "absent" - # Upload the GeoPackage. PUT .../file.gpkg creates the datastore and - # publishes the contained layer. A .gpkg is a SQLite database, hence - # the application/x-sqlite3 content type GeoServer expects. + # Upload the GeoPackage to create the datastore only. Unlike the + # shapefile flow, configure=all does NOT reliably publish a gpkg's + # layer, so use configure=none and publish the featuretype explicitly + # below. A .gpkg is a SQLite database, hence the application/x-sqlite3 + # content type GeoServer expects. with open(gpkg_path, "rb") as f: data = f.read() - url = f"{base}/rest/workspaces/{ws}/datastores/{layer_name}/file.gpkg?configure=all" + url = f"{base}/rest/workspaces/{ws}/datastores/{layer_name}/file.gpkg?configure=none" r = requests.put( url, data=data, @@ -116,30 +118,28 @@ def publish_geopackage( self._raise(r) actions["upload"] = "ok" - # Set title/abstract on the published featuretype. The GeoPackage layer - # name is the table name written by geopandas (== layer_name here). - if title is not None or abstract is not None: - ft_url = ( - f"{base}/rest/workspaces/{ws}/datastores/{layer_name}" - f"/featuretypes/{layer_name}" - ) - r = requests.put( - ft_url, - auth=auth, - json={ - "featureType": { - "title": title or layer_name, - "abstract": abstract or "", - } - }, - headers={"Content-Type": "application/json"}, - timeout=self.timeout, - ) - # non-fatal: data is already published even if metadata update fails - if r.status_code < 400: - actions["metadata"] = "updated" - else: - actions["metadata"] = f"skipped ({r.status_code})" + # Publish the layer from the gpkg's table. nativeName is the table name + # written by geopandas (== layer_name); name is the published layer + # name. The store was recreated above, so this POST always creates a + # fresh featuretype. srs is set explicitly — the data is WGS84. + ft_url = f"{base}/rest/workspaces/{ws}/datastores/{layer_name}/featuretypes" + r = requests.post( + ft_url, + auth=auth, + json={ + "featureType": { + "name": layer_name, + "nativeName": layer_name, + "title": title or layer_name, + "abstract": abstract or "", + "srs": "EPSG:4326", + } + }, + headers={"Content-Type": "application/json"}, + timeout=self.timeout, + ) + self._raise(r) + actions["layer"] = "published" actions["layer_url"] = f"{base}/{ws}/wms?layers={ws}:{layer_name}" return actions From 2215996190d115e8c721ace6f2200fb08c5a42f4 Mon Sep 17 00:00:00 2001 From: jross Date: Thu, 25 Jun 2026 13:18:16 -0600 Subject: [PATCH 2/3] Flatten geometry to 2D before writing GeoPackage Sites with an elevation produce 3D point geometry ([lon, lat, elev]). GeoServer's GeoPackage reader then fails computing bounds: MismatchedDimensionException: Argument "WGS 84" has 3 dimensions, while 2 was expected. Call force_2d() on the GeoDataFrame before to_file(GPKG). Elevation is still carried as an attribute; only the geometry dimensionality changes. The GeoJSON product on GCS is unaffected. Co-Authored-By: Claude Opus 4.8 --- orchestration/assets/products.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/orchestration/assets/products.py b/orchestration/assets/products.py index 686e428..9862d0a 100644 --- a/orchestration/assets/products.py +++ b/orchestration/assets/products.py @@ -172,6 +172,11 @@ def _geojson_to_geopackage(geojson_path: Path, layer_name: str, out_dir: Path) - if gdf.crs is None: gdf = gdf.set_crs("EPSG:4326") + # Sites with an elevation get 3D point geometry in the GeoJSON. GeoServer's + # GeoPackage reader rejects a 3D CRS ("WGS 84 has 3 dimensions") when + # computing bounds, so flatten to 2D — elevation remains an attribute. + gdf["geometry"] = gdf.geometry.force_2d() + gpkg_path = out_dir / f"{layer_name}.gpkg" gdf.to_file(gpkg_path, driver="GPKG", layer=layer_name) return gpkg_path From 3b35f1ff0b4fb3bae3997cbc05b9e0dd094570fd Mon Sep 17 00:00:00 2001 From: jross Date: Thu, 25 Jun 2026 13:24:59 -0600 Subject: [PATCH 3/3] Auth GCS IO manager via service-account key (no ADC on serverless) The GCSPickleIOManager used dagster_gcp's GCSResource, whose get_client() goes through Application Default Credentials. Dagster+ serverless has no ADC, so asset I/O failed: google.auth.exceptions.DefaultCredentialsError: Your default credentials were not found. Add AuthedGCSResource (dagster_gcp GCSResource subclass) that builds the client from GCP_SERVICE_ACCOUNT_KEY, the same secret the product-upload GCSResource already uses, and wire it into the IO manager. Factored the client builder into _storage_client() shared by both. Co-Authored-By: Claude Opus 4.8 --- orchestration/definitions.py | 6 +++--- orchestration/resources/gcs.py | 37 +++++++++++++++++++++++++--------- 2 files changed, 30 insertions(+), 13 deletions(-) diff --git a/orchestration/definitions.py b/orchestration/definitions.py index 5cc5233..1cea661 100644 --- a/orchestration/definitions.py +++ b/orchestration/definitions.py @@ -2,10 +2,10 @@ import dagster as dg import yaml -from dagster_gcp.gcs import GCSPickleIOManager, GCSResource as DagsterGCSResource +from dagster_gcp.gcs import GCSPickleIOManager from orchestration.resources.die_config import DIEConfigResource -from orchestration.resources.gcs import GCSResource +from orchestration.resources.gcs import GCSResource, AuthedGCSResource from orchestration.resources.geoserver import GeoServerResource from orchestration.assets.products import build_product_assets @@ -65,7 +65,7 @@ def _build_schedules(products_config: dict) -> list: # geoserver) on its own can't load its source inputs from a prior run # and fails with FileNotFoundError. "io_manager": GCSPickleIOManager( - gcs=DagsterGCSResource(), + gcs=AuthedGCSResource(), gcs_bucket=_products_config.get("gcs_bucket", "dataservices-die-products"), gcs_prefix="dagster-io", ), diff --git a/orchestration/resources/gcs.py b/orchestration/resources/gcs.py index 5bdf513..9c77198 100644 --- a/orchestration/resources/gcs.py +++ b/orchestration/resources/gcs.py @@ -13,6 +13,19 @@ _GCS_AVAILABLE = False +def _storage_client(): + """Build a GCS client. Dagster+ serverless has no Application Default + Credentials, so prefer an explicit service-account key from a Dagster+ + secret env var; fall back to ADC (local dev with + `gcloud auth application-default login`).""" + if not _GCS_AVAILABLE: + raise ImportError("google-cloud-storage not installed") + key = os.environ.get("GCP_SERVICE_ACCOUNT_KEY") + if key: + return storage.Client.from_service_account_info(json.loads(key)) + return storage.Client() + + class GCSResource(dg.ConfigurableResource): """ Upload OGC Feature Collection GeoJSON files to GCS. @@ -26,16 +39,7 @@ class GCSResource(dg.ConfigurableResource): products_prefix: str = "products" def _client(self): - if not _GCS_AVAILABLE: - raise ImportError("google-cloud-storage not installed") - # Dagster+ serverless has no Application Default Credentials. Prefer an - # explicit service-account key from a Dagster+ secret env var; fall back - # to ADC (local dev with `gcloud auth application-default login`). - key = os.environ.get("GCP_SERVICE_ACCOUNT_KEY") - if key: - info = json.loads(key) - return storage.Client.from_service_account_info(info) - return storage.Client() + return _storage_client() def download_latest(self, product_id: str, dest_path: str) -> str: """Download a product's latest.geojson to *dest_path*. Returns the path.""" @@ -90,3 +94,16 @@ def upload_product( "file_size_bytes": file_size, "run_date": run_date, } + + +from dagster_gcp.gcs import GCSResource as _DagsterGCSResource # noqa: E402 + + +class AuthedGCSResource(_DagsterGCSResource): + """dagster_gcp GCSResource for the GCS IO manager. The stock resource builds + its client via Application Default Credentials, which Dagster+ serverless + lacks — so authenticate from GCP_SERVICE_ACCOUNT_KEY like GCSResource above. + """ + + def get_client(self): + return _storage_client()