Skip to content

Commit 6f0cc33

Browse files
fix: address 6 critical issues in kanban system implementation
- Fix subprocess deadlock by redirecting output to temp files instead of PIPE - Enable foreign key constraints on all SQLite connections for proper cascades - Replace immediate dispatcher startup with proper event handler registration - Add persistent CLI board switching via ~/.praisonai/kanban_config.json - Prevent database locking with connection-reusing internal methods - Replace deprecated datetime.utcnow() with datetime.now(timezone.utc) - Fix optimistic locking by incrementing version in claim/release operations Fixes all issues identified in code review by Gemini. Co-authored-by: Mervin Praison <MervinPraison@users.noreply.github.com>
1 parent b9eca74 commit 6f0cc33

6 files changed

Lines changed: 156 additions & 42 deletions

File tree

src/praisonai/praisonai/cli/commands/kanban.py

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -282,10 +282,25 @@ def kanban_boards_cmd(
282282
raise typer.Exit(1)
283283

284284
import os
285+
from pathlib import Path
286+
import json
287+
288+
# Set for current session
285289
os.environ['PRAISONAI_KANBAN_BOARD'] = board
286290

287-
output.print_success(f"Switched to board '{board}'")
288-
output.print_info("Note: This only affects the current CLI session")
291+
# Persist to config file for future sessions
292+
config_dir = Path.home() / ".praisonai"
293+
config_dir.mkdir(exist_ok=True)
294+
config_file = config_dir / "kanban_config.json"
295+
296+
try:
297+
config = {"active_board": board}
298+
with open(config_file, 'w') as f:
299+
json.dump(config, f, indent=2)
300+
output.print_success(f"Switched to board '{board}' (persisted)")
301+
except Exception as e:
302+
output.print_success(f"Switched to board '{board}' (session only)")
303+
output.print_info(f"Could not persist setting: {e}")
289304

290305
else:
291306
output.print_error(f"Unknown action: {action}")

src/praisonai/praisonai/gateway/kanban_dispatcher.py

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -144,15 +144,24 @@ async def _spawn_worker(self, task: Any, store: Any) -> bool:
144144

145145
logger.info(f"Spawning worker for task {task.id}: {' '.join(cmd)}")
146146

147-
# Start process
147+
# Start process with output redirect to avoid deadlock
148+
import tempfile
149+
with tempfile.NamedTemporaryFile(mode='w+', delete=False, suffix='.log') as temp_log:
150+
temp_log_path = temp_log.name
151+
148152
process = subprocess.Popen(
149153
cmd,
150154
env=env,
151-
stdout=subprocess.PIPE,
155+
stdout=open(temp_log_path, 'w'),
152156
stderr=subprocess.STDOUT,
153157
text=True
154158
)
155159

160+
# Store log path for later cleanup
161+
if not hasattr(self, '_temp_logs'):
162+
self._temp_logs = {}
163+
self._temp_logs[task.id] = temp_log_path
164+
156165
# Track the running task
157166
self.running_tasks[task.id] = process
158167

@@ -208,8 +217,17 @@ def _cleanup_completed_tasks(self, store: Any):
208217
# Get return code
209218
return_code = process.returncode
210219

211-
# Read output
212-
stdout_data, _ = process.communicate(timeout=1)
220+
# Read output from temp log file
221+
stdout_data = ""
222+
if hasattr(self, '_temp_logs') and task_id in self._temp_logs:
223+
try:
224+
with open(self._temp_logs[task_id], 'r') as f:
225+
stdout_data = f.read()
226+
except Exception as e:
227+
logger.warning(f"Failed to read log for task {task_id}: {e}")
228+
stdout_data = f"<log read error: {e}>"
229+
else:
230+
stdout_data = "<no log available>"
213231

214232
# Update task based on exit code
215233
if return_code == 0:
@@ -260,9 +278,19 @@ def _cleanup_completed_tasks(self, store: Any):
260278
except:
261279
pass
262280

263-
# Remove completed tasks from tracking
281+
# Remove completed tasks from tracking and clean up temp logs
264282
for task_id in completed:
265283
del self.running_tasks[task_id]
284+
285+
# Clean up temporary log file
286+
if hasattr(self, '_temp_logs') and task_id in self._temp_logs:
287+
try:
288+
import os
289+
os.unlink(self._temp_logs[task_id])
290+
except Exception as e:
291+
logger.warning(f"Failed to cleanup temp log for task {task_id}: {e}")
292+
finally:
293+
del self._temp_logs[task_id]
266294

267295
async def run_forever(self):
268296
"""

src/praisonai/praisonai/integration/bridges/kanban_bridge.py

Lines changed: 28 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,8 @@ def register_kanban_backends():
6565
backends.set_backend("jobs_executor", jobs_executor)
6666
log.debug("jobs_executor backend registered")
6767

68-
# Start kanban dispatcher
69-
ensure_kanban_dispatcher()
68+
# Register kanban dispatcher startup handler
69+
register_kanban_startup_handler()
7070

7171
return True
7272

@@ -75,34 +75,39 @@ def register_kanban_backends():
7575
return False
7676

7777

78-
def ensure_kanban_dispatcher():
79-
"""Ensure kanban dispatcher is running in background."""
80-
import asyncio
78+
def register_kanban_startup_handler():
79+
"""Register kanban dispatcher startup handler with the ASGI app."""
8180
import logging
8281

8382
log = logging.getLogger(__name__)
8483

8584
try:
85+
import praisonaiui.backends as backends
8686
from praisonai.gateway.kanban_dispatcher import start_kanban_dispatcher, is_dispatcher_running
8787

88-
if not is_dispatcher_running():
89-
# Start dispatcher in background
90-
loop = None
88+
async def _startup_handler():
89+
"""Startup handler for kanban dispatcher."""
9190
try:
92-
loop = asyncio.get_event_loop()
93-
except RuntimeError:
94-
# No event loop in this thread, create one
95-
loop = asyncio.new_event_loop()
96-
asyncio.set_event_loop(loop)
97-
98-
if loop and loop.is_running():
99-
# Loop is running, schedule the task
100-
asyncio.create_task(start_kanban_dispatcher())
101-
log.debug("kanban dispatcher scheduled to start")
102-
else:
103-
# No loop or loop not running, we can't start dispatcher here
104-
# This is expected in CLI/sync contexts
105-
log.debug("kanban dispatcher cannot start (no async context)")
91+
if not is_dispatcher_running():
92+
await start_kanban_dispatcher()
93+
log.info("kanban dispatcher started")
94+
except Exception as exc:
95+
log.warning("failed to start kanban dispatcher: %s", exc)
10696

97+
# Register startup handler if the backend supports it
98+
if hasattr(backends, 'register_startup_handler'):
99+
backends.register_startup_handler(_startup_handler)
100+
log.debug("kanban dispatcher startup handler registered")
101+
else:
102+
# Fallback: try immediate start if event loop is running
103+
try:
104+
import asyncio
105+
loop = asyncio.get_running_loop()
106+
if loop and not is_dispatcher_running():
107+
asyncio.create_task(start_kanban_dispatcher())
108+
log.debug("kanban dispatcher started via task")
109+
except RuntimeError:
110+
log.debug("kanban dispatcher deferred (no running event loop)")
111+
107112
except Exception as exc:
108-
log.debug("kanban dispatcher startup failed: %s", exc)
113+
log.debug("kanban startup handler registration failed: %s", exc)

src/praisonai/praisonai/kanban/models.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,11 @@ class Task:
3737
metadata: Optional[Dict[str, Any]] = None
3838

3939
def __post_init__(self):
40+
from datetime import timezone
4041
if self.created_at is None:
41-
self.created_at = datetime.utcnow()
42+
self.created_at = datetime.now(timezone.utc)
4243
if self.updated_at is None:
43-
self.updated_at = datetime.utcnow()
44+
self.updated_at = datetime.now(timezone.utc)
4445
if self.metadata is None:
4546
self.metadata = {}
4647

src/praisonai/praisonai/kanban/paths.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,20 @@ def get_kanban_db_path(board: Optional[str] = None) -> Path:
2222

2323
# Read from environment variable for override
2424
if board is None:
25-
board = os.environ.get('PRAISONAI_KANBAN_BOARD', 'default')
25+
board = os.environ.get('PRAISONAI_KANBAN_BOARD')
26+
if not board:
27+
# Try to read from persisted config file
28+
try:
29+
import json
30+
config_file = home / "kanban_config.json"
31+
if config_file.exists():
32+
with open(config_file) as f:
33+
config = json.load(f)
34+
board = config.get('active_board', 'default')
35+
else:
36+
board = 'default'
37+
except Exception:
38+
board = 'default'
2639

2740
# Legacy override for single DB file
2841
if db_override := os.environ.get('PRAISONAI_KANBAN_DB'):

src/praisonai/praisonai/kanban/sqlite_store.py

Lines changed: 61 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,8 @@ def _get_connection(self):
106106
"""Get database connection with proper cleanup."""
107107
conn = sqlite3.connect(self.db_path, timeout=30.0)
108108
conn.row_factory = sqlite3.Row
109+
# Enable foreign key constraints for this connection
110+
conn.execute("PRAGMA foreign_keys=ON")
109111
try:
110112
yield conn
111113
conn.commit()
@@ -267,11 +269,58 @@ def list_tasks(self, filters: Optional[Dict[str, Any]] = None) -> List[Task]:
267269
cursor = conn.execute(query, values)
268270
return [Task.from_dict(dict(row)) for row in cursor.fetchall()]
269271

272+
def _get_task_with_conn(self, task_id: str, conn: sqlite3.Connection) -> Optional[Task]:
273+
"""Get task using existing connection to avoid nesting."""
274+
cursor = conn.execute(
275+
"SELECT * FROM tasks WHERE id = ?", (task_id,)
276+
)
277+
row = cursor.fetchone()
278+
if row:
279+
return Task.from_dict(dict(row))
280+
return None
281+
282+
def _update_task_with_conn(self, task_id: str, updates: Dict[str, Any], conn: sqlite3.Connection) -> Task:
283+
"""Update task using existing connection to avoid nesting."""
284+
from datetime import timezone
285+
286+
if not updates:
287+
return self._get_task_with_conn(task_id, conn)
288+
289+
# Build update query
290+
set_clauses = []
291+
params = []
292+
293+
for key, value in updates.items():
294+
if key in ['id', 'created_at']: # Don't allow updating immutable fields
295+
continue
296+
set_clauses.append(f"{key} = ?")
297+
params.append(value)
298+
299+
if not set_clauses:
300+
return self._get_task_with_conn(task_id, conn)
301+
302+
# Always update timestamp and version
303+
set_clauses.append("updated_at = ?")
304+
set_clauses.append("version = version + 1")
305+
params.append(datetime.now(timezone.utc).isoformat())
306+
params.append(task_id)
307+
308+
query = f"UPDATE tasks SET {', '.join(set_clauses)} WHERE id = ?"
309+
cursor = conn.execute(query, params)
310+
311+
if cursor.rowcount == 0:
312+
raise ValueError(f"Task {task_id} not found")
313+
314+
# Log the update
315+
self._log_event(conn, task_id, 'updated', updates)
316+
317+
return self._get_task_with_conn(task_id, conn)
318+
270319
def move_task(self, task_id: str, status: str) -> Task:
271320
"""Move task to new status with parent/child promotion logic."""
272321
with self._get_connection() as conn:
273322
# Check if task exists
274-
task = self.get_task(task_id)
323+
task = self._get_task_with_conn(task_id, conn)
275324
if not task:
276325
raise ValueError(f"Task {task_id} not found")
277326

@@ -290,7 +339,7 @@ def move_task(self, task_id: str, status: str) -> Task:
290339
raise ValueError("Cannot move to ready: incomplete parent tasks")
291340

292341
# Update task status
293-
updated_task = self.update_task(task_id, {'status': status})
342+
updated_task = self._update_task_with_conn(task_id, {'status': status}, conn)
294343

295344
# Promote children if moving to done
296345
if new_status == TaskStatus.DONE:
@@ -300,7 +349,7 @@ def move_task(self, task_id: str, status: str) -> Task:
300349

301350
for row in cursor.fetchall():
302351
child_id = row['child_id']
303-
child_task = self.get_task(child_id)
352+
child_task = self._get_task_with_conn(child_id, conn)
304353

305354
if child_task and child_task.status == TaskStatus.TODO:
306355
# Check if all other parents are done
@@ -312,7 +361,7 @@ def move_task(self, task_id: str, status: str) -> Task:
312361
""", (child_id,))
313362

