Skip to content
Merged
Changes from 1 commit
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
dceadd7
Remove loop
DrLeucine Mar 17, 2025
1a7caba
Vectorise functions
DrLeucine Mar 17, 2025
5c04726
Clean up + remove redundant functions
DrLeucine Mar 18, 2025
0ef83ad
Simplify args + remove print statements in verbosity = 0
DrLeucine Mar 18, 2025
432e3cb
Remove unused args
DrLeucine Mar 18, 2025
c19d89f
Update versions
DrLeucine Mar 18, 2025
f7542ea
Update Github Workflow
DrLeucine Mar 18, 2025
eb27d1c
Add tqdm
DrLeucine Mar 18, 2025
a5978e0
Fix tests
DrLeucine Mar 18, 2025
3f274c5
Fix tests
DrLeucine Mar 18, 2025
cf559f0
Update test with new vectorised iswithin function
DrLeucine Mar 18, 2025
3230adc
Update README
DrLeucine Mar 18, 2025
f1a50b4
Update AMPAL
DrLeucine Mar 20, 2025
1ae096f
Add compression level
DrLeucine Mar 20, 2025
6f1c666
Update dependencies
DrLeucine Mar 20, 2025
45774bf
Fix numpy version to < 2.0
DrLeucine Mar 20, 2025
03377e8
Enforce HDF5 Typing
DrLeucine Mar 20, 2025
56ee079
Handle chunking and clean up memory
DrLeucine Apr 7, 2025
3e6ff50
Address I/O bottleneck leading to broken pipe with throttling and gc
DrLeucine Apr 8, 2025
77772f5
Add psutil
DrLeucine Apr 8, 2025
c168aa2
Fix memory fraction to 0.7 and reduce amino acid length
DrLeucine Apr 8, 2025
c348151
Fix memory fraction to 0.65 and reduce amino acid length
DrLeucine Apr 8, 2025
6f528b0
Avoid save_results bottleneck by writing to different files and then …
DrLeucine Apr 8, 2025
69ae5e5
Add better gc and error logging without mp.Manager().dict()
DrLeucine Apr 8, 2025
2aff43a
Add progress for errored structures
DrLeucine Apr 9, 2025
1535822
Add sanity checks + check if PDB is in the merged file already
DrLeucine Apr 9, 2025
c0984b7
Fix remerging logic
DrLeucine Apr 9, 2025
cfe1416
Add better checks if files are smaller than workers
DrLeucine Apr 9, 2025
0dacca7
Add recovery option + better deal with metadata
DrLeucine Apr 10, 2025
790f74e
Add further garbage collection and cache size for hdf5
DrLeucine Apr 11, 2025
8f17ebf
Add smaller chunking hdf5 + hd5 flush
DrLeucine Apr 11, 2025
90841a5
Avoid error after merging partials
DrLeucine Apr 11, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Address I/O bottleneck leading to broken pipe with throttling and gc
  • Loading branch information
DrLeucine committed Apr 8, 2025
commit 3e6ff509e0bd0a16e343e4a8fbd6fbc4343f5990
251 changes: 153 additions & 98 deletions src/aposteriori/data_prep/create_frame_data_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
structure.
"""
import csv
import gc
import gzip
import multiprocessing as mp
import sys
Expand All @@ -19,11 +20,18 @@
import ampal.geometry as geometry
import h5py
import numpy as np
import psutil
from ampal.amino_acids import polarity_Zimmerman, residue_charge, standard_amino_acids
from tqdm import tqdm

from aposteriori.config import (ATOM_VANDERWAAL_RADII, MAKE_FRAME_DATASET_VER, PDB_REQUEST_URL, STD_RESIDUES_1, STD_RESIDUES_3,
UNCOMMON_RESIDUE_DICT, )
from aposteriori.config import (
ATOM_VANDERWAAL_RADII,
MAKE_FRAME_DATASET_VER,
PDB_REQUEST_URL,
STD_RESIDUES_1,
STD_RESIDUES_3,
UNCOMMON_RESIDUE_DICT,
)


@dataclass
Expand Down Expand Up @@ -719,6 +727,12 @@ def create_residue_frame(
and res_property != 0
):
frame[indices] = res_property

del indices, atom, res_obj, cha_obj
if voxels_as_gaussian:
del gaussian_matrix, gaussian_atom
gc.collect()

# Check whether central atom is C:
if "CA" in codec.atomic_labels:
if voxels_as_gaussian:
Expand All @@ -744,6 +758,8 @@ def create_residue_frame(
assert (
frame[center_adjust, center_adjust, center_adjust][0] == 1
), f"The central atom should be Carbon, but it is {frame[center_adjust, center_adjust, center_adjust]}."

gc.collect()
return frame


Expand Down Expand Up @@ -841,6 +857,9 @@ def voxelise_assembly(
rotamers=rota,
)
)
del array, encoded_residue, rota
gc.collect()

if verbosity > 1:
print(f"{name}:\t\tAdded residue {chain.id}:{residue.id}.")
if verbosity > 0:
Expand Down Expand Up @@ -983,7 +1002,12 @@ def process_single_path(
chain_filter_list: t.Optional[t.List[str]]
result: t.Union[t.Tuple[str, ChainDict], str]
while True:
# Get pdb path from the queue
structure_path = path_queue.get()
# Early exit if the queue is empty
if structure_path is None:
break
# Attempt to process the structure
if verbosity > 0:
print(f"Processing `{structure_path}`...")
try:
Expand Down Expand Up @@ -1013,10 +1037,13 @@ def process_single_path(
for curr_res in result:
result_queue.put(curr_res)
del curr_res
gc.collect()
del result
gc.collect()
else:
result_queue.put(result)
del result
gc.collect()


def save_results(
Expand Down Expand Up @@ -1058,7 +1085,7 @@ def save_results(
-------

"""
lock = mp.Manager().Lock() # Lock for exclusive write access
lock = mp.Lock() # Lock for exclusive write access
# We count the number of processed files to close the file every n structures
processed_since_close = 0

