Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions codex-rs/analytics/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::facts::AnalyticsFact;
use crate::facts::AnalyticsJsonRpcError;
use crate::facts::AppInvocation;
use crate::facts::AppMentionedInput;
use crate::facts::AppServerStartedInput;
use crate::facts::AppUsedInput;
use crate::facts::CustomAnalyticsFact;
use crate::facts::HookRunFact;
Expand Down Expand Up @@ -169,6 +170,21 @@ impl AnalyticsEventsClient {
));
}

pub fn track_app_server_started(
&self,
rpc_transport: AppServerRpcTransport,
duration: Duration,
) {
self.record_fact(AnalyticsFact::Custom(
CustomAnalyticsFact::AppServerStarted(AppServerStartedInput {
runtime: current_runtime_metadata(),
rpc_transport,
duration_ms: u64::try_from(duration.as_millis()).unwrap_or(u64::MAX),
created_at: crate::now_unix_seconds(),
}),
));
}

pub fn track_guardian_review(
&self,
tracking: &GuardianReviewTrackContext,
Expand Down
23 changes: 23 additions & 0 deletions codex-rs/analytics/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use serde::Serialize;
#[serde(rename_all = "snake_case")]
pub enum AppServerRpcTransport {
Stdio,
UnixSocket,
Websocket,
InProcess,
}
Expand All @@ -56,6 +57,7 @@ pub(crate) struct TrackEventsRequest {
#[serde(untagged)]
pub(crate) enum TrackEventRequest {
SkillInvocation(SkillInvocationEventRequest),
AppServerStarted(CodexAppServerEventRequest),
ThreadInitialized(ThreadInitializedEvent),
GuardianReview(Box<GuardianReviewEventRequest>),
AppMentioned(CodexAppMentionedEventRequest),
Expand Down Expand Up @@ -144,6 +146,27 @@ pub(crate) struct CodexRuntimeMetadata {
pub(crate) runtime_arch: String,
}

/// Analytics parameters emitted when an app-server runtime starts.
#[derive(Serialize)]
pub(crate) struct CodexAppServerStartedEventParams {
pub(crate) runtime: CodexRuntimeMetadata,
pub(crate) rpc_transport: AppServerRpcTransport,
/// Elapsed measured startup duration, in milliseconds from a monotonic clock.
pub(crate) duration_ms: u64,
/// Time at which the event was recorded, in seconds since the Unix epoch.
pub(crate) created_at: u64,
}

/// Analytics events emitted for app-server lifecycle changes.
#[derive(Serialize)]
#[serde(tag = "event_type")]
pub(crate) enum CodexAppServerEventRequest {
#[serde(rename = "codex_app_server_started")]
Started {
event_params: CodexAppServerStartedEventParams,
},
}

#[derive(Serialize)]
pub(crate) struct ThreadInitializedEventParams {
pub(crate) thread_id: String,
Expand Down
11 changes: 11 additions & 0 deletions codex-rs/analytics/src/facts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,7 @@ pub(crate) enum AnalyticsFact {
}

pub(crate) enum CustomAnalyticsFact {
AppServerStarted(AppServerStartedInput),
SubAgentThreadStarted(SubAgentThreadStartedInput),
Compaction(Box<CodexCompactionEvent>),
GuardianReview(Box<GuardianReviewEventParams>),
Expand All @@ -337,6 +338,16 @@ pub(crate) enum CustomAnalyticsFact {
PluginStateChanged(PluginStateChangedInput),
}

/// Analytics input captured when an app-server runtime starts.
pub(crate) struct AppServerStartedInput {
pub runtime: CodexRuntimeMetadata,
pub rpc_transport: AppServerRpcTransport,
/// Elapsed measured startup duration, in milliseconds from a monotonic clock.
pub duration_ms: u64,
/// Time at which the event was recorded, in seconds since the Unix epoch.
pub created_at: u64,
}

pub(crate) struct SkillInvokedInput {
pub tracking: TrackEventsContext,
pub invocations: Vec<SkillInvocation>,
Expand Down
23 changes: 23 additions & 0 deletions codex-rs/analytics/src/reducer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use crate::accepted_lines::accepted_line_repo_hash_for_cwd;
use crate::events::AppServerRpcTransport;
use crate::events::CodexAppMentionedEventRequest;
use crate::events::CodexAppServerClientMetadata;
use crate::events::CodexAppServerEventRequest;
use crate::events::CodexAppServerStartedEventParams;
use crate::events::CodexAppUsedEventRequest;
use crate::events::CodexCollabAgentToolCallEventParams;
use crate::events::CodexCollabAgentToolCallEventRequest;
Expand Down Expand Up @@ -60,6 +62,7 @@ use crate::events::subagent_thread_started_event_request;
use crate::facts::AnalyticsFact;
use crate::facts::AnalyticsJsonRpcError;
use crate::facts::AppMentionedInput;
use crate::facts::AppServerStartedInput;
use crate::facts::AppUsedInput;
use crate::facts::CodexCompactionEvent;
use crate::facts::CustomAnalyticsFact;
Expand Down Expand Up @@ -446,6 +449,9 @@ impl AnalyticsReducer {
self.ingest_server_request_aborted(completed_at_ms, request_id, out);
}
AnalyticsFact::Custom(input) => match input {
CustomAnalyticsFact::AppServerStarted(input) => {
self.ingest_app_server_started(input, out);
}
CustomAnalyticsFact::SubAgentThreadStarted(input) => {
self.ingest_subagent_thread_started(input, out);
}
Expand Down Expand Up @@ -483,6 +489,23 @@ impl AnalyticsReducer {
}
}

fn ingest_app_server_started(
&mut self,
input: AppServerStartedInput,
out: &mut Vec<TrackEventRequest>,
) {
out.push(TrackEventRequest::AppServerStarted(
CodexAppServerEventRequest::Started {
event_params: CodexAppServerStartedEventParams {
runtime: input.runtime,
rpc_transport: input.rpc_transport,
duration_ms: input.duration_ms,
created_at: input.created_at,
},
},
));
}

fn ingest_initialize(
&mut self,
connection_id: u64,
Expand Down
7 changes: 6 additions & 1 deletion codex-rs/app-server/src/analytics_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,15 @@ use codex_login::AuthManager;
pub(crate) fn analytics_events_client_from_config(
auth_manager: Arc<AuthManager>,
config: &Config,
default_analytics_enabled: bool,
) -> AnalyticsEventsClient {
AnalyticsEventsClient::new(
auth_manager,
config.chatgpt_base_url.trim_end_matches('/').to_string(),
config.analytics_enabled,
Some(
config
.analytics_enabled
.unwrap_or(default_analytics_enabled),
),
)
}
21 changes: 19 additions & 2 deletions codex-rs/app-server/src/in_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ use std::sync::RwLock;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::time::Duration;
use std::time::Instant;

use crate::analytics_utils::analytics_events_client_from_config;
use crate::config_manager::ConfigManager;
Expand Down Expand Up @@ -370,6 +371,7 @@ pub async fn start(args: InProcessStartArgs) -> IoResult<InProcessClientHandle>
}

async fn start_uninitialized(args: InProcessStartArgs) -> IoResult<InProcessClientHandle> {
let startup_started = Instant::now();
let channel_capacity = args.channel_capacity.max(1);
let installation_id = resolve_installation_id(&args.config.codex_home).await?;
let (client_tx, mut client_rx) = mpsc::channel::<InProcessClientMessage>(channel_capacity);
Expand All @@ -380,8 +382,11 @@ async fn start_uninitialized(args: InProcessStartArgs) -> IoResult<InProcessClie
let auth_manager =
AuthManager::shared_from_config(args.config.as_ref(), args.enable_codex_api_key_env)
.await;
let analytics_events_client =
analytics_events_client_from_config(Arc::clone(&auth_manager), args.config.as_ref());
let analytics_events_client = analytics_events_client_from_config(
Arc::clone(&auth_manager),
args.config.as_ref(),
/*default_analytics_enabled*/ true,
);
let outgoing_message_sender = Arc::new(OutgoingMessageSender::new(
outgoing_tx,
analytics_events_client.clone(),
Expand Down Expand Up @@ -421,6 +426,7 @@ async fn start_uninitialized(args: InProcessStartArgs) -> IoResult<InProcessClie
);
let (processor_tx, mut processor_rx) = mpsc::channel::<ProcessorCommand>(channel_capacity);
let mut processor_handle = tokio::spawn(async move {
let app_server_started_analytics_events_client = analytics_events_client.clone();
let processor = Arc::new(MessageProcessor::new(MessageProcessorArgs {
outgoing: Arc::clone(&processor_outgoing),
analytics_events_client,
Expand All @@ -442,6 +448,7 @@ async fn start_uninitialized(args: InProcessStartArgs) -> IoResult<InProcessClie
let mut thread_created_rx = processor.thread_created_receiver();
let session = Arc::new(ConnectionSessionState::new());
let mut listen_for_threads = true;
let mut app_server_started_tracked = false;

loop {
tokio::select! {
Expand Down Expand Up @@ -479,7 +486,17 @@ async fn start_uninitialized(args: InProcessStartArgs) -> IoResult<InProcessClie
}
}
Some(ProcessorCommand::Notification(notification)) => {
let initialized =
matches!(notification, ClientNotification::Initialized);
processor.process_client_notification(notification).await;
if initialized && !app_server_started_tracked {
app_server_started_analytics_events_client
.track_app_server_started(
AppServerRpcTransport::InProcess,
startup_started.elapsed(),
);
app_server_started_tracked = true;
}
}
None => {
break;
Expand Down
35 changes: 29 additions & 6 deletions codex-rs/app-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::io::Result as IoResult;
use std::sync::Arc;
use std::sync::RwLock;
use std::sync::atomic::AtomicBool;
use std::time::Instant;

use crate::analytics_utils::analytics_events_client_from_config;
use crate::config_manager::ConfigManager;
Expand Down Expand Up @@ -428,6 +429,7 @@ pub async fn run_main_with_transport_options(
auth: AppServerWebsocketAuthSettings,
runtime_options: AppServerRuntimeOptions,
) -> IoResult<()> {
let startup_started = Instant::now();
let (transport_event_tx, mut transport_event_rx) =
mpsc::channel::<TransportEvent>(CHANNEL_CAPACITY);
let (outgoing_tx, mut outgoing_rx) = mpsc::channel::<OutgoingEnvelope>(CHANNEL_CAPACITY);
Expand Down Expand Up @@ -787,14 +789,19 @@ pub async fn run_main_with_transport_options(

let processor_handle = tokio::spawn({
let auth_manager = Arc::clone(&auth_manager);
let analytics_events_client =
analytics_events_client_from_config(Arc::clone(&auth_manager), &config);
let analytics_events_client = analytics_events_client_from_config(
Arc::clone(&auth_manager),
&config,
default_analytics_enabled,
);
let rpc_transport = analytics_rpc_transport(&transport);
let outgoing_message_sender = Arc::new(OutgoingMessageSender::new(
outgoing_tx,
analytics_events_client.clone(),
));
let initialize_notification_sender = outgoing_message_sender.clone();
let outbound_control_tx = outbound_control_tx;
analytics_events_client.track_app_server_started(rpc_transport, startup_started.elapsed());
Comment thread
anp-oai marked this conversation as resolved.
Comment thread
anp-oai marked this conversation as resolved.
let processor = Arc::new(MessageProcessor::new(MessageProcessorArgs {
outgoing: outgoing_message_sender,
analytics_events_client,
Expand All @@ -809,7 +816,7 @@ pub async fn run_main_with_transport_options(
session_source,
auth_manager,
installation_id,
rpc_transport: analytics_rpc_transport(&transport),
rpc_transport,
remote_control_handle: Some(remote_control_handle.clone()),
plugin_startup_tasks: runtime_options.plugin_startup_tasks,
}));
Expand Down Expand Up @@ -1086,15 +1093,19 @@ pub async fn run_main_with_transport_options(
fn analytics_rpc_transport(transport: &AppServerTransport) -> AppServerRpcTransport {
match transport {
AppServerTransport::Stdio => AppServerRpcTransport::Stdio,
AppServerTransport::UnixSocket { .. }
| AppServerTransport::WebSocket { .. }
| AppServerTransport::Off => AppServerRpcTransport::Websocket,
AppServerTransport::UnixSocket { .. } => AppServerRpcTransport::UnixSocket,
AppServerTransport::WebSocket { .. } | AppServerTransport::Off => {
AppServerRpcTransport::Websocket
}
}
}

#[cfg(test)]
mod tests {
use super::LogFormat;
use super::analytics_rpc_transport;
use crate::transport::AppServerTransport;
use codex_analytics::AppServerRpcTransport;
use pretty_assertions::assert_eq;

#[test]
Expand All @@ -1114,4 +1125,16 @@ mod tests {
assert_eq!(LogFormat::from_env_value(Some("text")), LogFormat::Default);
assert_eq!(LogFormat::from_env_value(Some("jsonl")), LogFormat::Default);
}

#[test]
fn analytics_rpc_transport_preserves_unix_socket() {
let transport = "unix://codex-app-server.sock"
.parse::<AppServerTransport>()
.expect("unix socket transport should parse");

assert!(matches!(
analytics_rpc_transport(&transport),
AppServerRpcTransport::UnixSocket
));
}
}
7 changes: 5 additions & 2 deletions codex-rs/app-server/src/message_processor_tracing_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,8 +244,11 @@ async fn build_test_processor(
Arg0DispatchPaths::default(),
Arc::new(codex_config::NoopThreadConfigLoader),
);
let analytics_events_client =
analytics_events_client_from_config(Arc::clone(&auth_manager), config.as_ref());
let analytics_events_client = analytics_events_client_from_config(
Arc::clone(&auth_manager),
config.as_ref(),
/*default_analytics_enabled*/ true,
);
let outgoing = Arc::new(OutgoingMessageSender::new(
outgoing_tx,
analytics_events_client.clone(),
Expand Down
Loading
Loading