291 changes: 113 additions & 178 deletions camel/agents/chat_agent.py
@@ -20,7 +20,6 @@
import hashlib
import inspect
import json
import math
import os
import random
import re
@@ -29,7 +28,6 @@
import threading
import time
import uuid
import warnings
from datetime import datetime
from pathlib import Path
from typing import (
@@ -71,7 +69,6 @@
MemoryRecord,
ScoreBasedContextCreator,
)
from camel.memories.blocks.chat_history_block import EmptyMemoryWarning
from camel.messages import (
BaseMessage,
FunctionCallingMessage,
@@ -823,16 +820,6 @@ def update_memory(
) -> None:
r"""Updates the agent memory with a new message.

If the single *message* exceeds the model's context window, it will
be **automatically split into multiple smaller chunks** before being
written into memory. This prevents later failures in
`ScoreBasedContextCreator` where an over-sized message cannot fit
into the available token budget at all.

This slicing logic handles both regular text messages (in the
`content` field) and long tool call results (in the `result` field of
a `FunctionCallingMessage`).

Args:
message (BaseMessage): The new message to add to the stored
messages.
@@ -843,153 +830,17 @@ def update_memory(
                (default: :obj:`None`)
"""

# 1. Helper to write a record to memory
def _write_single_record(
message: BaseMessage, role: OpenAIBackendRole, timestamp: float
):
self.memory.write_record(
MemoryRecord(
message=message,
role_at_backend=role,
timestamp=timestamp,
agent_id=self.agent_id,
)
self.memory.write_record(
MemoryRecord(
message=message,
role_at_backend=role,
timestamp=timestamp
if timestamp is not None
else time.time_ns() / 1_000_000_000, # Nanosecond precision
agent_id=self.agent_id,
)

base_ts = (
timestamp
if timestamp is not None
else time.time_ns() / 1_000_000_000
)

# 2. Get token handling utilities, fallback if unavailable
try:
context_creator = self.memory.get_context_creator()
token_counter = context_creator.token_counter
token_limit = context_creator.token_limit
except AttributeError:
_write_single_record(message, role, base_ts)
return

# 3. Check if slicing is necessary
try:
current_tokens = token_counter.count_tokens_from_messages(
[message.to_openai_message(role)]
)

with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=EmptyMemoryWarning)
_, ctx_tokens = self.memory.get_context()

remaining_budget = max(0, token_limit - ctx_tokens)

if current_tokens <= remaining_budget:
_write_single_record(message, role, base_ts)
return
except Exception as e:
logger.warning(
f"Token calculation failed before chunking, "
f"writing message as-is. Error: {e}"
)
_write_single_record(message, role, base_ts)
return

# 4. Perform slicing
logger.warning(
f"Message with {current_tokens} tokens exceeds remaining budget "
f"of {remaining_budget}. Slicing into smaller chunks."
)

text_to_chunk: Optional[str] = None
is_function_result = False

if isinstance(message, FunctionCallingMessage) and isinstance(
message.result, str
):
text_to_chunk = message.result
is_function_result = True
elif isinstance(message.content, str):
text_to_chunk = message.content

if not text_to_chunk or not text_to_chunk.strip():
_write_single_record(message, role, base_ts)
return
# Encode the entire text to get a list of all token IDs
try:
all_token_ids = token_counter.encode(text_to_chunk)
except Exception as e:
logger.error(f"Failed to encode text for chunking: {e}")
_write_single_record(message, role, base_ts) # Fallback
return

if not all_token_ids:
_write_single_record(message, role, base_ts) # Nothing to chunk
return

# 1. Base chunk size: one-tenth of the smaller of (a) total token
# limit and (b) current remaining budget. This prevents us from
# creating chunks that are guaranteed to overflow the
# immediate context window.
base_chunk_size = max(1, remaining_budget) // 10

# 2. Each chunk gets a textual prefix such as:
# "[chunk 3/12 of a long message]\n"
# The prefix itself consumes tokens, so if we do not subtract its
# length the *total* tokens of the outgoing message (prefix + body)
# can exceed the intended bound. We estimate the prefix length
# with a representative example that is safely long enough for the
# vast majority of cases (three-digit indices).
sample_prefix = "[chunk 1/1000 of a long message]\n"
prefix_token_len = len(token_counter.encode(sample_prefix))

# 3. The real capacity for the message body is therefore the base
# chunk size minus the prefix length. Fallback to at least one
# token to avoid zero or negative sizes.
chunk_body_limit = max(1, base_chunk_size - prefix_token_len)

# 4. Calculate how many chunks we will need with this body size.
num_chunks = math.ceil(len(all_token_ids) / chunk_body_limit)
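        # Worked example (illustrative numbers): with a remaining budget of
        # 8,000 tokens, base_chunk_size is 800; if the sample prefix encodes
        # to 12 tokens, chunk_body_limit is 788, so a 10,000-token message
        # yields math.ceil(10000 / 788) = 13 chunks.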
group_id = str(uuid.uuid4())

for i in range(num_chunks):
start_idx = i * chunk_body_limit
end_idx = start_idx + chunk_body_limit
chunk_token_ids = all_token_ids[start_idx:end_idx]

chunk_body = token_counter.decode(chunk_token_ids)

prefix = f"[chunk {i + 1}/{num_chunks} of a long message]\n"
new_body = prefix + chunk_body

if is_function_result and isinstance(
message, FunctionCallingMessage
):
new_msg: BaseMessage = FunctionCallingMessage(
role_name=message.role_name,
role_type=message.role_type,
meta_dict=message.meta_dict,
content=message.content,
func_name=message.func_name,
args=message.args,
result=new_body,
tool_call_id=message.tool_call_id,
)
else:
new_msg = message.create_new_instance(new_body)

meta = (new_msg.meta_dict or {}).copy()
meta.update(
{
"chunk_idx": i + 1,
"chunk_total": num_chunks,
"chunk_group_id": group_id,
}
)
new_msg.meta_dict = meta

# Increment timestamp slightly to maintain order
_write_single_record(new_msg, role, base_ts + i * 1e-6)

def load_memory(self, memory: AgentMemory) -> None:
r"""Load the provided memory into the agent.

@@ -1785,17 +1636,60 @@ def _step_impl(
return self._step_terminate(
e.args[1], tool_call_records, "max_tokens_exceeded"
)
# Get response from model backend
response = self._get_model_response(
openai_messages,
num_tokens=num_tokens,
current_iteration=iteration_count,
response_format=response_format,
tool_schemas=[]
if disable_tools
else self._get_full_tool_schemas(),
prev_num_openai_messages=prev_num_openai_messages,
)
# Get response from model backend with token limit error handling
try:
response = self._get_model_response(
openai_messages,
num_tokens=num_tokens,
current_iteration=iteration_count,
response_format=response_format,
tool_schemas=[]
if disable_tools
else self._get_full_tool_schemas(),
prev_num_openai_messages=prev_num_openai_messages,
)
except Exception as e:
# Check if this is a token limit error
error_message = str(e).lower()
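                # Heuristic substring match against common provider error
                # messages; quota errors ("exceeded your current quota")
                # are treated the same way as context-length errors here.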
is_token_limit_error = (
'context_length_exceeded' in error_message
or 'exceeded your current quota' in error_message
or 'tokens must be reduced' in error_message
or 'context length' in error_message
or 'token count' in error_message
or 'context limit' in error_message
)

if is_token_limit_error:
logger.warning(
"Token limit exceeded error detected. "
"Summarizing context."
)
full_context_file_path = (
self.working_directory / "full_context.json"
)
self.save_memory(path=str(full_context_file_path))
self.memory.clear()
summary = self.summarize()
summary_messages = (
"[Context Summary]\n\n"
+ summary.get('summary', '')
+ "\n\n[Full Context file path is]\n\n"
+ str(full_context_file_path)
)
camel_workdir = os.environ.get("CAMEL_WORKDIR")
if camel_workdir:
self.working_directory = (
Path(camel_workdir) / "context_files"
)
else:
self.working_directory = Path("context_files")

                    # update_memory expects a BaseMessage, so wrap the
                    # summary text before writing it back to memory.
                    summary_message = BaseMessage.make_assistant_message(
                        role_name=self.role_name,
                        content=summary_messages,
                    )
                    self.update_memory(
                        summary_message, OpenAIBackendRole.ASSISTANT
                    )
                    # Retry the step against the summarized context and
                    # return its result.
                    return self._step_impl(input_message, response_format)
                # Not a token-limit error: re-raise instead of continuing
                # with an undefined response.
                raise

prev_num_openai_messages = len(openai_messages)
iteration_count += 1

@@ -1990,6 +1884,7 @@ async def _astep_non_streaming_task(
step_token_usage = self._create_token_usage_tracker()
iteration_count: int = 0
prev_num_openai_messages: int = 0

while True:
if self.pause_event is not None and not self.pause_event.is_set():
if isinstance(self.pause_event, asyncio.Event):
@@ -2005,16 +1900,56 @@
return self._step_terminate(
e.args[1], tool_call_records, "max_tokens_exceeded"
)
response = await self._aget_model_response(
openai_messages,
num_tokens=num_tokens,
current_iteration=iteration_count,
response_format=response_format,
tool_schemas=[]
if disable_tools
else self._get_full_tool_schemas(),
prev_num_openai_messages=prev_num_openai_messages,
)
# Get response from model backend with token limit error handling
try:
response = await self._aget_model_response(
openai_messages,
num_tokens=num_tokens,
current_iteration=iteration_count,
response_format=response_format,
tool_schemas=[]
if disable_tools
else self._get_full_tool_schemas(),
prev_num_openai_messages=prev_num_openai_messages,
)
except Exception as e:
# Check if this is a token limit error
error_message = str(e).lower()
is_token_limit_error = (
'context_length_exceeded' in error_message
or 'exceeded your current quota' in error_message
or 'tokens must be reduced' in error_message
or 'context length' in error_message
or 'token count exceeds' in error_message
or 'context limit' in error_message
)

if is_token_limit_error:
logger.warning(
"Token limit exceeded error detected. "
"Summarizing context."
)

full_context_file_path = (
self.working_directory / "full_context.json"
)
self.save_memory(path=str(full_context_file_path))
self.memory.clear()
summary = self.summarize()
summary_messages = (
"[Context Summary]\n\n"
+ summary.get('summary', '')
+ "\n\n[Full Context file path is]\n\n"
+ str(full_context_file_path)
)

                    # update_memory expects a BaseMessage, so wrap the
                    # summary text before writing it back to memory.
                    summary_message = BaseMessage.make_assistant_message(
                        role_name=self.role_name,
                        content=summary_messages,
                    )
                    self.update_memory(
                        summary_message, OpenAIBackendRole.ASSISTANT
                    )
                    # Await the retried step and return its result.
                    return await self._astep_non_streaming_task(
                        input_message, response_format
                    )
                # Not a token-limit error: re-raise instead of continuing
                # with an undefined response.
                raise

prev_num_openai_messages = len(openai_messages)
iteration_count += 1

9 changes: 5 additions & 4 deletions camel/storages/vectordb_storages/oceanbase.py
@@ -121,10 +121,11 @@ def __init__(
)

# Get the first index parameter
first_index_param = next(iter(index_params))
self._client.create_vidx_with_vec_index_param(
table_name=self.table_name, vidx_param=first_index_param
)
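            # next(..., None) guards against an empty index_params iterable;
            # when no index parameter is available, vector index creation
            # is simply skipped.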
first_index_param = next(iter(index_params), None)
if first_index_param is not None:
self._client.create_vidx_with_vec_index_param(
table_name=self.table_name, vidx_param=first_index_param
)

logger.info(f"Created table {self.table_name} with vector index")
else: