@@ -1064,12 +1064,18 @@ pub mod server {
1064
1064
. http2_keepalive_timeout ( Some ( std:: time:: Duration :: from_secs (
1065
1065
config. keepalive_timeout ,
1066
1066
) ) )
1067
- . layer ( log_layer)
1068
- . layer ( tower:: limit:: ConcurrencyLimitLayer :: new ( config. max_concurrent_requests ) )
1069
- // Note: the in-flight request layer applies after the limit layer. This is what we want so that the
1070
- // metric reflects the actual number of in-flight requests.
1067
+ // Note: the in-flight request layer applies first here. Since we are using a load-shed
1068
+ // layer just below this corresponds very directly to the number of requests being actually handled.
1069
+ // The technical reason for this is that we cannot really stack the in flight requests layer
1070
+ // below the stats layer since we want to transform some `Err` responses in the stats layer
1071
+ // to Ok responses with a meaningful gRPC status code,
1072
+ // but since the in flight request layer adds a guard to count in-flight requests this would
1073
+ // mean we'd have to construct such a guard in the response, which is not possible.
1071
1074
. layer ( in_flight_request_layer)
1072
- . layer ( stats_layer) ;
1075
+ . layer ( stats_layer)
1076
+ . layer ( tower:: load_shed:: LoadShedLayer :: new ( ) )
1077
+ . layer ( tower:: limit:: ConcurrencyLimitLayer :: new ( config. max_concurrent_requests ) )
1078
+ . layer ( log_layer) ;
1073
1079
if let Some ( identity) = identity {
1074
1080
builder = builder
1075
1081
. tls_config ( ServerTlsConfig :: new ( ) . identity ( identity) )
@@ -2560,19 +2566,21 @@ fn get_grpc_code_label(code: tonic::Code) -> &'static str {
2560
2566
/// Actual middleware implementation updating the stats.
2561
2567
/// The middleware is called once for each gRPC request, even for the streaming
2562
2568
/// gRPC methods.
2563
- impl < S > tower:: Service < hyper:: Request < hyper:: Body > > for StatsMiddleware < S >
2569
+ impl < S , Body : hyper:: body:: HttpBody + Unpin + Send + Default >
2570
+ tower:: Service < hyper:: Request < hyper:: Body > > for StatsMiddleware < S >
2564
2571
where
2565
2572
S : tower:: Service <
2566
2573
hyper:: Request < hyper:: Body > ,
2567
- Response = hyper:: Response < tonic:: body:: BoxBody > ,
2574
+ Response = hyper:: Response < Body > ,
2575
+ Error = tower:: BoxError ,
2568
2576
> + Clone
2569
2577
+ Send
2570
2578
+ ' static ,
2571
2579
S :: Future : Send + ' static ,
2572
2580
{
2573
- type Error = S :: Error ;
2581
+ type Error = tower :: BoxError ;
2574
2582
type Future = futures:: future:: BoxFuture < ' static , Result < Self :: Response , Self :: Error > > ;
2575
- type Response = S :: Response ;
2583
+ type Response = hyper :: Response < Body > ;
2576
2584
2577
2585
fn poll_ready (
2578
2586
& mut self ,
@@ -2599,15 +2607,30 @@ where
2599
2607
// Time taken for the inner service to send back a response, meaning for
2600
2608
// streaming gRPC methods this is the duration for it to first return a stream.
2601
2609
let duration = request_received. elapsed ( ) . as_secs_f64 ( ) ;
2602
- if result. is_err ( ) {
2603
- grpc_request_duration
2604
- . with_label_values ( & [
2605
- endpoint_name. as_str ( ) ,
2606
- get_grpc_code_label ( tonic:: Code :: Internal ) ,
2607
- ] )
2608
- . observe ( duration) ;
2610
+ match result {
2611
+ Err ( e) => {
2612
+ // If the load shed service terminated the request this will be signalled as
2613
+ // an Err(Overloaded). So record resource exhaustion
2614
+ // in the metrics.
2615
+ let ( code, response) = if e. is :: < tower:: load_shed:: error:: Overloaded > ( ) {
2616
+ // return a response with empty body of the correct type. `to_http`
2617
+ // constructs a response with a `BoxBody` but
2618
+ // here we need a more general one to make the service generic enough.
2619
+ let new_response =
2620
+ tonic:: Status :: resource_exhausted ( "Too many concurrent requests." )
2621
+ . to_http ( )
2622
+ . map ( |_| Default :: default ( ) ) ;
2623
+ ( tonic:: Code :: ResourceExhausted , Ok ( new_response) )
2624
+ } else {
2625
+ ( tonic:: Code :: Internal , Err ( e) )
2626
+ } ;
2627
+ grpc_request_duration
2628
+ . with_label_values ( & [ endpoint_name. as_str ( ) , get_grpc_code_label ( code) ] )
2629
+ . observe ( duration) ;
2630
+ return response;
2631
+ }
2632
+ Ok ( result) => ( result, duration) ,
2609
2633
}
2610
- ( result?, duration)
2611
2634
} ;
2612
2635
2613
2636
// Check if the gRPC status header is part of the HTTP headers, if not check for
0 commit comments