diff --git a/.gitignore b/.gitignore index 5eb9616c8c..0a7f2824a7 100644 --- a/.gitignore +++ b/.gitignore @@ -62,5 +62,6 @@ GenieData/ .opencode/ .kilocode/ .worktrees/ +.codegraph/ dashboard/bun.lock diff --git a/astrbot/core/config/default.py b/astrbot/core/config/default.py index b10d2424b5..b73770b498 100644 --- a/astrbot/core/config/default.py +++ b/astrbot/core/config/default.py @@ -88,6 +88,10 @@ ], # 当 split_mode 为 words 时使用 "content_cleanup_rule": "", }, + "message_debounce": { + "enable": False, + "interval": 2, + }, "no_permission_reply": True, "empty_mention_waiting": True, "empty_mention_waiting_need_reply": True, @@ -1067,6 +1071,19 @@ }, }, }, + "message_debounce": { + "type": "object", + "items": { + "enable": { + "type": "bool", + "hint": "启用后,短时间内来自同一会话的多条消息会合并后再交给LLM处理,避免图片和文字分开到达时上下文割裂。", + }, + "interval": { + "type": "int", + "hint": "防抖等待时间(秒)。在此时间内若收到同一会话的新消息,则等待时间重置。超时后所有消息合并为一条再处理。推荐2-3秒。", + }, + }, + }, "reply_prefix": { "type": "string", "hint": "机器人回复消息时带有的前缀。", @@ -4135,6 +4152,20 @@ }, }, }, + "message_debounce": { + "description": "消息防抖", + "type": "object", + "items": { + "platform_settings.message_debounce.enable": { + "description": "启用消息防抖", + "type": "bool", + }, + "platform_settings.message_debounce.interval": { + "description": "防抖间隔(秒)", + "type": "int", + }, + }, + }, "ltm": { "description": "群聊上下文感知(原聊天记忆增强)", "type": "object", diff --git a/astrbot/core/event_bus.py b/astrbot/core/event_bus.py index 70b5f054ed..ce5010e1d0 100644 --- a/astrbot/core/event_bus.py +++ b/astrbot/core/event_bus.py @@ -4,22 +4,129 @@ class: EventBus: 事件总线, 用于处理事件的分发和处理 + DebounceManager: 消息防抖管理器, 短时间内的同会话消息合并为一条再调度 工作流程: 1. 维护一个异步队列, 来接受各种消息事件 2. 无限循环的调度函数, 从事件队列中获取新的事件, 打印日志并创建一个新的异步任务来执行管道调度器的处理逻辑 + - 若启用了消息防抖, 则交由 DebounceManager 聚合后统一执行 """ import asyncio +import copy from asyncio import Queue from astrbot.core import logger from astrbot.core.astrbot_config_mgr import AstrBotConfigManager +from astrbot.core.message.components import At, Reply from astrbot.core.pipeline.scheduler import PipelineScheduler from .platform import AstrMessageEvent +class DebounceManager: + """消息防抖管理器:短时间内的同会话消息合并为一条再调度。 + + 当 message_debounce.enable = true 时, + 同一 unified_msg_origin 的连续消息会在 interval 秒内聚合, + 计时器被新消息重置,超时后合并所有消息一次性交给 scheduler 执行。 + """ + + def __init__( + self, + config_mgr: AstrBotConfigManager, + scheduler_mapping: dict[str, PipelineScheduler], + ) -> None: + self._config_mgr = config_mgr + self._scheduler_mapping = scheduler_mapping + self._tasks: dict[str, asyncio.Task] = {} + self._buffers: dict[str, list[AstrMessageEvent]] = {} + + async def push(self, event: AstrMessageEvent) -> None: + """推送事件到防抖管理器。若同会话已有等待计时器则重置, 否则创建新计时器。""" + origin = event.unified_msg_origin + + # Cancel existing timer if any + existing = self._tasks.get(origin) + if existing is not None and not existing.done(): + existing.cancel() + logger.debug(f"[Debounce] 重置计时器: {origin}") + + # Get interval from config + conf = self._config_mgr.get_conf(origin) + debounce_cfg = conf.get("platform_settings", {}).get("message_debounce", {}) + interval: int = max(1, int(debounce_cfg.get("interval", 2))) + + # Buffer + if origin not in self._buffers: + self._buffers[origin] = [] + self._buffers[origin].append(event) + + # Start new timer + self._tasks[origin] = asyncio.create_task( + self._flush_after_delay(origin, interval), + ) + + async def _flush_after_delay(self, origin: str, interval: int) -> None: + """等待 interval 秒后刷新缓冲区。被取消则静默退出(有新消息重置了计时器)。""" + try: + await asyncio.sleep(interval) + except asyncio.CancelledError: + return + + events = self._buffers.pop(origin, []) + self._tasks.pop(origin, None) + + if not events: + return + + # Find scheduler + conf_info = self._config_mgr.get_conf_info(origin) + conf_id = conf_info["id"] + scheduler = self._scheduler_mapping.get(conf_id) + if not scheduler: + logger.error( + f"[Debounce] PipelineScheduler not found for id: {conf_id}, events discarded." + ) + return + + # Merge if multiple + merged = events[0] if len(events) == 1 else self._merge_events(events) + + logger.info( + f"[Debounce] 合并 {len(events)} 条消息后执行调度: {origin}" + ) + await scheduler.execute(merged) + + @staticmethod + def _merge_events(events: list[AstrMessageEvent]) -> AstrMessageEvent: + """将多条消息合并为一条, 保留第一条的会话信息, 追加文本和组件。""" + # Copy the first event's message data to avoid mutating the original + base = events[0] + + # Guard against None message_str + merged_str = (base.message_str or "") + "".join( + "\n" + (ev.message_str or "") + for ev in events[1:] + if ev.message_str + ) + + # Guard against None message_obj + merged_components = list(base.message_obj.message) if base.message_obj and base.message_obj.message else [] + for ev in events[1:]: + if ev.message_obj and ev.message_obj.message: + for comp in ev.message_obj.message: + if not isinstance(comp, (At, Reply)): + merged_components.append(comp) + + # Apply merged data to base (shallow copy of components already done above) + base.message_str = merged_str + if base.message_obj: + base.message_obj.message = merged_components + + return base + + class EventBus: """用于处理事件的分发和处理""" @@ -33,6 +140,9 @@ def __init__( # abconf uuid -> scheduler self.pipeline_scheduler_mapping = pipeline_scheduler_mapping self.astrbot_config_mgr = astrbot_config_mgr + self._debounce_mgr = DebounceManager( + astrbot_config_mgr, pipeline_scheduler_mapping + ) async def dispatch(self) -> None: while True: @@ -41,6 +151,19 @@ async def dispatch(self) -> None: conf_id = conf_info["id"] conf_name = conf_info.get("name") or conf_id self._print_event(event, conf_name) + + # Check if debounce is enabled for this session + conf = self.astrbot_config_mgr.get_conf(event.unified_msg_origin) + debounce_cfg = conf.get("platform_settings", {}).get( + "message_debounce", {} + ) + if debounce_cfg.get("enable", False): + logger.debug( + f"[Debounce] 消息进入防抖队列: {event.get_message_outline()}" + ) + await self._debounce_mgr.push(event) + continue + scheduler = self.pipeline_scheduler_mapping.get(conf_id) if not scheduler: logger.error( diff --git a/dashboard/src/i18n/locales/en-US/features/config-metadata.json b/dashboard/src/i18n/locales/en-US/features/config-metadata.json index be62932745..a5fa6669b7 100644 --- a/dashboard/src/i18n/locales/en-US/features/config-metadata.json +++ b/dashboard/src/i18n/locales/en-US/features/config-metadata.json @@ -979,6 +979,21 @@ } } }, + "message_debounce": { + "description": "Message Debounce", + "platform_settings": { + "message_debounce": { + "enable": { + "description": "Enable Message Debounce", + "hint": "When enabled, consecutive messages from the same session within a short time window are merged before being sent to the LLM. Avoids context fragmentation when images and text arrive separately." + }, + "interval": { + "description": "Debounce Interval (seconds)", + "hint": "If a new message arrives from the same session within this window, the timer resets. When the timer expires, all buffered messages are merged and processed as one. Recommended: 2-3 seconds." + } + } + } + }, "ltm": { "description": "Group Chat Context Awareness (formerly Chat Memory Enhancement)", "provider_ltm_settings": { diff --git a/dashboard/src/i18n/locales/zh-CN/features/config-metadata.json b/dashboard/src/i18n/locales/zh-CN/features/config-metadata.json index d506083a8a..c4ae6addd1 100644 --- a/dashboard/src/i18n/locales/zh-CN/features/config-metadata.json +++ b/dashboard/src/i18n/locales/zh-CN/features/config-metadata.json @@ -981,6 +981,21 @@ } } }, + "message_debounce": { + "description": "消息防抖", + "platform_settings": { + "message_debounce": { + "enable": { + "description": "启用消息防抖", + "hint": "启用后,短时间内来自同一会话的多条消息会合并后再交给LLM处理,避免图片和文字分开到达时上下文割裂。" + }, + "interval": { + "description": "防抖间隔(秒)", + "hint": "在此时间内若收到同一会话的新消息,则等待时间重置。超时后所有消息合并为一条再处理。推荐2-3秒。" + } + } + } + }, "ltm": { "description": "群聊上下文感知(原聊天记忆增强)", "provider_ltm_settings": {