Skip to content

Commit

Permalink
Add request tracing middleware. (#24427)
Browse files Browse the repository at this point in the history
* Add request_id extractor.
* Set the request_id to sentry scope via middleware.
* Enable tracing based on middleware.

GitOrigin-RevId: 8d891352864a3fc599a3ebdcd59fad0d83b15d83
  • Loading branch information
Preslav Le authored and Convex, Inc. committed Apr 8, 2024
1 parent aa99e12 commit e0f2ff2
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 48 deletions.
4 changes: 4 additions & 0 deletions crates/common/src/execution_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ impl RequestId {
hash.truncate(16);
Self(hash)
}

pub fn as_str(&self) -> &str {
self.0.as_str()
}
}

impl FromStr for RequestId {
Expand Down
65 changes: 62 additions & 3 deletions crates/common/src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ use std::{
future::Future,
net::SocketAddr,
pin::Pin,
str::{
self,
},
str,
sync::LazyLock,
time::{
Duration,
Expand Down Expand Up @@ -70,6 +68,7 @@ use http::{
};
use hyper::server::conn::AddrIncoming;
use itertools::Itertools;
use maplit::btreemap;
use prometheus::TextEncoder;
use sentry::integrations::tower as sentry_tower;
use serde::{
Expand All @@ -94,11 +93,13 @@ use crate::{
errors::report_error,
knobs::HTTP_SERVER_TCP_BACKLOG,
metrics::log_client_version_unsupported,
minitrace_helpers::get_sampled_span,
version::{
ClientVersion,
ClientVersionState,
COMPILED_REVISION,
},
RequestId,
};

pub mod extract;
Expand Down Expand Up @@ -707,6 +708,7 @@ async fn client_version_state_middleware(
pub async fn stats_middleware<RM: RouteMapper>(
State(route_metric_mapper): State<RM>,
matched_path: Option<axum::extract::MatchedPath>,
ExtractRequestId(request_id): ExtractRequestId,
ExtractClientVersion(client_version): ExtractClientVersion,
req: http::request::Request<Body>,
next: axum::middleware::Next<Body>,
Expand All @@ -725,6 +727,18 @@ pub async fn stats_middleware<RM: RouteMapper>(

let route = route_metric_mapper.map_route(route);

// Configure tracing.
let mut rng = rand::thread_rng();
let root = get_sampled_span(
route.clone(),
&mut rng,
btreemap!["request_id".to_owned() => request_id.to_string()],
);
let _guard = root.set_local_parent();

// Add the request_id to sentry
sentry::configure_scope(|scope| scope.set_tag("request_id", request_id.clone()));

log_http_request(
&client_version_s,
&route,
Expand Down Expand Up @@ -804,6 +818,51 @@ where
}
}

#[allow(clippy::declare_interior_mutable_const)]
pub const CONVEX_REQUEST_ID_HEADER: HeaderName = HeaderName::from_static("convex-request-id");

pub struct ExtractRequestId(pub RequestId);

async fn request_id_from_req_parts(
parts: &mut axum::http::request::Parts,
) -> anyhow::Result<RequestId> {
if let Some(request_id_header) = parts
.headers
.get(CONVEX_REQUEST_ID_HEADER)
.and_then(|h| h.to_str().ok().map(|s| s.to_string()))
{
request_id_header.parse::<RequestId>()
} else {
// Generate a new request_id
let request_id = RequestId::new();
parts
.headers
.insert(CONVEX_REQUEST_ID_HEADER, request_id.as_str().parse()?);
Ok(request_id)
}
}

#[async_trait]
impl<S> FromRequestParts<S> for ExtractRequestId
where
S: Send + Sync,
{
type Rejection = HttpResponseError;

async fn from_request_parts(
parts: &mut axum::http::request::Parts,
_state: &S,
) -> Result<Self, Self::Rejection> {
let request_id = request_id_from_req_parts(parts).await.map_err(|e| {
anyhow::anyhow!(ErrorMetadata::bad_request(
"InvalidRequestId",
e.to_string(),
))
})?;
Ok(Self(request_id))
}
}

async fn log_middleware<B: Send>(
remote_addr: Option<axum::extract::ConnectInfo<SocketAddr>>,
req: http::request::Request<B>,
Expand Down
20 changes: 9 additions & 11 deletions crates/common/src/minitrace_helpers.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
use std::collections::BTreeMap;
use std::{
borrow::Cow,
collections::BTreeMap,
};

use minitrace::{
collector::SpanContext,
Span,
};
use rand::Rng;

use crate::{
knobs::REQUEST_TRACE_SAMPLE_PERCENT,
runtime::Runtime,
};
use crate::knobs::REQUEST_TRACE_SAMPLE_PERCENT;

#[derive(Clone)]
pub struct EncodedSpan(pub Option<String>);
Expand All @@ -27,14 +27,12 @@ impl EncodedSpan {

/// Given an instance name returns a span with the sample percentage specified
/// in `knobs.rs`
pub fn get_sampled_span<RT: Runtime>(
request_name: String,
rt: RT,
pub fn get_sampled_span<R: Rng>(
request_name: impl Into<Cow<'static, str>>,
rng: &mut R,
properties: BTreeMap<String, String>,
) -> Span {
let should_sample = rt
.clone()
.with_rng(|rng| rng.gen_bool(*REQUEST_TRACE_SAMPLE_PERCENT));
let should_sample = rng.gen_bool(*REQUEST_TRACE_SAMPLE_PERCENT);
match should_sample {
true => Span::root(request_name, SpanContext::random()).with_properties(|| properties),
false => Span::noop(),
Expand Down
11 changes: 5 additions & 6 deletions crates/local_backend/src/http_actions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ use axum::{
RequestExt,
};
use common::{
http::HttpResponseError,
http::{
ExtractRequestId,
HttpResponseError,
},
types::FunctionCaller,
RequestId,
};
use futures::TryStreamExt;
use http::{
Expand Down Expand Up @@ -87,6 +89,7 @@ impl FromRequest<LocalAppState, axum::body::Body> for ExtractHttpRequestMetadata
pub async fn http_any_method(
State(st): State<LocalAppState>,
TryExtractIdentity(identity_result): TryExtractIdentity,
ExtractRequestId(request_id): ExtractRequestId,
ExtractHttpRequestMetadata(http_request_metadata): ExtractHttpRequestMetadata,
) -> Result<impl IntoResponse, HttpResponseError> {
// All HTTP actions run the default export of the http.js path.
Expand All @@ -97,10 +100,6 @@ pub async fn http_any_method(
// to go through if the header does not seem to specify Convex auth.
let identity = identity_result.unwrap_or(Identity::Unknown);

// TODO: Move generating request_id and configuring sentry axum middleware.
let request_id = RequestId::new();
sentry::configure_scope(|scope| scope.set_tag("request_id", request_id.clone()));

let udf_return = st
.application
.http_action_udf(
Expand Down
27 changes: 7 additions & 20 deletions crates/local_backend/src/public_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use common::{
Query,
},
ExtractClientVersion,
ExtractRequestId,
HttpResponseError,
},
pause::PauseClient,
Expand All @@ -25,7 +26,6 @@ use common::{
FunctionCaller,
},
version::ClientVersion,
RequestId,
};
use errors::ErrorMetadata;
use isolate::UdfArgsJson;
Expand Down Expand Up @@ -137,6 +137,7 @@ impl UdfResponse {
/// Executes an arbitrary query/mutation/action from its name.
pub async fn public_function_post(
State(st): State<LocalAppState>,
ExtractRequestId(request_id): ExtractRequestId,
ExtractIdentity(identity): ExtractIdentity,
ExtractClientVersion(client_version): ExtractClientVersion,
Json(req): Json<UdfPostRequest>,
Expand All @@ -147,10 +148,6 @@ pub async fn public_function_post(
return Result::Err(anyhow!(bad_admin_key_error(Some(st.instance_name.clone()))).into());
}

// TODO: Move generating request_id and configuring sentry axum middleware.
let request_id = RequestId::new();
sentry::configure_scope(|scope| scope.set_tag("request_id", request_id.clone()));

let udf_path = parse_udf_path(&req.path)?;
let udf_result = st
.application
Expand Down Expand Up @@ -205,16 +202,14 @@ pub fn export_value(
pub async fn public_query_get(
State(st): State<LocalAppState>,
Query(req): Query<UdfArgsQuery>,
ExtractRequestId(request_id): ExtractRequestId,
ExtractIdentity(identity): ExtractIdentity,
ExtractClientVersion(client_version): ExtractClientVersion,
) -> Result<impl IntoResponse, HttpResponseError> {
let udf_path = req.path.parse().context(ErrorMetadata::bad_request(
"InvalidConvexFunction",
format!("Failed to parse Convex function path: {}", req.path),
))?;
// TODO: Move generating request_id and configuring sentry axum middleware.
let request_id = RequestId::new();
sentry::configure_scope(|scope| scope.set_tag("request_id", request_id.clone()));
let args = req.args.into_arg_vec();
let udf_return = st
.application
Expand Down Expand Up @@ -242,6 +237,7 @@ pub async fn public_query_get(

pub async fn public_query_post(
State(st): State<LocalAppState>,
ExtractRequestId(request_id): ExtractRequestId,
ExtractIdentity(identity): ExtractIdentity,
ExtractClientVersion(client_version): ExtractClientVersion,
Json(req): Json<UdfPostRequest>,
Expand All @@ -250,9 +246,6 @@ pub async fn public_query_post(
"InvalidConvexFunction",
format!("Failed to parse Convex function path: {}", req.path),
))?;
// TODO: Move generating request_id and configuring sentry axum middleware.
let request_id = RequestId::new();
sentry::configure_scope(|scope| scope.set_tag("request_id", request_id.clone()));
let udf_return = st
.application
.read_only_udf(
Expand Down Expand Up @@ -289,15 +282,13 @@ pub struct QueryBatchResponse {

pub async fn public_query_batch_post(
State(st): State<LocalAppState>,
ExtractRequestId(request_id): ExtractRequestId,
ExtractIdentity(identity): ExtractIdentity,
ExtractClientVersion(client_version): ExtractClientVersion,
Json(req_batch): Json<QueryBatchArgs>,
) -> Result<impl IntoResponse, HttpResponseError> {
let mut results = vec![];
let ts = *st.application.now_ts_for_reads();
// TODO: Move generating request_id and configuring sentry axum middleware.
let request_id = RequestId::new();
sentry::configure_scope(|scope| scope.set_tag("request_id", request_id.clone()));
for req in req_batch.queries {
let value_format = req.format.as_ref().map(|f| f.parse()).transpose()?;
let udf_path = parse_udf_path(&req.path)?;
Expand Down Expand Up @@ -333,14 +324,12 @@ pub async fn public_query_batch_post(

pub async fn public_mutation_post(
State(st): State<LocalAppState>,
ExtractRequestId(request_id): ExtractRequestId,
ExtractIdentity(identity): ExtractIdentity,
ExtractClientVersion(client_version): ExtractClientVersion,
Json(req): Json<UdfPostRequest>,
) -> Result<impl IntoResponse, HttpResponseError> {
let udf_path = parse_udf_path(&req.path)?;
// TODO: Move generating request_id and configuring sentry axum middleware.
let request_id = RequestId::new();
sentry::configure_scope(|scope| scope.set_tag("request_id", request_id.clone()));
let udf_result = st
.application
.mutation_udf(
Expand Down Expand Up @@ -372,13 +361,11 @@ pub async fn public_mutation_post(

pub async fn public_action_post(
State(st): State<LocalAppState>,
ExtractRequestId(request_id): ExtractRequestId,
ExtractIdentity(identity): ExtractIdentity,
ExtractClientVersion(client_version): ExtractClientVersion,
Json(req): Json<UdfPostRequest>,
) -> Result<impl IntoResponse, HttpResponseError> {
// TODO: Move generating request_id and configuring sentry axum middleware.
let request_id = RequestId::new();
sentry::configure_scope(|scope| scope.set_tag("request_id", request_id.clone()));
let udf_path = parse_udf_path(&req.path)?;
let action_result = st
.application
Expand Down
18 changes: 10 additions & 8 deletions crates/sync/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -430,14 +430,16 @@ impl<RT: Runtime> SyncWorker<RT> {
Some(id) => RequestId::new_for_ws_session(id, request_id),
None => RequestId::new(),
};
let root = get_sampled_span(
"sync-worker/mutation".into(),
self.rt.clone(),
btreemap! {
"udf_type".into() => UdfType::Mutation.to_lowercase_string().into(),
"udf_path".into() => udf_path.clone().into(),
},
);
let root = self.rt.with_rng(|rng| {
get_sampled_span(
"sync-worker/mutation",
rng,
btreemap! {
"udf_type".into() => UdfType::Mutation.to_lowercase_string().into(),
"udf_path".into() => udf_path.clone().into(),
},
)
});
let rt = self.rt.clone();
let client_version = self.config.client_version.clone();
let timer = mutation_queue_timer();
Expand Down

0 comments on commit e0f2ff2

Please sign in to comment.