Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions codex-rs/codex-mcp/src/connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,23 @@ impl McpConnectionManager {
normalize_tools_for_model_with_prefix(tools, self.prefix_mcp_tool_names)
}

/// Returns servers whose tool inventory is not yet available without waiting.
pub fn pending_server_names_without_cached_tool_info_snapshot(&self) -> Vec<String> {
let mut pending = self
.clients
.iter()
.filter(|(_, managed_client)| {
managed_client.cached_tool_info_snapshot.is_none()
&& !managed_client
.startup_complete
.load(std::sync::atomic::Ordering::Acquire)
})
.map(|(server_name, _)| server_name.clone())
.collect::<Vec<_>>();
pending.sort();
pending
}

/// Force-refresh codex apps tools by bypassing the in-process cache.
///
/// On success, the refreshed tools replace the cache contents and the
Expand Down
4 changes: 4 additions & 0 deletions codex-rs/codex-mcp/src/connection_manager_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1004,6 +1004,10 @@ async fn list_ready_or_cached_tools_skips_pending_client_without_cached_tool_inf
},
);

assert_eq!(
manager.pending_server_names_without_cached_tool_info_snapshot(),
vec!["optional".to_string()]
);
let tools = tokio::time::timeout(
Duration::from_millis(/*millis*/ 10),
manager.list_ready_or_cached_tools(),
Expand Down
2 changes: 2 additions & 0 deletions codex-rs/core/src/session/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,8 @@ pub(crate) use self::session::SessionSettingsUpdate;
use self::turn::AssistantMessageStreamParsers;
#[cfg(test)]
use self::turn::collect_explicit_app_ids_from_skill_items;
#[cfg(test)]
use self::turn::filter_discoverable_tools_while_apps_inventory_pending;
use self::turn::realtime_text_for_event;
use self::turn_context::TurnContext;
use self::turn_context::TurnSkillsContext;
Expand Down
33 changes: 32 additions & 1 deletion codex-rs/core/src/session/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1581,6 +1581,37 @@ fn collect_explicit_app_ids_from_skill_items_skips_plain_mentions_with_skill_con
assert_eq!(connector_ids, HashSet::<String>::new());
}

#[test]
fn pending_apps_inventory_omits_connector_install_suggestions_but_keeps_plugins() {
let tools = vec![
codex_tools::DiscoverableTool::Connector(Box::new(make_connector("calendar", "Calendar"))),
codex_tools::DiscoverableTool::Plugin(Box::new(codex_tools::DiscoverablePluginInfo {
id: "docs".to_string(),
name: "Docs".to_string(),
description: None,
has_skills: false,
mcp_server_names: Vec::new(),
app_connector_ids: Vec::new(),
})),
];

assert_eq!(
filter_discoverable_tools_while_apps_inventory_pending(
tools, /*apps_inventory_pending*/ true
),
vec![codex_tools::DiscoverableTool::Plugin(Box::new(
codex_tools::DiscoverablePluginInfo {
id: "docs".to_string(),
name: "Docs".to_string(),
description: None,
has_skills: false,
mcp_server_names: Vec::new(),
app_connector_ids: Vec::new(),
}
))]
);
}

