Skip to content
This repository has been archived by the owner on Jul 1, 2018. It is now read-only.

New timeout queue #261

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion vm/lua/tenshi-runtime/inc/actor_sched.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ typedef struct _TenshiActorState* TenshiActorState;
#define RIDX_ALLACTORS "tenshi.allActors"
#define RIDX_RUNQUEUELO "tenshi.runQueueLo"
#define RIDX_RUNQUEUEHI "tenshi.runQueueHi"
#define RIDX_TIMEOUTQUEUE "tenshi.timeoutQueue"

// To be called in protected mode.
extern int ActorSchedulerInit(lua_State *L);
Expand Down
5 changes: 5 additions & 0 deletions vm/lua/tenshi-runtime/inc/runtime_entry.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#ifndef INC_RUNTIME_ENTRY_H_
#define INC_RUNTIME_ENTRY_H_

#include <stdint.h>
#include <stdlib.h>
#include "lua.h" // NOLINT(build/include)
#include "lauxlib.h" // NOLINT(build/include)
Expand Down Expand Up @@ -76,4 +77,8 @@ extern void TenshiRegisterCFunctions(TenshiRuntimeState s, const luaL_Reg *l);
// LUA_OK on success.
extern int TenshiFlagSensor(TenshiRuntimeState s, const void *const dev);

// Get the current tick time of the runtime (incremetns when TenshiRunQuanta
// is called).
extern uint32_t TenshiGetTickTime(TenshiRuntimeState s);

#endif // INC_RUNTIME_ENTRY_H_
10 changes: 10 additions & 0 deletions vm/lua/tenshi-runtime/inc/runtime_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
#ifndef INC_RUNTIME_INTERNAL_H_
#define INC_RUNTIME_INTERNAL_H_

#include <stdint.h>

#include "inc/priority_queue.h"

