Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 11 additions & 20 deletions orchestration/assets/products.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
"""
import tempfile
import traceback
import zipfile
from pathlib import Path

import dagster as dg
Expand Down Expand Up @@ -162,28 +161,20 @@ def _combine_asset(
return _combine_asset


def _geojson_to_shapefile_zip(geojson_path: Path, layer_name: str, out_dir: Path) -> Path:
"""Convert a GeoJSON file to a zipped ESRI Shapefile whose components share
*layer_name* as their basename (so the published GeoServer layer is named
*layer_name*). Returns the path to the zip."""
def _geojson_to_geopackage(geojson_path: Path, layer_name: str, out_dir: Path) -> Path:
"""Convert a GeoJSON file to a GeoPackage whose layer (table) is named
*layer_name* (so the published GeoServer layer is named *layer_name*).
GeoPackage is a single file with no field-name length limit, unlike the
zipped ESRI Shapefile this replaces. Returns the path to the .gpkg."""
gdf = gpd.read_file(geojson_path)
if gdf.empty:
raise ValueError(f"{layer_name}: GeoJSON has no features; nothing to publish")
if gdf.crs is None:
gdf = gdf.set_crs("EPSG:4326")

shp_dir = out_dir / "shp"
shp_dir.mkdir(exist_ok=True)
shp_path = shp_dir / f"{layer_name}.shp"
# pyogrio engine writes ESRI Shapefile; field names are truncated to the
# 10-char shapefile limit automatically.
gdf.to_file(shp_path, driver="ESRI Shapefile")

zip_path = out_dir / f"{layer_name}.zip"
with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
for component in shp_dir.glob(f"{layer_name}.*"):
zf.write(component, arcname=component.name)
return zip_path
gpkg_path = out_dir / f"{layer_name}.gpkg"
gdf.to_file(gpkg_path, driver="GPKG", layer=layer_name)
return gpkg_path


def _build_geoserver_asset(product: dict, group: str):
Expand All @@ -207,10 +198,10 @@ def _geoserver_asset(
with tempfile.TemporaryDirectory() as tmpdir:
geojson = Path(tmpdir) / f"{pid}.geojson"
gcs.download_latest(pid, str(geojson))
zip_path = _geojson_to_shapefile_zip(geojson, pid, Path(tmpdir))
actions = geoserver.publish_shapefile(
gpkg_path = _geojson_to_geopackage(geojson, pid, Path(tmpdir))
actions = geoserver.publish_geopackage(
pid,
str(zip_path),
str(gpkg_path),
title=product.get("title", pid),
abstract=product.get("description", ""),
)
Expand Down
41 changes: 21 additions & 20 deletions orchestration/resources/geoserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ class GeoServerResource(dg.ConfigurableResource):
GEOSERVER_PASSWORD admin password
GEOSERVER_WORKSPACE target workspace (default "die")

The layer is published from a zipped ESRI Shapefile uploaded to a native
shapefile datastore (no GeoServer extensions required). GeoServer keeps its
own copy of the data, refreshed on each run.
The layer is published from a GeoPackage uploaded to a native GeoPackage
datastore (no GeoServer extensions required). GeoServer keeps its own copy
of the data, refreshed on each run.
"""

timeout: int = 60
Expand Down Expand Up @@ -69,29 +69,29 @@ def _ensure_workspace(self, base, ws, auth):
return "exists"

# -- public api -----------------------------------------------------------
def publish_shapefile(
def publish_geopackage(
self,
layer_name: str,
shapefile_zip_path: str,
gpkg_path: str,
title: Optional[str] = None,
abstract: Optional[str] = None,
) -> dict:
"""Create-or-update a native shapefile datastore named *layer_name* from
the zipped shapefile at *shapefile_zip_path* and publish its layer.
Idempotent — re-running overwrites the store's data. Returns a dict
describing the actions taken."""
"""Create-or-update a native GeoPackage datastore named *layer_name*
from the GeoPackage at *gpkg_path* and publish its layer. Idempotent —
re-running overwrites the store's data. Returns a dict describing the
actions taken."""
cfg = self._cfg()
base = cfg["url"]
ws = cfg["workspace"]
auth = cfg["auth"]

actions = {"workspace": self._ensure_workspace(base, ws, auth)}

# Delete any pre-existing datastore of this name first. PUT file.shp
# Delete any pre-existing datastore of this name first. PUT file.gpkg
# reuses an existing store's factory type, so a store left over from a
# different backend (e.g. a broken OGR store) would otherwise force a
# 500. Recreating from scratch each run keeps this self-healing; the
# data is re-uploaded every run regardless.
# different backend (e.g. a shapefile or OGR store) would otherwise
# force a 500. Recreating from scratch each run keeps this
# self-healing; the data is re-uploaded every run regardless.
ds_url = f"{base}/rest/workspaces/{ws}/datastores/{layer_name}"
r = requests.delete(
f"{ds_url}?recurse=true", auth=auth, timeout=self.timeout
Expand All @@ -100,23 +100,24 @@ def publish_shapefile(
self._raise(r)
actions["datastore_reset"] = "deleted" if r.status_code == 200 else "absent"

# Upload the zipped shapefile. PUT .../file.shp creates the datastore and
# publishes the contained layer.
with open(shapefile_zip_path, "rb") as f:
# Upload the GeoPackage. PUT .../file.gpkg creates the datastore and
# publishes the contained layer. A .gpkg is a SQLite database, hence
# the application/x-sqlite3 content type GeoServer expects.
with open(gpkg_path, "rb") as f:
data = f.read()
url = f"{base}/rest/workspaces/{ws}/datastores/{layer_name}/file.shp?configure=all"
url = f"{base}/rest/workspaces/{ws}/datastores/{layer_name}/file.gpkg?configure=all"
r = requests.put(
url,
data=data,
auth=auth,
headers={"Content-Type": "application/zip"},
headers={"Content-Type": "application/x-sqlite3"},
timeout=self.timeout,
)
self._raise(r)
actions["upload"] = "ok"

# Set title/abstract on the published featuretype. The shapefile layer
# name defaults to the .shp basename inside the zip (== layer_name here).
# Set title/abstract on the published featuretype. The GeoPackage layer
# name is the table name written by geopandas (== layer_name here).
if title is not None or abstract is not None:
ft_url = (
f"{base}/rest/workspaces/{ws}/datastores/{layer_name}"
Expand Down
Loading