Skip to content

Commit

Permalink
update http meta route confi
Browse files Browse the repository at this point in the history
GitOrigin-RevId: 2722cf6da6fd907d26063168662865ff10d7723e
  • Loading branch information
gautamg795 authored and Convex, Inc. committed Aug 30, 2024
1 parent 1f5b3dc commit dcefdc7
Showing 1 changed file with 55 additions and 31 deletions.
86 changes: 55 additions & 31 deletions crates/common/src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use axum::{
BoxError,
RequestPartsExt,
Router,
ServiceExt,
};
use errors::{
ErrorMetadata,
Expand Down Expand Up @@ -581,7 +582,9 @@ impl RouteMapper for NoopRouteMapper {
/// Router + Middleware for a Convex service
pub struct ConvexHttpService {
router: Router,
meta_routes_enabled: bool,
version: String,
service_name: &'static str,
_concurrency_gauge: Option<PullingGauge>,
}

Expand All @@ -594,10 +597,16 @@ impl ConvexHttpService {
request_timeout: Duration,
route_metric_mapper: RM,
) -> Self {
let sentry_layer = ServiceBuilder::new()
.layer(sentry_tower::NewSentryLayer::<_>::new_from_top())
.layer(sentry_tower::SentryHttpLayer::new());
let semaphore = Arc::new(tokio::sync::Semaphore::new(max_concurrency));
let semaphore_ = semaphore.clone();
let concurrency_gauge = PullingGauge::new(
format!("{service_name}_http_service_concurrent_requests"),
format!(
"{}_http_service_concurrent_requests",
service_name.replace('-', "_")
),
"The number of currently outstanding requests on the ConvexHttpService",
Box::new(move || (max_concurrency - semaphore_.available_permits()) as f64),
)
Expand All @@ -606,8 +615,9 @@ impl ConvexHttpService {
tracing::error!("Failed to register request concurrency gauge for {service_name}: {e}");
}

let router = router.layer(
ServiceBuilder::new()
let router = router
.layer(
ServiceBuilder::new()
// Order important. Log/stats first because they are infallible.
.layer(axum::middleware::from_fn(log_middleware))
.layer(axum::middleware::from_fn_with_state(
Expand All @@ -621,17 +631,24 @@ impl ConvexHttpService {
StatusCode::REQUEST_TIMEOUT
}))
.layer(TimeoutLayer::new(request_timeout)),
);
)
.layer(sentry_layer);

Self {
router,
version,
_concurrency_gauge: Some(concurrency_gauge),
service_name,
meta_routes_enabled: true,
}
}

pub fn set_meta_routes_enabled(&mut self, enabled: bool) {
self.meta_routes_enabled = enabled;
}

/// Routes not handled by the passed-in router.
fn extra_routes(&self) -> Router {
fn meta_routes(&self) -> Router {
let version = self.version.clone();
Router::new()
.route("/version", get(move || async move { version }))
Expand All @@ -643,22 +660,18 @@ impl ConvexHttpService {
addr: SocketAddr,
shutdown: F,
) -> anyhow::Result<()> {
let extra = self.extra_routes();
let sentry_layer = ServiceBuilder::new()
.layer(sentry_tower::NewSentryLayer::<_>::new_from_top())
.layer(sentry_tower::SentryHttpLayer::new());

let make_svc = self
.router
.merge(extra)
.layer(sentry_layer)
.into_make_service_with_connect_info::<SocketAddr>();
let extra = self.meta_routes();
let mut router = self.router;
if self.meta_routes_enabled {
router = router.merge(extra);
}
let make_svc = router.into_make_service_with_connect_info::<SocketAddr>();
tracing::info!("{} listening on {addr}", self.service_name);
serve_http(make_svc, addr, shutdown).await
}

/// Apply `middleware_fn` to incoming requests *before* passing them to
/// the router. Middleware will not apply to internal routes like `/version`
/// and `/metrics`. Because the middleware is applied before routing, it is
/// the router. Because the middleware is applied before routing, it is
/// allowed to change the request URI and affect which route will be
/// matched.
pub async fn serve_with_middleware<F, Fut, Rejection>(
Expand All @@ -672,27 +685,40 @@ impl ConvexHttpService {
Fut: Future<Output = Result<http::Request<Body>, Rejection>> + Send + 'static,
Rejection: IntoResponse + Send + 'static,
{
let sentry_layer = ServiceBuilder::new()
.layer(sentry_tower::NewSentryLayer::<_>::new_from_top())
.layer(sentry_tower::SentryHttpLayer::new());

let outer_router = self.extra_routes();
let middleware = axum::middleware::map_request(middleware_fn);
let meta_router = self.meta_routes();
let wrapped_svc = middleware.layer(self.router);
// Fall back to the middleware-wrapped service if the request doesn't match the
// outer router.
let router = outer_router
.fallback_service(wrapped_svc)
.layer(sentry_layer);
let make_svc = router.into_make_service_with_connect_info::<SocketAddr>();
serve_http(make_svc, addr, shutdown).await

tracing::info!("{} listening on {addr}", self.service_name);
if self.meta_routes_enabled {
// Fall back to the middleware-wrapped service if the request doesn't match the
// meta router.
serve_http(
meta_router
.fallback_service(wrapped_svc)
.into_make_service_with_connect_info::<SocketAddr>(),
addr,
shutdown,
)
.await
} else {
// If we're not serving meta routes, simply serve the middleware-wrapped service
serve_http(
wrapped_svc.into_make_service_with_connect_info::<SocketAddr>(),
addr,
shutdown,
)
.await
}
}

#[cfg(any(test, feature = "testing"))]
pub fn new_for_test(router: Router) -> Self {
Self {
router,
version: String::new(),
meta_routes_enabled: true,
service_name: "test-service",
_concurrency_gauge: None,
}
}
Expand Down Expand Up @@ -726,8 +752,6 @@ where
socket.bind(addr)?;
let listener = socket.listen(*HTTP_SERVER_TCP_BACKLOG)?;

tracing::info!("Listening on http://{}", addr);

fork_of_axum_serve::serve(listener, make_service)
.with_graceful_shutdown(shutdown)
.await?;
Expand Down

0 comments on commit dcefdc7

Please sign in to comment.