From 417da943062d64cf48c91ec5e5052e934be7cca7 Mon Sep 17 00:00:00 2001 From: caic99 Date: Wed, 27 Nov 2024 18:35:07 +0800 Subject: [PATCH 01/13] refactor: simplify dataset construction --- deepmd/pt/utils/dataloader.py | 104 ++++++++++++++-------------------- deepmd/utils/path.py | 3 + 2 files changed, 46 insertions(+), 61 deletions(-) diff --git a/deepmd/pt/utils/dataloader.py b/deepmd/pt/utils/dataloader.py index 9920622792..59dc2779fa 100644 --- a/deepmd/pt/utils/dataloader.py +++ b/deepmd/pt/utils/dataloader.py @@ -1,11 +1,16 @@ # SPDX-License-Identifier: LGPL-3.0-or-later import logging import os -import queue import time -from multiprocessing.dummy import ( +from functools import ( + partial, +) +from multiprocessing import ( Pool, ) +from queue import ( + Queue, +) from threading import ( Thread, ) @@ -51,6 +56,11 @@ def setup_seed(seed) -> None: torch.cuda.manual_seed_all(seed) torch.backends.cudnn.deterministic = True +def construct_dataset(system, type_map): + return DeepmdDataSetForLoader( + system=system, + type_map=type_map, + ) class DpLoaderSet(Dataset): """A dataset for storing DataLoaders to multiple Systems. @@ -87,11 +97,7 @@ def __init__( if len(systems) >= 100: log.info(f"Constructing DataLoaders from {len(systems)} systems") - def construct_dataset(system): - return DeepmdDataSetForLoader( - system=system, - type_map=type_map, - ) + construct_dataset_systems=partial(construct_dataset, type_map=type_map) with Pool( os.cpu_count() @@ -101,7 +107,7 @@ def construct_dataset(system): else 1 ) ) as pool: - self.systems = pool.map(construct_dataset, systems) + self.systems = pool.map(construct_dataset_systems, systems) self.sampler_list: list[DistributedSampler] = [] self.index = [] @@ -185,85 +191,61 @@ def print_summary( name: str, prob: list[float], ) -> None: - print_summary( - name, - len(self.systems), - [ss.system for ss in self.systems], - [ss._natoms for ss in self.systems], - self.batch_sizes, - [ - ss._data_system.get_sys_numb_batch(self.batch_sizes[ii]) - for ii, ss in enumerate(self.systems) - ], - prob, - [ss._data_system.pbc for ss in self.systems], - ) - - -_sentinel = object() -QUEUESIZE = 32 + rank = dist.get_rank() if dist.is_initialized() else 0 + if rank == 0: + print_summary( + name, + len(self.systems), + [ss.system for ss in self.systems], + [ss._natoms for ss in self.systems], + self.batch_sizes, + [ + ss._data_system.get_sys_numb_batch(self.batch_sizes[ii]) + for ii, ss in enumerate(self.systems) + ], + prob, + [ss._data_system.pbc for ss in self.systems], + ) class BackgroundConsumer(Thread): - def __init__(self, queue, source, max_len) -> None: - Thread.__init__(self) + def __init__(self, queue, source) -> None: + super().__init__() + self.daemon = True self._queue = queue self._source = source # Main DL iterator - self._max_len = max_len # def run(self) -> None: for item in self._source: self._queue.put(item) # Blocking if the queue is full - # Signal the consumer we are done. - self._queue.put(_sentinel) + # Signal the consumer we are done; this should not happen for DataLoader + self._queue.put(StopIteration) +QUEUESIZE = 32 class BufferedIterator: def __init__(self, iterable) -> None: - self._queue = queue.Queue(QUEUESIZE) + self._queue = Queue(QUEUESIZE) self._iterable = iterable - self._consumer = None - - self.start_time = time.time() - self.warning_time = None - self.total = len(iterable) - - def _create_consumer(self) -> None: - self._consumer = BackgroundConsumer(self._queue, self._iterable, self.total) - self._consumer.daemon = True + self._consumer = BackgroundConsumer(self._queue, self._iterable) self._consumer.start() + self.len = len(iterable) def __iter__(self): return self def __len__(self) -> int: - return self.total + return self.len def __next__(self): - # Create consumer if not created yet - if self._consumer is None: - self._create_consumer() - # Notify the user if there is a data loading bottleneck - if self._queue.qsize() < min(2, max(1, self._queue.maxsize // 2)): - if time.time() - self.start_time > 5 * 60: - if ( - self.warning_time is None - or time.time() - self.warning_time > 15 * 60 - ): - log.warning( - "Data loading buffer is empty or nearly empty. This may " - "indicate a data loading bottleneck, and increasing the " - "number of workers (--num-workers) may help." - ) - self.warning_time = time.time() - - # Get next example + start_wait = time.time() item = self._queue.get() + wait_time = time.time() - start_wait + if wait_time > 1.0: # Even for Multi-Task training, each step usually takes < 1s + log.warning(f"Data loading is slow, waited {wait_time:.2f} seconds.") if isinstance(item, Exception): raise item - if item is _sentinel: - raise StopIteration return item diff --git a/deepmd/utils/path.py b/deepmd/utils/path.py index 6c52caac1d..d9853c33c7 100644 --- a/deepmd/utils/path.py +++ b/deepmd/utils/path.py @@ -44,6 +44,9 @@ def __new__(cls, path: str, mode: str = "r"): raise FileNotFoundError(f"{path} not found") return super().__new__(cls) + def __getnewargs__(self): + return (self.path, self.mode) + @abstractmethod def load_numpy(self) -> np.ndarray: """Load NumPy array. From e9672d079661b8141776d89636a1e212a972d0f9 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 27 Nov 2024 10:38:03 +0000 Subject: [PATCH 02/13] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- deepmd/pt/utils/dataloader.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/deepmd/pt/utils/dataloader.py b/deepmd/pt/utils/dataloader.py index 59dc2779fa..7e7e2cb12d 100644 --- a/deepmd/pt/utils/dataloader.py +++ b/deepmd/pt/utils/dataloader.py @@ -56,12 +56,14 @@ def setup_seed(seed) -> None: torch.cuda.manual_seed_all(seed) torch.backends.cudnn.deterministic = True + def construct_dataset(system, type_map): return DeepmdDataSetForLoader( system=system, type_map=type_map, ) + class DpLoaderSet(Dataset): """A dataset for storing DataLoaders to multiple Systems. @@ -97,7 +99,7 @@ def __init__( if len(systems) >= 100: log.info(f"Constructing DataLoaders from {len(systems)} systems") - construct_dataset_systems=partial(construct_dataset, type_map=type_map) + construct_dataset_systems = partial(construct_dataset, type_map=type_map) with Pool( os.cpu_count() @@ -222,8 +224,10 @@ def run(self) -> None: # Signal the consumer we are done; this should not happen for DataLoader self._queue.put(StopIteration) + QUEUESIZE = 32 + class BufferedIterator: def __init__(self, iterable) -> None: self._queue = Queue(QUEUESIZE) @@ -242,7 +246,9 @@ def __next__(self): start_wait = time.time() item = self._queue.get() wait_time = time.time() - start_wait - if wait_time > 1.0: # Even for Multi-Task training, each step usually takes < 1s + if ( + wait_time > 1.0 + ): # Even for Multi-Task training, each step usually takes < 1s log.warning(f"Data loading is slow, waited {wait_time:.2f} seconds.") if isinstance(item, Exception): raise item From a4a36c1f7c42c75e7512cf3e21ebe7b3987e8f06 Mon Sep 17 00:00:00 2001 From: caic99 Date: Wed, 27 Nov 2024 21:44:54 +0800 Subject: [PATCH 03/13] fix exception handling --- deepmd/pt/utils/dataloader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deepmd/pt/utils/dataloader.py b/deepmd/pt/utils/dataloader.py index 7e7e2cb12d..e5b41a7acf 100644 --- a/deepmd/pt/utils/dataloader.py +++ b/deepmd/pt/utils/dataloader.py @@ -250,7 +250,7 @@ def __next__(self): wait_time > 1.0 ): # Even for Multi-Task training, each step usually takes < 1s log.warning(f"Data loading is slow, waited {wait_time:.2f} seconds.") - if isinstance(item, Exception): + if issubclass(item, Exception): raise item return item From ee9d8f8545a481ebf3b78ffe1e723c909d20eeec Mon Sep 17 00:00:00 2001 From: caic99 Date: Wed, 27 Nov 2024 21:45:13 +0800 Subject: [PATCH 04/13] refactor getdata --- deepmd/pt/train/training.py | 54 +++++++++---------------------------- 1 file changed, 13 insertions(+), 41 deletions(-) diff --git a/deepmd/pt/train/training.py b/deepmd/pt/train/training.py index f74c4769bf..d2df3e1a13 100644 --- a/deepmd/pt/train/training.py +++ b/deepmd/pt/train/training.py @@ -1053,47 +1053,19 @@ def save_model(self, save_path, lr=0.0, step=0) -> None: checkpoint_files[0].unlink() def get_data(self, is_train=True, task_key="Default"): - if not self.multi_task: - if is_train: - try: - batch_data = next(iter(self.training_data)) - except StopIteration: - # Refresh the status of the dataloader to start from a new epoch - with torch.device("cpu"): - self.training_data = BufferedIterator( - iter(self.training_dataloader) - ) - batch_data = next(iter(self.training_data)) - else: - if self.validation_data is None: - return {}, {}, {} - try: - batch_data = next(iter(self.validation_data)) - except StopIteration: - self.validation_data = BufferedIterator( - iter(self.validation_dataloader) - ) - batch_data = next(iter(self.validation_data)) - else: - if is_train: - try: - batch_data = next(iter(self.training_data[task_key])) - except StopIteration: - # Refresh the status of the dataloader to start from a new epoch - self.training_data[task_key] = BufferedIterator( - iter(self.training_dataloader[task_key]) - ) - batch_data = next(iter(self.training_data[task_key])) - else: - if self.validation_data[task_key] is None: - return {}, {}, {} - try: - batch_data = next(iter(self.validation_data[task_key])) - except StopIteration: - self.validation_data[task_key] = BufferedIterator( - iter(self.validation_dataloader[task_key]) - ) - batch_data = next(iter(self.validation_data[task_key])) + data, dataloader = (self.training_data, self.training_dataloader) \ + if is_train else (self.validation_data, self.validation_dataloader) + if data is None and not is_train: + return {}, {}, {} + if self.multi_task: + data=data[task_key] + dataloader=dataloader[task_key] + try: + batch_data = next(iter(data)) + except StopIteration: + # Refresh the status of the dataloader to start from a new epoch + data = BufferedIterator(iter(dataloader)) + batch_data = next(iter(data)) for key in batch_data.keys(): if key == "sid" or key == "fid" or key == "box" or "find_" in key: From 3895c94af24048ecee8b898dd8bf715836372a6b Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 27 Nov 2024 13:48:49 +0000 Subject: [PATCH 05/13] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- deepmd/pt/train/training.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/deepmd/pt/train/training.py b/deepmd/pt/train/training.py index d2df3e1a13..de1bc50360 100644 --- a/deepmd/pt/train/training.py +++ b/deepmd/pt/train/training.py @@ -1053,13 +1053,16 @@ def save_model(self, save_path, lr=0.0, step=0) -> None: checkpoint_files[0].unlink() def get_data(self, is_train=True, task_key="Default"): - data, dataloader = (self.training_data, self.training_dataloader) \ - if is_train else (self.validation_data, self.validation_dataloader) + data, dataloader = ( + (self.training_data, self.training_dataloader) + if is_train + else (self.validation_data, self.validation_dataloader) + ) if data is None and not is_train: return {}, {}, {} if self.multi_task: - data=data[task_key] - dataloader=dataloader[task_key] + data = data[task_key] + dataloader = dataloader[task_key] try: batch_data = next(iter(data)) except StopIteration: From ddc39de7e4eefd0a492a8173c918ae3dad46f3e4 Mon Sep 17 00:00:00 2001 From: caic99 Date: Thu, 28 Nov 2024 11:13:51 +0800 Subject: [PATCH 06/13] fix ut --- deepmd/pt/utils/dataloader.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/deepmd/pt/utils/dataloader.py b/deepmd/pt/utils/dataloader.py index e5b41a7acf..66889fcbdb 100644 --- a/deepmd/pt/utils/dataloader.py +++ b/deepmd/pt/utils/dataloader.py @@ -222,7 +222,7 @@ def run(self) -> None: self._queue.put(item) # Blocking if the queue is full # Signal the consumer we are done; this should not happen for DataLoader - self._queue.put(StopIteration) + self._queue.put(StopIteration()) QUEUESIZE = 32 @@ -250,7 +250,7 @@ def __next__(self): wait_time > 1.0 ): # Even for Multi-Task training, each step usually takes < 1s log.warning(f"Data loading is slow, waited {wait_time:.2f} seconds.") - if issubclass(item, Exception): + if isinstance(item, Exception): raise item return item From 12daca1b9970efb9cce3b224b87f8090495b07ed Mon Sep 17 00:00:00 2001 From: Chun Cai Date: Thu, 28 Nov 2024 15:45:09 +0800 Subject: [PATCH 07/13] fix multi task validation Co-authored-by: Han Wang <92130845+wanghan-iapcm@users.noreply.github.com> Signed-off-by: Chun Cai --- deepmd/pt/train/training.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/deepmd/pt/train/training.py b/deepmd/pt/train/training.py index c5f11d6896..f60f165cae 100644 --- a/deepmd/pt/train/training.py +++ b/deepmd/pt/train/training.py @@ -1058,11 +1058,11 @@ def get_data(self, is_train=True, task_key="Default"): if is_train else (self.validation_data, self.validation_dataloader) ) - if data is None and not is_train: - return {}, {}, {} if self.multi_task: data = data[task_key] dataloader = dataloader[task_key] + if data is None and not is_train: + return {}, {}, {} try: batch_data = next(iter(data)) except StopIteration: From a1506afe56b6ad60657143be68cf638568fc5163 Mon Sep 17 00:00:00 2001 From: caic99 Date: Thu, 28 Nov 2024 19:21:02 +0800 Subject: [PATCH 08/13] refactor: implement __getnewargs__ method for DPPath subclasses --- deepmd/utils/path.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/deepmd/utils/path.py b/deepmd/utils/path.py index d9853c33c7..aed7d0b73d 100644 --- a/deepmd/utils/path.py +++ b/deepmd/utils/path.py @@ -44,9 +44,6 @@ def __new__(cls, path: str, mode: str = "r"): raise FileNotFoundError(f"{path} not found") return super().__new__(cls) - def __getnewargs__(self): - return (self.path, self.mode) - @abstractmethod def load_numpy(self) -> np.ndarray: """Load NumPy array. @@ -116,6 +113,10 @@ def is_file(self) -> bool: def is_dir(self) -> bool: """Check if self is directory.""" + @abstractmethod + def __getnewargs__(self): + """Return the arguments to be passed to __new__ when unpickling an instance.""" + @abstractmethod def __truediv__(self, key: str) -> "DPPath": """Used for / operator.""" @@ -174,6 +175,9 @@ def __init__(self, path: str, mode: str = "r") -> None: else: self.path = Path(path) + def __getnewargs__(self): + return (self.path, self.mode) + def load_numpy(self) -> np.ndarray: """Load NumPy array. @@ -307,6 +311,9 @@ def __init__(self, path: str, mode: str = "r") -> None: # h5 path: default is the root path self._name = s[1] if len(s) > 1 else "/" + def __getnewargs__(self): + return (self.root_path, self.mode) + @classmethod @lru_cache(None) def _load_h5py(cls, path: str, mode: str = "r") -> h5py.File: From 91d2b9c995b45e03af6ff3109d8c1734b4cc3f76 Mon Sep 17 00:00:00 2001 From: Chun Cai Date: Fri, 29 Nov 2024 10:13:44 +0800 Subject: [PATCH 09/13] Update deepmd/pt/train/training.py Signed-off-by: Chun Cai --- deepmd/pt/train/training.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/deepmd/pt/train/training.py b/deepmd/pt/train/training.py index f60f165cae..5124468ab5 100644 --- a/deepmd/pt/train/training.py +++ b/deepmd/pt/train/training.py @@ -1067,7 +1067,8 @@ def get_data(self, is_train=True, task_key="Default"): batch_data = next(iter(data)) except StopIteration: # Refresh the status of the dataloader to start from a new epoch - data = BufferedIterator(iter(dataloader)) + with torch.device("cpu"): + data = BufferedIterator(iter(dataloader)) batch_data = next(iter(data)) for key in batch_data.keys(): From a9108a71acd3d8419d9d08fa20872492dc5b9d29 Mon Sep 17 00:00:00 2001 From: caic99 Date: Fri, 29 Nov 2024 10:19:00 +0800 Subject: [PATCH 10/13] revert changes on `get_data` --- deepmd/pt/train/training.py | 58 ++++++++++++++++++++++++++----------- 1 file changed, 41 insertions(+), 17 deletions(-) diff --git a/deepmd/pt/train/training.py b/deepmd/pt/train/training.py index 5124468ab5..af6e48191d 100644 --- a/deepmd/pt/train/training.py +++ b/deepmd/pt/train/training.py @@ -1053,23 +1053,47 @@ def save_model(self, save_path, lr=0.0, step=0) -> None: checkpoint_files[0].unlink() def get_data(self, is_train=True, task_key="Default"): - data, dataloader = ( - (self.training_data, self.training_dataloader) - if is_train - else (self.validation_data, self.validation_dataloader) - ) - if self.multi_task: - data = data[task_key] - dataloader = dataloader[task_key] - if data is None and not is_train: - return {}, {}, {} - try: - batch_data = next(iter(data)) - except StopIteration: - # Refresh the status of the dataloader to start from a new epoch - with torch.device("cpu"): - data = BufferedIterator(iter(dataloader)) - batch_data = next(iter(data)) + if not self.multi_task: + if is_train: + try: + batch_data = next(iter(self.training_data)) + except StopIteration: + # Refresh the status of the dataloader to start from a new epoch + with torch.device("cpu"): + self.training_data = BufferedIterator( + iter(self.training_dataloader) + ) + batch_data = next(iter(self.training_data)) + else: + if self.validation_data is None: + return {}, {}, {} + try: + batch_data = next(iter(self.validation_data)) + except StopIteration: + self.validation_data = BufferedIterator( + iter(self.validation_dataloader) + ) + batch_data = next(iter(self.validation_data)) + else: + if is_train: + try: + batch_data = next(iter(self.training_data[task_key])) + except StopIteration: + # Refresh the status of the dataloader to start from a new epoch + self.training_data[task_key] = BufferedIterator( + iter(self.training_dataloader[task_key]) + ) + batch_data = next(iter(self.training_data[task_key])) + else: + if self.validation_data[task_key] is None: + return {}, {}, {} + try: + batch_data = next(iter(self.validation_data[task_key])) + except StopIteration: + self.validation_data[task_key] = BufferedIterator( + iter(self.validation_dataloader[task_key]) + ) + batch_data = next(iter(self.validation_data[task_key])) for key in batch_data.keys(): if key == "sid" or key == "fid" or key == "box" or "find_" in key: From 68dd750b4cde9dd3d286274935e15f7d886f6dba Mon Sep 17 00:00:00 2001 From: caic99 Date: Mon, 2 Dec 2024 11:45:30 +0800 Subject: [PATCH 11/13] restrict the number of warnings by an interval of 15min --- deepmd/pt/utils/dataloader.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/deepmd/pt/utils/dataloader.py b/deepmd/pt/utils/dataloader.py index 66889fcbdb..b1a92b15ea 100644 --- a/deepmd/pt/utils/dataloader.py +++ b/deepmd/pt/utils/dataloader.py @@ -234,6 +234,7 @@ def __init__(self, iterable) -> None: self._iterable = iterable self._consumer = BackgroundConsumer(self._queue, self._iterable) self._consumer.start() + self.last_warning_time = time.time() self.len = len(iterable) def __iter__(self): @@ -247,9 +248,12 @@ def __next__(self): item = self._queue.get() wait_time = time.time() - start_wait if ( - wait_time > 1.0 + wait_time > 1.0 and start_wait - self.last_warning_time > 15 * 60 ): # Even for Multi-Task training, each step usually takes < 1s - log.warning(f"Data loading is slow, waited {wait_time:.2f} seconds.") + log.warning( + f"Data loading is slow, waited {wait_time:.2f} seconds. Ignoring this warning for 15 minutes." + ) + self.last_warning_time = start_wait if isinstance(item, Exception): raise item return item From 5a454d5d4487b6bd4225570c2d236d37439217a4 Mon Sep 17 00:00:00 2001 From: caic99 Date: Mon, 2 Dec 2024 18:01:19 +0800 Subject: [PATCH 12/13] revert changes on using process for no significant improvement --- deepmd/pt/utils/dataloader.py | 24 +++++++++--------------- 1 file changed, 9 insertions(+), 15 deletions(-) diff --git a/deepmd/pt/utils/dataloader.py b/deepmd/pt/utils/dataloader.py index b1a92b15ea..c3c7d04d8c 100644 --- a/deepmd/pt/utils/dataloader.py +++ b/deepmd/pt/utils/dataloader.py @@ -2,10 +2,8 @@ import logging import os import time -from functools import ( - partial, -) -from multiprocessing import ( + +from multiprocessing.dummy import ( Pool, ) from queue import ( @@ -57,13 +55,6 @@ def setup_seed(seed) -> None: torch.backends.cudnn.deterministic = True -def construct_dataset(system, type_map): - return DeepmdDataSetForLoader( - system=system, - type_map=type_map, - ) - - class DpLoaderSet(Dataset): """A dataset for storing DataLoaders to multiple Systems. @@ -99,7 +90,11 @@ def __init__( if len(systems) >= 100: log.info(f"Constructing DataLoaders from {len(systems)} systems") - construct_dataset_systems = partial(construct_dataset, type_map=type_map) + def construct_dataset(system): + return DeepmdDataSetForLoader( + system=system, + type_map=type_map, + ) with Pool( os.cpu_count() @@ -109,7 +104,7 @@ def __init__( else 1 ) ) as pool: - self.systems = pool.map(construct_dataset_systems, systems) + self.systems = pool.map(construct_dataset, systems) self.sampler_list: list[DistributedSampler] = [] self.index = [] @@ -235,13 +230,12 @@ def __init__(self, iterable) -> None: self._consumer = BackgroundConsumer(self._queue, self._iterable) self._consumer.start() self.last_warning_time = time.time() - self.len = len(iterable) def __iter__(self): return self def __len__(self) -> int: - return self.len + return len(self._iterable) def __next__(self): start_wait = time.time() From a199c44cf80defaffa061c5a5a5faa3d6a84cdd9 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 2 Dec 2024 10:03:05 +0000 Subject: [PATCH 13/13] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- deepmd/pt/utils/dataloader.py | 1 - 1 file changed, 1 deletion(-) diff --git a/deepmd/pt/utils/dataloader.py b/deepmd/pt/utils/dataloader.py index c3c7d04d8c..b32448d8ef 100644 --- a/deepmd/pt/utils/dataloader.py +++ b/deepmd/pt/utils/dataloader.py @@ -2,7 +2,6 @@ import logging import os import time - from multiprocessing.dummy import ( Pool, )