Skip to content

Commit e391ce9

Browse files
committed
Add shared router for per-loop created loops
Per-loop created loops (via _loop_new()) now automatically use a shared router, enabling full asyncio support including timers and FD operations. Implementation (Option B - Shared Router with Loop Identity): - Add global shared router storage in C (g_shared_router) - Add set_shared_router/1 NIF to set the shared router - py_loop_new() automatically assigns shared router to new loops - Timer messages now include LoopRef for correct dispatch - Router stores LoopRef with each timer, dispatches to correct loop Test: - Add test_two_loops_with_timers verifying timer callbacks work - All 5 integration tests pass - All existing event loop tests pass
1 parent 0510bbe commit e391ce9

File tree

6 files changed

+133
-18
lines changed

6 files changed

+133
-18
lines changed

c_src/py_event_loop.c

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,10 @@ static erlang_event_loop_t *g_python_event_loop;
9292
/* Global flag for isolation mode - set by Erlang via NIF */
9393
static volatile int g_isolation_mode = 0; /* 0 = global, 1 = per_loop */
9494

95+
/* Global shared router PID - set during init, used by all loops in per_loop mode */
96+
static ErlNifPid g_shared_router;
97+
static volatile int g_shared_router_valid = 0;
98+
9599
/**
96100
* Get the py_event_loop module for the current interpreter.
97101
* MUST be called with GIL held.
@@ -2329,6 +2333,21 @@ ERL_NIF_TERM nif_set_isolation_mode(ErlNifEnv *env, int argc,
23292333
return make_error(env, "invalid_mode");
23302334
}
23312335

2336+
/**
2337+
* Set the shared router PID for per-loop created loops.
2338+
* This router will be used by all loops created via _loop_new().
2339+
*/
2340+
ERL_NIF_TERM nif_set_shared_router(ErlNifEnv *env, int argc,
2341+
const ERL_NIF_TERM argv[]) {
2342+
(void)argc;
2343+
2344+
if (!enif_get_local_pid(env, argv[0], &g_shared_router)) {
2345+
return make_error(env, "invalid_pid");
2346+
}
2347+
g_shared_router_valid = 1;
2348+
return ATOM_OK;
2349+
}
2350+
23322351
/* Python function: _poll_events(timeout_ms) -> num_events */
23332352
static PyObject *py_poll_events(PyObject *self, PyObject *args) {
23342353
(void)self;
@@ -2952,6 +2971,12 @@ static PyObject *py_loop_new(PyObject *self, PyObject *args) {
29522971
loop->event_freelist = NULL;
29532972
loop->freelist_count = 0;
29542973

2974+
/* Use shared router if available (for per-loop mode) */
2975+
if (g_shared_router_valid) {
2976+
loop->router_pid = g_shared_router;
2977+
loop->has_router = true;
2978+
}
2979+
29552980
/* Create a capsule wrapping the loop pointer */
29562981
PyObject *capsule = PyCapsule_New(loop, LOOP_CAPSULE_NAME, loop_capsule_destructor);
29572982
if (capsule == NULL) {
@@ -3260,9 +3285,13 @@ static PyObject *py_schedule_timer_for(PyObject *self, PyObject *args) {
32603285
return NULL;
32613286
}
32623287

3263-
ERL_NIF_TERM msg = enif_make_tuple4(
3288+
/* Include loop resource in message so router dispatches to correct loop */
3289+
ERL_NIF_TERM loop_term = enif_make_resource(msg_env, loop);
3290+
3291+
ERL_NIF_TERM msg = enif_make_tuple5(
32643292
msg_env,
32653293
ATOM_START_TIMER,
3294+
loop_term,
32663295
enif_make_int(msg_env, delay_ms),
32673296
enif_make_uint64(msg_env, callback_id),
32683297
enif_make_uint64(msg_env, timer_ref_id)

c_src/py_nif.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1924,6 +1924,7 @@ static ErlNifFunc nif_funcs[] = {
19241924
/* Python event loop integration */
19251925
{"set_python_event_loop", 1, nif_set_python_event_loop, 0},
19261926
{"set_isolation_mode", 1, nif_set_isolation_mode, 0},
1927+
{"set_shared_router", 1, nif_set_shared_router, 0},
19271928

19281929
/* ASGI optimizations */
19291930
{"asgi_build_scope", 1, nif_asgi_build_scope, ERL_NIF_DIRTY_JOB_IO_BOUND},

src/py_event_loop.erl

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,9 @@ init([]) ->
9898
{ok, LoopRef} ->
9999
{ok, RouterPid} = py_event_router:start_link(LoopRef),
100100
ok = py_nif:event_loop_set_router(LoopRef, RouterPid),
101+
%% Set shared router for per-loop created loops
102+
%% All loops created via _loop_new() in Python will use this router
103+
ok = py_nif:set_shared_router(RouterPid),
101104
%% Make the event loop available to Python
102105
ok = py_nif:set_python_event_loop(LoopRef),
103106
%% Set ErlangEventLoop as the default asyncio policy

src/py_event_router.erl

Lines changed: 31 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,9 @@
4242

4343
-record(state, {
4444
loop_ref :: reference(),
45-
%% Map of TimerRef -> {ErlangTimerRef, CallbackId}
46-
timers = #{} :: #{reference() => {reference(), non_neg_integer()}}
45+
%% Map of TimerRef -> {LoopRef, ErlangTimerRef, CallbackId}
46+
%% LoopRef is included to dispatch to the correct loop for per-loop timers
47+
timers = #{} :: #{non_neg_integer() => {reference(), reference(), non_neg_integer()}}
4748
}).
4849

4950
%% ============================================================================
@@ -100,12 +101,21 @@ handle_info({select, FdRes, _Ref, ready_output}, State) ->
100101
py_nif:reselect_writer_fd(FdRes),
101102
{noreply, State};
102103

103-
%% Handle timer start request from call_later NIF
104-
handle_info({start_timer, DelayMs, CallbackId, TimerRef}, State) ->
104+
%% Handle timer start request from call_later NIF (new format with LoopRef)
105+
handle_info({start_timer, LoopRef, DelayMs, CallbackId, TimerRef}, State) ->
105106
#state{timers = Timers} = State,
106107
%% Create the actual Erlang timer
107-
ErlTimerRef = erlang:send_after(DelayMs, self(), {timeout, TimerRef, CallbackId}),
108-
NewTimers = maps:put(TimerRef, {ErlTimerRef, CallbackId}, Timers),
108+
ErlTimerRef = erlang:send_after(DelayMs, self(), {timeout, TimerRef}),
109+
%% Store LoopRef so we dispatch to the correct loop
110+
NewTimers = maps:put(TimerRef, {LoopRef, ErlTimerRef, CallbackId}, Timers),
111+
{noreply, State#state{timers = NewTimers}};
112+
113+
%% Handle timer start request (legacy format without LoopRef - uses state's loop_ref)
114+
handle_info({start_timer, DelayMs, CallbackId, TimerRef}, State) ->
115+
#state{loop_ref = LoopRef, timers = Timers} = State,
116+
%% Create the actual Erlang timer
117+
ErlTimerRef = erlang:send_after(DelayMs, self(), {timeout, TimerRef}),
118+
NewTimers = maps:put(TimerRef, {LoopRef, ErlTimerRef, CallbackId}, Timers),
109119
{noreply, State#state{timers = NewTimers}};
110120

111121
%% Handle timer cancellation
@@ -114,20 +124,26 @@ handle_info({cancel_timer, TimerRef}, State) ->
114124
case maps:get(TimerRef, Timers, undefined) of
115125
undefined ->
116126
{noreply, State};
117-
{ErlTimerRef, _CallbackId} ->
127+
{_LoopRef, ErlTimerRef, _CallbackId} ->
118128
erlang:cancel_timer(ErlTimerRef),
119129
NewTimers = maps:remove(TimerRef, Timers),
120130
{noreply, State#state{timers = NewTimers}}
121131
end;
122132

123133
%% Handle timer expiration
124-
handle_info({timeout, TimerRef, CallbackId}, State) ->
125-
#state{loop_ref = LoopRef, timers = Timers} = State,
126-
%% Dispatch the timer callback
127-
py_nif:dispatch_timer(LoopRef, CallbackId),
128-
%% Remove from active timers
129-
NewTimers = maps:remove(TimerRef, Timers),
130-
{noreply, State#state{timers = NewTimers}};
134+
handle_info({timeout, TimerRef}, State) ->
135+
#state{timers = Timers} = State,
136+
case maps:get(TimerRef, Timers, undefined) of
137+
undefined ->
138+
%% Timer was cancelled
139+
{noreply, State};
140+
{LoopRef, _ErlTimerRef, CallbackId} ->
141+
%% Dispatch the timer callback to the correct loop
142+
py_nif:dispatch_timer(LoopRef, CallbackId),
143+
%% Remove from active timers
144+
NewTimers = maps:remove(TimerRef, Timers),
145+
{noreply, State#state{timers = NewTimers}}
146+
end;
131147

132148
%% Handle select stop notifications
133149
handle_info({select, _FdRes, _Ref, cancelled}, State) ->
@@ -139,7 +155,7 @@ handle_info(_Info, State) ->
139155

140156
terminate(_Reason, #state{timers = Timers}) ->
141157
%% Cancel all pending timers
142-
maps:foreach(fun(_TimerRef, {ErlTimerRef, _CallbackId}) ->
158+
maps:foreach(fun(_TimerRef, {_LoopRef, ErlTimerRef, _CallbackId}) ->
143159
erlang:cancel_timer(ErlTimerRef)
144160
end, Timers),
145161
ok.

src/py_nif.erl

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@
120120
%% Python event loop integration
121121
set_python_event_loop/1,
122122
set_isolation_mode/1,
123+
set_shared_router/1,
123124
%% ASGI optimizations
124125
asgi_build_scope/1,
125126
asgi_run/5,
@@ -771,6 +772,13 @@ set_python_event_loop(_LoopRef) ->
771772
set_isolation_mode(_Mode) ->
772773
?NIF_STUB.
773774

775+
%% @doc Set the shared router PID for per-loop created loops.
776+
%% All loops created via _loop_new() in Python will use this router
777+
%% for FD monitoring and timer operations.
778+
-spec set_shared_router(pid()) -> ok | {error, term()}.
779+
set_shared_router(_RouterPid) ->
780+
?NIF_STUB.
781+
774782
%%% ============================================================================
775783
%%% ASGI Optimizations
776784
%%% ============================================================================

test/py_multi_loop_integration_SUITE.erl

Lines changed: 60 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,17 @@
2121
test_per_loop_mode_enabled/1,
2222
test_two_loops_concurrent_sleep/1,
2323
test_two_loops_concurrent_gather/1,
24-
test_isolated_loop_tcp_echo/1
24+
test_isolated_loop_tcp_echo/1,
25+
test_two_loops_with_timers/1
2526
]).
2627

2728
all() ->
2829
[
2930
test_per_loop_mode_enabled,
3031
test_two_loops_concurrent_sleep,
3132
test_two_loops_concurrent_gather,
32-
test_isolated_loop_tcp_echo
33+
test_isolated_loop_tcp_echo,
34+
test_two_loops_with_timers
3335
].
3436

3537
init_per_suite(Config) ->
@@ -256,3 +258,59 @@ for loop in loops:
256258
assert loop.is_closed(), 'Loop not closed'
257259
">>),
258260
ok.
261+
262+
%% ============================================================================
263+
%% Test: Two loops with timer callbacks (call_later)
264+
%% ============================================================================
265+
%%
266+
%% Tests that per-loop created loops can use timers (via shared router).
267+
%% This verifies that the shared router is properly set up and working.
268+
269+
test_two_loops_with_timers(_Config) ->
270+
ok = py:exec(<<"
271+
import time
272+
from erlang_loop import ErlangEventLoop
273+
274+
# Test timer in a single loop first
275+
loop = ErlangEventLoop()
276+
timer_fired = []
277+
278+
def timer_callback():
279+
timer_fired.append(time.time())
280+
281+
# Schedule a 50ms timer
282+
handle = loop.call_later(0.05, timer_callback)
283+
284+
# Run the loop to process the timer
285+
start = time.time()
286+
deadline = start + 0.5 # 500ms timeout
287+
288+
while time.time() < deadline and not timer_fired:
289+
loop._run_once()
290+
time.sleep(0.01)
291+
292+
assert len(timer_fired) > 0, f'Timer did not fire within timeout, elapsed={time.time()-start:.3f}s'
293+
294+
loop.close()
295+
296+
# Now test two loops sequentially
297+
for loop_id in ['loop_a', 'loop_b']:
298+
loop = ErlangEventLoop()
299+
timer_result = []
300+
301+
def make_cb(lid):
302+
def cb():
303+
timer_result.append(lid)
304+
return cb
305+
306+
loop.call_later(0.03, make_cb(loop_id))
307+
308+
start = time.time()
309+
while time.time() < start + 0.3 and not timer_result:
310+
loop._run_once()
311+
time.sleep(0.01)
312+
313+
assert timer_result == [loop_id], f'Loop {loop_id} timer failed: {timer_result}'
314+
loop.close()
315+
">>),
316+
ok.

0 commit comments

Comments
 (0)