| status | accepted |
|---|---|
| contact | eavanvalkenburg |
| date | 2026-02-10 |
| deciders | eavanvalkenburg, markwallace-microsoft, sphenry, alliscode, johanst, brettcannon, westey-m |
| consulted | taochenosu, moonbox3, dmytrostruk, giles17 |
Long-running agents need context compaction — automatically summarizing or truncating conversation history when approaching token limits. This is particularly important for agents that make many tool calls in succession (10s or 100s), where the context can grow unboundedly.
ADR-0016 established the ContextProvider (hooks pattern) and HistoryProvider architecture for session management and context engineering. The .NET SDK comparison table notes:
Message reduction:
IChatReduceronInMemoryChatHistoryProvider→ Not yet designed (see Open Discussion: Context Compaction)
This ADR proposes a design for context compaction that integrates with the chosen architecture.
An analysis of the current message flow identified three structural barriers to implementing compaction inside the tool loop:
-
History loaded once:
HistoryProvider.get_messages()is only called once duringbefore_runat the start ofagent.run(). The tool loop maintains its own message list internally and never re-reads from the provider. -
ChatMiddlewaremodifies copies:ChatMiddlewarereceives a copy of the message list each iteration. Clearing/replacingcontext.messagesin middleware only affects that single LLM call — the tool loop's internal message list keeps growing with each tool result. -
FunctionMiddlewarewraps tool calls, not LLM calls:FunctionMiddlewareruns around individual tool executions, not around the LLM call that triggers them. It cannot modify the message history between iterations.
agent.run(task)
│
├── ContextProvider.before_run() ← Load history, inject context ONCE
│
├── chat_client.get_response(messages)
│ │
│ ├── messages = copy(messages) ← NEW list created
│ │
│ └── for attempt in range(max_iterations): ← TOOL LOOP
│ ├── ChatMiddleware(copy of messages) ← Modifies copy only
│ ├── LLM call(messages) ← Response may contain tool_calls
│ ├── FunctionMiddleware(tool_call) ← Wraps each tool execution
│ │ └── Execute single tool call
│ └── messages.extend(tool_results) ← List grows unbounded
│
└── ContextProvider.after_run() ← Store messages ONCE
Consequence: There is currently no way to compact messages during the tool loop such that subsequent LLM calls use the reduced context. Any middleware-based approach only affects individual LLM calls but the underlying list keeps growing.
A critical correctness constraint for any compaction strategy: tool calls and their results must be kept together. LLM APIs (OpenAI, Azure, etc.) require that an assistant message containing tool_calls is always followed by corresponding tool result messages. A compaction strategy that removes one without the other will cause API errors. This is extended for reasoning models, at least in the OpenAI Responses API with a Reasoning content, without it you also get failed calls.
Strategies must treat [assistant message with tool_calls] + [tool result messages] as atomic groups — either keep the entire group or remove it entirely. Option 1 addresses this structurally in both Variant C1 (precomputed MessageGroups) and Variant C2 (precomputed _group_* annotations on messages), so strategy authors do not need to rediscover raw boundaries on every pass.
Compaction must be applicable in three primary points in the agent lifecycle:
| Point | When | Purpose |
|---|---|---|
| In-run | During the (potentially) multiple calls to a ChatClient's get_response within a single agent.run() |
Keep context within limits as tool calls accumulate and project only included messages per model call |
| Pre-write* | Before HistoryProvider.save_messages() in after_run |
Compact before persisting to storage, limiting storage size, only applies to messages from a run |
| On existing storage* | Outside of agent.run(), as a maintenance operation |
Compact stored history (e.g., cron job, manual trigger) |
*: Should pre-write and existing-storage compaction share one unified configuration/setup to reduce duplicate strategy wiring, and then either: each write overrides the full storage, or only new messages are compacted while a separate interface can be called to compact the existing storage?
All compaction discussed in this ADR is irrelevant when using only service-managed storage (service_session_id is set). In that scenario:
- The service manages message history internally — the client never holds the full conversation
- Only new messages are sent to/from the service each turn
- The service is responsible for its own context window management and compaction
- The client has no message list to compact
This ADR applies to two scenarios where the client constructs and manages the message list sent to the model:
- With local storage (e.g.,
InMemoryHistoryProvider, Redis, Cosmos) — compaction is needed during a run, currently no compaction is done in our abstractions. - Without any storage (
store=False, noHistoryProvider) — in-run compaction is still critical for long-running, tool-heavy agent invocations where the message list grows unbounded within a singleagent.run()call
- Applicable across primary points: The strategy model must work at pre-write, in-run, and on existing storage, this means it must be:
- Composable with HistoryProvider: Works naturally with the
HistoryProvidersubclass from ADR-0016 - Composable with function calling/chat clients: Can be applied during the inner loop of the chat clients
- Composable with HistoryProvider: Works naturally with the
- Message-list correctness: Compaction must preserve required assistant/tool/result ordering and reasoning/tool-call pairings so the model input stays valid
- Chainable/Composable: Multiple strategies must be composable (e.g., summarize older messages then truncate to fit token budget).
- Standalone
CompactionStrategyobject composed intoHistoryProviderandChatClient CompactionStrategyas a mixin forHistoryProvidersubclasses- Separate
CompactionProviderset directly on the agent - Mutable message access in
ChatMiddleware
Define an abstract CompactionStrategy that can be composed into any HistoryProvider and also passed to the agent for in-run compaction.
There are three sub-variants for the method signature, which differ in mutability semantics and input structure, all of them use __call__ to be easily used as a callable, and allow simple strategies to be expressed as simple functions, and if you need additional state or helper methods you can implement a class with __call__:
The strategy mutates the provided list directly and returns bool indicating whether compaction occurred. Zero-allocation in the no-op case, and the tool loop doesn't need to reassign the list.
@runtime_checkable
class CompactionStrategy(Protocol):
"""Abstract strategy for compacting a list of messages in place."""
async def __call__(self, messages: list[Message]) -> bool:
"""Compact messages in place. Returns True if compaction occurred."""
...The strategy returns a new list (leaving the original unchanged) plus a bool indicating whether compaction occurred. This is safer when the caller needs the original list preserved (e.g., for logging or fallback), and is a more functional style that avoids side-effect surprises.
@runtime_checkable
class CompactionStrategy(Protocol):
"""Abstract strategy for compacting a list of messages."""
async def __call__(self, messages: Sequence[Message]) -> tuple[list[Message], bool]:
"""Return (compacted_messages, did_compact)."""
...Tool loop integration requires reassignment:
# Inside the function invocation loop
messages.append(tool_result_message)
if compacter := config.get("compaction_strategy"):
compacted, did_compact = await compacter(messages)
if did_compact:
messages.clear()
messages.extend(compacted)Variant C has two sub-variants that provide the same logical grouping behavior:
- C1 (
MessageGroupsstate object): group metadata lives in a sidecar container. - C2 (
_-prefixed message attributes): group metadata lives directly on messages inadditional_properties.
Both approaches let strategies operate on logical units (system, user, assistant_text, tool_call) instead of re-deriving boundaries every time.
@dataclass
class MessageGroup:
"""A logical group of messages that must be kept or removed together."""
kind: Literal["system", "user", "assistant_text", "tool_call"]
messages: list[Message]
@property
def length(self) -> int:
"""Number of messages in this group."""
return len(self.messages)
@dataclass
class MessageGroups:
groups: list[MessageGroup]
@classmethod
def from_messages(cls, messages: list[Message]) -> "MessageGroups":
"""Build grouped state from a flat message list."""
groups: list[MessageGroup] = []
i = 0
while i < len(messages):
msg = messages[i]
if msg.role == "system":
groups.append(MessageGroup(kind="system", messages=[msg]))
i += 1
elif msg.role == "user":
groups.append(MessageGroup(kind="user", messages=[msg]))
i += 1
elif msg.role == "assistant" and getattr(msg, "tool_calls", None):
group_msgs = [msg]
i += 1
while i < len(messages) and messages[i].role == "tool":
group_msgs.append(messages[i])
i += 1
groups.append(MessageGroup(kind="tool_call", messages=group_msgs))
else:
groups.append(MessageGroup(kind="assistant_text", messages=[msg]))
i += 1
return cls(groups)
def summary(self) -> dict[str, int]:
return {
"group_count": len(self.groups),
"message_count": sum(len(g.messages) for g in self.groups),
"tool_call_count": sum(1 for g in self.groups if g.kind == "tool_call"),
}
def to_messages(self) -> list[Message]:
"""Flatten grouped state back into a flat message list."""
return [msg for group in self.groups for msg in group.messages]
class CompactionStrategy(Protocol):
"""Callable strategy for group-aware compaction."""
async def __call__(self, groups: MessageGroups) -> bool:
"""Compact by mutating grouped state. Returns True if changed.
Group kinds:
- "system": system message(s)
- "user": a single user message
- "assistant_text": an assistant message without tool calls
- "tool_call": an assistant message with tool_calls + all corresponding
tool result messages (atomic unit)
"""
...Class-based strategies implement __call__ directly:
class ExcludeOldestGroupsStrategy:
async def __call__(self, groups: MessageGroups) -> bool:
# Mutate grouped state in place.
...The framework builds and flattens grouped state through MessageGroups methods:
# Usage at a compaction point:
groups = MessageGroups.from_messages(messages)
logger.debug("Pre-compaction summary: %s", groups.summary())
# optional also emit OTEL events next to these loggers, but not sure if needed
await strategy(groups)
logger.debug("Post-compaction summary: %s", groups.summary())
response = await get_response(messages=groups.to_messages())
# add messages from response into new group and to the groups.Note on in-run integration (C1): Variant C1 requires maintaining grouped sidecar state (MessageGroups / underlying list[MessageGroup]) alongside the function-calling loop message list. Because BaseChatClient is stateless between calls, C1 cannot be cleanly implemented only in BaseChatClient; a stateful loop layer must own and update that grouped structure across roundtrips.
Variant C2 achieves the same grouping behavior as C1 but stores grouping metadata on messages instead of in a sidecar MessageGroups object.
def _annotate_groups(messages: list[Message]) -> None:
"""Annotate messages with group metadata in additional_properties.
Metadata keys:
- "_group_id": stable group id for all messages in the same logical unit
- "_group_kind": "system" | "user" | "assistant_text" | "tool_call"
- "_group_index": order of groups in the current list
"""
group_index = 0
i = 0
while i < len(messages):
msg = messages[i]
group_id = f"g-{group_index}"
if msg.role == "assistant" and getattr(msg, "tool_calls", None):
msg.additional_properties["_group_id"] = group_id
msg.additional_properties["_group_kind"] = "tool_call"
msg.additional_properties["_group_index"] = group_index
i += 1
while i < len(messages) and messages[i].role == "tool":
messages[i].additional_properties["_group_id"] = group_id
messages[i].additional_properties["_group_kind"] = "tool_call"
messages[i].additional_properties["_group_index"] = group_index
i += 1
else:
kind = (
"system" if msg.role == "system"
else "user" if msg.role == "user"
else "assistant_text"
)
msg.additional_properties["_group_id"] = group_id
msg.additional_properties["_group_kind"] = kind
msg.additional_properties["_group_index"] = group_index
i += 1
group_index += 1
class CompactionStrategy(Protocol):
async def __call__(self, messages: list[Message]) -> bool:
"""Compact using message annotations; mutate in place."""
...Note on in-run integration (C2): BaseChatClient should annotate new messages incrementally as they are appended (rather than re-running _annotate_groups over the full list every roundtrip). Unlike C1, C2 does not require a separate grouped sidecar in the function-calling loop; strategies can operate directly on list[Message] using _group_* metadata attached to the messages themselves. This makes C2 feasible as a fully BaseChatClient-localized implementation and provides a cleaner separation of responsibilities. In C2 and derived variants (D2/E2/F2), full ownership of compaction and message-attribute lifecycle belongs to the chat client to avoid double work: the chat client assigns/updates attributes (including _group_id for new tool-result messages added by function calling), and the function-calling layer remains unaware of this mechanism.
Variant D also has two sub-variants:
- D1: exclusion state on
MessageGroup. - D2: exclusion state on message
_-attributes.
@dataclass
class MessageGroup:
kind: Literal["system", "user", "assistant_text", "tool_call"]
messages: list[Message]
excluded: bool = False
exclude_reason: str | None = None
@dataclass
class MessageGroups:
groups: list[MessageGroup]
def summary(self) -> dict[str, int]:
return {
"group_count": len(self.groups),
"message_count": sum(len(g.messages) for g in self.groups),
"tool_call_count": sum(1 for g in self.groups if g.kind == "tool_call"),
"included_group_count": sum(1 for g in self.groups if not g.excluded),
"included_message_count": sum(len(g.messages) for g in self.groups if not g.excluded),
"included_tool_call_count": sum(
1 for g in self.groups if g.kind == "tool_call" and not g.excluded
),
}
def get_messages(self, *, excluded: bool = False) -> list[Message]:
if excluded:
return [msg for g in self.groups for msg in g.messages]
return [msg for g in self.groups if not g.excluded for msg in g.messages]
def included_messages(self) -> list[Message]:
return self.get_messages(excluded=False)During compaction, strategies/orchestrators mutate group.excluded/group.exclude_reason (including re-including groups with excluded=False) instead of discarding data.
def set_group_excluded(messages: list[Message], *, group_id: str, reason: str | None = None) -> None:
for msg in messages:
if msg.additional_properties.get("_group_id") == group_id:
msg.additional_properties["_excluded"] = True
msg.additional_properties["_exclude_reason"] = reason
def clear_group_excluded(messages: list[Message], *, group_id: str) -> None:
for msg in messages:
if msg.additional_properties.get("_group_id") == group_id:
msg.additional_properties["_excluded"] = False
msg.additional_properties["_exclude_reason"] = None
def included_messages(messages: list[Message]) -> list[Message]:
return [m for m in messages if not m.additional_properties.get("_excluded", False)]In D2, strategies project included context by filtering on _excluded instead of filtering MessageGroup objects.
Variant E has two sub-variants:
- E1: token rollups cached on
MessageGroup/MessageGroups. - E2: token rollups cached directly on messages via
_-attributes.
Variant E1 adds tokenization metadata and cached token rollups to grouped state. This is independent of exclusion: token-aware strategies can use token metrics even if no groups are excluded. When combined with Variant D, token budgets can be enforced against included messages.
To make token-budget compaction deterministic:
- Before every
get_responsecall in the tool loop, tokenize every message currently inall_messages(regardless of source). - Persist per-content token counts in
content.additional_properties["_token_count"]. - Build/update grouped state from tokenized messages and use cached rollups for threshold checks and summaries.
class TokenizerProtocol(Protocol):
def count_tokens(self, content: AIContent, *, model_id: str | None = None) -> int: ...
@dataclass
class MessageGroup:
kind: Literal["system", "user", "assistant_text", "tool_call"]
messages: list[Message]
_token_count_cache: int | None = None
def token_count(self) -> int:
if self._token_count_cache is None:
self._token_count_cache = sum(
content.additional_properties.get("_token_count", 0)
for message in self.messages
for content in message.contents
)
return self._token_count_cache
@dataclass
class MessageGroups:
groups: list[MessageGroup]
_total_tokens_cache: int | None = None
def total_tokens(self) -> int:
if self._total_tokens_cache is None:
self._total_tokens_cache = sum(group.token_count() for group in self.groups)
return self._total_tokens_cache
def summary(self) -> dict[str, int]:
return {
"group_count": len(self.groups),
"message_count": sum(len(g.messages) for g in self.groups),
"tool_call_count": sum(1 for g in self.groups if g.kind == "tool_call"),
"total_tokens": self.total_tokens(),
"tool_call_tokens": sum(g.token_count() for g in self.groups if g.kind == "tool_call"),
}And the following helper method should also be added:
def _to_tokenized_groups(
messages: list[Message], *, tokenizer: TokenizerProtocol
) -> MessageGroups:
tokenize_messages(messages, tokenizer=tokenizer)
return MessageGroups.from_messages(messages)def annotate_token_counts(messages: list[Message], *, tokenizer: TokenizerProtocol) -> None:
for message in messages:
message_token_count = 0
for content in message.contents:
count = tokenizer.count_tokens(content)
content.additional_properties["_token_count"] = count
message_token_count += count
message.additional_properties["_message_token_count"] = message_token_count
def sum_tokens_by_group(messages: list[Message]) -> dict[str, int]:
"""Compute group totals on demand from `_message_token_count`."""
tokens_by_group: dict[str, int] = {}
for message in messages:
group_id = message.additional_properties["_group_id"]
tokens_by_group[group_id] = tokens_by_group.get(group_id, 0) + message.additional_properties.get(
"_message_token_count", 0
)
return tokens_by_groupIn E2, strategies evaluate _message_token_count/_token_count directly from messages and compute per-group totals on demand via _group_id (instead of caching _group_token_count on every message). This avoids duplicated state and ambiguity when one copy is updated but others are stale. If needed for performance, the function-invocation loop can keep an ephemeral dict[group_id, token_count] alongside the annotated message list.
Variant F has two sub-variants:
- F1: combined model on
MessageGroups. - F2: combined model on
_-annotated messages.
Variant F1 combines Variant C1's grouped interface, Variant D1's exclusion semantics, and Variant E1's token accounting in one integrated model. This gives one state container for projection (excluded) and budget control (token_count), while preserving full history for final-return and diagnostics.
For Variant F1, MessageGroups.from_messages(...) accepts an optional tokenizer and handles both tokenization and grouping before strategy execution:
class TokenizerProtocol(Protocol):
def count_tokens(self, content: AIContent, *, model_id: str | None = None) -> int: ...
@dataclass
class MessageGroup:
kind: Literal["system", "user", "assistant_text", "tool_call"]
messages: list[Message]
excluded: bool = False
exclude_reason: str | None = None
_token_count_cache: int | None = None
def token_count(self) -> int:
if self._token_count_cache is None:
self._token_count_cache = sum(
content.additional_properties.get("_token_count", 0)
for message in self.messages
for content in message.contents
)
return self._token_count_cache
@dataclass
class MessageGroups:
groups: list[MessageGroup]
_total_tokens_cache: int | None = None
@classmethod
def from_messages(
cls,
messages: list[Message],
*,
tokenizer: TokenizerProtocol | None = None,
) -> "MessageGroups":
if tokenizer is not None:
tokenize_messages(messages, tokenizer=tokenizer)
groups: list[MessageGroup] = []
i = 0
while i < len(messages):
msg = messages[i]
if msg.role == "system":
groups.append(MessageGroup(kind="system", messages=[msg]))
i += 1
elif msg.role == "user":
groups.append(MessageGroup(kind="user", messages=[msg]))
i += 1
elif msg.role == "assistant" and getattr(msg, "tool_calls", None):
group_msgs = [msg]
i += 1
while i < len(messages) and messages[i].role == "tool":
group_msgs.append(messages[i])
i += 1
groups.append(MessageGroup(kind="tool_call", messages=group_msgs))
else:
groups.append(MessageGroup(kind="assistant_text", messages=[msg]))
i += 1
return cls(groups)
def get_messages(self, *, excluded: bool = False) -> list[Message]:
if excluded:
return [msg for g in self.groups for msg in g.messages]
return [msg for g in self.groups if not g.excluded for msg in g.messages]
def included_messages(self) -> list[Message]:
return self.get_messages(excluded=False)
def total_tokens(self) -> int:
if self._total_tokens_cache is None:
self._total_tokens_cache = sum(group.token_count() for group in self.groups)
return self._total_tokens_cache
def included_token_count(self) -> int:
return sum(g.token_count() for g in self.groups if not g.excluded)
def summary(self) -> dict[str, int]:
return {
"group_count": len(self.groups),
"message_count": sum(len(g.messages) for g in self.groups),
"tool_call_count": sum(1 for g in self.groups if g.kind == "tool_call"),
"included_group_count": sum(1 for g in self.groups if not g.excluded),
"included_message_count": sum(len(g.messages) for g in self.groups if not g.excluded),
"included_tool_call_count": sum(
1 for g in self.groups if g.kind == "tool_call" and not g.excluded
),
"total_tokens": self.total_tokens(),
"tool_call_tokens": sum(g.token_count() for g in self.groups if g.kind == "tool_call"),
"included_tokens": self.included_token_count(),
}
class CompactionStrategy(Protocol):
async def __call__(self, groups: MessageGroups) -> None:
"""Mutate the provided groups in place."""
...class CompactionStrategy(Protocol):
async def __call__(self, messages: list[Message]) -> bool:
"""Mutate message annotations in place."""
...
async def compact_with_annotations(
messages: list[Message], *, strategy: CompactionStrategy, tokenizer: TokenizerProtocol
) -> list[Message]:
# C2: annotate group boundaries
_annotate_groups(messages)
# E2: annotate token metrics
annotate_token_counts(messages, tokenizer=tokenizer)
_ = sum_tokens_by_group(messages) # optional ephemeral aggregate in loop state
# D2/F2: strategy toggles _excluded/_exclude_reason and can rewrite messages
_ = await strategy(messages)
# Project only included messages for model call
return [m for m in messages if not m.additional_properties.get("_excluded", False)]F2 avoids a sidecar object but requires strict ownership rules for _ attributes (who sets, updates, clears, and validates them). To prevent duplicate work and drift, this ownership should live entirely in BaseChatClient, while the function-calling layer remains attribute-unaware.
Trade-offs between variants:
| Aspect | Variant A (in-place) | Variant B (return new) | Variant C1 (MessageGroups) |
Variant C2 (_ attrs) |
Variant D1 (MessageGroups exclude) |
Variant D2 (_excluded attrs) |
Variant E1 (group token caches) | Variant E2 (message token attrs + on-demand group sums) | Variant F1 (MessageGroups combined) |
Variant F2 (_ attrs combined) |
|---|---|---|---|---|---|---|---|---|---|---|
| Allocation | Zero in no-op case | Always allocates tuple | Grouping sidecar allocation | No sidecar; metadata writes | D1 + exclusion state | D2 + metadata writes | E1 + token cache sidecar | E2 + message metadata writes | Highest sidecar state | No sidecar; highest metadata writes |
| Safety | Caller loses original | Original preserved | State isolated in sidecar | Metadata mutates source messages | Full grouped history preserved | Full message history preserved | Deterministic token rollups in sidecar | Deterministic token rollups on messages | Strong isolation of all compaction state | Shared-message mutation can leak across layers |
| Strategy complexity | Must handle atomic groups | Must handle atomic groups | Groups pre-computed by framework | Reads _group_* fields |
Exclude/re-include by group | Exclude/re-include by _group_id |
Token budget via group APIs | Token budget via _token* fields |
Unified exclude + token policy via group APIs | Unified policy via many message attrs |
| Chaining | Natural (same list) | Pipe output to next input | Natural (same group state) | Natural (same annotated message list) | Natural | Natural | Natural | Natural | Natural | Natural |
| Framework complexity | Minimal | Reassignment logic | Grouping + flattening layer | Annotation lifecycle/validation | C1 + exclusion semantics | C2 + projection/filter semantics | C1 + tokenizer + cache invalidation | C2 + tokenizer + attr invalidation | Highest sidecar orchestration | Highest attr lifecycle orchestration |
Usage with HistoryProvider:
The compaction_strategy parameter accepts either a single CompactionStrategy or it can take a composed/chained strategy.
class HistoryProvider(ContextProvider):
def __init__(
self,
source_id: str,
*,
load_messages: bool = True,
store_inputs: bool = True,
store_responses: bool = True,
store_excluded_messages: bool = True, # NEW: persist excluded groups/messages or only included
# NEW: optional compaction strategy, can be a single strategy or a chained/composed strategy
compaction_strategy: CompactionStrategy | None = None,
# NEW: optional tokenizer for token-aware compaction strategies
tokenizer: TokenizerProtocol | None = None,
): ...
async def after_run(self, agent, session, context, state) -> None:
messages_to_store = self._collect_messages(context)
groups = MessageGroups.from_messages(messages_to_store, tokenizer=self.tokenizer)
if self.compaction_strategy:
await self.compaction_strategy(groups)
messages_to_store = groups.get_messages(excluded=self.store_excluded_messages)
if messages_to_store:
await self.save_messages(context.session_id, messages_to_store)Simple usage:
strategy = SlidingWindowStrategy(max_messages=100)
agent = client.create_agent(
context_providers=[
InMemoryHistoryProvider("memory", compaction_strategy=strategy),
],
)There are two ways we can do this:
-
Before writing to storage in
after_run, compaction is called on the new messages, combined with: a newcompactmethod, that reads the full history, calls the compaction strategy with the full history, then writes the compacted result back to storage (also requires aoverwriteflag on thesave_messagesmethod). This makes removing old messages from storage a explicit action that the user initiaties instead of being implicitly triggered byafter_runwrites, but it also means compaction strategies only see new messages instead of the full history (unless they read it themselves), thecompactmethod could then also have a override for the strategy to use (and/or the tokenizer in case of Variant E1/E2/F1/F2).class HistoryProvider(ContextProvider): ... async def compact(self, session_id: str, *, strategy: CompactionStrategy | None = None, tokenizer: TokenizerProtocol | None = None) -> None: history = await self.get_messages(session_id) if tokenizer: tokenize_messages(history, tokenizer=tokenizer) applicable_strategy = strategy or self.compaction_strategy await applicable_strategy(history) # compaction mutates history in place or returns new list depending on variant await self.save_messages(session_id, history, overwrite=True) # write compacted history back to storage
-
Before writing the history is loaded (could already be in-memory from
before_run), compaction is called on the full history (old + new), then the compacted result is written back to storage. This allows compaction strategies to consider the full history when deciding what to keep, but it also means the provider needs to support writing the full history back (not just appending new messages).
Given the explicit nature, and the ability to do the heavy lifting of reading, compacting and writing outside of the agent loop, we decide to go with the first setup, if we decide to use Option 1 overall.
Usage for in-run compaction (BaseChatClient):
In-run compaction should execute in BaseChatClient before every get_response call, regardless of whether function calling is enabled. This makes compaction behavior uniform for single-shot and looped invocations.
For token-aware variants (E1/E2/F1/F2), a tokenizer must be configured because token counts are part of compaction decisions. For the grouped-state path (F1), use MessageGroups.from_messages(..., tokenizer=...) so tokenization and grouping happen together before strategy invocation.
For C2/D2/E2/F2 specifically, BaseChatClient is the sole owner of compaction + _-attribute lifecycle. It should assume this work is required, annotate/refresh metadata on appended messages (including tool-result messages coming from function calling), and project included messages for model calls. The function-calling layer should not implement or duplicate any part of this mechanism.
class BaseChatClient:
# NEW attributes on the existing class
compaction_strategy: CompactionStrategy | None = None
tokenizer: TokenizerProtocol | None = None # required for token-aware variantsAgent attributes stay the same and are passed into the chat client (similar to ChatMiddleware propagation):
agent = Agent(
client=chat_client,
context_providers=[
InMemoryHistoryProvider("memory", compaction_strategy=boundary_strategy),
],
compaction_strategy=compaction_strategy,
tokenizer=model_tokenizer, # required for token-aware variants (E1/E2/F1/F2)
)
chat_client.compaction_strategy = agent.compaction_strategy
chat_client.tokenizer = agent.tokenizerExecution then lives in BaseChatClient.get_response(...):
def get_response(
self,
messages: Sequence[Message],
*,
stream: bool = False,
options: Mapping[str, Any] | None = None,
**kwargs: Any,
) -> Awaitable[ChatResponse[Any]] | ResponseStream[ChatResponseUpdate, ChatResponse[Any]]:
if not self.compaction_strategy:
return self._inner_get_response(
messages=messages,
stream=stream,
options=options or {},
**kwargs,
)
groups = MessageGroups.from_messages(
messages,
tokenizer=self.tokenizer,
)
# Compaction hook runs here and updates included/excluded state on groups.
projected = groups.included_messages()
return self._inner_get_response(
messages=projected,
stream=stream,
options=options or {},
**kwargs,
)BaseChatClient always keeps the full grouped state (included + excluded) in memory and uses only the projected included messages for model calls. Return/persistence policy is handled outside the client (e.g., HistoryProvider.store_excluded_messages).
When function calling is enabled, every model roundtrip still goes through BaseChatClient.get_response(...), so compaction runs automatically without duplicating logic in function-invocation code.
Built-in strategies:
class TruncationStrategy(CompactionStrategy):
"""Keep the last N messages, optionally preserving the system message."""
def __init__(self, *, max_messages: int, max_tokens: int, preserve_system: bool = True): ...
class SlidingWindowStrategy(CompactionStrategy):
"""Keep system message + last N messages."""
def __init__(self, *, max_messages: int, max_tokens: int): ...
class SummarizationStrategy(CompactionStrategy):
"""Summarize older messages using an LLM."""
def __init__(self, *, client: ..., max_messages_before_summary: int, max_tokens_before_summary: int): ...
# etcOpinionated token budget based composed strategy pattern (Variant F1/F2):
This ADR proposes shipping a built-in composed strategy that enforces a token budget by running a list of regular strategies from top to bottom until the conversation fits the budget. This is intentionally opinionated and serves as a practical default/inspiration; advanced users can still implement custom orchestration logic. In F1, this strategy should drive MessageGroup.excluded; in F2, it should drive message _excluded annotations so model calls project only included context while preserving the full list.
class TokenBudgetComposedStrategy(CompactionStrategy):
def __init__(
self,
*,
token_budget: int,
strategies: Sequence[CompactionStrategy],
early_stop: bool = False, # optional flag to stop after first strategy that meets the budget, or run all strategies regardless
):
self.token_budget = token_budget
self.strategies = strategies
self.early_stop = early_stop
async def __call__(self, groups: MessageGroups) -> None:
if groups.included_token_count() <= self.token_budget:
return
for strategy in self.strategies:
await strategy(groups)
if self.early_stop and groups.included_token_count() <= self.token_budget:
breakThis pattern keeps composition explicit and deterministic: ordered strategies, shared token metric, exclusion-flag semantics, optional re-inclusion by later strategies, and early stop as soon as budget is satisfied.
- Good, because the same strategy model works at the three primary compaction points (pre-write, in-run, existing storage)
- Good, because strategies are fully reusable — one instance can be shared across providers and agents
- Good, because new strategies can be added without modifying
HistoryProvider - Good, because with Variant A (in-place), the tool loop integration is zero-allocation in the no-op case
- Good, because with Variant B (return new list), the caller retains the original list for logging or fallback
- Good, because with Variants C1-F1 (grouped-state), strategy authors don't need to implement atomic group preservation — the framework handles grouping/flattening, making strategies simpler and less error-prone
- Good, because with Variants C2-F2 (message annotations), we can avoid a sidecar
MessageGroupscontainer while still preserving logical groups through_group_*attributes - Good, because it is easy to test strategies in isolation
- Good, because strategies can inspect
source_idattribution on messages for informed decisions - Good, because in-run settings can be first-class
Agentparameters and are propagated intoBaseChatClientattributes - Good, because chaining is natural — for Variants A/C1-F2, each strategy mutates the same shared state in sequence; for Variant B, output pipes into the next input
- Neutral, because Variants C1-F2 add framework complexity (grouping/flattening or annotation lifecycle, plus tokenization/exclusion accounting) but reduce strategy complexity
- Bad, because it adds a new concept (
CompactionStrategy) alongside the existingContextProvider/HistoryProviderhierarchy - Bad, because Variants C1-F1 introduce a
MessageGroupmodel that must stay in sync with any future message role changes - Bad, because Variants C2-F2 depend on careful
_-attribute lifecycle management to avoid stale or inconsistent annotations
Define compaction behavior as a mixin that HistoryProvider subclasses can opt into. The mixin adds compact() as an overridable method.
class CompactingHistoryMixin:
"""Mixin that adds compaction to a HistoryProvider."""
async def compact(self, messages: Sequence[ChatMessage]) -> list[ChatMessage]:
"""Override to implement compaction logic. Default: no-op."""
return list(messages)
class InMemoryHistoryProvider(CompactingHistoryMixin, HistoryProvider):
"""In-memory history with compaction support."""
def __init__(
self,
source_id: str,
*,
max_messages: int | None = None,
**kwargs,
):
super().__init__(source_id, **kwargs)
self.max_messages = max_messages
async def compact(self, messages: Sequence[ChatMessage]) -> list[ChatMessage]:
if self.max_messages and len(messages) > self.max_messages:
return list(messages[-self.max_messages:])
return list(messages)The base HistoryProvider checks for the mixin and calls compact() at the right points:
class HistoryProvider(ContextProvider):
async def before_run(self, agent, session, context, state) -> None:
history = await self.get_messages(context.session_id)
if isinstance(self, CompactingHistoryMixin):
history = await self.compact(history)
context.extend_messages(self.source_id, history)For in-run compaction, BaseChatClient attributes would reference the provider's compact() method, but this requires knowing which provider to use:
# Awkward: must extract compaction from a specific provider
compacting_provider = next(
(p for p in agent._context_providers if isinstance(p, CompactingHistoryMixin)),
None,
)
base_chat_client.compaction_strategy = compacting_provider # provider IS the strategyFor existing storage:
# Provider must implement CompactingHistoryMixin
provider = InMemoryHistoryProvider("memory", max_messages=100)
history = await provider.get_messages(session_id)
compacted = await provider.compact(history)
await provider.save_messages(session_id, compacted)- Good, because no new top-level concept — compaction is part of the provider
- Good, because the provider controls its own compaction logic
- Neutral, because mixins are idiomatic Python but can be harder to reason about in complex hierarchies
- Bad, because compaction strategy is coupled to the provider — cannot share the same strategy across different providers, or in-run.
- Bad, because different strategies per compaction point (pre-write vs existing) require additional configuration or separate methods
- Bad, because in-run compaction via
BaseChatClientattributes requires extracting the mixin from the provider list — unclear which one to use if multiple exist - Bad, because
isinstancechecks are fragile and don't compose well - Bad, because testing compaction requires instantiating a full provider rather than testing the strategy in isolation
- Bad, because existing storage compaction requires having the right provider type, not just any strategy
- Bad, because chaining is difficult — compaction logic is embedded in the provider's
compact()override, so composing multiple strategies (e.g., summarize then truncate) requires subclass nesting or manual delegation within a singlecompact()method, rather than declarative composition
Define compaction as a special ContextProvider subclass that the agent calls at all compaction points (pre-load, pre-write, in-run (calls compact), existing storage). It is added to the agent's context_providers list like any other provider.
class CompactionProvider(ContextProvider):
"""Context provider specialized for compaction.
Unlike regular ContextProviders, CompactionProvider is also invoked
during the function calling loop and can be used for storage maintenance.
"""
@abstractmethod
async def compact(self, messages: Sequence[ChatMessage]) -> list[ChatMessage]:
"""Reduce a list of messages."""
...
async def before_run(self, agent, session, context, state) -> None:
"""Compact messages loaded by previous providers before model invocation."""
all_messages = context.get_all_messages()
compacted = await self.compact(all_messages)
context.replace_messages(compacted)
async def after_run(self, agent, session, context, state) -> None:
"""No-op by default. Subclasses can override for pre-write behavior."""
passUsage:
agent = ChatAgent(
chat_client=client,
context_providers=[
InMemoryHistoryProvider("memory"), # Loads history
RAGContextProvider("rag"), # Adds RAG context
SlidingWindowCompaction("compaction", max_messages=100), # Compacts everything
],
)The agent recognizes CompactionProvider instances and wires compact() into BaseChatClient attributes:
class ChatAgent:
def _configure_base_chat_client(self, base_client: BaseChatClient) -> None:
compactors = [p for p in self._context_providers if isinstance(p, CompactionProvider)]
strategy = compactors[0] if compactors else None # Which one if multiple?
base_client.compaction_strategy = strategyFor existing storage, the compact() method is called directly:
compactor = SlidingWindowCompaction("compaction", max_messages=100)
history = await my_history_provider.get_messages(session_id)
compacted = await compactor.compact(history)
await my_history_provider.save_messages(session_id, compacted)- Good, because it lives within the existing
ContextProviderpipeline — no new concept - Good, because ordering relative to other providers is explicit (runs after RAG provider, etc.)
- Good, because
before_runcan compact the combined output of all prior providers (history + RAG) - Good, because the
compact()method works standalone for existing storage maintenance - Neutral, because chaining is partially supported — multiple
CompactionProviderinstances can be added to the provider list and will run in order duringbefore_run/after_run, but in-run compaction viaBaseChatClientattributes only wires a single strategy (which one to pick is ambiguous), so chaining works at boundaries but not during the tool loop - Bad, because the
CompactionProviderhas dual roles (context provider + compaction strategy), which muddies the ContextProvider contract - Bad, because
context.replace_messages()is a new operation that doesn't exist today and conflicts with the append-only design ofSessionContext - Bad, because in-run compaction still requires
isinstancechecks to wire intoBaseChatClientattributes - Bad, because ordering sensitivity is subtle — must come after storage providers but before model invocation
- Bad, because a
CompactionProvideras a context provider getsbefore_run/after_runcalls even when only itscompact()method is needed (in-run and storage maintenance)
Instead of introducing a new compaction abstraction, change ChatMiddleware so that it can replace the actual message list used by the tool loop, rather than modifying a copy. This makes the existing middleware pattern sufficient for in-run compaction.
Required changes to the tool loop:
# Inside the function invocation loop
# Current: ChatMiddleware modifies a copy, tool loop keeps its own list
# Proposed: ChatMiddleware can replace the list, tool loop uses the replacement
for attempt_idx in range(max_iterations):
context = ChatContext(messages=messages)
response = await middleware_pipeline.process(context)
# NEW: if middleware replaced messages, use the replacement
messages = context.messages # May be a new, compacted list
messages.extend(tool_results)Usage:
@chat_middleware
async def compacting_middleware(context: ChatContext, next):
if count_tokens(context.messages) > budget:
compacted = compact(context.messages)
context.messages.clear()
context.messages.extend(compacted) # Persists because tool loop reads back
await next(context)
agent = chat_client.create_agent(
middleware=[compacting_middleware],
)For boundary compaction, the same middleware runs at the chat client level. For existing storage compaction, a standalone utility function is needed since middleware only runs during agent.run().
- Good, because it uses the existing
ChatMiddlewarepattern — no new compaction concept - Good, because middleware already runs between LLM calls in the tool loop — it just needs the mutations to stick
- Good, because users familiar with middleware get compaction "for free"
- Neutral, because chaining is implicit — multiple compaction middleware can be stacked and will run in pipeline order, but there is no explicit composition model; middleware interact through side effects (mutating the shared message list) rather than declarative input/output, making chain behavior harder to reason about and debug
- Bad, because it requires changing how the tool loop manages messages — the current copy-based architecture must be rethought
- Bad, because multiple middleware could conflict when replacing messages (no coordination)
- Bad, because it does not cover existing storage compaction
- Bad, because it does not cover pre-write compaction —
ChatMiddlewareruns before the LLM call, not afterContextProvider.after_run() - Bad, because message replacement semantics in middleware are implicit (mutating a list) rather than explicit (returning a new list)
- Bad, because it requires significant internal refactoring of the copy-based message flow in the function invocation layer
Chosen option: Option 1: Standalone CompactionStrategy Object with F2 (_-annotated messages) as the primary implementation model. We still document F1 as a valid alternative, but F2 is preferred because it introduces one less concept (no sidecar MessageGroups container), aligns with BaseChatClient statelessness by carrying state on messages themselves, and allows in-run compaction to stay localized to BaseChatClient rather than requiring extra grouped-state ownership in the function-calling loop.
The .NET SDK uses IChatReducer composed into InMemoryChatHistoryProvider:
| Aspect | .NET | Proposed Options |
|---|---|---|
| Interface | IChatReducer with ReduceAsync(messages) -> messages |
CompactionStrategy.compact() with three signature variants (Options 1-3) / ChatMiddleware mutation (Option 4) |
| Attachment | Property on InMemoryChatHistoryProvider |
Composed into HistoryProvider (Option 1) / mixin (Option 2) / separate provider (Option 3) / middleware (Option 4) |
| Trigger | ChatReducerTriggerEvent enum: AfterMessageAdded, BeforeMessagesRetrieval |
Pre-write + in-run + storage maintenance (Options 1-3 primary scope); post-load-style behavior can be covered by in-run pre-send projection |
| Scope | Only within InMemoryChatHistoryProvider |
Applicable to any HistoryProvider and the tool loop (Option 1) |
Option 1's CompactionStrategy is the closest equivalent to .NET's IChatReducer, with a broader scope.
| Python scenario | .NET/MEAI mechanism | How it maps |
|---|---|---|
| Pre-write compaction | InMemoryChatHistoryProvider + ChatReducerTriggerEvent.AfterMessageAdded |
Reducer runs in StoreChatHistoryAsync after new request/response messages are added to storage (closest equivalent to pre-write persistence compaction). |
| Agent-level whole-list compaction (pre-send overlap with post-load) | ChatClientAgent message assembly + chat-client decoration via clientFactory / ChatClientAgentRunOptions.ChatClientFactory |
ChatClientAgent builds the full invocation message list (ChatHistoryProvider + AIContextProviders + input). A delegating IChatClient can compact that assembled list immediately before forwarding GetResponseAsync. |
In-run compaction before every get_response call |
Base chat-client layer + delegating IChatClient wrapper |
Compaction is executed in the base chat client before every GetResponseAsync call, so both single-shot and function-calling roundtrips get the same behavior. |
Variant C1 grouped-state maintenance (MessageGroup) |
Keep grouped state in the same function-invocation/delegating-chat-client layer | Maintain and update grouped state across loop iterations in that layer, then flatten only for model calls. |
Variant C2 message-annotation maintenance (_group_*) |
Keep message annotations in the same function-invocation/delegating-chat-client layer | Incrementally annotate newly appended messages with _group_id, _group_kind, and related metadata; filter/project directly from annotated message lists. |
| Compaction on existing storage | InMemoryChatHistoryProvider.GetMessages(...) + SetMessages(...) (or custom provider equivalent) |
Read stored history, apply reducer/strategy, and write back compacted history as a maintenance operation. |
How each option addresses the three primary compaction points and the current architectural limitations:
| Compaction Point | Option 1 (Strategy) | Option 2 (Mixin) | Option 3 (Provider) | Option 4 (Middleware) |
|---|---|---|---|---|
| Pre-write | ✅ HistoryProvider param |
after_run override |
❌ Not supported | |
| In-run (tool loop) | ✅ BaseChatClient attrs |
isinstance wiring |
||
| Existing storage | ✅ Standalone compact() |
✅ Provider's compact() |
✅ Standalone compact() |
❌ Not supported |
| Solves copy problem | ✅ Runs inside loop | |||
| Chaining | ✅ Natural composition via wrapper | ❌ Coupled to provider | ||
| New concepts | 1 (CompactionStrategy) |
1 (mixin) | 0.5 (reuses ContextProvider, but adds new method) |
0 (reuses ChatMiddleware) |
A compaction strategy takes a list of messages and returns a (potentially shorter) list, in almost all cases, there is certain logic that needs to be applied universally, such as retaining system messages, not breaking up function call and result pairs (for Responses that includes Reasoning as well, see context section above for more info) as tool calls, etc. Beyond that, strategies can be as simple or complex as needed:
- Truncation: Keep only the last N messages or N tokens, this is a likely done as a kind of zigzag, where the history grows, then get's truncated to some value below the token limit, then grows again, etc. This can be done on a simple message count basis, a character count basis, or more complex token counting basis.
- Summarization: Replace older messages with an LLM-generated summary (depending on the implementation this could be done, by replacing the summarized messages, or by inserting a summary message in between and not loading messages older then the summarized ones)
- Selective removal: Remove tool call/result pairs while keeping user/assistant turns
- Sliding window with anchor: Keep system message + last N messages
- Custom logic: The design should be extendible so that users can implement their own strategies.
ADR-0016 introduces source_id attribution on messages — each message tracks which ContextProvider added it. Compaction strategies can use this attribution to make informed decisions about what to compact and what to preserve:
- Preserve RAG context: Messages from a RAG provider (e.g.
source_id: "rag") may be critical and should survive compaction - Remove ephemeral context: Messages marked as ephemeral (e.g.,
source_id: "time") can be safely removed - Protect user input: Messages without a
source_id(direct user input) should typically be preserved - Selective tool result compaction: Tool results from specific providers can be summarized while others are kept verbatim
This means strategies don't need to rely solely on message position or role — they can make semantically meaningful compaction decisions based on the origin of each message.
Running compaction after every tool call is wasteful — most iterations the context is well within limits. Instead, compaction should only trigger when a threshold is exceeded. There are several approaches to consider:
-
Message count threshold: Trigger when the message list exceeds N messages. Simple to implement and predictable, but message count is a poor proxy for token usage — a single tool result can contain thousands of tokens while counting as one message.
-
Character/token count threshold: Trigger when the estimated token count exceeds a budget. More accurate but requires a token counting mechanism (exact tokenization is model-specific and expensive; character-based heuristics like
len(text) / 4are fast but approximate). -
Iteration-based: Trigger every N tool loop iterations (e.g., every 10th iteration). Predictable cadence but doesn't account for actual context growth — 10 iterations with small results may not need compaction while 3 iterations with large results might.
-
Strategy-internal: Let the
CompactionStrategy.compact()method decide internally — it receives the full message list and can return it unchanged if no compaction is needed. This is the simplest integration point (always callcompact(), let the strategy no-op when appropriate) but has the overhead of calling into the strategy every iteration.
The recommended approach is strategy-internal with a lightweight guard: the compact() method is called after each tool result, but strategy implementations should include a fast short-circuit check (e.g., if len(messages) < self.threshold: return False) to minimize overhead when compaction is not needed. This keeps the tool loop simple (always call compact()) while letting each strategy define its own trigger logic.
The following example illustrates this for Variant A (in-place flat list). See Variant C1/C2 under Option 1 for group-aware equivalents.
class SlidingWindowStrategy(CompactionStrategy):
"""Example with built-in trigger logic and atomic group preservation (Variant A)."""
def __init__(self, max_messages: int, *, compact_to: int | None = None):
self.max_messages = max_messages
self.compact_to = compact_to or max_messages // 2
async def compact(self, messages: list[ChatMessage]) -> bool:
# Fast short-circuit: no-op if under threshold
if len(messages) <= self.max_messages:
return False
# Partition into anchors (system messages) and the rest
anchors: list[ChatMessage] = []
rest: list[ChatMessage] = []
for m in messages:
(anchors if m.role == "system" else rest).append(m)
# Group into atomic units: [assistant w/ tool_calls + tool results]
# count as one group; standalone messages are their own group
groups: list[list[ChatMessage]] = []
i = 0
while i < len(rest):
msg = rest[i]
if msg.role == "assistant" and getattr(msg, "tool_calls", None):
# Collect this assistant message + all following tool results
group = [msg]
i += 1
while i < len(rest) and rest[i].role == "tool":
group.append(rest[i])
i += 1
groups.append(group)
else:
groups.append([msg])
i += 1
# Keep the last N groups (by message count) that fit within compact_to
kept: list[ChatMessage] = []
count = 0
for group in reversed(groups):
if count + len(group) > self.compact_to:
break
kept = group + kept
count += len(group)
# Mutate in place
messages.clear()
messages.extend(anchors + kept)
return TrueGiven a situation where a compaction strategy is known, the following would need to happen:
- At that moment in the run, the message list is passed to the strategy's
compact()method, which returns whether compaction occurred (and depending on the variant, either mutates in place or returns a new list). - The caller continues with the (potentially reduced) list for the next steps (sending to the model, saving to storage, or continuing the tool loop with the reduced context)
- We need to decide how to handle a failed compaction (e.g., the strategy raises an exception) — likely we should have a fallback to continue without compaction rather than failing the entire agent run.
ADR-0016's HistoryProvider.save_messages() is an append operation — after_run collects the new messages from the current invocation and appends them to storage. There is no built-in way to replace the full stored history with a compacted version.
For compaction on existing storage (and pre-write compaction that rewrites history), we need a way to overwrite rather than append. Two options:
- Add a
replace_messages()method toHistoryProvider:
class HistoryProvider(ContextProvider):
@abstractmethod
async def save_messages(self, session_id: str | None, messages: Sequence[ChatMessage]) -> None:
"""Append messages to storage for this session."""
...
async def replace_messages(self, session_id: str | None, messages: Sequence[ChatMessage]) -> None:
"""Replace all stored messages for this session. Used for compaction.
Default implementation raises NotImplementedError. Providers that support
compaction on existing storage must override this method.
"""
raise NotImplementedError(
f"{type(self).__name__} does not support replace_messages. "
"Override this method to enable storage compaction."
)- Add a
overwriteparameter tosave_messages():
class HistoryProvider(ContextProvider):
@abstractmethod
async def save_messages(
self,
session_id: str | None,
messages: Sequence[ChatMessage],
*,
overwrite: bool = False,
) -> None:
"""Persist messages for this session.
Args:
overwrite: If True, replace all existing messages instead of appending.
Used for compaction workflows.
"""
...Either approach enables the compaction-on-existing-storage workflow:
history = await provider.get_messages(session_id)
compacted = await strategy.compact(history)
await provider.replace_messages(session_id, compacted) # Option 1
# or
await provider.save_messages(session_id, compacted, overwrite=True) # Option 2This could then be combined with a convenience method on the provider for compaction:
class HistoryProvider:
compaction_strategy: CompactionStrategy | None = None # Optional default strategy for this provider
async def compact_storage(self, session_id: str | None, *, strategy: CompactionStrategy | None = None) -> None:
"""Compact stored history for this session using the given strategy."""
history = await self.get_messages(session_id)
used_strategy = strategy or self._get_strategy("existing") or self._get_strategy("pre_write")
if used_strategy is None:
raise ValueError("No compaction strategy configured for existing storage.")
await used_strategy.compact(history)
await self.replace_messages(session_id, history) # or save_messages with overwrite
# or
await self.save_messages(session_id, history, overwrite=True)This design choice is orthogonal to the compaction strategy options below — any option requires one of these HistoryProvider extensions and optionally the convenience method.
The source_id attribution system from ADR-0016 enables intelligent compaction:
class AttributionAwareStrategy(CompactionStrategy):
"""Example: remove ephemeral context but preserve RAG and user messages."""
async def compact(self, messages: list[ChatMessage]) -> bool:
ephemeral = [m for m in messages if m.additional_properties.get("source_id") == "ephemeral"]
if not ephemeral:
return False
for msg in ephemeral:
messages.remove(msg)
return True- ADR-0016: Unifying Context Management with ContextPlugin — Parent ADR that established
ContextProvider,HistoryProvider, andAgentSessionarchitecture. - Context Compaction Limitations Analysis — Detailed analysis of why current architecture cannot support in-run compaction, with attempted solutions and their failure modes. Option 4 in this ADR corresponds to "Option A: Middleware Access to Mutable Message Source" from that analysis; Options 1-3 correspond to "Option B: Tool Loop Hook", adapted here to a
BaseChatClienthook instead ofFunctionInvocationConfiguration.
Implementation is split into two phases:
- Phase 1 (PR 1): runtime compaction foundation in
agent_framework/_compaction.py, in-run integration, and extensive core tests, plus in-run compaction samples (basics,advanced,custom). - Phase 2 (PR 2): history/storage compaction (
upsert-based full replacement), provider support, storage tests, and storage-focused sample (storage).