Skip to content
This repository has been archived by the owner on Jul 1, 2018. It is now read-only.

Commit

Permalink
Rewrite timeouts to use a priority queue
Browse files Browse the repository at this point in the history
  • Loading branch information
ArcaneNibble committed Nov 15, 2014
1 parent 64911e3 commit 98771b5
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 71 deletions.
1 change: 0 additions & 1 deletion vm/lua/tenshi-runtime/inc/actor_sched.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ typedef struct _TenshiActorState* TenshiActorState;
#define RIDX_ALLACTORS "tenshi.allActors"
#define RIDX_RUNQUEUELO "tenshi.runQueueLo"
#define RIDX_RUNQUEUEHI "tenshi.runQueueHi"
#define RIDX_TIMEOUTQUEUE "tenshi.timeoutQueue"

// To be called in protected mode.
extern int ActorSchedulerInit(lua_State *L);
Expand Down
5 changes: 5 additions & 0 deletions vm/lua/tenshi-runtime/inc/runtime_entry.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#ifndef INC_RUNTIME_ENTRY_H_
#define INC_RUNTIME_ENTRY_H_

#include <stdint.h>
#include <stdlib.h>
#include "lua.h" // NOLINT(build/include)
#include "lauxlib.h" // NOLINT(build/include)
Expand Down Expand Up @@ -76,4 +77,8 @@ extern void TenshiRegisterCFunctions(TenshiRuntimeState s, const luaL_Reg *l);
// LUA_OK on success.
extern int TenshiFlagSensor(TenshiRuntimeState s, const void *const dev);

// Get the current tick time of the runtime (incremetns when TenshiRunQuanta
// is called).
extern uint32_t TenshiGetTickTime(TenshiRuntimeState s);

#endif // INC_RUNTIME_ENTRY_H_
8 changes: 8 additions & 0 deletions vm/lua/tenshi-runtime/inc/runtime_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
#ifndef INC_RUNTIME_INTERNAL_H_
#define INC_RUNTIME_INTERNAL_H_

#include <stdint.h>

#include "inc/priority_queue.h"

