Skip to content

Commit e4e2657

Browse files
fix: resolve critical async safety issues identified by code reviewers
- Fix threading.Lock used with async with in Process class (issue #1289) * Replace sync lock creation with lazy async lock initialization * Prevents runtime AttributeError in aworkflow method - Improve thread safety in AsyncPostgres lazy lock initialization * Add double-checked locking pattern to prevent race conditions * Ensures only one asyncio.Lock instance created across coroutines - Remove redundant asyncio import in async_postgres.py - Improve exception chaining for better error diagnostics Addresses critical issues raised by Gemini, Qodo, CodeRabbit reviewers. Co-authored-by: Mervin Praison <MervinPraison@users.noreply.github.com>
1 parent cdefa98 commit e4e2657

2 files changed

Lines changed: 10 additions & 5 deletions

File tree

src/praisonai-agents/praisonaiagents/process/process.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ def __init__(
4545
self.workflow_timeout = workflow_timeout
4646
self.task_retry_counter: Dict[str, int] = {} # Initialize retry counter
4747
self.workflow_finished = False # ADDED: Workflow finished flag
48-
self._state_lock = threading.Lock() # Thread lock for shared state protection
48+
self._state_lock = None # Lazy-initialized async lock for shared state protection
4949

5050
# Resolve verbose from output= param (takes precedence) or legacy verbose= param
5151
if output is not None:
@@ -597,6 +597,8 @@ async def aworkflow(self) -> AsyncGenerator[str, None]:
597597
break
598598

599599
# Reset completed task to "not started" so it can run again (atomic operation)
600+
if self._state_lock is None:
601+
self._state_lock = asyncio.Lock()
600602
async with self._state_lock:
601603
if self.tasks[task_id].status == "completed":
602604
# Never reset loop tasks, decision tasks, or their subtasks if rerun is False

src/praisonai/praisonai/persistence/conversation/async_postgres.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import asyncio
99
import json
1010
import logging
11+
import threading
1112
import time
1213
from typing import List, Optional
1314

@@ -64,24 +65,26 @@ def __init__(
6465
self._pool = None
6566
self._initialized = False
6667
self._init_lock = None
68+
self._lock_creation_lock = threading.Lock()
6769

6870
async def init(self):
6971
"""Initialize connection pool and create tables."""
7072
if self._init_lock is None:
71-
import asyncio
72-
self._init_lock = asyncio.Lock()
73+
with self._lock_creation_lock:
74+
if self._init_lock is None: # Double-checked locking
75+
self._init_lock = asyncio.Lock()
7376

7477
async with self._init_lock:
7578
if self._initialized:
7679
return
7780

7881
try:
7982
import asyncpg
80-
except ImportError:
83+
except ImportError as err:
8184
raise ImportError(
8285
"asyncpg is required for async PostgreSQL support. "
8386
"Install with: pip install asyncpg"
84-
)
87+
) from err
8588

8689
if self.url:
8790
self._pool = await asyncpg.create_pool(self.url, min_size=1, max_size=self.pool_size)

0 commit comments

Comments
 (0)