Skip to content

Commit 320c207

Browse files
authored
Merge pull request apache#48 from yang20150702/main
Ftr(dubbo): impl registry protocol plugin
2 parents 6f93320 + ce2b48d commit 320c207

File tree

9 files changed

+167
-44
lines changed

9 files changed

+167
-44
lines changed

config/src/config.rs

+20
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use lazy_static::lazy_static;
2121
use serde::{Deserialize, Serialize};
2222

2323
use super::protocol::ProtocolConfig;
24+
use super::provider::ProviderConfig;
2425
use super::service::ServiceConfig;
2526

2627
pub const DUBBO_CONFIG_PATH: &str = "./dubbo.yaml";
@@ -35,10 +36,14 @@ lazy_static! {
3536
#[derive(Debug, Default, Serialize, Deserialize, Clone)]
3637
pub struct RootConfig {
3738
pub name: String,
39+
40+
#[serde(skip_serializing, skip_deserializing)]
3841
pub service: HashMap<String, ServiceConfig>,
3942
pub protocols: HashMap<String, ProtocolConfig>,
4043
pub registries: HashMap<String, String>,
4144

45+
pub provider: ProviderConfig,
46+
4247
#[serde(skip_serializing, skip_deserializing)]
4348
pub data: HashMap<String, String>,
4449
}
@@ -65,6 +70,7 @@ impl RootConfig {
6570
service: HashMap::new(),
6671
protocols: HashMap::new(),
6772
registries: HashMap::new(),
73+
provider: ProviderConfig::new(),
6874
data: HashMap::new(),
6975
}
7076
}
@@ -96,6 +102,10 @@ impl RootConfig {
96102
}
97103

98104
pub fn test_config(&mut self) {
105+
let mut provider = ProviderConfig::new();
106+
provider.protocol_ids = vec!["triple".to_string()];
107+
provider.registry_ids = vec![];
108+
99109
let service_config = ServiceConfig::default()
100110
.group("test".to_string())
101111
.serializer("json".to_string())
@@ -127,6 +137,10 @@ impl RootConfig {
127137
.ip("0.0.0.0".to_string())
128138
.port("8889".to_string()),
129139
);
140+
141+
provider.services = self.service.clone();
142+
self.provider = provider.clone();
143+
println!("provider config: {:?}", provider);
130144
// 通过环境变量读取某个文件。加在到内存中
131145
self.data.insert(
132146
"dubbo.provider.url".to_string(),
@@ -172,6 +186,12 @@ pub trait Config {
172186
mod tests {
173187
use super::*;
174188

189+
#[test]
190+
fn test_config() {
191+
let mut r = RootConfig::new();
192+
r.test_config();
193+
}
194+
175195
#[test]
176196
fn test_load() {
177197
// case 1: read config yaml from default path

config/src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
pub mod config;
1919
pub mod protocol;
20+
pub mod provider;
2021
pub mod service;
2122

2223
pub use config::*;

config/src/provider.rs

+56
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
use std::collections::HashMap;
19+
20+
use serde::{Deserialize, Serialize};
21+
22+
use super::service::ServiceConfig;
23+
24+
#[derive(Debug, Default, Serialize, Deserialize, Clone)]
25+
pub struct ProviderConfig {
26+
pub registry_ids: Vec<String>,
27+
28+
pub protocol_ids: Vec<String>,
29+
30+
pub services: HashMap<String, ServiceConfig>,
31+
}
32+
33+
impl ProviderConfig {
34+
pub fn new() -> Self {
35+
ProviderConfig {
36+
registry_ids: vec![],
37+
protocol_ids: vec![],
38+
services: HashMap::new(),
39+
}
40+
}
41+
42+
pub fn with_registry_ids(mut self, registry_ids: Vec<String>) -> Self {
43+
self.registry_ids = registry_ids;
44+
self
45+
}
46+
47+
pub fn with_protocol_ids(mut self, protocol_ids: Vec<String>) -> Self {
48+
self.protocol_ids = protocol_ids;
49+
self
50+
}
51+
52+
pub fn with_services(mut self, services: HashMap<String, ServiceConfig>) -> Self {
53+
self.services = services;
54+
self
55+
}
56+
}

docs/config.md

+15
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
## 关于配置的一些约定(暂时)
2+
3+
所有的服务只能注册到一个或多个注册中心
4+
所有的服务只能使用Triple进行通信
5+
Triple只能对外暴露一个端口
6+
7+
## Config配置
8+
9+
每个组件的配置是独立的。
10+
11+
Provider、Consumer等使用独立组件的配置进行工作
12+
13+
Provider Config核心设计以及Url模型流转:
14+
15+
Provider和Consumer使用组件的配置

dubbo/src/common/consts.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,6 @@
1515
* limitations under the License.
1616
*/
1717

18-
pub const REGISTRY_PROTOCOL: &str = "registry";
18+
pub const REGISTRY_PROTOCOL: &str = "registry_protocol";
19+
pub const PROTOCOL: &str = "protocol";
20+
pub const REGISTRY: &str = "registry";

dubbo/src/common/url.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,8 @@ mod tests {
9696

9797
#[test]
9898
fn test_from_url() {
99-
let u1 = Url::from_url("");
99+
let u1 = Url::from_url("triple://127.0.0.1:8888/helloworld.Greeter");
100+
println!("{:?}", u1.unwrap().get_service_name())
100101
}
101102

102103
#[test]

dubbo/src/framework.rs

+23-15
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,10 @@ use std::pin::Pin;
2020

2121
use futures::future;
2222
use futures::Future;
23-
use futures::FutureExt;
2423

2524
use crate::common::url::Url;
26-
use crate::protocol::triple::triple_protocol::TripleProtocol;
2725
use crate::protocol::{BoxExporter, Protocol};
26+
use crate::registry::protocol::RegistryProtocol;
2827
use dubbo_config::{get_global_config, RootConfig};
2928

3029
// Invoker是否可以基于hyper写一个通用的
@@ -33,6 +32,7 @@ use dubbo_config::{get_global_config, RootConfig};
3332
pub struct Dubbo {
3433
protocols: HashMap<String, Vec<Url>>,
3534
registries: HashMap<String, Url>,
35+
service_registry: HashMap<String, Vec<Url>>, // registry: Urls
3636
config: Option<RootConfig>,
3737
}
3838

@@ -42,6 +42,7 @@ impl Dubbo {
4242
Self {
4343
protocols: HashMap::new(),
4444
registries: HashMap::new(),
45+
service_registry: HashMap::new(),
4546
config: None,
4647
}
4748
}
@@ -64,7 +65,7 @@ impl Dubbo {
6465
.insert(name.to_string(), Url::from_url(url).unwrap());
6566
}
6667

67-
for (_, c) in conf.service.iter() {
68+
for (_, c) in conf.provider.services.iter() {
6869
let u = if c.protocol_configs.is_empty() {
6970
let protocol = match conf.protocols.get(&c.protocol) {
7071
Some(v) => v.to_owned(),
@@ -92,6 +93,18 @@ impl Dubbo {
9293
}
9394

9495
let u = u.unwrap();
96+
97+
let reg_url = self.registries.get(&c.registry).unwrap();
98+
if self.service_registry.get(&c.name).is_some() {
99+
self.service_registry
100+
.get_mut(&c.name)
101+
.unwrap()
102+
.push(reg_url.clone());
103+
} else {
104+
self.service_registry
105+
.insert(c.name.clone(), vec![reg_url.clone()]);
106+
}
107+
95108
if self.protocols.get(&c.protocol).is_some() {
96109
self.protocols.get_mut(&c.protocol).unwrap().push(u);
97110
} else {
@@ -105,19 +118,14 @@ impl Dubbo {
105118

106119
// TODO: server registry
107120

121+
let mem_reg =
122+
Box::new(RegistryProtocol::new().with_services(self.service_registry.clone()));
108123
let mut async_vec: Vec<Pin<Box<dyn Future<Output = BoxExporter> + Send>>> = Vec::new();
109-
for (key, c) in self.protocols.iter() {
110-
match key.as_str() {
111-
"triple" => {
112-
let pro = Box::new(TripleProtocol::new());
113-
for u in c.iter() {
114-
let tri_fut = pro.clone().export(u.clone()).boxed();
115-
async_vec.push(tri_fut);
116-
}
117-
}
118-
_ => {
119-
tracing::error!("protocol {:?} not implemented", key);
120-
}
124+
for (name, items) in self.protocols.iter() {
125+
for url in items.iter() {
126+
tracing::info!("protocol: {:?}, service url: {:?}", name, url);
127+
let exporter = mem_reg.clone().export(url.to_owned());
128+
async_vec.push(exporter)
121129
}
122130
}
123131

dubbo/src/registry/memory_registry.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
#![allow(unused_variables, dead_code, missing_docs)]
1919
use std::collections::HashMap;
20+
use std::sync::Arc;
2021
use std::sync::RwLock;
2122

2223
use super::{NotifyListener, Registry};
@@ -27,15 +28,15 @@ use super::{NotifyListener, Registry};
2728
2829
pub const REGISTRY_GROUP_KEY: &str = "registry.group";
2930

30-
#[derive(Debug, Default)]
31+
#[derive(Debug, Default, Clone)]
3132
pub struct MemoryRegistry {
32-
registries: RwLock<HashMap<String, String>>,
33+
registries: Arc<RwLock<HashMap<String, String>>>,
3334
}
3435

3536
impl MemoryRegistry {
3637
pub fn new() -> MemoryRegistry {
3738
MemoryRegistry {
38-
registries: RwLock::new(HashMap::new()),
39+
registries: Arc::new(RwLock::new(HashMap::new())),
3940
}
4041
}
4142
}

dubbo/src/registry/protocol.rs

+43-24
Original file line numberDiff line numberDiff line change
@@ -16,39 +16,50 @@
1616
*/
1717

1818
use std::collections::HashMap;
19-
use std::sync::Arc;
19+
use std::sync::{Arc, RwLock};
2020

2121
use super::memory_registry::MemoryRegistry;
2222
use super::BoxRegistry;
2323
use crate::codegen::TripleInvoker;
24-
use crate::common::consts;
2524
use crate::common::url::Url;
2625
use crate::protocol::triple::triple_exporter::TripleExporter;
26+
use crate::protocol::triple::triple_protocol::TripleProtocol;
2727
use crate::protocol::BoxExporter;
2828
use crate::protocol::BoxInvoker;
2929
use crate::protocol::Protocol;
3030

3131
#[derive(Clone, Default)]
3232
pub struct RegistryProtocol {
3333
// registerAddr: Registry
34-
registries: Arc<HashMap<String, BoxRegistry>>,
34+
registries: Arc<RwLock<HashMap<String, BoxRegistry>>>,
3535
// providerUrl: Exporter
36-
exporters: Arc<HashMap<String, BoxExporter>>,
36+
exporters: Arc<RwLock<HashMap<String, BoxExporter>>>,
37+
// serviceName: registryUrls
38+
services: HashMap<String, Vec<Url>>,
3739
}
3840

3941
impl RegistryProtocol {
4042
pub fn new() -> Self {
4143
RegistryProtocol {
42-
registries: Arc::new(HashMap::new()),
43-
exporters: Arc::new(HashMap::new()),
44+
registries: Arc::new(RwLock::new(HashMap::new())),
45+
exporters: Arc::new(RwLock::new(HashMap::new())),
46+
services: HashMap::new(),
4447
}
4548
}
4649

47-
pub fn get_registry(&self, url: Url) -> BoxRegistry {
48-
// self.registries.clone().insert(url.location.clone(), Box::new(MemoryRegistry::default()));
50+
pub fn with_services(mut self, services: HashMap<String, Vec<Url>>) -> Self {
51+
self.services.extend(services);
52+
self
53+
}
54+
55+
pub fn get_registry(&mut self, url: Url) -> BoxRegistry {
56+
let mem = MemoryRegistry::default();
57+
self.registries
58+
.write()
59+
.unwrap()
60+
.insert(url.location, Box::new(mem.clone()));
4961

50-
// *(self.registries.get(&url.location).unwrap())
51-
Box::new(MemoryRegistry::default())
62+
Box::new(mem)
5263
}
5364
}
5465

@@ -60,14 +71,34 @@ impl Protocol for RegistryProtocol {
6071
todo!()
6172
}
6273

63-
async fn export(self, url: Url) -> BoxExporter {
74+
async fn export(mut self, url: Url) -> BoxExporter {
6475
// getProviderUrl
6576
// getRegisterUrl
6677
// init Exporter based on provider_url
6778
// server registry based on register_url
6879
// start server health check
69-
Box::new(TripleExporter::new())
80+
let registry_url = self.services.get(url.get_service_name().join(",").as_str());
81+
if let Some(urls) = registry_url {
82+
for url in urls.clone().iter() {
83+
if !url.protocol.is_empty() {
84+
let mut reg = self.get_registry(url.clone());
85+
reg.register(url.clone()).unwrap();
86+
}
87+
}
88+
}
89+
90+
match url.clone().protocol.as_str() {
91+
"triple" => {
92+
let pro = Box::new(TripleProtocol::new());
93+
return pro.export(url).await;
94+
}
95+
_ => {
96+
tracing::error!("protocol {:?} not implemented", url.protocol);
97+
Box::new(TripleExporter::new())
98+
}
99+
}
70100
}
101+
71102
async fn refer(self, url: Url) -> Self::Invoker {
72103
// getRegisterUrl
73104
// get Registry from registry_url
@@ -76,15 +107,3 @@ impl Protocol for RegistryProtocol {
76107
Box::new(TripleInvoker::new(url))
77108
}
78109
}
79-
80-
fn get_registry_url(mut url: Url) -> Url {
81-
if url.protocol == consts::REGISTRY_PROTOCOL {
82-
url.protocol = url.get_param("registry".to_string()).unwrap();
83-
}
84-
85-
url
86-
}
87-
88-
fn get_provider_url(url: Url) -> Url {
89-
url
90-
}

0 commit comments

Comments
 (0)