Skip to content

Commit

Permalink
add metrics to the bep server
Browse files Browse the repository at this point in the history
  • Loading branch information
DolceTriade committed Mar 9, 2024
1 parent 2ec9bd2 commit 09b55e1
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 37 deletions.
1 change: 1 addition & 0 deletions blade/bep/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ rust_library(
"@crate//:prost",
"@crate//:prost-reflect",
"@crate//:prost-types",
"@crate//:prometheus-client",
"@crate//:scopeguard",
"@crate//:serde",
"@crate//:serde_json",
Expand Down
88 changes: 51 additions & 37 deletions blade/bep/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,19 @@ use anyhow::Context;
use anyhow::Result;
use build_event_stream_proto::*;
use build_proto::google::devtools::build::v1::*;
use lazy_static::lazy_static;
use prometheus_client::encoding::EncodeLabelSet;
use prometheus_client::encoding::EncodeLabelValue;
use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::family::Family;
use prometheus_client::metrics::gauge::Gauge;
use prost::Message;
use regex::Regex;
use scopeguard::defer;
use state::DBManager;
use std::fmt::Write;
use std::net::SocketAddr;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicU32;
use std::sync::Arc;
use std::sync::Mutex;
use tokio::sync::mpsc;
Expand All @@ -23,6 +31,13 @@ mod print_event;
mod progress;
mod target;

lazy_static! {
static ref TOTAL_STREAMS: Counter::<u64> = metrics::register_metric("blade_bep_streams", "Total number of streams", Counter::default());
static ref TOTAL_STREAMS_ERRORS: Family::<ErrorLabels, Counter> = metrics::register_metric("blade_bep_stream_errors", "Total number of stream errors", Family::default());
static ref MESSAGE_HANDLER_ERRORS: Counter::<u64> = metrics::register_metric("blade_bep_message_handler_errors", "Total number of errors returned by the message handlers", Counter::default());
static ref ACTIVE_STREAMS: Gauge::<u32, AtomicU32> = metrics::register_metric("blade_bep_active_streams", "Total number of active streams", Gauge::default());
}

trait EventHandler {
fn handle_event(
&self,
Expand Down Expand Up @@ -104,6 +119,11 @@ impl publish_build_event_server::PublishBuildEvent for BuildEventService {
tokio::spawn(async move {
let mut session_uuid = "".to_string();
let span = tracing::span::Span::current();
TOTAL_STREAMS.inc();
ACTIVE_STREAMS.inc();
defer! {
ACTIVE_STREAMS.dec();
}
loop {
let maybe_message = in_stream
.message()
Expand All @@ -123,6 +143,7 @@ impl publish_build_event_server::PublishBuildEvent for BuildEventService {
let stream_id_or = obe.stream_id.as_ref();
if stream_id_or.is_none() {
tracing::warn!("missing stream id");
TOTAL_STREAMS_ERRORS.get_or_create(&ErrorLabels { code: tonic::Code::InvalidArgument.into() }).inc();
return;
}
let Some(uuid) = stream_id_or.map(|id| id.invocation_id.clone()) else {
Expand Down Expand Up @@ -156,6 +177,7 @@ impl publish_build_event_server::PublishBuildEvent for BuildEventService {
)))
.await
.ok();
TOTAL_STREAMS_ERRORS.get_or_create(&ErrorLabels { code: tonic::Code::FailedPrecondition.into() }).inc();
return;
}

Expand All @@ -180,10 +202,7 @@ impl publish_build_event_server::PublishBuildEvent for BuildEventService {
build_event::Event::BazelEvent(any) => {
let be_or = build_event_stream::BuildEvent::decode(&any.value[..]);
if be_or.is_err() {
tracing::error!(
"invalid event: {:#?}",
be_or.unwrap_err()
);
let err_str = format!("invalid event: {:#?}", be_or.unwrap_err());
{
let _ = unexpected_cleanup_session(
global.db_manager.as_ref(),
Expand All @@ -195,6 +214,13 @@ impl publish_build_event_server::PublishBuildEvent for BuildEventService {
)
});
}
tracing::error!("{}", err_str);
let _ = tx
.send(Err(Status::new(tonic::Code::InvalidArgument, err_str)))
.await
.ok();
drop(tx);
TOTAL_STREAMS_ERRORS.get_or_create(&ErrorLabels { code: tonic::Code::InvalidArgument.into() }).inc();
return;
}
let Ok(be) = be_or else {
Expand Down Expand Up @@ -223,6 +249,9 @@ impl publish_build_event_server::PublishBuildEvent for BuildEventService {
"{session_uuid}: error closing stream: {e:#?}"
)
});
if !success {
TOTAL_STREAMS_ERRORS.get_or_create(&ErrorLabels { code: tonic::Code::Unknown.into() }).inc();
}
}
Some(_) => {
for v in &*handlers {
Expand All @@ -232,6 +261,7 @@ impl publish_build_event_server::PublishBuildEvent for BuildEventService {
&be,
) {
tracing::warn!("{:#?}", e);
MESSAGE_HANDLER_ERRORS.inc();
}
}
}
Expand Down Expand Up @@ -262,9 +292,11 @@ impl publish_build_event_server::PublishBuildEvent for BuildEventService {
Err(err) => {
// Tonic gives us this scary message for disconnects. It's really just a disconnect.
if err.message().contains("error reading a body from connection: stream closed because of a broken pipe") {
tracing::warn!("Client closed stream. Closing session.")
tracing::warn!("Client closed stream. Closing session.");
TOTAL_STREAMS_ERRORS.get_or_create(&ErrorLabels { code: tonic::Code::Aborted.into() }).inc();
} else {
tracing::error!("Error: {}", err);
TOTAL_STREAMS_ERRORS.get_or_create(&ErrorLabels { code: err.code().into() }).inc();
}
if !session_uuid.is_empty() {
let _ = unexpected_cleanup_session(
Expand All @@ -275,6 +307,7 @@ impl publish_build_event_server::PublishBuildEvent for BuildEventService {
tracing::error!("error closing stream: {e:#?}")
});
}

drop(tx);
return;
}
Expand Down Expand Up @@ -320,41 +353,22 @@ pub async fn run_bes_grpc(
.context("error starting grpc server")
}

pub struct RegexHandle {
enabled: AtomicBool,
regex: Mutex<regex::Regex>,
#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
struct ErrorLabels {
code: StatusCode,
}

impl RegexHandle {
pub fn new(re: &str) -> anyhow::Result<Self> {
Ok(Self {
enabled: AtomicBool::new(!re.is_empty()),
regex: Mutex::new(regex::Regex::new(re)?),
})
}
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
struct StatusCode(tonic::Code);

pub fn enabled(&self) -> bool {
self.enabled.load(std::sync::atomic::Ordering::Relaxed)
}

pub fn update_regex(&mut self, re: &str) -> anyhow::Result<()> {
if re.is_empty() {
self.enabled
.store(false, std::sync::atomic::Ordering::Relaxed);
return Ok(());
}
let mut r = self.regex.lock().unwrap();
*r = regex::Regex::new(re)?;
self.enabled
.store(true, std::sync::atomic::Ordering::Relaxed);
Ok(())
impl EncodeLabelValue for StatusCode {
fn encode(&self, encoder: &mut prometheus_client::encoding::LabelValueEncoder) -> std::prelude::v1::Result<(), std::fmt::Error> {
encoder.write_str(&format!("{:#?}", self.0))
}
}

pub fn is_match(&self, text: &str) -> bool {
if !self.enabled.load(std::sync::atomic::Ordering::Relaxed) {
return false;
}
let r = self.regex.lock().unwrap();
r.is_match(text)
impl From<tonic::Code> for StatusCode {
fn from(value: tonic::Code) -> Self {
Self(value)
}
}

0 comments on commit 09b55e1

Please sign in to comment.