Skip to content

Commit b3fce7f

Browse files
daedalus2022qunwei
and
qunwei
authored
Support timeout after waiting RPC response for a maximum time. apache#47 (apache#92)
* Support timeout after waiting RPC response for a maximum time. apache#47 * timeout key by define const Co-authored-by: qunwei <[email protected]>
1 parent aa21f11 commit b3fce7f

File tree

4 files changed

+148
-1
lines changed

4 files changed

+148
-1
lines changed

dubbo/src/filter/context.rs

+71
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
use std::time::{SystemTime, UNIX_EPOCH};
19+
20+
use serde_json::Value;
21+
22+
use crate::{
23+
codegen::Request,
24+
context::{Context, RpcContext},
25+
filter::{TIMEOUT_COUNTDOWN, TIMEOUT_DEFAULT, TRI_TIMEOUT_DEADLINE_IN_NANOS},
26+
status::Status,
27+
};
28+
29+
use super::Filter;
30+
31+
#[derive(Clone)]
32+
pub struct ContextFilter {}
33+
34+
impl Filter for ContextFilter {
35+
fn call(&mut self, req: Request<()>) -> Result<Request<()>, Status> {
36+
let headers = &mut req.metadata.into_headers();
37+
38+
let timeout = headers.get(TIMEOUT_COUNTDOWN);
39+
40+
let time = SystemTime::now()
41+
.duration_since(UNIX_EPOCH)
42+
.unwrap()
43+
.as_nanos();
44+
45+
let mut dead_line_in_nanos = 0_u128;
46+
47+
if let Some(t) = timeout {
48+
let timeout: u128 = t.to_str().unwrap().parse().unwrap();
49+
if timeout > 0_u128 {
50+
dead_line_in_nanos = time + timeout * 1000000;
51+
}
52+
} else {
53+
let timeout: u128 = TIMEOUT_DEFAULT * 1000000;
54+
dead_line_in_nanos = time + timeout;
55+
}
56+
57+
tracing::debug!(
58+
"ContextFilter tri-timeout-deadline-in-nanos : {}",
59+
dead_line_in_nanos
60+
);
61+
if let Some(at) = RpcContext::get_attachments() {
62+
let mut attachments = at.lock().unwrap();
63+
attachments.insert(
64+
String::from(TRI_TIMEOUT_DEADLINE_IN_NANOS),
65+
Value::from(dead_line_in_nanos.to_string()),
66+
);
67+
}
68+
69+
Ok(req)
70+
}
71+
}

dubbo/src/filter/mod.rs

+6
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,16 @@
1515
* limitations under the License.
1616
*/
1717

18+
pub mod context;
1819
pub mod service;
20+
pub mod timeout;
1921

2022
use crate::invocation::Request;
2123

24+
pub const TRI_TIMEOUT_DEADLINE_IN_NANOS: &str = "tri-timeout-deadline-in-nanos";
25+
pub const TIMEOUT_COUNTDOWN: &str = "timeout-countdown";
26+
pub const TIMEOUT_DEFAULT: u128 = 1000;
27+
2228
pub trait Filter {
2329
fn call(&mut self, req: Request<()>) -> Result<Request<()>, crate::status::Status>;
2430
}

dubbo/src/filter/timeout.rs

+65
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
use std::time::{SystemTime, UNIX_EPOCH};
18+
19+
use crate::{
20+
codegen::Request,
21+
context::{Context, RpcContext},
22+
filter::TRI_TIMEOUT_DEADLINE_IN_NANOS,
23+
status::{Code, Status},
24+
};
25+
26+
use super::Filter;
27+
28+
#[derive(Clone)]
29+
pub struct TimeoutFilter {}
30+
31+
/// timeout count
32+
/// 1. ContextFilter 初始化 timeout 时间,初始化后将 tri-timeout-deadline-in-nanos 放入 context 中
33+
/// 2. TimeoutFilter read context tri-timeout-deadline-in-nanos
34+
/// 3. 响应时计算 tri-timeout-deadline-in-nanos - current_nanos <= 0
35+
///
36+
impl Filter for TimeoutFilter {
37+
fn call(&mut self, req: Request<()>) -> Result<Request<()>, Status> {
38+
if let Some(attachments) = RpcContext::get_attachments() {
39+
let current_nanos = SystemTime::now()
40+
.duration_since(UNIX_EPOCH)
41+
.unwrap()
42+
.as_nanos();
43+
44+
let attachments = attachments.lock().unwrap();
45+
let tri_timeout_deadline_in_nanos =
46+
attachments.get(TRI_TIMEOUT_DEADLINE_IN_NANOS).unwrap();
47+
let tri_timeout_deadline_in_nanos: u128 = tri_timeout_deadline_in_nanos
48+
.as_str()
49+
.unwrap()
50+
.parse()
51+
.unwrap();
52+
53+
tracing::debug!(
54+
"TimeoutFilter tri-timeout-deadline-in-nanos : {}, current-nanos:{}",
55+
tri_timeout_deadline_in_nanos,
56+
current_nanos
57+
);
58+
if tri_timeout_deadline_in_nanos - current_nanos <= 0 {
59+
return Err(Status::new(Code::DeadlineExceeded, String::from("Timeout")));
60+
}
61+
}
62+
63+
Ok(req)
64+
}
65+
}

examples/echo/src/echo/server.rs

+6-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ use std::io::ErrorKind;
1919
use std::pin::Pin;
2020

2121
use async_trait::async_trait;
22+
use dubbo::filter::context::ContextFilter;
23+
use dubbo::filter::timeout::TimeoutFilter;
2224
use futures_util::Stream;
2325
use futures_util::StreamExt;
2426
use tokio::sync::mpsc;
@@ -52,12 +54,15 @@ async fn main() {
5254
});
5355
let server = EchoServerImpl::default();
5456
let s = EchoServer::<EchoServerImpl>::with_filter(server, FakeFilter {});
57+
let timeout_filter = FilterService::new(s, TimeoutFilter {});
58+
let context_filter = FilterService::new(timeout_filter, ContextFilter {});
59+
5560
dubbo::protocol::triple::TRIPLE_SERVICES
5661
.write()
5762
.unwrap()
5863
.insert(
5964
"grpc.examples.echo.Echo".to_string(),
60-
dubbo::utils::boxed_clone::BoxCloneService::new(s),
65+
dubbo::utils::boxed_clone::BoxCloneService::new(context_filter),
6166
);
6267

6368
// Dubbo::new().start().await;

0 commit comments

Comments
 (0)