Skip to content

Commit 6f93320

Browse files
authored
Merge pull request apache#42 from yang20150702/main
Refact: add Connection mod, refactor connector
2 parents 207bfe1 + 1d78481 commit 6f93320

35 files changed

+972
-567
lines changed

config/src/config.rs

+3
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ pub struct RootConfig {
3737
pub name: String,
3838
pub service: HashMap<String, ServiceConfig>,
3939
pub protocols: HashMap<String, ProtocolConfig>,
40+
pub registries: HashMap<String, String>,
4041

4142
#[serde(skip_serializing, skip_deserializing)]
4243
pub data: HashMap<String, String>,
@@ -63,6 +64,7 @@ impl RootConfig {
6364
name: "dubbo".to_string(),
6465
service: HashMap::new(),
6566
protocols: HashMap::new(),
67+
registries: HashMap::new(),
6668
data: HashMap::new(),
6769
}
6870
}
@@ -83,6 +85,7 @@ impl RootConfig {
8385
}
8486
};
8587

88+
tracing::info!("current path: {:?}", env::current_dir());
8689
let data = fs::read(config_path)?;
8790
let mut conf: RootConfig = serde_yaml::from_slice(&data).unwrap();
8891
tracing::debug!("origin config: {:?}", conf);

dubbo-build/src/client.rs

+26-11
Original file line numberDiff line numberDiff line change
@@ -66,23 +66,38 @@ pub fn generate<T: Service>(
6666
#service_doc
6767
#(#struct_attributes)*
6868
#[derive(Debug, Clone, Default)]
69-
pub struct #service_ident {
70-
inner: TripleClient,
71-
uri: String,
69+
pub struct #service_ident<T> {
70+
inner: TripleClient<T>,
7271
}
7372

74-
impl #service_ident {
75-
pub fn new() -> Self {
73+
impl #service_ident<Connection> {
74+
pub fn connect(host: String) -> Self {
75+
let cli = TripleClient::connect(host);
76+
#service_ident {
77+
inner: cli,
78+
}
79+
}
80+
}
81+
82+
impl<T> #service_ident<T>
83+
where
84+
T: Service<http::Request<hyperBody>, Response = http::Response<BoxBody>>,
85+
T::Error: Into<StdError>,
86+
{
87+
pub fn new(inner: T) -> Self {
7688
Self {
77-
inner: TripleClient::new(),
78-
uri: "".to_string(),
89+
inner: TripleClient::new(inner, None),
7990
}
8091
}
8192

82-
pub fn with_uri(mut self, uri: String) -> Self {
83-
self.uri = uri.clone();
84-
self.inner = self.inner.with_host(uri);
85-
self
93+
pub fn with_filter<F>(self, filter: F) -> #service_ident<FilterService<T, F>>
94+
where
95+
F: Filter,
96+
{
97+
let inner = self.inner.with_filter(filter);
98+
#service_ident {
99+
inner,
100+
}
86101
}
87102

88103
#methods

dubbo-build/src/server.rs

+12-9
Original file line numberDiff line numberDiff line change
@@ -76,29 +76,33 @@ pub fn generate<T: Service>(
7676
#service_doc
7777
#(#struct_attributes)*
7878
#[derive(Debug)]
79-
pub struct #server_service<T: #server_trait, I = TripleInvoker> {
79+
pub struct #server_service<T: #server_trait> {
8080
inner: _Inner<T>,
81-
invoker: Option<I>,
8281
}
8382

8483
struct _Inner<T>(Arc<T>);
8584

86-
impl<T: #server_trait, I> #server_service<T, I> {
85+
impl<T: #server_trait> #server_service<T> {
8786
pub fn new(inner: T) -> Self {
8887
Self {
8988
inner: _Inner(Arc::new(inner)),
90-
invoker: None,
9189
}
9290
}
9391

92+
pub fn with_filter<F>(inner: T, filter: F) -> FilterService<Self, F>
93+
where
94+
F: Filter,
95+
{
96+
FilterService::new(Self::new(inner), filter)
97+
}
98+
9499
}
95100

96-
impl<T, I, B> Service<http::Request<B>> for #server_service<T, I>
101+
impl<T, B> Service<http::Request<B>> for #server_service<T>
97102
where
98103
T: #server_trait,
99104
B: Body + Send + 'static,
100105
B::Error: Into<StdError> + Send + 'static,
101-
I: Invoker + Send + 'static,
102106
{
103107
type Response = http::Response<BoxBody>;
104108
type Error = std::convert::Infallible;
@@ -126,12 +130,11 @@ pub fn generate<T: Service>(
126130
}
127131
}
128132

