diff --git a/workflow/executor/parallel_executor.py b/workflow/executor/parallel_executor.py index 7584002fd3..7cdfe00ca7 100755 --- a/workflow/executor/parallel_executor.py +++ b/workflow/executor/parallel_executor.py @@ -1,10 +1,13 @@ """Parallel execution helpers that eliminate duplicated code.""" import concurrent.futures +import os from typing import Any, Callable, List, Tuple from utils.log_manager import LogManager +MAX_WORKERS = min(32, (os.cpu_count() or 1) + 4) + class ParallelExecutor: """Manage parallel execution for workflow nodes. @@ -103,7 +106,7 @@ def _execute_parallel_batch( """ self.log_manager.debug(f"Executing {len(items)} items in parallel") - with concurrent.futures.ThreadPoolExecutor(max_workers=len(items)) as executor: + with concurrent.futures.ThreadPoolExecutor(max_workers=min(len(items), MAX_WORKERS)) as executor: futures = [] for item in items: future = executor.submit(executor_func, item)