-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcomment_detection_manager.py
More file actions
298 lines (233 loc) · 9.96 KB
/
comment_detection_manager.py
File metadata and controls
298 lines (233 loc) · 9.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
"""新規コメント検知とコンテキスト反映機能.
このモジュールは、Issue/MRの処理中に新しいユーザーコメントを検出し、
LLMコンテキストに反映する機能を提供します。
"""
from __future__ import annotations
import logging
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from handlers.task import Task
class CommentDetectionManager:
"""Issue/MRの新規コメント検出を管理するクラス.
処理中に追加された新規ユーザーコメントを検出し、
LLMコンテキストに追加する機能を提供します。
"""
def __init__(self, task: Task, config: dict[str, Any]) -> None:
"""CommentDetectionManagerを初期化する.
Args:
task: 処理対象のタスクオブジェクト
config: アプリケーション設定辞書
"""
self.task = task
self.config = config
self.logger = logging.getLogger(__name__)
# コメント検出機能の有効/無効フラグ
self.enabled = False
# 前回までのコメントIDセット(文字列として管理)
self.last_comment_ids: set[str] = set()
# 前回チェック時刻
self.last_check_time: datetime | None = None
# bot自身のユーザー名(除外用)
self.bot_username: str | None = None
# 有効/無効とbot_usernameの決定
self._configure()
def _configure(self) -> None:
"""設定からbot_usernameを取得し、機能の有効/無効を決定する."""
# タスクタイプを取得
task_key = self.task.get_task_key().to_dict()
task_type = task_key.get("type", "")
if task_type.startswith("github"):
# GitHub: 設定ファイルから取得
self.bot_username = self.config.get("github", {}).get("bot_name")
elif task_type.startswith("gitlab"):
# GitLab: 設定ファイルから取得
self.bot_username = self.config.get("gitlab", {}).get("bot_name")
else:
self.logger.warning("不明なタスクタイプ: %s", task_type)
self.bot_username = None
# bot_usernameが取得できない場合は機能を無効化
if not self.bot_username:
self.logger.warning("bot_usernameが設定されていません。コメント検出機能を無効化します。")
self.enabled = False
else:
self.enabled = True
self.logger.info(
"コメント検出機能を有効化しました (bot_username=%s)",
self.bot_username,
)
def initialize(self) -> None:
"""現在のコメント一覧を取得してlast_comment_idsを初期化する.
タスク開始時に呼び出して、既存のコメントをすべて記録します。
"""
if not self.enabled:
self.logger.debug("コメント検出機能が無効のため、初期化をスキップします")
return
try:
# 現在のコメント一覧を取得
comments = self.task.get_comments()
# コメントIDをセットに追加(文字列として管理)
self.last_comment_ids = {
str(comment.get("id", "")) for comment in comments if comment.get("id") is not None
}
# チェック時刻を記録
self.last_check_time = datetime.now(timezone.utc)
self.logger.info(
"コメント検出を初期化しました: %d件のコメントを記録",
len(self.last_comment_ids),
)
except Exception as e:
self.logger.warning("コメント取得中にエラー発生: %s", e)
# エラーでも処理を継続
def check_for_new_comments(self) -> list[dict[str, Any]]:
"""新規コメントを検出する.
Returns:
新規コメントのリスト(空リストの場合は新規なし)
"""
if not self.enabled:
return []
try:
# 現在のコメント一覧を取得
current_comments = self.task.get_comments()
# 新規コメントを抽出
new_comments = []
current_ids = set()
for comment in current_comments:
comment_id_raw = comment.get("id")
if comment_id_raw is None:
continue
comment_id = str(comment_id_raw)
current_ids.add(comment_id)
# 新規コメントの判定
if comment_id not in self.last_comment_ids:
# bot自身のコメントを除外
if not self.is_bot_comment(comment):
new_comments.append(comment)
else:
self.logger.debug(
"bot自身のコメントを除外しました: id=%s",
comment_id,
)
# 状態を更新
self.last_comment_ids = current_ids
self.last_check_time = datetime.now(timezone.utc)
if new_comments:
self.logger.info(
"新規コメントを検出しました: %d件 (Task: %s)",
len(new_comments),
getattr(self.task, "uuid", "unknown"),
)
else:
self.logger.debug(
"新規コメントなし (Task: %s)",
getattr(self.task, "uuid", "unknown"),
)
return new_comments
except Exception as e:
self.logger.warning(
"コメント取得中にエラー発生: %s (Task: %s)",
e,
getattr(self.task, "uuid", "unknown"),
)
# エラー時は空リストを返して処理を継続
return []
def is_bot_comment(self, comment: dict[str, Any]) -> bool:
"""コメントがbot自身によるものか判定する.
Args:
comment: コメント情報の辞書
Returns:
botのコメントの場合True
"""
if not self.bot_username:
return False
author = comment.get("author", "")
return author == self.bot_username
def format_comment_message(self, comments: list[dict[str, Any]]) -> str:
"""検出したコメントをLLMメッセージ形式に整形する.
Args:
comments: コメントリスト
Returns:
整形されたメッセージ文字列
"""
if not comments:
return ""
if len(comments) == 1:
# 単一コメントの場合
comment = comments[0]
author = comment.get("author", "unknown")
body = comment.get("body", "")
return f"[New Comment from @{author}]:\n{body}"
# 複数コメントの場合
lines = ["[New Comments Detected]:", ""]
for i, comment in enumerate(comments, 1):
author = comment.get("author", "unknown")
body = comment.get("body", "")
timestamp = comment.get("created_at", "")
lines.append(f"Comment {i} from @{author} ({timestamp}):")
lines.append(body)
lines.append("")
return "\n".join(lines)
def add_to_context(
self,
llm_client: Any, # noqa: ANN401
comments: list[dict[str, Any]],
) -> None:
"""検出したコメントをLLMコンテキストに追加する.
Args:
llm_client: LLMクライアントインスタンス
comments: 追加するコメントのリスト
"""
if not comments:
return
try:
# メッセージを整形
message = self.format_comment_message(comments)
# LLMコンテキストに追加
llm_client.send_user_message(message)
self.logger.info(
"新規コメントをコンテキストに追加しました: %d件 (Task: %s)",
len(comments),
getattr(self.task, "uuid", "unknown"),
)
except Exception as e:
self.logger.warning(
"コンテキスト追加中にエラー発生: %s (Task: %s)",
e,
getattr(self.task, "uuid", "unknown"),
)
def get_state(self) -> dict[str, Any]:
"""一時停止時の状態永続化用に現在の状態を取得する.
Returns:
状態辞書
"""
return {
"last_comment_ids": list(self.last_comment_ids),
"last_check_timestamp": (self.last_check_time.isoformat() if self.last_check_time else None),
}
def restore_state(self, state: dict[str, Any]) -> None:
"""再開時の状態復元用.
Args:
state: get_state()と同じ構造の状態辞書
"""
if not state:
self.logger.debug("復元する状態がありません")
return
try:
# last_comment_idsを復元
comment_ids = state.get("last_comment_ids", [])
self.last_comment_ids = set(comment_ids)
# last_check_timeを復元
timestamp = state.get("last_check_timestamp")
if timestamp:
self.last_check_time = datetime.fromisoformat(timestamp)
self.logger.info(
"コメント検出状態を復元しました: %d件のコメントID",
len(self.last_comment_ids),
)
except Exception as e:
self.logger.warning(
"コメント検出状態の復元に失敗しました: %s。新規初期化を実行します。",
e,
)
# 復元失敗時は新規初期化
self.initialize()