Skip to content

Commit aef6bcf

Browse files
authored
fix: Make from_memory_type and to_memory_type optional and add task_i… (#538)
fix: Make from_memory_type and to_memory_type optional and add task_id propagation - Make from_memory_type and to_memory_type fields optional in ScheduleLogForWebItem - This fixes RabbitMQ log submission validation errors in cloud service scenario - Add task_id field to ScheduleMessageItem and ScheduleLogForWebItem - Propagate task_id from API request through scheduler to web logs - Add logging for preference memory additions in _pref_add_message_consumer Fixes validation error: '2 validation errors for ScheduleLogForWebItem from_memory_type Field required to_memory_type Field required' Changes: - src/memos/mem_scheduler/schemas/message_schemas.py: Add task_id fields - src/memos/multi_mem_cube/single_cube.py: Pass task_id to ScheduleMessageItem - src/memos/mem_scheduler/general_scheduler.py: Propagate task_id to logs Co-authored-by: [email protected] <>
1 parent 801994b commit aef6bcf

File tree

3 files changed

+53
-2
lines changed

3 files changed

+53
-2
lines changed

src/memos/mem_scheduler/general_scheduler.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,7 @@ def _query_message_consumer(self, messages: list[ScheduleMessageItem]) -> None:
189189
memory_len=1,
190190
memcube_name=self._map_memcube_name(msg.mem_cube_id),
191191
)
192+
event.task_id = msg.task_id
192193
self._submit_web_logs([event])
193194
except Exception:
194195
logger.exception("Failed to record addMessage log for query")
@@ -233,6 +234,7 @@ def _answer_message_consumer(self, messages: list[ScheduleMessageItem]) -> None:
233234
memory_len=1,
234235
memcube_name=self._map_memcube_name(msg.mem_cube_id),
235236
)
237+
event.task_id = msg.task_id
236238
self._submit_web_logs([event])
237239
except Exception:
238240
logger.exception("Failed to record addMessage log for answer")
@@ -798,6 +800,50 @@ def process_message(message: ScheduleMessageItem):
798800
f"Successfully processed and add preferences for user_id={user_id}, mem_cube_id={mem_cube_id}, pref_ids={pref_ids}"
799801
)
800802

803+
# Create and submit log for web display
804+
# Only send logs if RabbitMQ is configured with direct exchange (cloud service scenario)
805+
should_send_log = (
806+
self.rabbitmq_config is not None
807+
and hasattr(self.rabbitmq_config, "exchange_type")
808+
and self.rabbitmq_config.exchange_type == "direct"
809+
)
810+
if pref_ids and should_send_log:
811+
pref_content = []
812+
pref_meta = []
813+
for i, pref_mem_item in enumerate(pref_memories):
814+
if i < len(pref_ids):
815+
pref_content.append(
816+
{
817+
"content": pref_mem_item.memory,
818+
"ref_id": pref_ids[i],
819+
}
820+
)
821+
pref_meta.append(
822+
{
823+
"ref_id": pref_ids[i],
824+
"id": pref_ids[i],
825+
"memory": pref_mem_item.memory,
826+
"memory_type": getattr(
827+
pref_mem_item.metadata, "memory_type", "preference"
828+
),
829+
}
830+
)
831+
832+
event = self.create_event_log(
833+
label="addMemory",
834+
from_memory_type=USER_INPUT_TYPE,
835+
to_memory_type=LONG_TERM_MEMORY_TYPE,
836+
user_id=user_id,
837+
mem_cube_id=mem_cube_id,
838+
mem_cube=mem_cube,
839+
memcube_log_content=pref_content,
840+
metadata=pref_meta,
841+
memory_len=len(pref_content),
842+
memcube_name=self._map_memcube_name(mem_cube_id),
843+
)
844+
event.task_id = message.task_id
845+
self._submit_web_logs([event])
846+
801847
except Exception as e:
802848
logger.error(f"Error processing pref_add message: {e}", exc_info=True)
803849

src/memos/mem_scheduler/schemas/message_schemas.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333

3434
class ScheduleMessageItem(BaseModel, DictConversionMixin):
3535
item_id: str = Field(description="uuid", default_factory=lambda: str(uuid4()))
36+
task_id: str | None = Field(default=None, description="Parent task ID, if applicable")
3637
redis_message_id: str = Field(default="", description="the message get from redis stream")
3738
user_id: str = Field(..., description="user id")
3839
mem_cube_id: str = Field(..., description="memcube id")
@@ -114,13 +115,14 @@ class ScheduleLogForWebItem(BaseModel, DictConversionMixin):
114115
item_id: str = Field(
115116
description="Unique identifier for the log entry", default_factory=lambda: str(uuid4())
116117
)
118+
task_id: str | None = Field(default=None, description="Identifier for the parent task")
117119
user_id: str = Field(..., description="Identifier for the user associated with the log")
118120
mem_cube_id: str = Field(
119121
..., description="Identifier for the memcube associated with this log entry"
120122
)
121123
label: str = Field(..., description="Label categorizing the type of log")
122-
from_memory_type: str = Field(..., description="Source memory type")
123-
to_memory_type: str = Field(..., description="Destination memory type")
124+
from_memory_type: str | None = Field(None, description="Source memory type")
125+
to_memory_type: str | None = Field(None, description="Destination memory type")
124126
log_content: str = Field(..., description="Detailed content of the log entry")
125127
current_memory_sizes: MemorySizes = Field(
126128
default_factory=lambda: dict(DEFAULT_MEMORY_SIZES),

src/memos/multi_mem_cube/single_cube.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -427,6 +427,7 @@ def _schedule_memory_tasks(
427427
try:
428428
message_item_read = ScheduleMessageItem(
429429
user_id=add_req.user_id,
430+
task_id=add_req.task_id,
430431
session_id=target_session_id,
431432
mem_cube_id=self.cube_id,
432433
mem_cube=self.naive_mem_cube,
@@ -448,6 +449,7 @@ def _schedule_memory_tasks(
448449
else:
449450
message_item_add = ScheduleMessageItem(
450451
user_id=add_req.user_id,
452+
task_id=add_req.task_id,
451453
session_id=target_session_id,
452454
mem_cube_id=self.cube_id,
453455
mem_cube=self.naive_mem_cube,
@@ -487,6 +489,7 @@ def _process_pref_mem(
487489
messages_list = [add_req.messages]
488490
message_item_pref = ScheduleMessageItem(
489491
user_id=add_req.user_id,
492+
task_id=add_req.task_id,
490493
session_id=target_session_id,
491494
mem_cube_id=self.cube_id,
492495
mem_cube=self.naive_mem_cube,

0 commit comments

Comments
 (0)