Skip to content

Commit eef25df

Browse files
daedalus2022qunwei
and
qunwei
authored
Rpc context implementation apache#58 (apache#83)
* Rpc context implementation apache#58 * 修复建议补充测试场景 * 修复建议补充测试场景 * advice: use tracing to replace with println * advice: use tracing to replace with println Co-authored-by: qunwei <[email protected]>
1 parent c09abce commit eef25df

File tree

2 files changed

+44
-58
lines changed

2 files changed

+44
-58
lines changed

dubbo/Cargo.toml

+3
Original file line numberDiff line numberDiff line change
@@ -34,3 +34,6 @@ async-stream = "0.3"
3434
flate2 = "1.0"
3535

3636
dubbo-config = {path = "../config", version = "0.2.0"}
37+
38+
#对象存储
39+
state = { version = "0.5", features = ["tls"] }

dubbo/src/context.rs

+41-58
Original file line numberDiff line numberDiff line change
@@ -15,33 +15,14 @@
1515
* limitations under the License.
1616
*/
1717

18-
use core::cell::RefCell;
19-
use std::any::Any;
2018
use std::collections::HashMap;
21-
use std::fmt;
22-
use std::sync::Arc;
19+
use std::sync::{Arc, Mutex};
20+
use std::thread;
2321

24-
///
25-
/// ```rust
26-
/// use std::collections::HashMap;
27-
/// use std::sync::Arc;
28-
///
29-
/// let mut map = HashMap::<String, SafetyValue>::new();
30-
/// map.insert("key1".into(), Arc::new("data-1"));
31-
///
32-
/// // get a typed value from SafetyValue
33-
/// let value = map
34-
/// .get("key1")
35-
/// .and_then(|f| f.downcast_ref::<String>())
36-
/// .unwrap();
37-
///
38-
/// assert_eq!(value, "data-1");
39-
/// ```
40-
type SafetyValue = Arc<dyn Any + Sync + Send>;
22+
use serde_json::Value;
23+
use state::Container;
4124

42-
thread_local! {
43-
static SERVICE_CONTEXT: RefCell<RpcContext> = RefCell::new(RpcContext::default());
44-
}
25+
pub static APPLICATION_CONTEXT: Container![Send + Sync] = <Container![Send + Sync]>::new();
4526

4627
///
4728
/// All environment information of during the current call will put into the context
@@ -53,37 +34,38 @@ thread_local! {
5334
/// After B call C,the RpcContext record the information of B call C
5435
///
5536
#[derive(Clone, Default)]
56-
pub struct RpcContext {
57-
pub attachments: HashMap<String, SafetyValue>,
58-
// TODO
59-
}
60-
61-
impl RpcContext {
62-
pub fn current() -> Self {
63-
get_current(|ctx| ctx.clone())
64-
}
37+
pub struct RpcContext {}
6538

66-
pub fn clear(&mut self) {
67-
self.attachments.clear();
68-
}
39+
pub trait Context {
40+
fn get_attachments() -> Option<Arc<Mutex<HashMap<String, Value>>>>;
6941
}
7042

71-
fn get_current<F: FnMut(&RpcContext) -> T, T>(mut f: F) -> T {
72-
SERVICE_CONTEXT.try_with(|ctx| f(&ctx.borrow())).unwrap()
73-
}
43+
impl Context for RpcContext {
44+
fn get_attachments() -> Option<Arc<Mutex<HashMap<String, Value>>>> {
45+
let local = APPLICATION_CONTEXT.try_get_local::<Arc<Mutex<HashMap<String, Value>>>>();
46+
47+
tracing::debug!("{:?} - {:?}", thread::current().id(), local);
7448

75-
impl fmt::Debug for RpcContext {
76-
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
77-
f.debug_struct("Context")
78-
.field("attachments", &self.attachments)
79-
.finish()
49+
match local {
50+
Some(attachment) => Some(attachment.clone()),
51+
None => {
52+
let attachment = HashMap::<String, Value>::new();
53+
let mutex = Arc::new(Mutex::new(attachment));
54+
let mutex_clone = Arc::clone(&mutex);
55+
APPLICATION_CONTEXT.set_local(move || {
56+
return Arc::clone(&mutex_clone);
57+
});
58+
Some(Arc::clone(&mutex))
59+
}
60+
}
8061
}
8162
}
8263

8364
#[cfg(test)]
8465
mod tests {
66+
use tokio::time;
67+
8568
use super::*;
86-
use std::thread::sleep;
8769
use std::time::Duration;
8870

8971
#[test]
@@ -96,25 +78,26 @@ mod tests {
9678

9779
let mut handles = Vec::with_capacity(10);
9880

99-
for i in 0..10 {
81+
for i in 0..=10 {
10082
handles.push(rt.spawn(async move {
101-
let mut attachments = RpcContext::current().attachments;
102-
attachments.insert("key1".into(), Arc::new(format!("data-{i}")));
103-
104-
if i == 10 {
105-
attachments.insert("key2".into(), Arc::new(2));
106-
assert_eq!(attachments.len(), 2);
107-
} else {
108-
assert_eq!(attachments.len(), 1);
109-
}
83+
if let Some(attachments) = RpcContext::get_attachments() {
84+
let mut attachments = attachments.lock().unwrap();
85+
attachments.insert("key1".into(), Value::from(format!("data-{i}")));
86+
87+
assert!(attachments.len() > 0);
88+
};
89+
90+
time::sleep(Duration::from_millis(1000)).await;
91+
92+
if let Some(attachments) = RpcContext::get_attachments() {
93+
let attachments = attachments.lock().unwrap();
94+
assert!(attachments.len() > 0);
95+
};
11096
}));
11197
}
11298

113-
sleep(Duration::from_millis(500));
114-
11599
for handle in handles {
116100
rt.block_on(handle).unwrap();
117101
}
118-
assert_eq!(RpcContext::current().attachments.len(), 0);
119102
}
120103
}

0 commit comments

Comments
 (0)