Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: repair telemetry processing #64

Merged
merged 6 commits into from
Mar 23, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 75 additions & 27 deletions crates/cli/src/analytics.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use anyhow::Context;
use anyhow::Result;
use clap::Args;
use lazy_static::lazy_static;
Expand All @@ -19,6 +20,26 @@ pub enum AnalyticsEventName {
Errored,
}

impl fmt::Display for AnalyticsEventName {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
AnalyticsEventName::Invoked => write!(f, "command-invoked"),
AnalyticsEventName::Completed => write!(f, "command-completed"),
AnalyticsEventName::Errored => write!(f, "command-errored"),
}
}
}

impl<'a> From<&'a AnalyticsEvent<'a>> for AnalyticsEventName {
fn from(event: &'a AnalyticsEvent) -> Self {
match event {
AnalyticsEvent::Invoked(_) => AnalyticsEventName::Invoked,
AnalyticsEvent::Completed(_) => AnalyticsEventName::Completed,
AnalyticsEvent::Errored(_) => AnalyticsEventName::Errored,
}
}
}

#[derive(Debug, Serialize, Clone)]
#[serde(untagged)]
pub enum AnalyticsEvent<'a> {
Expand Down Expand Up @@ -107,7 +128,7 @@ lazy_static! {

#[derive(Serialize, Debug)]
#[serde(rename_all = "camelCase")]
struct SegmentPayload {
struct SegmentPayload<'a> {
user_id: Option<String>,
///
/// Anonymous ID is used, as we don't
Expand All @@ -117,33 +138,24 @@ struct SegmentPayload {
/// https://segment.com/docs/connections/spec/identify/#anonymous-id
///
anonymous_id: Uuid,
event: AnalyticsEventName,
event: &'a AnalyticsEventName,
properties: AnalyticsProperties,
}

async fn track_event_segment(
analytics_event_name: AnalyticsEventName,
analytics_properties: AnalyticsProperties,
pub async fn track_event_line(
line: &str,
command: String,
args: Vec<String>,
installation_id: Uuid,
user_id: Option<String>,
) -> Result<()> {
let payload = SegmentPayload {
user_id,
anonymous_id: installation_id,
event: analytics_event_name,
properties: analytics_properties,
};
let (name, json) = line
.split_once('\t')
.ok_or(anyhow::anyhow!("Invalid line, no tab found"))?;
let event = serde_json::from_str::<AnalyticsEventName>(name).context("Invalid event name")?;
let data = serde_json::from_str::<serde_json::Value>(json).context("Invalid event data")?;

//
// https://segment.com/docs/connections/sources/catalog/libraries/server/http-api/#track
//
reqwest::Client::new()
.post("https://api.segment.io/v1/track")
.basic_auth::<&String, &str>(&SEGMENT_WRITE_KEY, None)
.json(&payload)
.timeout(Duration::from_secs(5))
.send()
.await?;
track_event(event, data, command, args, installation_id, user_id).await;

Ok(())
}
Expand Down Expand Up @@ -180,13 +192,39 @@ pub async fn track_event(
data: Some(analytics_event_data),
};

let _ = tokio::task::spawn(track_event_segment(
analytics_event_name,
properties,
installation_id,
let payload = SegmentPayload {
user_id,
))
.await;
anonymous_id: installation_id,
event: &analytics_event_name,
properties,
};

//
// https://segment.com/docs/connections/sources/catalog/libraries/server/http-api/#track
//
match reqwest::Client::new()
.post("https://api.segment.io/v1/track")
.basic_auth::<&String, &str>(&SEGMENT_WRITE_KEY, None)
.json(&payload)
.timeout(Duration::from_secs(5))
.send()
.await
{
Ok(response) => {
if !response.status().is_success() {
eprintln!(
"Failed to send event {}: {}",
analytics_event_name,
response.status()
);
}
}
Err(e) => {
eprintln!("Failed to send event {}: {:#}", analytics_event_name, e);
}
}

println!("Successfully sent event {}", analytics_event_name);
}

pub fn is_telemetry_disabled() -> bool {
Expand All @@ -195,3 +233,13 @@ pub fn is_telemetry_disabled() -> bool {
.parse::<bool>()
.unwrap_or(false)
}

/// By default, telemetry is sent in the background so the main process can exit quickly.
/// If this environment variable is set to true, telemetry will be sent in the foreground.
/// This is useful for debugging telemetry issues.
pub fn is_telemetry_foregrounded() -> bool {
env::var("GRIT_TELEMETRY_FOREGROUND")
.unwrap_or_else(|_| "false".to_owned())
.parse::<bool>()
.unwrap_or(false)
}
64 changes: 43 additions & 21 deletions crates/cli/src/commands/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,14 @@ pub(crate) mod workflows_list;
pub(crate) mod docgen;

use crate::{
analytics::{is_telemetry_disabled, AnalyticsEvent, CompletedEvent, ErroredEvent},
analytics::{
is_telemetry_disabled, is_telemetry_foregrounded, AnalyticsEvent, AnalyticsEventName,
CompletedEvent, ErroredEvent,
},
flags::{GlobalFormatFlags, OutputFormat},
updater::Updater,
};
Comment on lines 33 to 42
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

📝 NOTE
This review was outside the diff hunks, and no overlapping diff hunk was found. Original lines [55-55]

Unused import: debug. Since it's not used in this file, consider removing it to clean up the code.

- use log::{debug, LevelFilter};
+ use log::LevelFilter;

📝 NOTE
This review was outside the diff hunks, and no overlapping diff hunk was found. Original lines [43-43]

Unused import: Error. It appears to be unused in the current implementation, so consider removing it.

- use anyhow::{Error, Result};
+ use anyhow::Result;

use anyhow::{Error, Result};
use anyhow::{Result};
use apply::ApplyArgs;
use auth::{Auth, AuthCommands};
use check::CheckArg;
Expand All @@ -49,17 +52,17 @@ use indicatif_log_bridge::LogWrapper;
use init::InitArgs;
use install::InstallArgs;
use list::ListArgs;
use log::{debug, LevelFilter};
use log::{LevelFilter};
use lsp::LspArgs;
use marzano_messenger::emit::ApplyDetails;
use parse::ParseArgs;
use patterns::{PatternCommands, Patterns};
use plumbing::PlumbingArgs;
use serde::Serialize;
use std::fmt;
use std::io::Write;
use std::process::{ChildStdin, Command, Stdio};
use std::time::Instant;
use std::{fmt, process::Child};
use tracing::instrument;
use version::VersionArgs;

Expand Down Expand Up @@ -186,7 +189,7 @@ fn maybe_spawn_analytics_worker(
command: &Commands,
args: &[String],
updater: &Updater,
) -> Result<Option<ChildStdin>> {
) -> Result<Option<Child>> {
if is_telemetry_disabled() {
return Ok(None);
}
Expand Down Expand Up @@ -217,33 +220,37 @@ fn maybe_spawn_analytics_worker(
.arg(command.to_string())
.arg("--args")
.arg(args.join(" "))
.stdout(Stdio::null())
.stderr(Stdio::null())
.stdin(Stdio::piped());

let stdin = cmd.spawn()?.stdin;

match stdin {
Some(stdin) => Ok(Some(stdin)),
None => Err(Error::msg(
"Failed to open stdin of analytics worker process",
)),
if !is_telemetry_foregrounded() {
cmd.stdout(Stdio::null());
cmd.stderr(Stdio::null());
}

let child = cmd.spawn()?;

Ok(Some(child))
}

fn write_analytics_event(
analytics_worker: Option<&mut ChildStdin>,
analytics_event: &AnalyticsEvent,
) {
let serialized_name = serde_json::to_string(&analytics_event);
let serialized_name = serde_json::to_string(&AnalyticsEventName::from(analytics_event));
let serialized_event = serde_json::to_string(&analytics_event);
match (analytics_worker, serialized_name, serialized_event) {
(Some(analytics_worker), Ok(serialized_name), Ok(serialized_event)) => {
let data = format!("{}\t{}\n", serialized_name, serialized_event);
let _ = analytics_worker.write_all(data.as_bytes());
let res = analytics_worker.write_all(data.as_bytes());
if let Err(e) = res {
println!("Failed to write to analytics worker: {:?}", e);
}
}
(None, _, _) => {
// No analytics worker to send event to, do nothing
}
(worker, name_err, event_err) => {
debug!(
println!(
"Failed to serialize analytics event: {:?} {:?} {:?}",
worker, name_err, event_err
);
Expand All @@ -261,14 +268,15 @@ pub async fn run_command() -> Result<()> {
let mut updater = Updater::from_current_bin().await?;
updater.dump().await?;

let mut analytics_worker =
let mut analytics_child =
match maybe_spawn_analytics_worker(&app.command, &analytics_args, &updater) {
Err(_e) => {
println!("Failed to start the analytics worker process");
// We failed to start the analytics worker process
None
}
Ok(None) => None,
Ok(Some(analytics_worker)) => Some(analytics_worker),
Ok(Some(child)) => Some(child),
};

let log_level = app.format_flags.log_level.unwrap_or(match &app.command {
Expand Down Expand Up @@ -303,7 +311,7 @@ pub async fn run_command() -> Result<()> {
let start = Instant::now();

write_analytics_event(
analytics_worker.as_mut(),
analytics_child.as_mut().map(|c| c.stdin.as_mut().unwrap()),
&AnalyticsEvent::from_cmd(&app.command),
);

Expand Down Expand Up @@ -359,7 +367,21 @@ pub async fn run_command() -> Result<()> {
Err(_) => AnalyticsEvent::Errored(ErroredEvent::from_elapsed(elapsed)),
};

write_analytics_event(analytics_worker.as_mut(), &final_analytics_event);
write_analytics_event(
analytics_child.as_mut().map(|c| c.stdin.as_mut().unwrap()),
&final_analytics_event,
);

// If we are in the foreground, wait for the analytics worker to finish
if is_telemetry_foregrounded() {
if let Some(mut child) = analytics_child {
println!("Waiting for analytics worker to finish");
let res = child.wait();
if let Err(e) = res {
println!("Failed to wait for analytics worker: {:?}", e);
}
}
}

res
}
26 changes: 13 additions & 13 deletions crates/cli/src/commands/plumbing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ use std::io::{stdin, Read};
use std::path::Path;
use std::path::PathBuf;

use crate::analytics::AnalyticsEventName;
use crate::analytics::{track_event_line};
use crate::flags::GlobalFormatFlags;
use crate::lister::list_applyables;
use crate::resolver::{get_grit_files_from, resolve_from, Source};
use crate::utils::is_pattern_name;
use futures::future::join_all;

use super::super::analytics::{track_event, AnalyticsArgs};

use super::super::analytics::{AnalyticsArgs};
use super::apply_pattern::{run_apply_pattern, ApplyPatternArgs};
use super::check::{run_check, CheckArg};
use super::init::{init_config_from_cwd, init_global_grit_modules};
Expand Down Expand Up @@ -173,20 +173,20 @@ pub(crate) async fn run_plumbing(
}
PlumbingArgs::Analytics { args, shared_args } => {
let buffer = read_input(&shared_args)?;
let events_to_send = buffer.lines().filter_map(|line| {
// Split line in name and JSON
let (name, json) = line.split_once('\t')?;

Some(track_event(
serde_json::from_str::<AnalyticsEventName>(name).ok()?,
serde_json::from_str::<serde_json::Value>(json).ok()?,
for line in buffer.lines() {
let result = track_event_line(
line,
args.command.clone(),
args.args.clone(),
args.installation_id,
args.user_id.clone(),
))
});
join_all(events_to_send).await;
)
.await;
if let Err(e) = result {
eprintln!("Error when processing {}: {:#}", line, e);
}
}

Ok(())
}
PlumbingArgs::Check { args, shared_args } => {
Expand Down
28 changes: 28 additions & 0 deletions crates/cli_bin/tests/analytics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
use crate::common::{get_fixture, get_test_cmd};
use anyhow::Result;

mod common;

#[test]
fn confirm_telemetry_flush() -> Result<()> {
let (_temp_dir, temp_fixtures_root) = get_fixture("grit_modules", true)?;

let mut cmd = get_test_cmd()?;
cmd.env("GRIT_TELEMETRY_DISABLED", "false");
cmd.env("GRIT_TELEMETRY_FOREGROUND", "true");
cmd.arg("doctor").current_dir(temp_fixtures_root);

let output = cmd.output()?;
println!("output: {:?}", String::from_utf8(output.stdout.clone())?);

assert!(
output.status.success(),
"Command didn't finish successfully"
);

// Confirm output flushed
let output_str = String::from_utf8(output.stdout.clone())?;
assert!(output_str.contains("Successfully sent event command-completed"));

Ok(())
}
Loading