Skip to content

Commit c30db89

Browse files
committed
fix: re-registering interest on Windows (#274)
1 parent 90c9c3e commit c30db89

File tree

4 files changed

+54
-30
lines changed

4 files changed

+54
-30
lines changed

monoio/src/driver/legacy/iocp/mod.rs

Lines changed: 18 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,8 @@ impl Poller {
128128
token: mio::Token,
129129
interests: mio::Interest,
130130
) -> std::io::Result<()> {
131-
if state.inner.is_none() {
131+
let mut state_inner = state.inner.lock().unwrap();
132+
if state_inner.inner.is_none() {
132133
let flags = interests_to_afd_flags(interests);
133134

134135
let inner = {
@@ -143,9 +144,9 @@ impl Poller {
143144

144145
self.queue_state(inner.clone());
145146
unsafe { self.update_sockets_events_if_polling()? };
146-
state.inner = Some(inner);
147-
state.token = token;
148-
state.interest = interests;
147+
state_inner.inner = Some(inner);
148+
state_inner.token = token;
149+
state_inner.interest = interests;
149150

150151
Ok(())
151152
} else {
@@ -155,37 +156,31 @@ impl Poller {
155156

156157
pub fn reregister(
157158
&self,
158-
state: &mut SocketState,
159+
state: Pin<Arc<Mutex<SockState>>>,
159160
token: mio::Token,
160161
interests: mio::Interest,
161162
) -> std::io::Result<()> {
162-
if let Some(inner) = state.inner.as_mut() {
163-
{
164-
let event = Event {
165-
flags: interests_to_afd_flags(interests),
166-
data: token.0 as u64,
167-
};
168-
169-
inner.lock().unwrap().set_event(event);
170-
}
171-
172-
state.token = token;
173-
state.interest = interests;
163+
{
164+
let event = Event {
165+
flags: interests_to_afd_flags(interests),
166+
data: token.0 as u64,
167+
};
174168

175-
self.queue_state(inner.clone());
176-
unsafe { self.update_sockets_events_if_polling() }
177-
} else {
178-
Err(std::io::ErrorKind::NotFound.into())
169+
state.lock().unwrap().set_event(event);
179170
}
171+
172+
self.queue_state(state.clone());
173+
unsafe { self.update_sockets_events_if_polling() }
180174
}
181175

182176
pub fn deregister(&mut self, state: &mut SocketState) -> std::io::Result<()> {
183-
if let Some(inner) = state.inner.as_mut() {
177+
let mut state_inner = state.inner.lock().unwrap();
178+
if let Some(inner) = state_inner.inner.as_mut() {
184179
{
185180
let mut sock_state = inner.lock().unwrap();
186181
sock_state.mark_delete();
187182
}
188-
state.inner = None;
183+
state_inner.inner = None;
189184
Ok(())
190185
} else {
191186
Err(std::io::ErrorKind::NotFound.into())

monoio/src/driver/legacy/iocp/state.rs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,20 +25,27 @@ pub enum SockPollStatus {
2525
}
2626

2727
#[derive(Debug)]
28-
pub struct SocketState {
29-
pub socket: RawSocket,
28+
pub struct SocketStateInner {
3029
pub inner: Option<Pin<Arc<Mutex<SockState>>>>,
3130
pub token: mio::Token,
3231
pub interest: mio::Interest,
3332
}
3433

34+
#[derive(Debug)]
35+
pub struct SocketState {
36+
pub socket: RawSocket,
37+
pub inner: Arc<Mutex<SocketStateInner>>,
38+
}
39+
3540
impl SocketState {
3641
pub fn new(socket: RawSocket) -> Self {
3742
Self {
3843
socket,
39-
inner: None,
40-
token: mio::Token(0),
41-
interest: mio::Interest::READABLE,
44+
inner: Arc::new(Mutex::new(SocketStateInner {
45+
inner: None,
46+
token: mio::Token(0),
47+
interest: mio::Interest::READABLE,
48+
}))
4249
}
4350
}
4451
}

monoio/src/driver/legacy/mod.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ impl LegacyDriver {
182182
interest: mio::Interest,
183183
) -> io::Result<usize> {
184184
let inner = unsafe { &mut *this.get() };
185-
let io = ScheduledIo::default();
185+
let io = ScheduledIo::new(state.inner.clone());
186186
let token = inner.io_dispatch.insert(io);
187187

188188
match inner.poll.register(state, mio::Token(token), interest) {
@@ -303,6 +303,21 @@ impl LegacyInner {
303303
flags: 0,
304304
}),
305305
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
306+
#[cfg(windows)]
307+
{
308+
if let Some((sock_state, token, interest)) = {
309+
let socket_state_lock = ref_mut.state.lock().unwrap();
310+
socket_state_lock.inner.clone().map(|inner| (inner, socket_state_lock.token, socket_state_lock.interest))
311+
} {
312+
if let Err(e) = inner.poll.reregister(sock_state, token, interest) {
313+
return Poll::Ready(CompletionMeta {
314+
result: Err(e),
315+
flags: 0,
316+
});
317+
}
318+
}
319+
}
320+
306321
ref_mut.clear_readiness(direction.mask());
307322
ref_mut.set_waker(cx, direction);
308323
Poll::Pending

monoio/src/driver/scheduled_io.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,13 @@ pub(crate) struct ScheduledIo {
99
reader: Option<Waker>,
1010
/// Waker used for AsyncWrite.
1111
writer: Option<Waker>,
12+
13+
#[cfg(windows)]
14+
pub state: std::sync::Arc<std::sync::Mutex<super::legacy::iocp::SocketStateInner>>,
1215
}
1316

17+
18+
#[cfg(not(windows))]
1419
impl Default for ScheduledIo {
1520
#[inline]
1621
fn default() -> Self {
@@ -19,11 +24,13 @@ impl Default for ScheduledIo {
1924
}
2025

2126
impl ScheduledIo {
22-
pub(crate) const fn new() -> Self {
27+
pub(crate) const fn new(#[cfg(windows)] state: std::sync::Arc<std::sync::Mutex<super::legacy::iocp::SocketStateInner>>) -> Self {
2328
Self {
2429
readiness: Ready::EMPTY,
2530
reader: None,
2631
writer: None,
32+
#[cfg(windows)]
33+
state,
2734
}
2835
}
2936

0 commit comments

Comments
 (0)