Skip to content

Commit fd8cef6

Browse files
Merge pull request #34 from benoitc/feature/event-loop-optimization-v2 (authored by benoitc)
Event loop performance optimizations
2 parents 1ae7f62 + 2df726e commit fd8cef6

File tree

6 files changed

+129
-60
lines changed

6 files changed

+129
-60
lines changed

c_src/py_event_loop.c

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1087,6 +1087,9 @@ ERL_NIF_TERM nif_event_loop_new(ErlNifEnv *env, int argc,
10871087
/* Initialize fields */
10881088
memset(loop, 0, sizeof(erlang_event_loop_t));
10891089

1090+
/* Initialize pending_capacity (memset zeros it, but we need the initial value) */
1091+
loop->pending_capacity = INITIAL_PENDING_CAPACITY;
1092+
10901093
if (pthread_mutex_init(&loop->mutex, NULL) != 0) {
10911094
enif_release_resource(loop);
10921095
return make_error(env, "mutex_init_failed");
@@ -6410,6 +6413,9 @@ static PyObject *py_loop_new(PyObject *self, PyObject *args) {
64106413
/* Initialize fields */
64116414
memset(loop, 0, sizeof(erlang_event_loop_t));
64126415

6416+
/* Initialize pending_capacity (memset zeros it, but we need the initial value) */
6417+
loop->pending_capacity = INITIAL_PENDING_CAPACITY;
6418+
64136419
if (pthread_mutex_init(&loop->mutex, NULL) != 0) {
64146420
enif_release_resource(loop);
64156421
PyErr_SetString(PyExc_RuntimeError, "Failed to initialize mutex");
@@ -7446,6 +7452,9 @@ int create_default_event_loop(ErlNifEnv *env) {
74467452
/* Initialize fields */
74477453
memset(loop, 0, sizeof(erlang_event_loop_t));
74487454

7455+
/* Initialize pending_capacity (memset zeros it, but we need the initial value) */
7456+
loop->pending_capacity = INITIAL_PENDING_CAPACITY;
7457+
74497458
if (pthread_mutex_init(&loop->mutex, NULL) != 0) {
74507459
enif_release_resource(loop);
74517460
return -1;

c_src/py_event_loop.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -345,6 +345,11 @@ typedef struct erlang_event_loop {
345345
uint32_t interp_id;
346346

347347
/* ========== Async Task Queue (uvloop-inspired) ========== */
348+
/*
349+
* Future optimization: Replace serialized task queue with native MPSC
350+
* ring buffer to avoid enif_term_to_binary/enif_binary_to_term overhead.
351+
* See task_entry_t/task_ring_t design in optimization plan.
352+
*/
348353

349354
/** @brief Python ErlangEventLoop instance (direct ref, no thread-local) */
350355
PyObject *py_loop;

priv/_erlang_impl/_loop.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1339,10 +1339,19 @@ async def _run_and_send(coro, caller_pid, ref):
13391339

13401340
try:
13411341
result = await coro
1342-
erlang.send(caller_pid, (async_result, ref, (ok, result)))
1342+
try:
1343+
erlang.send(caller_pid, (async_result, ref, (ok, result)))
1344+
except erlang.ProcessError:
1345+
pass # Caller gone, nothing to do
13431346
except asyncio.CancelledError:
1344-
erlang.send(caller_pid, (async_result, ref, (error, 'cancelled')))
1347+
try:
1348+
erlang.send(caller_pid, (async_result, ref, (error, 'cancelled')))
1349+
except erlang.ProcessError:
1350+
pass # Caller gone, nothing to do
13451351
except Exception as e:
13461352
import traceback
13471353
tb = traceback.format_exc()
1348-
erlang.send(caller_pid, (async_result, ref, (error, f'{type(e).__name__}: {e}\n{tb}')))
1354+
try:
1355+
erlang.send(caller_pid, (async_result, ref, (error, f'{type(e).__name__}: {e}\n{tb}')))
1356+
except erlang.ProcessError:
1357+
pass # Caller gone, nothing to do

priv/_erlang_impl/_transport.py

Lines changed: 78 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -69,29 +69,39 @@ async def _start(self):
6969
self._loop.call_soon(self._protocol.connection_made, self)
7070
self._loop.add_reader(self._fileno, self._read_ready)
7171

72+
# Maximum reads per callback to avoid starving other events
73+
_max_reads_per_call = 16
74+
7275
def _read_ready(self):
73-
"""Called when data is available to read."""
76+
"""Called when data is available to read.
77+
78+
Drains socket until EAGAIN with a budget to avoid starvation.
79+
"""
7480
if self._conn_lost:
7581
return
76-
try:
77-
data = self._sock.recv(self.max_size)
78-
except (BlockingIOError, InterruptedError):
79-
return
80-
except Exception as exc:
81-
self._fatal_error(exc, 'Fatal read error')
82-
return
8382

84-
if data:
85-
self._protocol.data_received(data)
86-
else:
87-
# Connection closed (EOF received)
88-
self._loop.remove_reader(self._fileno)
89-
keep_open = self._protocol.eof_received()
90-
# If eof_received returns False/None, close the transport
91-
if not keep_open:
92-
self._closing = True
93-
self._conn_lost += 1
94-
self._call_connection_lost(None)
83+
for _ in range(self._max_reads_per_call):
84+
try:
85+
data = self._sock.recv(self.max_size)
86+
except (BlockingIOError, InterruptedError):
87+
# EAGAIN - no more data available
88+
return
89+
except Exception as exc:
90+
self._fatal_error(exc, 'Fatal read error')
91+
return
92+
93+
if data:
94+
self._protocol.data_received(data)
95+
else:
96+
# Connection closed (EOF received)
97+
self._loop.remove_reader(self._fileno)
98+
keep_open = self._protocol.eof_received()
99+
# If eof_received returns False/None, close the transport
100+
if not keep_open:
101+
self._closing = True
102+
self._conn_lost += 1
103+
self._call_connection_lost(None)
104+
return
95105

96106
def write(self, data):
97107
"""Write data to the transport."""
@@ -122,30 +132,38 @@ def write(self, data):
122132

123133
self._buffer.extend(data)
124134

135+
# Maximum writes per callback to avoid starving other events
136+
_max_writes_per_call = 16
137+
125138
def _write_ready_cb(self):
126-
"""Called when socket is ready for writing."""
127-
remaining = len(self._buffer) - self._buffer_offset
128-
if remaining <= 0:
129-
self._loop.remove_writer(self._fileno)
130-
if self._closing:
131-
self._call_connection_lost(None)
132-
return
139+
"""Called when socket is ready for writing.
140+
141+
Drains buffer until EAGAIN with a budget to avoid starvation.
142+
"""
143+
for _ in range(self._max_writes_per_call):
144+
remaining = len(self._buffer) - self._buffer_offset
145+
if remaining <= 0:
146+
self._loop.remove_writer(self._fileno)
147+
if self._closing:
148+
self._call_connection_lost(None)
149+
return
133150

134-
try:
135-
# Use memoryview with offset for O(1) access to remaining data
136-
data_view = memoryview(self._buffer)[self._buffer_offset:]
137-
n = self._sock.send(data_view)
138-
except (BlockingIOError, InterruptedError):
139-
return
140-
except Exception as exc:
141-
self._loop.remove_writer(self._fileno)
142-
self._fatal_error(exc, 'Fatal write error')
143-
return
151+
try:
152+
# Use memoryview with offset for O(1) access to remaining data
153+
data_view = memoryview(self._buffer)[self._buffer_offset:]
154+
n = self._sock.send(data_view)
155+
except (BlockingIOError, InterruptedError):
156+
# EAGAIN - socket buffer full
157+
return
158+
except Exception as exc:
159+
self._loop.remove_writer(self._fileno)
160+
self._fatal_error(exc, 'Fatal write error')
161+
return
144162

145-
if n:
146-
self._buffer_offset += n # O(1) offset update instead of O(n) deletion
163+
if n:
164+
self._buffer_offset += n # O(1) offset update instead of O(n) deletion
147165

148-
# Check if buffer is fully consumed
166+
# Check if buffer is fully consumed after budget exhausted
149167
if self._buffer_offset >= len(self._buffer):
150168
# Reset buffer when fully consumed
151169
self._buffer = self._buffer_factory()
@@ -258,6 +276,9 @@ class ErlangDatagramTransport(transports.DatagramTransport):
258276

259277
max_size = 256 * 1024 # 256 KB
260278

279+
# Maximum reads per callback to avoid starving other events
280+
_max_reads_per_call = 16
281+
261282
def __init__(self, loop, sock, protocol, address=None, extra=None):
262283
super().__init__(extra)
263284
self._loop = loop
@@ -282,21 +303,27 @@ async def _start(self):
282303
self._loop.add_reader(self._fileno, self._read_ready)
283304

284305
def _read_ready(self):
285-
"""Called when data is available to read."""
306+
"""Called when data is available to read.
307+
308+
Drains socket until EAGAIN with a budget to avoid starvation.
309+
"""
286310
if self._conn_lost:
287311
return
288-
try:
289-
data, addr = self._sock.recvfrom(self.max_size)
290-
except (BlockingIOError, InterruptedError):
291-
return
292-
except OSError as exc:
293-
self._protocol.error_received(exc)
294-
return
295-
except Exception as exc:
296-
self._fatal_error(exc, 'Fatal read error on datagram transport')
297-
return
298312

299-
self._protocol.datagram_received(data, addr)
313+
for _ in range(self._max_reads_per_call):
314+
try:
315+
data, addr = self._sock.recvfrom(self.max_size)
316+
except (BlockingIOError, InterruptedError):
317+
# EAGAIN - no more data available
318+
return
319+
except OSError as exc:
320+
self._protocol.error_received(exc)
321+
return
322+
except Exception as exc:
323+
self._fatal_error(exc, 'Fatal read error on datagram transport')
324+
return
325+
326+
self._protocol.datagram_received(data, addr)
300327

301328
def sendto(self, data, addr=None):
302329
"""Send data to the transport."""

src/py.erl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1089,7 +1089,8 @@ deactivate_venv() ->
10891089
%% Returns a map with venv_path and site_packages, or none if no venv is active.
10901090
-spec venv_info() -> {ok, map() | none} | {error, term()}.
10911091
venv_info() ->
1092-
Code = <<"({'active': True, 'venv_path': __import__('sys')._active_venv, 'site_packages': __import__('sys')._venv_site_packages, 'sys_path': __import__('sys').path} if hasattr(__import__('sys'), '_active_venv') else {'active': False})">>,
1092+
%% Check both attributes exist to handle partial activation/deactivation state
1093+
Code = <<"({'active': True, 'venv_path': __import__('sys')._active_venv, 'site_packages': __import__('sys')._venv_site_packages, 'sys_path': __import__('sys').path} if (hasattr(__import__('sys'), '_active_venv') and hasattr(__import__('sys'), '_venv_site_packages')) else {'active': False})">>,
10931094
eval(Code).
10941095

10951096
%% @private

src/py_event_worker.erl

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,13 @@ handle_cast(_Msg, State) -> {noreply, State}.
4646
handle_info({select, FdRes, _Ref, ready_input}, State) ->
4747
py_nif:handle_fd_event_and_reselect(FdRes, read),
4848
%% Trigger event processing after FD event dispatch
49-
self() ! task_ready,
49+
maybe_send_task_ready(),
5050
{noreply, State};
5151

5252
handle_info({select, FdRes, _Ref, ready_output}, State) ->
5353
py_nif:handle_fd_event_and_reselect(FdRes, write),
5454
%% Trigger event processing after FD event dispatch
55-
self() ! task_ready,
55+
maybe_send_task_ready(),
5656
{noreply, State};
5757

5858
handle_info({start_timer, _LoopRef, DelayMs, CallbackId, TimerRef}, State) ->
@@ -86,7 +86,7 @@ handle_info({timeout, TimerRef}, State) ->
8686
NewTimers = maps:remove(TimerRef, Timers),
8787
%% Trigger event processing after timer dispatch
8888
%% This ensures _run_once is called to handle the timer callback
89-
self() ! task_ready,
89+
maybe_send_task_ready(),
9090
{noreply, State#state{timers = NewTimers}}
9191
end;
9292

@@ -96,6 +96,8 @@ handle_info({select, _FdRes, _Ref, cancelled}, State) -> {noreply, State};
9696
%% This is sent via enif_send when a new async task is submitted.
9797
%% Uses a drain-until-empty loop to handle tasks submitted during processing.
9898
handle_info(task_ready, #state{loop_ref = LoopRef} = State) ->
99+
%% Clear the pending flag - we're processing now
100+
put(task_ready_pending, false),
99101
drain_tasks_loop(LoopRef),
100102
{noreply, State};
101103

@@ -121,7 +123,9 @@ drain_tasks_loop(LoopRef) ->
121123
ok ->
122124
%% Check if more task_ready messages arrived during processing
123125
receive
124-
task_ready -> drain_tasks_loop(LoopRef)
126+
task_ready ->
127+
put(task_ready_pending, false),
128+
drain_tasks_loop(LoopRef)
125129
after 0 ->
126130
ok
127131
end;
@@ -130,7 +134,7 @@ drain_tasks_loop(LoopRef) ->
130134
%% Send task_ready to self and return, allowing the gen_server
131135
%% to process other messages (select, timers) before continuing.
132136
%% This prevents starvation under sustained task traffic.
133-
self() ! task_ready,
137+
maybe_send_task_ready(),
134138
ok;
135139
{error, py_loop_not_set} ->
136140
ok;
@@ -141,3 +145,17 @@ drain_tasks_loop(LoopRef) ->
141145
error_logger:warning_msg("py_event_worker: task processing failed: ~p~n", [Reason]),
142146
ok
143147
end.
148+
149+
%% @doc Send task_ready message only if one isn't already pending.
150+
%% Uses process dictionary to coalesce multiple wakeup requests.
151+
maybe_send_task_ready() ->
152+
case get(task_ready_pending) of
153+
true ->
154+
%% Already pending, no need to send another
155+
ok;
156+
_ ->
157+
%% No pending message, send one and mark as pending
158+
put(task_ready_pending, true),
159+
self() ! task_ready,
160+
ok
161+
end.

0 commit comments

Comments (0)