Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions codex-rs/analytics/src/analytics_client_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ fn sample_turn_start_response(turn_id: &str) -> ClientResponsePayload {
started_at: None,
completed_at: None,
duration_ms: None,
attribution: None,
},
})
}
Expand All @@ -315,6 +316,7 @@ fn sample_turn_started_notification(thread_id: &str, turn_id: &str) -> ServerNot
started_at: Some(455),
completed_at: None,
duration_ms: None,
attribution: None,
},
})
}
Expand Down Expand Up @@ -354,6 +356,7 @@ fn sample_turn_completed_notification(
started_at: None,
completed_at: Some(456),
duration_ms: Some(1234),
attribution: None,
},
})
}
Expand Down
189 changes: 188 additions & 1 deletion codex-rs/analytics/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,18 @@ use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ServerRequest;
use codex_app_server_protocol::ServerResponse;
use codex_app_server_protocol::ThreadItem;
use codex_app_server_protocol::TurnAppAttribution;
use codex_app_server_protocol::TurnAttribution;
use codex_app_server_protocol::TurnPluginAttribution;
use codex_app_server_protocol::TurnSkillAttribution;
use codex_app_server_protocol::TurnToolAttribution;
use codex_login::AuthManager;
use codex_login::CodexAuth;
use codex_login::default_client::create_client;
use codex_plugin::PluginTelemetryMetadata;
use codex_protocol::request_permissions::RequestPermissionsResponse;
use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;
use std::sync::Mutex;
Expand All @@ -54,6 +61,7 @@ pub(crate) struct AnalyticsEventsQueue {
#[derive(Clone)]
pub struct AnalyticsEventsClient {
queue: Option<AnalyticsEventsQueue>,
turn_attributions: Arc<Mutex<HashMap<String, TurnAttribution>>>,
}

impl AnalyticsEventsQueue {
Expand Down Expand Up @@ -124,11 +132,15 @@ impl AnalyticsEventsClient {
Self {
queue: (analytics_enabled != Some(false))
.then(|| AnalyticsEventsQueue::new(Arc::clone(&auth_manager), base_url)),
turn_attributions: Arc::new(Mutex::new(HashMap::new())),
}
}

pub fn disabled() -> Self {
Self { queue: None }
Self {
queue: None,
turn_attributions: Arc::new(Mutex::new(HashMap::new())),
}
}

pub fn track_skill_invocations(
Expand All @@ -139,6 +151,7 @@ impl AnalyticsEventsClient {
if invocations.is_empty() {
return;
}
self.record_skill_invocations(&tracking, &invocations);
self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::SkillInvoked(
SkillInvokedInput {
tracking,
Expand Down Expand Up @@ -210,11 +223,13 @@ impl AnalyticsEventsClient {

pub fn track_app_used(&self, tracking: TrackEventsContext, app: AppInvocation) {
let Some(queue) = self.queue.as_ref() else {
self.record_app_used(&tracking, &app);
return;
};
if !queue.should_enqueue_app_used(&tracking, &app) {
return;
}
self.record_app_used(&tracking, &app);
self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::AppUsed(
AppUsedInput { tracking, app },
)));
Expand All @@ -228,16 +243,27 @@ impl AnalyticsEventsClient {

pub fn track_plugin_used(&self, tracking: TrackEventsContext, plugin: PluginTelemetryMetadata) {
let Some(queue) = self.queue.as_ref() else {
self.record_plugin_used(&tracking, &plugin);
return;
};
if !queue.should_enqueue_plugin_used(&tracking, &plugin) {
return;
}
self.record_plugin_used(&tracking, &plugin);
self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::PluginUsed(
crate::facts::PluginUsedInput { tracking, plugin },
)));
}

pub fn take_turn_attribution(&self, turn_id: &str) -> Option<TurnAttribution> {
let attribution = self
.turn_attributions
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.remove(turn_id)?;
(!attribution.is_empty()).then_some(attribution)
}

pub fn track_compaction(&self, event: crate::facts::CodexCompactionEvent) {
self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::Compaction(
Box::new(event),
Expand Down Expand Up @@ -371,6 +397,7 @@ impl AnalyticsEventsClient {
}

pub fn track_notification(&self, notification: ServerNotification) {
self.record_tool_from_notification(&notification);
if !matches!(
notification,
ServerNotification::TurnStarted(_)
Expand All @@ -385,6 +412,166 @@ impl AnalyticsEventsClient {
}
self.record_fact(AnalyticsFact::Notification(Box::new(notification)));
}

fn update_turn_attribution(&self, turn_id: &str, update: impl FnOnce(&mut TurnAttribution)) {
let mut attributions = self
.turn_attributions
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
update(attributions.entry(turn_id.to_string()).or_default());
}

fn record_skill_invocations(
&self,
tracking: &TrackEventsContext,
invocations: &[SkillInvocation],
) {
self.update_turn_attribution(&tracking.turn_id, |attribution| {
for invocation in invocations {
let skill = TurnSkillAttribution {
skill_id: crate::reducer::skill_id_for_local_skill(
None,
None,
invocation.skill_path.as_path(),
invocation.skill_name.as_str(),
),
skill_name: invocation.skill_name.clone(),
skill_scope: Some(skill_scope_name(invocation.skill_scope).to_string()),
plugin_id: invocation.plugin_id.clone(),
invoke_type: Some(invocation_type_name(invocation.invocation_type).to_string()),
};
push_unique(&mut attribution.skills, skill);
}
});
}

fn record_app_used(&self, tracking: &TrackEventsContext, app: &AppInvocation) {
self.update_turn_attribution(&tracking.turn_id, |attribution| {
let app = TurnAppAttribution {
connector_id: app.connector_id.clone(),
app_name: app.app_name.clone(),
invoke_type: app
.invocation_type
.map(invocation_type_name)
.map(str::to_string),
};
push_unique(&mut attribution.apps, app);
});
}

fn record_plugin_used(&self, tracking: &TrackEventsContext, plugin: &PluginTelemetryMetadata) {
self.update_turn_attribution(&tracking.turn_id, |attribution| {
let plugin = TurnPluginAttribution {
plugin_id: plugin
.remote_plugin_id
.clone()
.unwrap_or_else(|| plugin.plugin_id.as_key()),
plugin_name: plugin.plugin_id.plugin_name.clone(),
marketplace_name: plugin.plugin_id.marketplace_name.clone(),
display_name: plugin
.capability_summary
.as_ref()
.map(|summary| summary.display_name.clone()),
};
push_unique(&mut attribution.plugins, plugin);
});
}

fn record_tool_from_notification(&self, notification: &ServerNotification) {
let (turn_id, item) = match notification {
ServerNotification::ItemStarted(notification) => {
(&notification.turn_id, &notification.item)
}
_ => return,
};
let Some(tool) = tool_attribution_from_item(item) else {
return;
};
self.update_turn_attribution(turn_id, |attribution| {
push_unique(&mut attribution.tools, tool);
});
}
}

fn push_unique<T: PartialEq>(items: &mut Vec<T>, item: T) {
if !items.contains(&item) {
items.push(item);
}
}

fn invocation_type_name(invocation_type: crate::facts::InvocationType) -> &'static str {
match invocation_type {
crate::facts::InvocationType::Explicit => "explicit",
crate::facts::InvocationType::Implicit => "implicit",
}
}

fn skill_scope_name(skill_scope: codex_protocol::protocol::SkillScope) -> &'static str {
match skill_scope {
codex_protocol::protocol::SkillScope::User => "user",
codex_protocol::protocol::SkillScope::Repo => "repo",
codex_protocol::protocol::SkillScope::System => "system",
codex_protocol::protocol::SkillScope::Admin => "admin",
}
}

fn tool_attribution_from_item(item: &ThreadItem) -> Option<TurnToolAttribution> {
match item {
ThreadItem::CommandExecution { id, .. } => Some(TurnToolAttribution {
id: id.clone(),
kind: "command_execution".to_string(),
name: Some("shell".to_string()),
server: None,
plugin_id: None,
}),
ThreadItem::McpToolCall {
id,
server,
tool,
plugin_id,
..
} => Some(TurnToolAttribution {
id: id.clone(),
kind: "mcp".to_string(),
name: Some(tool.clone()),
server: Some(server.clone()),
plugin_id: plugin_id.clone(),
}),
ThreadItem::DynamicToolCall {
id,
namespace,
tool,
..
} => Some(TurnToolAttribution {
id: id.clone(),
kind: "dynamic".to_string(),
name: Some(tool.clone()),
server: namespace.clone(),
plugin_id: None,
}),
ThreadItem::CollabAgentToolCall { id, tool, .. } => Some(TurnToolAttribution {
id: id.clone(),
kind: "collab_agent".to_string(),
name: Some(format!("{tool:?}")),
server: None,
plugin_id: None,
}),
ThreadItem::WebSearch { id, .. } => Some(TurnToolAttribution {
id: id.clone(),
kind: "web_search".to_string(),
name: Some("web_search".to_string()),
server: None,
plugin_id: None,
}),
ThreadItem::ImageGeneration { id, .. } => Some(TurnToolAttribution {
id: id.clone(),
kind: "image_generation".to_string(),
name: Some("image_generation".to_string()),
server: None,
plugin_id: None,
}),
_ => None,
}
}

async fn send_track_events(
Expand Down
Loading
Loading