Skip to content

Commit aa21f11

Browse files
authored
[Ftr] Add Builder for DubboClient (apache#89)
* Rft(dubbo): add ClientBuilder for client * Rftï(dubbo-build): add build api for client * style(examples): cargo fmt * Rft: move connection from client to transport mod * Rft(dubbo): add default timeout for client
1 parent a863be4 commit aa21f11

File tree

12 files changed

+143
-29
lines changed

12 files changed

+143
-29
lines changed

dubbo-build/src/client.rs

+9-3
Original file line numberDiff line numberDiff line change
@@ -70,23 +70,29 @@ pub fn generate<T: Service>(
7070
inner: TripleClient<T>,
7171
}
7272

73-
impl #service_ident<Connection> {
73+
impl #service_ident<ClientBoxService> {
7474
pub fn connect(host: String) -> Self {
7575
let cli = TripleClient::connect(host);
7676
#service_ident {
7777
inner: cli,
7878
}
7979
}
80+
81+
pub fn build(builder: ClientBuilder) -> Self {
82+
Self {
83+
inner: TripleClient::with_builder(builder),
84+
}
85+
}
8086
}
8187

8288
impl<T> #service_ident<T>
8389
where
8490
T: Service<http::Request<hyperBody>, Response = http::Response<BoxBody>>,
8591
T::Error: Into<StdError>,
8692
{
87-
pub fn new(inner: T) -> Self {
93+
pub fn new(inner: T, builder: ClientBuilder) -> Self {
8894
Self {
89-
inner: TripleClient::new(inner, None),
95+
inner: TripleClient::new(inner, builder),
9096
}
9197
}
9298

dubbo/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ hyper = { version = "0.14.19", features = ["full"]}
1414
http = "0.2"
1515
tower-service = "0.3.1"
1616
http-body = "0.4.4"
17-
tower = "0.4.12"
17+
tower = { version = "0.4.12", features = ["timeout"]}
1818
futures-util = "0.3.23"
1919
futures-core ="0.3.23"
2020
tokio = { version = "1.0", features = [ "rt-multi-thread", "time", "fs", "macros", "net", "signal"] }

dubbo/src/codegen.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -38,4 +38,5 @@ pub use super::triple::server::TripleServer;
3838
pub use super::{empty_body, BoxBody, BoxFuture, StdError};
3939
pub use crate::filter::service::FilterService;
4040
pub use crate::filter::Filter;
41-
pub use crate::triple::client::connection::Connection;
41+
pub use crate::triple::client::builder::{ClientBoxService, ClientBuilder};
42+
pub use crate::triple::transport::connection::Connection;

dubbo/src/protocol/triple/triple_invoker.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use tower_service::Service;
2121

2222
use crate::common::url::Url;
2323
use crate::protocol::Invoker;
24-
use crate::triple::client::connection::Connection;
24+
use crate::triple::transport::connection::Connection;
2525

2626
#[allow(dead_code)]
2727
#[derive(Clone, Default)]

dubbo/src/triple/client/builder.rs

+87
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
use http::Uri;
19+
use hyper::client::conn::Builder;
20+
use tokio::time::Duration;
21+
use tower::ServiceBuilder;
22+
23+
use crate::triple::transport::connection::Connection;
24+
use crate::utils::boxed::BoxService;
25+
26+
pub type ClientBoxService =
27+
BoxService<http::Request<hyper::Body>, http::Response<crate::BoxBody>, crate::Error>;
28+
29+
#[derive(Clone, Debug, Default)]
30+
pub struct ClientBuilder {
31+
pub uri: Uri,
32+
pub timeout: Option<u64>,
33+
pub connector: &'static str,
34+
}
35+
36+
impl ClientBuilder {
37+
pub fn new() -> ClientBuilder {
38+
ClientBuilder {
39+
uri: Uri::builder().build().unwrap(),
40+
timeout: None,
41+
connector: "",
42+
}
43+
}
44+
45+
pub fn from_static(s: &'static str) -> ClientBuilder {
46+
Self::from(Uri::from_static(s))
47+
}
48+
49+
pub fn with_timeout(self, timeout: u64) -> Self {
50+
Self {
51+
timeout: Some(timeout),
52+
..self
53+
}
54+
}
55+
56+
pub fn with_host(self, host: &'static str) -> Self {
57+
Self {
58+
uri: Uri::from_static(host),
59+
..self
60+
}
61+
}
62+
63+
pub fn connect(self) -> ClientBoxService {
64+
let builder = ServiceBuilder::new();
65+
let timeout = self.timeout.unwrap_or(5);
66+
let builder = builder.timeout(Duration::from_secs(timeout));
67+
68+
let mut b = Builder::new();
69+
let hyper_builder = b.http2_only(true);
70+
let conn = Connection::new()
71+
.with_host(self.uri.clone())
72+
.with_connector(self.connector)
73+
.with_builder(hyper_builder.to_owned());
74+
75+
BoxService::new(builder.service(conn))
76+
}
77+
}
78+
79+
impl From<Uri> for ClientBuilder {
80+
fn from(u: Uri) -> Self {
81+
Self {
82+
uri: u,
83+
timeout: None,
84+
connector: "",
85+
}
86+
}
87+
}

dubbo/src/triple/client/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
* limitations under the License.
1616
*/
1717

18-
pub mod connection;
18+
pub mod builder;
1919
pub mod triple;
2020

2121
pub use triple::TripleClient;

dubbo/src/triple/client/triple.rs

+24-11
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use futures_util::{future, stream, StreamExt, TryStreamExt};
2121
use http::HeaderValue;
2222
use tower_service::Service;
2323

24-
use super::connection::Connection;
24+
use super::builder::{ClientBoxService, ClientBuilder};
2525
use crate::filter::service::FilterService;
2626
use crate::filter::Filter;
2727
use crate::invocation::{IntoStreamingRequest, Metadata, Request, Response};
@@ -32,33 +32,43 @@ use crate::triple::encode::encode;
3232

3333
#[derive(Debug, Clone, Default)]
3434
pub struct TripleClient<T> {
35-
host: Option<http::Uri>,
35+
builder: Option<ClientBuilder>,
3636
inner: T,
3737
send_compression_encoding: Option<CompressionEncoding>,
3838
}
3939

40-
impl TripleClient<Connection> {
40+
impl TripleClient<ClientBoxService> {
4141
pub fn connect(host: String) -> Self {
4242
let uri = match http::Uri::from_str(&host) {
43-
Ok(v) => Some(v),
43+
Ok(v) => v,
4444
Err(err) => {
4545
tracing::error!("http uri parse error: {}, host: {}", err, host);
4646
panic!("http uri parse error: {}, host: {}", err, host)
4747
}
4848
};
4949

50+
let builder = ClientBuilder::from(uri);
51+
5052
TripleClient {
51-
host: uri.clone(),
52-
inner: Connection::new().with_host(uri.unwrap()),
53+
builder: Some(builder.clone()),
54+
inner: builder.connect(),
55+
send_compression_encoding: Some(CompressionEncoding::Gzip),
56+
}
57+
}
58+
59+
pub fn with_builder(builder: ClientBuilder) -> Self {
60+
TripleClient {
61+
builder: Some(builder.clone()),
62+
inner: builder.connect(),
5363
send_compression_encoding: Some(CompressionEncoding::Gzip),
5464
}
5565
}
5666
}
5767

5868
impl<T> TripleClient<T> {
59-
pub fn new(inner: T, host: Option<http::Uri>) -> Self {
69+
pub fn new(inner: T, builder: ClientBuilder) -> Self {
6070
TripleClient {
61-
host,
71+
builder: Some(builder),
6272
inner,
6373
send_compression_encoding: Some(CompressionEncoding::Gzip),
6474
}
@@ -68,7 +78,10 @@ impl<T> TripleClient<T> {
6878
where
6979
F: Filter,
7080
{
71-
TripleClient::new(FilterService::new(self.inner, filter), self.host)
81+
TripleClient::new(
82+
FilterService::new(self.inner, filter),
83+
self.builder.unwrap(),
84+
)
7285
}
7386
}
7487

@@ -82,8 +95,8 @@ where
8295
path: http::uri::PathAndQuery,
8396
body: hyper::Body,
8497
) -> http::Request<hyper::Body> {
85-
let mut parts = match self.host.as_ref() {
86-
Some(v) => v.to_owned().into_parts(),
98+
let mut parts = match self.builder.as_ref() {
99+
Some(v) => v.to_owned().uri.into_parts(),
87100
None => {
88101
tracing::error!("client host is empty");
89102
return http::Request::new(hyper::Body::empty());

dubbo/src/triple/client/connection.rs dubbo/src/triple/transport/connection.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use crate::triple::transport::connector::get_connector;
2727
#[derive(Debug, Clone)]
2828
pub struct Connection {
2929
host: hyper::Uri,
30-
connector: String,
30+
connector: &'static str,
3131
builder: Builder,
3232
}
3333

@@ -41,12 +41,12 @@ impl Connection {
4141
pub fn new() -> Self {
4242
Connection {
4343
host: hyper::Uri::default(),
44-
connector: "http".to_string(),
44+
connector: "http",
4545
builder: Builder::new(),
4646
}
4747
}
4848

49-
pub fn with_connector<C>(mut self, connector: String) -> Self {
49+
pub fn with_connector(mut self, connector: &'static str) -> Self {
5050
self.connector = connector;
5151
self
5252
}
@@ -83,7 +83,7 @@ where
8383

8484
fn call(&mut self, req: http::Request<ReqBody>) -> Self::Future {
8585
let builder = self.builder.clone().http2_only(true).to_owned();
86-
let mut connector = Connect::new(get_connector(self.connector.clone()), builder);
86+
let mut connector = Connect::new(get_connector(self.connector), builder);
8787
let uri = self.host.clone();
8888
let fut = async move {
8989
let mut con = connector.call(uri).await.unwrap();

dubbo/src/triple/transport/connector/mod.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,8 @@ where
7373
}
7474
}
7575

76-
pub fn get_connector(connector: String) -> BoxCloneService<Uri, BoxIO, crate::Error> {
77-
match connector.as_str() {
76+
pub fn get_connector(connector: &'static str) -> BoxCloneService<Uri, BoxIO, crate::Error> {
77+
match connector {
7878
"http" => {
7979
let c = http_connector::HttpConnector::new();
8080
BoxCloneService::new(Connector::new(c))

dubbo/src/triple/transport/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
* limitations under the License.
1616
*/
1717

18+
pub mod connection;
1819
pub mod connector;
1920
mod io;
2021
pub mod listener;

examples/echo/src/echo/client.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,9 @@ impl Filter for FakeFilter {
3232
#[tokio::main]
3333
async fn main() {
3434
let mut cli = EchoClient::connect("http://127.0.0.1:8888".to_string());
35-
let mut unary_cli = cli.clone().with_filter(FakeFilter {});
36-
let resp = unary_cli
35+
// let mut unary_cli = cli.clone().with_filter(FakeFilter {});
36+
// let mut cli = EchoClient::build(ClientBuilder::from_static("http://127.0.0.1:8888"));
37+
let resp = cli
3738
.unary_echo(Request::new(EchoRequest {
3839
message: "message from client".to_string(),
3940
}))

examples/echo/src/protos/hello_echo.rs

+8-3
Original file line numberDiff line numberDiff line change
@@ -36,20 +36,25 @@ pub mod echo_client {
3636
pub struct EchoClient<T> {
3737
inner: TripleClient<T>,
3838
}
39-
impl EchoClient<Connection> {
39+
impl EchoClient<ClientBoxService> {
4040
pub fn connect(host: String) -> Self {
4141
let cli = TripleClient::connect(host);
4242
EchoClient { inner: cli }
4343
}
44+
pub fn build(builder: ClientBuilder) -> Self {
45+
Self {
46+
inner: TripleClient::with_builder(builder),
47+
}
48+
}
4449
}
4550
impl<T> EchoClient<T>
4651
where
4752
T: Service<http::Request<hyperBody>, Response = http::Response<BoxBody>>,
4853
T::Error: Into<StdError>,
4954
{
50-
pub fn new(inner: T) -> Self {
55+
pub fn new(inner: T, builder: ClientBuilder) -> Self {
5156
Self {
52-
inner: TripleClient::new(inner, None),
57+
inner: TripleClient::new(inner, builder),
5358
}
5459
}
5560
pub fn with_filter<F>(self, filter: F) -> EchoClient<FilterService<T, F>>

0 commit comments

Comments
 (0)