Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/praisonai-agents/praisonaiagents/bots/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,12 @@ class BotConfig:

# Owner user ID for pairing approvals (platform-specific format)
owner_user_id: Optional[str] = None

# Progressive streaming for channel bots (default: False)
streaming: bool = False

# Edit interval for streaming responses in milliseconds (default: 700ms)
stream_edit_interval_ms: int = 700

def __post_init__(self) -> None:
if self.unknown_user_policy not in {"deny", "pair", "allow"}:
Expand Down Expand Up @@ -153,6 +159,8 @@ def to_dict(self) -> Dict[str, Any]:
"workspace_scope": self.workspace_scope,
"unknown_user_policy": self.unknown_user_policy,
"owner_user_id": "***" if self.owner_user_id else None,
"streaming": self.streaming,
"stream_edit_interval_ms": self.stream_edit_interval_ms,
"metadata": self.metadata,
}

Expand Down
107 changes: 79 additions & 28 deletions src/praisonai/praisonai/bots/telegram.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,15 @@ def __init__(
self._agent = agent
self.config = config or BotConfig(token=token)

# Initialize streaming config (None means streaming disabled)
self._streaming_config = None
# Initialize streaming config based on BotConfig
if self.config.streaming:
self._streaming_config = StreamingConfig(
mode=StreamingMode.DRAFT,
min_interval=self.config.stream_edit_interval_ms / 1000.0, # Convert ms to seconds
min_delta=50, # Reasonable default for character delta
)
else:
self._streaming_config = None
self._rate_limiter = RateLimiter.for_platform("telegram")

self._is_running = False
Expand Down Expand Up @@ -271,33 +278,77 @@ async def _tg_unreact(emoji, **kw):
)

# Start streaming (send placeholder)
await streamer.start()

# Get response with streaming callback
response = await self._session.chat(
self._agent, user_id, message_text,
chat_id=str(update.message.chat_id) if update.message.chat_id else "",
user_name=user_name,
message_id=str(update.message.message_id),
account=self.config.get("account", "default"),
stream_callback=streamer.on_event,
)

# Apply message hooks to final response (same as non-streaming path)
send_result = self.fire_message_sending(
str(update.message.chat_id), str(response),
reply_to=str(update.message.message_id),
)
if send_result["cancel"]:
return

# Finalize with complete response (after hook processing)
await streamer.finalize(send_result["content"])
placeholder_message_id = await streamer.start()

# Fire sent hooks
self.fire_message_sent(
str(update.message.chat_id), send_result["content"],
)
try:
# Get response with streaming callback - include message_id and account for durability
response = await self._session.chat(
self._agent, user_id, message_text,
chat_id=str(update.message.chat_id) if update.message.chat_id else "",
user_name=user_name,
message_id=str(update.message.message_id),
account=self.config.get("account", "default"),
stream_callback=streamer.on_event,
)

# Apply message hooks to final response (same as non-streaming path)
send_result = self.fire_message_sending(
str(update.message.chat_id), str(response),
reply_to=str(update.message.message_id),
)
if send_result["cancel"]:
# Cancel: delete the placeholder message
try:
await self.delete_message(
str(update.message.chat_id), placeholder_message_id
)
except Exception:
pass # Ignore deletion errors
return

# Handle media content before finalizing (extract MEDIA: markers)
from .media import split_media_from_output
parsed = split_media_from_output(send_result["content"])
text_content = parsed["text"]
media_urls = parsed.get("media_urls", [])

# Finalize with text content (after hook processing and media extraction)
await streamer.finalize(text_content if text_content else send_result["content"])
Comment on lines +315 to +316

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 When the agent response contains only media (no text), split_media_from_output returns an empty string for parsed["text"], causing the fallback to send_result["content"] which still contains the raw MEDIA:/path/to/file markers. Those markers are then passed to edit_message_text, so users see MEDIA:... as literal text rather than receiving clean output. The non-streaming path handles this correctly by simply skipping the send_long_message call when text is empty — the streaming path should finalize with an empty string in that case.

