Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 40 additions & 51 deletions src/praisonai/praisonai/agents_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,37 @@ def load_tools_from_package(self, package_path):
return tools_dict


def _load_config(self):
"""Load configuration from agent file or agent_yaml."""
if self.agent_yaml:
config = yaml.safe_load(self.agent_yaml)
else:
if self.agent_file in ('/app/api:app', 'api:app'):
self.agent_file = 'agents.yaml'
try:
with open(self.agent_file, 'r') as f:
config = yaml.safe_load(f)
except FileNotFoundError:
print(f"File not found: {self.agent_file}")
return None

# Apply CLI config overrides to both paths (agent_yaml and agent_file)
if self.cli_config:
self._merge_cli_config(config, self.cli_config)
Comment thread
coderabbitai[bot] marked this conversation as resolved.
return config
Comment thread
greptile-apps[bot] marked this conversation as resolved.

def _is_workflow_yaml(self, config):
"""Check if configuration is workflow mode YAML."""
process_type = config.get('process', 'sequential')
has_steps = 'steps' in config
has_workflow_config = 'workflow' in config
workflow_type = config.get('type')
return (
process_type == 'workflow'
or (has_steps and has_workflow_config)
or workflow_type in {'job', 'hybrid'}
)

def generate_crew_and_kickoff(self):
"""
Generates a crew of agents and initiates tasks based on the provided configuration.
Expand All @@ -649,30 +680,10 @@ def generate_crew_and_kickoff(self):

This function first loads the agent configuration from the specified file. It then initializes the tools required for the agents based on the specified framework. If the specified framework is "autogen", it loads the LLM configuration dynamically and creates an AssistantAgent for each role in the configuration. It then adds tools to the agents if specified in the configuration. Finally, it prepares tasks for the agents based on the configuration and initiates the tasks using the crew of agents. If the specified framework is not "autogen", it creates a crew of agents and initiates tasks based on the configuration.
"""
if self.agent_yaml:
config = yaml.safe_load(self.agent_yaml)
else:
if self.agent_file == '/app/api:app' or self.agent_file == 'api:app':
self.agent_file = 'agents.yaml'
try:
with open(self.agent_file, 'r') as f:
config = yaml.safe_load(f)
except FileNotFoundError:
print(f"File not found: {self.agent_file}")
return

# Apply CLI configuration overrides to YAML config
if self.cli_config:
# Merge CLI configuration with YAML config
self._merge_cli_config(config, self.cli_config)

# Check if this is a workflow-mode YAML (process: workflow or has steps section)
process_type = config.get('process', 'sequential')
has_steps = 'steps' in config
has_workflow_config = 'workflow' in config

if process_type == 'workflow' or (has_steps and has_workflow_config):
# Route to YAMLWorkflowParser for advanced workflow patterns
config = self._load_config()
if config is None:
return
if self._is_workflow_yaml(config):
return self._run_yaml_workflow(config)

# Use shared preparation logic
Expand All @@ -694,35 +705,12 @@ async def agenerate_crew_and_kickoff(self):
Async version of generate_crew_and_kickoff.
Generates a crew of agents and initiates tasks based on the provided configuration.
"""
if self.agent_yaml:
config = yaml.safe_load(self.agent_yaml)
else:
if self.agent_file == '/app/api:app' or self.agent_file == 'api:app':
self.agent_file = 'agents.yaml'
try:
with open(self.agent_file, 'r') as f:
config = yaml.safe_load(f)
except FileNotFoundError:
print(f"File not found: {self.agent_file}")
return

# Apply CLI configuration overrides to YAML config
if self.cli_config:
# Merge CLI configuration with YAML config
self._merge_cli_config(config, self.cli_config)

# Check if this is a workflow-mode YAML (process: workflow or has steps section)
process_type = config.get('process', 'sequential')
has_steps = 'steps' in config
has_workflow_config = 'workflow' in config

if process_type == 'workflow' or (has_steps and has_workflow_config):
config = self._load_config()
if config is None:
return
if self._is_workflow_yaml(config):
return await self._arun_yaml_workflow(config)
else:
return await self._arun_framework(config)

async def _arun_framework(self, config):
"""Async version of _run_framework with shared preparation logic."""
# Use shared preparation logic
prep = self._prepare_for_run(config)

Expand All @@ -737,6 +725,7 @@ async def _arun_framework(self, config):
cli_config=getattr(self, 'cli_config', None),
)


async def _arun_yaml_workflow(self, config):
"""
Async version of _run_yaml_workflow using YAMLWorkflowParser.
Expand Down
50 changes: 36 additions & 14 deletions src/praisonai/praisonai/gateway/kanban_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,13 @@ async def dispatch_once(self) -> int:
# Release claim on error
try:
store.release_claim(task.id, self.worker_id)
except:
pass
except Exception as release_err:
logger.error(
"Failed to release claim for task %s (worker %s); task may be stuck "
"in 'claimed' state until manually released: %s",
task.id, self.worker_id, release_err,
exc_info=True,
)

if spawned > 0:
logger.info(f"Spawned {spawned} kanban task workers")
Expand Down Expand Up @@ -149,13 +154,15 @@ async def _spawn_worker(self, task: Any, store: Any) -> bool:
with tempfile.NamedTemporaryFile(mode='w+', delete=False, suffix='.log') as temp_log:
temp_log_path = temp_log.name

process = subprocess.Popen(
cmd,
env=env,
stdout=open(temp_log_path, 'w'),
stderr=subprocess.STDOUT,
text=True
)
with open(temp_log_path, 'w') as log_handle:
process = subprocess.Popen(
cmd,
env=env,
stdout=log_handle,
stderr=subprocess.STDOUT,
text=True
)
# parent FD closed here; child still has its duped copy

# Store log path for later cleanup
if not hasattr(self, '_temp_logs'):
Expand Down Expand Up @@ -275,8 +282,12 @@ def _cleanup_completed_tasks(self, store: Any):
# Release claim as fallback
try:
store.release_claim(task_id, self.worker_id)
except:
pass
except Exception as release_err:
logger.error(
"Failed to release claim during cleanup for task %s: %s",
task_id, release_err,
exc_info=True,
)

# Remove completed tasks from tracking and clean up temp logs
for task_id in completed:
Expand Down Expand Up @@ -330,12 +341,23 @@ async def _shutdown(self):

# Force terminate remaining tasks
for task_id, process in self.running_tasks.items():
logger.warning(f"Force terminating task {task_id}")
try:
logger.warning(f"Force terminating task {task_id}")
process.terminate()
process.wait(timeout=5)
except:
process.kill()
except subprocess.TimeoutExpired:
logger.warning(f"Task {task_id} did not terminate in 5s; sending SIGKILL")
try:
process.kill()
except OSError as kill_err:
logger.error(f"Failed to kill task {task_id}: {kill_err}")
except OSError as os_err:
logger.error(f"OS error while terminating task {task_id}: {os_err}")
try:
process.kill()
except OSError as kill_err:
logger.error(f"Failed to kill task {task_id}: {kill_err}")
# KeyboardInterrupt, SystemExit, CancelledError now propagate as intended

self.running_tasks.clear()

Expand Down
Loading