Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 126 additions & 31 deletions flo_ai/examples/arium_examples.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,22 @@
from flo_ai.llm import OpenAI
from flo_ai.models import TextMessageContent, UserMessage
from flo_ai.models.agent import Agent
from flo_ai.tool import flo_tool
from flo_ai.tool.base_tool import Tool
from flo_ai.arium.memory import MessageMemory
from flo_ai.arium.nodes import FunctionNode
from flo_ai.arium.memory import MessageMemory, MessageMemoryItem
from flo_ai.models import BaseMessage
from typing import List


@flo_tool(
description='Calculate mathematical expressions',
parameter_descriptions={
'result': 'The result to print',
},
)
async def print_result(result: str) -> str:
print(f'Result: {result}')
return result


# Example 1: Simple Linear Workflow
async def example_linear_workflow():
"""Example of a simple linear workflow: Agent -> Tool -> Agent"""
"""Example of a simple linear workflow: Agent -> FunctionNode -> Agent"""

# Create some example agents and tools (these would be your actual implementations)
# Create some example agents and function nodes (these would be your actual implementations)
analyzer_agent = Agent(
name='analyzer',
system_prompt='Analyze the input',
Expand All @@ -38,16 +33,19 @@ async def example_linear_workflow():
system_prompt='Summarize the results',
llm=OpenAI(model='gpt-4o-mini'),
)
processing_function_node = FunctionNode(
name='processor', description='Process the input', function=print_result
)

# Build and run the workflow
result = await (
AriumBuilder()
.add_agent(analyzer_agent)
.add_tool(print_result.tool)
.add_function_node(processing_function_node)
.add_agent(summarizer_agent)
.start_with(analyzer_agent)
.connect(analyzer_agent, print_result.tool)
.connect(print_result.tool, summarizer_agent)
.connect(analyzer_agent, processing_function_node)
.connect(processing_function_node, summarizer_agent)
.end_with(summarizer_agent)
.build_and_run([UserMessage(TextMessageContent(text='Analyze this text'))])
)
Expand All @@ -59,10 +57,14 @@ async def example_linear_workflow():
async def example_branching_workflow():
"""Example of a branching workflow with conditional routing"""

# Create agents and tools
# Create agents and function nodes
classifier_agent = Agent(name='classifier', prompt='Classify the input type')
text_processor = Tool(name='text_processor')
image_processor = Tool(name='image_processor')
text_processor_node = FunctionNode(
name='text_processor', description='Process text', function=lambda x: x
)
image_processor_node = FunctionNode(
name='image_processor', description='Process image', function=lambda x: x
)
final_agent = Agent(name='final', prompt='Provide final response')

# Router function for conditional branching
Expand All @@ -77,32 +79,49 @@ def content_router(memory) -> Literal['text_processor', 'image_processor']:
result = await (
AriumBuilder()
.add_agent(classifier_agent)
.add_tool(text_processor)
.add_tool(image_processor)
.add_agent(final_agent)
.add_function_node(
FunctionNode(
name='text_processor', description='Process text', function=lambda x: x
)
)
.add_function_node(
FunctionNode(
name='image_processor',
description='Process image',
function=lambda x: x,
)
)
.start_with(classifier_agent)
.add_edge(classifier_agent, [text_processor, image_processor], content_router)
.connect(text_processor, final_agent)
.connect(image_processor, final_agent)
.add_edge(
classifier_agent,
[text_processor_node, image_processor_node],
content_router,
)
.connect(text_processor_node, final_agent)
.connect(image_processor_node, final_agent)
.end_with(final_agent)
.build_and_run(['Process this content'])
)

return result


# Example 3: Complex Multi-Agent Workflow
async def example_complex_workflow():
"""Example of a more complex workflow with multiple agents and tools"""
"""Example of a more complex workflow with multiple agents and function nodes"""

