Skip to content
Merged
Changes from 1 commit
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
dceadd7
Remove loop
DrLeucine Mar 17, 2025
1a7caba
Vectorise functions
DrLeucine Mar 17, 2025
5c04726
Clean up + remove redundant functions
DrLeucine Mar 18, 2025
0ef83ad
Simplify args + remove print statements in verbosity = 0
DrLeucine Mar 18, 2025
432e3cb
Remove unused args
DrLeucine Mar 18, 2025
c19d89f
Update versions
DrLeucine Mar 18, 2025
f7542ea
Update Github Workflow
DrLeucine Mar 18, 2025
eb27d1c
Add tqdm
DrLeucine Mar 18, 2025
a5978e0
Fix tests
DrLeucine Mar 18, 2025
3f274c5
Fix tests
DrLeucine Mar 18, 2025
cf559f0
Update test with new vectorised iswithin function
DrLeucine Mar 18, 2025
3230adc
Update README
DrLeucine Mar 18, 2025
f1a50b4
Update AMPAL
DrLeucine Mar 20, 2025
1ae096f
Add compression level
DrLeucine Mar 20, 2025
6f1c666
Update dependencies
DrLeucine Mar 20, 2025
45774bf
Fix numpy version to < 2.0
DrLeucine Mar 20, 2025
03377e8
Enforce HDF5 Typing
DrLeucine Mar 20, 2025
56ee079
Handle chunking and clean up memory
DrLeucine Apr 7, 2025
3e6ff50
Address I/O bottleneck leading to broken pipe with throttling and gc
DrLeucine Apr 8, 2025
77772f5
Add psutil
DrLeucine Apr 8, 2025
c168aa2
Fix memory fraction to 0.7 and reduce amino acid length
DrLeucine Apr 8, 2025
c348151
Fix memory fraction to 0.65 and reduce amino acid length
DrLeucine Apr 8, 2025
6f528b0
Avoid save_results bottleneck by writing to different files and then …
DrLeucine Apr 8, 2025
69ae5e5
Add better gc and error logging without mp.Manager().dict()
DrLeucine Apr 8, 2025
2aff43a
Add progress for errored structures
DrLeucine Apr 9, 2025
1535822
Add sanity checks + check if PDB is in the merged file already
DrLeucine Apr 9, 2025
c0984b7
Fix remerging logic
DrLeucine Apr 9, 2025
cfe1416
Add better checks if files are smaller than workers
DrLeucine Apr 9, 2025
0dacca7
Add recovery option + better deal with metadata
DrLeucine Apr 10, 2025
790f74e
Add further garbage collection and cache size for hdf5
DrLeucine Apr 11, 2025
8f17ebf
Add smaller chunking hdf5 + hd5 flush
DrLeucine Apr 11, 2025
90841a5
Avoid error after merging partials
DrLeucine Apr 11, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add progress for errored structures
  • Loading branch information
DrLeucine committed Apr 9, 2025
commit 2aff43a92f0d80faca9af89338d6b74ef4a815b8
26 changes: 16 additions & 10 deletions src/aposteriori/data_prep/create_frame_data_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -996,7 +996,6 @@ def process_single_path(
voxels_per_side: int,
atom_filter_fn: t.Callable[[ampal.Atom], bool],
chain_filter_dict: t.Optional[t.Dict[str, t.List[str]]],
errors: t.Dict[str, str],
verbosity: int,
codec: object,
voxels_as_gaussian: bool,
Expand Down Expand Up @@ -1036,9 +1035,7 @@ def process_single_path(
)
except Exception as e:
result = str(e)
if isinstance(result, str):
errors[str(structure_path)] = result
elif isinstance(result, list):
if isinstance(result, list):
for curr_res in result:
result_queue.put(curr_res)
del curr_res
Expand All @@ -1058,7 +1055,6 @@ def save_worker_results(
voxels_per_side: int,
atom_filter_fn: t.Callable[[ampal.Atom], bool],
chain_filter_dict: t.Optional[t.Dict[str, t.List[str]]],
errors: t.Dict[str, str],
verbosity: int,
codec: Codec,
voxels_as_gaussian: bool,
Expand All @@ -1072,7 +1068,7 @@ def save_worker_results(
if verbosity > 1:
print(f"[Worker {worker_id}] starting...")

error_log_path = output_path.with_name(output_path.stem + "_errors.log")
error_log_path = output_path.with_name(f"{output_path.stem}_worker_{worker_id}_errors.log")

with h5py.File(str(output_path), "w") as hd5:
hd5.attrs.update(metadata.__dict__)
Expand Down Expand Up @@ -1100,8 +1096,12 @@ def save_worker_results(
tag_rotamers,
)
except Exception as e:
# Log the error
with open(error_log_path, "a") as ef:
ef.write(f"{structure_path}: {str(e)}\n")
# Advance the progress counter
with progress_counter.get_lock():
progress_counter.value += 1
continue
if isinstance(result, list):
for pdb_code, chain_dict in result:
Expand Down Expand Up @@ -1312,7 +1312,6 @@ def process_paths(
for i in range(processes)
]
progress_counter = mp.Value("i", 0)
error_lock = mp.Lock()

# Spawn worker processes
workers = [
Expand All @@ -1325,7 +1324,6 @@ def process_paths(
voxels_per_side,
default_atom_filter,
None,
error_lock,
verbosity,
codec,
voxels_as_gaussian,
Expand All @@ -1346,7 +1344,7 @@ def process_paths(
with tqdm(total=len(unprocessed_files), desc="Total Progress") as pbar:
last_count = 0
while True:
# read the shared counter
# Read the shared counter
current_count = progress_counter.value
pbar.update(current_count - last_count)
last_count = current_count
Expand All @@ -1358,9 +1356,17 @@ def process_paths(
for proc in workers:
proc.join()

# Merge HDF5 files from each worker into a single output file
merge_worker_hdf5_files(worker_output_paths, output_path, metadata, verbosity)
# Merge error logs from each worker into a single error log file
error_log_path = output_path.with_name(f"{output_path.stem}_errors.log")
with open(error_log_path, "w") as merged_log:
for i in range(processes):
worker_log = output_path.parent / f"{output_path.stem}_worker_{i}_errors.log"
if worker_log.exists():
merged_log.write(worker_log.read_text())
worker_log.unlink()

error_log_path = output_path.with_name(output_path.stem + "_errors.log")
if verbosity > 0 and error_log_path.exists() and error_log_path.stat().st_size > 0:
print(f"Errors were logged in: {error_log_path}")

Expand Down