Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -62,5 +62,6 @@ GenieData/
.opencode/
.kilocode/
.worktrees/
.codegraph/

dashboard/bun.lock
31 changes: 31 additions & 0 deletions astrbot/core/config/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@
], # 当 split_mode 为 words 时使用
"content_cleanup_rule": "",
},
"message_debounce": {
"enable": False,
"interval": 2,
},
"no_permission_reply": True,
"empty_mention_waiting": True,
"empty_mention_waiting_need_reply": True,
Expand Down Expand Up @@ -1067,6 +1071,19 @@
},
},
},
"message_debounce": {
"type": "object",
"items": {
"enable": {
"type": "bool",
"hint": "启用后,短时间内来自同一会话的多条消息会合并后再交给LLM处理,避免图片和文字分开到达时上下文割裂。",
},
"interval": {
"type": "int",
"hint": "防抖等待时间(秒)。在此时间内若收到同一会话的新消息,则等待时间重置。超时后所有消息合并为一条再处理。推荐2-3秒。",
},
},
},
"reply_prefix": {
"type": "string",
"hint": "机器人回复消息时带有的前缀。",
Expand Down Expand Up @@ -4135,6 +4152,20 @@
},
},
},
"message_debounce": {
"description": "消息防抖",
"type": "object",
"items": {
"platform_settings.message_debounce.enable": {
"description": "启用消息防抖",
"type": "bool",
},
"platform_settings.message_debounce.interval": {
"description": "防抖间隔(秒)",
"type": "int",
},
},
},
"ltm": {
"description": "群聊上下文感知(原聊天记忆增强)",
"type": "object",
Expand Down
123 changes: 123 additions & 0 deletions astrbot/core/event_bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,129 @@

class:
EventBus: 事件总线, 用于处理事件的分发和处理
DebounceManager: 消息防抖管理器, 短时间内的同会话消息合并为一条再调度

工作流程:
1. 维护一个异步队列, 来接受各种消息事件
2. 无限循环的调度函数, 从事件队列中获取新的事件, 打印日志并创建一个新的异步任务来执行管道调度器的处理逻辑
- 若启用了消息防抖, 则交由 DebounceManager 聚合后统一执行
"""

import asyncio
import copy
from asyncio import Queue

from astrbot.core import logger
from astrbot.core.astrbot_config_mgr import AstrBotConfigManager
from astrbot.core.message.components import At, Reply
from astrbot.core.pipeline.scheduler import PipelineScheduler

from .platform import AstrMessageEvent


class DebounceManager:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (complexity): Consider centralizing config/scheduler resolution in EventBus and making DebounceManager a pure buffering/timing helper that non-destructively merges events to simplify responsibilities and shared state.

You can reduce the new complexity by tightening the boundaries and removing duplicated lookups, without changing behavior.

1. Let EventBus own config & scheduler resolution

Instead of letting DebounceManager read config and resolve the scheduler again, have EventBus do that once and pass the resolved values in. That removes duplicated logic and makes DebounceManager a pure “timer + buffer” component.

EventBus.dispatch (compute once, then delegate):

async def dispatch(self) -> None:
    while True:
        event: AstrMessageEvent = await self.event_queue.get()
        origin = event.unified_msg_origin

        conf_info = self.astrbot_config_mgr.get_conf_info(origin)
        conf_id = conf_info["id"]
        conf_name = conf_info.get("name") or conf_id
        self._print_event(event, conf_name)

        # Config lookup only here
        conf = self.astrbot_config_mgr.get_conf(origin)
        debounce_cfg = conf.get("platform_settings", {}).get("message_debounce", {})
        enable = debounce_cfg.get("enable", False)
        interval: int = debounce_cfg.get("interval", 2)

        scheduler = self.pipeline_scheduler_mapping.get(conf_id)
        if not scheduler:
            logger.error(
                f"PipelineScheduler not found for id: {conf_id}, event ignored."
            )
            continue

        if enable:
            logger.debug(
                f"[Debounce] 消息进入防抖队列: {event.get_message_outline()}"
            )
            await self._debounce_mgr.push(event, scheduler, interval)
        else:
            asyncio.create_task(scheduler.execute(event))

DebounceManager: no config_mgr, no scheduler_mapping

class DebounceManager:
    def __init__(self) -> None:
        self._tasks: dict[str, asyncio.Task] = {}
        self._buffers: dict[str, list[AstrMessageEvent]] = {}
        self._schedulers: dict[str, PipelineScheduler] = {}

    async def push(
        self,
        event: AstrMessageEvent,
        scheduler: PipelineScheduler,
        interval: int,
    ) -> None:
        origin = event.unified_msg_origin

        # Remember scheduler per origin (no mapping lookup inside)
        self._schedulers[origin] = scheduler

        existing = self._tasks.get(origin)
        if existing is not None and not existing.done():
            existing.cancel()
            logger.debug(f"[Debounce] 重置计时器: {origin}")

        self._buffers.setdefault(origin, []).append(event)

        self._tasks[origin] = asyncio.create_task(
            self._flush_after_delay(origin, interval),
        )

    async def _flush_after_delay(self, origin: str, interval: int) -> None:
        try:
            await asyncio.sleep(interval)
        except asyncio.CancelledError:
            return

        events = self._buffers.pop(origin, [])
        task = self._tasks.pop(origin, None)
        scheduler = self._schedulers.get(origin)

        if not events or not scheduler:
            if not scheduler:
                logger.error(
                    f"[Debounce] PipelineScheduler not found for origin: {origin}, "
                    f"events discarded."
                )
            return

        merged = events[0] if len(events) == 1 else self._merge_events(events)

        logger.info(f"[Debounce] 合并 {len(events)} 条消息后执行调度: {origin}")
        await scheduler.execute(merged)

This keeps the debounce feature but:

  • Config reads only happen in EventBus.
  • Scheduler resolution only happens in EventBus.
  • DebounceManager no longer depends on AstrBotConfigManager or the mapping.

2. Make _merge_events avoid mutating the original event

Mutating events[0] in place makes reasoning/debugging harder when the same object might be referenced elsewhere. A small change keeps behavior but makes it safer:

import copy

@staticmethod
def _merge_events(events: list[AstrMessageEvent]) -> AstrMessageEvent:
    """将多条消息合并为一条, 保留第一条的会话信息, 追加文本和组件。"""
    base = copy.deepcopy(events[0])  # avoid mutating the original

    for ev in events[1:]:
        if ev.message_str:
            base.message_str += "\n" + ev.message_str

        if ev.message_obj and ev.message_obj.message:
            for comp in ev.message_obj.message:
                if not isinstance(comp, (At, Reply)):
                    base.message_obj.message.append(comp)

    return base

This keeps the same merged content semantics, but _merge_events becomes easier to test and reason about, and you remove one more subtle source of shared-state complexity.

"""消息防抖管理器:短时间内的同会话消息合并为一条再调度。

当 message_debounce.enable = true 时,
同一 unified_msg_origin 的连续消息会在 interval 秒内聚合,
计时器被新消息重置,超时后合并所有消息一次性交给 scheduler 执行。
"""

Comment on lines +27 to +34
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

根据通用规则,新功能(如消息防抖管理器 DebounceManager)应当伴随相应的单元测试。请添加对应的单元测试,以验证 DebounceManager 的防抖延迟、计时器重置以及消息合并逻辑的正确性。

References
  1. New functionality, such as handling attachments, should be accompanied by corresponding unit tests.

def __init__(
self,
config_mgr: AstrBotConfigManager,
scheduler_mapping: dict[str, PipelineScheduler],
) -> None:
self._config_mgr = config_mgr
self._scheduler_mapping = scheduler_mapping
self._tasks: dict[str, asyncio.Task] = {}
self._buffers: dict[str, list[AstrMessageEvent]] = {}

async def push(self, event: AstrMessageEvent) -> None:
"""推送事件到防抖管理器。若同会话已有等待计时器则重置, 否则创建新计时器。"""
origin = event.unified_msg_origin

# Cancel existing timer if any
existing = self._tasks.get(origin)
if existing is not None and not existing.done():
existing.cancel()
logger.debug(f"[Debounce] 重置计时器: {origin}")

# Get interval from config
conf = self._config_mgr.get_conf(origin)
debounce_cfg = conf.get("platform_settings", {}).get("message_debounce", {})
interval: int = max(1, int(debounce_cfg.get("interval", 2)))

# Buffer
if origin not in self._buffers:
self._buffers[origin] = []
self._buffers[origin].append(event)

# Start new timer
self._tasks[origin] = asyncio.create_task(
self._flush_after_delay(origin, interval),
)
Comment on lines +45 to +68
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

当前的防抖实现中,每次收到新消息都会重置计时器(existing.cancel())。在非常活跃的会话(如高频群聊或遭遇刷屏)中,如果消息发送间隔一直小于 interval,计时器将不断被重置,导致消息永远无法被 flush 调度给 LLM,且缓冲区会持续增长(存在潜在的内存泄漏风险)。

建议引入一个最大等待时间(例如 max_delay),从第一条消息到达开始计时,一旦达到最大等待时间则强制执行 flush,不再因新消息而重置,以避免消息饥饿和内存无限增长。


async def _flush_after_delay(self, origin: str, interval: int) -> None:
"""等待 interval 秒后刷新缓冲区。被取消则静默退出(有新消息重置了计时器)。"""
try:
await asyncio.sleep(interval)
except asyncio.CancelledError:
return

events = self._buffers.pop(origin, [])
self._tasks.pop(origin, None)

if not events:
return

# Find scheduler
conf_info = self._config_mgr.get_conf_info(origin)
conf_id = conf_info["id"]
scheduler = self._scheduler_mapping.get(conf_id)
if not scheduler:
logger.error(
f"[Debounce] PipelineScheduler not found for id: {conf_id}, events discarded."
)
return

# Merge if multiple
merged = events[0] if len(events) == 1 else self._merge_events(events)

logger.info(
f"[Debounce] 合并 {len(events)} 条消息后执行调度: {origin}"
)
await scheduler.execute(merged)

@staticmethod
def _merge_events(events: list[AstrMessageEvent]) -> AstrMessageEvent:
"""将多条消息合并为一条, 保留第一条的会话信息, 追加文本和组件。"""
# Copy the first event's message data to avoid mutating the original
base = events[0]

# Guard against None message_str
merged_str = (base.message_str or "") + "".join(
"\n" + (ev.message_str or "")
for ev in events[1:]
if ev.message_str
)

# Guard against None message_obj
merged_components = list(base.message_obj.message) if base.message_obj and base.message_obj.message else []
for ev in events[1:]:
if ev.message_obj and ev.message_obj.message:
for comp in ev.message_obj.message:
Comment on lines +105 to +118
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (bug_risk): Merging message components assumes base.message_obj exists, which may drop or mis-handle components if only later events carry them.

In _merge_events, components are appended only when ev.message_obj is present, but base.message_obj itself may be None. If the first event has only text and a later one carries images/files, appending to base.message_obj.message can either drop those components or raise an attribute error. Consider initializing or cloning message_obj from the first event that has one (when base.message_obj is None), or choosing as base the first event that has a message_obj, so non-text components are preserved safely.

if not isinstance(comp, (At, Reply)):
merged_components.append(comp)

# Apply merged data to base (shallow copy of components already done above)
base.message_str = merged_str
if base.message_obj:
base.message_obj.message = merged_components

return base

Comment on lines +102 to +128
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

_merge_events 中,如果第一条消息(base)的 message_strNone(例如第一条消息是纯图片),而后续消息有 message_str,执行 base.message_str += "\n" + ev.message_str 会抛出 TypeError: unsupported operand type(s) for +=: 'NoneType' and 'str'

同样地,如果 base.message_objNone(例如某些平台上的纯文本消息),而后续消息有 message_obj,访问 base.message_obj.message 会抛出 AttributeError

建议在合并时对 None 值进行防御性检查。

    @staticmethod
    def _merge_events(events: list[AstrMessageEvent]) -> AstrMessageEvent:
        """将多条消息合并为一条, 保留第一条的会话信息, 追加文本和组件。"""
        base = events[0]

        for ev in events[1:]:
            # Merge text safely
            if ev.message_str:
                if base.message_str:
                    base.message_str += "\n" + ev.message_str
                else:
                    base.message_str = ev.message_str

            # Merge message components (images, files, etc.) safely, skip At/Reply
            if ev.message_obj and ev.message_obj.message:
                if base.message_obj is None:
                    base.message_obj = ev.message_obj
                else:
                    for comp in ev.message_obj.message:
                        if not isinstance(comp, (At, Reply)):
                            base.message_obj.message.append(comp)

        return base


class EventBus:
"""用于处理事件的分发和处理"""

Expand All @@ -33,6 +140,9 @@ def __init__(
# abconf uuid -> scheduler
self.pipeline_scheduler_mapping = pipeline_scheduler_mapping
self.astrbot_config_mgr = astrbot_config_mgr
self._debounce_mgr = DebounceManager(
astrbot_config_mgr, pipeline_scheduler_mapping
)

async def dispatch(self) -> None:
while True:
Expand All @@ -41,6 +151,19 @@ async def dispatch(self) -> None:
conf_id = conf_info["id"]
conf_name = conf_info.get("name") or conf_id
self._print_event(event, conf_name)

# Check if debounce is enabled for this session
conf = self.astrbot_config_mgr.get_conf(event.unified_msg_origin)
debounce_cfg = conf.get("platform_settings", {}).get(
"message_debounce", {}
)
if debounce_cfg.get("enable", False):
logger.debug(
f"[Debounce] 消息进入防抖队列: {event.get_message_outline()}"
)
await self._debounce_mgr.push(event)
continue

scheduler = self.pipeline_scheduler_mapping.get(conf_id)
if not scheduler:
logger.error(
Expand Down
15 changes: 15 additions & 0 deletions dashboard/src/i18n/locales/en-US/features/config-metadata.json
Original file line number Diff line number Diff line change
Expand Up @@ -979,6 +979,21 @@
}
}
},
"message_debounce": {
"description": "Message Debounce",
"platform_settings": {
"message_debounce": {
"enable": {
"description": "Enable Message Debounce",
"hint": "When enabled, consecutive messages from the same session within a short time window are merged before being sent to the LLM. Avoids context fragmentation when images and text arrive separately."
},
"interval": {
"description": "Debounce Interval (seconds)",
"hint": "If a new message arrives from the same session within this window, the timer resets. When the timer expires, all buffered messages are merged and processed as one. Recommended: 2-3 seconds."
}
}
}
},
"ltm": {
"description": "Group Chat Context Awareness (formerly Chat Memory Enhancement)",
"provider_ltm_settings": {
Expand Down
15 changes: 15 additions & 0 deletions dashboard/src/i18n/locales/zh-CN/features/config-metadata.json
Original file line number Diff line number Diff line change
Expand Up @@ -981,6 +981,21 @@
}
}
},
"message_debounce": {
"description": "消息防抖",
"platform_settings": {
"message_debounce": {
"enable": {
"description": "启用消息防抖",
"hint": "启用后,短时间内来自同一会话的多条消息会合并后再交给LLM处理,避免图片和文字分开到达时上下文割裂。"
},
"interval": {
"description": "防抖间隔(秒)",
"hint": "在此时间内若收到同一会话的新消息,则等待时间重置。超时后所有消息合并为一条再处理。推荐2-3秒。"
}
}
}
},
"ltm": {
"description": "群聊上下文感知(原聊天记忆增强)",
"provider_ltm_settings": {
Expand Down