diff --git a/modflow_devtools/models.py b/modflow_devtools/models.py index 7388f547..32e17f2a 100644 --- a/modflow_devtools/models.py +++ b/modflow_devtools/models.py @@ -5,7 +5,7 @@ import hashlib import importlib.resources as pkg_resources from abc import ABC, abstractmethod -from collections.abc import Callable +from collections.abc import Callable, Iterable from functools import partial from os import PathLike from pathlib import Path @@ -45,12 +45,6 @@ def _sha256(path: Path) -> str: class ModelRegistry(ABC): - @property - @abstractmethod - def path(self) -> Path: - """The path to the registry's data.""" - ... - @property @abstractmethod def files(self) -> dict: @@ -65,7 +59,7 @@ def files(self) -> dict: @abstractmethod def models(self) -> dict: """ - A map of model name to the model's input file set. + A map of model name to the model's input files. """ ... @@ -102,7 +96,11 @@ class LocalRegistry(ModelRegistry): exclude: ClassVar = [".DS_Store", "compare"] - def __init__(self, path: str | PathLike, namefile_pattern: str = "mfsim.nam"): + def __init__( + self, + path: str | PathLike | Iterable[str | PathLike], + namefile_pattern: str = "mfsim.nam", + ): """ Create a registry from models under the given directory path. @@ -112,12 +110,23 @@ def __init__(self, path: str | PathLike, namefile_pattern: str = "mfsim.nam"): are identified by the presence of a namefile matching `namefile_pattern`. """ - self._path = Path(path).expanduser().resolve().absolute() + # check if path is iterable of str\pathlike + if isinstance(path, Iterable) and not isinstance(path, str): + path = [Path(p).expanduser().resolve().absolute() for p in path] # type: ignore + missing = [p for p in path if not p.is_dir()] # type: ignore + if any(missing): + raise NotADirectoryError( + f"Directory paths not found: {', '.join(missing)}" # type: ignore + ) + self._path = path + else: + path = Path(path).expanduser().resolve().absolute() + if not path.is_dir(): + raise NotADirectoryError(f"Directory path not found: {path}") + self._path = [path] self._files: dict[str, dict[str, str | None]] = {} self._models: dict[str, list[str]] = {} self._examples: dict[str, list[str]] = {} - if not self._path.is_dir(): - raise NotADirectoryError(f"Path {self._path} is not a directory.") self.namefile_pattern = namefile_pattern self.index() @@ -131,25 +140,28 @@ def index(self): self._files = {} self._models = {} self._examples = {} - model_paths = get_model_paths(self._path, namefile=self.namefile_pattern) - for model_path in model_paths: - model_path = model_path.expanduser().resolve().absolute() - rel_path = model_path.relative_to(self._path) - model_name = "/".join(rel_path.parts) - self._models[model_name] = [] - if len(rel_path.parts) > 1: - name = rel_path.parts[0] - if name not in self._examples: - self._examples[name] = [] - self._examples[name].append(model_name) - for p in model_path.rglob("*"): - if not p.is_file() or any(e in p.name for e in LocalRegistry.exclude): - continue - relpath = p.expanduser().absolute().relative_to(self._path) - name = "/".join(relpath.parts) - hash = _sha256(p) - self._files[name] = {"hash": hash} - self._models[model_name].append(name) + for path in self._path: + model_paths = get_model_paths(path, namefile=self.namefile_pattern) + for model_path in model_paths: + model_path = model_path.expanduser().resolve().absolute() + rel_path = model_path.relative_to(path) + model_name = "/".join(rel_path.parts) + self._models[model_name] = [] + if len(rel_path.parts) > 1: + name = rel_path.parts[0] + if name not in self._examples: + self._examples[name] = [] + self._examples[name].append(model_name) + for p in model_path.rglob("*"): + if not p.is_file() or any( + e in p.name for e in LocalRegistry.exclude + ): + continue + relpath = p.expanduser().absolute().relative_to(path) + name = "/".join(relpath.parts) + hash = _sha256(p) + self._files[name] = {"hash": hash, "path": p, "relpath": relpath} + self._models[model_name].append(p) def copy_to( self, workspace: str | PathLike, model_name: str, verbose: bool = False @@ -159,7 +171,7 @@ def copy_to( The workspace will be created if it does not exist. """ - if not any(files := self.models.get(model_name, [])): + if not any(file_paths := self.models.get(model_name, [])): return None # create the workspace if needed workspace = Path(workspace).expanduser().absolute() @@ -169,18 +181,18 @@ def copy_to( # copy the files. some might be in nested folders, # but the first is guaranteed not to be, so use it # to determine relative path in the new workspace. - base = Path(files[0]).parent - for file in [Path(self._path) / f for f in files]: + base = Path(file_paths[0]).parent + for file_path in file_paths: if verbose: - print(f"Copying {file} to workspace") - path = workspace / file.relative_to(base) - path.parent.mkdir(parents=True, exist_ok=True) - copy(file, workspace / file.relative_to(base)) + print(f"Copying {file_path} to workspace") + dest = workspace / file_path.relative_to(base) + dest.parent.mkdir(parents=True, exist_ok=True) + copy(file_path, dest) return workspace @property - def path(self) -> Path: - return self._path + def path(self) -> list[Path]: + return self._path # type: ignore @property def files(self) -> dict: