-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcut_protected_areas_by_county.py
More file actions
345 lines (273 loc) · 11.2 KB
/
cut_protected_areas_by_county.py
File metadata and controls
345 lines (273 loc) · 11.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
"""Cut public lands polygons by county boundaries.
This script prepares two vector layers (public lands and counties) and produces a
new vector layer where public lands geometries are intersected with county
polygons. Public lands features fully contained by a county remain unchanged,
while features that cross county boundaries are split into multiple features.
Processing is organized as a task graph with intermediate artifacts written to
`INTERMEDIATE_DIR`. The final cut dataset is written to `OUT_DIR`.
Typical pipeline:
1. Fix invalid geometries in both layers.
2. Reproject counties to the public lands CRS (work CRS).
3. Simplify public lands geometries.
4. Drop null and zero-area geometries.
5. Spatially prefilter intersecting features (sjoin) and cut via overlay
intersection.
Outputs are written as GeoPackage layers when the destination path ends in
`.gpkg`; otherwise, the driver is inferred by GeoPandas.
"""
from pathlib import Path
import logging
import pandas as pd
import geopandas as gpd
from tqdm.auto import tqdm
from shapely import make_valid
from ecoshard import taskgraph
logging.basicConfig(
level=logging.DEBUG,
format="%(levelname)s %(filename)s:%(lineno)d %(message)s",
)
tqdm.pandas()
OUT_DIR = "./output"
INTERMEDIATE_DIR = "./intermediate"
PUBLIC_LANDS_VECTOR_PATH = "./data/public_lands_only_fixed.gpkg"
COUNTIES_VECTOR_PATH = "./data/gz_2010_us_050_00_5m/gz_2010_us_050_00_5m.shp"
CUT_PUBLIC_LANDS_VECTOR_PATH = (
f"{OUT_DIR}/gz_2010_us_050_00_5m_cut_by_public_lands.gpkg"
)
SIMPLIFY_TOLERANCE_IN_TARGET_UNITS = 50
def _layer_from_path(path: str) -> str:
"""Derive a default layer name from a filesystem path.
For a path like `/some/dir/foo.gpkg`, this returns `foo`.
Args:
path: Path to a dataset file.
Returns:
The filename stem (final path component without extension).
"""
return Path(path).stem
def _read(path: str) -> gpd.GeoDataFrame:
"""Read a vector dataset into a GeoDataFrame.
Args:
path: Path to a vector dataset readable by GeoPandas.
Returns:
A GeoDataFrame containing the dataset's features and geometry column.
"""
return gpd.read_file(path)
def _write(gdf: gpd.GeoDataFrame, path: str, layer: str | None = None) -> None:
"""Write a GeoDataFrame to disk.
If `path` ends with `.gpkg`, the output is written as a GeoPackage and the
layer name is set to `layer` or (if unset) the stem of `path`. Otherwise,
GeoPandas infers the driver from the file extension.
Args:
gdf: GeoDataFrame to write.
path: Destination path.
layer: Optional layer name (used for GeoPackage outputs only).
Returns:
None
"""
p = Path(path)
p.parent.mkdir(parents=True, exist_ok=True)
if p.suffix.lower() == ".gpkg":
gdf.to_file(path, driver="GPKG", layer=layer or p.stem, index=False)
else:
gdf.to_file(path, index=False)
def fix_geometries(in_path: str, out_path: str, layer: str) -> None:
"""Fix invalid geometries and write the result to a new dataset.
This uses `shapely.make_valid` to repair invalid geometries (e.g. self-
intersections). The operation is applied to every feature geometry and the
result is written to `out_path`.
Args:
in_path: Input vector dataset path.
out_path: Output vector dataset path.
layer: Output layer name (for GeoPackage outputs).
Returns:
None
"""
gdf = _read(in_path)
gdf.geometry = make_valid(gdf.geometry)
_write(gdf, out_path, layer=layer)
def reproject(in_path: str, out_path: str, target_crs, layer: str) -> None:
"""Reproject a vector dataset to a target CRS and write the result.
Args:
in_path: Input vector dataset path.
out_path: Output vector dataset path.
target_crs: CRS passed to GeoPandas `to_crs` (e.g. EPSG int/string or
CRS object).
layer: Output layer name (for GeoPackage outputs).
Returns:
None
"""
gdf = _read(in_path)
gdf = gdf.to_crs(target_crs)
_write(gdf, out_path, layer=layer)
def simplify(
in_path: str, out_path: str, tolerance: float, layer: str | None = None
) -> None:
"""Simplify geometries and write the result.
Uses Shapely geometry simplification with `preserve_topology=True`.
Args:
in_path: Input vector dataset path.
out_path: Output vector dataset path.
tolerance: Simplification tolerance in the dataset's coordinate units.
layer: Optional output layer name (for GeoPackage outputs).
Returns:
None
"""
gdf = _read(in_path)
gdf.geometry = gdf.geometry.simplify(tolerance, preserve_topology=True)
_write(gdf, out_path, layer=layer)
def drop_zero_area(in_path: str, out_path: str, layer: str | None = None) -> None:
"""Drop null and zero-area geometries and write the result.
Note that `.area` is computed in the layer CRS; for meaningful area filtering
this should be a projected CRS.
Args:
in_path: Input vector dataset path.
out_path: Output vector dataset path.
layer: Optional output layer name (for GeoPackage outputs).
Returns:
None
"""
gdf = _read(in_path)
gdf = gdf[gdf.geometry.notna() & (gdf.geometry.area > 0)].copy()
_write(gdf, out_path, layer=layer)
def cut_public_lands_by_counties(
public_lands_path: str,
counties_path: str,
out_path: str,
out_layer: str = "public_lands_cut",
county_keep_cols: list[str] | None = None,
) -> None:
"""Cut public lands polygons by county boundaries.
The cut operation is implemented as an overlay intersection between public
lands and county polygons. This yields:
- One output feature for each public-lands portion within a county.
- Public-lands features fully contained by a county remain geometrically
unchanged (aside from possible vertex ordering).
- Public-lands features that cross county boundaries are split into
multiple output features, one per intersected county.
Prior to overlay, a spatial join is used to restrict processing to
intersecting features only.
Args:
public_lands_path: Path to the prepared public lands dataset.
counties_path: Path to the prepared counties dataset (must be in the same
CRS as `public_lands_path`).
out_path: Destination path for the cut output dataset.
out_layer: Layer name for GeoPackage outputs.
county_keep_cols: Optional list of county columns to retain in the output
(in addition to geometry). Columns not present in `counties` are
ignored. If omitted, only county geometry is used.
Returns:
None
"""
public_lands = _read(public_lands_path).reset_index(drop=True)
counties = _read(counties_path).reset_index(drop=True)
public_lands["_pl_id"] = public_lands.index
keep_cols = county_keep_cols or ["geometry"]
keep_cols = [c for c in keep_cols if c in counties.columns]
if "geometry" not in keep_cols:
keep_cols = ["geometry"] + keep_cols
hits = gpd.sjoin(
public_lands[["geometry", "_pl_id"]],
counties[["geometry"]],
how="inner",
predicate="intersects",
)
pl_idx = hits.index.unique()
cty_idx = hits["index_right"].unique()
public_lands_sub = public_lands.loc[pl_idx].copy()
counties_sub = counties.loc[cty_idx, keep_cols].copy()
cut = gpd.overlay(
public_lands_sub, counties_sub, how="intersection", keep_geom_type=False
)
cut = cut[cut.geometry.notna() & (cut.geometry.area > 0)].copy()
for col in cut.columns:
if col != "geometry" and pd.api.types.is_object_dtype(cut[col]):
cut[col] = cut[col].astype(str)
_write(cut, out_path, layer=out_layer)
def main():
"""Run the end-to-end cutting workflow using a TaskGraph pipeline.
This function:
- Creates output and intermediate directories.
- Creates a TaskGraph to manage intermediate steps and dependencies.
- Fixes geometries for both layers.
- Reprojects counties to the public lands CRS.
- Simplifies public lands geometries.
- Drops null/zero-area geometries for both layers.
- Cuts public lands by counties via overlay intersection and writes the
final GeoPackage.
Returns:
None
"""
out_dir = Path(OUT_DIR)
out_dir.mkdir(parents=True, exist_ok=True)
intermediate_dir = Path(INTERMEDIATE_DIR)
intermediate_dir.mkdir(parents=True, exist_ok=True)
task_graph = taskgraph.TaskGraph(str(intermediate_dir), 2, 15.0)
pl_fixed = str(intermediate_dir / "public_lands_fixed.gpkg")
cty_fixed = str(intermediate_dir / "counties_fixed.gpkg")
fix_public_lands_task = task_graph.add_task(
func=fix_geometries,
args=(PUBLIC_LANDS_VECTOR_PATH, pl_fixed, "public_lands_fixed"),
target_path_list=[pl_fixed],
task_name="fix public lands",
)
fix_counties_task = task_graph.add_task(
func=fix_geometries(COUNTIES_VECTOR_PATH, cty_fixed, "counties_fixed"),
target_path_list=[cty_fixed],
task_name="fix counties",
)
fix_public_lands_task.join()
work_crs = _read(pl_fixed).crs
cty_reproj = str(intermediate_dir / "counties_reprojected_to_public_lands.gpkg")
reproject_counties_task = task_graph.add_task(
func=reproject,
args=(cty_fixed, cty_reproj, work_crs, "counties_reprojected"),
target_path_list=[cty_reproj],
dependent_task_list=[fix_counties_task],
task_name="reproject fixed counties",
)
pl_simpl = str(intermediate_dir / "public_lands_simplified.gpkg")
public_lands_simplified_task = task_graph.add_task(
func=simplify,
args=(
pl_fixed,
pl_simpl,
SIMPLIFY_TOLERANCE_IN_TARGET_UNITS,
"public_lands_simplified",
),
target_path_list=[pl_simpl],
dependent_task_list=[fix_public_lands_task],
task_name="simplify public lands",
)
pl_clean = str(intermediate_dir / "public_lands_nonzero_area.gpkg")
cty_clean = str(intermediate_dir / "counties_nonzero_area.gpkg")
clean_public_lands_task = task_graph.add_task(
func=drop_zero_area,
args=(pl_simpl, pl_clean, "public_lands_nonzero_area"),
target_path_list=[pl_clean],
dependent_task_list=[public_lands_simplified_task],
task_name="drop zero area public_lands_nonzero_area",
)
clean_counties_task = task_graph.add_task(
func=drop_zero_area,
args=(cty_reproj, cty_clean, "counties_nonzero_area"),
target_path_list=[cty_clean],
dependent_task_list=[reproject_counties_task],
task_name="drop zero area counties_nonzero_area",
)
task_graph.add_task(
func=cut_public_lands_by_counties,
args=(
cty_clean,
pl_clean,
CUT_PUBLIC_LANDS_VECTOR_PATH,
"public_lands_cut",
["geometry"],
),
target_path_list=[CUT_PUBLIC_LANDS_VECTOR_PATH],
dependent_task_list=[clean_public_lands_task, clean_counties_task],
task_name="cut public lands by county",
)
task_graph.join()
task_graph.close()
if __name__ == "__main__":
main()