-
-
Notifications
You must be signed in to change notification settings - Fork 2.3k
feat: add message_debounce config and DebounceManager for message aggregation #8624
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -62,5 +62,6 @@ GenieData/ | |
| .opencode/ | ||
| .kilocode/ | ||
| .worktrees/ | ||
| .codegraph/ | ||
|
|
||
| dashboard/bun.lock | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,22 +4,129 @@ | |
|
|
||
| class: | ||
| EventBus: 事件总线, 用于处理事件的分发和处理 | ||
| DebounceManager: 消息防抖管理器, 短时间内的同会话消息合并为一条再调度 | ||
|
|
||
| 工作流程: | ||
| 1. 维护一个异步队列, 来接受各种消息事件 | ||
| 2. 无限循环的调度函数, 从事件队列中获取新的事件, 打印日志并创建一个新的异步任务来执行管道调度器的处理逻辑 | ||
| - 若启用了消息防抖, 则交由 DebounceManager 聚合后统一执行 | ||
| """ | ||
|
|
||
| import asyncio | ||
| import copy | ||
| from asyncio import Queue | ||
|
|
||
| from astrbot.core import logger | ||
| from astrbot.core.astrbot_config_mgr import AstrBotConfigManager | ||
| from astrbot.core.message.components import At, Reply | ||
| from astrbot.core.pipeline.scheduler import PipelineScheduler | ||
|
|
||
| from .platform import AstrMessageEvent | ||
|
|
||
|
|
||
| class DebounceManager: | ||
| """消息防抖管理器:短时间内的同会话消息合并为一条再调度。 | ||
|
|
||
| 当 message_debounce.enable = true 时, | ||
| 同一 unified_msg_origin 的连续消息会在 interval 秒内聚合, | ||
| 计时器被新消息重置,超时后合并所有消息一次性交给 scheduler 执行。 | ||
| """ | ||
|
|
||
|
Comment on lines
+27
to
+34
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| def __init__( | ||
| self, | ||
| config_mgr: AstrBotConfigManager, | ||
| scheduler_mapping: dict[str, PipelineScheduler], | ||
| ) -> None: | ||
| self._config_mgr = config_mgr | ||
| self._scheduler_mapping = scheduler_mapping | ||
| self._tasks: dict[str, asyncio.Task] = {} | ||
| self._buffers: dict[str, list[AstrMessageEvent]] = {} | ||
|
|
||
| async def push(self, event: AstrMessageEvent) -> None: | ||
| """推送事件到防抖管理器。若同会话已有等待计时器则重置, 否则创建新计时器。""" | ||
| origin = event.unified_msg_origin | ||
|
|
||
| # Cancel existing timer if any | ||
| existing = self._tasks.get(origin) | ||
| if existing is not None and not existing.done(): | ||
| existing.cancel() | ||
| logger.debug(f"[Debounce] 重置计时器: {origin}") | ||
|
|
||
| # Get interval from config | ||
| conf = self._config_mgr.get_conf(origin) | ||
| debounce_cfg = conf.get("platform_settings", {}).get("message_debounce", {}) | ||
| interval: int = max(1, int(debounce_cfg.get("interval", 2))) | ||
|
|
||
| # Buffer | ||
| if origin not in self._buffers: | ||
| self._buffers[origin] = [] | ||
| self._buffers[origin].append(event) | ||
|
|
||
| # Start new timer | ||
| self._tasks[origin] = asyncio.create_task( | ||
| self._flush_after_delay(origin, interval), | ||
| ) | ||
|
Comment on lines
+45
to
+68
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
|
|
||
| async def _flush_after_delay(self, origin: str, interval: int) -> None: | ||
| """等待 interval 秒后刷新缓冲区。被取消则静默退出(有新消息重置了计时器)。""" | ||
| try: | ||
| await asyncio.sleep(interval) | ||
| except asyncio.CancelledError: | ||
| return | ||
|
|
||
| events = self._buffers.pop(origin, []) | ||
| self._tasks.pop(origin, None) | ||
|
|
||
| if not events: | ||
| return | ||
|
|
||
| # Find scheduler | ||
| conf_info = self._config_mgr.get_conf_info(origin) | ||
| conf_id = conf_info["id"] | ||
| scheduler = self._scheduler_mapping.get(conf_id) | ||
| if not scheduler: | ||
| logger.error( | ||
| f"[Debounce] PipelineScheduler not found for id: {conf_id}, events discarded." | ||
| ) | ||
| return | ||
|
|
||
| # Merge if multiple | ||
| merged = events[0] if len(events) == 1 else self._merge_events(events) | ||
|
|
||
| logger.info( | ||
| f"[Debounce] 合并 {len(events)} 条消息后执行调度: {origin}" | ||
| ) | ||
| await scheduler.execute(merged) | ||
|
|
||
| @staticmethod | ||
| def _merge_events(events: list[AstrMessageEvent]) -> AstrMessageEvent: | ||
| """将多条消息合并为一条, 保留第一条的会话信息, 追加文本和组件。""" | ||
| # Copy the first event's message data to avoid mutating the original | ||
| base = events[0] | ||
|
|
||
| # Guard against None message_str | ||
| merged_str = (base.message_str or "") + "".join( | ||
| "\n" + (ev.message_str or "") | ||
| for ev in events[1:] | ||
| if ev.message_str | ||
| ) | ||
|
|
||
| # Guard against None message_obj | ||
| merged_components = list(base.message_obj.message) if base.message_obj and base.message_obj.message else [] | ||
| for ev in events[1:]: | ||
| if ev.message_obj and ev.message_obj.message: | ||
| for comp in ev.message_obj.message: | ||
|
Comment on lines
+105
to
+118
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. issue (bug_risk): Merging message components assumes In |
||
| if not isinstance(comp, (At, Reply)): | ||
| merged_components.append(comp) | ||
|
|
||
| # Apply merged data to base (shallow copy of components already done above) | ||
| base.message_str = merged_str | ||
| if base.message_obj: | ||
| base.message_obj.message = merged_components | ||
|
|
||
| return base | ||
|
|
||
|
Comment on lines
+102
to
+128
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 在 同样地,如果 建议在合并时对 @staticmethod
def _merge_events(events: list[AstrMessageEvent]) -> AstrMessageEvent:
"""将多条消息合并为一条, 保留第一条的会话信息, 追加文本和组件。"""
base = events[0]
for ev in events[1:]:
# Merge text safely
if ev.message_str:
if base.message_str:
base.message_str += "\n" + ev.message_str
else:
base.message_str = ev.message_str
# Merge message components (images, files, etc.) safely, skip At/Reply
if ev.message_obj and ev.message_obj.message:
if base.message_obj is None:
base.message_obj = ev.message_obj
else:
for comp in ev.message_obj.message:
if not isinstance(comp, (At, Reply)):
base.message_obj.message.append(comp)
return base |
||
|
|
||
| class EventBus: | ||
| """用于处理事件的分发和处理""" | ||
|
|
||
|
|
@@ -33,6 +140,9 @@ def __init__( | |
| # abconf uuid -> scheduler | ||
| self.pipeline_scheduler_mapping = pipeline_scheduler_mapping | ||
| self.astrbot_config_mgr = astrbot_config_mgr | ||
| self._debounce_mgr = DebounceManager( | ||
| astrbot_config_mgr, pipeline_scheduler_mapping | ||
| ) | ||
|
|
||
| async def dispatch(self) -> None: | ||
| while True: | ||
|
|
@@ -41,6 +151,19 @@ async def dispatch(self) -> None: | |
| conf_id = conf_info["id"] | ||
| conf_name = conf_info.get("name") or conf_id | ||
| self._print_event(event, conf_name) | ||
|
|
||
| # Check if debounce is enabled for this session | ||
| conf = self.astrbot_config_mgr.get_conf(event.unified_msg_origin) | ||
| debounce_cfg = conf.get("platform_settings", {}).get( | ||
| "message_debounce", {} | ||
| ) | ||
| if debounce_cfg.get("enable", False): | ||
| logger.debug( | ||
| f"[Debounce] 消息进入防抖队列: {event.get_message_outline()}" | ||
| ) | ||
| await self._debounce_mgr.push(event) | ||
| continue | ||
|
|
||
| scheduler = self.pipeline_scheduler_mapping.get(conf_id) | ||
| if not scheduler: | ||
| logger.error( | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
issue (complexity): Consider centralizing config/scheduler resolution in EventBus and making DebounceManager a pure buffering/timing helper that non-destructively merges events to simplify responsibilities and shared state.
You can reduce the new complexity by tightening the boundaries and removing duplicated lookups, without changing behavior.
1. Let
EventBusown config & scheduler resolutionInstead of letting
DebounceManagerread config and resolve the scheduler again, haveEventBusdo that once and pass the resolved values in. That removes duplicated logic and makesDebounceManagera pure “timer + buffer” component.EventBus.dispatch (compute once, then delegate):
DebounceManager: no config_mgr, no scheduler_mapping
This keeps the debounce feature but:
EventBus.EventBus.DebounceManagerno longer depends onAstrBotConfigManageror the mapping.2. Make
_merge_eventsavoid mutating the original eventMutating
events[0]in place makes reasoning/debugging harder when the same object might be referenced elsewhere. A small change keeps behavior but makes it safer:This keeps the same merged content semantics, but
_merge_eventsbecomes easier to test and reason about, and you remove one more subtle source of shared-state complexity.