314363
if parent_check.fetchone()['incomplete_parents'] == 0:
315-
self.update_task(child_id, {'status': TaskStatus.READY.value})
364+
self._update_task_with_conn(child_id, {'status': TaskStatus.READY.value}, conn)
316365

317366
self._log_event(conn, task_id, 'moved', {
318367
'old_status': task.status.value,
@@ -484,12 +533,13 @@ def list_events(self, since: Optional[datetime] = None) -> List[TaskEvent]:
484533
def claim_task(self, task_id: str, worker_id: str) -> bool:
485534
"""Claim a ready task for execution (CAS operation)."""
486535
with self._get_connection() as conn:
487-
# Atomic claim with CAS on status and claim_lock
536+
# Atomic claim with CAS on status and claim_lock, increment version for optimistic locking
537+
from datetime import timezone
488538
result = conn.execute("""
489539
UPDATE tasks
490-
SET claim_lock = ?, updated_at = ?, status = 'running'
540+
SET claim_lock = ?, updated_at = ?, status = 'running', version = version + 1
491541
WHERE id = ? AND status = 'ready' AND (claim_lock IS NULL OR claim_lock = '')
492-
""", (worker_id, datetime.utcnow().isoformat(), task_id))
542+
""", (worker_id, datetime.now(timezone.utc).isoformat(), task_id))
493543

494544
if result.rowcount > 0:
495545
self._log_event(conn, task_id, 'claimed', {'worker_id': worker_id})
@@ -500,11 +550,13 @@ def claim_task(self, task_id: str, worker_id: str) -> bool:
500550
def release_claim(self, task_id: str, worker_id: str) -> bool:
501551
"""Release claim on task."""
502552
with self._get_connection() as conn:
553+
# Release claim and increment version for optimistic locking
554+
from datetime import timezone
503555
result = conn.execute("""
504556
UPDATE tasks
505-
SET claim_lock = NULL, updated_at = ?, status = 'ready'
557+
SET claim_lock = NULL, updated_at = ?, status = 'ready', version = version + 1
506558
WHERE id = ? AND claim_lock = ?
507-
""", (datetime.utcnow().isoformat(), task_id, worker_id))
559+
""", (datetime.now(timezone.utc).isoformat(), task_id, worker_id))
508560

509561
if result.rowcount > 0:
510562
self._log_event(conn, task_id, 'released', {'worker_id': worker_id})

0 commit comments

Comments
 (0)