Skip to content

Commit fe1ac19

Browse files
committed
Rft(dubbo): use registry_url and service_url to impl registry_protocol plugin
1 parent d403771 commit fe1ac19

File tree

3 files changed

+31
-42
lines changed

3 files changed

+31
-42
lines changed

dubbo/src/common/url.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,8 @@ mod tests {
9696

9797
#[test]
9898
fn test_from_url() {
99-
let u1 = Url::from_url("");
99+
let u1 = Url::from_url("triple://127.0.0.1:8888/helloworld.Greeter");
100+
println!("{:?}", u1.unwrap().get_service_name())
100101
}
101102

102103
#[test]

dubbo/src/framework.rs

+11-22
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ use std::pin::Pin;
2121
use futures::future;
2222
use futures::Future;
2323

24-
use crate::common::consts;
2524
use crate::common::url::Url;
2625
use crate::protocol::{BoxExporter, Protocol};
2726
use crate::registry::protocol::RegistryProtocol;
@@ -33,7 +32,7 @@ use dubbo_config::{get_global_config, RootConfig};
3332
pub struct Dubbo {
3433
protocols: HashMap<String, Vec<Url>>,
3534
registries: HashMap<String, Url>,
36-
services: HashMap<String, Vec<Url>>, // protocol: Url; registry: Url
35+
service_registry: HashMap<String, Vec<Url>>, // registry: Urls
3736
config: Option<RootConfig>,
3837
}
3938

@@ -43,7 +42,7 @@ impl Dubbo {
4342
Self {
4443
protocols: HashMap::new(),
4544
registries: HashMap::new(),
46-
services: HashMap::new(),
45+
service_registry: HashMap::new(),
4746
config: None,
4847
}
4948
}
@@ -94,24 +93,16 @@ impl Dubbo {
9493
}
9594

9695
let u = u.unwrap();
97-
if self.services.get(consts::PROTOCOL).is_some() {
98-
self.services
99-
.get_mut(consts::PROTOCOL)
100-
.unwrap()
101-
.push(u.clone());
102-
} else {
103-
self.services
104-
.insert(consts::PROTOCOL.to_string(), vec![u.clone()]);
105-
}
10696

107-
if self.services.get(consts::REGISTRY).is_some() {
108-
self.services
109-
.get_mut(consts::REGISTRY)
97+
let reg_url = self.registries.get(&c.registry).unwrap();
98+
if self.service_registry.get(&c.name).is_some() {
99+
self.service_registry
100+
.get_mut(&c.name)
110101
.unwrap()
111-
.push(u.clone());
102+
.push(reg_url.clone());
112103
} else {
113-
self.services
114-
.insert(consts::REGISTRY.to_string(), vec![u.clone()]);
104+
self.service_registry
105+
.insert(c.name.clone(), vec![reg_url.clone()]);
115106
}
116107

117108
if self.protocols.get(&c.protocol).is_some() {
@@ -126,11 +117,9 @@ impl Dubbo {
126117
self.init();
127118

128119
// TODO: server registry
129-
for (name, url) in self.registries.iter() {
130-
tracing::info!("registry name: {:?}, url: {:?}", name, url);
131-
}
132120

133-
let mem_reg = Box::new(RegistryProtocol::new());
121+
let mem_reg =
122+
Box::new(RegistryProtocol::new().with_services(self.service_registry.clone()));
134123
let mut async_vec: Vec<Pin<Box<dyn Future<Output = BoxExporter> + Send>>> = Vec::new();
135124
for (name, items) in self.protocols.iter() {
136125
for url in items.iter() {

dubbo/src/registry/protocol.rs

+18-19
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ use std::sync::{Arc, RwLock};
2121
use super::memory_registry::MemoryRegistry;
2222
use super::BoxRegistry;
2323
use crate::codegen::TripleInvoker;
24-
use crate::common::consts;
2524
use crate::common::url::Url;
2625
use crate::protocol::triple::triple_exporter::TripleExporter;
2726
use crate::protocol::triple::triple_protocol::TripleProtocol;
@@ -35,16 +34,24 @@ pub struct RegistryProtocol {
3534
registries: Arc<RwLock<HashMap<String, BoxRegistry>>>,
3635
// providerUrl: Exporter
3736
exporters: Arc<RwLock<HashMap<String, BoxExporter>>>,
37+
// serviceName: registryUrls
38+
services: HashMap<String, Vec<Url>>,
3839
}
3940

4041
impl RegistryProtocol {
4142
pub fn new() -> Self {
4243
RegistryProtocol {
4344
registries: Arc::new(RwLock::new(HashMap::new())),
4445
exporters: Arc::new(RwLock::new(HashMap::new())),
46+
services: HashMap::new(),
4547
}
4648
}
4749

50+
pub fn with_services(mut self, services: HashMap<String, Vec<Url>>) -> Self {
51+
self.services.extend(services);
52+
self
53+
}
54+
4855
pub fn get_registry(&mut self, url: Url) -> BoxRegistry {
4956
let mem = MemoryRegistry::default();
5057
self.registries
@@ -70,10 +77,14 @@ impl Protocol for RegistryProtocol {
7077
// init Exporter based on provider_url
7178
// server registry based on register_url
7279
// start server health check
73-
let registry_url = get_registry_url(url.clone());
74-
if !registry_url.protocol.is_empty() {
75-
let mut reg = self.get_registry(registry_url.clone());
76-
reg.register(registry_url.clone()).unwrap();
80+
let registry_url = self.services.get(url.get_service_name().join(",").as_str());
81+
if let Some(urls) = registry_url {
82+
for url in urls.clone().iter() {
83+
if !url.protocol.is_empty() {
84+
let mut reg = self.get_registry(url.clone());
85+
reg.register(url.clone()).unwrap();
86+
}
87+
}
7788
}
7889

7990
match url.clone().protocol.as_str() {
@@ -83,10 +94,11 @@ impl Protocol for RegistryProtocol {
8394
}
8495
_ => {
8596
tracing::error!("protocol {:?} not implemented", url.protocol);
97+
Box::new(TripleExporter::new())
8698
}
8799
}
88-
Box::new(TripleExporter::new())
89100
}
101+
90102
async fn refer(self, url: Url) -> Self::Invoker {
91103
// getRegisterUrl
92104
// get Registry from registry_url
@@ -95,16 +107,3 @@ impl Protocol for RegistryProtocol {
95107
Box::new(TripleInvoker::new(url))
96108
}
97109
}
98-
99-
fn get_registry_url(mut url: Url) -> Url {
100-
// registry_url need storage some places
101-
if url.protocol == consts::REGISTRY_PROTOCOL {
102-
url.protocol = url.get_param("registry".to_string()).unwrap();
103-
}
104-
105-
url
106-
}
107-
108-
fn get_provider_url(url: Url) -> Url {
109-
url
110-
}

0 commit comments

Comments
 (0)