struct _TenshiRuntimeState {
// Lua state object -- refers to "main" thread (that never executes)
lua_State *L;
Expand All @@ -27,6 +31,10 @@ struct _TenshiRuntimeState {
// Thread called after "main" part of loop that translates data sent into
// mailboxes into actuator update functions
TenshiActorState actuator_actor;
// Time in "ticks" -- incremented every time TenshiRunQuanta is called
uint32_t time_ticks;
// A priority queue for threads blocking on some number of ticks to elapse
priority_queue_t timeout_ticks_queue;
};

struct _TenshiActorState {
Expand All @@ -38,6 +46,8 @@ struct _TenshiActorState {
int isblocked;
// Set to true if this actor woke because of a timeout.
int woke_timeout;
// Used for scheduler linked lists (run queue)
TenshiActorState next;
};

#endif // INC_RUNTIME_INTERNAL_H_
218 changes: 74 additions & 144 deletions vm/lua/tenshi-runtime/src/actor_sched.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@
#include "inc/actor_sched.h"
#include "inc/runtime_internal.h"

// Head node of a linked list for scheduler run queue
typedef struct RunQueueStruct {
TenshiActorState head;
TenshiActorState tail;
} RunQueueStruct;

int ActorSchedulerInit(lua_State *L) {
// This scheduler is almost the most naive possible scheduler consisting of
// two priority level with a FIFO of tasks ready to run. Scheduler state is
Expand All @@ -38,30 +44,15 @@ int ActorSchedulerInit(lua_State *L) {
// The runnable queue is a singly-linked list with a last element pointer.
// Here we initialize both head and tail in both queues to be nil.
lua_pushstring(L, RIDX_RUNQUEUELO);
lua_newtable(L);
lua_pushstring(L, "head");
lua_pushnil(L);
lua_settable(L, -3);
lua_pushstring(L, "tail");
lua_pushnil(L);
lua_settable(L, -3);
RunQueueStruct *runqueue = lua_newuserdata(L, sizeof(RunQueueStruct));
runqueue->head = NULL;
runqueue->tail = NULL;
lua_settable(L, LUA_REGISTRYINDEX);

lua_pushstring(L, RIDX_RUNQUEUEHI);
lua_newtable(L);
lua_pushstring(L, "head");
lua_pushnil(L);
lua_settable(L, -3);
lua_pushstring(L, "tail");
lua_pushnil(L);
lua_settable(L, -3);
lua_settable(L, LUA_REGISTRYINDEX);

// This is a simple unsorted array of things waiting on timeouts.
// TODO(rqou): This is not algorithmically efficient -- eventually
// should replace with a priority queue.
lua_pushstring(L, RIDX_TIMEOUTQUEUE);
lua_newtable(L);
runqueue = lua_newuserdata(L, sizeof(RunQueueStruct));
runqueue->head = NULL;
runqueue->tail = NULL;
lua_settable(L, LUA_REGISTRYINDEX);

return 0;
Expand Down Expand Up @@ -160,9 +151,6 @@ static int _ActorDestroyAll(lua_State *L) {
lua_pushstring(L, RIDX_RUNQUEUEHI);
lua_pushnil(L);
lua_settable(L, LUA_REGISTRYINDEX);
lua_pushstring(L, RIDX_TIMEOUTQUEUE);
lua_pushnil(L);
lua_settable(L, LUA_REGISTRYINDEX);

return 0;
}
Expand All @@ -177,51 +165,33 @@ void ActorDestroyAll(TenshiRuntimeState s) {
}

// Called in protected mode
// _ActorSetRunnable(lohi, TenshiActorState)
static int _ActorSetRunnable(lua_State *L) {
// Create our list object
lua_newtable(L);
lua_pushstring(L, "next");
lua_pushnil(L);
lua_settable(L, -3);
lua_pushstring(L, "thread");
lua_pushthread(L);
lua_settable(L, -3);

// Add ourselves to the back of the runnable list
lua_pushvalue(L, -2);
// Get the RunQueueStruct
lua_pushvalue(L, 1);
lua_gettable(L, LUA_REGISTRYINDEX);
RunQueueStruct *runqueue = (RunQueueStruct *)(lua_touserdata(L, -1));
lua_pop(L, 1);
// stack is lohi, TenshiActorState

lua_pushstring(L, "tail");
lua_gettable(L, -2);
// stack is lohi, newobj, runqueue, tailelem
// Get the TenshiActorState
TenshiActorState a = lua_touserdata(L, 2);

if (lua_isnil(L, -1)) {
if (runqueue->tail == NULL) {
// If there is no runnables list, make it point to us
lua_pop(L, 1);
// stack is lohi, newobj, runqueue
lua_pushstring(L, "head");
lua_pushvalue(L, -3);
lua_settable(L, -3);
lua_pushstring(L, "tail");
lua_pushvalue(L, -3);
lua_settable(L, -3);
// stack is lohi, newobj, runqueue
lua_pop(L, 3);
runqueue->head = runqueue->tail = a;
a->next = NULL;

// stack is lohi, TenshiActorState
lua_pop(L, 2);
return 0;
} else {
// stack is lohi, newobj, runqueue, tailelem
lua_pushstring(L, "next");
lua_pushvalue(L, -4);
// stack is lohi, newobj, runqueue, tailelem, "next", newobj
lua_settable(L, -3);
lua_pop(L, 1);
// stack is lohi, newobj, runqueue
// Need to update tail poninter now
lua_pushstring(L, "tail");
lua_pushvalue(L, -3);
lua_settable(L, -3);
// stack is lohi, newobj, runqueue
lua_pop(L, 3);
runqueue->tail->next = a;
a->next = NULL;
runqueue->tail = a;

// stack is lohi, TenshiActorState
lua_pop(L, 2);
return 0;
}
}
Expand All @@ -233,59 +203,49 @@ int ActorSetRunnable(TenshiActorState a, int highPriority) {
} else {
lua_pushstring(a->L, RIDX_RUNQUEUELO);
}
return lua_pcall(a->L, 1, 0, 0);
lua_pushlightuserdata(a->L, a);
return lua_pcall(a->L, 2, 0, 0);
}

// Called in protected mode
// TenshiActorState = _ActorDequeueHead()
static int _ActorDequeueHead(lua_State *L) {
RunQueueStruct *runqueue;

lua_pushstring(L, RIDX_RUNQUEUEHI);
lua_gettable(L, LUA_REGISTRYINDEX);
lua_pushstring(L, "head");
lua_gettable(L, -2);
// stack is runqueuehi, headelem
runqueue = lua_touserdata(L, -1);
lua_pop(L, 1);
// stack is empty

if (lua_isnil(L, -1)) {
if (runqueue->head == NULL) {
// No high priority, how about low?
lua_pop(L, 2);
lua_pushstring(L, RIDX_RUNQUEUELO);
lua_gettable(L, LUA_REGISTRYINDEX);
lua_pushstring(L, "head");
lua_gettable(L, -2);
// stack is runqueuelo, headelem
if (lua_isnil(L, -1)) {
runqueue = lua_touserdata(L, -1);
lua_pop(L, 1);
// stack is empty
if (runqueue->head == NULL) {
// No low priority either, so nothing at all.
lua_pop(L, 2);
lua_pushnil(L);
return 1;
}
}

// stack is runqueue(hi/lo), headelem
lua_pushstring(L, "thread");
lua_gettable(L, -2);
lua_pushstring(L, "next");
lua_gettable(L, -3);
// stack is runqueue, headelem, thread, nextelem
if (lua_isnil(L, -1)) {
// runqueue is the hi/lo runqueue, and it definitely has a head
TenshiActorState ret = runqueue->head;

if (runqueue->head->next == NULL) {
// This was the only thread
lua_pushstring(L, "head");
lua_pushnil(L);
lua_settable(L, -6);
lua_pushstring(L, "tail");
lua_pushnil(L);
lua_settable(L, -6);
// stack is runqueue, headelem, thread, nextelem
lua_copy(L, -2, -4);
lua_pop(L, 3);
runqueue->head = runqueue->tail = NULL;
// stack is empty
lua_pushlightuserdata(L, ret);
return 1;
} else {
// Other threads exist
lua_pushstring(L, "head");
lua_pushvalue(L, -2);
// stack is runqueue, headelem, thread, nextelem, "head", nextelem
lua_settable(L, -6);
lua_copy(L, -2, -4);
lua_pop(L, 3);
runqueue->head = runqueue->head->next;
// stack is empty
lua_pushlightuserdata(L, ret);
return 1;
}
}
Expand Down Expand Up @@ -314,15 +274,8 @@ int ActorDequeueHead(TenshiRuntimeState s, TenshiActorState *a_out) {
ret = lua_pcall(s->L, 0, 1, 0);
if (ret != LUA_OK) return ret;

lua_pushcfunction(s->L, ActorFindInTaskset);
lua_insert(s->L, -2);
ret = lua_pcall(s->L, 1, 1, 0);
if (ret != LUA_OK) return ret;

if (lua_isnil(s->L, -1))
*a_out = NULL;
else
*a_out = ActorObjectGetCState(s->L);
// Will be null if function returned nil
*a_out = lua_touserdata(s->L, -1);
lua_pop(s->L, 1);

return LUA_OK;
Expand All @@ -343,49 +296,26 @@ int ActorSetUnblocked(TenshiActorState a) {
return LUA_OK;
}

// Called in protected mode
static int _ActorProcessTimeouts(lua_State *L) {
lua_pushstring(L, RIDX_TIMEOUTQUEUE);
lua_gettable(L, LUA_REGISTRYINDEX);

// Stack is timeoutqueue

lua_pushnil(L);
while (lua_next(L, -2) != 0) {
// stack is timeoutqueue, key (actor), value (timeout)
int timeout = lua_tointeger(L, -1);
lua_pop(L, 1);
// stack is timeoutqueue, key (actor)
if (--timeout == 0) {
// Unblock this actor
lua_pushvalue(L, -1);
lua_pushnil(L);
lua_settable(L, -4);
// stack is timeoutqueue, key (actor)
TenshiActorState a = ActorObjectGetCState(L);
if (ActorSetRunnable(a, 0) != LUA_OK) {
lua_pushstring(L, "Error setting actor to runnable");
lua_error(L);
}
// Set its woke_timeout flag
a->woke_timeout = 1;
} else {
// Decrement the timeout
lua_pushvalue(L, -1);
lua_pushinteger(L, timeout);
lua_settable(L, -4);
// stack is timeoutqueue, key (actor)
int ActorProcessTimeouts(TenshiRuntimeState s) {
int first_timeout = priority_queue_peek_pri(s->timeout_ticks_queue);
if (first_timeout == -1) return LUA_OK;

while (first_timeout <= s->time_ticks) {
// Unblock this actor
TenshiActorState a =
(TenshiActorState)priority_queue_get_obj(s->timeout_ticks_queue);
if (ActorSetRunnable(a, 0) != LUA_OK) {
lua_pushstring(s->L, "Error setting actor to runnable");
lua_error(s->L);
}
}
// Set its woke_timeout flag
a->woke_timeout = 1;

// Stack is timeoutqueue
lua_pop(L, 1);
return 0;
}
// Next thing?
first_timeout = priority_queue_peek_pri(s->timeout_ticks_queue);
}

int ActorProcessTimeouts(TenshiRuntimeState s) {
lua_pushcfunction(s->L, _ActorProcessTimeouts);
return lua_pcall(s->L, 0, 0, 0);
return LUA_OK;
}

int ActorWasWokenTimeout(TenshiActorState a) {
Expand Down
Loading