#[tokio::test]
async fn reconstruct_history_matches_live_compactions() {
let (session, turn_context) = make_session_and_context().await;
Expand Down Expand Up @@ -9737,8 +9768,8 @@ async fn fatal_tool_error_stops_turn_and_reports_error() {
&turn_context,
crate::tools::router::ToolRouterParams {
deferred_mcp_tools,
lazy_mcp_tools: None,
mcp_tools: Some(tools),
lazy_mcp_tools: None,
discoverable_tools: None,
extension_tool_executors: Vec::new(),
dynamic_tools: turn_context.dynamic_tools.as_slice(),
Expand Down
158 changes: 147 additions & 11 deletions codex-rs/core/src/session/turn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ use crate::stream_events_utils::record_completed_response_item_with_finalized_fa
use crate::tasks::emit_compact_metric;
use crate::tools::ToolRouter;
use crate::tools::context::SharedTurnDiffTracker;
use crate::tools::handlers::LazyMcpToolSearchLoader;
use crate::tools::handlers::LazyMcpToolSearchLoaders;
use crate::tools::parallel::ToolCallRuntime;
use crate::tools::registry::ToolArgumentDiffConsumer;
use crate::tools::router::ToolRouterParams;
Expand Down Expand Up @@ -97,6 +99,7 @@ use codex_protocol::protocol::ReasoningRawContentDeltaEvent;
use codex_protocol::protocol::TurnDiffEvent;
use codex_protocol::protocol::WarningEvent;
use codex_protocol::user_input::UserInput;
use codex_tools::DiscoverableTool;
use codex_tools::ToolName;
use codex_tools::filter_request_plugin_install_discoverable_tools_for_client;
use codex_utils_stream_parser::AssistantTextChunk;
Expand Down Expand Up @@ -1177,7 +1180,7 @@ async fn run_sampling_request(

#[expect(
clippy::await_holding_invalid_type,
reason = "ready-or-cached tool inspection never awaits unresolved MCP startup"
reason = "MCP tool inspection holds the session-owned manager guard while listing tools"
)]
#[instrument(level = "trace",
skip_all,
Expand All @@ -1199,10 +1202,28 @@ pub(crate) async fn built_tools(
.instrument(trace_span!("read_mcp_connection_manager"))
.await;
let has_mcp_servers = mcp_connection_manager.has_servers();
let all_mcp_tools = mcp_connection_manager
.list_ready_or_cached_tools()
.or_cancel(cancellation_token)
.await?;
let lazy_mcp_tool_search_available =
search_tool_enabled(turn_context) && turn_context.provider.capabilities().namespace_tools;
let (all_mcp_tools, mut pending_mcp_server_names) = if lazy_mcp_tool_search_available {
let pending_mcp_server_names =
mcp_connection_manager.pending_server_names_without_cached_tool_info_snapshot();
let tools = mcp_connection_manager
.list_ready_or_cached_tools()
.or_cancel(cancellation_token)
.await?;
(tools, pending_mcp_server_names)
} else {
let tools = mcp_connection_manager
.list_all_tools()
.or_cancel(cancellation_token)
.await?;
(tools, Vec::new())
};
pending_mcp_server_names.retain(|server_name| {
!all_mcp_tools
.iter()
.any(|tool| tool.server_name == *server_name)
});
drop(mcp_connection_manager);
let loaded_plugins = sess
.services
Expand All @@ -1211,6 +1232,11 @@ pub(crate) async fn built_tools(
.await;

let apps_enabled = turn_context.apps_enabled();
let effective_plugin_connector_ids = loaded_plugins
.effective_apps()
.into_iter()
.map(|connector_id| connector_id.0)
.collect::<Vec<_>>();
let accessible_connectors =
apps_enabled.then(|| connectors::accessible_connectors_from_mcp_tools(&all_mcp_tools));
let accessible_connectors_with_enabled_state =
Expand All @@ -1219,10 +1245,7 @@ pub(crate) async fn built_tools(
});
let connectors = if apps_enabled {
let connectors = codex_connectors::merge::merge_plugin_connectors_with_accessible(
loaded_plugins
.effective_apps()
.into_iter()
.map(|connector_id| connector_id.0),
effective_plugin_connector_ids.iter().cloned(),
accessible_connectors.clone().unwrap_or_default(),
);
Some(connectors::with_app_enabled_state(
Expand All @@ -1232,6 +1255,41 @@ pub(crate) async fn built_tools(
} else {
None
};
let apps_inventory_pending = pending_mcp_server_names
.iter()
.any(|server_name| server_name == CODEX_APPS_MCP_SERVER_NAME);
// Keep the tool_search schema stable as background startup finishes.
let lazy_mcp_tools: Option<LazyMcpToolSearchLoaders> =
(lazy_mcp_tool_search_available && has_mcp_servers).then(|| {
let mcp_connection_manager = Arc::clone(&sess.services.mcp_connection_manager);
let config = Arc::clone(&turn_context.config);
let effective_plugin_connector_ids = effective_plugin_connector_ids.clone();
let available = {
let mcp_connection_manager = Arc::clone(&mcp_connection_manager);
let config = Arc::clone(&config);
let effective_plugin_connector_ids = effective_plugin_connector_ids.clone();
Arc::new(move || {
load_available_mcp_tools_for_search(
Arc::clone(&mcp_connection_manager),
apps_enabled,
effective_plugin_connector_ids.clone(),
Arc::clone(&config),
)
.boxed()
}) as LazyMcpToolSearchLoader
};
let pending = Arc::new(move || {
load_pending_mcp_tools_for_search(
Arc::clone(&mcp_connection_manager),
pending_mcp_server_names.clone(),
apps_enabled,
effective_plugin_connector_ids.clone(),
Arc::clone(&config),
)
.boxed()
}) as LazyMcpToolSearchLoader;
LazyMcpToolSearchLoaders { available, pending }
});
let auth = sess.services.auth_manager.auth().await;
let loaded_plugin_app_connector_ids = loaded_plugins
.effective_apps()
Expand All @@ -1248,6 +1306,10 @@ pub(crate) async fn built_tools(
)
.await
.map(|discoverable_tools| {
let discoverable_tools = filter_discoverable_tools_while_apps_inventory_pending(
discoverable_tools,
apps_inventory_pending,
);
filter_request_plugin_install_discoverable_tools_for_client(
discoverable_tools,
turn_context.app_server_client_name.as_deref(),
Expand All @@ -1271,7 +1333,7 @@ pub(crate) async fn built_tools(
&all_mcp_tools,
connectors.as_deref(),
&turn_context.config,
search_tool_enabled(turn_context),
lazy_mcp_tool_search_available,
);
let mcp_tools = has_mcp_servers.then_some(mcp_tool_exposure.direct_tools);
let deferred_mcp_tools = mcp_tool_exposure.deferred_tools;
Expand All @@ -1280,14 +1342,88 @@ pub(crate) async fn built_tools(
ToolRouterParams {
mcp_tools,
deferred_mcp_tools,
lazy_mcp_tools: None,
lazy_mcp_tools,
discoverable_tools,
extension_tool_executors: extension_tool_executors(sess),
dynamic_tools: turn_context.dynamic_tools.as_slice(),
},
)))
}

pub(super) fn filter_discoverable_tools_while_apps_inventory_pending(
discoverable_tools: Vec<DiscoverableTool>,
apps_inventory_pending: bool,
) -> Vec<DiscoverableTool> {
if !apps_inventory_pending {
return discoverable_tools;
}

// Connector installability depends on current Apps accessibility, which is
// unknown until codex_apps startup completes. Plugin suggestions are
// independent of that pending inventory.
discoverable_tools
.into_iter()
.filter(|tool| matches!(tool, DiscoverableTool::Plugin(_)))
.collect()
}

async fn load_pending_mcp_tools_for_search(
mcp_connection_manager: Arc<tokio::sync::RwLock<codex_mcp::McpConnectionManager>>,
pending_mcp_server_names: Vec<String>,
apps_enabled: bool,
effective_plugin_connector_ids: Vec<String>,
config: Arc<crate::config::Config>,
) -> Vec<codex_mcp::ToolInfo> {
let readiness = {
let manager = mcp_connection_manager.read().await;
manager.wait_for_servers_ready(&pending_mcp_server_names)
};
let _failures = readiness.await;
load_available_mcp_tools_for_search(
mcp_connection_manager,
apps_enabled,
effective_plugin_connector_ids,
config,
)
.await
}

#[expect(
clippy::await_holding_invalid_type,
reason = "post-readiness tool inspection only reads available MCP inventory"
)]
async fn load_available_mcp_tools_for_search(
mcp_connection_manager: Arc<tokio::sync::RwLock<codex_mcp::McpConnectionManager>>,
apps_enabled: bool,
effective_plugin_connector_ids: Vec<String>,
config: Arc<crate::config::Config>,
) -> Vec<codex_mcp::ToolInfo> {
// Names are normalized across the entire visible MCP inventory. Rebuild
// all searchable MCP entries together so a newly introduced collision
// cannot invalidate a previously returned name.
let available_mcp_tools = mcp_connection_manager
.read()
.await
.list_ready_or_cached_tools()
.await;
let connectors = apps_enabled.then(|| {
let connectors = codex_connectors::merge::merge_plugin_connectors_with_accessible(
effective_plugin_connector_ids,
connectors::accessible_connectors_from_mcp_tools(&available_mcp_tools),
);
connectors::with_app_enabled_state(connectors, config.as_ref())
});
let mcp_tool_exposure = build_mcp_tool_exposure(
&available_mcp_tools,
connectors.as_deref(),
config.as_ref(),
/*search_tool_enabled*/ true,
);
let mut searchable_tools = mcp_tool_exposure.direct_tools;
searchable_tools.extend(mcp_tool_exposure.deferred_tools.unwrap_or_default());
searchable_tools
}

#[derive(Debug)]
struct SamplingRequestResult {
needs_follow_up: bool,
Expand Down
1 change: 1 addition & 0 deletions codex-rs/core/src/tools/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ pub use request_user_input::RequestUserInputHandler;
pub use shell::ShellCommandHandler;
pub(crate) use shell::ShellCommandHandlerOptions;
pub use test_sync::TestSyncHandler;
pub(crate) use tool_search::LazyMcpToolSearchLoader;
pub(crate) use tool_search::LazyMcpToolSearchLoaders;
pub use tool_search::ToolSearchHandler;
pub use unified_exec::ExecCommandHandler;
Expand Down
9 changes: 8 additions & 1 deletion codex-rs/core/tests/common/apps_test_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,13 @@ impl AppsTestServer {
}

pub async fn mount_searchable(server: &MockServer) -> Result<Self> {
Self::mount_searchable_with_tools_list_delay(server, /*tools_list_delay*/ None).await
}

pub async fn mount_searchable_with_tools_list_delay(
server: &MockServer,
tools_list_delay: Option<Duration>,
) -> Result<Self> {
mount_oauth_metadata(server).await;
mount_connectors_directory(server).await;
mount_streamable_http_json_rpc(
Expand All @@ -74,7 +81,7 @@ impl AppsTestServer {
CONNECTOR_DESCRIPTION.to_string(),
/*searchable*/ true,
/*include_app_only_tool*/ false,
/*tools_list_delay*/ None,
tools_list_delay,
)
.await;
Ok(Self {
Expand Down
2 changes: 1 addition & 1 deletion codex-rs/core/tests/suite/openai_file_mcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ async fn codex_apps_file_params_upload_local_paths_before_mcp_tool_call() -> Res
tokio::fs::write(test.cwd.path().join("report.txt"), b"hello world").await?;

test.submit_turn_with_approval_and_permission_profile(
"Extract the report text with the app tool.",
"Use [$calendar](app://calendar) to extract the report text.",
AskForApproval::Never,
PermissionProfile::Disabled,
)
Expand Down
4 changes: 3 additions & 1 deletion codex-rs/core/tests/suite/plugins.rs
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,9 @@ async fn selected_skill_rewaits_for_app_after_installing_mcp_dependency() -> Res
Some(Duration::from_secs(/*secs*/ 2)),
)
.await?;
let dependency_server = start_mock_server().await;
AppsTestServer::mount_with_connector_name(&dependency_server, "Dependency").await?;
let dependency_url = format!("{}/api/codex/apps", dependency_server.uri());
let mock = mount_sse_once(
&server,
sse(vec![ev_response_created("resp-1"), ev_completed("resp-1")]),
Expand All @@ -516,7 +519,6 @@ async fn selected_skill_rewaits_for_app_after_installing_mcp_dependency() -> Res
.expect("write plugin app skill");
let skill_agents_dir = skill_path.parent().expect("skill dir").join("agents");
std::fs::create_dir_all(&skill_agents_dir).expect("create skill agents dir");
let dependency_url = format!("{}/api/codex/apps", server.uri());
std::fs::write(
skill_agents_dir.join("openai.yaml"),
format!(
Expand Down
Loading
Loading