Skip to content

Commit bc009fd

Browse files
committed
extmod/uasyncio: Add optional implementation of core uasyncio in C.
Implements Task and TaskQueue classes in C, using a pairing-heap data structure. Using this reduces RAM use of each Task, and improves overall performance of the uasyncio scheduler.
1 parent 081d067 commit bc009fd

File tree

6 files changed

+308
-2
lines changed

6 files changed

+308
-2
lines changed

extmod/moduasyncio.c

Lines changed: 294 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,294 @@
1+
/*
2+
* This file is part of the MicroPython project, http://micropython.org/
3+
*
4+
* The MIT License (MIT)
5+
*
6+
* Copyright (c) 2020 Damien P. George
7+
*
8+
* Permission is hereby granted, free of charge, to any person obtaining a copy
9+
* of this software and associated documentation files (the "Software"), to deal
10+
* in the Software without restriction, including without limitation the rights
11+
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
12+
* copies of the Software, and to permit persons to whom the Software is
13+
* furnished to do so, subject to the following conditions:
14+
*
15+
* The above copyright notice and this permission notice shall be included in
16+
* all copies or substantial portions of the Software.
17+
*
18+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
19+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
20+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
21+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
22+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
23+
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
24+
* THE SOFTWARE.
25+
*/
26+
27+
#include "py/runtime.h"
28+
#include "py/smallint.h"
29+
#include "py/pairheap.h"
30+
#include "py/mphal.h"
31+
32+
#if MICROPY_PY_UASYNCIO
33+
34+
typedef struct _mp_obj_task_t {
35+
mp_pairheap_t pairheap;
36+
mp_obj_t coro;
37+
mp_obj_t data;
38+
mp_obj_t waiting;
39+
40+
mp_obj_t ph_key;
41+
} mp_obj_task_t;
42+
43+
typedef struct _mp_obj_task_queue_t {
44+
mp_obj_base_t base;
45+
mp_obj_task_t *heap;
46+
} mp_obj_task_queue_t;
47+
48+
STATIC const mp_obj_type_t task_queue_type;
49+
STATIC const mp_obj_type_t task_type;
50+
51+
STATIC mp_obj_t task_queue_make_new(const mp_obj_type_t *type, size_t n_args, size_t n_kw, const mp_obj_t *args);
52+
53+
/******************************************************************************/
54+
// Ticks for task ordering in pairing heap
55+
56+
STATIC mp_obj_t ticks(void) {
57+
return MP_OBJ_NEW_SMALL_INT(mp_hal_ticks_ms() & (MICROPY_PY_UTIME_TICKS_PERIOD - 1));
58+
}
59+
60+
STATIC mp_int_t ticks_diff(mp_obj_t t1_in, mp_obj_t t0_in) {
61+
mp_uint_t t0 = MP_OBJ_SMALL_INT_VALUE(t0_in);
62+
mp_uint_t t1 = MP_OBJ_SMALL_INT_VALUE(t1_in);
63+
mp_int_t diff = ((t1 - t0 + MICROPY_PY_UTIME_TICKS_PERIOD / 2) & (MICROPY_PY_UTIME_TICKS_PERIOD - 1))
64+
- MICROPY_PY_UTIME_TICKS_PERIOD / 2;
65+
return diff;
66+
}
67+
68+
STATIC int task_lt(mp_pairheap_t *n1, mp_pairheap_t *n2) {
69+
mp_obj_task_t *t1 = (mp_obj_task_t *)n1;
70+
mp_obj_task_t *t2 = (mp_obj_task_t *)n2;
71+
return MP_OBJ_SMALL_INT_VALUE(ticks_diff(t1->ph_key, t2->ph_key)) < 0;
72+
}
73+
74+
/******************************************************************************/
75+
// TaskQueue class
76+
77+
STATIC mp_obj_t task_queue_make_new(const mp_obj_type_t *type, size_t n_args, size_t n_kw, const mp_obj_t *args) {
78+
(void)args;
79+
mp_arg_check_num(n_args, n_kw, 0, 0, false);
80+
mp_obj_task_queue_t *self = m_new_obj(mp_obj_task_queue_t);
81+
self->base.type = type;
82+
self->heap = (mp_obj_task_t *)mp_pairheap_new(task_lt);
83+
return MP_OBJ_FROM_PTR(self);
84+
}
85+
86+
STATIC mp_obj_t task_queue_peek(mp_obj_t self_in) {
87+
mp_obj_task_queue_t *self = MP_OBJ_TO_PTR(self_in);
88+
if (self->heap == NULL) {
89+
return mp_const_none;
90+
} else {
91+
return MP_OBJ_FROM_PTR(self->heap);
92+
}
93+
}
94+
STATIC MP_DEFINE_CONST_FUN_OBJ_1(task_queue_peek_obj, task_queue_peek);
95+
96+
STATIC mp_obj_t task_queue_push_sorted(size_t n_args, const mp_obj_t *args) {
97+
mp_obj_task_queue_t *self = MP_OBJ_TO_PTR(args[0]);
98+
mp_obj_task_t *task = MP_OBJ_TO_PTR(args[1]);
99+
task->data = mp_const_none;
100+
if (n_args == 2) {
101+
task->ph_key = ticks();
102+
} else {
103+
assert(mp_obj_is_small_int(args[2]));
104+
task->ph_key = args[2];
105+
}
106+
self->heap = (mp_obj_task_t *)mp_pairheap_push(task_lt, &self->heap->pairheap, &task->pairheap);
107+
return mp_const_none;
108+
}
109+
STATIC MP_DEFINE_CONST_FUN_OBJ_VAR_BETWEEN(task_queue_push_sorted_obj, 2, 3, task_queue_push_sorted);
110+
111+
STATIC mp_obj_t task_queue_pop_head(mp_obj_t self_in) {
112+
mp_obj_task_queue_t *self = MP_OBJ_TO_PTR(self_in);
113+
mp_obj_task_t *head = (mp_obj_task_t *)mp_pairheap_peek(task_lt, &self->heap->pairheap);
114+
if (head == NULL) {
115+
mp_raise_msg(&mp_type_IndexError, "empty heap");
116+
}
117+
self->heap = (mp_obj_task_t *)mp_pairheap_pop(task_lt, &self->heap->pairheap);
118+
return MP_OBJ_FROM_PTR(head);
119+
}
120+
STATIC MP_DEFINE_CONST_FUN_OBJ_1(task_queue_pop_head_obj, task_queue_pop_head);
121+
122+
STATIC mp_obj_t task_queue_remove(mp_obj_t self_in, mp_obj_t task_in) {
123+
mp_obj_task_queue_t *self = MP_OBJ_TO_PTR(self_in);
124+
mp_obj_task_t *task = MP_OBJ_TO_PTR(task_in);
125+
self->heap = (mp_obj_task_t *)mp_pairheap_delete(task_lt, &self->heap->pairheap, &task->pairheap);
126+
return mp_const_none;
127+
}
128+
STATIC MP_DEFINE_CONST_FUN_OBJ_2(task_queue_remove_obj, task_queue_remove);
129+
130+
STATIC const mp_rom_map_elem_t task_queue_locals_dict_table[] = {
131+
{ MP_ROM_QSTR(MP_QSTR_peek), MP_ROM_PTR(&task_queue_peek_obj) },
132+
{ MP_ROM_QSTR(MP_QSTR_push_sorted), MP_ROM_PTR(&task_queue_push_sorted_obj) },
133+
{ MP_ROM_QSTR(MP_QSTR_push_head), MP_ROM_PTR(&task_queue_push_sorted_obj) },
134+
{ MP_ROM_QSTR(MP_QSTR_pop_head), MP_ROM_PTR(&task_queue_pop_head_obj) },
135+
{ MP_ROM_QSTR(MP_QSTR_remove), MP_ROM_PTR(&task_queue_remove_obj) },
136+
};
137+
STATIC MP_DEFINE_CONST_DICT(task_queue_locals_dict, task_queue_locals_dict_table);
138+
139+
STATIC const mp_obj_type_t task_queue_type = {
140+
{ &mp_type_type },
141+
.name = MP_QSTR_TaskQueue,
142+
.make_new = task_queue_make_new,
143+
.locals_dict = (mp_obj_dict_t *)&task_queue_locals_dict,
144+
};
145+
146+
/******************************************************************************/
147+
// Task class
148+
149+
// This is the core uasyncio context with cur_task, _task_queue and CancelledError.
150+
STATIC mp_obj_t uasyncio_context = MP_OBJ_NULL;
151+
152+
STATIC mp_obj_t task_make_new(const mp_obj_type_t *type, size_t n_args, size_t n_kw, const mp_obj_t *args) {
153+
mp_arg_check_num(n_args, n_kw, 1, 2, false);
154+
mp_obj_task_t *self = m_new_obj(mp_obj_task_t);
155+
self->pairheap.base.type = type;
156+
mp_pairheap_init_node(task_lt, &self->pairheap);
157+
self->coro = args[0];
158+
self->data = mp_const_none;
159+
self->waiting = mp_const_none;
160+
self->ph_key = MP_OBJ_NEW_SMALL_INT(0);
161+
if (n_args == 2) {
162+
uasyncio_context = args[1];
163+
}
164+
return MP_OBJ_FROM_PTR(self);
165+
}
166+
167+
STATIC mp_obj_t task_cancel(mp_obj_t self_in) {
168+
mp_obj_task_t *self = MP_OBJ_TO_PTR(self_in);
169+
// Check if task is already finished.
170+
if (self->coro == mp_const_none) {
171+
return mp_const_false;
172+
}
173+
// Can't cancel self (not supported yet).
174+
mp_obj_t cur_task = mp_obj_dict_get(uasyncio_context, MP_OBJ_NEW_QSTR(MP_QSTR_cur_task));
175+
if (self_in == cur_task) {
176+
mp_raise_msg(&mp_type_RuntimeError, "cannot cancel self");
177+
}
178+
// If Task waits on another task then forward the cancel to the one it's waiting on.
179+
while (mp_obj_is_subclass_fast(MP_OBJ_FROM_PTR(mp_obj_get_type(self->data)), MP_OBJ_FROM_PTR(&task_type))) {
180+
self = MP_OBJ_TO_PTR(self->data);
181+
}
182+
183+
mp_obj_t _task_queue = mp_obj_dict_get(uasyncio_context, MP_OBJ_NEW_QSTR(MP_QSTR__task_queue));
184+
185+
// Reschedule Task as a cancelled task.
186+
mp_obj_t dest[3];
187+
mp_load_method_maybe(self->data, MP_QSTR_remove, dest);
188+
if (dest[0] != MP_OBJ_NULL) {
189+
// Not on the main running queue, remove the task from the queue it's on.
190+
dest[2] = MP_OBJ_FROM_PTR(self);
191+
mp_call_method_n_kw(1, 0, dest);
192+
// _task_queue.push_head(self)
193+
dest[0] = _task_queue;
194+
dest[1] = MP_OBJ_FROM_PTR(self);
195+
task_queue_push_sorted(2, dest);
196+
} else if (ticks_diff(self->ph_key, ticks()) > 0) {
197+
// On the main running queue but scheduled in the future, so bring it forward to now.
198+
// _task_queue.remove(self)
199+
task_queue_remove(_task_queue, MP_OBJ_FROM_PTR(self));
200+
// _task_queue.push_head(self)
201+
dest[0] = _task_queue;
202+
dest[1] = MP_OBJ_FROM_PTR(self);
203+
task_queue_push_sorted(2, dest);
204+
}
205+
206+
self->data = mp_obj_dict_get(uasyncio_context, MP_OBJ_NEW_QSTR(MP_QSTR_CancelledError));
207+
208+
return mp_const_true;
209+
}
210+
STATIC MP_DEFINE_CONST_FUN_OBJ_1(task_cancel_obj, task_cancel);
211+
212+
STATIC void task_attr(mp_obj_t self_in, qstr attr, mp_obj_t *dest) {
213+
mp_obj_task_t *self = MP_OBJ_TO_PTR(self_in);
214+
if (dest[0] == MP_OBJ_NULL) {
215+
// Load
216+
if (attr == MP_QSTR_coro) {
217+
dest[0] = self->coro;
218+
} else if (attr == MP_QSTR_data) {
219+
dest[0] = self->data;
220+
} else if (attr == MP_QSTR_waiting) {
221+
if (self->waiting != mp_const_none) {
222+
dest[0] = self->waiting;
223+
}
224+
} else if (attr == MP_QSTR_cancel) {
225+
dest[0] = MP_OBJ_FROM_PTR(&task_cancel_obj);
226+
dest[1] = self_in;
227+
} else if (attr == MP_QSTR_ph_key) {
228+
dest[0] = self->ph_key;
229+
}
230+
} else if (dest[1] != MP_OBJ_NULL) {
231+
// Store
232+
if (attr == MP_QSTR_coro) {
233+
self->coro = dest[1];
234+
dest[0] = MP_OBJ_NULL;
235+
} else if (attr == MP_QSTR_data) {
236+
self->data = dest[1];
237+
dest[0] = MP_OBJ_NULL;
238+
} else if (attr == MP_QSTR_waiting) {
239+
self->waiting = dest[1];
240+
dest[0] = MP_OBJ_NULL;
241+
}
242+
}
243+
}
244+
245+
STATIC mp_obj_t task_getiter(mp_obj_t self_in, mp_obj_iter_buf_t *iter_buf) {
246+
(void)iter_buf;
247+
mp_obj_task_t *self = MP_OBJ_TO_PTR(self_in);
248+
if (self->waiting == mp_const_none) {
249+
self->waiting = task_queue_make_new(&task_queue_type, 0, 0, NULL);
250+
}
251+
return self_in;
252+
}
253+
254+
STATIC mp_obj_t task_iternext(mp_obj_t self_in) {
255+
mp_obj_task_t *self = MP_OBJ_TO_PTR(self_in);
256+
if (self->coro == mp_const_none) {
257+
// Task finished, raise return value to caller so it can continue.
258+
nlr_raise(self->data);
259+
} else {
260+
// Put calling task on waiting queue.
261+
mp_obj_t cur_task = mp_obj_dict_get(uasyncio_context, MP_OBJ_NEW_QSTR(MP_QSTR_cur_task));
262+
mp_obj_t args[2] = { self->waiting, cur_task };
263+
task_queue_push_sorted(2, args);
264+
// Set calling task's data to this task that it waits on, to double-link it.
265+
((mp_obj_task_t *)MP_OBJ_TO_PTR(cur_task))->data = self_in;
266+
}
267+
return mp_const_none;
268+
}
269+
270+
STATIC const mp_obj_type_t task_type = {
271+
{ &mp_type_type },
272+
.name = MP_QSTR_Task,
273+
.make_new = task_make_new,
274+
.attr = task_attr,
275+
.getiter = task_getiter,
276+
.iternext = task_iternext,
277+
};
278+
279+
/******************************************************************************/
280+
// C-level uasyncio module
281+
282+
STATIC const mp_rom_map_elem_t mp_module_uasyncio_globals_table[] = {
283+
{ MP_ROM_QSTR(MP_QSTR___name__), MP_ROM_QSTR(MP_QSTR__uasyncio) },
284+
{ MP_ROM_QSTR(MP_QSTR_TaskQueue), MP_ROM_PTR(&task_queue_type) },
285+
{ MP_ROM_QSTR(MP_QSTR_Task), MP_ROM_PTR(&task_type) },
286+
};
287+
STATIC MP_DEFINE_CONST_DICT(mp_module_uasyncio_globals, mp_module_uasyncio_globals_table);
288+
289+
const mp_obj_module_t mp_module_uasyncio = {
290+
.base = { &mp_type_module },
291+
.globals = (mp_obj_dict_t *)&mp_module_uasyncio_globals,
292+
};
293+
294+
#endif // MICROPY_PY_UASYNCIO