# Create multiple agents and tools
# Create multiple agents and function nodes
input_agent = Agent(name='input_handler', prompt='Handle initial input')
researcher_agent = Agent(name='researcher', prompt='Research the topic')
analyzer_agent = Agent(name='analyzer', prompt='Analyze findings')
writer_agent = Agent(name='writer', prompt='Write the final report')

search_tool = Tool(name='search_tool')
data_tool = Tool(name='data_processor')
search_function_node = FunctionNode(
name='search_function', description='Search the web', function=lambda x: x
)
data_function_node = FunctionNode(
name='data_function', description='Process the data', function=lambda x: x
)

# Router for deciding next step after analysis
def analysis_router(memory) -> Literal['writer', 'researcher']:
Expand All @@ -114,13 +133,13 @@ def analysis_router(memory) -> Literal['writer', 'researcher']:
arium = (
AriumBuilder()
.add_agents([input_agent, researcher_agent, analyzer_agent, writer_agent])
.add_tools([search_tool, data_tool])
.add_function_nodes([search_function_node, data_function_node])
.with_memory(MessageMemory())
.start_with(input_agent)
.connect(input_agent, researcher_agent)
.connect(researcher_agent, search_tool)
.connect(search_tool, data_tool)
.connect(data_tool, analyzer_agent)
.connect(researcher_agent, search_function_node)
.connect(search_function_node, data_function_node)
.connect(data_function_node, analyzer_agent)
.add_edge(analyzer_agent, [writer_agent, researcher_agent], analysis_router)
.end_with(writer_agent)
.build()
Expand Down Expand Up @@ -177,6 +196,78 @@ async def example_build_and_reuse():
return result1, result2


# Example 6: Four FunctionNodes with input filtering (no agents)
async def example_function_nodes_with_filters():
"""Workflow of only FunctionNodes; each uses input_filter to read from specific nodes."""

# Define simple functions as nodes
async def pass_through(inputs: List[BaseMessage] | str, variables=None, **kwargs):
return inputs[-1].content

async def capitalize_last(
inputs: List[BaseMessage] | str, variables=None, **kwargs
):
return str(inputs[-1].content).capitalize()

async def uppercase_all(inputs: List[BaseMessage] | str, variables=None, **kwargs):
return ' '.join([str(x.content).upper() for x in inputs])

async def summarize(inputs: List[BaseMessage] | str, variables=None, **kwargs):
return f"count={len(inputs or [])} last={(str(inputs[-1].content) if inputs else '')}"

# Create four FunctionNodes with input filters
t1 = FunctionNode(
name='function1',
description='reads initial inputs',
function=pass_through,
input_filter=['input'],
)
t2 = FunctionNode(
name='function2',
description='reads function1 only',
function=capitalize_last,
input_filter=['function1'],
)
t3 = FunctionNode(
name='function3',
description='reads function2 only',
function=uppercase_all,
input_filter=['function2'],
)
t4 = FunctionNode(
name='function4',
description='reads function1 & function3',
function=summarize,
input_filter=['function1', 'function3'],
)

# Build and run: function1 -> function2 -> function3 -> function4
state = 1

def router(memory: MessageMemory) -> Literal['function2', 'function4']:
nonlocal state
if state == 1:
state = 2
return 'function2'
else:
state = 1
return 'function4'

result = await (
AriumBuilder()
.with_memory(MessageMemory())
.add_function_nodes([t1, t2, t3, t4])
.start_with(t1)
.add_edge(t1, [t2, t4], router=router)
.connect(t2, t3)
.connect(t3, t1)
.end_with(t4)
.build_and_run(['hello world'])
)

return result


if __name__ == '__main__':
import asyncio

Expand All @@ -190,6 +281,10 @@ async def main():
# result3 = await example_complex_workflow()
# result4 = await example_convenience_function()
# result5 = await example_build_and_reuse()
result6: List[MessageMemoryItem] = await example_function_nodes_with_filters()

print(result6[-1].result.content)

print('Examples completed!')

asyncio.run(main())
Loading
Loading