Skip to content

Commit

Permalink
add instrumentation and dynamic debug bes events
Browse files Browse the repository at this point in the history
  • Loading branch information
DolceTriade committed Mar 8, 2024
1 parent c6248e7 commit 2ec9bd2
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 30 deletions.
5 changes: 3 additions & 2 deletions blade/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,16 @@ rust_binary(
"//blade/bep",
"//blade/bytestream",
"//blade/db",
"//blade/metrics",
"//blade/state",
"//blade/tailwindmerge",
"//blade/metrics",
"@crate//:actix-files",
"@crate//:actix-web",
"@crate//:ansi-to-html",
"@crate//:anyhow",
"@crate//:broadcaster",
"@crate//:cfg-if",
"@crate//:clap",
"@crate//:prometheus-client",
"@crate//:diesel",
"@crate//:futures",
"@crate//:humantime",
Expand All @@ -50,6 +49,7 @@ rust_binary(
"@crate//:leptos_meta",
"@crate//:leptos_router",
"@crate//:log",
"@crate//:prometheus-client",
"@crate//:serde",
"@crate//:time",
"@crate//:tokio",
Expand All @@ -61,6 +61,7 @@ rust_binary(
"@crate//:url",
"@crate//:url-escape",
"@crate//:zip",
"@crate__regex-1.9.3//:regex",
"@crate__wasm-bindgen-0.2.87//:wasm_bindgen",
"@crate__web-sys-0.3.64//:web_sys",
"@rules_rust//tools/runfiles",
Expand Down
22 changes: 21 additions & 1 deletion blade/admin.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,28 @@
use actix_web::*;
use anyhow::Context;
use futures::prelude::future::FutureExt;
use std::net::SocketAddr;
use std::{
net::SocketAddr,
sync::{Arc, Mutex},
};
use tracing::instrument;

#[instrument]
pub async fn run_admin_server(
admin_host: SocketAddr,
filter_channel: tokio::sync::mpsc::Sender<String>,
span_channel: tokio::sync::mpsc::Sender<bool>,
re_handle: Arc<Mutex<regex::Regex>>,
) -> anyhow::Result<()> {
HttpServer::new(move || {
App::new()
.app_data(web::Data::new(filter_channel.clone()))
.app_data(web::Data::new(span_channel.clone()))
.app_data(web::Data::new(re_handle.clone()))
.service(set_filter)
.service(set_span)
.service(metrics_handler)
.service(debug_message_handler)
.wrap(tracing_actix_web::TracingLogger::<
super::BladeRootSpanBuilder,
>::new())
Expand Down Expand Up @@ -71,3 +77,17 @@ async fn metrics_handler() -> Result<HttpResponse> {
.content_type("application/openmetrics-text; version=1.0.0; charset=utf-8")
.body(body))
}

#[post("/admin/debug_message")]
#[instrument(skip(re_handle))]
async fn debug_message_handler(
re_handle: web::Data<Arc<Mutex<regex::Regex>>>,
body: String,
) -> Result<HttpResponse> {
{
let mut re = re_handle.lock().unwrap();
*re = regex::Regex::new(&body)
.map_err(|e| error::ErrorInternalServerError(format!("{e}")))?;
}
HttpResponse::Ok().await
}
1 change: 1 addition & 0 deletions blade/bep/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ rust_library(
"//blade/bep/protos:bep_proto",
"//blade/bep/protos:build_event_stream_rust_proto",
"//blade/prototime",
"//blade/metrics",
"//blade/state",
"@crate//:anyhow",
"@crate//:async-stream",
Expand Down
87 changes: 70 additions & 17 deletions blade/bep/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,19 @@ use anyhow::Result;
use build_event_stream_proto::*;
use build_proto::google::devtools::build::v1::*;
use prost::Message;
use regex::Regex;
use state::DBManager;
use std::net::SocketAddr;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::sync::Mutex;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{transport::Server, Response, Status};
use tracing::instrument;
use tracing::span;
use tracing::Instrument;
use tracing::Level;

mod buildinfo;
mod options;
Expand Down Expand Up @@ -73,6 +80,7 @@ impl publish_build_event_server::PublishBuildEvent for BuildEventService {
type PublishBuildToolEventStreamStream =
ReceiverStream<Result<PublishBuildToolEventStreamResponse, Status>>;

#[instrument(skip_all)]
async fn publish_lifecycle_event(
&self,
_request: tonic::Request<
Expand All @@ -83,6 +91,7 @@ impl publish_build_event_server::PublishBuildEvent for BuildEventService {
return Ok(Response::new(empty_proto::google::protobuf::Empty {}));
}

#[instrument(skip_all)]
async fn publish_build_tool_event_stream(
&self,
request: tonic::Request<tonic::Streaming<PublishBuildToolEventStreamRequest>>,
Expand All @@ -94,6 +103,7 @@ impl publish_build_event_server::PublishBuildEvent for BuildEventService {
let handlers = self.handlers.clone();
tokio::spawn(async move {
let mut session_uuid = "".to_string();
let span = tracing::span::Span::current();
loop {
let maybe_message = in_stream
.message()
Expand All @@ -120,7 +130,8 @@ impl publish_build_event_server::PublishBuildEvent for BuildEventService {
};
if session_uuid.is_empty() {
session_uuid = uuid.clone();
tracing::info!("{}: Stream started", session_uuid);
span.record("session_uuid", &session_uuid);
tracing::info!("Stream started");
let mut already_over = false;
if let Ok(mut db) = global.db_manager.as_ref().get() {
if let Ok(inv) = db.get_shallow_invocation(&session_uuid) {
Expand All @@ -136,7 +147,7 @@ impl publish_build_event_server::PublishBuildEvent for BuildEventService {
}
}
if already_over {
tracing::warn!("{}: session already ended", session_uuid);
tracing::warn!("session already ended");

let _ = tx
.send(Err(Status::new(
Expand Down Expand Up @@ -170,8 +181,7 @@ impl publish_build_event_server::PublishBuildEvent for BuildEventService {
let be_or = build_event_stream::BuildEvent::decode(&any.value[..]);
if be_or.is_err() {
tracing::error!(
"{}: invalid event: {:#?}",
uuid,
"invalid event: {:#?}",
be_or.unwrap_err()
);
{
Expand Down Expand Up @@ -221,7 +231,7 @@ impl publish_build_event_server::PublishBuildEvent for BuildEventService {
&uuid,
&be,
) {
tracing::warn!("{}: {:#?}", session_uuid, e);
tracing::warn!("{:#?}", e);
}
}
}
Expand All @@ -240,56 +250,60 @@ impl publish_build_event_server::PublishBuildEvent for BuildEventService {
}))
.await
.inspect_err(|e| {
tracing::warn!("{}: failed to send message: {:#?}", session_uuid, e)
tracing::warn!("failed to send message: {:#?}", e)
})
.map(|_| false)
.unwrap_or(true);
if build_ended || send_fail {
tracing::info!("{}: Build over", session_uuid);
tracing::info!("Build over");
return;
}
}
Err(err) => {
tracing::error!("{}: Error: {}", session_uuid, err);
// Tonic gives us this scary message for disconnects. It's really just a disconnect.
if err.message().contains("error reading a body from connection: stream closed because of a broken pipe") {
tracing::warn!("Client closed stream. Closing session.")
} else {
tracing::error!("Error: {}", err);
}
if !session_uuid.is_empty() {
let _ = unexpected_cleanup_session(
global.db_manager.as_ref(),
&session_uuid,
)
.map_err(|e| {
tracing::error!("{session_uuid}: error closing stream: {e:#?}")
tracing::error!("error closing stream: {e:#?}")
});
}
drop(tx);
return;
}
}
}
});
}.instrument(span!(Level::INFO, "bep_grpc_stream", "session_uuid" = tracing::field::Empty)));
let out_stream = ReceiverStream::new(rx);
return Ok(Response::new(out_stream));
}
}

#[instrument(skip_all)]
pub async fn run_bes_grpc(
host: SocketAddr,
state: Arc<state::Global>,
print_message_re: &str,
print_message_re: Arc<Mutex<Regex>>,
) -> Result<()> {
let reflect = tonic_reflection::server::Builder::configure()
.register_file_descriptor_set(*proto_registry::DESCRIPTORS.clone())
.build()?;
let mut handlers: Vec<Box<dyn EventHandler + Sync + Send>> = vec![
let handlers: Vec<Box<dyn EventHandler + Sync + Send>> = vec![
Box::new(progress::Handler {}),
Box::new(target::Handler {}),
Box::new(buildinfo::Handler {}),
Box::new(options::Handler {}),
Box::new(print_event::Handler {
message_re: print_message_re,
}),
];
if !print_message_re.is_empty() {
handlers.push(Box::new(print_event::Handler {
message_re: regex::Regex::new(print_message_re).unwrap(),
}));
}
proto_registry::init_global_descriptor_pool()?;
let server = BuildEventService {
state,
Expand All @@ -305,3 +319,42 @@ pub async fn run_bes_grpc(
.await
.context("error starting grpc server")
}

/// A runtime-replaceable regular expression paired with an on/off switch.
///
/// Both fields use interior mutability, so every method takes `&self` and the
/// handle can be shared (for example behind an `Arc`) and updated in place.
pub struct RegexHandle {
    /// `false` when the handle was built from, or last updated with, an empty pattern.
    enabled: AtomicBool,
    /// The most recently compiled pattern.
    regex: Mutex<regex::Regex>,
}

impl RegexHandle {
    /// Builds a handle from `re`; an empty pattern yields a disabled handle.
    ///
    /// # Errors
    ///
    /// Returns an error if `re` is not a valid regular expression.
    pub fn new(re: &str) -> anyhow::Result<Self> {
        Ok(Self {
            enabled: AtomicBool::new(!re.is_empty()),
            regex: Mutex::new(regex::Regex::new(re)?),
        })
    }

    /// Reports whether matching is currently switched on.
    pub fn enabled(&self) -> bool {
        self.enabled.load(std::sync::atomic::Ordering::Relaxed)
    }

    /// Replaces the stored pattern with `re`, or disables matching when `re` is empty.
    ///
    /// An empty `re` only flips the switch off; the previously stored pattern is
    /// left in place. On a compile error nothing is changed.
    ///
    /// # Errors
    ///
    /// Returns an error if `re` is non-empty and not a valid regular expression.
    pub fn update_regex(&self, re: &str) -> anyhow::Result<()> {
        if re.is_empty() {
            self.enabled
                .store(false, std::sync::atomic::Ordering::Relaxed);
            return Ok(());
        }
        // Compile outside the critical section so concurrent `is_match` callers
        // are not blocked behind regex compilation.
        let compiled = regex::Regex::new(re)?;
        *self.lock_regex() = compiled;
        self.enabled
            .store(true, std::sync::atomic::Ordering::Relaxed);
        Ok(())
    }

    /// Returns `true` if the handle is enabled and the stored pattern matches `text`.
    pub fn is_match(&self, text: &str) -> bool {
        if !self.enabled() {
            return false;
        }
        let re = self.lock_regex();
        re.is_match(text)
    }

    /// Locks the pattern, recovering the guard if a previous holder panicked.
    fn lock_regex(&self) -> std::sync::MutexGuard<'_, regex::Regex> {
        // The stored value is only ever replaced wholesale, so the data behind
        // a poisoned lock is still a complete, usable `Regex`.
        self.regex
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }
}
19 changes: 11 additions & 8 deletions blade/bep/print_event.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,24 @@
use std::sync::{Arc, Mutex};

use build_event_stream_proto::build_event_stream;
use prost_reflect::ReflectMessage;
use regex::Regex;
use state::DBManager;
pub(crate) struct Handler {
pub message_re: Regex,
pub message_re: Arc<Mutex<Regex>>,
}

impl crate::EventHandler for Handler {
fn handle_event(
&self,
_db_mgr: &dyn DBManager,
invocation_id: &str,
_invocation_id: &str,
event: &build_event_stream::BuildEvent,
) -> anyhow::Result<()> {
let re = self.message_re.lock().unwrap();
if re.as_str().is_empty() {
return Ok(());
}
let desc = event.descriptor();
let dm = event.transcode_to_dynamic();
let oneof = match desc.oneofs().next() {
Expand All @@ -22,13 +28,10 @@ impl crate::EventHandler for Handler {
Some(o) => o,
};
let _ = oneof.fields().try_for_each(|f| {
if dm.has_field(&f)
&& self
.message_re
.is_match(f.field_descriptor_proto().type_name())
{
if dm.has_field(&f) && re.is_match(f.field_descriptor_proto().type_name()) {
let type_name = f.field_descriptor_proto().name();
let j = serde_json::ser::to_string(&dm).map_err(|_| ())?;
tracing::info!("{}: {}", invocation_id, j);
tracing::info!(type_name, "{}", j);
return Err(());
}
Ok(())
Expand Down
6 changes: 4 additions & 2 deletions blade/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::fmt::Write;
use std::net::SocketAddr;
use std::sync::Mutex;

use actix_web::body::MessageBody;
use actix_web::dev::ServiceResponse;
Expand Down Expand Up @@ -258,9 +259,10 @@ cfg_if! {
.disable_signals()
.bind(&addr)?
.run();
let fut2 = bep::run_bes_grpc(args.grpc_host, state, &args.debug_message_pattern);
let re_handle = Arc::new(Mutex::new(regex::Regex::new(&args.debug_message_pattern)?));
let fut2 = bep::run_bes_grpc(args.grpc_host, state, re_handle.clone());
let fut3 = periodic_cleanup(cleanup_state);
let fut4 = admin::run_admin_server(args.admin_host, filter_tx, span_tx);
let fut4 = admin::run_admin_server(args.admin_host, filter_tx, span_tx, re_handle);

let res = join!(fut1, fut2, fut3, fut4, set_filter_fut, set_span_fut);
if res.0.is_ok() && res.1.is_ok() {
Expand Down

0 comments on commit 2ec9bd2

Please sign in to comment.