extmod/uasyncio/core.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,11 @@
44
from time import ticks_ms as ticks, ticks_diff, ticks_add
55
import sys, select
66

7-
# Import TaskQueue and Task
8-
from .task import TaskQueue, Task
7+
# Import TaskQueue and Task, preferring built-in C code over Python code
8+
try:
9+
from _uasyncio import TaskQueue, Task
10+
except:
11+
from .task import TaskQueue, Task
912

1013

1114
################################################################################

py/builtin.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ extern const mp_obj_module_t mp_module_thread;
103103
extern const mp_obj_dict_t mp_module_builtins_globals;
104104

105105
// extmod modules
106+
extern const mp_obj_module_t mp_module_uasyncio;
106107
extern const mp_obj_module_t mp_module_uerrno;
107108
extern const mp_obj_module_t mp_module_uctypes;
108109
extern const mp_obj_module_t mp_module_uzlib;

py/mpconfig.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1291,6 +1291,10 @@ typedef double mp_float_t;
12911291

12921292
// Extended modules
12931293

1294+
#ifndef MICROPY_PY_UASYNCIO
1295+
#define MICROPY_PY_UASYNCIO (0)
1296+
#endif
1297+
12941298
#ifndef MICROPY_PY_UCTYPES
12951299
#define MICROPY_PY_UCTYPES (0)
12961300
#endif

py/objmodule.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,9 @@ STATIC const mp_rom_map_elem_t mp_builtin_module_table[] = {
170170

171171
// extmod modules
172172

173+
#if MICROPY_PY_UASYNCIO
174+
{ MP_ROM_QSTR(MP_QSTR__uasyncio), MP_ROM_PTR(&mp_module_uasyncio) },
175+
#endif
173176
#if MICROPY_PY_UERRNO
174177
{ MP_ROM_QSTR(MP_QSTR_uerrno), MP_ROM_PTR(&mp_module_uerrno) },
175178
#endif

py/py.mk

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,7 @@ PY_CORE_O_BASENAME = $(addprefix py/,\
168168
)
169169

170170
PY_EXTMOD_O_BASENAME = \
171+
extmod/moduasyncio.o \
171172
extmod/moductypes.o \
172173
extmod/modujson.o \
173174
extmod/modure.o \

0 commit comments

Comments
 (0)