Expand All @@ -1069,7 +1096,7 @@ def save_results(
with tqdm(total=total_files, desc="Processing Structures") as pbar:
while True:
result = result_queue.get()
if result == "BREAK":
if result is None:
break

pdb_code, chain_dict = result
Expand Down Expand Up @@ -1147,17 +1174,37 @@ def save_results(
processed_since_close += 1
if processed_since_close >= close_every_n_structures:
hd5.close() # close
gc.collect() # Force garbage collection
# reopen
hd5 = h5py.File(str(h5_path), "a", libver="latest", rdcc_nbytes=1024)
existing_pdbs = set(hd5.keys()) # refresh
processed_since_close = 0

print(f"Finished processing files.")

# Explicitly close the file to avoid lingering objects)
hd5.close()


def estimate_result_queue_maxsize(
    voxels_per_side: int,
    encoder_length: int,
    residues_per_protein: int = 1000,
    bytes_per_voxel: int = 2,  # float16
    memory_fraction: float = 0.75,
) -> int:
    """Estimate how many voxelised structures the result queue can hold.

    Used to bound the multiprocessing result queue so producer workers are
    throttled before in-flight frames exhaust available system memory
    (the I/O bottleneck this branch addresses).

    Parameters
    ----------
    voxels_per_side : int
        Edge length of the cubic frame; each frame has
        ``voxels_per_side ** 3`` voxels.
    encoder_length : int
        Number of channels per voxel produced by the codec.
    residues_per_protein : int
        Assumed upper bound on residues (frames) per structure.
    bytes_per_voxel : int
        Bytes per voxel channel (2 assumes float16 storage).
    memory_fraction : float
        Fraction of *currently available* memory the queue may occupy.

    Returns
    -------
    int
        Estimated queue ``maxsize``; always at least 1 so processing can
        proceed even on memory-starved machines.
    """
    # Estimated in-memory footprint of a single voxelised structure.
    voxels_per_frame = voxels_per_side ** 3
    bytes_per_frame = voxels_per_frame * encoder_length * bytes_per_voxel
    bytes_per_structure = bytes_per_frame * residues_per_protein

    # Guard against degenerate sizing arguments (e.g. a zero edge length)
    # that would otherwise raise ZeroDivisionError below.
    if bytes_per_structure <= 0:
        return 1

    # Available (not total) memory at call time; changes between calls.
    available_memory = psutil.virtual_memory().available

    # Whole structures fitting in `memory_fraction` of available memory
    # (comment previously hard-coded "75%" despite being parameterised).
    max_structures = int((memory_fraction * available_memory) // bytes_per_structure)
    return max(1, max_structures)


def process_paths(
structure_file_paths: t.List[Path],
output_path: Path,
Expand All @@ -1171,7 +1218,8 @@ def process_paths(
voxelise_all_states: bool = True,
tag_rotamers: bool = False,
):
"""Discretizes a list of structures and stores them in a HDF5 object.
"""
Discretizes a list of structures and stores them in a HDF5 object.

Parameters
----------
Expand All @@ -1197,111 +1245,118 @@ def process_paths(
Whether to voxelise only the first state of the NMR structure (False) or all of them (True).
"""

with mp.Manager() as manager:
path_queue, result_queue = manager.Queue(), manager.Queue()
complete, frames = manager.Value("i", 0), manager.Value("i", 0)
errors = manager.dict()

# Load existing dataset keys to skip processed files
existing_pdbs = set()
if output_path.exists():
if verbosity > 0:
print(f"Checking existing dataset at `{output_path}`...")
with h5py.File(str(output_path), "r") as hd5:
existing_pdbs.update(hd5.keys())
path_queue = mp.Queue()
dynamic_maxsize = estimate_result_queue_maxsize(
voxels_per_side, codec.encoder_length
)

# Filter files to process
unprocessed_files = [
p for p in structure_file_paths if p.stem.split(".")[0] not in existing_pdbs
]
total_files = len(structure_file_paths)
processed_files = total_files - len(unprocessed_files)
result_queue = mp.Queue(maxsize=dynamic_maxsize)
complete = mp.Value("i", 0)
frames = mp.Value("i", 0)
errors = mp.Manager().dict()

# Load existing dataset keys to skip processed files
existing_pdbs = set()
if output_path.exists():
if verbosity > 0:
print(f"Will attempt to process {total_files} structure file/s.")
print(f"Skipping {processed_files} already processed structures.")
print(f"Processing {len(unprocessed_files)} new structures.")
print(f"Checking existing dataset at `{output_path}`...")
with h5py.File(str(output_path), "r") as hd5:
existing_pdbs.update(hd5.keys())

for path in unprocessed_files:
path_queue.put(path)
# Filter files to process
unprocessed_files = [
p for p in structure_file_paths if p.stem.split(".")[0] not in existing_pdbs
]
total_files = len(structure_file_paths)
processed_files = total_files - len(unprocessed_files)

# Metadata for the dataset
metadata = DatasetMetadata(
make_frame_dataset_ver=MAKE_FRAME_DATASET_VER,
frame_dims=(
voxels_per_side,
voxels_per_side,
voxels_per_side,
codec.encoder_length,
),
atom_encoder=list(codec.atomic_labels),
atom_filter_fn=str(default_atom_filter),
residue_encoder=list(standard_amino_acids.values()),
frame_edge_length=frame_edge_length,
voxels_as_gaussian=voxels_as_gaussian,
)
if verbosity > 0:
print(f"Will attempt to process {total_files} structure file/s.")
print(f"Skipping {processed_files} already processed structures.")
print(f"Processing {len(unprocessed_files)} new structures.")

# Create worker processes
workers = [
mp.Process(
target=process_single_path,
args=(
path_queue,
result_queue,
frame_edge_length,
voxels_per_side,
default_atom_filter,
None,
errors,
verbosity,
codec,
voxels_as_gaussian,
voxelise_all_states,
tag_rotamers,
),
)
for _ in range(processes)
]
for path in unprocessed_files:
path_queue.put(path)

for _ in range(processes):
path_queue.put(None) # poison pill for each worker

# Storage process
storer = mp.Process(
target=save_results,
# Metadata for the dataset
metadata = DatasetMetadata(
make_frame_dataset_ver=MAKE_FRAME_DATASET_VER,
frame_dims=(
voxels_per_side,
voxels_per_side,
voxels_per_side,
codec.encoder_length,
),
atom_encoder=list(codec.atomic_labels),
atom_filter_fn=str(default_atom_filter),
residue_encoder=list(standard_amino_acids.values()),
frame_edge_length=frame_edge_length,
voxels_as_gaussian=voxels_as_gaussian,
)

# Create worker processes
workers = [
mp.Process(
target=process_single_path,
args=(
path_queue,
result_queue,
output_path,
len(unprocessed_files),
complete,
frames,
frame_edge_length,
voxels_per_side,
default_atom_filter,
None,
errors,
verbosity,
metadata,
gzip_compression,
codec,
voxels_as_gaussian,
voxelise_all_states,
tag_rotamers,
),
)
for _ in range(processes)
]

# Storage process
storer = mp.Process(
target=save_results,
args=(
result_queue,
output_path,
len(unprocessed_files),
complete,
frames,
verbosity,
metadata,
gzip_compression,
),
)

# Start all processes
for proc in workers + [storer]:
proc.start()

# Display progress bar
with tqdm(total=len(unprocessed_files), desc="Processing Structures") as pbar:
while (complete.value + len(errors)) < len(unprocessed_files):
if any(not p.is_alive() for p in workers + [storer]):
if verbosity > 0:
print("One or more processes died, aborting...")
break
pbar.update(complete.value - pbar.n)

# Ensure all results are saved before exiting
result_queue.put("BREAK")
storer.join()

# Terminate all processes
for proc in workers + [storer]:
proc.terminate()
if (verbosity > 0) and (errors):
print(f"There were {len(errors)} errors while creating the dataset:")
for path, error in errors.items():
print(f"\t{path}: {error}")
# Start all processes
for proc in workers + [storer]:
proc.start()
storer.start()

# Display progress bar
with tqdm(total=len(unprocessed_files), desc="Processing Structures") as pbar:
last_completed = 0
while complete.value < len(unprocessed_files):
current_done = complete.value
pbar.update(current_done - last_completed)
last_completed = current_done

for proc in workers:
proc.join()

result_queue.put(None)
storer.join()

if (verbosity > 0) and errors:
print(f"There were {len(errors)} errors while creating the dataset:")
for path, error in errors.items():
print(f"\t{path}: {error}")

return output_path

Expand Down
Loading