Skip to content

Commit 30b9c1a

Browse files
committed
refactor(dubbo): use registry protocol plugin to init protocol
1 parent 1d78481 commit 30b9c1a

File tree

3 files changed

+43
-27
lines changed

3 files changed

+43
-27
lines changed

dubbo/src/framework.rs

+10-14
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,10 @@ use std::pin::Pin;
2020

2121
use futures::future;
2222
use futures::Future;
23-
use futures::FutureExt;
2423

2524
use crate::common::url::Url;
26-
use crate::protocol::triple::triple_protocol::TripleProtocol;
2725
use crate::protocol::{BoxExporter, Protocol};
26+
use crate::registry::protocol::RegistryProtocol;
2827
use dubbo_config::{get_global_config, RootConfig};
2928

3029
// Invoker是否可以基于hyper写一个通用的
@@ -104,20 +103,17 @@ impl Dubbo {
104103
self.init();
105104

106105
// TODO: server registry
106+
for (name, url) in self.registries.iter() {
107+
tracing::info!("registry name: {:?}, url: {:?}", name, url);
108+
}
107109

110+
let mem_reg = Box::new(RegistryProtocol::new());
108111
let mut async_vec: Vec<Pin<Box<dyn Future<Output = BoxExporter> + Send>>> = Vec::new();
109-
for (key, c) in self.protocols.iter() {
110-
match key.as_str() {
111-
"triple" => {
112-
let pro = Box::new(TripleProtocol::new());
113-
for u in c.iter() {
114-
let tri_fut = pro.clone().export(u.clone()).boxed();
115-
async_vec.push(tri_fut);
116-
}
117-
}
118-
_ => {
119-
tracing::error!("protocol {:?} not implemented", key);
120-
}
112+
for (name, items) in self.protocols.iter() {
113+
for url in items.iter() {
114+
tracing::info!("protocol: {:?}, service url: {:?}", name, url);
115+
let exporter = mem_reg.clone().export(url.to_owned());
116+
async_vec.push(exporter)
121117
}
122118
}
123119

dubbo/src/registry/memory_registry.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
#![allow(unused_variables, dead_code, missing_docs)]
1919
use std::collections::HashMap;
20+
use std::sync::Arc;
2021
use std::sync::RwLock;
2122

2223
use super::{NotifyListener, Registry};
@@ -27,15 +28,15 @@ use super::{NotifyListener, Registry};
2728
2829
pub const REGISTRY_GROUP_KEY: &str = "registry.group";
2930

30-
#[derive(Debug, Default)]
31+
#[derive(Debug, Default, Clone)]
3132
pub struct MemoryRegistry {
32-
registries: RwLock<HashMap<String, String>>,
33+
registries: Arc<RwLock<HashMap<String, String>>>,
3334
}
3435

3536
impl MemoryRegistry {
3637
pub fn new() -> MemoryRegistry {
3738
MemoryRegistry {
38-
registries: RwLock::new(HashMap::new()),
39+
registries: Arc::new(RwLock::new(HashMap::new())),
3940
}
4041
}
4142
}

dubbo/src/registry/protocol.rs

+29-10
Original file line numberDiff line numberDiff line change
@@ -16,39 +16,43 @@
1616
*/
1717

1818
use std::collections::HashMap;
19-
use std::sync::Arc;
19+
use std::sync::{Arc, RwLock};
2020

2121
use super::memory_registry::MemoryRegistry;
2222
use super::BoxRegistry;
2323
use crate::codegen::TripleInvoker;
2424
use crate::common::consts;
2525
use crate::common::url::Url;
2626
use crate::protocol::triple::triple_exporter::TripleExporter;
27+
use crate::protocol::triple::triple_protocol::TripleProtocol;
2728
use crate::protocol::BoxExporter;
2829
use crate::protocol::BoxInvoker;
2930
use crate::protocol::Protocol;
3031

3132
#[derive(Clone, Default)]
3233
pub struct RegistryProtocol {
3334
// registerAddr: Registry
34-
registries: Arc<HashMap<String, BoxRegistry>>,
35+
registries: Arc<RwLock<HashMap<String, BoxRegistry>>>,
3536
// providerUrl: Exporter
36-
exporters: Arc<HashMap<String, BoxExporter>>,
37+
exporters: Arc<RwLock<HashMap<String, BoxExporter>>>,
3738
}
3839

3940
impl RegistryProtocol {
4041
pub fn new() -> Self {
4142
RegistryProtocol {
42-
registries: Arc::new(HashMap::new()),
43-
exporters: Arc::new(HashMap::new()),
43+
registries: Arc::new(RwLock::new(HashMap::new())),
44+
exporters: Arc::new(RwLock::new(HashMap::new())),
4445
}
4546
}
4647

47-
pub fn get_registry(&self, url: Url) -> BoxRegistry {
48-
// self.registries.clone().insert(url.location.clone(), Box::new(MemoryRegistry::default()));
48+
pub fn get_registry(&mut self, url: Url) -> BoxRegistry {
49+
let mem = MemoryRegistry::default();
50+
self.registries
51+
.write()
52+
.unwrap()
53+
.insert(url.location, Box::new(mem.clone()));
4954

50-
// *(self.registries.get(&url.location).unwrap())
51-
Box::new(MemoryRegistry::default())
55+
Box::new(mem)
5256
}
5357
}
5458

@@ -60,12 +64,27 @@ impl Protocol for RegistryProtocol {
6064
todo!()
6165
}
6266

63-
async fn export(self, url: Url) -> BoxExporter {
67+
async fn export(mut self, url: Url) -> BoxExporter {
6468
// getProviderUrl
6569
// getRegisterUrl
6670
// init Exporter based on provider_url
6771
// server registry based on register_url
6872
// start server health check
73+
let registry_url = get_registry_url(url.clone());
74+
if !registry_url.protocol.is_empty() {
75+
let mut reg = self.get_registry(registry_url.clone());
76+
reg.register(registry_url.clone()).unwrap();
77+
}
78+
79+
match url.clone().protocol.as_str() {
80+
"triple" => {
81+
let pro = Box::new(TripleProtocol::new());
82+
return pro.export(url).await;
83+
}
84+
_ => {
85+
tracing::error!("protocol {:?} not implemented", url.protocol);
86+
}
87+
}
6988
Box::new(TripleExporter::new())
7089
}
7190
async fn refer(self, url: Url) -> Self::Invoker {

0 commit comments

Comments
 (0)