Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions src/agents/memory/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,19 @@
from .openai_conversations_session import OpenAIConversationsSession
from .openai_responses_compaction_session import OpenAIResponsesCompactionSession
from .session import (
SERVER_MANAGED_CONVERSATION_SESSION_ATTR,
OpenAIResponsesCompactionArgs,
OpenAIResponsesCompactionAwareSession,
ServerManagedConversationSession,
Session,
SessionABC,
SessionHistoryMutation,
SessionHistoryRewriteArgs,
SessionHistoryRewriteAwareSession,
apply_session_history_mutations,
is_openai_responses_compaction_aware_session,
is_server_managed_conversation_session,
is_session_history_rewrite_aware_session,
)
from .session_settings import SessionSettings
from .sqlite_session import SQLiteSession
Expand All @@ -21,5 +29,13 @@
"OpenAIResponsesCompactionSession",
"OpenAIResponsesCompactionArgs",
"OpenAIResponsesCompactionAwareSession",
"SERVER_MANAGED_CONVERSATION_SESSION_ATTR",
"SessionHistoryMutation",
"SessionHistoryRewriteArgs",
"SessionHistoryRewriteAwareSession",
"ServerManagedConversationSession",
"apply_session_history_mutations",
"is_server_managed_conversation_session",
"is_openai_responses_compaction_aware_session",
"is_session_history_rewrite_aware_session",
]
1 change: 1 addition & 0 deletions src/agents/memory/openai_conversations_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ async def start_openai_conversations_session(openai_client: AsyncOpenAI | None =


class OpenAIConversationsSession(SessionABC):
_server_managed_conversation_session = True
session_settings: SessionSettings | None = None

def __init__(
Expand Down
228 changes: 196 additions & 32 deletions src/agents/memory/openai_responses_compaction_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
OpenAIResponsesCompactionArgs,
OpenAIResponsesCompactionAwareSession,
SessionABC,
SessionHistoryRewriteArgs,
apply_session_history_mutations,
is_session_history_rewrite_aware_session,
)

if TYPE_CHECKING:
Expand All @@ -24,6 +27,14 @@
OpenAIResponsesCompactionMode = Literal["previous_response_id", "input", "auto"]


def _is_user_message_item(item: TResponseInputItem) -> bool:
if not isinstance(item, dict):
return False
if item.get("type") == "message":
return item.get("role") == "user"
return item.get("role") == "user" and "content" in item


def select_compaction_candidate_items(
    items: "list[TResponseInputItem]",
) -> "list[TResponseInputItem]":
    """Select the subset of *items* that compaction may operate on.

    Excludes user messages and compaction items: user turns are preserved
    verbatim, and compaction artifacts must never be re-compacted.
    Non-dict items are always kept (they cannot be classified as either).
    """
    # Defect fixed: this span still carried the pre-refactor nested
    # `_is_user_message` helper alongside the module-level
    # `_is_user_message_item`; only the shared module-level helper is used now.
    return [
        item
        for item in items
        if not (
            _is_user_message_item(item)
            or (isinstance(item, dict) and item.get("type") == "compaction")
        )
    ]

Expand Down Expand Up @@ -129,48 +134,119 @@ def __init__(
self._session_items: list[TResponseInputItem] | None = None
self._response_id: str | None = None
self._deferred_response_id: str | None = None
self._last_unstored_response_id: str | None = None
self._last_store: bool | None = None
self._has_pending_local_history_rewrite = False
self._local_history_rewrite_response_id: str | None = None
self._has_unacknowledged_local_session_adds = False

@property
def client(self) -> AsyncOpenAI:
if self._client is None:
self._client = get_default_openai_client() or AsyncOpenAI()
return self._client

def _resolve_compaction_mode_for_response(
def _resolve_compaction_mode(
self,
*,
requested_mode: OpenAIResponsesCompactionMode,
response_id: str | None,
store: bool | None,
requested_mode: OpenAIResponsesCompactionMode | None,
turn_has_local_adds_without_new_response_id: bool,
) -> _ResolvedCompactionMode:
mode = requested_mode or self.compaction_mode
if (
mode == "auto"
and store is None
and response_id is not None
and response_id == self._last_unstored_response_id
):
resolved_mode = _resolve_compaction_mode(
requested_mode,
response_id=response_id,
store=store,
)

if turn_has_local_adds_without_new_response_id and resolved_mode == "previous_response_id":
self._has_unacknowledged_local_session_adds = False
self._mark_local_history_rewrite()
logger.debug(
"compact: forcing input mode after local session delta without new response id"
)
return "input"

if not self._has_pending_local_history_rewrite:
return resolved_mode

if resolved_mode == "previous_response_id":
if self._local_history_rewrite_response_id is None and response_id is not None:
self._local_history_rewrite_response_id = response_id
logger.debug("compact: forcing input mode after local history rewrite")
return "input"
return _resolve_compaction_mode(mode, response_id=response_id, store=store)

return resolved_mode

def _resolve_store_tracking(
    self,
    *,
    response_id: str | None,
    previous_response_id: str | None,
    store: bool | None,
    store_was_provided: bool,
) -> bool | None:
    """Resolve the effective store setting for the current response id.

    Reuse `_last_store` only while compaction still refers to the same response. A new
    response id with no explicit `store` falls back to the Responses API default behavior.
    """
    # Caller supplied `store` explicitly (even if it is None): remember and honor it.
    if store_was_provided:
        self._last_store = store
        return store

    # Compaction moved to a different response id: the tracked setting is stale.
    if response_id is not None and response_id != previous_response_id:
        self._last_store = None
        return None

    # Same response as before: keep reusing the last tracked setting.
    return self._last_store

def _get_effective_store_for_response_id(
    self,
    *,
    response_id: str | None,
    store: bool | None,
) -> bool | None:
    """Return the effective store setting without mutating response tracking."""
    # An explicitly supplied store always wins.
    if store is not None:
        return store
    # A response id other than the tracked one has no known store setting.
    if response_id is not None and response_id != self._response_id:
        return None
    # Same (or unspecified) response: fall back to the last tracked setting.
    return self._last_store

async def run_compaction(self, args: OpenAIResponsesCompactionArgs | None = None) -> None:
"""Run compaction using responses.compact API."""
previous_response_id = self._response_id
if args and args.get("response_id"):
self._response_id = args["response_id"]
requested_mode = args.get("compaction_mode") if args else None
if args and "store" in args:
store = args["store"]
if store is False and self._response_id:
self._last_unstored_response_id = self._response_id
elif store is True and self._response_id == self._last_unstored_response_id:
self._last_unstored_response_id = None
else:
store = None
resolved_mode = self._resolve_compaction_mode_for_response(
store_was_provided = bool(args and "store" in args)
requested_store: bool | None = (
args["store"] if args is not None and "store" in args else None
)
store = self._resolve_store_tracking(
response_id=self._response_id,
previous_response_id=previous_response_id,
store=requested_store,
store_was_provided=store_was_provided,
)
turn_has_local_adds_without_new_response_id = (
self._has_unacknowledged_local_session_adds
and (args is None or args.get("response_id") in {None, previous_response_id})
)
if (
args
and args.get("response_id") is not None
and args["response_id"] != previous_response_id
):
self._has_unacknowledged_local_session_adds = False
resolved_mode = self._resolve_compaction_mode(
response_id=self._response_id,
store=store,
requested_mode=requested_mode,
requested_mode=requested_mode or self.compaction_mode,
turn_has_local_adds_without_new_response_id=(
turn_has_local_adds_without_new_response_id
),
)

if resolved_mode == "previous_response_id" and not self._response_id:
Expand Down Expand Up @@ -198,6 +274,15 @@ async def run_compaction(self, args: OpenAIResponsesCompactionArgs | None = None
)
return

frontier_unresolved_function_calls = _find_frontier_unresolved_function_calls(session_items)
if frontier_unresolved_function_calls:
logger.debug(
"compact: blocked unresolved function calls for %s: %s",
self._response_id,
frontier_unresolved_function_calls,
)
return

self._deferred_response_id = None
logger.debug(
f"compact: start for {self._response_id} using {self.model} (mode={resolved_mode})"
Expand Down Expand Up @@ -229,6 +314,8 @@ async def run_compaction(self, args: OpenAIResponsesCompactionArgs | None = None

self._compaction_candidate_items = select_compaction_candidate_items(output_items)
self._session_items = output_items
if resolved_mode == "input":
self._clear_pending_local_history_rewrite()

logger.debug(
f"compact: done for {self._response_id} "
Expand All @@ -239,14 +326,37 @@ async def run_compaction(self, args: OpenAIResponsesCompactionArgs | None = None
async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
return await self.underlying_session.get_items(limit)

async def apply_history_mutations(self, args: SessionHistoryRewriteArgs) -> None:
    """Rewrite persisted history and keep compaction caches aligned with the new transcript."""
    mutations = list(args.get("mutations", []))
    if not mutations:
        # Nothing to rewrite.
        return

    # Preferred path: delegate when the underlying session can rewrite itself,
    # then rebuild the local caches from what it now holds.
    if is_session_history_rewrite_aware_session(self.underlying_session):
        await self.underlying_session.apply_history_mutations({"mutations": mutations})
        await self._refresh_caches_from_underlying_session()
        self._mark_local_history_rewrite()
        return

    # Fallback: materialize the rewritten transcript locally and replace the
    # stored history wholesale (clear, then re-add the rewritten items).
    rewritten_items = apply_session_history_mutations(
        await self.underlying_session.get_items(),
        mutations,
    )
    await self.underlying_session.clear_session()
    if rewritten_items:
        await self.underlying_session.add_items(rewritten_items)
    # Keep local caches consistent with what was just persisted.
    self._session_items = rewritten_items
    self._compaction_candidate_items = select_compaction_candidate_items(rewritten_items)
    self._mark_local_history_rewrite()

async def _defer_compaction(self, response_id: str, store: bool | None = None) -> None:
if self._deferred_response_id is not None:
return
compaction_candidate_items, session_items = await self._ensure_compaction_candidates()
resolved_mode = self._resolve_compaction_mode_for_response(
resolved_mode = _resolve_compaction_mode(
self.compaction_mode,
response_id=response_id,
store=store,
requested_mode=None,
store=self._get_effective_store_for_response_id(response_id=response_id, store=store),
)
should_compact = self.should_trigger_compaction(
{
Expand All @@ -266,7 +376,10 @@ def _clear_deferred_compaction(self) -> None:
self._deferred_response_id = None

async def add_items(self, items: list[TResponseInputItem]) -> None:
if not items:
return
await self.underlying_session.add_items(items)
self._has_unacknowledged_local_session_adds = True
if self._compaction_candidate_items is not None:
new_candidates = select_compaction_candidate_items(items)
if new_candidates:
Expand All @@ -286,6 +399,15 @@ async def clear_session(self) -> None:
self._compaction_candidate_items = []
self._session_items = []
self._deferred_response_id = None
self._has_pending_local_history_rewrite = False
self._local_history_rewrite_response_id = None
self._has_unacknowledged_local_session_adds = False
self._last_store = None

async def _refresh_caches_from_underlying_session(self) -> None:
    """Re-read the persisted transcript and rebuild both local caches from it."""
    history = await self.underlying_session.get_items()
    self._session_items = history
    self._compaction_candidate_items = select_compaction_candidate_items(history)

async def _ensure_compaction_candidates(
self,
Expand All @@ -304,10 +426,52 @@ async def _ensure_compaction_candidates(
)
return (candidates[:], history[:])

def _mark_local_history_rewrite(self) -> None:
    """Record that local history diverged from the server-side response chain."""
    self._has_pending_local_history_rewrite = True
    # Remember which response id the rewrite happened under.
    self._local_history_rewrite_response_id = self._response_id

def _clear_pending_local_history_rewrite(self) -> None:
    """Reset rewrite tracking once the divergence has been reconciled."""
    self._has_pending_local_history_rewrite = False
    self._local_history_rewrite_response_id = None


_ResolvedCompactionMode = Literal["previous_response_id", "input"]


def _find_frontier_unresolved_function_calls(items: "list[TResponseInputItem]") -> list[str]:
    """Return unresolved function-call ids that remain in the active conversation frontier.

    Once a later user message appears, earlier unresolved tool calls are considered abandoned and
    should no longer block future compaction for the session.
    """
    call_positions: dict[str, int] = {}
    completed_call_ids: set[str] = set()
    frontier_start = -1  # position of the most recent user message seen so far

    for position, entry in enumerate(items):
        if _is_user_message_item(entry):
            frontier_start = position

        # Items may be plain dicts or model objects; read fields either way.
        if isinstance(entry, dict):
            kind = entry.get("type")
            call_id = entry.get("call_id")
        else:
            kind = getattr(entry, "type", None)
            call_id = getattr(entry, "call_id", None)

        if not isinstance(call_id, str):
            continue
        if kind == "function_call":
            call_positions[call_id] = position
        elif kind == "function_call_output":
            completed_call_ids.add(call_id)

    unresolved: list[str] = []
    for call_id, position in call_positions.items():
        if call_id in completed_call_ids:
            continue
        # Calls at or before the last user message are treated as abandoned.
        if position > frontier_start:
            unresolved.append(call_id)
    return unresolved


def _resolve_compaction_mode(
requested_mode: OpenAIResponsesCompactionMode,
*,
Expand Down
Loading
Loading