Skip to content

Commit 32dd1ee

Browse files
daedalus2022qunwei
and
qunwei
authored
UnixListener in Triple Transport (apache#74)
* UnixListener in Triple Transport * UnixListener in Triple Transport * fix ci errors Co-authored-by: qunwei <[email protected]>
1 parent f14bf36 commit 32dd1ee

File tree

4 files changed

+203
-2
lines changed

4 files changed

+203
-2
lines changed

dubbo/src/triple/transport/connector/mod.rs

+7-2
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,9 @@
1616
*/
1717

1818
pub mod http_connector;
19+
pub mod unix_connector;
1920

20-
use hyper::{client::connect::Connection, Uri};
21+
use hyper::Uri;
2122
use tokio::io::{AsyncRead, AsyncWrite};
2223
use tower::make::MakeConnection;
2324
use tower_service::Service;
@@ -35,7 +36,7 @@ impl<C> Connector<C> {
3536
where
3637
C: Service<Uri>,
3738
C::Error: Into<crate::Error>,
38-
C::Response: AsyncRead + AsyncWrite + Connection + Send + 'static,
39+
C::Response: AsyncRead + AsyncWrite + Send + 'static,
3940
{
4041
Self { inner }
4142
}
@@ -77,6 +78,10 @@ pub fn get_connector(connector: String) -> BoxCloneService<Uri, BoxIO, crate::Er
7778
let c = http_connector::HttpConnector::new();
7879
BoxCloneService::new(Connector::new(c))
7980
}
81+
"unix" => {
82+
let c = unix_connector::UnixConnector::new();
83+
BoxCloneService::new(Connector::new(c))
84+
}
8085
_ => {
8186
let c = http_connector::HttpConnector::new();
8287
BoxCloneService::new(Connector::new(c))
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
19+
use std::str::FromStr;
20+
21+
use http::Uri;
22+
use hyper::client::connect::dns::Name;
23+
use tokio::net::UnixStream;
24+
use tower_service::Service;
25+
26+
use crate::triple::transport::resolver::dns::DnsResolver;
27+
use crate::triple::transport::resolver::Resolve;
28+
29+
#[derive(Clone, Default)]
30+
pub struct UnixConnector<R = DnsResolver> {
31+
resolver: R,
32+
}
33+
34+
impl UnixConnector {
35+
pub fn new() -> Self {
36+
Self {
37+
resolver: DnsResolver::default(),
38+
}
39+
}
40+
}
41+
42+
impl<R> UnixConnector<R> {
43+
pub fn new_with_resolver(resolver: R) -> UnixConnector<R> {
44+
Self { resolver }
45+
}
46+
}
47+
48+
impl<R> Service<Uri> for UnixConnector<R>
49+
where
50+
R: Resolve + Clone + Send + Sync + 'static,
51+
R::Future: Send,
52+
{
53+
type Response = UnixStream;
54+
55+
type Error = crate::Error;
56+
57+
type Future = crate::BoxFuture<Self::Response, Self::Error>;
58+
59+
fn poll_ready(
60+
&mut self,
61+
cx: &mut std::task::Context<'_>,
62+
) -> std::task::Poll<Result<(), Self::Error>> {
63+
self.resolver.poll_ready(cx).map_err(|err| err.into())
64+
}
65+
66+
fn call(&mut self, uri: Uri) -> Self::Future {
67+
let mut inner = self.clone();
68+
69+
Box::pin(async move { inner.call_async(uri).await })
70+
}
71+
}
72+
73+
impl<R> UnixConnector<R>
74+
where
75+
R: Resolve + Send + Sync + 'static,
76+
{
77+
async fn call_async(&mut self, uri: Uri) -> Result<UnixStream, crate::Error> {
78+
let host = uri.host().unwrap();
79+
let port = uri.port_u16().unwrap();
80+
81+
let addr = if let Ok(addr) = host.parse::<Ipv4Addr>() {
82+
tracing::info!("host is ip address: {:?}", host);
83+
SocketAddr::V4(SocketAddrV4::new(addr, port))
84+
} else {
85+
tracing::info!("host is dns: {:?}", host);
86+
let addrs = self
87+
.resolver
88+
.resolve(Name::from_str(host).unwrap())
89+
.await
90+
.map_err(|err| err.into())?;
91+
let addrs: Vec<SocketAddr> = addrs
92+
.map(|mut addr| {
93+
addr.set_port(port);
94+
addr
95+
})
96+
.collect();
97+
addrs[0]
98+
};
99+
100+
tracing::debug!("uri:{:?}, ip:port : {}", &addr, addr.to_string());
101+
102+
let conn = UnixStream::connect(addr.to_string().replace("unix://", "")).await?;
103+
104+
Ok(conn)
105+
}
106+
}

dubbo/src/triple/transport/listener/mod.rs

+3
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717

1818
pub mod tcp_listener;
19+
pub mod unix_listener;
1920

2021
use std::net::SocketAddr;
2122

@@ -24,6 +25,7 @@ use tokio::io::{AsyncRead, AsyncWrite};
2425

2526
use super::io::BoxIO;
2627
pub use tcp_listener::TcpListener;
28+
pub use unix_listener::UnixListener;
2729

2830
#[async_trait]
2931
pub trait Listener: Send + Sync {
@@ -62,6 +64,7 @@ impl<T: Listener> Listener for WrappedListener<T> {
6264
pub async fn get_listener(name: String, addr: SocketAddr) -> Result<BoxListener, crate::Error> {
6365
match name.as_str() {
6466
"tcp" => Ok(TcpListener::bind(addr).await?.boxed()),
67+
"unix" => Ok(UnixListener::bind(addr).await?.boxed()),
6568
_ => {
6669
tracing::warn!("no support listener: {:?}", name);
6770
Err(Box::new(crate::status::DubboError::new(format!(
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
use std::{net::SocketAddr, task};
19+
20+
use super::Listener;
21+
use async_trait::async_trait;
22+
use futures_core::Stream;
23+
use hyper::server::accept::Accept;
24+
use tokio::net::{UnixListener as tokioUnixListener, UnixStream};
25+
26+
pub struct UnixListener {
27+
inner: tokioUnixListener,
28+
path: String,
29+
}
30+
31+
impl UnixListener {
32+
pub async fn bind(addr: SocketAddr) -> std::io::Result<UnixListener> {
33+
let listener = tokioUnixListener::bind(format!("{}", addr.to_string()))?;
34+
35+
Ok(UnixListener {
36+
inner: listener,
37+
path: addr.to_string(),
38+
})
39+
}
40+
}
41+
42+
#[async_trait]
43+
impl Listener for UnixListener {
44+
type Conn = UnixStream;
45+
46+
async fn accept(&self) -> std::io::Result<(Self::Conn, SocketAddr)> {
47+
let (unix_stream, _unix_addr) = self.inner.accept().await?;
48+
let addr: SocketAddr = self.path.parse().unwrap();
49+
Ok((unix_stream, addr))
50+
}
51+
}
52+
53+
impl Stream for UnixListener {
54+
type Item = UnixStream;
55+
56+
fn poll_next(
57+
self: std::pin::Pin<&mut Self>,
58+
cx: &mut std::task::Context<'_>,
59+
) -> std::task::Poll<Option<Self::Item>> {
60+
self.inner.poll_accept(cx).map(|res| match res {
61+
Ok(data) => Some(data.0),
62+
Err(err) => {
63+
tracing::error!("UnixListener poll_next Error: {:?}", err);
64+
None
65+
}
66+
})
67+
}
68+
}
69+
70+
impl Accept for UnixListener {
71+
type Conn = UnixStream;
72+
73+
type Error = crate::Error;
74+
75+
fn poll_accept(
76+
self: std::pin::Pin<&mut Self>,
77+
cx: &mut task::Context<'_>,
78+
) -> std::task::Poll<Option<Result<Self::Conn, Self::Error>>> {
79+
self.inner.poll_accept(cx).map(|res| match res {
80+
Ok(data) => Some(Ok(data.0)),
81+
Err(err) => {
82+
tracing::error!("UnixListener poll_accept Error: {:?}", err);
83+
None
84+
}
85+
})
86+
}
87+
}

0 commit comments

Comments
 (0)