Skip to content

Commit 464dd01

Browse files
fix: Make from_memory_type and to_memory_type optional and add task_id propagation
- Make from_memory_type and to_memory_type fields optional in ScheduleLogForWebItem - This fixes RabbitMQ log submission validation errors in cloud service scenario - Add task_id field to ScheduleMessageItem and ScheduleLogForWebItem - Propagate task_id from API request through scheduler to web logs - Add logging for preference memory additions in _pref_add_message_consumer Fixes validation error: '2 validation errors for ScheduleLogForWebItem from_memory_type Field required to_memory_type Field required' Changes: - src/memos/mem_scheduler/schemas/message_schemas.py: Add task_id fields - src/memos/multi_mem_cube/single_cube.py: Pass task_id to ScheduleMessageItem - src/memos/mem_scheduler/general_scheduler.py: Propagate task_id to logs
1 parent 9686810 commit 464dd01

File tree

3 files changed

+41
-2
lines changed

3 files changed

+41
-2
lines changed

src/memos/mem_scheduler/general_scheduler.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,7 @@ def _query_message_consumer(self, messages: list[ScheduleMessageItem]) -> None:
189189
memory_len=1,
190190
memcube_name=self._map_memcube_name(msg.mem_cube_id),
191191
)
192+
event.task_id = msg.task_id
192193
self._submit_web_logs([event])
193194
except Exception:
194195
logger.exception("Failed to record addMessage log for query")
@@ -233,6 +234,7 @@ def _answer_message_consumer(self, messages: list[ScheduleMessageItem]) -> None:
233234
memory_len=1,
234235
memcube_name=self._map_memcube_name(msg.mem_cube_id),
235236
)
237+
event.task_id = msg.task_id
236238
self._submit_web_logs([event])
237239
except Exception:
238240
logger.exception("Failed to record addMessage log for answer")
@@ -798,6 +800,38 @@ def process_message(message: ScheduleMessageItem):
798800
f"Successfully processed and add preferences for user_id={user_id}, mem_cube_id={mem_cube_id}, pref_ids={pref_ids}"
799801
)
800802

803+
# Create and submit log for web display
804+
if pref_ids:
805+
pref_content = []
806+
pref_meta = []
807+
for i, pref_mem_item in enumerate(pref_memories):
808+
if i < len(pref_ids):
809+
pref_content.append({
810+
"content": pref_mem_item.memory,
811+
"ref_id": pref_ids[i],
812+
})
813+
pref_meta.append({
814+
"ref_id": pref_ids[i],
815+
"id": pref_ids[i],
816+
"memory": pref_mem_item.memory,
817+
"memory_type": getattr(pref_mem_item.metadata, "memory_type", "preference"),
818+
})
819+
820+
event = self.create_event_log(
821+
label="addMemory",
822+
from_memory_type=USER_INPUT_TYPE,
823+
to_memory_type=LONG_TERM_MEMORY_TYPE,
824+
user_id=user_id,
825+
mem_cube_id=mem_cube_id,
826+
mem_cube=mem_cube,
827+
memcube_log_content=pref_content,
828+
metadata=pref_meta,
829+
memory_len=len(pref_content),
830+
memcube_name=self._map_memcube_name(mem_cube_id),
831+
)
832+
event.task_id = message.task_id
833+
self._submit_web_logs([event])
834+
801835
except Exception as e:
802836
logger.error(f"Error processing pref_add message: {e}", exc_info=True)
803837

src/memos/mem_scheduler/schemas/message_schemas.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333

3434
class ScheduleMessageItem(BaseModel, DictConversionMixin):
3535
item_id: str = Field(description="uuid", default_factory=lambda: str(uuid4()))
36+
task_id: str | None = Field(default=None, description="Parent task ID, if applicable")
3637
redis_message_id: str = Field(default="", description="the message get from redis stream")
3738
user_id: str = Field(..., description="user id")
3839
mem_cube_id: str = Field(..., description="memcube id")
@@ -114,13 +115,14 @@ class ScheduleLogForWebItem(BaseModel, DictConversionMixin):
114115
item_id: str = Field(
115116
description="Unique identifier for the log entry", default_factory=lambda: str(uuid4())
116117
)
118+
task_id: str | None = Field(default=None, description="Identifier for the parent task")
117119
user_id: str = Field(..., description="Identifier for the user associated with the log")
118120
mem_cube_id: str = Field(
119121
..., description="Identifier for the memcube associated with this log entry"
120122
)
121123
label: str = Field(..., description="Label categorizing the type of log")
122-
from_memory_type: str = Field(..., description="Source memory type")
123-
to_memory_type: str = Field(..., description="Destination memory type")
124+
from_memory_type: str | None = Field(None, description="Source memory type")
125+
to_memory_type: str | None = Field(None, description="Destination memory type")
124126
log_content: str = Field(..., description="Detailed content of the log entry")
125127
current_memory_sizes: MemorySizes = Field(
126128
default_factory=lambda: dict(DEFAULT_MEMORY_SIZES),

src/memos/multi_mem_cube/single_cube.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -425,6 +425,7 @@ def _schedule_memory_tasks(
425425
try:
426426
message_item_read = ScheduleMessageItem(
427427
user_id=add_req.user_id,
428+
task_id=add_req.task_id,
428429
session_id=target_session_id,
429430
mem_cube_id=self.cube_id,
430431
mem_cube=self.naive_mem_cube,
@@ -446,6 +447,7 @@ def _schedule_memory_tasks(
446447
else:
447448
message_item_add = ScheduleMessageItem(
448449
user_id=add_req.user_id,
450+
task_id=add_req.task_id,
449451
session_id=target_session_id,
450452
mem_cube_id=self.cube_id,
451453
mem_cube=self.naive_mem_cube,
@@ -485,6 +487,7 @@ def _process_pref_mem(
485487
messages_list = [add_req.messages]
486488
message_item_pref = ScheduleMessageItem(
487489
user_id=add_req.user_id,
490+
task_id=add_req.task_id,
488491
session_id=target_session_id,
489492
mem_cube_id=self.cube_id,
490493
mem_cube=self.naive_mem_cube,

0 commit comments

Comments
 (0)