-
Notifications
You must be signed in to change notification settings - Fork 487
Expand file tree
/
Copy pathsupervisor_framework_crewai_gemini_marktechpost.py
More file actions
367 lines (312 loc) · 13.5 KB
/
supervisor_framework_crewai_gemini_marktechpost.py
File metadata and controls
367 lines (312 loc) · 13.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
# -*- coding: utf-8 -*-
"""supervisor_framework_crewai_gemini_Marktechpost.ipynb
Automatically generated by Colab.
Original file is located at
https://colab.research.google.com/drive/1VzZ9EwQ1BZWKsLGtUIIWW6wx5tu_VgpC
"""
# NOTE: the notebook this file was exported from ran the line below as an
# IPython shell magic. "!cmd" is not valid Python in a plain .py module, so it
# is kept as a comment; run it from a shell before importing this file:
#   pip install crewai crewai-tools langchain-google-genai python-dotenv
import os
from dataclasses import dataclass
from enum import Enum
from typing import List, Dict, Any, Optional

from crewai import Agent, Task, Crew, Process
from crewai_tools import SerperDevTool, WebsiteSearchTool
from dotenv import load_dotenv
from langchain_google_genai import ChatGoogleGenerativeAI
class TaskPriority(Enum):
    """Priority levels that can be attached to a TaskConfig.

    The integer values increase with urgency (LOW=1 ... CRITICAL=4). This is a
    plain ``Enum``, so members do not compare with ``<``/``>``; compare their
    ``.value`` when an ordering is needed.
    """

    LOW = 1
    MEDIUM = 2
    HIGH = 3
    CRITICAL = 4
@dataclass
class TaskConfig:
    """Declarative description of one workflow task.

    Only ``description`` and ``expected_output`` are consumed by the workflow
    builder visible in this file; the remaining fields are carried as metadata.
    """

    # Instruction text; the workflow builder appends a topic-specific suffix.
    description: str
    # Description of the deliverable, passed through as the task's expected output.
    expected_output: str
    # Relative importance of the task (not read by the code visible in this file).
    priority: TaskPriority
    # Execution budget; presumably seconds -- TODO confirm, nothing here reads it.
    max_execution_time: int = 300
    # Flag for tasks needing a human in the loop (not read by the visible code).
    requires_human_input: bool = False
