From 37b3b760c0b9ec9b71ca1e462eabb895a0c68a55 Mon Sep 17 00:00:00 2001 From: Jinzhe Zeng Date: Sun, 21 Jun 2026 00:01:08 +0800 Subject: [PATCH] feat(data): add dpdata format conversion --- deepmd/pd/entrypoints/main.py | 33 +- deepmd/pt/entrypoints/main.py | 86 +-- deepmd/pt_expt/entrypoints/main.py | 38 +- deepmd/utils/argcheck.py | 54 ++ deepmd/utils/data_system.py | 529 +++++++++++++++++- pyproject.toml | 2 +- .../common/test_data_system_conversion.py | 205 +++++++ 7 files changed, 895 insertions(+), 52 deletions(-) create mode 100644 source/tests/common/test_data_system_conversion.py diff --git a/deepmd/pd/entrypoints/main.py b/deepmd/pd/entrypoints/main.py index f397bc358b..72c9ef7677 100644 --- a/deepmd/pd/entrypoints/main.py +++ b/deepmd/pd/entrypoints/main.py @@ -23,6 +23,9 @@ expand_sys_str, j_loader, ) +from deepmd.dpmodel.utils.lmdb_data import ( + is_lmdb, +) from deepmd.loggers.loggers import ( set_log_handles, ) @@ -109,10 +112,36 @@ def prepare_trainer_input_single( ) training_systems = training_dataset_params["systems"] trn_patterns = training_dataset_params.get("rglob_patterns", None) - training_systems = process_systems(training_systems, patterns=trn_patterns) + training_systems = process_systems( + training_systems, + patterns=trn_patterns, + fmt=training_dataset_params.get("format", None), + out_fmt=training_dataset_params.get( + "out_format", training_dataset_params.get("output_format", None) + ), + ) + if len(training_systems) == 1 and is_lmdb(training_systems[0]): + raise NotImplementedError( + "Paddle backend does not support LMDB training data yet. " + "Set training_data.out_format to 'deepmd/hdf5' when using " + "training_data.format for automatic conversion." 
+ ) if validation_systems is not None: val_patterns = validation_dataset_params.get("rglob_patterns", None) - validation_systems = process_systems(validation_systems, val_patterns) + validation_systems = process_systems( + validation_systems, + val_patterns, + fmt=validation_dataset_params.get("format", None), + out_fmt=validation_dataset_params.get( + "out_format", validation_dataset_params.get("output_format", None) + ), + ) + if len(validation_systems) == 1 and is_lmdb(validation_systems[0]): + raise NotImplementedError( + "Paddle backend does not support LMDB validation data yet. " + "Set validation_data.out_format to 'deepmd/hdf5' when using " + "validation_data.format for automatic conversion." + ) # stat files stat_file_path_single = data_dict_single.get("stat_file", None) diff --git a/deepmd/pt/entrypoints/main.py b/deepmd/pt/entrypoints/main.py index 560ea5a1ba..8bd147697d 100644 --- a/deepmd/pt/entrypoints/main.py +++ b/deepmd/pt/entrypoints/main.py @@ -183,10 +183,24 @@ def prepare_trainer_input_single( def _make_dp_loader_set( systems: str | list[str], dataset_params: dict[str, Any], - ) -> DpLoaderSet: - """Create a DpLoaderSet from systems with pattern expansion.""" + ) -> DpLoaderSet | LmdbDataset: + """Create a dataset from systems with pattern expansion/conversion.""" patterns = dataset_params.get("rglob_patterns", None) - systems = process_systems(systems, patterns=patterns) + systems = process_systems( + systems, + patterns=patterns, + fmt=dataset_params.get("format", None), + out_fmt=dataset_params.get( + "out_format", dataset_params.get("output_format", None) + ), + ) + if len(systems) == 1 and is_lmdb(systems[0]): + return LmdbDataset( + systems[0], + model_params_single["type_map"], + dataset_params["batch_size"], + auto_prob_style=dataset_params.get("auto_prob", None), + ) return DpLoaderSet( systems, dataset_params["batch_size"], @@ -196,7 +210,11 @@ def _make_dp_loader_set( ) # LMDB path: single string → LmdbDataset - if 
isinstance(training_systems, str) and is_lmdb(training_systems): + if ( + training_dataset_params.get("format", None) is None + and isinstance(training_systems, str) + and is_lmdb(training_systems) + ): auto_prob = training_dataset_params.get("auto_prob", None) train_data_single = LmdbDataset( training_systems, @@ -206,6 +224,7 @@ def _make_dp_loader_set( ) if ( validation_systems is not None + and validation_dataset_params.get("format", None) is None and isinstance(validation_systems, str) and is_lmdb(validation_systems) ): @@ -397,23 +416,40 @@ def train( "Calculate neighbor statistics... (add --skip-neighbor-stat to skip this step)" ) - if not multi_task: - type_map = config["model"].get("type_map") - training_systems = config["training"]["training_data"].get("systems") + def _get_neighbor_stat_data_from_params( + dataset_params: dict[str, Any], + type_map: list[str] | None, + ) -> Any: + training_systems = dataset_params.get("systems") if ( - training_systems is not None + dataset_params.get("format", None) is None + and training_systems is not None and isinstance(training_systems, str) and is_lmdb(training_systems) ): + systems = [training_systems] + else: + systems = process_systems( + training_systems, + patterns=dataset_params.get("rglob_patterns", None), + fmt=dataset_params.get("format", None), + out_fmt=dataset_params.get( + "out_format", dataset_params.get("output_format", None) + ), + ) + if len(systems) == 1 and is_lmdb(systems[0]): from deepmd.dpmodel.utils.lmdb_data import ( make_neighbor_stat_data, ) - train_data = make_neighbor_stat_data(training_systems, type_map) - else: - train_data = get_data( - config["training"]["training_data"], 0, type_map, None - ) + return make_neighbor_stat_data(systems[0], type_map) + return get_data(dataset_params, 0, type_map, None) + + if not multi_task: + type_map = config["model"].get("type_map") + train_data = _get_neighbor_stat_data_from_params( + config["training"]["training_data"], type_map + ) 
config["model"], min_nbor_dist = BaseModel.update_sel( train_data, type_map, config["model"] ) @@ -421,26 +457,10 @@ def train( min_nbor_dist = {} for model_item in config["model"]["model_dict"]: type_map = config["model"]["model_dict"][model_item].get("type_map") - training_systems = config["training"]["data_dict"][model_item][ - "training_data" - ].get("systems") - if ( - training_systems is not None - and isinstance(training_systems, str) - and is_lmdb(training_systems) - ): - from deepmd.dpmodel.utils.lmdb_data import ( - make_neighbor_stat_data, - ) - - train_data = make_neighbor_stat_data(training_systems, type_map) - else: - train_data = get_data( - config["training"]["data_dict"][model_item]["training_data"], - 0, - type_map, - None, - ) + train_data = _get_neighbor_stat_data_from_params( + config["training"]["data_dict"][model_item]["training_data"], + type_map, + ) config["model"]["model_dict"][model_item], min_nbor_dist[model_item] = ( BaseModel.update_sel( train_data, type_map, config["model"]["model_dict"][model_item] diff --git a/deepmd/pt_expt/entrypoints/main.py b/deepmd/pt_expt/entrypoints/main.py index da28229bf4..38f16095f4 100644 --- a/deepmd/pt_expt/entrypoints/main.py +++ b/deepmd/pt_expt/entrypoints/main.py @@ -104,13 +104,31 @@ def _get_neighbor_stat_data( ``make_neighbor_stat_data``; falls back to the legacy ``get_data`` for npy/HDF5 directories. 
""" - lmdb_path = _detect_lmdb_path(dataset_params.get("systems")) + lmdb_path = ( + None + if dataset_params.get("format", None) is not None + else _detect_lmdb_path(dataset_params.get("systems")) + ) if lmdb_path is not None: from deepmd.dpmodel.utils.lmdb_data import ( make_neighbor_stat_data, ) return make_neighbor_stat_data(lmdb_path, type_map) + systems = process_systems( + dataset_params["systems"], + patterns=dataset_params.get("rglob_patterns", None), + fmt=dataset_params.get("format", None), + out_fmt=dataset_params.get( + "out_format", dataset_params.get("output_format", None) + ), + ) + if len(systems) == 1 and is_lmdb(systems[0]): + from deepmd.dpmodel.utils.lmdb_data import ( + make_neighbor_stat_data, + ) + + return make_neighbor_stat_data(systems[0], type_map) return get_data(dataset_params, 0, type_map, None) @@ -126,7 +144,11 @@ def _build_data_system( :class:`DeepmdDataSystem` path with system expansion. """ systems_raw = dataset_params["systems"] - lmdb_path = _detect_lmdb_path(systems_raw) + lmdb_path = ( + None + if dataset_params.get("format", None) is not None + else _detect_lmdb_path(systems_raw) + ) if lmdb_path is not None: return LmdbDataSystem( lmdb_path=lmdb_path, @@ -138,7 +160,19 @@ def _build_data_system( systems = process_systems( systems_raw, patterns=dataset_params.get("rglob_patterns", None), + fmt=dataset_params.get("format", None), + out_fmt=dataset_params.get( + "out_format", dataset_params.get("output_format", None) + ), ) + if len(systems) == 1 and is_lmdb(systems[0]): + return LmdbDataSystem( + lmdb_path=systems[0], + type_map=type_map, + batch_size=dataset_params["batch_size"], + auto_prob_style=dataset_params.get("auto_prob"), + seed=seed, + ) return DeepmdDataSystem( systems=systems, batch_size=dataset_params["batch_size"], diff --git a/deepmd/utils/argcheck.py b/deepmd/utils/argcheck.py index 48c711a10b..6bfab83e62 100644 --- a/deepmd/utils/argcheck.py +++ b/deepmd/utils/argcheck.py @@ -4745,6 +4745,19 @@ def 
training_data_args() -> list[ doc_patterns = ( "The customized patterns used in `rglob` to collect all training systems. " ) + doc_format = ( + "The input data format passed to dpdata for automatic conversion. " + "If this key is not set, `systems` must already point to DeePMD data. " + "If this key is set to a non-DeePMD format, each selected input path is " + "loaded by dpdata and converted before training. Use dpdata format names " + "such as `extxyz`, `ase/structure`, `ase/traj`, or `auto`." + ) + doc_out_format = ( + "The output data format passed to dpdata for automatic conversion. " + "When `format` requests conversion from a non-DeePMD format, this key " + "defaults to `lmdb`. Use a DeePMD format supported by dpdata, such as " + "`lmdb`, `deepmd/hdf5`, or `deepmd/npy`." + ) doc_batch_size = f'This key can be \n\n\ - list: the length of which is the same as the {link_sys}. The batch size of each system is given by the elements of the list.\n\n\ - int: all {link_sys} use the same batch size.\n\n\ @@ -4783,6 +4796,20 @@ def training_data_args() -> list[ default=None, doc=doc_patterns + doc_only_pt_supported, ), + Argument( + "format", + [str, None], + optional=True, + doc=doc_format, + ), + Argument( + "out_format", + [str, None], + optional=True, + default="lmdb", + doc=doc_out_format, + alias=["output_format"], + ), Argument( "batch_size", [list[int], int, str], @@ -4842,6 +4869,19 @@ def validation_data_args() -> list[ doc_patterns = ( "The customized patterns used in `rglob` to collect all validation systems. " ) + doc_format = ( + "The input data format passed to dpdata for automatic conversion. " + "If this key is not set, `systems` must already point to DeePMD data. " + "If this key is set to a non-DeePMD format, each selected input path is " + "loaded by dpdata and converted before validation. Use dpdata format names " + "such as `extxyz`, `ase/structure`, `ase/traj`, or `auto`." 
# Directory (relative to the current working directory) that holds datasets
# converted by dpdata.
_DPDATA_CACHE_DIR = ".deepmd_dpdata_cache"
# Output format used when a conversion is requested without an explicit one.
_DPDATA_DEFAULT_OUT_FORMAT = "lmdb"
# In-process memo of finished conversions, keyed by
# (cwd, resolved source path, input format, output format).
_DPDATA_CONVERSION_CACHE: dict[tuple[str, str, str, str], list[str]] = {}


class LmdbDataSystem:
    """A DeepmdDataSystem-compatible adapter for LMDB datasets.

    The adapter returns raw DeePMD-style numpy batches (``type``,
    ``natoms_vec``, ``default_mesh``) so it can be consumed by the legacy
    TensorFlow/JAX training paths. Consumers that need the dpmodel canonical
    format can still call ``normalize_batch`` on its output.

    The whole LMDB is exposed as a single system (``nsystems == 1``); frames
    are grouped by atom count and each group is published as one entry of
    ``dirs`` so that set-based consumers can load it through ``_load_set``.
    """

    def __init__(
        self,
        lmdb_path: str,
        type_map: list[str],
        batch_size: int | str = "auto",
        auto_prob_style: str | None = None,
        seed: int | None = None,
    ) -> None:
        """Open the LMDB and prepare the batch sampler.

        Parameters
        ----------
        lmdb_path : str
            Path of the LMDB dataset.
        type_map : list of str
            Element names; must be non-empty.
        batch_size : int or str
            Forwarded to ``LmdbDataReader``.
        auto_prob_style : str, optional
            Forwarded to ``compute_block_targets`` when the reader exposes
            per-frame system ids.
        seed : int, optional
            Forwarded to ``SameNlocBatchSampler``.

        Raises
        ------
        ValueError
            If ``type_map`` is empty or None.
        """
        if not type_map:
            raise ValueError(
                "LMDB datasets require a non-empty model/type_map because "
                "LMDB stores atom type indices and the training data adapter "
                "must map them to element names."
            )

        # Imported lazily, as in the rest of this module's LMDB handling.
        from deepmd.dpmodel.utils.lmdb_data import (
            LmdbDataReader,
            SameNlocBatchSampler,
            compute_block_targets,
        )

        self.lmdb_path = lmdb_path
        self._reader = LmdbDataReader(
            lmdb_path, type_map, batch_size, mixed_batch=False
        )
        self._type_map = list(type_map)
        self.mixed_type = self._detect_mixed_type()
        # Attributes mirroring the DeepmdDataSystem surface.
        self.nsystems = 1
        self.system_dirs = [lmdb_path]
        self.natoms = [max(self._reader.frame_nlocs) if self._reader.frame_nlocs else 0]
        self.batch_size = [self._reader.batch_size]
        self.nbatches = [self._reader.total_batch]
        self.sys_probs = [1.0]
        self.data_systems = [self]
        # One pseudo "set" per distinct atom count, keyed by a synthetic name.
        self._nloc_set_indices = {
            f"{self.lmdb_path}#nloc={nloc}": indices
            for nloc, indices in sorted(self._reader.nloc_groups.items())
        }
        self.dirs = list(self._nloc_set_indices)
        self.pbc = self._detect_pbc()
        self._data_dict = {
            "box": {
                "ndof": 9,
                "atomic": False,
                "must": False,
                "high_prec": False,
                "type_sel": None,
                "repeat": 1,
                "default": 0.0,
                "dtype": None,
                "output_natoms_for_type_sel": False,
            },
            "coord": {
                "ndof": 3,
                "atomic": True,
                "must": True,
                "high_prec": False,
                "type_sel": None,
                "repeat": 1,
                "default": 0.0,
                "dtype": None,
                "output_natoms_for_type_sel": False,
            },
            "numb_copy": {
                "ndof": 1,
                "atomic": False,
                "must": False,
                "high_prec": False,
                "type_sel": None,
                "repeat": 1,
                "default": 1,
                "dtype": int,
                "output_natoms_for_type_sel": False,
            },
        }

        block_targets = None
        if auto_prob_style is not None and self._reader.frame_system_ids is not None:
            block_targets = compute_block_targets(
                auto_prob_style,
                self._reader.nsystems,
                self._reader.system_nframes,
            )
        self._sampler = SameNlocBatchSampler(
            self._reader,
            shuffle=True,
            seed=seed,
            block_targets=block_targets,
        )
        self._iter = iter(self._sampler)

    def _detect_mixed_type(self) -> bool:
        """Return True when frames cannot be represented as fixed-type data."""
        if len(self._reader.nloc_groups) > 1:
            return True
        nframes = len(self._reader)
        if nframes == 0:
            return False
        ref_type = self._reader[0]["atype"]
        # Any frame whose type array differs from the first one makes the
        # dataset mixed-type.
        return any(
            not np.array_equal(self._reader[idx]["atype"], ref_type)
            for idx in range(1, nframes)
        )

    def _detect_pbc(self) -> bool:
        """Return True when the first LMDB frame has a non-zero simulation box."""
        if len(self._reader) == 0:
            return False
        box = self._reader[0].get("box")
        return box is not None and not np.allclose(box, 0.0)

    def add_data_requirements(
        self, data_requirements: list[DataRequirementItem]
    ) -> None:
        """Add label/auxiliary data requirements."""
        for item in data_requirements:
            self._data_dict[item.key] = item.dict
        self._reader.add_data_requirement(data_requirements)

    def add_data_requirement(self, data_requirement: list[DataRequirementItem]) -> None:
        """Alias used by DataLoader-style backends."""
        self.add_data_requirements(data_requirement)

    def add(
        self,
        key: str,
        ndof: int,
        atomic: bool = False,
        must: bool = False,
        high_prec: bool = False,
        type_sel: list[int] | None = None,
        repeat: int = 1,
        default: float = 0.0,
        dtype: np.dtype | None = None,
        output_natoms_for_type_sel: bool = False,
    ) -> None:
        """Register one data requirement described by its individual fields."""
        item = DataRequirementItem(
            key,
            ndof,
            atomic=atomic,
            must=must,
            high_prec=high_prec,
            type_sel=type_sel,
            repeat=repeat,
            default=default,
            dtype=dtype,
            output_natoms_for_type_sel=output_natoms_for_type_sel,
        )
        self.add_data_requirements([item])

    def get_data_dict(self, ii: int = 0) -> dict[str, dict[str, Any]]:
        """Return the requirement table; ``ii`` is accepted but unused."""
        del ii
        return self._data_dict

    def _load_set(self, set_name: str) -> dict[str, Any]:
        """Load one same-nloc LMDB group for legacy neighbor-stat code."""
        indices = self._nloc_set_indices[str(set_name)]
        frames = [self._reader[int(idx)] for idx in indices]
        return self._stack_frames(frames)

    def _next_indices(self) -> list[int]:
        """Return the next batch of frame indices, restarting when exhausted.

        Raises
        ------
        RuntimeError
            If a freshly created sampler iterator yields nothing. A bare
            ``StopIteration`` must not escape here: it would silently end any
            generator or ``for`` loop that happens to call ``get_batch``.
        """
        try:
            return next(self._iter)
        except StopIteration:
            # The current pass is exhausted: start a new one.
            self._iter = iter(self._sampler)
        try:
            return next(self._iter)
        except StopIteration:
            raise RuntimeError(
                f"LMDB dataset {self.lmdb_path} yields no batches; "
                "check that it contains at least one frame."
            ) from None

    def _stack_frames(self, frames: list[dict[str, Any]]) -> dict[str, Any]:
        """Stack per-frame dicts into one DeePMD-style batch dict.

        Raises
        ------
        ValueError
            If ``frames`` is empty (there is no first frame to take keys from).
        """
        if not frames:
            raise ValueError("Cannot build a batch from an empty list of LMDB frames.")

        first = frames[0]
        out: dict[str, Any] = {}
        structural_keys = {"coord", "box"}
        for key in first:
            # These are rebuilt below under their legacy names.
            if key in {"atype", "fid", "natoms", "real_natoms_vec"}:
                continue
            is_flag = key.startswith("find_")
            if is_flag and key[5:] not in self._data_dict:
                continue
            if (
                not is_flag
                and key not in structural_keys
                and key not in self._data_dict
            ):
                continue
            if is_flag:
                # Flags are taken from the first frame only.
                out[key] = np.asarray(first[key], dtype=np.float32)
            elif first[key] is None:
                out[key] = None
            else:
                arr = np.stack([frame[key] for frame in frames])
                data_info = self._data_dict.get(key)
                # Flatten per-atom data to (nframes, -1).
                if data_info is not None and data_info["atomic"] and arr.ndim >= 3:
                    arr = arr.reshape(arr.shape[0], -1)
                out[key] = arr

        atype = np.stack([frame["atype"] for frame in frames]).astype(np.int32)
        real_natoms_vec = np.stack([frame["natoms"] for frame in frames]).astype(
            np.int32
        )
        nloc = int(real_natoms_vec[:, 0].max())
        natoms_vec = np.concatenate(
            (
                np.array([nloc, nloc], dtype=np.int32),
                real_natoms_vec[:, 2:].max(axis=0).astype(np.int32),
            )
        )

        out["type"] = atype
        out["natoms_vec"] = natoms_vec
        out["real_natoms_vec"] = real_natoms_vec
        if "box" not in out or out["box"] is None:
            # No box stored: supply zeros and flag the box as absent.
            out["box"] = np.zeros((len(frames), 9), dtype=GLOBAL_NP_FLOAT_PRECISION)
            out["find_box"] = np.float32(0.0)
        elif "find_box" not in out:
            out["find_box"] = np.float32(0.0 if np.allclose(out["box"], 0.0) else 1.0)
        if "find_coord" not in out:
            out["find_coord"] = np.float32(1.0)
        if "numb_copy" not in out:
            out["numb_copy"] = np.ones((len(frames), 1), dtype=np.int64)
            out["find_numb_copy"] = np.float32(0.0)
        out["default_mesh"] = np.asarray(
            make_default_mesh(bool(float(out["find_box"]) > 0.5), self.mixed_type),
            dtype=np.int32,
        )
        return out

    def get_batch(self, sys_idx: int | None = None) -> dict[str, Any]:
        """Return the next batch; ``sys_idx`` is accepted but unused."""
        del sys_idx
        indices = self._next_indices()
        frames = [self._reader[int(idx)] for idx in indices]
        return self._stack_frames(frames)

    def get_nsystems(self) -> int:
        """Return the number of systems (always the stored ``nsystems``)."""
        return self.nsystems

    def get_natoms(self) -> int:
        """Return the largest per-frame atom count recorded at construction."""
        return self.natoms[0]

    def get_ntypes(self) -> int:
        """Return the number of entries in the type map."""
        return len(self._type_map)

    def get_type_map(self) -> list[str]:
        """Return the type map supplied at construction."""
        return self._type_map

    def get_batch_size(self) -> list[int]:
        """Return the one-element batch-size list."""
        return self.batch_size

    def print_summary(self, name: str, prob: Any | None = None) -> None:
        """Delegate to the reader's summary; ``prob`` is accepted but unused."""
        del prob
        self._reader.print_summary(name, self.sys_probs)
"find_coord" not in out: + out["find_coord"] = np.float32(1.0) + if "numb_copy" not in out: + out["numb_copy"] = np.ones((len(frames), 1), dtype=np.int64) + out["find_numb_copy"] = np.float32(0.0) + out["default_mesh"] = np.asarray( + make_default_mesh(bool(float(out["find_box"]) > 0.5), self.mixed_type), + dtype=np.int32, + ) + return out + + def get_batch(self, sys_idx: int | None = None) -> dict[str, Any]: + del sys_idx + indices = self._next_indices() + frames = [self._reader[int(idx)] for idx in indices] + return self._stack_frames(frames) + + def get_nsystems(self) -> int: + return self.nsystems + + def get_natoms(self) -> int: + return self.natoms[0] + + def get_ntypes(self) -> int: + return len(self._type_map) + + def get_type_map(self) -> list[str]: + return self._type_map + + def get_batch_size(self) -> list[int]: + return self.batch_size + + def print_summary(self, name: str, prob: Any | None = None) -> None: + del prob + self._reader.print_summary(name, self.sys_probs) + + def _format_name_length(name: str, width: int) -> str: if len(name) <= width: return "{: >{}}".format(name, width) @@ -815,14 +1083,212 @@ def prob_sys_size_ext(keywords: str, nsystems: int, nbatch: int) -> list[float]: return sys_probs +def _is_deepmd_data_format(fmt: str) -> bool: + return fmt in { + "deepmd", + "deepmd/raw", + "deepmd/npy", + "deepmd/comp", + "deepmd/npy/mixed", + "deepmd/hdf5", + "lmdb", + } + + +def _looks_like_extxyz(path: Path) -> bool: + if not path.is_file(): + return False + try: + with path.open() as fp: + fp.readline() + comment = fp.readline() + except OSError: + return False + return "Properties=" in comment or "Lattice=" in comment + + +def _normalize_dpdata_format(fmt: str, source: Path) -> str: + fmt = fmt.lower() + if fmt == "ase": + return "ase/structure" + if fmt != "auto": + return fmt + suffix = source.suffix.lower().lstrip(".") + if suffix == "traj": + return "ase/traj" + if suffix == "extxyz" or (suffix == "xyz" and _looks_like_extxyz(source)): 
+ return "extxyz" + return suffix or fmt + + +def _iter_conversion_inputs(path: str, patterns: list[str] | None) -> list[str]: + if patterns is None: + return [path] + root = Path(path) + if not root.is_dir(): + return [path] + matches = [] + for pattern in patterns: + matches.extend(str(match) for match in root.rglob(pattern)) + return sorted(set(matches)) + + +def _conversion_cache_path(source: Path, fmt: str, out_fmt: str) -> Path: + source_resolved = source.resolve(strict=False) + digest = hashlib.sha1(f"{source_resolved}|{fmt}|{out_fmt}".encode()).hexdigest()[ + :16 + ] + stem = source_resolved.stem or source_resolved.name or "dataset" + safe_out_fmt = out_fmt.replace("/", "-") + suffix = ".lmdb" if out_fmt == "lmdb" else "" + return Path.cwd() / _DPDATA_CACHE_DIR / f"{stem}-{safe_out_fmt}-{digest}{suffix}" + + +def _source_mtime(source: Path, cache_file: Path) -> float: + if source.is_file(): + return source.stat().st_mtime + if not source.is_dir(): + return 0.0 + cache_dir = cache_file.parent.resolve(strict=False) + latest = source.stat().st_mtime + for item in source.rglob("*"): + try: + item_resolved = item.resolve(strict=False) + if item_resolved == cache_file or cache_dir in item_resolved.parents: + continue + latest = max(latest, item.stat().st_mtime) + except OSError: + continue + return latest + + +def _is_conversion_current(source: Path, output: Path) -> bool: + if not output.exists(): + return False + return output.stat().st_mtime >= _source_mtime(source, output) + + +def _wait_for_conversion(source: Path, output: Path, lock_path: Path) -> bool: + for _ in range(300): + if not lock_path.exists(): + return _is_conversion_current(source, output) + if _is_conversion_current(source, output): + return True + time.sleep(1.0) + return False + + +def _remove_path(path: Path) -> None: + if path.is_dir(): + shutil.rmtree(path) + elif path.exists(): + path.unlink() + + +def _write_dpdata_conversion( + source: Path, fmt: str, out_fmt: str, output: Path +) -> 
def _write_dpdata_conversion(
    source: Path, fmt: str, out_fmt: str, output: Path
) -> None:
    """Convert *source* with dpdata and move the result to *output*.

    The data is first written to a process-specific temporary sibling of
    *output* and only then swapped into place, so readers never observe a
    half-written dataset at *output*.

    Raises
    ------
    ImportError
        If dpdata is not installed.
    RuntimeError
        If dpdata loads no data from *source*.
    """
    try:
        import dpdata
    except ImportError as exc:
        raise ImportError(
            "dpdata is required when training_data.format or "
            "validation_data.format is specified. Install dpdata to enable "
            "automatic dataset conversion."
        ) from exc

    tmp_output = output.with_name(f".{output.name}.{os.getpid()}.tmp")
    _remove_path(tmp_output)
    try:
        multi_systems = dpdata.MultiSystems()
        try:
            multi_systems.load_systems_from_file(str(source), fmt=fmt)
        except NotImplementedError:
            # Fall back to loading the source as one labeled system.
            labeled_system = dpdata.LabeledSystem(str(source), fmt=fmt)
            multi_systems = dpdata.MultiSystems(labeled_system)
        if len(multi_systems) == 0:
            raise RuntimeError(f"No frames were loaded by dpdata from {source}")
        multi_systems.to(out_fmt, str(tmp_output))
        _remove_path(output)
        os.replace(tmp_output, output)
    finally:
        # A no-op after a successful os.replace; otherwise this discards the
        # partial output, also on KeyboardInterrupt/SystemExit.
        _remove_path(tmp_output)


def _convert_system_by_dpdata(
    source_path: str, fmt: str, out_fmt: str | None
) -> list[str]:
    """Return DeePMD systems for *source_path*, converting it when needed.

    Results are memoised per process and cached on disk below the current
    working directory. A lock file next to the cached output serialises
    concurrent converters.

    Parameters
    ----------
    source_path : str
        File or directory to convert.
    fmt : str
        dpdata input format; ``auto`` and ``ase`` are normalised first.
    out_fmt : str or None
        dpdata output format; None selects the module default.

    Returns
    -------
    list of str
        The converted system paths (a fresh list on every call).

    Raises
    ------
    TimeoutError
        If another process still holds the conversion lock after the wait.
    RuntimeError
        If the converted output contains no DeePMD system.
    """
    if out_fmt is None:
        out_fmt = _DPDATA_DEFAULT_OUT_FORMAT
    source = Path(source_path)
    fmt = _normalize_dpdata_format(fmt, source)
    out_fmt = out_fmt.lower()
    cache_key = (
        str(Path.cwd().resolve(strict=False)),
        str(source.resolve(strict=False)),
        fmt,
        out_fmt,
    )
    cached = _DPDATA_CONVERSION_CACHE.get(cache_key)
    if cached is not None:
        # Hand out a copy so callers cannot corrupt the memo by mutating it.
        return list(cached)

    output = _conversion_cache_path(source, fmt, out_fmt)
    output.parent.mkdir(parents=True, exist_ok=True)
    lock_path = output.with_suffix(output.suffix + ".lock")
    if not _is_conversion_current(source, output):
        while True:
            try:
                # O_EXCL makes lock creation atomic: exactly one process wins.
                lock_fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
            except FileExistsError:
                if _wait_for_conversion(source, output, lock_path):
                    break
                if not lock_path.exists():
                    # The holder released the lock without leaving a current
                    # output (e.g. its conversion failed): that is not a
                    # timeout, so compete for the lock again.
                    continue
                raise TimeoutError(
                    f"Timed out waiting for dpdata conversion lock {lock_path}"
                ) from None
            with os.fdopen(lock_fd, "w") as fp:
                fp.write(str(os.getpid()))
            try:
                # Re-check under the lock: another process may have finished
                # between the first check and acquiring the lock.
                if not _is_conversion_current(source, output):
                    log.info(
                        "Converting %s from dpdata format %s to %s at %s",
                        source,
                        fmt,
                        out_fmt,
                        output,
                    )
                    _write_dpdata_conversion(source, fmt, out_fmt, output)
            finally:
                try:
                    lock_path.unlink()
                except FileNotFoundError:
                    pass
            break

    if out_fmt == "lmdb":
        converted_systems = [str(output)]
    else:
        converted_systems = expand_sys_str(str(output))
    if not converted_systems:
        raise RuntimeError(f"No DeePMD systems were found in converted file {output}")
    _DPDATA_CONVERSION_CACHE[cache_key] = converted_systems
    return list(converted_systems)
) + if fmt is not None: + fmt = fmt.lower() + if _is_deepmd_data_format(fmt): + fmt = None + + from deepmd.dpmodel.utils.lmdb_data import ( + is_lmdb, + ) + # Iterate over the search_paths list and apply expansion logic to each path result_systems = [] for path in search_paths: - if patterns is None: + if fmt is not None: + for input_path in _iter_conversion_inputs(path, patterns): + result_systems.extend( + _convert_system_by_dpdata(input_path, fmt, out_fmt) + ) + elif is_lmdb(path): + result_systems.append(path) + elif patterns is None: expanded_paths = expand_sys_str(path) + result_systems.extend(expanded_paths) else: expanded_paths = rglob_sys_str(path, patterns) - - result_systems.extend(expanded_paths) + result_systems.extend(expanded_paths) return result_systems @@ -875,7 +1354,7 @@ def get_data( type_map: list[str] | None, modifier: Any | None, multi_task_mode: bool = False, -) -> DeepmdDataSystem: +) -> DeepmdDataSystem | LmdbDataSystem: """Get the data system. Parameters @@ -898,13 +1377,35 @@ def get_data( """ systems = jdata["systems"] rglob_patterns = jdata.get("rglob_patterns", None) - systems = process_systems(systems, patterns=rglob_patterns) + data_format = jdata.get("format", None) + out_format = jdata.get("out_format", jdata.get("output_format", None)) + systems = process_systems( + systems, patterns=rglob_patterns, fmt=data_format, out_fmt=out_format + ) batch_size = jdata["batch_size"] sys_probs = jdata.get("sys_probs", None) auto_prob = jdata.get("auto_prob", "prob_sys_size") optional_type_map = not multi_task_mode + from deepmd.dpmodel.utils.lmdb_data import ( + is_lmdb, + ) + + if len(systems) == 1 and is_lmdb(systems[0]): + if type_map is None: + raise ValueError( + "LMDB training data requires model/type_map to be set. " + "Set model/type_map or choose training_data.out_format=" + "'deepmd/hdf5' for automatic conversion." 
+ ) + return LmdbDataSystem( + lmdb_path=systems[0], + type_map=type_map, + batch_size=batch_size, + auto_prob_style=auto_prob, + ) + data = DeepmdDataSystem( systems=systems, batch_size=batch_size, diff --git a/pyproject.toml b/pyproject.toml index 35fc0fdb18..37312b0a3e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -58,6 +58,7 @@ dependencies = [ 'array-api-compat', 'lmdb', 'msgpack', + 'dpdata>=1.0.1', ] requires-python = ">=3.10" keywords = ["deepmd"] @@ -81,7 +82,6 @@ repository = "https://github.com/deepmodeling/deepmd-kit" # which can be read by the build backend. [tool.deepmd_build_backend.optional-dependencies] test = [ - "dpdata>=0.2.7", # ASE issue: https://gitlab.com/ase/ase/-/merge_requests/2843 # fixed in 3.23.0 "ase>=3.23.0", diff --git a/source/tests/common/test_data_system_conversion.py b/source/tests/common/test_data_system_conversion.py new file mode 100644 index 0000000000..37b8b7bddf --- /dev/null +++ b/source/tests/common/test_data_system_conversion.py @@ -0,0 +1,205 @@ +# SPDX-License-Identifier: LGPL-3.0-or-later +import os +import sys +import tempfile +import types +import unittest +from pathlib import ( + Path, +) +from typing import ( + ClassVar, +) +from unittest.mock import ( + patch, +) + +import h5py +import lmdb +import msgpack +import numpy as np + +from deepmd.dpmodel.utils.lmdb_data import ( + is_lmdb, +) +from deepmd.utils import ( + data_system, +) +from deepmd.utils.data_system import ( + LmdbDataSystem, + get_data, + process_systems, +) + + +def _write_minimal_deepmd_hdf5(file_name: str) -> None: + with h5py.File(file_name, "w") as fp: + system = fp.create_group("H") + system.create_dataset("type.raw", data=np.array([0], dtype=np.int32)) + string_dtype = h5py.string_dtype(encoding="utf-8") + system.create_dataset("type_map.raw", data=np.array(["H"], dtype=string_dtype)) + set_dir = system.create_group("set.000") + set_dir.create_dataset("coord.npy", data=np.zeros((1, 3), dtype=np.float32)) + set_dir.create_dataset( + 
def _write_minimal_deepmd_hdf5(file_name: str) -> None:
    """Write a one-atom, one-frame ``deepmd/hdf5`` file with a single system ``H``."""
    with h5py.File(file_name, "w") as fp:
        system = fp.create_group("H")
        system.create_dataset("type.raw", data=np.array([0], dtype=np.int32))
        string_dtype = h5py.string_dtype(encoding="utf-8")
        system.create_dataset("type_map.raw", data=np.array(["H"], dtype=string_dtype))
        set_dir = system.create_group("set.000")
        set_dir.create_dataset("coord.npy", data=np.zeros((1, 3), dtype=np.float32))
        set_dir.create_dataset(
            "box.npy", data=np.eye(3, dtype=np.float32).reshape(1, 9)
        )


def _encode_array(arr: np.ndarray) -> dict:
    """Pack an array as the dtype/shape/raw-bytes dict stored in the LMDB frame."""
    return {
        "type": str(arr.dtype),
        "shape": list(arr.shape),
        "data": arr.tobytes(),
    }


def _write_minimal_lmdb(path: str) -> None:
    """Write an LMDB holding one metadata record and one single-atom frame."""
    env = lmdb.open(path, map_size=10 * 1024 * 1024)
    try:
        frame = {
            "atom_names": ["H"],
            "atom_numbs": [1],
            "atom_types": _encode_array(np.array([0], dtype=np.int64)),
            "cells": _encode_array(np.eye(3, dtype=np.float64) * 8.0),
            "coords": _encode_array(np.zeros((1, 3), dtype=np.float64)),
            "energies": _encode_array(np.array([0.0], dtype=np.float64)),
            "forces": _encode_array(np.zeros((1, 3), dtype=np.float64)),
        }
        metadata = {
            "nframes": 1,
            "frame_idx_fmt": "012d",
            "type_map": ["H"],
            "system_info": {
                "formula": "H",
                "natoms": [1],
                "nframes": 1,
            },
        }
        with env.begin(write=True) as txn:
            txn.put(b"__metadata__", msgpack.packb(metadata, use_bin_type=True))
            txn.put(b"000000000000", msgpack.packb(frame, use_bin_type=True))
    finally:
        # Close the environment even when packing or writing fails.
        env.close()


class _FakeMultiSystems:
    """Stand-in for ``dpdata.MultiSystems`` that records calls and writes stubs."""

    # Number of ``to`` calls, i.e. conversions actually performed.
    write_count = 0
    # ``(file_name, fmt)`` of every ``load_systems_from_file`` call.
    load_calls: ClassVar[list[tuple[str, str]]] = []

    def __init__(self, *systems) -> None:
        self.systems = list(systems)
        self.loaded = False

    def load_systems_from_file(self, file_name: str, fmt: str):
        self.load_calls.append((file_name, fmt))
        self.loaded = True
        return self

    def __len__(self) -> int:
        return 1 if self.loaded or self.systems else 0

    def to(self, fmt: str, file_name: str) -> None:
        type(self).write_count += 1
        if fmt == "deepmd/hdf5":
            _write_minimal_deepmd_hdf5(file_name)
        elif fmt == "lmdb":
            _write_minimal_lmdb(file_name)
        else:
            raise AssertionError(fmt)


class _FakeLabeledSystem:
    """Stand-in for ``dpdata.LabeledSystem`` that only stores its arguments."""

    def __init__(self, file_name: str, fmt: str) -> None:
        self.file_name = file_name
        self.fmt = fmt


class TestDpdataFormatConversion(unittest.TestCase):
    """Exercise dpdata-based conversion with dpdata replaced by the fakes above."""

    def setUp(self) -> None:
        # Cleanups (unlike tearDown) also run when setUp fails part-way, and
        # they run in reverse order: clear cache, restore cwd, delete tmpdir.
        self.tmpdir = tempfile.TemporaryDirectory()
        self.addCleanup(self.tmpdir.cleanup)
        self.root = Path(self.tmpdir.name)
        self.old_cwd = Path.cwd()
        os.chdir(self.root)
        self.addCleanup(os.chdir, self.old_cwd)
        self.source = self.root / "data.extxyz"
        self.source.write_text("1\nProperties=species:S:1:pos:R:3\nH 0 0 0\n")
        _FakeMultiSystems.write_count = 0
        _FakeMultiSystems.load_calls = []
        data_system._DPDATA_CONVERSION_CACHE.clear()
        self.addCleanup(data_system._DPDATA_CONVERSION_CACHE.clear)
        self.fake_dpdata = types.SimpleNamespace(
            MultiSystems=_FakeMultiSystems,
            LabeledSystem=_FakeLabeledSystem,
        )

    def test_process_systems_defaults_to_lmdb_and_reuses_cache(self) -> None:
        with patch.dict(sys.modules, {"dpdata": self.fake_dpdata}):
            systems = process_systems(str(self.source), fmt="extxyz")
            systems_again = process_systems(str(self.source), fmt="extxyz")

        self.assertEqual(systems, systems_again)
        self.assertEqual(_FakeMultiSystems.write_count, 1)
        self.assertEqual(_FakeMultiSystems.load_calls, [(str(self.source), "extxyz")])
        self.assertEqual(len(systems), 1)
        self.assertTrue(systems[0].endswith(".lmdb"))
        self.assertTrue(is_lmdb(systems[0]))
        self.assertTrue(Path(systems[0]).is_relative_to(self.root))
        self.assertEqual(Path(systems[0]).parent, self.root / ".deepmd_dpdata_cache")

    def test_process_systems_cache_is_scoped_to_cwd(self) -> None:
        other_cwd = self.root / "run2"
        other_cwd.mkdir()

        with patch.dict(sys.modules, {"dpdata": self.fake_dpdata}):
            systems = process_systems(str(self.source), fmt="extxyz")
            os.chdir(other_cwd)
            systems_other = process_systems(str(self.source), fmt="extxyz")

        self.assertNotEqual(systems, systems_other)
        self.assertEqual(_FakeMultiSystems.write_count, 2)
        self.assertEqual(Path(systems[0]).parent, self.root / ".deepmd_dpdata_cache")
        self.assertEqual(
            Path(systems_other[0]).parent,
            other_cwd / ".deepmd_dpdata_cache",
        )

    def test_process_systems_converts_to_explicit_hdf5(self) -> None:
        with patch.dict(sys.modules, {"dpdata": self.fake_dpdata}):
            systems = process_systems(
                str(self.source), fmt="extxyz", out_fmt="deepmd/hdf5"
            )

        self.assertEqual(_FakeMultiSystems.write_count, 1)
        self.assertEqual(_FakeMultiSystems.load_calls, [(str(self.source), "extxyz")])
        self.assertEqual(len(systems), 1)
        self.assertTrue(systems[0].endswith("#/H"))

    def test_get_data_uses_format_conversion(self) -> None:
        with patch.dict(sys.modules, {"dpdata": self.fake_dpdata}):
            data = get_data(
                {
                    "systems": str(self.source),
                    "format": "auto",
                    "batch_size": 1,
                },
                0.0,
                ["H"],
                None,
            )

        self.assertEqual(data.get_nsystems(), 1)
        self.assertIsInstance(data, LmdbDataSystem)
        self.assertEqual(_FakeMultiSystems.load_calls, [(str(self.source), "extxyz")])
        batch = data.get_batch()
        self.assertIn("type", batch)
        self.assertIn("natoms_vec", batch)
        self.assertEqual(batch["coord"].shape, (1, 3))
        self.assertEqual(data.data_systems, [data])
        stat_set = data._load_set(data.dirs[0])
        self.assertEqual(stat_set["coord"].shape, (1, 3))
        self.assertEqual(stat_set["type"].shape, (1, 1))


if __name__ == "__main__":
    unittest.main()