129-
impl<T: #server_trait, I: Invoker + Send + 'static> Clone for #server_service<T, I> {
133+
impl<T: #server_trait> Clone for #server_service<T> {
130134
fn clone(&self) -> Self {
131135
let inner = self.inner.clone();
132136
Self {
133137
inner,
134-
invoker: None,
135138
}
136139
}
137140
}
@@ -149,7 +152,7 @@ pub fn generate<T: Service>(
149152
}
150153

151154
pub fn register_server<T: #server_trait>(server: T) {
152-
let s = #server_service::<_, TripleInvoker>::new(server);
155+
let s = #server_service::new(server);
153156
dubbo::protocol::triple::TRIPLE_SERVICES
154157
.write()
155158
.unwrap()

dubbo/Cargo.toml

-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ repository = "https://github.com/apache/dubbo-rust.git"
1212
[dependencies]
1313
hyper = { version = "0.14.19", features = ["full"]}
1414
http = "0.2"
15-
tonic = {version ="0.7.2", features = ["compression",]}
1615
tower-service = "0.3.1"
1716
http-body = "0.4.4"
1817
tower = "0.4.12"

dubbo/src/codegen.rs

+5
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@ pub use std::sync::Arc;
1919
pub use std::task::{Context, Poll};
2020

2121
pub use async_trait::async_trait;
22+
pub use bytes::Bytes;
2223
pub use http_body::Body;
24+
pub use hyper::Body as hyperBody;
2325
pub use tower_service::Service;
2426

2527
pub use super::invocation::{IntoStreamingRequest, Request, Response};
@@ -34,3 +36,6 @@ pub use super::triple::server::service::{
3436
};
3537
pub use super::triple::server::TripleServer;
3638
pub use super::{empty_body, BoxBody, BoxFuture, StdError};
39+
pub use crate::filter::service::FilterService;
40+
pub use crate::filter::Filter;
41+
pub use crate::triple::client::connection::Connection;

dubbo/src/common/consts.rs

+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
pub const REGISTRY_PROTOCOL: &str = "registry";

dubbo/src/common/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,5 @@
1515
* limitations under the License.
1616
*/
1717

18+
pub mod consts;
1819
pub mod url;

dubbo/src/protocol/grpc/grpc_exporter.rs dubbo/src/filter/mod.rs

+4-26
Original file line numberDiff line numberDiff line change
@@ -15,32 +15,10 @@
1515
* limitations under the License.
1616
*/
1717

18-
use crate::protocol::{Exporter, Invoker};
18+
pub mod service;
1919

20-
pub struct GrpcExporter<T> {
21-
invoker: T,
22-
}
23-
24-
impl<T> GrpcExporter<T> {
25-
pub fn new(_key: String, invoker: T) -> GrpcExporter<T> {
26-
Self { invoker }
27-
}
28-
}
29-
30-
impl<T: Invoker + Clone> Exporter for GrpcExporter<T> {
31-
type InvokerType = T;
32-
33-
fn unexport(&self) {}
34-
35-
fn get_invoker(&self) -> Self::InvokerType {
36-
self.invoker.clone()
37-
}
38-
}
20+
use crate::invocation::Request;
3921

