From 1075fa72f6fea2e2d127949e0435b3d2a58b744d Mon Sep 17 00:00:00 2001 From: Morgante Pell Date: Sat, 23 Mar 2024 10:12:07 -0400 Subject: [PATCH] Reapply "fix: repair telemetry" This reverts commit 48892cadb9b4381470d0482f9059a6ba4e77d03a. --- crates/cli/src/analytics.rs | 102 ++++++++++++++++++++-------- crates/cli/src/commands/mod.rs | 60 ++++++++++------ crates/cli/src/commands/plumbing.rs | 22 +++--- 3 files changed, 127 insertions(+), 57 deletions(-) diff --git a/crates/cli/src/analytics.rs b/crates/cli/src/analytics.rs index 1711f978d..2e37fc568 100644 --- a/crates/cli/src/analytics.rs +++ b/crates/cli/src/analytics.rs @@ -1,3 +1,4 @@ +use anyhow::Context; use anyhow::Result; use clap::Args; use lazy_static::lazy_static; @@ -19,6 +20,26 @@ pub enum AnalyticsEventName { Errored, } +impl fmt::Display for AnalyticsEventName { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + AnalyticsEventName::Invoked => write!(f, "command-invoked"), + AnalyticsEventName::Completed => write!(f, "command-completed"), + AnalyticsEventName::Errored => write!(f, "command-errored"), + } + } +} + +impl<'a> From<&'a AnalyticsEvent<'a>> for AnalyticsEventName { + fn from(event: &'a AnalyticsEvent) -> Self { + match event { + AnalyticsEvent::Invoked(_) => AnalyticsEventName::Invoked, + AnalyticsEvent::Completed(_) => AnalyticsEventName::Completed, + AnalyticsEvent::Errored(_) => AnalyticsEventName::Errored, + } + } +} + #[derive(Debug, Serialize, Clone)] #[serde(untagged)] pub enum AnalyticsEvent<'a> { @@ -107,7 +128,7 @@ lazy_static! { #[derive(Serialize, Debug)] #[serde(rename_all = "camelCase")] -struct SegmentPayload { +struct SegmentPayload<'a> { user_id: Option, /// /// Anonymous ID is used, as we don't @@ -117,33 +138,24 @@ struct SegmentPayload { /// https://segment.com/docs/connections/spec/identify/#anonymous-id /// anonymous_id: Uuid, - event: AnalyticsEventName, + event: &'a AnalyticsEventName, properties: AnalyticsProperties, } -async fn track_event_segment( - analytics_event_name: AnalyticsEventName, - analytics_properties: AnalyticsProperties, +pub async fn track_event_line( + line: &str, + command: String, + args: Vec, installation_id: Uuid, user_id: Option, ) -> Result<()> { - let payload = SegmentPayload { - user_id, - anonymous_id: installation_id, - event: analytics_event_name, - properties: analytics_properties, - }; + let (name, json) = line + .split_once('\t') + .ok_or(anyhow::anyhow!("Invalid line, no tab found"))?; + let event = serde_json::from_str::(name).context("Invalid event name")?; + let data = serde_json::from_str::(json).context("Invalid event data")?; - // - // https://segment.com/docs/connections/sources/catalog/libraries/server/http-api/#track - // - reqwest::Client::new() - .post("https://api.segment.io/v1/track") - .basic_auth::<&String, &str>(&SEGMENT_WRITE_KEY, None) - .json(&payload) - .timeout(Duration::from_secs(5)) - .send() - .await?; + track_event(event, data, command, args, installation_id, user_id).await; Ok(()) } @@ -180,13 +192,39 @@ pub async fn track_event( data: Some(analytics_event_data), }; - let _ = tokio::task::spawn(track_event_segment( - analytics_event_name, - properties, - installation_id, + let payload = SegmentPayload { user_id, - )) - .await; + anonymous_id: installation_id, + event: &analytics_event_name, + properties, + }; + + // + // https://segment.com/docs/connections/sources/catalog/libraries/server/http-api/#track + // + match reqwest::Client::new() + .post("https://api.segment.io/v1/track") + .basic_auth::<&String, &str>(&SEGMENT_WRITE_KEY, None) + .json(&payload) + .timeout(Duration::from_secs(5)) + .send() + .await + { + Ok(response) => { + if !response.status().is_success() { + eprintln!( + "Failed to send event {}: {}", + analytics_event_name, + response.status() + ); + } + } + Err(e) => { + eprintln!("Failed to send event {}: {:#}", analytics_event_name, e); + } + } + + println!("Successfully sent event {}", analytics_event_name); } pub fn is_telemetry_disabled() -> bool { @@ -195,3 +233,13 @@ pub fn is_telemetry_disabled() -> bool { .parse::() .unwrap_or(false) } + +/// By default, telemetry is sent in the background so the main process can exit quickly. +/// If this environment variable is set to true, telemetry will be sent in the foreground. +/// This is useful for debugging telemetry issues. +pub fn is_telemetry_foregrounded() -> bool { + env::var("GRIT_TELEMETRY_FOREGROUND") + .unwrap_or_else(|_| "false".to_owned()) + .parse::() + .unwrap_or(false) +} diff --git a/crates/cli/src/commands/mod.rs b/crates/cli/src/commands/mod.rs index 8948883d5..d542c9200 100644 --- a/crates/cli/src/commands/mod.rs +++ b/crates/cli/src/commands/mod.rs @@ -33,7 +33,10 @@ pub(crate) mod workflows_list; pub(crate) mod docgen; use crate::{ - analytics::{is_telemetry_disabled, AnalyticsEvent, CompletedEvent, ErroredEvent}, + analytics::{ + is_telemetry_disabled, is_telemetry_foregrounded, AnalyticsEvent, AnalyticsEventName, + CompletedEvent, ErroredEvent, + }, flags::{GlobalFormatFlags, OutputFormat}, updater::Updater, }; @@ -56,10 +59,10 @@ use parse::ParseArgs; use patterns::{PatternCommands, Patterns}; use plumbing::PlumbingArgs; use serde::Serialize; -use std::fmt; use std::io::Write; use std::process::{ChildStdin, Command, Stdio}; use std::time::Instant; +use std::{fmt, process::Child}; use tracing::instrument; use version::VersionArgs; @@ -186,7 +189,7 @@ fn maybe_spawn_analytics_worker( command: &Commands, args: &[String], updater: &Updater, -) -> Result> { +) -> Result> { if is_telemetry_disabled() { return Ok(None); } @@ -217,33 +220,37 @@ fn maybe_spawn_analytics_worker( .arg(command.to_string()) .arg("--args") .arg(args.join(" ")) - .stdout(Stdio::null()) - .stderr(Stdio::null()) .stdin(Stdio::piped()); - let stdin = cmd.spawn()?.stdin; - - match stdin { - Some(stdin) => Ok(Some(stdin)), - None => Err(Error::msg( - "Failed to open stdin of analytics worker process", - )), + if !is_telemetry_foregrounded() { + cmd.stdout(Stdio::null()); + cmd.stderr(Stdio::null()); } + + let child = cmd.spawn()?; + + Ok(Some(child)) } fn write_analytics_event( analytics_worker: Option<&mut ChildStdin>, analytics_event: &AnalyticsEvent, ) { - let serialized_name = serde_json::to_string(&analytics_event); + let serialized_name = serde_json::to_string(&AnalyticsEventName::from(analytics_event)); let serialized_event = serde_json::to_string(&analytics_event); match (analytics_worker, serialized_name, serialized_event) { (Some(analytics_worker), Ok(serialized_name), Ok(serialized_event)) => { let data = format!("{}\t{}\n", serialized_name, serialized_event); - let _ = analytics_worker.write_all(data.as_bytes()); + let res = analytics_worker.write_all(data.as_bytes()); + if let Err(e) = res { + println!("Failed to write to analytics worker: {:?}", e); + } + } + (None, _, _) => { + // No analytics worker to send event to, do nothing } (worker, name_err, event_err) => { - debug!( + println!( "Failed to serialize analytics event: {:?} {:?} {:?}", worker, name_err, event_err ); @@ -261,14 +268,15 @@ pub async fn run_command() -> Result<()> { let mut updater = Updater::from_current_bin().await?; updater.dump().await?; - let mut analytics_worker = + let mut analytics_child = match maybe_spawn_analytics_worker(&app.command, &analytics_args, &updater) { Err(_e) => { + println!("Failed to start the analytics worker process"); // We failed to start the analytics worker process None } Ok(None) => None, - Ok(Some(analytics_worker)) => Some(analytics_worker), + Ok(Some(child)) => Some(child), }; let log_level = app.format_flags.log_level.unwrap_or(match &app.command { @@ -303,7 +311,7 @@ pub async fn run_command() -> Result<()> { let start = Instant::now(); write_analytics_event( - analytics_worker.as_mut(), + analytics_child.as_mut().map(|c| c.stdin.as_mut().unwrap()), &AnalyticsEvent::from_cmd(&app.command), ); @@ -359,7 +367,21 @@ pub async fn run_command() -> Result<()> { Err(_) => AnalyticsEvent::Errored(ErroredEvent::from_elapsed(elapsed)), }; - write_analytics_event(analytics_worker.as_mut(), &final_analytics_event); + write_analytics_event( + analytics_child.as_mut().map(|c| c.stdin.as_mut().unwrap()), + &final_analytics_event, + ); + + // If we are in the foreground, wait for the analytics worker to finish + if is_telemetry_foregrounded() { + if let Some(mut child) = analytics_child { + println!("Waiting for analytics worker to finish"); + let res = child.wait(); + if let Err(e) = res { + println!("Failed to wait for analytics worker: {:?}", e); + } + } + } res } diff --git a/crates/cli/src/commands/plumbing.rs b/crates/cli/src/commands/plumbing.rs index 375b48a9f..58ebcf5c1 100644 --- a/crates/cli/src/commands/plumbing.rs +++ b/crates/cli/src/commands/plumbing.rs @@ -11,7 +11,7 @@ use std::io::{stdin, Read}; use std::path::Path; use std::path::PathBuf; -use crate::analytics::AnalyticsEventName; +use crate::analytics::{track_event_line, AnalyticsEventName}; use crate::flags::GlobalFormatFlags; use crate::lister::list_applyables; use crate::resolver::{get_grit_files_from, resolve_from, Source}; @@ -173,20 +173,20 @@ pub(crate) async fn run_plumbing( } PlumbingArgs::Analytics { args, shared_args } => { let buffer = read_input(&shared_args)?; - let events_to_send = buffer.lines().filter_map(|line| { - // Split line in name and JSON - let (name, json) = line.split_once('\t')?; - - Some(track_event( - serde_json::from_str::(name).ok()?, - serde_json::from_str::(json).ok()?, + for line in buffer.lines() { + let result = track_event_line( + line, args.command.clone(), args.args.clone(), args.installation_id, args.user_id.clone(), - )) - }); - join_all(events_to_send).await; + ) + .await; + if let Err(e) = result { + eprintln!("Error when processing {}: {:#}", line, e); + } + } + Ok(()) } PlumbingArgs::Check { args, shared_args } => {