Skip to content

Commit f656da5

Browse files
authored
Add Builder for DubboServer (apache#93)
* Ftr: add serverBuilder for Server, support multiple ways to start server * Rft(examples): update yaml
1 parent b3fce7f commit f656da5

File tree

16 files changed

+247
-76
lines changed

16 files changed

+247
-76
lines changed

config/src/config.rs

+4-2
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,8 @@ impl RootConfig {
115115
let triple_config = ProtocolConfig::default()
116116
.name("triple".to_string())
117117
.ip("0.0.0.0".to_string())
118-
.port("8888".to_string());
118+
.port("8888".to_string())
119+
.listener("tcp".to_string());
119120

120121
let service_config = service_config.add_protocol_configs(triple_config);
121122
self.service
@@ -135,7 +136,8 @@ impl RootConfig {
135136
ProtocolConfig::default()
136137
.name("triple".to_string())
137138
.ip("0.0.0.0".to_string())
138-
.port("8889".to_string()),
139+
.port("8889".to_string())
140+
.listener("tcp".to_string()),
139141
);
140142

141143
provider.services = self.service.clone();

config/src/protocol.rs

+21-1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ pub struct ProtocolConfig {
2424
pub ip: String,
2525
pub port: String,
2626
pub name: String,
27+
pub listener: String,
2728

2829
#[serde(skip_serializing, skip_deserializing)]
2930
pub params: HashMap<String, String>,
@@ -42,11 +43,30 @@ impl ProtocolConfig {
4243
Self { port, ..self }
4344
}
4445

46+
pub fn listener(self, listener: String) -> Self {
47+
Self { listener, ..self }
48+
}
49+
4550
pub fn params(self, params: HashMap<String, String>) -> Self {
4651
Self { params, ..self }
4752
}
4853

54+
pub fn add_param(mut self, key: String, value: String) -> Self {
55+
self.params.insert(key, value);
56+
self
57+
}
58+
4959
pub fn to_url(&self) -> String {
50-
format!("{}://{}:{}", self.name, self.ip, self.port)
60+
let mut params_vec: Vec<String> = Vec::new();
61+
for (k, v) in self.params.iter() {
62+
// let tmp = format!("{}={}", k, v);
63+
params_vec.push(format!("{}={}", k, v));
64+
}
65+
let param = params_vec.join("&");
66+
67+
format!(
68+
"{}://{}:{}?listener={}&{}",
69+
self.name, self.ip, self.port, self.listener, param
70+
)
5171
}
5272
}

docs/generic-protocol-design.md

+12
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,22 @@ Protocol模块的核心功能:
1818
+ 通用、高效的Listener层
1919
+ 等等
2020

21+
Protocol API支持管理多个底层协议的server以及在一个server上暴露多个服务
22+
+ 多个底层通信的server:location(ip:port): server
23+
+ 一个server上暴露多个服务:
24+
2125
### Exporter
2226

2327
### Invoker
2428

29+
Invoker: 客户端通用能力的封装。获取需要一个private withInvoker 接口
30+
31+
Invoker应该是基于Connection(Service)实现的Service。并且进行扩展新的接口。
32+
protocol.rs模块:根据Url返回初始化好的Invoker实例。
33+
+ 如何设计Invoker接口:扩展Service接口
34+
+ cluster模块如何使用Invoker实例呢?这里需要画一个数据流转图
35+
+ 如何将初始化好的Invoker与tower::Layer相结合
36+
2537
Invoker提供的通用的接口,使得dubbo在不同的协议下遵循相同的接口抽象。
2638

2739
在Invoker中,需要做的功能包括

docs/readme.md

+9-1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,11 @@
11
# Readme
22

3-
There is some RFCs of dubbo-rust design.
3+
There is some RFCs of dubbo-rust design.
4+
5+
## 关于配置的一些约定(暂时)
6+
7+
所有的服务只能注册到一个或多个注册中心
8+
所有的服务只能使用Triple进行通信
9+
Triple只能对外暴露一个端口
10+
11+
一个协议上的server注册到

dubbo.yaml

+2
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ service:
1111
ip: 0.0.0.0
1212
port: '8888'
1313
name: triple
14+
listener: tcp
1415
# helloworld.Greeter:
1516
# version: 1.0.0
1617
# group: test
@@ -22,3 +23,4 @@ protocols:
2223
ip: 0.0.0.0
2324
port: '8888'
2425
name: triple
26+
listener: tcp

dubbo/readme.md

+23-1
Original file line numberDiff line numberDiff line change
@@ -1 +1,23 @@
1-
# Introduce
1+
# Introduce
2+
3+
## Filter实现
4+
5+
客户端Filter和服务端Filter
6+
将TripleClient扩展为通用的InvokerClient
7+
8+
+ 测试客户端filter
9+
+ 服务端filter接口设计,以及测试
10+
+ 服务注册接口
11+
+ 服务发现接口设计
12+
13+
EchoClient -> TripleClient -> FilterService -> Connection -> hyper::Connect
14+
15+
16+
memory registry clone实现
17+
将服务注册接入到framework中
18+
url模型如何用于多个场景
19+
protocol模块实现
20+
21+
registry config(yaml) -> registry config(memory) ->
22+
23+
Init函数初始化配置即:根据RootConfig来初始化对应的ServiceConfig

dubbo/src/codegen.rs

+1
Original file line numberDiff line numberDiff line change
@@ -39,4 +39,5 @@ pub use super::{empty_body, BoxBody, BoxFuture, StdError};
3939
pub use crate::filter::service::FilterService;
4040
pub use crate::filter::Filter;
4141
pub use crate::triple::client::builder::{ClientBoxService, ClientBuilder};
42+
pub use crate::triple::server::builder::ServerBuilder;
4243
pub use crate::triple::transport::connection::Connection;

dubbo/src/common/url.rs

+13-2
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ impl Url {
4141
tracing::error!("fail to parse url({}), err: {:?}", url, err);
4242
})
4343
.unwrap();
44-
Some(Self {
44+
let mut u = Self {
4545
uri: uri.to_string(),
4646
protocol: uri.scheme_str()?.to_string(),
4747
ip: uri.authority()?.host().to_string(),
@@ -54,7 +54,18 @@ impl Url {
5454
.map(|x| x.to_string())
5555
.collect::<Vec<_>>(),
5656
params: HashMap::new(),
57-
})
57+
};
58+
if uri.query().is_some() {
59+
u.decode(uri.query().unwrap().to_string());
60+
u.service_key = u
61+
.get_param("service_names".to_string())
62+
.unwrap()
63+
.split(',')
64+
.map(|x| x.to_string())
65+
.collect::<Vec<_>>();
66+
}
67+
68+
Some(u)
5869
}
5970

6071
pub fn get_service_name(&self) -> Vec<String> {

dubbo/src/framework.rs

+11-6
Original file line numberDiff line numberDiff line change
@@ -77,12 +77,17 @@ impl Dubbo {
7777
protocol_url
7878
}
7979
};
80-
let protocol_url = format!(
81-
"{}/{}/{}",
82-
&protocol_url.to_url(),
83-
service_config.name,
84-
service_name
85-
);
80+
// let protocol_url = format!(
81+
// "{}/{}/{}",
82+
// &protocol_url.to_url(),
83+
// service_config.name,
84+
// service_name
85+
// );
86+
// service_names may be multiple
87+
let protocol_url = protocol_url
88+
.to_owned()
89+
.add_param("service_names".to_string(), service_name.to_string());
90+
let protocol_url = protocol_url.to_url();
8691
tracing::info!("url: {}", protocol_url);
8792

8893
let protocol_url = Url::from_url(&protocol_url)

dubbo/src/protocol/triple/triple_protocol.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,10 @@ impl Protocol for TripleProtocol {
6060
}
6161

6262
async fn export(mut self, url: Url) -> BoxExporter {
63-
let server = TripleServer::new(url.service_key.clone());
63+
let server = TripleServer::new();
6464
self.servers
6565
.insert(url.service_key.join(","), server.clone());
66-
server.serve(url.to_url()).await;
66+
server.serve(url).await;
6767

6868
Box::new(TripleExporter::new())
6969
}

dubbo/src/protocol/triple/triple_server.rs

+7-50
Original file line numberDiff line numberDiff line change
@@ -15,65 +15,22 @@
1515
* limitations under the License.
1616
*/
1717

18-
use std::{net::ToSocketAddrs, str::FromStr};
19-
20-
use crate::triple::transport::DubboServer;
18+
use crate::{common::url::Url, triple::server::builder::ServerBuilder};
2119

2220
#[derive(Default, Clone)]
2321
pub struct TripleServer {
24-
s: DubboServer,
25-
service_names: Vec<String>,
22+
builder: ServerBuilder,
2623
}
2724

2825
impl TripleServer {
29-
pub fn new(names: Vec<String>) -> TripleServer {
26+
pub fn new() -> TripleServer {
3027
Self {
31-
service_names: names,
32-
s: DubboServer::new(),
28+
builder: ServerBuilder::new(),
3329
}
3430
}
3531

36-
pub async fn serve(mut self, url: String) {
37-
{
38-
let lock = super::TRIPLE_SERVICES.read().unwrap();
39-
for name in self.service_names.iter() {
40-
if lock.get(name).is_none() {
41-
tracing::warn!("service ({}) not register", name);
42-
continue;
43-
}
44-
let svc = lock.get(name).unwrap();
45-
46-
self.s = self.s.add_service(name.clone(), svc.clone());
47-
}
48-
}
49-
50-
let uri = match http::Uri::from_str(&url) {
51-
Ok(v) => v,
52-
Err(err) => {
53-
tracing::error!("http uri parse error: {}, url: {:?}", err, &url);
54-
return;
55-
}
56-
};
57-
58-
let authority = match uri.authority() {
59-
Some(v) => v.to_owned(),
60-
None => {
61-
tracing::error!("http authority is none");
62-
return;
63-
}
64-
};
65-
66-
self.s
67-
.with_listener("tcp".to_string())
68-
.serve(
69-
authority
70-
.to_string()
71-
.to_socket_addrs()
72-
.unwrap()
73-
.next()
74-
.unwrap(),
75-
)
76-
.await
77-
.unwrap();
32+
pub async fn serve(mut self, url: Url) {
33+
self.builder = ServerBuilder::from(url);
34+
self.builder.build().serve().await.unwrap()
7835
}
7936
}

0 commit comments

Comments
 (0)