Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions codex-rs/app-server/src/request_processors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,7 @@ use codex_protocol::protocol::ConversationStartParams;
use codex_protocol::protocol::ConversationStartTransport;
use codex_protocol::protocol::ConversationTextParams;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::ForkedHistory;
#[cfg(test)]
use codex_protocol::protocol::GitInfo as CoreGitInfo;
use codex_protocol::protocol::InitialHistory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use codex_external_agent_sessions::PendingSessionImport;
use codex_external_agent_sessions::prepare_validated_session_imports;
use codex_external_agent_sessions::record_imported_session;
use codex_protocol::ThreadId;
use codex_protocol::protocol::ForkedHistory;
use codex_protocol::protocol::InitialHistory;
use codex_thread_store::ThreadMetadataPatch;
use std::collections::HashSet;
Expand Down Expand Up @@ -305,7 +306,10 @@ impl ExternalAgentConfigRequestProcessor {
.thread_manager
.start_thread_with_options(StartThreadOptions {
config,
initial_history: InitialHistory::Forked(rollout_items),
initial_history: InitialHistory::Forked(ForkedHistory {
source_thread_id: None,
history: rollout_items,
}),
session_source: None,
thread_source: None,
dynamic_tools: Vec::new(),
Expand Down
14 changes: 8 additions & 6 deletions codex-rs/app-server/src/request_processors/thread_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2501,7 +2501,7 @@ impl ThreadRequestProcessor {
let include_turns = !exclude_turns;

let (thread_history, resume_source_thread) = match if let Some(history) = history {
self.resume_thread_from_history(history.as_slice())
self.resume_thread_from_history(&thread_id, history.as_slice())
.await
.map(|thread_history| (thread_history, None))
} else {
Expand Down Expand Up @@ -2928,18 +2928,20 @@ impl ThreadRequestProcessor {

async fn resume_thread_from_history(
&self,
thread_id: &str,
history: &[ResponseItem],
) -> Result<InitialHistory, JSONRPCErrorError> {
if history.is_empty() {
return Err(invalid_request("history must not be empty"));
}
Ok(InitialHistory::Forked(
history
Ok(InitialHistory::Forked(ForkedHistory {
source_thread_id: ThreadId::from_string(thread_id).ok(),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Do not reuse ignored resume id as fork source

When thread/resume supplies history, the v2 contract says history takes precedence and thread_id is ignored (app-server-protocol/src/protocol/v2/thread.rs lines 321-323). Copying that parameter into ForkedHistory means any parseable placeholder or unrelated id is persisted and returned as thread.forkedFromId; clients such as the TUI then treat it as real fork lineage and immediately issue thread/read for that parent while mapping the resume response. This makes history-based resumes expose bogus parent metadata and can trigger spurious parent lookups unless the source is carried in a separate validated field or left None.

Useful? React with 👍 / 👎.

history: history
.iter()
.cloned()
.map(RolloutItem::ResponseItem)
.collect(),
))
}))
}

async fn resume_thread_from_rollout(
Expand Down Expand Up @@ -3119,14 +3121,14 @@ impl ThreadRequestProcessor {
}
}
}
InitialHistory::Forked(items) => {
InitialHistory::Forked(forked) => {
let mut thread = build_thread_from_snapshot(
thread_id,
session_id.clone(),
&config_snapshot,
Some(rollout_path.into()),
);
thread.preview = preview_from_rollout_items(items);
thread.preview = preview_from_rollout_items(&forked.history);
Ok(thread)
}
InitialHistory::New | InitialHistory::Cleared => Err(format!(
Expand Down
6 changes: 5 additions & 1 deletion codex-rs/core/src/agent/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use codex_protocol::error::Result as CodexResult;
use codex_protocol::models::ContentItem;
use codex_protocol::models::MessagePhase;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::ForkedHistory;
use codex_protocol::protocol::InitialHistory;
use codex_protocol::protocol::InterAgentCommunication;
use codex_protocol::protocol::Op;
Expand Down Expand Up @@ -489,7 +490,10 @@ impl AgentControl {
state
.fork_thread_with_source(
config.clone(),
InitialHistory::Forked(forked_rollout_items),
InitialHistory::Forked(ForkedHistory {
source_thread_id: Some(parent_thread_id),
history: forked_rollout_items,
}),
self.clone(),
session_source,
/*thread_source*/ Some(ThreadSource::Subagent),
Expand Down
8 changes: 6 additions & 2 deletions codex-rs/core/src/guardian/review_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use codex_protocol::openai_models::ReasoningEffort as ReasoningEffortConfig;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::Event;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::ForkedHistory;
use codex_protocol::protocol::InitialHistory;
use codex_protocol::protocol::Op;
use codex_protocol::protocol::RolloutItem;
Expand Down Expand Up @@ -223,7 +224,10 @@ impl GuardianReviewSession {
let prior_review_count = state.prior_review_count;
let last_reviewed_transcript_cursor = state.last_reviewed_transcript_cursor;
state.last_committed_fork_snapshot = Some(GuardianReviewForkSnapshot {
initial_history: InitialHistory::Forked(items),
initial_history: InitialHistory::Forked(ForkedHistory {
source_thread_id: Some(self.codex.session.conversation_id),
history: items,
}),
prior_review_count,
last_reviewed_transcript_cursor,
});
Expand Down Expand Up @@ -476,7 +480,7 @@ impl GuardianReviewSessionManager {
let state = trunk.state.lock().await;
let snapshot = state.last_committed_fork_snapshot.as_ref()?;
match &snapshot.initial_history {
InitialHistory::Forked(items) => Some(items.clone()),
InitialHistory::Forked(forked) => Some(forked.history.clone()),
InitialHistory::New | InitialHistory::Cleared | InitialHistory::Resumed(_) => None,
}
}
Expand Down
10 changes: 5 additions & 5 deletions codex-rs/core/src/session/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1230,20 +1230,20 @@ impl Session {
let _ = self.flush_rollout().await;
}
}
InitialHistory::Forked(rollout_items) => {
self.apply_rollout_reconstruction(&turn_context, &rollout_items)
InitialHistory::Forked(forked) => {
self.apply_rollout_reconstruction(&turn_context, &forked.history)
.await;

// Seed usage info from the recorded rollout so UIs can show token counts
// immediately on resume/fork.
if let Some(info) = Self::last_token_info_from_rollout(&rollout_items) {
if let Some(info) = Self::last_token_info_from_rollout(&forked.history) {
let mut state = self.state.lock().await;
state.set_token_info(Some(info));
}

// If persisting, persist all rollout items as-is (the store filters).
if !rollout_items.is_empty() {
self.persist_rollout_items(&rollout_items).await;
if !forked.history.is_empty() {
self.persist_rollout_items(&forked.history).await;
}

// Forked threads should remain file-backed immediately after startup.
Expand Down
11 changes: 9 additions & 2 deletions codex-rs/core/src/session/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ use codex_protocol::protocol::CodexErrorInfo;
use codex_protocol::protocol::CompactedItem;
use codex_protocol::protocol::ConversationAudioParams;
use codex_protocol::protocol::CreditsSnapshot;
use codex_protocol::protocol::ForkedHistory;
use codex_protocol::protocol::GranularApprovalConfig;
use codex_protocol::protocol::InitialHistory;
use codex_protocol::protocol::InterAgentCommunication;
Expand Down Expand Up @@ -2256,7 +2257,10 @@ async fn record_initial_history_reconstructs_forked_transcript() {
let (rollout_items, expected) = sample_rollout(&session, &turn_context).await;

session
.record_initial_history(InitialHistory::Forked(rollout_items))
.record_initial_history(InitialHistory::Forked(ForkedHistory {
source_thread_id: None,
history: rollout_items,
}))
.await;

let history = session.state.lock().await.clone_history();
Expand Down Expand Up @@ -2513,7 +2517,10 @@ async fn record_initial_history_forked_hydrates_previous_turn_settings() {
];

session
.record_initial_history(InitialHistory::Forked(rollout_items))
.record_initial_history(InitialHistory::Forked(ForkedHistory {
source_thread_id: None,
history: rollout_items,
}))
.await;

let history = session.clone_history().await;
Expand Down
30 changes: 22 additions & 8 deletions codex-rs/core/src/thread_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use codex_protocol::error::Result as CodexResult;
use codex_protocol::openai_models::ModelPreset;
use codex_protocol::protocol::Event;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::ForkedHistory;
use codex_protocol::protocol::InitialHistory;
use codex_protocol::protocol::Op;
use codex_protocol::protocol::ResumedHistory;
Expand Down Expand Up @@ -1439,6 +1440,7 @@ fn truncate_before_nth_user_message(
n: usize,
snapshot_state: &SnapshotTurnState,
) -> InitialHistory {
let source_thread_id = history.source_thread_id();
let items: Vec<RolloutItem> = history.get_rollout_items();
let user_positions = truncation::user_message_positions_in_rollout(&items);
let rolled = if snapshot_state.ends_mid_turn && n >= user_positions.len() {
Expand All @@ -1457,7 +1459,10 @@ fn truncate_before_nth_user_message(
if rolled.is_empty() {
InitialHistory::New
} else {
InitialHistory::Forked(rolled)
InitialHistory::Forked(ForkedHistory {
source_thread_id,
history: rolled,
})
}
}

Expand Down Expand Up @@ -1536,7 +1541,10 @@ fn fork_history_from_snapshot(
InitialHistory::New => InitialHistory::New,
InitialHistory::Cleared => InitialHistory::Cleared,
InitialHistory::Forked(history) => InitialHistory::Forked(history),
InitialHistory::Resumed(resumed) => InitialHistory::Forked(resumed.history),
InitialHistory::Resumed(resumed) => InitialHistory::Forked(ForkedHistory {
source_thread_id: Some(resumed.conversation_id),
history: resumed.history,
}),
};
if snapshot_state.ends_mid_turn {
append_interrupted_boundary(
Expand Down Expand Up @@ -1573,21 +1581,27 @@ fn append_interrupted_boundary(
history.push(RolloutItem::ResponseItem(marker));
}
history.push(aborted_event);
InitialHistory::Forked(history)
InitialHistory::Forked(ForkedHistory {
source_thread_id: None,
history,
})
}
InitialHistory::Forked(mut history) => {
InitialHistory::Forked(mut forked) => {
if let Some(marker) = interrupted_turn_history_marker(interrupted_marker) {
history.push(RolloutItem::ResponseItem(marker));
forked.history.push(RolloutItem::ResponseItem(marker));
}
history.push(aborted_event);
InitialHistory::Forked(history)
forked.history.push(aborted_event);
InitialHistory::Forked(forked)
}
InitialHistory::Resumed(mut resumed) => {
if let Some(marker) = interrupted_turn_history_marker(interrupted_marker) {
resumed.history.push(RolloutItem::ResponseItem(marker));
}
resumed.history.push(aborted_event);
InitialHistory::Forked(resumed.history)
InitialHistory::Forked(ForkedHistory {
source_thread_id: Some(resumed.conversation_id),
history: resumed.history,
})
}
}
}
Expand Down
Loading
Loading