class SupervisorFramework:
    """
    Advanced Supervisor Agent Framework using CrewAI
    Manages multiple specialized agents with hierarchical coordination

    Four specialist agents (researcher, analyst, writer, reviewer) and one
    supervisor agent are built by ``setup_agents`` and run together as a
    CrewAI ``Crew`` by ``execute_project``.
    """

    # Ordered workflow stages. Each entry is:
    #   (key looked up in task_configs,
    #    key looked up in self.agents,
    #    lead-in text placed between the configured description and the topic,
    #    whether the tasks created so far are handed over as context)
    _WORKFLOW_STAGES = (
        ('research', 'researcher', "Focus on:", False),
        ('analysis', 'analyst', "Analyze the research findings about:", True),
        ('writing', 'writer', "Create content about:", True),
        ('review', 'reviewer', "Review all work related to:", True),
    )

    def __init__(self, gemini_api_key: str, serper_api_key: Optional[str] = None):
        """
        Initialize the supervisor framework

        Side effect: the keys are exported through ``os.environ``
        (``GOOGLE_API_KEY`` and, when given, ``SERPER_API_KEY``).

        Args:
            gemini_api_key: Google Gemini API key (free tier)
            serper_api_key: Serper API key for web search (optional, has free tier)
        """
        os.environ["GOOGLE_API_KEY"] = gemini_api_key
        if serper_api_key:
            os.environ["SERPER_API_KEY"] = serper_api_key

        self.llm = ChatGoogleGenerativeAI(
            model="gemini-1.5-flash",
            temperature=0.7,
            max_tokens=2048,
        )

        # The search tool is only offered when a Serper key was supplied.
        self.tools = [SerperDevTool()] if serper_api_key else []

        # Populated by setup_agents() / execute_project().
        self.agents: Dict[str, Agent] = {}
        self.supervisor: Optional[Agent] = None
        self.crew: Optional[Crew] = None

        print("🚀 SupervisorFramework initialized successfully!")
        print(f"📡 LLM Model: {self.llm.model}")
        print(f"🛠️ Available tools: {len(self.tools)}")

    def _build_agent(self, role: str, goal: str, backstory: str, *,
                     max_iter: int, allow_delegation: bool = False,
                     tools: Optional[list] = None) -> Agent:
        """Build an Agent with the settings shared by every agent in this class.

        ``tools`` is forwarded only when it is not None, so agents created
        without tools receive exactly the keyword arguments they did before.
        """
        agent_kwargs: Dict[str, Any] = {
            'role': role,
            'goal': goal,
            'backstory': backstory,
            'verbose': True,
            'allow_delegation': allow_delegation,
            'llm': self.llm,
            'max_iter': max_iter,
            'memory': True,
        }
        if tools is not None:
            agent_kwargs['tools'] = tools
        return Agent(**agent_kwargs)

    def create_research_agent(self) -> Agent:
        """Create a specialized research agent (the only one given ``self.tools``)"""
        return self._build_agent(
            role="Senior Research Analyst",
            goal="Conduct comprehensive research and gather accurate information on any given topic",
            backstory=(
                "You are an expert research analyst with years of experience in\n"
                "information gathering, fact-checking, and synthesizing complex data from multiple sources.\n"
                "You excel at finding reliable sources and presenting well-structured research findings."
            ),
            max_iter=3,
            tools=self.tools,
        )

    def create_analyst_agent(self) -> Agent:
        """Create a specialized data analyst agent"""
        return self._build_agent(
            role="Strategic Data Analyst",
            goal="Analyze data, identify patterns, and provide actionable insights",
            backstory=(
                "You are a strategic data analyst with expertise in statistical analysis,\n"
                "pattern recognition, and business intelligence. You transform raw data and research\n"
                "into meaningful insights that drive decision-making."
            ),
            max_iter=3,
        )

    def create_writer_agent(self) -> Agent:
        """Create a specialized content writer agent"""
        return self._build_agent(
            role="Expert Technical Writer",
            goal="Create clear, engaging, and well-structured written content",
            backstory=(
                "You are an expert technical writer with a talent for making complex\n"
                "information accessible and engaging. You specialize in creating documentation,\n"
                "reports, and content that effectively communicates insights to diverse audiences."
            ),
            max_iter=3,
        )

    def create_reviewer_agent(self) -> Agent:
        """Create a quality assurance reviewer agent"""
        return self._build_agent(
            role="Quality Assurance Reviewer",
            goal="Review, validate, and improve the quality of all deliverables",
            backstory=(
                "You are a meticulous quality assurance expert with an eye for detail\n"
                "and a commitment to excellence. You ensure all work meets high standards of accuracy,\n"
                "completeness, and clarity before final delivery."
            ),
            max_iter=2,
        )

    def create_supervisor_agent(self) -> Agent:
        """Create the main supervisor agent (the only one allowed to delegate)"""
        return self._build_agent(
            role="Project Supervisor & Coordinator",
            goal="Coordinate team efforts, manage workflows, and ensure project success",
            backstory=(
                "You are an experienced project supervisor with expertise in team\n"
                "coordination, workflow optimization, and quality management. You ensure that all\n"
                "team members work efficiently towards common goals and maintain high standards\n"
                "throughout the project lifecycle."
            ),
            max_iter=2,
            allow_delegation=True,
        )

    def setup_agents(self):
        """Initialize all agents in the framework

        Fills ``self.agents`` (keys: researcher, analyst, writer, reviewer)
        and ``self.supervisor``, then prints a short roster.
        """
        print("🤖 Setting up specialized agents...")
        self.agents = {
            'researcher': self.create_research_agent(),
            'analyst': self.create_analyst_agent(),
            'writer': self.create_writer_agent(),
            'reviewer': self.create_reviewer_agent(),
        }
        self.supervisor = self.create_supervisor_agent()

        print(f"✅ Created {len(self.agents)} specialized agents + 1 supervisor")
        for role, agent in self.agents.items():
            print(f" └── {role.title()}: {agent.role}")

    def create_task_workflow(self, topic: str, task_configs: Dict[str, TaskConfig]) -> List[Task]:
        """
        Create a comprehensive task workflow

        Stages are emitted in the fixed order research -> analysis -> writing
        -> review, skipping any stage whose key is absent from
        ``task_configs``; a supervisor task is always appended last.

        Args:
            topic: Main topic/project focus
            task_configs: Dictionary of task configurations
        Returns:
            List of CrewAI Task objects
        Raises:
            KeyError: if a configured stage's agent is missing from
                ``self.agents`` (e.g. ``setup_agents`` has not been called).
        """
        tasks: List[Task] = []

        for config_key, agent_key, lead_in, uses_context in self._WORKFLOW_STAGES:
            if config_key not in task_configs:
                continue
            config = task_configs[config_key]
            task_kwargs: Dict[str, Any] = {
                'description': f"{config.description} {lead_in} {topic}",
                'expected_output': config.expected_output,
                'agent': self.agents[agent_key],
            }
            if uses_context:
                # Snapshot the earlier tasks: handing over `tasks` itself would
                # alias a list that keeps growing after this task is created.
                task_kwargs['context'] = list(tasks)
            tasks.append(Task(**task_kwargs))

        supervisor_task = Task(
            description=(
                f"As the project supervisor, coordinate the entire workflow for: {topic}.\n"
                "Monitor progress, ensure quality standards, resolve any conflicts between agents,\n"
                "and provide final project oversight. Ensure all deliverables meet requirements."
            ),
            expected_output=(
                "A comprehensive project summary including:\n"
                "- Executive summary of all completed work\n"
                "- Quality assessment of deliverables\n"
                "- Recommendations for improvements or next steps\n"
                "- Final project status report"
            ),
            agent=self.supervisor,
            context=list(tasks),
        )
        tasks.append(supervisor_task)
        return tasks

    def execute_project(self,
                        topic: str,
                        task_configs: Dict[str, TaskConfig],
                        process_type: Process = Process.hierarchical) -> Dict[str, Any]:
        """
        Execute a complete project using the supervisor framework

        Agents are created on demand, the task workflow is built, and a new
        ``Crew`` is stored on ``self.crew`` before being kicked off. Failures
        raised by ``kickoff()`` are reported in the returned dictionary rather
        than propagated.

        Args:
            topic: Main project topic
            task_configs: Task configurations
            process_type: CrewAI process type (hierarchical recommended for supervisor)
        Returns:
            Dictionary containing execution results. On success the keys are
            ``status``, ``result``, ``topic``, ``tasks_completed`` and
            ``agents_involved``; on failure ``status``, ``error`` and ``topic``.
        """
        print(f"🚀 Starting project execution: {topic}")
        print(f"📋 Process type: {process_type.value}")

        if not self.agents or not self.supervisor:
            self.setup_agents()

        tasks = self.create_task_workflow(topic, task_configs)
        print(f"📝 Created {len(tasks)} tasks in workflow")

        crew_agents = list(self.agents.values()) + [self.supervisor]
        self.crew = Crew(
            agents=crew_agents,
            tasks=tasks,
            process=process_type,
            manager_llm=self.llm,
            verbose=True,
            memory=True,
        )

        print("🎯 Executing project...")
        # Only the kickoff is guarded; this is the top-level boundary where any
        # runtime failure is turned into an error report for the caller.
        try:
            result = self.crew.kickoff()
        except Exception as e:
            print(f"❌ Error during execution: {str(e)}")
            return {
                'status': 'error',
                'error': str(e),
                'topic': topic,
            }

        return {
            'status': 'success',
            'result': result,
            'topic': topic,
            'tasks_completed': len(tasks),
            'agents_involved': len(crew_agents),
        }

    def get_crew_usage_metrics(self) -> Dict[str, Any]:
        """Get usage metrics from the crew

        Returns:
            ``{'error': ...}`` when no crew has been created yet; otherwise a
            dictionary with ``total_tokens_used``, ``total_cost`` and
            ``execution_time``, each falling back to ``'Not available'`` when
            the crew object has no attribute of that name.
        """
        if self.crew is None:
            return {'error': 'No crew execution found'}

        try:
            return {
                'total_tokens_used': getattr(self.crew, 'total_tokens_used', 'Not available'),
                'total_cost': getattr(self.crew, 'total_cost', 'Not available'),
                'execution_time': getattr(self.crew, 'execution_time', 'Not available'),
            }
        except Exception:
            # getattr only absorbs AttributeError; anything else raised while
            # reading an attribute is reported as "no metrics" (best effort).
            return {'note': 'Metrics not available for this execution'}
