forked from tokio-rs/tokio
-
Notifications
You must be signed in to change notification settings - Fork 0
/
named-pipe-ready.rs
161 lines (127 loc) · 4.23 KB
/
named-pipe-ready.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
use std::io;
#[cfg(windows)]
async fn windows_main() -> io::Result<()> {
use tokio::io::Interest;
use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
const PIPE_NAME: &str = r"\\.\pipe\named-pipe-single-client";
let server = ServerOptions::new().create(PIPE_NAME)?;
let server = tokio::spawn(async move {
// Note: we wait for a client to connect.
server.connect().await?;
let buf = {
let mut read_buf = [0u8; 5];
let mut read_buf_cursor = 0;
loop {
server.readable().await?;
let buf = &mut read_buf[read_buf_cursor..];
match server.try_read(buf) {
Ok(n) => {
read_buf_cursor += n;
if read_buf_cursor == read_buf.len() {
break;
}
}
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
continue;
}
Err(e) => {
return Err(e);
}
}
}
read_buf
};
{
let write_buf = b"pong\n";
let mut write_buf_cursor = 0;
loop {
let buf = &write_buf[write_buf_cursor..];
if buf.is_empty() {
break;
}
server.writable().await?;
match server.try_write(buf) {
Ok(n) => {
write_buf_cursor += n;
}
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
continue;
}
Err(e) => {
return Err(e);
}
}
}
}
Ok::<_, io::Error>(buf)
});
let client = tokio::spawn(async move {
// There's no need to use a connect loop here, since we know that the
// server is already up - `open` was called before spawning any of the
// tasks.
let client = ClientOptions::new().open(PIPE_NAME)?;
let mut read_buf = [0u8; 5];
let mut read_buf_cursor = 0;
let write_buf = b"ping\n";
let mut write_buf_cursor = 0;
loop {
let mut interest = Interest::READABLE;
if write_buf_cursor < write_buf.len() {
interest |= Interest::WRITABLE;
}
let ready = client.ready(interest).await?;
if ready.is_readable() {
let buf = &mut read_buf[read_buf_cursor..];
match client.try_read(buf) {
Ok(n) => {
read_buf_cursor += n;
if read_buf_cursor == read_buf.len() {
break;
}
}
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
continue;
}
Err(e) => {
return Err(e);
}
}
}
if ready.is_writable() {
let buf = &write_buf[write_buf_cursor..];
if buf.is_empty() {
continue;
}
match client.try_write(buf) {
Ok(n) => {
write_buf_cursor += n;
}
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
continue;
}
Err(e) => {
return Err(e);
}
}
}
}
let buf = String::from_utf8_lossy(&read_buf).into_owned();
Ok::<_, io::Error>(buf)
});
let (server, client) = tokio::try_join!(server, client)?;
assert_eq!(server?, *b"ping\n");
assert_eq!(client?, "pong\n");
Ok(())
}
#[tokio::main]
async fn main() -> io::Result<()> {
#[cfg(windows)]
{
windows_main().await?;
}
#[cfg(not(windows))]
{
println!("Named pipes are only supported on Windows!");
}
Ok(())
}