Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions orchestration/assets/products.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,11 @@ def _geojson_to_geopackage(geojson_path: Path, layer_name: str, out_dir: Path) -
if gdf.crs is None:
gdf = gdf.set_crs("EPSG:4326")

# Sites with an elevation get 3D point geometry in the GeoJSON. GeoServer's
# GeoPackage reader rejects a 3D CRS ("WGS 84 has 3 dimensions") when
# computing bounds, so flatten to 2D — elevation remains an attribute.
gdf["geometry"] = gdf.geometry.force_2d()

gpkg_path = out_dir / f"{layer_name}.gpkg"
gdf.to_file(gpkg_path, driver="GPKG", layer=layer_name)
return gpkg_path
Expand Down
6 changes: 3 additions & 3 deletions orchestration/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

import dagster as dg
import yaml
from dagster_gcp.gcs import GCSPickleIOManager, GCSResource as DagsterGCSResource
from dagster_gcp.gcs import GCSPickleIOManager

from orchestration.resources.die_config import DIEConfigResource
from orchestration.resources.gcs import GCSResource
from orchestration.resources.gcs import GCSResource, AuthedGCSResource
from orchestration.resources.geoserver import GeoServerResource
from orchestration.assets.products import build_product_assets

Expand Down Expand Up @@ -65,7 +65,7 @@ def _build_schedules(products_config: dict) -> list:
# geoserver) on its own can't load its source inputs from a prior run
# and fails with FileNotFoundError.
"io_manager": GCSPickleIOManager(
gcs=DagsterGCSResource(),
gcs=AuthedGCSResource(),
gcs_bucket=_products_config.get("gcs_bucket", "dataservices-die-products"),
gcs_prefix="dagster-io",
),
Expand Down
37 changes: 27 additions & 10 deletions orchestration/resources/gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,19 @@
_GCS_AVAILABLE = False


def _storage_client():
"""Build a GCS client. Dagster+ serverless has no Application Default
Credentials, so prefer an explicit service-account key from a Dagster+
secret env var; fall back to ADC (local dev with
`gcloud auth application-default login`)."""
if not _GCS_AVAILABLE:
raise ImportError("google-cloud-storage not installed")
key = os.environ.get("GCP_SERVICE_ACCOUNT_KEY")
if key:
return storage.Client.from_service_account_info(json.loads(key))
return storage.Client()


class GCSResource(dg.ConfigurableResource):
"""
Upload OGC Feature Collection GeoJSON files to GCS.
Expand All @@ -26,16 +39,7 @@ class GCSResource(dg.ConfigurableResource):
products_prefix: str = "products"

def _client(self):
if not _GCS_AVAILABLE:
raise ImportError("google-cloud-storage not installed")
# Dagster+ serverless has no Application Default Credentials. Prefer an
# explicit service-account key from a Dagster+ secret env var; fall back
# to ADC (local dev with `gcloud auth application-default login`).
key = os.environ.get("GCP_SERVICE_ACCOUNT_KEY")
if key:
info = json.loads(key)
return storage.Client.from_service_account_info(info)
return storage.Client()
return _storage_client()

def download_latest(self, product_id: str, dest_path: str) -> str:
"""Download a product's latest.geojson to *dest_path*. Returns the path."""
Expand Down Expand Up @@ -90,3 +94,16 @@ def upload_product(
"file_size_bytes": file_size,
"run_date": run_date,
}


from dagster_gcp.gcs import GCSResource as _DagsterGCSResource # noqa: E402


class AuthedGCSResource(_DagsterGCSResource):
"""dagster_gcp GCSResource for the GCS IO manager. The stock resource builds
its client via Application Default Credentials, which Dagster+ serverless
lacks — so authenticate from GCP_SERVICE_ACCOUNT_KEY like GCSResource above.
"""

def get_client(self):
return _storage_client()
56 changes: 28 additions & 28 deletions orchestration/resources/geoserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,14 @@ def publish_geopackage(
self._raise(r)
actions["datastore_reset"] = "deleted" if r.status_code == 200 else "absent"

# Upload the GeoPackage. PUT .../file.gpkg creates the datastore and
# publishes the contained layer. A .gpkg is a SQLite database, hence
# the application/x-sqlite3 content type GeoServer expects.
# Upload the GeoPackage to create the datastore only. Unlike the
# shapefile flow, configure=all does NOT reliably publish a gpkg's
# layer, so use configure=none and publish the featuretype explicitly
# below. A .gpkg is a SQLite database, hence the application/x-sqlite3
# content type GeoServer expects.
with open(gpkg_path, "rb") as f:
data = f.read()
url = f"{base}/rest/workspaces/{ws}/datastores/{layer_name}/file.gpkg?configure=all"
url = f"{base}/rest/workspaces/{ws}/datastores/{layer_name}/file.gpkg?configure=none"
r = requests.put(
url,
data=data,
Expand All @@ -116,30 +118,28 @@ def publish_geopackage(
self._raise(r)
actions["upload"] = "ok"

# Set title/abstract on the published featuretype. The GeoPackage layer
# name is the table name written by geopandas (== layer_name here).
if title is not None or abstract is not None:
ft_url = (
f"{base}/rest/workspaces/{ws}/datastores/{layer_name}"
f"/featuretypes/{layer_name}"
)
r = requests.put(
ft_url,
auth=auth,
json={
"featureType": {
"title": title or layer_name,
"abstract": abstract or "",
}
},
headers={"Content-Type": "application/json"},
timeout=self.timeout,
)
# non-fatal: data is already published even if metadata update fails
if r.status_code < 400:
actions["metadata"] = "updated"
else:
actions["metadata"] = f"skipped ({r.status_code})"
# Publish the layer from the gpkg's table. nativeName is the table name
# written by geopandas (== layer_name); name is the published layer
# name. The store was recreated above, so this POST always creates a
# fresh featuretype. srs is set explicitly — the data is WGS84.
ft_url = f"{base}/rest/workspaces/{ws}/datastores/{layer_name}/featuretypes"
r = requests.post(
ft_url,
auth=auth,
json={
"featureType": {
"name": layer_name,
"nativeName": layer_name,
"title": title or layer_name,
"abstract": abstract or "",
"srs": "EPSG:4326",
}
},
headers={"Content-Type": "application/json"},
timeout=self.timeout,
)
self._raise(r)
actions["layer"] = "published"

actions["layer_url"] = f"{base}/{ws}/wms?layers={ws}:{layer_name}"
return actions
Loading