diff --git a/orchestration/assets/products.py b/orchestration/assets/products.py index 686e428..9862d0a 100644 --- a/orchestration/assets/products.py +++ b/orchestration/assets/products.py @@ -172,6 +172,11 @@ def _geojson_to_geopackage(geojson_path: Path, layer_name: str, out_dir: Path) - if gdf.crs is None: gdf = gdf.set_crs("EPSG:4326") + # Sites with an elevation get 3D point geometry in the GeoJSON. GeoServer's + # GeoPackage reader rejects a 3D CRS ("WGS 84 has 3 dimensions") when + # computing bounds, so flatten to 2D — elevation remains an attribute. + gdf["geometry"] = gdf.geometry.force_2d() + gpkg_path = out_dir / f"{layer_name}.gpkg" gdf.to_file(gpkg_path, driver="GPKG", layer=layer_name) return gpkg_path diff --git a/orchestration/definitions.py b/orchestration/definitions.py index 5cc5233..1cea661 100644 --- a/orchestration/definitions.py +++ b/orchestration/definitions.py @@ -2,10 +2,10 @@ import dagster as dg import yaml -from dagster_gcp.gcs import GCSPickleIOManager, GCSResource as DagsterGCSResource +from dagster_gcp.gcs import GCSPickleIOManager from orchestration.resources.die_config import DIEConfigResource -from orchestration.resources.gcs import GCSResource +from orchestration.resources.gcs import GCSResource, AuthedGCSResource from orchestration.resources.geoserver import GeoServerResource from orchestration.assets.products import build_product_assets @@ -65,7 +65,7 @@ def _build_schedules(products_config: dict) -> list: # geoserver) on its own can't load its source inputs from a prior run # and fails with FileNotFoundError. "io_manager": GCSPickleIOManager( - gcs=DagsterGCSResource(), + gcs=AuthedGCSResource(), gcs_bucket=_products_config.get("gcs_bucket", "dataservices-die-products"), gcs_prefix="dagster-io", ), diff --git a/orchestration/resources/gcs.py b/orchestration/resources/gcs.py index 5bdf513..9c77198 100644 --- a/orchestration/resources/gcs.py +++ b/orchestration/resources/gcs.py @@ -13,6 +13,19 @@ _GCS_AVAILABLE = False +def _storage_client(): + """Build a GCS client. Dagster+ serverless has no Application Default + Credentials, so prefer an explicit service-account key from a Dagster+ + secret env var; fall back to ADC (local dev with + `gcloud auth application-default login`).""" + if not _GCS_AVAILABLE: + raise ImportError("google-cloud-storage not installed") + key = os.environ.get("GCP_SERVICE_ACCOUNT_KEY") + if key: + return storage.Client.from_service_account_info(json.loads(key)) + return storage.Client() + + class GCSResource(dg.ConfigurableResource): """ Upload OGC Feature Collection GeoJSON files to GCS. @@ -26,16 +39,7 @@ class GCSResource(dg.ConfigurableResource): products_prefix: str = "products" def _client(self): - if not _GCS_AVAILABLE: - raise ImportError("google-cloud-storage not installed") - # Dagster+ serverless has no Application Default Credentials. Prefer an - # explicit service-account key from a Dagster+ secret env var; fall back - # to ADC (local dev with `gcloud auth application-default login`). - key = os.environ.get("GCP_SERVICE_ACCOUNT_KEY") - if key: - info = json.loads(key) - return storage.Client.from_service_account_info(info) - return storage.Client() + return _storage_client() def download_latest(self, product_id: str, dest_path: str) -> str: """Download a product's latest.geojson to *dest_path*. Returns the path.""" @@ -90,3 +94,16 @@ def upload_product( "file_size_bytes": file_size, "run_date": run_date, } + + +from dagster_gcp.gcs import GCSResource as _DagsterGCSResource # noqa: E402 + + +class AuthedGCSResource(_DagsterGCSResource): + """dagster_gcp GCSResource for the GCS IO manager. The stock resource builds + its client via Application Default Credentials, which Dagster+ serverless + lacks — so authenticate from GCP_SERVICE_ACCOUNT_KEY like GCSResource above. + """ + + def get_client(self): + return _storage_client() diff --git a/orchestration/resources/geoserver.py b/orchestration/resources/geoserver.py index 5b406be..8411ee6 100644 --- a/orchestration/resources/geoserver.py +++ b/orchestration/resources/geoserver.py @@ -100,12 +100,14 @@ def publish_geopackage( self._raise(r) actions["datastore_reset"] = "deleted" if r.status_code == 200 else "absent" - # Upload the GeoPackage. PUT .../file.gpkg creates the datastore and - # publishes the contained layer. A .gpkg is a SQLite database, hence - # the application/x-sqlite3 content type GeoServer expects. + # Upload the GeoPackage to create the datastore only. Unlike the + # shapefile flow, configure=all does NOT reliably publish a gpkg's + # layer, so use configure=none and publish the featuretype explicitly + # below. A .gpkg is a SQLite database, hence the application/x-sqlite3 + # content type GeoServer expects. with open(gpkg_path, "rb") as f: data = f.read() - url = f"{base}/rest/workspaces/{ws}/datastores/{layer_name}/file.gpkg?configure=all" + url = f"{base}/rest/workspaces/{ws}/datastores/{layer_name}/file.gpkg?configure=none" r = requests.put( url, data=data, @@ -116,30 +118,28 @@ def publish_geopackage( self._raise(r) actions["upload"] = "ok" - # Set title/abstract on the published featuretype. The GeoPackage layer - # name is the table name written by geopandas (== layer_name here). - if title is not None or abstract is not None: - ft_url = ( - f"{base}/rest/workspaces/{ws}/datastores/{layer_name}" - f"/featuretypes/{layer_name}" - ) - r = requests.put( - ft_url, - auth=auth, - json={ - "featureType": { - "title": title or layer_name, - "abstract": abstract or "", - } - }, - headers={"Content-Type": "application/json"}, - timeout=self.timeout, - ) - # non-fatal: data is already published even if metadata update fails - if r.status_code < 400: - actions["metadata"] = "updated" - else: - actions["metadata"] = f"skipped ({r.status_code})" + # Publish the layer from the gpkg's table. nativeName is the table name + # written by geopandas (== layer_name); name is the published layer + # name. The store was recreated above, so this POST always creates a + # fresh featuretype. srs is set explicitly — the data is WGS84. + ft_url = f"{base}/rest/workspaces/{ws}/datastores/{layer_name}/featuretypes" + r = requests.post( + ft_url, + auth=auth, + json={ + "featureType": { + "name": layer_name, + "nativeName": layer_name, + "title": title or layer_name, + "abstract": abstract or "", + "srs": "EPSG:4326", + } + }, + headers={"Content-Type": "application/json"}, + timeout=self.timeout, + ) + self._raise(r) + actions["layer"] = "published" actions["layer_url"] = f"{base}/{ws}/wms?layers={ws}:{layer_name}" return actions