40-
impl<T: Invoker + Clone> Clone for GrpcExporter<T> {
41-
fn clone(&self) -> Self {
42-
Self {
43-
invoker: self.invoker.clone(),
44-
}
45-
}
22+
pub trait Filter {
23+
fn call(&mut self, req: Request<()>) -> Result<Request<()>, crate::status::Status>;
4624
}

dubbo/src/filter/service.rs

+84
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
use tower_service::Service;
19+
20+
use super::Filter;
21+
use crate::invocation::Metadata;
22+
use crate::invocation::Request;
23+
24+
#[derive(Clone)]
25+
pub struct FilterService<S, F> {
26+
inner: S,
27+
f: F,
28+
}
29+
30+
impl<S, F> FilterService<S, F> {
31+
pub fn new(inner: S, f: F) -> Self
32+
where
33+
F: Filter,
34+
{
35+
Self { inner, f }
36+
}
37+
}
38+
39+
impl<S, F, ReqBody> Service<http::Request<ReqBody>> for FilterService<S, F>
40+
where
41+
F: Filter,
42+
S: Service<http::Request<ReqBody>, Response = http::Response<crate::BoxBody>>,
43+
S::Error: Into<crate::Error>,
44+
S::Future: Send + 'static,
45+
{
46+
type Response = http::Response<crate::BoxBody>;
47+
48+
type Error = S::Error;
49+
50+
type Future = crate::BoxFuture<Self::Response, Self::Error>;
51+
52+
fn poll_ready(
53+
&mut self,
54+
cx: &mut std::task::Context<'_>,
55+
) -> std::task::Poll<Result<(), Self::Error>> {
56+
self.inner.poll_ready(cx)
57+
}
58+
59+
fn call(&mut self, req: http::Request<ReqBody>) -> Self::Future {
60+
let uri = req.uri().clone();
61+
let method = req.method().clone();
62+
let version = req.version();
63+
let (parts, msg) = req.into_parts();
64+
65+
let res = self.f.call(Request::from_parts(
66+
Metadata::from_headers(parts.headers),
67+
(),
68+
));
69+
match res {
70+
Ok(req) => {
71+
let (metadata, _) = req.into_parts();
72+
let req = Request::from_parts(Metadata::from_headers(metadata.into_headers()), msg);
73+
let http_req = req.into_http(uri, method, version);
74+
75+
let resp = self.inner.call(http_req);
76+
Box::pin(resp)
77+
}
78+
Err(err) => {
79+
let fut = async move { Ok(err.to_http()) };
80+
Box::pin(fut)
81+
}
82+
}
83+
}
84+
}

dubbo/src/framework.rs

+11-10
Original file line numberDiff line numberDiff line change
@@ -23,24 +23,25 @@ use futures::Future;
2323
use futures::FutureExt;
2424

2525
use crate::common::url::Url;
26-
use crate::protocol::triple::triple_invoker::TripleInvoker;
2726
use crate::protocol::triple::triple_protocol::TripleProtocol;
28-
use crate::protocol::{Exporter, Protocol};
27+
use crate::protocol::{BoxExporter, Protocol};
2928
use dubbo_config::{get_global_config, RootConfig};
3029

31-
pub type BoxExporter = Box<dyn Exporter<InvokerType = TripleInvoker>>;
3230
// Invoker是否可以基于hyper写一个通用的
3331

3432
#[derive(Default)]
3533
pub struct Dubbo {
3634
protocols: HashMap<String, Vec<Url>>,
35+
registries: HashMap<String, Url>,
3736
config: Option<RootConfig>,
3837
}
3938

4039
impl Dubbo {
4140
pub fn new() -> Dubbo {
41+
tracing_subscriber::fmt::init();
4242
Self {
4343
protocols: HashMap::new(),
44+
registries: HashMap::new(),
4445
config: None,
4546
}
4647
}
@@ -51,14 +52,18 @@ impl Dubbo {
5152
}
5253

5354
pub fn init(&mut self) {
54-
tracing_subscriber::fmt::init();
55-
5655
if self.config.is_none() {
5756
self.config = Some(get_global_config())
5857
}
5958

6059
let conf = self.config.as_ref().unwrap();
6160
tracing::debug!("global conf: {:?}", conf);
61+
62+
for (name, url) in conf.registries.iter() {
63+
self.registries
64+
.insert(name.to_string(), Url::from_url(url).unwrap());
65+
}
66+
6267
for (_, c) in conf.service.iter() {
6368
let u = if c.protocol_configs.is_empty() {
6469
let protocol = match conf.protocols.get(&c.protocol) {
@@ -106,11 +111,7 @@ impl Dubbo {
106111
"triple" => {
107112
let pro = Box::new(TripleProtocol::new());
108113
for u in c.iter() {
109-
let tri_fut = pro
110-
.clone()
111-
.export(u.clone())
112-
.map(|res| Box::new(res) as BoxExporter)
113-
.boxed();
114+
let tri_fut = pro.clone().export(u.clone()).boxed();
114115
async_vec.push(tri_fut);
115116
}
116117
}

dubbo/src/invocation.rs

+9-2
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,16 @@ impl<T> Request<T> {
5151
}
5252
}
5353

54-
pub fn into_http(self) -> http::Request<T> {
54+
pub fn into_http(
55+
self,
56+
uri: http::Uri,
57+
method: http::Method,
58+
version: http::Version,
59+
) -> http::Request<T> {
5560
let mut http_req = http::Request::new(self.message);
56-
*http_req.version_mut() = http::Version::HTTP_2;
61+
*http_req.version_mut() = version;
62+
*http_req.uri_mut() = uri;
63+
*http_req.method_mut() = method;
5764
*http_req.headers_mut() = self.metadata.into_headers();
5865

5966
http_req

dubbo/src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
pub mod codegen;
1919
pub mod common;
20+
pub mod filter;
2021
mod framework;
2122
pub mod invocation;
2223
pub mod protocol;

0 commit comments

Comments
 (0)