From 73e866f7fd141c45cbcaa92a468b9e230751886e Mon Sep 17 00:00:00 2001 From: Morgante Pell Date: Sat, 23 Mar 2024 14:54:12 -0400 Subject: [PATCH] fix: repair telemetry processing (#64) --- Cargo.lock | 1 - crates/cli/Cargo.toml | 1 - crates/cli/src/analytics.rs | 102 ++++++++++++++++++++-------- crates/cli/src/commands/mod.rs | 64 +++++++++++------ crates/cli/src/commands/plumbing.rs | 26 +++---- crates/cli_bin/tests/analytics.rs | 28 ++++++++ 6 files changed, 159 insertions(+), 63 deletions(-) create mode 100644 crates/cli_bin/tests/analytics.rs diff --git a/Cargo.lock b/Cargo.lock index 15abb13e3..6e6071b2d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1915,7 +1915,6 @@ dependencies = [ "dialoguer", "env_logger", "flate2", - "futures", "git2", "grit_cache", "ignore", diff --git a/crates/cli/Cargo.toml b/crates/cli/Cargo.toml index 48bc31be5..3a2dd3490 100644 --- a/crates/cli/Cargo.toml +++ b/crates/cli/Cargo.toml @@ -46,7 +46,6 @@ tempfile = "3.1" similar = "2.2.1" dialoguer = "0.10.4" console = "0.15.7" -futures = "0.3.28" rayon = "1.8.0" dashmap = "5.5.3" clap-markdown = { git = "https://github.com/getgrit/clap-markdown", optional = true } diff --git a/crates/cli/src/analytics.rs b/crates/cli/src/analytics.rs index 1711f978d..2e37fc568 100644 --- a/crates/cli/src/analytics.rs +++ b/crates/cli/src/analytics.rs @@ -1,3 +1,4 @@ +use anyhow::Context; use anyhow::Result; use clap::Args; use lazy_static::lazy_static; @@ -19,6 +20,26 @@ pub enum AnalyticsEventName { Errored, } +impl fmt::Display for AnalyticsEventName { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + AnalyticsEventName::Invoked => write!(f, "command-invoked"), + AnalyticsEventName::Completed => write!(f, "command-completed"), + AnalyticsEventName::Errored => write!(f, "command-errored"), + } + } +} + +impl<'a> From<&'a AnalyticsEvent<'a>> for AnalyticsEventName { + fn from(event: &'a AnalyticsEvent) -> Self { + match event { + AnalyticsEvent::Invoked(_) => AnalyticsEventName::Invoked, + AnalyticsEvent::Completed(_) => AnalyticsEventName::Completed, + AnalyticsEvent::Errored(_) => AnalyticsEventName::Errored, + } + } +} + #[derive(Debug, Serialize, Clone)] #[serde(untagged)] pub enum AnalyticsEvent<'a> { @@ -107,7 +128,7 @@ lazy_static! { #[derive(Serialize, Debug)] #[serde(rename_all = "camelCase")] -struct SegmentPayload { +struct SegmentPayload<'a> { user_id: Option, /// /// Anonymous ID is used, as we don't @@ -117,33 +138,24 @@ struct SegmentPayload { /// https://segment.com/docs/connections/spec/identify/#anonymous-id /// anonymous_id: Uuid, - event: AnalyticsEventName, + event: &'a AnalyticsEventName, properties: AnalyticsProperties, } -async fn track_event_segment( - analytics_event_name: AnalyticsEventName, - analytics_properties: AnalyticsProperties, +pub async fn track_event_line( + line: &str, + command: String, + args: Vec, installation_id: Uuid, user_id: Option, ) -> Result<()> { - let payload = SegmentPayload { - user_id, - anonymous_id: installation_id, - event: analytics_event_name, - properties: analytics_properties, - }; + let (name, json) = line + .split_once('\t') + .ok_or(anyhow::anyhow!("Invalid line, no tab found"))?; + let event = serde_json::from_str::(name).context("Invalid event name")?; + let data = serde_json::from_str::(json).context("Invalid event data")?; - // - // https://segment.com/docs/connections/sources/catalog/libraries/server/http-api/#track - // - reqwest::Client::new() - .post("https://api.segment.io/v1/track") - .basic_auth::<&String, &str>(&SEGMENT_WRITE_KEY, None) - .json(&payload) - .timeout(Duration::from_secs(5)) - .send() - .await?; + track_event(event, data, command, args, installation_id, user_id).await; Ok(()) } @@ -180,13 +192,39 @@ pub async fn track_event( data: Some(analytics_event_data), }; - let _ = tokio::task::spawn(track_event_segment( - analytics_event_name, - properties, - installation_id, + let payload = SegmentPayload { user_id, - )) - .await; + anonymous_id: installation_id, + event: &analytics_event_name, + properties, + }; + + // + // https://segment.com/docs/connections/sources/catalog/libraries/server/http-api/#track + // + match reqwest::Client::new() + .post("https://api.segment.io/v1/track") + .basic_auth::<&String, &str>(&SEGMENT_WRITE_KEY, None) + .json(&payload) + .timeout(Duration::from_secs(5)) + .send() + .await + { + Ok(response) => { + if !response.status().is_success() { + eprintln!( + "Failed to send event {}: {}", + analytics_event_name, + response.status() + ); + } + } + Err(e) => { + eprintln!("Failed to send event {}: {:#}", analytics_event_name, e); + } + } + + println!("Successfully sent event {}", analytics_event_name); } pub fn is_telemetry_disabled() -> bool { @@ -195,3 +233,13 @@ pub fn is_telemetry_disabled() -> bool { .parse::() .unwrap_or(false) } + +/// By default, telemetry is sent in the background so the main process can exit quickly. +/// If this environment variable is set to true, telemetry will be sent in the foreground. +/// This is useful for debugging telemetry issues. +pub fn is_telemetry_foregrounded() -> bool { + env::var("GRIT_TELEMETRY_FOREGROUND") + .unwrap_or_else(|_| "false".to_owned()) + .parse::() + .unwrap_or(false) +} diff --git a/crates/cli/src/commands/mod.rs b/crates/cli/src/commands/mod.rs index 8948883d5..3eafd26ee 100644 --- a/crates/cli/src/commands/mod.rs +++ b/crates/cli/src/commands/mod.rs @@ -33,11 +33,14 @@ pub(crate) mod workflows_list; pub(crate) mod docgen; use crate::{ - analytics::{is_telemetry_disabled, AnalyticsEvent, CompletedEvent, ErroredEvent}, + analytics::{ + is_telemetry_disabled, is_telemetry_foregrounded, AnalyticsEvent, AnalyticsEventName, + CompletedEvent, ErroredEvent, + }, flags::{GlobalFormatFlags, OutputFormat}, updater::Updater, }; -use anyhow::{Error, Result}; +use anyhow::{Result}; use apply::ApplyArgs; use auth::{Auth, AuthCommands}; use check::CheckArg; @@ -49,17 +52,17 @@ use indicatif_log_bridge::LogWrapper; use init::InitArgs; use install::InstallArgs; use list::ListArgs; -use log::{debug, LevelFilter}; +use log::{LevelFilter}; use lsp::LspArgs; use marzano_messenger::emit::ApplyDetails; use parse::ParseArgs; use patterns::{PatternCommands, Patterns}; use plumbing::PlumbingArgs; use serde::Serialize; -use std::fmt; use std::io::Write; use std::process::{ChildStdin, Command, Stdio}; use std::time::Instant; +use std::{fmt, process::Child}; use tracing::instrument; use version::VersionArgs; @@ -186,7 +189,7 @@ fn maybe_spawn_analytics_worker( command: &Commands, args: &[String], updater: &Updater, -) -> Result> { +) -> Result> { if is_telemetry_disabled() { return Ok(None); } @@ -217,33 +220,37 @@ fn maybe_spawn_analytics_worker( .arg(command.to_string()) .arg("--args") .arg(args.join(" ")) - .stdout(Stdio::null()) - .stderr(Stdio::null()) .stdin(Stdio::piped()); - let stdin = cmd.spawn()?.stdin; - - match stdin { - Some(stdin) => Ok(Some(stdin)), - None => Err(Error::msg( - "Failed to open stdin of analytics worker process", - )), + if !is_telemetry_foregrounded() { + cmd.stdout(Stdio::null()); + cmd.stderr(Stdio::null()); } + + let child = cmd.spawn()?; + + Ok(Some(child)) } fn write_analytics_event( analytics_worker: Option<&mut ChildStdin>, analytics_event: &AnalyticsEvent, ) { - let serialized_name = serde_json::to_string(&analytics_event); + let serialized_name = serde_json::to_string(&AnalyticsEventName::from(analytics_event)); let serialized_event = serde_json::to_string(&analytics_event); match (analytics_worker, serialized_name, serialized_event) { (Some(analytics_worker), Ok(serialized_name), Ok(serialized_event)) => { let data = format!("{}\t{}\n", serialized_name, serialized_event); - let _ = analytics_worker.write_all(data.as_bytes()); + let res = analytics_worker.write_all(data.as_bytes()); + if let Err(e) = res { + println!("Failed to write to analytics worker: {:?}", e); + } + } + (None, _, _) => { + // No analytics worker to send event to, do nothing } (worker, name_err, event_err) => { - debug!( + println!( "Failed to serialize analytics event: {:?} {:?} {:?}", worker, name_err, event_err ); @@ -261,14 +268,15 @@ pub async fn run_command() -> Result<()> { let mut updater = Updater::from_current_bin().await?; updater.dump().await?; - let mut analytics_worker = + let mut analytics_child = match maybe_spawn_analytics_worker(&app.command, &analytics_args, &updater) { Err(_e) => { + println!("Failed to start the analytics worker process"); // We failed to start the analytics worker process None } Ok(None) => None, - Ok(Some(analytics_worker)) => Some(analytics_worker), + Ok(Some(child)) => Some(child), }; let log_level = app.format_flags.log_level.unwrap_or(match &app.command { @@ -303,7 +311,7 @@ pub async fn run_command() -> Result<()> { let start = Instant::now(); write_analytics_event( - analytics_worker.as_mut(), + analytics_child.as_mut().map(|c| c.stdin.as_mut().unwrap()), &AnalyticsEvent::from_cmd(&app.command), ); @@ -359,7 +367,21 @@ pub async fn run_command() -> Result<()> { Err(_) => AnalyticsEvent::Errored(ErroredEvent::from_elapsed(elapsed)), }; - write_analytics_event(analytics_worker.as_mut(), &final_analytics_event); + write_analytics_event( + analytics_child.as_mut().map(|c| c.stdin.as_mut().unwrap()), + &final_analytics_event, + ); + + // If we are in the foreground, wait for the analytics worker to finish + if is_telemetry_foregrounded() { + if let Some(mut child) = analytics_child { + println!("Waiting for analytics worker to finish"); + let res = child.wait(); + if let Err(e) = res { + println!("Failed to wait for analytics worker: {:?}", e); + } + } + } res } diff --git a/crates/cli/src/commands/plumbing.rs b/crates/cli/src/commands/plumbing.rs index 375b48a9f..c5264b642 100644 --- a/crates/cli/src/commands/plumbing.rs +++ b/crates/cli/src/commands/plumbing.rs @@ -11,14 +11,14 @@ use std::io::{stdin, Read}; use std::path::Path; use std::path::PathBuf; -use crate::analytics::AnalyticsEventName; +use crate::analytics::{track_event_line}; use crate::flags::GlobalFormatFlags; use crate::lister::list_applyables; use crate::resolver::{get_grit_files_from, resolve_from, Source}; use crate::utils::is_pattern_name; -use futures::future::join_all; -use super::super::analytics::{track_event, AnalyticsArgs}; + +use super::super::analytics::{AnalyticsArgs}; use super::apply_pattern::{run_apply_pattern, ApplyPatternArgs}; use super::check::{run_check, CheckArg}; use super::init::{init_config_from_cwd, init_global_grit_modules}; @@ -173,20 +173,20 @@ pub(crate) async fn run_plumbing( } PlumbingArgs::Analytics { args, shared_args } => { let buffer = read_input(&shared_args)?; - let events_to_send = buffer.lines().filter_map(|line| { - // Split line in name and JSON - let (name, json) = line.split_once('\t')?; - - Some(track_event( - serde_json::from_str::(name).ok()?, - serde_json::from_str::(json).ok()?, + for line in buffer.lines() { + let result = track_event_line( + line, args.command.clone(), args.args.clone(), args.installation_id, args.user_id.clone(), - )) - }); - join_all(events_to_send).await; + ) + .await; + if let Err(e) = result { + eprintln!("Error when processing {}: {:#}", line, e); + } + } + Ok(()) } PlumbingArgs::Check { args, shared_args } => { diff --git a/crates/cli_bin/tests/analytics.rs b/crates/cli_bin/tests/analytics.rs new file mode 100644 index 000000000..67414cfa4 --- /dev/null +++ b/crates/cli_bin/tests/analytics.rs @@ -0,0 +1,28 @@ +use crate::common::{get_fixture, get_test_cmd}; +use anyhow::Result; + +mod common; + +#[test] +fn confirm_telemetry_flush() -> Result<()> { + let (_temp_dir, temp_fixtures_root) = get_fixture("grit_modules", true)?; + + let mut cmd = get_test_cmd()?; + cmd.env("GRIT_TELEMETRY_DISABLED", "false"); + cmd.env("GRIT_TELEMETRY_FOREGROUND", "true"); + cmd.arg("doctor").current_dir(temp_fixtures_root); + + let output = cmd.output()?; + println!("output: {:?}", String::from_utf8(output.stdout.clone())?); + + assert!( + output.status.success(), + "Command didn't finish successfully" + ); + + // Confirm output flushed + let output_str = String::from_utf8(output.stdout.clone())?; + assert!(output_str.contains("Successfully sent event command-completed")); + + Ok(()) +}