Skip to content

Commit

Permalink
feat: Execute pending tasks on each loop iteration
Browse files Browse the repository at this point in the history
Pending tasks are resumed. If they complete, they're added to the `dead'
table where they will later be collected (this is done to avoid
modifying the `pending' table while iterating over it). The runtime does
not signal exit unless no pending and no blocked tasks exist.

This may need slight adjustments when we introduce awaiting on tasks, as
it currently wouldn't handle structured concurrency correctly if await()
was called.
  • Loading branch information
mcb2003 committed Jun 13, 2024
1 parent b8d82b2 commit 7356518
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 9 deletions.
20 changes: 12 additions & 8 deletions rt/modules/task.cpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
#include "modules/task.hpp"
#include "lua/helpers.hpp"
#include "modules/weak.hpp"

namespace lua = lege::lua;
namespace weak = lege::weak;

#define ENV_NAME "lege.task.env"
#define MT_NAME "lege.task.mt"
// this is useful for more than just metatables
#define luaL_newregistrytable(L, name) (luaL_newmetatable((L), (name)))

Expand Down Expand Up @@ -54,7 +53,7 @@ static int l_current(lua_State *L) {
}

static int l_tostring(lua_State *L) {
luaL_checkudata(L, 1, MT_NAME);
luaL_checkudata(L, 1, LEGE_TASK_MT_NAME);
lua_settop(L, 1);

lua_getfenv(L, 1);
Expand All @@ -67,7 +66,7 @@ static int l_tostring(lua_State *L) {
}

static int l_index(lua_State *L) {
luaL_checkudata(L, 1, MT_NAME);
luaL_checkudata(L, 1, LEGE_TASK_MT_NAME);
lua_settop(L, 2);

// Fields come from the environment table
Expand All @@ -87,7 +86,7 @@ static int l_index(lua_State *L) {
}

static int l_newindex(lua_State *L) {
luaL_checkudata(L, 1, MT_NAME);
luaL_checkudata(L, 1, LEGE_TASK_MT_NAME);

const char *field = lua_tostring(L, 2);
luaL_callmeta(L, 1, "__tostring");
Expand All @@ -108,7 +107,7 @@ static void make_task(lua_State *L, int nameindex, int funcindex) {
lua_pushvalue(L, funcindex);
lua_xmove(L, co, 1);

if (luaL_newmetatable(L, MT_NAME)) {
if (luaL_newmetatable(L, LEGE_TASK_MT_NAME)) {
// Set up metatable
lua_pushliteral(L, "__tostring");
lua_pushcfunction(L, l_tostring);
Expand Down Expand Up @@ -223,7 +222,7 @@ static const luaL_Reg TASK_FUNCS[]{{"get_support_tables", l_get_support_tables},
{nullptr, nullptr}};

static void make_support_tables(lua_State *L) {
if (luaL_newregistrytable(L, ENV_NAME)) {
if (luaL_newregistrytable(L, LEGE_TASK_ENV_NAME)) {
lua_pushliteral(L, "by_thread");
weak::new_kv(L);
lua_rawset(L, -3);
Expand All @@ -238,9 +237,14 @@ static void make_support_tables(lua_State *L) {
lua_newtable(L);
lua_rawset(L, -3);

// Tasks that are dead, and need to be cleaned up
lua_pushliteral(L, "dead");
weak::new_k(L);
lua_rawset(L, -3);

// Tasks spawned from the main thread
lua_pushliteral(L, "toplevels");
lua_newtable(L);
weak::new_k(L);
lua_rawset(L, -3);
}
}
Expand Down
7 changes: 7 additions & 0 deletions rt/modules/task.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#ifndef LIBLEGE_TASK_HPP
#define LIBLEGE_TASK_HPP

#define LEGE_TASK_ENV_NAME "lege.task.env"
#define LEGE_TASK_MT_NAME "lege.task.mt"

#endif
63 changes: 62 additions & 1 deletion rt/runtime.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include <cstddef>
#include <exception>
#include <memory>
#include <string_view>
Expand All @@ -6,6 +7,7 @@
#include <lua.hpp>

#include "lua/helpers.hpp"
#include "modules/task.hpp"
#include "runtime.hpp"

namespace lua = lege::lua;
Expand Down Expand Up @@ -68,6 +70,65 @@ void Runtime::setup() {
lua_call(L, 0, 0);
}

bool Runtime::runOnce() { return true; }
bool Runtime::runOnce() {
lua_settop(L, 0); // Clear stack

// Run pending tasks
luaL_getmetatable(L, LEGE_TASK_ENV_NAME);
lua_pushliteral(L, "dead");
lua_rawget(L, 1);
lua_pushliteral(L, "pending");
lua_rawget(L, 1);

// Loop over pending tasks and run them
std::size_t numAliveTasks = 0;
for (lua_pushnil(L); lua_next(L, 3); lua_settop(L, 4)) {
// 1 = env, 2 = dead, 3 = pending, 4 = task
++numAliveTasks;
lua_settop(L, 4); // Pop `true' value

lua_getfenv(L, 4);
lua_pushliteral(L, "co");
lua_rawget(L, -2);
lua_State *co = lua_tothread(L, -1);
switch (lua_resume(co, 0)) {
case LUA_OK:
// Coroutine finished, is now dead

// dead[task] = true
lua_pushvalue(L, 4);
lua_pushboolean(L, true);
lua_rawset(L, 2);
break;
case LUA_YIELD:
// Coroutine yielded, is now either blocked or still pending
break;
default:
throw lua::Error(co, "Error running coroutine");
}
}

// Remove dead tasks
for (lua_pushnil(L); lua_next(L, 2); lua_settop(L, 4)) {
// 1 = env, 2 = dead, 3 = pending, 4 = task

// pending[task] = nil
lua_pushvalue(L, 4);
lua_pushnil(L);
lua_rawset(L, 3);
// dead is automatically cleaned up because it's a weak-keyed table
}

lua_settop(L, 1);
lua_pushliteral(L, "blocked");
lua_rawget(L, 1);
// Count the blocked tasks
// Todo: cache this, update the cache when tasks are blocked / scheduled
for (lua_pushnil(L); lua_next(L, 2); lua_settop(L, 3)) {
++numAliveTasks;
}

return numAliveTasks > 0;
}

} // namespace lege

0 comments on commit 7356518

Please sign in to comment.