from collections.abc import AsyncGenerator, Callable
from astrbot import logger
from astrbot.core.message.components import At, AtAll
from astrbot.core.message.message_event_result import MessageChain, MessageEventResult
from astrbot.core.platform.astr_message_event import AstrMessageEvent
from astrbot.core.platform.message_type import MessageType
from astrbot.core.star.filter.command import CommandFilter
from astrbot.core.star.filter.command_group import CommandGroupFilter
from astrbot.core.star.filter.permission import PermissionTypeFilter
from astrbot.core.star.session_plugin_manager import SessionPluginManager
from astrbot.core.star.star import star_map
from astrbot.core.star.star_handler import EventType, star_handlers_registry
from ..context import PipelineContext
from ..stage import Stage, register_stage
UNIQUE_SESSION_ID_BUILDERS: dict[str, Callable[[AstrMessageEvent], str | None]] = {
"aiocqhttp": lambda e: f"{e.get_sender_id()}_{e.get_group_id()}",
"slack": lambda e: f"{e.get_sender_id()}_{e.get_group_id()}",
"dingtalk": lambda e: e.get_sender_id(),
"qq_official": lambda e: e.get_sender_id(),
"qq_official_webhook": lambda e: e.get_sender_id(),
"lark": lambda e: f"{e.get_sender_id()}%{e.get_group_id()}",
"misskey": lambda e: f"{e.get_session_id()}_{e.get_sender_id()}",
"matrix": lambda e: f"{e.get_sender_id()}_{e.get_group_id() or e.get_session_id()}",
}
def build_unique_session_id(event: AstrMessageEvent) -> str | None:
platform = event.get_platform_name()
builder = UNIQUE_SESSION_ID_BUILDERS.get(platform)
return builder(event) if builder else None
@register_stage
class WakingCheckStage(Stage):
"""检查是否需要唤醒。
本版本采用“兼容普通监听器”的群聊唤醒策略:
1. 以 wake_prefix 开头的文本仍先作为插件指令候选,例如 /help。
但不再限制只能由标准指令 handler 接收;普通监听器也会按自己的 filter 判断。
2. 群聊普通 LLM 对话只在明确 @ 机器人或 /chat 时触发。
3. 群聊中的普通文本、未知 /xxx、引用机器人消息本身不会直接唤醒 LLM。
@全体成员消息会按 ignore_at_all 配置决定是否唤醒。
4. 私聊是否需要唤醒词由 WebUI 的 friend_message_needs_wake_prefix 决定。
5. 已知 /xxx 会放行给普通监听器,兼容 ChatPlus 等插件的指令标记 handler;
未知 /xxx 会在本阶段终止传播。
"""
async def initialize(self, ctx: PipelineContext) -> None:
"""初始化唤醒检查阶段
Args:
ctx (PipelineContext): 消息管道上下文对象, 包括配置和插件管理器
"""
self.ctx = ctx
self.no_permission_reply = self.ctx.astrbot_config["platform_settings"].get(
"no_permission_reply",
True,
)
# 私聊是否需要 wake_prefix 才能唤醒机器人
self.friend_message_needs_wake_prefix = self.ctx.astrbot_config[
"platform_settings"
].get("friend_message_needs_wake_prefix", False)
# 是否忽略机器人自己发送的消息
self.ignore_bot_self_message = self.ctx.astrbot_config["platform_settings"].get(
"ignore_bot_self_message",
False,
)
self.ignore_at_all = self.ctx.astrbot_config["platform_settings"].get(
"ignore_at_all",
False,
)
self.disable_builtin_commands = self.ctx.astrbot_config.get(
"disable_builtin_commands", False
)
platform_settings = self.ctx.astrbot_config.get("platform_settings", {})
self.unique_session = platform_settings.get("unique_session", False)
@staticmethod
def _split_command_root(text: str) -> tuple[str, str]:
text = (text or "").strip()
if not text:
return "", ""
parts = text.split(None, 1)
if len(parts) == 1:
return parts[0], ""
return parts[0], parts[1].strip()
def _find_manual_command_modules(
self,
command_root: str,
*,
is_admin: bool,
) -> set[str]:
root = str(command_root or "").strip().lower()
if not root:
return set()
plugin_context = getattr(self.ctx.plugin_manager, "context", None)
if plugin_context is None:
return set()
matched_modules: set[str] = set()
for plugin in plugin_context.get_all_stars():
if not getattr(plugin, "activated", True):
continue
star_obj = getattr(plugin, "star_cls", None)
getter = getattr(star_obj, "get_manual_command_specs", None)
if not callable(getter):
continue
try:
specs = getter() or []
except Exception:
continue
module_path = getattr(plugin, "module_path", None) or getattr(
star_obj,
"__module__",
"",
)
if not module_path:
continue
for spec in specs:
if not isinstance(spec, dict):
continue
if bool(spec.get("admin_only", False)) and not is_admin:
continue
roots = [
str(item or "").strip().lower()
for item in (spec.get("match_roots") or [])
if str(item or "").strip()
]
if not roots:
command_text = str(spec.get("command", "") or "").strip()
if command_text.startswith("/"):
root_text, _ = self._split_command_root(command_text[1:].strip())
if root_text:
roots = [root_text.strip().lower()]
if root in roots:
matched_modules.add(str(module_path))
break
return matched_modules
@staticmethod
def _command_name_matches(message_str: str, command_name: str) -> bool:
message = " ".join(str(message_str or "").strip().split())
command = " ".join(str(command_name or "").strip().split())
if not message or not command:
return False
return message == command or message.startswith(f"{command} ")
def _handler_declares_command(
self,
handler,
message_str: str,
) -> bool:
for filter_ in handler.event_filters:
if isinstance(filter_, CommandFilter):
for command_name in filter_.get_complete_command_names():
if self._command_name_matches(message_str, command_name):
return True
elif isinstance(filter_, CommandGroupFilter):
for command_name in filter_.get_complete_command_names():
if self._command_name_matches(message_str, command_name):
return True
return False
def _has_registered_command_handler(
self,
handlers,
message_str: str,
) -> bool:
return any(
self._handler_declares_command(handler, message_str)
for handler in handlers
)
async def process(
self,
event: AstrMessageEvent,
) -> None | AsyncGenerator[None, None]:
# apply unique session
if self.unique_session and event.message_obj.type == MessageType.GROUP_MESSAGE:
sid = build_unique_session_id(event)
if sid:
event.session_id = sid
# ignore bot self message
if (
self.ignore_bot_self_message
and event.get_self_id() == event.get_sender_id()
):
event.stop_event()
return
# 设置 sender 身份
event.message_str = event.message_str.strip()
original_message_str = event.message_str
try:
event.set_extra("astrbot_original_message_str", original_message_str)
except Exception:
pass
for admin_id in self.ctx.astrbot_config["admins_id"]:
if str(event.get_sender_id()) == admin_id:
event.role = "admin"
break
# 检查 wake
wake_prefixes = self.ctx.astrbot_config["wake_prefix"]
messages = event.get_messages()
is_wake = False
is_wake_prefix_command = False
wake_prefix = ""
manual_command_modules: set[str] = set()
# 1) /help、/sid、/zlib search 等:只作为“指令候选”。
# 这里先剥掉 wake_prefix,并设置 is_at_or_wake_command,保证原有 CommandFilter 能正确匹配。
for candidate_prefix in wake_prefixes:
if not candidate_prefix:
continue
if event.message_str.startswith(candidate_prefix):
if (
not event.is_private_chat()
and messages
and isinstance(messages[0], At)
and str(messages[0].qq) != str(event.get_self_id())
and str(messages[0].qq) != "all"
):
# 如果是群聊,且第一个消息段是 At 消息,但不是 At 机器人或 At 全体成员,则不唤醒
break
wake_prefix = candidate_prefix
is_wake_prefix_command = True
event.is_at_or_wake_command = True
event.message_str = event.message_str[len(candidate_prefix) :].strip()
command_root, command_rest = self._split_command_root(event.message_str)
if command_root.lower() == "chat":
is_wake = True
event.is_wake = True
event.message_str = command_rest
else:
manual_command_modules = self._find_manual_command_modules(
command_root,
is_admin=getattr(event, "role", "") == "admin",
)
break
# 2) 非 / 指令候选时,群聊需要明确 @ 机器人;私聊则继续尊重 WebUI 的
# friend_message_needs_wake_prefix 配置,关闭时允许直接进入 LLM 对话。
if not is_wake_prefix_command:
for message in messages:
if (
isinstance(message, At)
and str(message.qq) == str(event.get_self_id())
) or (isinstance(message, AtAll) and not self.ignore_at_all):
is_wake = True
event.is_wake = True
event.is_at_or_wake_command = True
wake_prefix = ""
break
if (
not is_wake
and event.is_private_chat()
and not self.friend_message_needs_wake_prefix
):
is_wake = True
event.is_wake = True
event.is_at_or_wake_command = True
wake_prefix = ""
# 将 plugins_name 设置到 event 中
enabled_plugins_name = self.ctx.astrbot_config.get("plugin_set", ["*"])
if enabled_plugins_name == ["*"]:
# 如果是 *,则表示所有插件都启用
event.plugins_name = None
else:
event.plugins_name = enabled_plugins_name
logger.debug(f"enabled_plugins_name: {enabled_plugins_name}")
candidate_handlers = []
for handler in star_handlers_registry.get_handlers_by_event_type(
EventType.AdapterMessageEvent,
plugins_name=event.plugins_name,
):
if (
self.disable_builtin_commands
and handler.handler_module_path
== "astrbot.builtin_stars.builtin_commands.main"
):
continue
candidate_handlers.append(handler)
registered_command_known = False
manual_command_known = bool(manual_command_modules)
known_wake_prefix_command = is_wake
if is_wake_prefix_command:
registered_command_known = self._has_registered_command_handler(
candidate_handlers,
event.message_str,
)
known_wake_prefix_command = (
is_wake or registered_command_known or manual_command_known
)
event.set_extra(
"astrbot_known_wake_prefix_command",
known_wake_prefix_command,
)
# 检查插件的 handler filter
activated_handlers = []
handlers_parsed_params = {} # 注册了指令的 handler
for handler in candidate_handlers:
is_command_handler = any(
isinstance(f, CommandFilter | CommandGroupFilter)
for f in handler.event_filters
)
is_manual_command_handler = (
bool(manual_command_modules)
and handler.handler_module_path in manual_command_modules
)
if is_wake_prefix_command and not known_wake_prefix_command:
if not is_command_handler:
continue
elif (
is_wake_prefix_command
and manual_command_known
and not registered_command_known
and not is_wake
and not is_command_handler
and not is_manual_command_handler
):
continue
# filter 需满足 AND 逻辑关系
passed = True
permission_not_pass = False
permission_filter_raise_error = False
if len(handler.event_filters) == 0:
continue
# /xxx 只在命中已注册命令或已声明的手动命令时放行给普通监听器。
# 未知 /xxx 不再激活 EventMessageType.ALL 这类监听器,末尾会 stop_event()。
for filter in handler.event_filters:
try:
if isinstance(filter, PermissionTypeFilter):
if not filter.filter(event, self.ctx.astrbot_config):
permission_not_pass = True
permission_filter_raise_error = filter.raise_error
elif not filter.filter(event, self.ctx.astrbot_config):
passed = False
break
except Exception as e:
await event.send(
MessageEventResult().message(
f"插件 {star_map[handler.handler_module_path].name}: {e}",
),
)
event.stop_event()
passed = False
break
if passed:
if permission_not_pass:
event._extras.pop("parsed_params", None)
if not permission_filter_raise_error:
# 跳过
continue
if self.no_permission_reply:
await event.send(
MessageChain().message(
f"您(ID: {event.get_sender_id()})的权限不足以使用此指令。通过 /sid 获取 ID 并请管理员添加。",
),
)
logger.info(
f"触发 {star_map[handler.handler_module_path].name} 时, 用户(ID={event.get_sender_id()}) 权限不足。",
)
event.stop_event()
return
is_wake = True
event.is_wake = True
is_group_cmd_handler = any(
isinstance(f, CommandGroupFilter) for f in handler.event_filters
)
if not is_group_cmd_handler:
activated_handlers.append(handler)
if "parsed_params" in event.get_extra(default={}):
handlers_parsed_params[handler.handler_full_name] = (
event.get_extra("parsed_params")
)
event._extras.pop("parsed_params", None)
# 根据会话配置过滤插件处理器
activated_handlers = await SessionPluginManager.filter_handlers_by_session(
event,
activated_handlers,
)
event.set_extra("activated_handlers", activated_handlers)
event.set_extra("handlers_parsed_params", handlers_parsed_params)
if not is_wake:
if is_wake_prefix_command:
logger.debug(
f"忽略未知指令或未命中指令: {original_message_str!r}, prefix={wake_prefix!r}, "
f"registered={registered_command_known}, manual={manual_command_known}",
)
event.stop_event()
Description / 描述
可以把此处代码Lib\site-packages\astrbot\core\pipeline\waking_check\stage.py
改为:
Use Case / 使用场景
/已知指令,执行对应指令
/未知指令,保持静默,与普通命令一致
不再依赖/chat
Willing to Submit PR? / 是否愿意提交PR?
Code of Conduct