def create_sample_task_configs() -> Dict[str, TaskConfig]:
    """Create sample task configurations for demonstration

    Returns:
        A new dictionary with one TaskConfig under each of the keys
        ``research``, ``analysis``, ``writing`` and ``review``.
    """
    research = TaskConfig(
        description="Conduct comprehensive research on the given topic. Gather information from reliable sources and compile key findings.",
        expected_output="A detailed research report with key findings, statistics, trends, and source references (minimum 500 words).",
        priority=TaskPriority.HIGH,
    )
    analysis = TaskConfig(
        description="Analyze the research findings to identify patterns, insights, and implications.",
        expected_output="An analytical report highlighting key insights, trends, opportunities, and potential challenges (minimum 400 words).",
        priority=TaskPriority.HIGH,
    )
    writing = TaskConfig(
        description="Create a comprehensive, well-structured document based on the research and analysis.",
        expected_output="A professional document with clear structure, engaging content, and actionable recommendations (minimum 800 words).",
        priority=TaskPriority.MEDIUM,
    )
    review = TaskConfig(
        description="Review all deliverables for quality, accuracy, completeness, and coherence.",
        expected_output="A quality assessment report with recommendations for improvements and final approval status.",
        priority=TaskPriority.CRITICAL,
    )
    # Insertion order matches the original literal: research, analysis, writing, review.
    return {
        'research': research,
        'analysis': analysis,
        'writing': writing,
        'review': review,
    }
