diff --git a/docs/api/changelog.rst b/docs/api/changelog.rst index 531a88116..1f073881c 100644 --- a/docs/api/changelog.rst +++ b/docs/api/changelog.rst @@ -14,6 +14,8 @@ Added - :class:`imod.msw.MeteoGridCopy` to copy existing `mete_grid.inp` files, so ASCII grids in large existing meteo databases do not have to be read. +- :class:`imod.msw.CopyFiles` to copy settings and lookup tables in existing + ``.inp`` files. - :meth:`imod.mf6.LayeredWell.from_imod5_cap_data` to construct a :class:`imod.mf6.LayeredWell` package from iMOD5 data in the CAP package (for MetaSWAP). Currently only griddata (IDF) is supported. diff --git a/imod/msw/__init__.py b/imod/msw/__init__.py index 83f8df90b..d8d053c49 100644 --- a/imod/msw/__init__.py +++ b/imod/msw/__init__.py @@ -1,3 +1,4 @@ +from imod.msw.copy_files import FileCopier from imod.msw.coupler_mapping import CouplerMapping from imod.msw.grid_data import GridData from imod.msw.idf_mapping import IdfMapping diff --git a/imod/msw/copy_files.py b/imod/msw/copy_files.py new file mode 100644 index 000000000..c5615b5fb --- /dev/null +++ b/imod/msw/copy_files.py @@ -0,0 +1,55 @@ +from pathlib import Path +from shutil import copy2 +from typing import cast + +import numpy as np +import xarray as xr + +from imod.logging import logger +from imod.logging.loglevel import LogLevel +from imod.msw.pkgbase import MetaSwapPackage +from imod.typing import Imod5DataDict + +_LOG_MESSAGE_TEMPLATE = """\ +Will not copy files {filtered}, these will be generated by iMOD Python +instead.""" + + +class FileCopier(MetaSwapPackage): + def __init__(self, paths: list[str]): + super().__init__() + paths_da = xr.DataArray( + paths, coords={"file_nr": np.arange(len(paths))}, dims=("file_nr",) + ) + self.dataset["paths"] = paths_da + + @classmethod + def from_imod5_data(cls, imod5_data: Imod5DataDict): + paths = cast(list[list[str]], imod5_data["extra"]["paths"]) + paths_unpacked = {Path(p[0]) for p in paths} + files_to_filter = ( + "mete_grid.inp", + "para_sim.inp", + "svat2precgrid.inp", + "svat2etrefgrid.inp", + ) + paths_included = [ + str(p) for p in paths_unpacked if p.name.lower() not in files_to_filter + ] + paths_excluded = {str(p) for p in paths_unpacked} - set(paths_included) + if paths_excluded: + log_message = _LOG_MESSAGE_TEMPLATE.format(filtered=paths_excluded) + logger.log( + loglevel=LogLevel.INFO, + message=log_message, + ) + return cls(paths_included) + + def write(self, directory: str | Path, *_): + directory = Path(directory) + + src_paths = [Path(p) for p in self.dataset["paths"].to_numpy()] + dst_paths = [directory / p.name for p in src_paths] + + for src_path, dst_path in zip(src_paths, dst_paths): + copy2(src_path, dst_path) diff --git a/imod/tests/test_msw/test_copy_files.py b/imod/tests/test_msw/test_copy_files.py new file mode 100644 index 000000000..80d8ceb08 --- /dev/null +++ b/imod/tests/test_msw/test_copy_files.py @@ -0,0 +1,69 @@ +from pytest_cases import parametrize_with_cases + +from imod.msw.copy_files import FileCopier + + +def write_test_files(directory, filenames): + paths = [directory / filename for filename in filenames] + for p in paths: + with open(p, mode="w") as f: + f.write("test") + return paths + + +def case_simple_files(tmp_path_factory): + directory = tmp_path_factory.mktemp("simple_files") + filenames = [ + "a.inp", + "b.inp", + "c.inp", + ] + return write_test_files(directory, filenames) + + +def case_imod5_extra_files(tmp_path_factory): + directory = tmp_path_factory.mktemp("imod5_extra_files") + filenames = [ + "a.inp", + "b.inp", + "c.inp", + "mete_grid.inp", + "para_sim.inp", + "svat2precgrid.inp", + "svat2etrefgrid.inp", + ] + return write_test_files(directory, filenames) + + +@parametrize_with_cases("src_files", cases=".") +def test_copyfile_init(src_files): + # Act + copyfiles = FileCopier(src_files) + # Arrange + assert "paths" in copyfiles.dataset.keys() + assert len(copyfiles.dataset["paths"]) == len(src_files) + + +@parametrize_with_cases("src_files", cases=".") +def test_copyfile_write(src_files, tmp_path): + # Arrange + expected_filenames = {f.name for f in src_files} + # Act + copyfiles = FileCopier(src_files) + copyfiles.write(tmp_path) + # Assert + actual_filepaths = tmp_path.glob("*.inp") + actual_filenames = {f.name for f in actual_filepaths} + diff = expected_filenames ^ actual_filenames + assert len(diff) == 0 + + +@parametrize_with_cases("src_files", cases=".") +def test_from_imod5_data(src_files): + # Arrange + imod5_ls = [[p] for p in src_files] + imod5_data = {"extra": {"paths": imod5_ls}} + # Act + copyfiles = FileCopier.from_imod5_data(imod5_data) + # Assert + len(copyfiles.dataset["paths"]) == 3