Skip to content

Commit df348cf

Browse files
author
Devdutt Shenoi
committed
refactor: AsyncReadWrite (#822)
1 parent 353a477 commit df348cf

File tree

7 files changed

+25
-24
lines changed

7 files changed

+25
-24
lines changed

rumqttc/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1414

1515
### Changed
1616

17+
* rename `N` as `AsyncReadWrite` to describe usage.
18+
1719
### Deprecated
1820

1921
### Removed

rumqttc/src/eventloop.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use crate::{framed::Network, Transport};
22
use crate::{Incoming, MqttState, NetworkOptions, Packet, Request, StateError};
33
use crate::{MqttOptions, Outgoing};
44

5-
use crate::framed::N;
5+
use crate::framed::AsyncReadWrite;
66
use crate::mqttbytes::v4::*;
77
use flume::{bounded, Receiver, Sender};
88
use tokio::net::{lookup_host, TcpSocket, TcpStream};
@@ -369,7 +369,7 @@ async fn network_connect(
369369
_ => options.broker_address(),
370370
};
371371

372-
let tcp_stream: Box<dyn N> = {
372+
let tcp_stream: Box<dyn AsyncReadWrite> = {
373373
#[cfg(feature = "proxy")]
374374
match options.proxy() {
375375
Some(proxy) => proxy.connect(&domain, port, network_options).await?,

rumqttc/src/framed.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use std::io;
1010
/// appropriate to achieve performance
1111
pub struct Network {
1212
/// Socket for IO
13-
socket: Box<dyn N>,
13+
socket: Box<dyn AsyncReadWrite>,
1414
/// Buffered reads
1515
read: BytesMut,
1616
/// Maximum packet size
@@ -20,8 +20,8 @@ pub struct Network {
2020
}
2121

2222
impl Network {
23-
pub fn new(socket: impl N + 'static, max_incoming_size: usize) -> Network {
24-
let socket = Box::new(socket) as Box<dyn N>;
23+
pub fn new(socket: impl AsyncReadWrite + 'static, max_incoming_size: usize) -> Network {
24+
let socket = Box::new(socket) as Box<dyn AsyncReadWrite>;
2525
Network {
2626
socket,
2727
read: BytesMut::with_capacity(10 * 1024),
@@ -117,5 +117,5 @@ impl Network {
117117
}
118118
}
119119

120-
pub trait N: AsyncRead + AsyncWrite + Send + Unpin {}
121-
impl<T> N for T where T: AsyncRead + AsyncWrite + Send + Unpin {}
120+
pub trait AsyncReadWrite: AsyncRead + AsyncWrite + Send + Unpin {}
121+
impl<T> AsyncReadWrite for T where T: AsyncRead + AsyncWrite + Send + Unpin {}

rumqttc/src/proxy.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::eventloop::socket_connect;
2-
use crate::framed::N;
2+
use crate::framed::AsyncReadWrite;
33
use crate::NetworkOptions;
44

55
use std::io;
@@ -46,10 +46,10 @@ impl Proxy {
4646
broker_addr: &str,
4747
broker_port: u16,
4848
network_options: NetworkOptions,
49-
) -> Result<Box<dyn N>, ProxyError> {
49+
) -> Result<Box<dyn AsyncReadWrite>, ProxyError> {
5050
let proxy_addr = format!("{}:{}", self.addr, self.port);
5151

52-
let tcp: Box<dyn N> = Box::new(socket_connect(proxy_addr, network_options).await?);
52+
let tcp: Box<dyn AsyncReadWrite> = Box::new(socket_connect(proxy_addr, network_options).await?);
5353
let mut tcp = match self.ty {
5454
ProxyType::Http => tcp,
5555
#[cfg(any(feature = "use-rustls", feature = "use-native-tls"))]
@@ -67,7 +67,7 @@ impl ProxyAuth {
6767
self,
6868
host: &str,
6969
port: u16,
70-
tcp_stream: &mut Box<dyn N>,
70+
tcp_stream: &mut Box<dyn AsyncReadWrite>,
7171
) -> Result<(), ProxyError> {
7272
match self {
7373
Self::None => async_http_proxy::http_connect_tokio(tcp_stream, host, port).await?,

rumqttc/src/tls.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use std::io::{BufReader, Cursor};
1616
#[cfg(feature = "use-rustls")]
1717
use std::sync::Arc;
1818

19-
use crate::framed::N;
19+
use crate::framed::AsyncReadWrite;
2020
use crate::TlsConfiguration;
2121

2222
#[cfg(feature = "use-native-tls")]
@@ -166,9 +166,9 @@ pub async fn tls_connect(
166166
addr: &str,
167167
_port: u16,
168168
tls_config: &TlsConfiguration,
169-
tcp: Box<dyn N>,
170-
) -> Result<Box<dyn N>, Error> {
171-
let tls: Box<dyn N> = match tls_config {
169+
tcp: Box<dyn AsyncReadWrite>,
170+
) -> Result<Box<dyn AsyncReadWrite>, Error> {
171+
let tls: Box<dyn AsyncReadWrite> = match tls_config {
172172
#[cfg(feature = "use-rustls")]
173173
TlsConfiguration::Simple { .. } | TlsConfiguration::Rustls(_) => {
174174
let connector = rustls_connector(tls_config).await?;

rumqttc/src/v5/eventloop.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use super::framed::Network;
22
use super::mqttbytes::v5::*;
33
use super::{Incoming, MqttOptions, MqttState, Outgoing, Request, StateError, Transport};
44
use crate::eventloop::socket_connect;
5-
use crate::framed::N;
5+
use crate::framed::AsyncReadWrite;
66

77
use flume::{bounded, Receiver, Sender};
88
use tokio::select;
@@ -304,7 +304,7 @@ async fn network_connect(options: &MqttOptions) -> Result<Network, ConnectionErr
304304
_ => options.broker_address(),
305305
};
306306

307-
let tcp_stream: Box<dyn N> = {
307+
let tcp_stream: Box<dyn AsyncReadWrite> = {
308308
#[cfg(feature = "proxy")]
309309
match options.proxy() {
310310
Some(proxy) => {

rumqttc/src/v5/framed.rs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
use bytes::BytesMut;
2-
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
2+
use tokio::io::{AsyncReadExt, AsyncWriteExt};
3+
4+
use crate::framed::AsyncReadWrite;
35

46
use super::mqttbytes;
57
use super::mqttbytes::v5::{Connect, Login, Packet};
@@ -11,7 +13,7 @@ use std::io;
1113
/// appropriate to achieve performance
1214
pub struct Network {
1315
/// Socket for IO
14-
socket: Box<dyn N>,
16+
socket: Box<dyn AsyncReadWrite>,
1517
/// Buffered reads
1618
read: BytesMut,
1719
/// Maximum packet size
@@ -21,8 +23,8 @@ pub struct Network {
2123
}
2224

2325
impl Network {
24-
pub fn new(socket: impl N + 'static, max_incoming_size: Option<usize>) -> Network {
25-
let socket = Box::new(socket) as Box<dyn N>;
26+
pub fn new(socket: impl AsyncReadWrite + 'static, max_incoming_size: Option<usize>) -> Network {
27+
let socket = Box::new(socket) as Box<dyn AsyncReadWrite>;
2628
Network {
2729
socket,
2830
read: BytesMut::with_capacity(10 * 1024),
@@ -127,6 +129,3 @@ impl Network {
127129
Ok(())
128130
}
129131
}
130-
131-
pub trait N: AsyncRead + AsyncWrite + Send + Unpin {}
132-
impl<T> N for T where T: AsyncRead + AsyncWrite + Send + Unpin {}

0 commit comments

Comments
 (0)