def demo_supervisor_framework():
    """
    Demo function to showcase the supervisor framework

    API keys are read from the environment (a local ``.env`` file is loaded
    first): ``GOOGLE_API_KEY`` for Gemini and, optionally, ``SERPER_API_KEY``
    for web search. When ``GOOGLE_API_KEY`` is unset or empty the placeholder
    "Use Your API Key Here" is used, as before; replace it with a real key or
    set the environment variable, otherwise the run is expected to fail.
    """
    print("🎬 CrewAI Supervisor Framework Demo")
    print("=" * 50)

    # Pick up keys from a .env file if one exists; existing variables win.
    load_dotenv()
    framework = SupervisorFramework(
        gemini_api_key=os.getenv("GOOGLE_API_KEY") or "Use Your API Key Here",
        # An empty value is normalised to None, i.e. "no search tool".
        serper_api_key=os.getenv("SERPER_API_KEY") or None,
    )

    topic = "The Future of Artificial Intelligence in Healthcare"
    task_configs = create_sample_task_configs()
    print(f"📊 Demo Topic: {topic}")
    print(f"📋 Task Configurations: {list(task_configs.keys())}")

    results = framework.execute_project(topic, task_configs)

    print("\n" + "=" * 50)
    print("📈 EXECUTION RESULTS")
    print("=" * 50)
    if results['status'] == 'success':
        print(f"✅ Status: {results['status'].upper()}")
        print(f"📝 Tasks Completed: {results['tasks_completed']}")
        print(f"🤖 Agents Involved: {results['agents_involved']}")
        # Only the first 200 characters of the result are shown.
        print(f"📄 Final Result Preview: {str(results['result'])[:200]}...")
    else:
        print(f"❌ Status: {results['status'].upper()}")
        print(f"🚫 Error: {results['error']}")

    metrics = framework.get_crew_usage_metrics()
    print(f"\n💰 Usage Metrics: {metrics}")
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    demo_supervisor_framework()