Skip to content

Commit ffd52e5

Browse files
Use ThreadSelectorEventLoop on Windows with ProactorEventLoop (#5)
1 parent 0aeca27 commit ffd52e5

File tree

4 files changed

+412
-34
lines changed

4 files changed

+412
-34
lines changed

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ classifiers = [
2828
requires-python = ">= 3.9"
2929
dependencies = [
3030
"anyio",
31+
"sniffio",
3132
"anyioutils >=0.4.6",
3233
"pyzmq >=26.0.0,<27.0.0",
3334
]

src/zmq_anyio/_selector_thread.py

Lines changed: 387 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,387 @@
1+
"""Ensure asyncio selector methods (add_reader, etc.) are available.
2+
Running select in a thread and defining these methods on the running event loop.
3+
Originally in tornado.platform.asyncio.
4+
Redistributed under license Apache-2.0
5+
"""
6+
7+
from __future__ import annotations
8+
9+
import asyncio
10+
import atexit
11+
import errno
12+
import functools
13+
import select
14+
import socket
15+
import sys
16+
import threading
17+
import typing
18+
from typing import (
19+
Any,
20+
Callable,
21+
Union,
22+
)
23+
from weakref import WeakKeyDictionary
24+
25+
from sniffio import current_async_library
26+
27+
if typing.TYPE_CHECKING:
28+
from typing_extensions import Protocol
29+
30+
class _HasFileno(Protocol):
31+
def fileno(self) -> int:
32+
pass
33+
34+
_FileDescriptorLike = Union[int, _HasFileno]
35+
36+
37+
# Collection of selector thread event loops to shut down on exit.
38+
_selector_loops: set[SelectorThread] = set()
39+
40+
41+
def _atexit_callback() -> None:
42+
for loop in _selector_loops:
43+
with loop._select_cond:
44+
loop._closing_selector = True
45+
loop._select_cond.notify()
46+
try:
47+
loop._waker_w.send(b"a")
48+
except BlockingIOError:
49+
pass
50+
# If we don't join our (daemon) thread here, we may get a deadlock
51+
# during interpreter shutdown. I don't really understand why. This
52+
# deadlock happens every time in CI (both travis and appveyor) but
53+
# I've never been able to reproduce locally.
54+
assert loop._thread is not None
55+
loop._thread.join()
56+
_selector_loops.clear()
57+
58+
59+
atexit.register(_atexit_callback)
60+
61+
62+
# SelectorThread from tornado 6.4.0
63+
64+
65+
class SelectorThread:
66+
"""Define ``add_reader`` methods to be called in a background select thread.
67+
68+
Instances of this class start a second thread to run a selector.
69+
This thread is completely hidden from the user;
70+
all callbacks are run on the wrapped event loop's thread.
71+
72+
Typically used via ``AddThreadSelectorEventLoop``,
73+
but can be attached to a running asyncio loop.
74+
"""
75+
76+
_closed = False
77+
78+
def __init__(self, real_loop: asyncio.AbstractEventLoop) -> None:
79+
self._real_loop = real_loop
80+
81+
self._select_cond = threading.Condition()
82+
self._select_args: (
83+
tuple[list[_FileDescriptorLike], list[_FileDescriptorLike]] | None
84+
) = None
85+
self._closing_selector = False
86+
self._thread: threading.Thread | None = None
87+
self._thread_manager_handle = self._thread_manager()
88+
89+
async def thread_manager_anext() -> None:
90+
# the anext builtin wasn't added until 3.10. We just need to iterate
91+
# this generator one step.
92+
await self._thread_manager_handle.__anext__()
93+
94+
# When the loop starts, start the thread. Not too soon because we can't
95+
# clean up if we get to this point but the event loop is closed without
96+
# starting.
97+
self._real_loop.call_soon(
98+
lambda: self._real_loop.create_task(thread_manager_anext())
99+
)
100+
101+
self._readers: dict[_FileDescriptorLike, Callable] = {}
102+
self._writers: dict[_FileDescriptorLike, Callable] = {}
103+
104+
# Writing to _waker_w will wake up the selector thread, which
105+
# watches for _waker_r to be readable.
106+
self._waker_r, self._waker_w = socket.socketpair()
107+
self._waker_r.setblocking(False)
108+
self._waker_w.setblocking(False)
109+
_selector_loops.add(self)
110+
self.add_reader(self._waker_r, self._consume_waker)
111+
112+
def close(self) -> None:
113+
if self._closed:
114+
return
115+
with self._select_cond:
116+
self._closing_selector = True
117+
self._select_cond.notify()
118+
self._wake_selector()
119+
if self._thread is not None:
120+
self._thread.join()
121+
_selector_loops.discard(self)
122+
self.remove_reader(self._waker_r)
123+
self._waker_r.close()
124+
self._waker_w.close()
125+
self._closed = True
126+
127+
async def _thread_manager(self) -> typing.AsyncGenerator[None, None]:
128+
# Create a thread to run the select system call. We manage this thread
129+
# manually so we can trigger a clean shutdown from an atexit hook. Note
130+
# that due to the order of operations at shutdown, only daemon threads
131+
# can be shut down in this way (non-daemon threads would require the
132+
# introduction of a new hook: https://bugs.python.org/issue41962)
133+
self._thread = threading.Thread(
134+
name="Tornado selector",
135+
daemon=True,
136+
target=self._run_select,
137+
)
138+
self._thread.start()
139+
self._start_select()
140+
try:
141+
# The presense of this yield statement means that this coroutine
142+
# is actually an asynchronous generator, which has a special
143+
# shutdown protocol. We wait at this yield point until the
144+
# event loop's shutdown_asyncgens method is called, at which point
145+
# we will get a GeneratorExit exception and can shut down the
146+
# selector thread.
147+
yield
148+
except GeneratorExit:
149+
self.close()
150+
raise
151+
152+
def _wake_selector(self) -> None:
153+
if self._closed:
154+
return
155+
try:
156+
self._waker_w.send(b"a")
157+
except BlockingIOError:
158+
pass
159+
160+
def _consume_waker(self) -> None:
161+
try:
162+
self._waker_r.recv(1024)
163+
except BlockingIOError:
164+
pass
165+
166+
def _start_select(self) -> None:
167+
# Capture reader and writer sets here in the event loop
168+
# thread to avoid any problems with concurrent
169+
# modification while the select loop uses them.
170+
with self._select_cond:
171+
assert self._select_args is None
172+
self._select_args = (list(self._readers.keys()), list(self._writers.keys()))
173+
self._select_cond.notify()
174+
175+
def _run_select(self) -> None:
176+
while True:
177+
with self._select_cond:
178+
while self._select_args is None and not self._closing_selector:
179+
self._select_cond.wait()
180+
if self._closing_selector:
181+
return
182+
assert self._select_args is not None
183+
to_read, to_write = self._select_args
184+
self._select_args = None
185+
186+
# We use the simpler interface of the select module instead of
187+
# the more stateful interface in the selectors module because
188+
# this class is only intended for use on windows, where
189+
# select.select is the only option. The selector interface
190+
# does not have well-documented thread-safety semantics that
191+
# we can rely on so ensuring proper synchronization would be
192+
# tricky.
193+
try:
194+
# On windows, selecting on a socket for write will not
195+
# return the socket when there is an error (but selecting
196+
# for reads works). Also select for errors when selecting
197+
# for writes, and merge the results.
198+
#
199+
# This pattern is also used in
200+
# https://github.com/python/cpython/blob/v3.8.0/Lib/selectors.py#L312-L317
201+
rs, ws, xs = select.select(to_read, to_write, to_write)
202+
ws = ws + xs
203+
except OSError as e:
204+
# After remove_reader or remove_writer is called, the file
205+
# descriptor may subsequently be closed on the event loop
206+
# thread. It's possible that this select thread hasn't
207+
# gotten into the select system call by the time that
208+
# happens in which case (at least on macOS), select may
209+
# raise a "bad file descriptor" error. If we get that
210+
# error, check and see if we're also being woken up by
211+
# polling the waker alone. If we are, just return to the
212+
# event loop and we'll get the updated set of file
213+
# descriptors on the next iteration. Otherwise, raise the
214+
# original error.
215+
if e.errno == getattr(errno, "WSAENOTSOCK", errno.EBADF):
216+
rs, _, _ = select.select([self._waker_r.fileno()], [], [], 0)
217+
if rs:
218+
ws = []
219+
else:
220+
raise
221+
else:
222+
raise
223+
224+
try:
225+
self._real_loop.call_soon_threadsafe(self._handle_select, rs, ws)
226+
except RuntimeError:
227+
# "Event loop is closed". Swallow the exception for
228+
# consistency with PollIOLoop (and logical consistency
229+
# with the fact that we can't guarantee that an
230+
# add_callback that completes without error will
231+
# eventually execute).
232+
pass
233+
except AttributeError:
234+
# ProactorEventLoop may raise this instead of RuntimeError
235+
# if call_soon_threadsafe races with a call to close().
236+
# Swallow it too for consistency.
237+
pass
238+
239+
def _handle_select(
240+
self, rs: list[_FileDescriptorLike], ws: list[_FileDescriptorLike]
241+
) -> None:
242+
for r in rs:
243+
self._handle_event(r, self._readers)
244+
for w in ws:
245+
self._handle_event(w, self._writers)
246+
self._start_select()
247+
248+
def _handle_event(
249+
self,
250+
fd: _FileDescriptorLike,
251+
cb_map: dict[_FileDescriptorLike, Callable],
252+
) -> None:
253+
try:
254+
callback = cb_map[fd]
255+
except KeyError:
256+
return
257+
callback()
258+
259+
def add_reader(
260+
self, fd: _FileDescriptorLike, callback: Callable[..., None], *args: Any
261+
) -> None:
262+
self._readers[fd] = functools.partial(callback, *args)
263+
self._wake_selector()
264+
265+
def add_writer(
266+
self, fd: _FileDescriptorLike, callback: Callable[..., None], *args: Any
267+
) -> None:
268+
self._writers[fd] = functools.partial(callback, *args)
269+
self._wake_selector()
270+
271+
def remove_reader(self, fd: _FileDescriptorLike) -> bool:
272+
try:
273+
del self._readers[fd]
274+
except KeyError:
275+
return False
276+
self._wake_selector()
277+
return True
278+
279+
def remove_writer(self, fd: _FileDescriptorLike) -> bool:
280+
try:
281+
del self._writers[fd]
282+
except KeyError:
283+
return False
284+
self._wake_selector()
285+
return True
286+
287+
288+
# AddThreadSelectorEventLoop: unmodified from tornado 6.4.0
289+
class AddThreadSelectorEventLoop(asyncio.AbstractEventLoop):
290+
"""Wrap an event loop to add implementations of the ``add_reader`` method family.
291+
292+
Instances of this class start a second thread to run a selector.
293+
This thread is completely hidden from the user; all callbacks are
294+
run on the wrapped event loop's thread.
295+
296+
This class is used automatically by Tornado; applications should not need
297+
to refer to it directly.
298+
299+
It is safe to wrap any event loop with this class, although it only makes sense
300+
for event loops that do not implement the ``add_reader`` family of methods
301+
themselves (i.e. ``WindowsProactorEventLoop``)
302+
303+
Closing the ``AddThreadSelectorEventLoop`` also closes the wrapped event loop.
304+
"""
305+
306+
# This class is a __getattribute__-based proxy. All attributes other than those
307+
# in this set are proxied through to the underlying loop.
308+
MY_ATTRIBUTES = {
309+
"_real_loop",
310+
"_selector",
311+
"add_reader",
312+
"add_writer",
313+
"close",
314+
"remove_reader",
315+
"remove_writer",
316+
}
317+
318+
def __getattribute__(self, name: str) -> Any:
319+
if name in AddThreadSelectorEventLoop.MY_ATTRIBUTES:
320+
return super().__getattribute__(name)
321+
return getattr(self._real_loop, name)
322+
323+
def __init__(self, real_loop: asyncio.AbstractEventLoop) -> None:
324+
self._real_loop = real_loop
325+
self._selector = SelectorThread(real_loop)
326+
327+
def close(self) -> None:
328+
self._selector.close()
329+
self._real_loop.close()
330+
331+
def add_reader( # type: ignore[override]
332+
self, fd: _FileDescriptorLike, callback: Callable[..., None], *args: Any
333+
) -> None:
334+
return self._selector.add_reader(fd, callback, *args)
335+
336+
def add_writer( # type: ignore[override]
337+
self, fd: _FileDescriptorLike, callback: Callable[..., None], *args: Any
338+
) -> None:
339+
return self._selector.add_writer(fd, callback, *args)
340+
341+
def remove_reader(self, fd: _FileDescriptorLike) -> bool:
342+
return self._selector.remove_reader(fd)
343+
344+
def remove_writer(self, fd: _FileDescriptorLike) -> bool:
345+
return self._selector.remove_writer(fd)
346+
347+
348+
# registry of asyncio loop : selector thread
349+
_selectors: WeakKeyDictionary = WeakKeyDictionary()
350+
351+
352+
def _set_selector_windows() -> None:
353+
"""Set selector-compatible loop.
354+
Sets ``add_reader`` family of methods on the asyncio loop.
355+
Workaround Windows proactor removal of *reader methods.
356+
"""
357+
if not (
358+
sys.platform == "win32"
359+
and current_async_library() == "asyncio"
360+
and asyncio.get_event_loop_policy().__class__.__name__
361+
== "WindowsProactorEventLoopPolicy"
362+
):
363+
return
364+
365+
asyncio_loop = asyncio.get_running_loop()
366+
if asyncio_loop in _selectors:
367+
return
368+
369+
from ._selector_thread import AddThreadSelectorEventLoop
370+
371+
selector_loop = _selectors[asyncio_loop] = AddThreadSelectorEventLoop( # type: ignore[abstract]
372+
asyncio_loop
373+
)
374+
375+
# patch loop.close to also close the selector thread
376+
loop_close = asyncio_loop.close
377+
378+
def _close_selector_and_loop() -> None:
379+
# restore original before calling selector.close,
380+
# which in turn calls eventloop.close!
381+
asyncio_loop.close = loop_close # type: ignore[method-assign]
382+
_selectors.pop(asyncio_loop, None)
383+
selector_loop.close()
384+
385+
asyncio_loop.close = _close_selector_and_loop # type: ignore[method-assign]
386+
asyncio_loop.add_reader = selector_loop.add_reader # type: ignore[assignment]
387+
asyncio_loop.remove_reader = selector_loop.remove_reader # type: ignore[method-assign]

0 commit comments

Comments
 (0)