diff --git a/vm/lua/tenshi-runtime/inc/actor_sched.h b/vm/lua/tenshi-runtime/inc/actor_sched.h index 64af7278..696110c6 100644 --- a/vm/lua/tenshi-runtime/inc/actor_sched.h +++ b/vm/lua/tenshi-runtime/inc/actor_sched.h @@ -27,7 +27,6 @@ typedef struct _TenshiActorState* TenshiActorState; #define RIDX_ALLACTORS "tenshi.allActors" #define RIDX_RUNQUEUELO "tenshi.runQueueLo" #define RIDX_RUNQUEUEHI "tenshi.runQueueHi" -#define RIDX_TIMEOUTQUEUE "tenshi.timeoutQueue" // To be called in protected mode. extern int ActorSchedulerInit(lua_State *L); diff --git a/vm/lua/tenshi-runtime/inc/runtime_entry.h b/vm/lua/tenshi-runtime/inc/runtime_entry.h index 55ebdd06..c3d5d72e 100644 --- a/vm/lua/tenshi-runtime/inc/runtime_entry.h +++ b/vm/lua/tenshi-runtime/inc/runtime_entry.h @@ -18,6 +18,7 @@ #ifndef INC_RUNTIME_ENTRY_H_ #define INC_RUNTIME_ENTRY_H_ +#include #include #include "lua.h" // NOLINT(build/include) #include "lauxlib.h" // NOLINT(build/include) @@ -76,4 +77,8 @@ extern void TenshiRegisterCFunctions(TenshiRuntimeState s, const luaL_Reg *l); // LUA_OK on success. extern int TenshiFlagSensor(TenshiRuntimeState s, const void *const dev); +// Get the current tick time of the runtime (incremetns when TenshiRunQuanta +// is called). +extern uint32_t TenshiGetTickTime(TenshiRuntimeState s); + #endif // INC_RUNTIME_ENTRY_H_ diff --git a/vm/lua/tenshi-runtime/inc/runtime_internal.h b/vm/lua/tenshi-runtime/inc/runtime_internal.h index 9524a77f..d6034636 100644 --- a/vm/lua/tenshi-runtime/inc/runtime_internal.h +++ b/vm/lua/tenshi-runtime/inc/runtime_internal.h @@ -18,6 +18,10 @@ #ifndef INC_RUNTIME_INTERNAL_H_ #define INC_RUNTIME_INTERNAL_H_ +#include + +#include "inc/priority_queue.h" + struct _TenshiRuntimeState { // Lua state object -- refers to "main" thread (that never executes) lua_State *L; @@ -27,6 +31,10 @@ struct _TenshiRuntimeState { // Thread called after "main" part of loop that translates data sent into // mailboxes into actuator update functions TenshiActorState actuator_actor; + // Time in "ticks" -- incremented every time TenshiRunQuanta is called + uint32_t time_ticks; + // A priority queue for threads blocking on some number of ticks to elapse + priority_queue_t timeout_ticks_queue; }; struct _TenshiActorState { @@ -38,6 +46,8 @@ struct _TenshiActorState { int isblocked; // Set to true if this actor woke because of a timeout. int woke_timeout; + // Used for scheduler linked lists (run queue) + TenshiActorState next; }; #endif // INC_RUNTIME_INTERNAL_H_ diff --git a/vm/lua/tenshi-runtime/src/actor_sched.c b/vm/lua/tenshi-runtime/src/actor_sched.c index bbf0ab4d..e86e4df9 100644 --- a/vm/lua/tenshi-runtime/src/actor_sched.c +++ b/vm/lua/tenshi-runtime/src/actor_sched.c @@ -18,6 +18,12 @@ #include "inc/actor_sched.h" #include "inc/runtime_internal.h" +// Head node of a linked list for scheduler run queue +typedef struct RunQueueStruct { + TenshiActorState head; + TenshiActorState tail; +} RunQueueStruct; + int ActorSchedulerInit(lua_State *L) { // This scheduler is almost the most naive possible scheduler consisting of // two priority level with a FIFO of tasks ready to run. Scheduler state is @@ -38,30 +44,15 @@ int ActorSchedulerInit(lua_State *L) { // The runnable queue is a singly-linked list with a last element pointer. // Here we initialize both head and tail in both queues to be nil. lua_pushstring(L, RIDX_RUNQUEUELO); - lua_newtable(L); - lua_pushstring(L, "head"); - lua_pushnil(L); - lua_settable(L, -3); - lua_pushstring(L, "tail"); - lua_pushnil(L); - lua_settable(L, -3); + RunQueueStruct *runqueue = lua_newuserdata(L, sizeof(RunQueueStruct)); + runqueue->head = NULL; + runqueue->tail = NULL; lua_settable(L, LUA_REGISTRYINDEX); lua_pushstring(L, RIDX_RUNQUEUEHI); - lua_newtable(L); - lua_pushstring(L, "head"); - lua_pushnil(L); - lua_settable(L, -3); - lua_pushstring(L, "tail"); - lua_pushnil(L); - lua_settable(L, -3); - lua_settable(L, LUA_REGISTRYINDEX); - - // This is a simple unsorted array of things waiting on timeouts. - // TODO(rqou): This is not algorithmically efficient -- eventually - // should replace with a priority queue. - lua_pushstring(L, RIDX_TIMEOUTQUEUE); - lua_newtable(L); + runqueue = lua_newuserdata(L, sizeof(RunQueueStruct)); + runqueue->head = NULL; + runqueue->tail = NULL; lua_settable(L, LUA_REGISTRYINDEX); return 0; @@ -160,9 +151,6 @@ static int _ActorDestroyAll(lua_State *L) { lua_pushstring(L, RIDX_RUNQUEUEHI); lua_pushnil(L); lua_settable(L, LUA_REGISTRYINDEX); - lua_pushstring(L, RIDX_TIMEOUTQUEUE); - lua_pushnil(L); - lua_settable(L, LUA_REGISTRYINDEX); return 0; } @@ -177,51 +165,33 @@ void ActorDestroyAll(TenshiRuntimeState s) { } // Called in protected mode +// _ActorSetRunnable(lohi, TenshiActorState) static int _ActorSetRunnable(lua_State *L) { - // Create our list object - lua_newtable(L); - lua_pushstring(L, "next"); - lua_pushnil(L); - lua_settable(L, -3); - lua_pushstring(L, "thread"); - lua_pushthread(L); - lua_settable(L, -3); - - // Add ourselves to the back of the runnable list - lua_pushvalue(L, -2); + // Get the RunQueueStruct + lua_pushvalue(L, 1); lua_gettable(L, LUA_REGISTRYINDEX); + RunQueueStruct *runqueue = (RunQueueStruct *)(lua_touserdata(L, -1)); + lua_pop(L, 1); + // stack is lohi, TenshiActorState - lua_pushstring(L, "tail"); - lua_gettable(L, -2); - // stack is lohi, newobj, runqueue, tailelem + // Get the TenshiActorState + TenshiActorState a = lua_touserdata(L, 2); - if (lua_isnil(L, -1)) { + if (runqueue->tail == NULL) { // If there is no runnables list, make it point to us - lua_pop(L, 1); - // stack is lohi, newobj, runqueue - lua_pushstring(L, "head"); - lua_pushvalue(L, -3); - lua_settable(L, -3); - lua_pushstring(L, "tail"); - lua_pushvalue(L, -3); - lua_settable(L, -3); - // stack is lohi, newobj, runqueue - lua_pop(L, 3); + runqueue->head = runqueue->tail = a; + a->next = NULL; + + // stack is lohi, TenshiActorState + lua_pop(L, 2); return 0; } else { - // stack is lohi, newobj, runqueue, tailelem - lua_pushstring(L, "next"); - lua_pushvalue(L, -4); - // stack is lohi, newobj, runqueue, tailelem, "next", newobj - lua_settable(L, -3); - lua_pop(L, 1); - // stack is lohi, newobj, runqueue - // Need to update tail poninter now - lua_pushstring(L, "tail"); - lua_pushvalue(L, -3); - lua_settable(L, -3); - // stack is lohi, newobj, runqueue - lua_pop(L, 3); + runqueue->tail->next = a; + a->next = NULL; + runqueue->tail = a; + + // stack is lohi, TenshiActorState + lua_pop(L, 2); return 0; } } @@ -233,59 +203,49 @@ int ActorSetRunnable(TenshiActorState a, int highPriority) { } else { lua_pushstring(a->L, RIDX_RUNQUEUELO); } - return lua_pcall(a->L, 1, 0, 0); + lua_pushlightuserdata(a->L, a); + return lua_pcall(a->L, 2, 0, 0); } // Called in protected mode +// TenshiActorState = _ActorDequeueHead() static int _ActorDequeueHead(lua_State *L) { + RunQueueStruct *runqueue; + lua_pushstring(L, RIDX_RUNQUEUEHI); lua_gettable(L, LUA_REGISTRYINDEX); - lua_pushstring(L, "head"); - lua_gettable(L, -2); - // stack is runqueuehi, headelem + runqueue = lua_touserdata(L, -1); + lua_pop(L, 1); + // stack is empty - if (lua_isnil(L, -1)) { + if (runqueue->head == NULL) { // No high priority, how about low? - lua_pop(L, 2); lua_pushstring(L, RIDX_RUNQUEUELO); lua_gettable(L, LUA_REGISTRYINDEX); - lua_pushstring(L, "head"); - lua_gettable(L, -2); - // stack is runqueuelo, headelem - if (lua_isnil(L, -1)) { + runqueue = lua_touserdata(L, -1); + lua_pop(L, 1); + // stack is empty + if (runqueue->head == NULL) { // No low priority either, so nothing at all. - lua_pop(L, 2); lua_pushnil(L); return 1; } } - // stack is runqueue(hi/lo), headelem - lua_pushstring(L, "thread"); - lua_gettable(L, -2); - lua_pushstring(L, "next"); - lua_gettable(L, -3); - // stack is runqueue, headelem, thread, nextelem - if (lua_isnil(L, -1)) { + // runqueue is the hi/lo runqueue, and it definitely has a head + TenshiActorState ret = runqueue->head; + + if (runqueue->head->next == NULL) { // This was the only thread - lua_pushstring(L, "head"); - lua_pushnil(L); - lua_settable(L, -6); - lua_pushstring(L, "tail"); - lua_pushnil(L); - lua_settable(L, -6); - // stack is runqueue, headelem, thread, nextelem - lua_copy(L, -2, -4); - lua_pop(L, 3); + runqueue->head = runqueue->tail = NULL; + // stack is empty + lua_pushlightuserdata(L, ret); return 1; } else { // Other threads exist - lua_pushstring(L, "head"); - lua_pushvalue(L, -2); - // stack is runqueue, headelem, thread, nextelem, "head", nextelem - lua_settable(L, -6); - lua_copy(L, -2, -4); - lua_pop(L, 3); + runqueue->head = runqueue->head->next; + // stack is empty + lua_pushlightuserdata(L, ret); return 1; } } @@ -314,15 +274,8 @@ int ActorDequeueHead(TenshiRuntimeState s, TenshiActorState *a_out) { ret = lua_pcall(s->L, 0, 1, 0); if (ret != LUA_OK) return ret; - lua_pushcfunction(s->L, ActorFindInTaskset); - lua_insert(s->L, -2); - ret = lua_pcall(s->L, 1, 1, 0); - if (ret != LUA_OK) return ret; - - if (lua_isnil(s->L, -1)) - *a_out = NULL; - else - *a_out = ActorObjectGetCState(s->L); + // Will be null if function returned nil + *a_out = lua_touserdata(s->L, -1); lua_pop(s->L, 1); return LUA_OK; @@ -343,49 +296,26 @@ int ActorSetUnblocked(TenshiActorState a) { return LUA_OK; } -// Called in protected mode -static int _ActorProcessTimeouts(lua_State *L) { - lua_pushstring(L, RIDX_TIMEOUTQUEUE); - lua_gettable(L, LUA_REGISTRYINDEX); - - // Stack is timeoutqueue - - lua_pushnil(L); - while (lua_next(L, -2) != 0) { - // stack is timeoutqueue, key (actor), value (timeout) - int timeout = lua_tointeger(L, -1); - lua_pop(L, 1); - // stack is timeoutqueue, key (actor) - if (--timeout == 0) { - // Unblock this actor - lua_pushvalue(L, -1); - lua_pushnil(L); - lua_settable(L, -4); - // stack is timeoutqueue, key (actor) - TenshiActorState a = ActorObjectGetCState(L); - if (ActorSetRunnable(a, 0) != LUA_OK) { - lua_pushstring(L, "Error setting actor to runnable"); - lua_error(L); - } - // Set its woke_timeout flag - a->woke_timeout = 1; - } else { - // Decrement the timeout - lua_pushvalue(L, -1); - lua_pushinteger(L, timeout); - lua_settable(L, -4); - // stack is timeoutqueue, key (actor) +int ActorProcessTimeouts(TenshiRuntimeState s) { + int first_timeout = priority_queue_peek_pri(s->timeout_ticks_queue); + if (first_timeout == -1) return LUA_OK; + + while (first_timeout <= s->time_ticks) { + // Unblock this actor + TenshiActorState a = + (TenshiActorState)priority_queue_get_obj(s->timeout_ticks_queue); + if (ActorSetRunnable(a, 0) != LUA_OK) { + lua_pushstring(s->L, "Error setting actor to runnable"); + lua_error(s->L); } - } + // Set its woke_timeout flag + a->woke_timeout = 1; - // Stack is timeoutqueue - lua_pop(L, 1); - return 0; -} + // Next thing? + first_timeout = priority_queue_peek_pri(s->timeout_ticks_queue); + } -int ActorProcessTimeouts(TenshiRuntimeState s) { - lua_pushcfunction(s->L, _ActorProcessTimeouts); - return lua_pcall(s->L, 0, 0, 0); + return LUA_OK; } int ActorWasWokenTimeout(TenshiActorState a) { diff --git a/vm/lua/tenshi-runtime/src/mboxlib.c b/vm/lua/tenshi-runtime/src/mboxlib.c index eb1336f7..0b45b634 100644 --- a/vm/lua/tenshi-runtime/src/mboxlib.c +++ b/vm/lua/tenshi-runtime/src/mboxlib.c @@ -170,12 +170,19 @@ int MBoxCreate(lua_State *L) { // room. Input stack is alternating mailbox and value (only mailbox will be // read). // NOT A LUA C FUNCTION. -static int MBoxSendCheckSpace(lua_State *L, int num_mboxes, int timeout) { +static int MBoxSendCheckSpace(lua_State *L, int num_mboxes, int timeout, + TenshiActorState a) { // In order to handle sending to the same mailbox multiple times, here we // total up the number of times each mailbox is referenced. We will then // check whether there is enough space to push all of that data into the // mailbox at once. + // Get the global state + lua_pushstring(L, RIDX_RUNTIMESTATE); + lua_gettable(L, LUA_REGISTRYINDEX); + TenshiRuntimeState s = (TenshiRuntimeState)lua_topointer(L, -1); + lua_pop(L, 1); + // Total up references to each mailbox // stack is ...args... lua_newtable(L); @@ -252,15 +259,10 @@ static int MBoxSendCheckSpace(lua_State *L, int num_mboxes, int timeout) { lua_pop(L, 1); // stack is ...args... - if (!enough_space && (timeout > 0)) { + if (!enough_space && a && (timeout > 0)) { // Add ourselves to the global timeout list - lua_pushstring(L, RIDX_TIMEOUTQUEUE); - lua_gettable(L, LUA_REGISTRYINDEX); - lua_pushcfunction(L, ActorGetOwnActor); - lua_call(L, 0, 1); - lua_pushinteger(L, timeout); - lua_settable(L, -3); - lua_pop(L, 1); + priority_queue_insert(s->timeout_ticks_queue, a, + TenshiGetTickTime(s) + timeout); } return enough_space; @@ -427,6 +429,7 @@ static int MBoxSendReal(lua_State *L, int status, int ctx) { // yielded, and came back. // Check if timeout happened + TenshiActorState a = NULL; lua_pushcfunction(L, ActorGetOwnActor); lua_call(L, 0, 1); // We can have no actor if the external code is sending/receiving. Assume @@ -434,7 +437,7 @@ static int MBoxSendReal(lua_State *L, int status, int ctx) { if (lua_isnil(L, -1)) { lua_pop(L, 1); } else { - TenshiActorState a = ActorObjectGetCState(L); + a = ActorObjectGetCState(L); lua_pop(L, 1); if (ActorWasWokenTimeout(a)) { // It was due to a timeout @@ -468,7 +471,7 @@ static int MBoxSendReal(lua_State *L, int status, int ctx) { // Check if all mailboxes have space // stack is ...args... - if (!MBoxSendCheckSpace(L, num_mboxes, timeout)) { + if (!MBoxSendCheckSpace(L, num_mboxes, timeout, a)) { if (timeout == 0) { // No timeout, so we return immediately lua_pop(L, lua_gettop(L)); @@ -567,7 +570,14 @@ static int MBoxRecvReal(lua_State *L, int status, int ctx) { // Called either on initial attempt to recv or when we tried, failed, // yielded, and came back. + // Get the global state + lua_pushstring(L, RIDX_RUNTIMESTATE); + lua_gettable(L, LUA_REGISTRYINDEX); + TenshiRuntimeState s = (TenshiRuntimeState)lua_topointer(L, -1); + lua_pop(L, 1); + // Check if timeout happened + TenshiActorState a = NULL; lua_pushcfunction(L, ActorGetOwnActor); lua_call(L, 0, 1); // We can have no actor if the external code is sending/receiving. Assume @@ -575,7 +585,7 @@ static int MBoxRecvReal(lua_State *L, int status, int ctx) { if (lua_isnil(L, -1)) { lua_pop(L, 1); } else { - TenshiActorState a = ActorObjectGetCState(L); + a = ActorObjectGetCState(L); lua_pop(L, 1); if (ActorWasWokenTimeout(a)) { // It was due to a timeout @@ -696,15 +706,10 @@ static int MBoxRecvReal(lua_State *L, int status, int ctx) { lua_pop(L, 2); } - if (timeout > 0) { + if (timeout > 0 && a) { // Add ourselves to the global timeout list - lua_pushstring(L, RIDX_TIMEOUTQUEUE); - lua_gettable(L, LUA_REGISTRYINDEX); - lua_pushcfunction(L, ActorGetOwnActor); - lua_call(L, 0, 1); - lua_pushinteger(L, timeout); - lua_settable(L, -3); - lua_pop(L, 1); + priority_queue_insert(s->timeout_ticks_queue, a, + TenshiGetTickTime(s) + timeout); } // stack is ...args... diff --git a/vm/lua/tenshi-runtime/src/runtime_entry.c b/vm/lua/tenshi-runtime/src/runtime_entry.c index c9e6ffa3..fbb4b0d6 100644 --- a/vm/lua/tenshi-runtime/src/runtime_entry.c +++ b/vm/lua/tenshi-runtime/src/runtime_entry.c @@ -240,6 +240,12 @@ TenshiRuntimeState TenshiRuntimeInit(void) { return NULL; } + ret->timeout_ticks_queue = priority_queue_create(realloc, 0); + if (!ret->timeout_ticks_queue) { + TenshiRuntimeDeinit(ret); + return NULL; + } + // Register checkxip function on ARM #ifdef __arm__ lua_setcheckxip(ret->L, lua_arm_checkxip); @@ -323,6 +329,8 @@ TenshiRuntimeState TenshiRuntimeInit(void) { lua_gc(ret->L, LUA_GCCOLLECT, 0); + ret->time_ticks = 0; + return ret; } @@ -335,6 +343,10 @@ void TenshiRuntimeDeinit(TenshiRuntimeState s) { lua_close(s->L); } + if (s->timeout_ticks_queue) { + priority_queue_free(s->timeout_ticks_queue, free); + } + free(s); } @@ -436,6 +448,9 @@ int TenshiRunQuanta(TenshiRuntimeState s) { lua_gc(s->L, LUA_GCCOLLECT, 0); + // Increment tick count + s->time_ticks++; + return LUA_OK; } @@ -463,3 +478,7 @@ int TenshiFlagSensor(TenshiRuntimeState s, const void *const dev) { lua_pushlightuserdata(s->L, dev); return lua_pcall(s->L, 1, 0, 0); } + +uint32_t TenshiGetTickTime(TenshiRuntimeState s) { + return s->time_ticks; +}