From 53cbd0ef5555ce2d851033e5ad60d774b5482dc6 Mon Sep 17 00:00:00 2001 From: tottoto Date: Sat, 15 Jun 2024 18:42:37 +0900 Subject: [PATCH] chore(transport): Use hyper_util::service::TowerToHyperService (#1729) --- tonic/src/transport/server/mod.rs | 73 ++++--------------------------- 1 file changed, 9 insertions(+), 64 deletions(-) diff --git a/tonic/src/transport/server/mod.rs b/tonic/src/transport/server/mod.rs index 0bd865557..730551494 100644 --- a/tonic/src/transport/server/mod.rs +++ b/tonic/src/transport/server/mod.rs @@ -16,7 +16,10 @@ use tracing::{debug, trace}; pub use crate::service::{Routes, RoutesBuilder}; pub use conn::{Connected, TcpConnectInfo}; -use hyper_util::rt::{TokioExecutor, TokioIo, TokioTimer}; +use hyper_util::{ + rt::{TokioExecutor, TokioIo, TokioTimer}, + service::TowerToHyperService, +}; #[cfg(feature = "tls")] pub use tls::ServerTlsConfig; @@ -63,7 +66,7 @@ use tower::{ layer::util::{Identity, Stack}, layer::Layer, limit::concurrency::ConcurrencyLimitLayer, - util::{BoxCloneService, Either, Oneshot}, + util::{BoxCloneService, Either}, Service, ServiceBuilder, ServiceExt, }; @@ -595,7 +598,9 @@ impl Server { let req_svc = svc .call(&io) .await - .map_err(super::Error::from_source)?; + .map_err(super::Error::from_source)? + .map_request(|req: Request| req.map(boxed)); + let hyper_svc = TowerToHyperService::new(req_svc); serve_connection(io, hyper_svc, server.clone(), graceful.then(|| signal_rx.clone())); @@ -627,7 +632,7 @@ fn serve_connection( builder: ConnectionBuilder, mut watcher: Option>, ) where - S: Service, Response = Response> + Clone + Send + 'static, + S: Service, Response = Response> + Clone + Send + 'static, S::Future: Send + 'static, S::Error: Into + Send, IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static, @@ -663,66 +668,6 @@ fn serve_connection( type ConnectionBuilder = hyper_util::server::conn::auto::Builder; -/// An adaptor which converts a [`tower::Service`] to a [`hyper::service::Service`]. -/// -/// The [`hyper::service::Service`] trait is used by hyper to handle incoming requests, -/// and does not support the `poll_ready` method that is used by tower services. -#[derive(Debug, Copy, Clone)] -pub(crate) struct TowerToHyperService { - service: S, -} - -impl TowerToHyperService { - /// Create a new `TowerToHyperService` from a tower service. - pub(crate) fn new(service: S) -> Self { - Self { service } - } -} - -impl hyper::service::Service> for TowerToHyperService -where - S: tower_service::Service> + Clone, - S::Error: Into + 'static, -{ - type Response = S::Response; - type Error = super::Error; - type Future = TowerToHyperServiceFuture>; - - fn call(&self, req: Request) -> Self::Future { - let req = req.map(crate::body::boxed); - TowerToHyperServiceFuture { - future: self.service.clone().oneshot(req), - } - } -} - -/// Future returned by [`TowerToHyperService`]. -#[derive(Debug)] -#[pin_project] -pub(crate) struct TowerToHyperServiceFuture -where - S: tower_service::Service, -{ - #[pin] - future: Oneshot, -} - -impl Future for TowerToHyperServiceFuture -where - S: tower_service::Service, - S::Error: Into + 'static, -{ - type Output = Result; - - #[inline] - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.project() - .future - .poll(cx) - .map_err(super::Error::from_source) - } -} - impl Router { pub(crate) fn new(server: Server, routes: Routes) -> Self { Self { server, routes }