struct _TenshiRuntimeState {
// Lua state object -- refers to "main" thread (that never executes)
lua_State *L;
Expand All @@ -27,6 +31,10 @@ struct _TenshiRuntimeState {
// Thread called after "main" part of loop that translates data sent into
// mailboxes into actuator update functions
TenshiActorState actuator_actor;
// Time in "ticks" -- incremented every time TenshiRunQuanta is called
uint32_t time_ticks;
// A priority queue for threads blocking on some number of ticks to elapse
priority_queue_t timeout_ticks_queue;
};

struct _TenshiActorState {
Expand Down
67 changes: 17 additions & 50 deletions vm/lua/tenshi-runtime/src/actor_sched.c
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,6 @@ int ActorSchedulerInit(lua_State *L) {
runqueue->tail = NULL;
lua_settable(L, LUA_REGISTRYINDEX);

// This is a simple unsorted array of things waiting on timeouts.
// TODO(rqou): This is not algorithmically efficient -- eventually
// should replace with a priority queue.
lua_pushstring(L, RIDX_TIMEOUTQUEUE);
lua_newtable(L);
lua_settable(L, LUA_REGISTRYINDEX);

return 0;
}

Expand Down Expand Up @@ -158,9 +151,6 @@ static int _ActorDestroyAll(lua_State *L) {
lua_pushstring(L, RIDX_RUNQUEUEHI);
lua_pushnil(L);
lua_settable(L, LUA_REGISTRYINDEX);
lua_pushstring(L, RIDX_TIMEOUTQUEUE);
lua_pushnil(L);
lua_settable(L, LUA_REGISTRYINDEX);

return 0;
}
Expand Down Expand Up @@ -306,49 +296,26 @@ int ActorSetUnblocked(TenshiActorState a) {
return LUA_OK;
}

// Called in protected mode
static int _ActorProcessTimeouts(lua_State *L) {
lua_pushstring(L, RIDX_TIMEOUTQUEUE);
lua_gettable(L, LUA_REGISTRYINDEX);

// Stack is timeoutqueue

lua_pushnil(L);
while (lua_next(L, -2) != 0) {
// stack is timeoutqueue, key (actor), value (timeout)
int timeout = lua_tointeger(L, -1);
lua_pop(L, 1);
// stack is timeoutqueue, key (actor)
if (--timeout == 0) {
// Unblock this actor
lua_pushvalue(L, -1);
lua_pushnil(L);
lua_settable(L, -4);
// stack is timeoutqueue, key (actor)
TenshiActorState a = ActorObjectGetCState(L);
if (ActorSetRunnable(a, 0) != LUA_OK) {
lua_pushstring(L, "Error setting actor to runnable");
lua_error(L);
}
// Set its woke_timeout flag
a->woke_timeout = 1;
} else {
// Decrement the timeout
lua_pushvalue(L, -1);
lua_pushinteger(L, timeout);
lua_settable(L, -4);
// stack is timeoutqueue, key (actor)
int ActorProcessTimeouts(TenshiRuntimeState s) {
int first_timeout = priority_queue_peek_pri(s->timeout_ticks_queue);
if (first_timeout == -1) return LUA_OK;

while (first_timeout <= s->time_ticks) {
// Unblock this actor
TenshiActorState a =
(TenshiActorState)priority_queue_get_obj(s->timeout_ticks_queue);
if (ActorSetRunnable(a, 0) != LUA_OK) {
lua_pushstring(s->L, "Error setting actor to runnable");
lua_error(s->L);
}
}
// Set its woke_timeout flag
a->woke_timeout = 1;

// Stack is timeoutqueue
lua_pop(L, 1);
return 0;
}
// Next thing?
first_timeout = priority_queue_peek_pri(s->timeout_ticks_queue);
}

int ActorProcessTimeouts(TenshiRuntimeState s) {
lua_pushcfunction(s->L, _ActorProcessTimeouts);
return lua_pcall(s->L, 0, 0, 0);
return LUA_OK;
}

int ActorWasWokenTimeout(TenshiActorState a) {
Expand Down
45 changes: 25 additions & 20 deletions vm/lua/tenshi-runtime/src/mboxlib.c
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,19 @@ int MBoxCreate(lua_State *L) {
// room. Input stack is alternating mailbox and value (only mailbox will be
// read).
// NOT A LUA C FUNCTION.
static int MBoxSendCheckSpace(lua_State *L, int num_mboxes, int timeout) {
static int MBoxSendCheckSpace(lua_State *L, int num_mboxes, int timeout,
TenshiActorState a) {
// In order to handle sending to the same mailbox multiple times, here we
// total up the number of times each mailbox is referenced. We will then
// check whether there is enough space to push all of that data into the
// mailbox at once.

// Get the global state
lua_pushstring(L, RIDX_RUNTIMESTATE);
lua_gettable(L, LUA_REGISTRYINDEX);
TenshiRuntimeState s = (TenshiRuntimeState)lua_topointer(L, -1);
lua_pop(L, 1);

// Total up references to each mailbox
// stack is ...args...
lua_newtable(L);
Expand Down Expand Up @@ -252,15 +259,10 @@ static int MBoxSendCheckSpace(lua_State *L, int num_mboxes, int timeout) {
lua_pop(L, 1);
// stack is ...args...

if (!enough_space && (timeout > 0)) {
if (!enough_space && a && (timeout > 0)) {
// Add ourselves to the global timeout list
lua_pushstring(L, RIDX_TIMEOUTQUEUE);
lua_gettable(L, LUA_REGISTRYINDEX);
lua_pushcfunction(L, ActorGetOwnActor);
lua_call(L, 0, 1);
lua_pushinteger(L, timeout);
lua_settable(L, -3);
lua_pop(L, 1);
priority_queue_insert(s->timeout_ticks_queue, a,
TenshiGetTickTime(s) + timeout);
}

return enough_space;
Expand Down Expand Up @@ -427,14 +429,15 @@ static int MBoxSendReal(lua_State *L, int status, int ctx) {
// yielded, and came back.

// Check if timeout happened
TenshiActorState a = NULL;
lua_pushcfunction(L, ActorGetOwnActor);
lua_call(L, 0, 1);
// We can have no actor if the external code is sending/receiving. Assume
// no timeout in that case
if (lua_isnil(L, -1)) {
lua_pop(L, 1);
} else {
TenshiActorState a = ActorObjectGetCState(L);
a = ActorObjectGetCState(L);
lua_pop(L, 1);
if (ActorWasWokenTimeout(a)) {
// It was due to a timeout
Expand Down Expand Up @@ -468,7 +471,7 @@ static int MBoxSendReal(lua_State *L, int status, int ctx) {

// Check if all mailboxes have space
// stack is ...args...
if (!MBoxSendCheckSpace(L, num_mboxes, timeout)) {
if (!MBoxSendCheckSpace(L, num_mboxes, timeout, a)) {
if (timeout == 0) {
// No timeout, so we return immediately
lua_pop(L, lua_gettop(L));
Expand Down Expand Up @@ -567,15 +570,22 @@ static int MBoxRecvReal(lua_State *L, int status, int ctx) {
// Called either on initial attempt to recv or when we tried, failed,
// yielded, and came back.

// Get the global state
lua_pushstring(L, RIDX_RUNTIMESTATE);
lua_gettable(L, LUA_REGISTRYINDEX);
TenshiRuntimeState s = (TenshiRuntimeState)lua_topointer(L, -1);
lua_pop(L, 1);

// Check if timeout happened
TenshiActorState a = NULL;
lua_pushcfunction(L, ActorGetOwnActor);
lua_call(L, 0, 1);
// We can have no actor if the external code is sending/receiving. Assume
// no timeout in that case
if (lua_isnil(L, -1)) {
lua_pop(L, 1);
} else {
TenshiActorState a = ActorObjectGetCState(L);
a = ActorObjectGetCState(L);
lua_pop(L, 1);
if (ActorWasWokenTimeout(a)) {
// It was due to a timeout
Expand Down Expand Up @@ -696,15 +706,10 @@ static int MBoxRecvReal(lua_State *L, int status, int ctx) {
lua_pop(L, 2);
}

if (timeout > 0) {
if (timeout > 0 && a) {
// Add ourselves to the global timeout list
lua_pushstring(L, RIDX_TIMEOUTQUEUE);
lua_gettable(L, LUA_REGISTRYINDEX);
lua_pushcfunction(L, ActorGetOwnActor);
lua_call(L, 0, 1);
lua_pushinteger(L, timeout);
lua_settable(L, -3);
lua_pop(L, 1);
priority_queue_insert(s->timeout_ticks_queue, a,
TenshiGetTickTime(s) + timeout);
}

// stack is ...args...
Expand Down
19 changes: 19 additions & 0 deletions vm/lua/tenshi-runtime/src/runtime_entry.c
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,12 @@ TenshiRuntimeState TenshiRuntimeInit(void) {
return NULL;
}

ret->timeout_ticks_queue = priority_queue_create(realloc, 0);
if (!ret->timeout_ticks_queue) {
TenshiRuntimeDeinit(ret);
return NULL;
}

// Register checkxip function on ARM
#ifdef __arm__
lua_setcheckxip(ret->L, lua_arm_checkxip);
Expand Down Expand Up @@ -323,6 +329,8 @@ TenshiRuntimeState TenshiRuntimeInit(void) {

lua_gc(ret->L, LUA_GCCOLLECT, 0);

ret->time_ticks = 0;

return ret;
}

Expand All @@ -335,6 +343,10 @@ void TenshiRuntimeDeinit(TenshiRuntimeState s) {
lua_close(s->L);
}

if (s->timeout_ticks_queue) {
priority_queue_free(s->timeout_ticks_queue, free);
}

free(s);
}

Expand Down Expand Up @@ -436,6 +448,9 @@ int TenshiRunQuanta(TenshiRuntimeState s) {

lua_gc(s->L, LUA_GCCOLLECT, 0);

// Increment tick count
s->time_ticks++;

return LUA_OK;
}

Expand Down Expand Up @@ -463,3 +478,7 @@ int TenshiFlagSensor(TenshiRuntimeState s, const void *const dev) {
lua_pushlightuserdata(s->L, dev);
return lua_pcall(s->L, 1, 0, 0);
}

uint32_t TenshiGetTickTime(TenshiRuntimeState s) {
return s->time_ticks;
}

0 comments on commit 98771b5

Please sign in to comment.