Suggested change
# Finalize with text content (after hook processing and media extraction)
await streamer.finalize(text_content if text_content else send_result["content"])
# Finalize with text content (after hook processing and media extraction)
# If the response is media-only, finalize with empty string rather than
# leaking raw MEDIA: markers into the edited message.
await streamer.finalize(text_content if text_content else "")


# Send media files separately (same as non-streaming path)
if media_urls:
for media_path in media_urls:
if os.path.exists(media_path):
try:
from .media import is_audio_file
if is_audio_file(media_path):
with open(media_path, "rb") as f:
if parsed.get("audio_as_voice", False):
await self._application.bot.send_voice(
chat_id=update.message.chat_id, voice=f
)
else:
await self._application.bot.send_audio(
chat_id=update.message.chat_id, audio=f
)
except Exception as e:
logger.error(f"Failed to send media: {e}")

# Fire sent hooks
self.fire_message_sent(
str(update.message.chat_id), send_result["content"],
)

except Exception as agent_error:
# Agent failed: clean up placeholder message
try:
await self.delete_message(
str(update.message.chat_id), placeholder_message_id
)
except Exception:
pass # Ignore deletion errors
# Re-raise the original error to be handled below
raise agent_error

else:
# Legacy non-streaming path
Expand Down
4 changes: 4 additions & 0 deletions src/praisonai/praisonai/cli/commands/bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ def bot_telegram(
auto_tts: bool = typer.Option(False, "--auto-tts", help="Auto-convert all responses to speech"),
stt: bool = typer.Option(False, "--stt", help="Enable STT tool for speech-to-text"),
stt_model: Optional[str] = typer.Option(None, "--stt-model", help="STT model (default: openai/whisper-1)"),
stream: bool = typer.Option(False, "--stream", help="Enable progressive streaming responses (edit messages live)"),
stream_edit_interval: int = typer.Option(700, "--stream-edit-interval", help="Minimum interval between message edits in milliseconds"),
):
"""Start a Telegram bot with full agent capabilities.

Expand Down Expand Up @@ -109,6 +111,8 @@ def bot_telegram(
auto_tts=auto_tts,
stt=stt,
stt_model=stt_model,
stream=stream,
stream_edit_interval=stream_edit_interval,
session_id=session_id,
user_id=user_id,
)
Expand Down
19 changes: 18 additions & 1 deletion src/praisonai/praisonai/cli/features/bots_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ class BotCapabilities:
stt: bool = False # Enable STT tool
stt_model: Optional[str] = None # STT model (default: openai/whisper-1)

# Streaming
stream: bool = False # Enable progressive streaming responses
stream_edit_interval: int = 700 # Minimum interval between message edits (ms)

# Session
session_id: Optional[str] = None
user_id: Optional[str] = None
Expand Down Expand Up @@ -104,6 +108,8 @@ def to_dict(self) -> Dict[str, Any]:
"auto_tts": self.auto_tts,
"stt": self.stt,
"stt_model": self.stt_model,
"stream": self.stream,
"stream_edit_interval": self.stream_edit_interval,
"session_id": self.session_id,
"user_id": self.user_id,
}
Expand Down Expand Up @@ -228,6 +234,8 @@ def resolve_env_vars(value):
tools=agent_config.get("tools", []) or [],
model=agent_config.get("llm"),
auto_approve=agent_config.get("auto_approve", False),
stream=config.get("streaming", False),
stream_edit_interval=config.get("stream_edit_interval_ms", 700),
)

# Write a temporary agent YAML for _load_agent (reuse existing logic)
Expand Down Expand Up @@ -343,7 +351,16 @@ def start_telegram(
return

agent = self._load_agent(agent_file, capabilities, agent_config_dict=agent_config_dict)
bot = TelegramBot(token=token, agent=agent)

# Create bot config with streaming settings
from praisonaiagents.bots import BotConfig
bot_config = BotConfig(
token=token,
streaming=capabilities.stream if capabilities else False,
stream_edit_interval_ms=capabilities.stream_edit_interval if capabilities else 700,
)

bot = TelegramBot(token=token, agent=agent, config=bot_config)

self._print_startup_info("Telegram", capabilities)

Expand Down
Loading