diff --git a/vm/lua/tenshi-runtime/inc/actor_sched.h b/vm/lua/tenshi-runtime/inc/actor_sched.h index 64af7278..696110c6 100644 --- a/vm/lua/tenshi-runtime/inc/actor_sched.h +++ b/vm/lua/tenshi-runtime/inc/actor_sched.h @@ -27,7 +27,6 @@ typedef struct _TenshiActorState* TenshiActorState; #define RIDX_ALLACTORS "tenshi.allActors" #define RIDX_RUNQUEUELO "tenshi.runQueueLo" #define RIDX_RUNQUEUEHI "tenshi.runQueueHi" -#define RIDX_TIMEOUTQUEUE "tenshi.timeoutQueue" // To be called in protected mode. extern int ActorSchedulerInit(lua_State *L); diff --git a/vm/lua/tenshi-runtime/inc/runtime_entry.h b/vm/lua/tenshi-runtime/inc/runtime_entry.h index 55ebdd06..c3d5d72e 100644 --- a/vm/lua/tenshi-runtime/inc/runtime_entry.h +++ b/vm/lua/tenshi-runtime/inc/runtime_entry.h @@ -18,6 +18,7 @@ #ifndef INC_RUNTIME_ENTRY_H_ #define INC_RUNTIME_ENTRY_H_ +#include #include #include "lua.h" // NOLINT(build/include) #include "lauxlib.h" // NOLINT(build/include) @@ -76,4 +77,8 @@ extern void TenshiRegisterCFunctions(TenshiRuntimeState s, const luaL_Reg *l); // LUA_OK on success. extern int TenshiFlagSensor(TenshiRuntimeState s, const void *const dev); +// Get the current tick time of the runtime (incremetns when TenshiRunQuanta +// is called). +extern uint32_t TenshiGetTickTime(TenshiRuntimeState s); + #endif // INC_RUNTIME_ENTRY_H_ diff --git a/vm/lua/tenshi-runtime/inc/runtime_internal.h b/vm/lua/tenshi-runtime/inc/runtime_internal.h index 48d5d7b3..d6034636 100644 --- a/vm/lua/tenshi-runtime/inc/runtime_internal.h +++ b/vm/lua/tenshi-runtime/inc/runtime_internal.h @@ -18,6 +18,10 @@ #ifndef INC_RUNTIME_INTERNAL_H_ #define INC_RUNTIME_INTERNAL_H_ +#include + +#include "inc/priority_queue.h" + struct _TenshiRuntimeState { // Lua state object -- refers to "main" thread (that never executes) lua_State *L; @@ -27,6 +31,10 @@ struct _TenshiRuntimeState { // Thread called after "main" part of loop that translates data sent into // mailboxes into actuator update functions TenshiActorState actuator_actor; + // Time in "ticks" -- incremented every time TenshiRunQuanta is called + uint32_t time_ticks; + // A priority queue for threads blocking on some number of ticks to elapse + priority_queue_t timeout_ticks_queue; }; struct _TenshiActorState { diff --git a/vm/lua/tenshi-runtime/src/actor_sched.c b/vm/lua/tenshi-runtime/src/actor_sched.c index 6d3fed4b..e86e4df9 100644 --- a/vm/lua/tenshi-runtime/src/actor_sched.c +++ b/vm/lua/tenshi-runtime/src/actor_sched.c @@ -55,13 +55,6 @@ int ActorSchedulerInit(lua_State *L) { runqueue->tail = NULL; lua_settable(L, LUA_REGISTRYINDEX); - // This is a simple unsorted array of things waiting on timeouts. - // TODO(rqou): This is not algorithmically efficient -- eventually - // should replace with a priority queue. - lua_pushstring(L, RIDX_TIMEOUTQUEUE); - lua_newtable(L); - lua_settable(L, LUA_REGISTRYINDEX); - return 0; } @@ -158,9 +151,6 @@ static int _ActorDestroyAll(lua_State *L) { lua_pushstring(L, RIDX_RUNQUEUEHI); lua_pushnil(L); lua_settable(L, LUA_REGISTRYINDEX); - lua_pushstring(L, RIDX_TIMEOUTQUEUE); - lua_pushnil(L); - lua_settable(L, LUA_REGISTRYINDEX); return 0; } @@ -306,49 +296,26 @@ int ActorSetUnblocked(TenshiActorState a) { return LUA_OK; } -// Called in protected mode -static int _ActorProcessTimeouts(lua_State *L) { - lua_pushstring(L, RIDX_TIMEOUTQUEUE); - lua_gettable(L, LUA_REGISTRYINDEX); - - // Stack is timeoutqueue - - lua_pushnil(L); - while (lua_next(L, -2) != 0) { - // stack is timeoutqueue, key (actor), value (timeout) - int timeout = lua_tointeger(L, -1); - lua_pop(L, 1); - // stack is timeoutqueue, key (actor) - if (--timeout == 0) { - // Unblock this actor - lua_pushvalue(L, -1); - lua_pushnil(L); - lua_settable(L, -4); - // stack is timeoutqueue, key (actor) - TenshiActorState a = ActorObjectGetCState(L); - if (ActorSetRunnable(a, 0) != LUA_OK) { - lua_pushstring(L, "Error setting actor to runnable"); - lua_error(L); - } - // Set its woke_timeout flag - a->woke_timeout = 1; - } else { - // Decrement the timeout - lua_pushvalue(L, -1); - lua_pushinteger(L, timeout); - lua_settable(L, -4); - // stack is timeoutqueue, key (actor) +int ActorProcessTimeouts(TenshiRuntimeState s) { + int first_timeout = priority_queue_peek_pri(s->timeout_ticks_queue); + if (first_timeout == -1) return LUA_OK; + + while (first_timeout <= s->time_ticks) { + // Unblock this actor + TenshiActorState a = + (TenshiActorState)priority_queue_get_obj(s->timeout_ticks_queue); + if (ActorSetRunnable(a, 0) != LUA_OK) { + lua_pushstring(s->L, "Error setting actor to runnable"); + lua_error(s->L); } - } + // Set its woke_timeout flag + a->woke_timeout = 1; - // Stack is timeoutqueue - lua_pop(L, 1); - return 0; -} + // Next thing? + first_timeout = priority_queue_peek_pri(s->timeout_ticks_queue); + } -int ActorProcessTimeouts(TenshiRuntimeState s) { - lua_pushcfunction(s->L, _ActorProcessTimeouts); - return lua_pcall(s->L, 0, 0, 0); + return LUA_OK; } int ActorWasWokenTimeout(TenshiActorState a) { diff --git a/vm/lua/tenshi-runtime/src/mboxlib.c b/vm/lua/tenshi-runtime/src/mboxlib.c index eb1336f7..0b45b634 100644 --- a/vm/lua/tenshi-runtime/src/mboxlib.c +++ b/vm/lua/tenshi-runtime/src/mboxlib.c @@ -170,12 +170,19 @@ int MBoxCreate(lua_State *L) { // room. Input stack is alternating mailbox and value (only mailbox will be // read). // NOT A LUA C FUNCTION. -static int MBoxSendCheckSpace(lua_State *L, int num_mboxes, int timeout) { +static int MBoxSendCheckSpace(lua_State *L, int num_mboxes, int timeout, + TenshiActorState a) { // In order to handle sending to the same mailbox multiple times, here we // total up the number of times each mailbox is referenced. We will then // check whether there is enough space to push all of that data into the // mailbox at once. + // Get the global state + lua_pushstring(L, RIDX_RUNTIMESTATE); + lua_gettable(L, LUA_REGISTRYINDEX); + TenshiRuntimeState s = (TenshiRuntimeState)lua_topointer(L, -1); + lua_pop(L, 1); + // Total up references to each mailbox // stack is ...args... lua_newtable(L); @@ -252,15 +259,10 @@ static int MBoxSendCheckSpace(lua_State *L, int num_mboxes, int timeout) { lua_pop(L, 1); // stack is ...args... - if (!enough_space && (timeout > 0)) { + if (!enough_space && a && (timeout > 0)) { // Add ourselves to the global timeout list - lua_pushstring(L, RIDX_TIMEOUTQUEUE); - lua_gettable(L, LUA_REGISTRYINDEX); - lua_pushcfunction(L, ActorGetOwnActor); - lua_call(L, 0, 1); - lua_pushinteger(L, timeout); - lua_settable(L, -3); - lua_pop(L, 1); + priority_queue_insert(s->timeout_ticks_queue, a, + TenshiGetTickTime(s) + timeout); } return enough_space; @@ -427,6 +429,7 @@ static int MBoxSendReal(lua_State *L, int status, int ctx) { // yielded, and came back. // Check if timeout happened + TenshiActorState a = NULL; lua_pushcfunction(L, ActorGetOwnActor); lua_call(L, 0, 1); // We can have no actor if the external code is sending/receiving. Assume @@ -434,7 +437,7 @@ static int MBoxSendReal(lua_State *L, int status, int ctx) { if (lua_isnil(L, -1)) { lua_pop(L, 1); } else { - TenshiActorState a = ActorObjectGetCState(L); + a = ActorObjectGetCState(L); lua_pop(L, 1); if (ActorWasWokenTimeout(a)) { // It was due to a timeout @@ -468,7 +471,7 @@ static int MBoxSendReal(lua_State *L, int status, int ctx) { // Check if all mailboxes have space // stack is ...args... - if (!MBoxSendCheckSpace(L, num_mboxes, timeout)) { + if (!MBoxSendCheckSpace(L, num_mboxes, timeout, a)) { if (timeout == 0) { // No timeout, so we return immediately lua_pop(L, lua_gettop(L)); @@ -567,7 +570,14 @@ static int MBoxRecvReal(lua_State *L, int status, int ctx) { // Called either on initial attempt to recv or when we tried, failed, // yielded, and came back. + // Get the global state + lua_pushstring(L, RIDX_RUNTIMESTATE); + lua_gettable(L, LUA_REGISTRYINDEX); + TenshiRuntimeState s = (TenshiRuntimeState)lua_topointer(L, -1); + lua_pop(L, 1); + // Check if timeout happened + TenshiActorState a = NULL; lua_pushcfunction(L, ActorGetOwnActor); lua_call(L, 0, 1); // We can have no actor if the external code is sending/receiving. Assume @@ -575,7 +585,7 @@ static int MBoxRecvReal(lua_State *L, int status, int ctx) { if (lua_isnil(L, -1)) { lua_pop(L, 1); } else { - TenshiActorState a = ActorObjectGetCState(L); + a = ActorObjectGetCState(L); lua_pop(L, 1); if (ActorWasWokenTimeout(a)) { // It was due to a timeout @@ -696,15 +706,10 @@ static int MBoxRecvReal(lua_State *L, int status, int ctx) { lua_pop(L, 2); } - if (timeout > 0) { + if (timeout > 0 && a) { // Add ourselves to the global timeout list - lua_pushstring(L, RIDX_TIMEOUTQUEUE); - lua_gettable(L, LUA_REGISTRYINDEX); - lua_pushcfunction(L, ActorGetOwnActor); - lua_call(L, 0, 1); - lua_pushinteger(L, timeout); - lua_settable(L, -3); - lua_pop(L, 1); + priority_queue_insert(s->timeout_ticks_queue, a, + TenshiGetTickTime(s) + timeout); } // stack is ...args... diff --git a/vm/lua/tenshi-runtime/src/runtime_entry.c b/vm/lua/tenshi-runtime/src/runtime_entry.c index c9e6ffa3..fbb4b0d6 100644 --- a/vm/lua/tenshi-runtime/src/runtime_entry.c +++ b/vm/lua/tenshi-runtime/src/runtime_entry.c @@ -240,6 +240,12 @@ TenshiRuntimeState TenshiRuntimeInit(void) { return NULL; } + ret->timeout_ticks_queue = priority_queue_create(realloc, 0); + if (!ret->timeout_ticks_queue) { + TenshiRuntimeDeinit(ret); + return NULL; + } + // Register checkxip function on ARM #ifdef __arm__ lua_setcheckxip(ret->L, lua_arm_checkxip); @@ -323,6 +329,8 @@ TenshiRuntimeState TenshiRuntimeInit(void) { lua_gc(ret->L, LUA_GCCOLLECT, 0); + ret->time_ticks = 0; + return ret; } @@ -335,6 +343,10 @@ void TenshiRuntimeDeinit(TenshiRuntimeState s) { lua_close(s->L); } + if (s->timeout_ticks_queue) { + priority_queue_free(s->timeout_ticks_queue, free); + } + free(s); } @@ -436,6 +448,9 @@ int TenshiRunQuanta(TenshiRuntimeState s) { lua_gc(s->L, LUA_GCCOLLECT, 0); + // Increment tick count + s->time_ticks++; + return LUA_OK; } @@ -463,3 +478,7 @@ int TenshiFlagSensor(TenshiRuntimeState s, const void *const dev) { lua_pushlightuserdata(s->L, dev); return lua_pcall(s->L, 1, 0, 0); } + +uint32_t TenshiGetTickTime(TenshiRuntimeState s) { + return s->time_ticks; +}