Skip to content

Commit

Permalink
Task manager enhancements (#1594)
Browse files Browse the repository at this point in the history
Co-authored-by: Sultan Uramaev <[email protected]>
  • Loading branch information
olefirenque and Xottab-DUTY committed Apr 7, 2024
1 parent dd89487 commit 3e8fd72
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 19 deletions.
24 changes: 15 additions & 9 deletions src/xrCore/Threading/TaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ class TaskWorker : public TaskQueue, public TaskWorkerStats
std::atomic_bool sleeps{};
Event event;
size_t id;
CRandom random{ s32(std::intptr_t(this)) };
} static thread_local s_tl_worker;

static TaskWorker* s_main_thread_worker = nullptr;
Expand Down Expand Up @@ -258,7 +259,7 @@ void TaskManager::SetThreadStatus(bool active)
activeWorkersCount.fetch_sub(1, std::memory_order_relaxed);
}

void TaskManager::WakeUpIfNeeded()
void TaskManager::WakeUpIfNeeded() const
{
const auto overall = workersCount.load(std::memory_order_relaxed);
const auto active = activeWorkersCount.load(std::memory_order_relaxed);
Expand Down Expand Up @@ -318,7 +319,7 @@ void TaskManager::TaskWorkerStart()
}
steal:
{
task = TryToSteal(&s_tl_worker);
task = TryToSteal();
if (task)
goto execute;
}
Expand Down Expand Up @@ -364,7 +365,7 @@ void TaskManager::TaskWorkerStart()
s_tl_worker.event.Wait(); // prevent crash when other thread tries to steal
}

Task* TaskManager::TryToSteal(TaskWorker* thief)
Task* TaskManager::TryToSteal() const
{
const auto count = workersCount.load(std::memory_order_relaxed);
if (count == 1)
Expand All @@ -374,13 +375,18 @@ Task* TaskManager::TryToSteal(TaskWorker* thief)
return nullptr; // thread itself
}

TaskWorker* other = workers[random.randI(count)];
if (other != thief)
int steal_attempts = 3;
while (steal_attempts > 0)
{
TaskWorker* other = workers[s_tl_worker.random.randI(count)];
if (other == &s_tl_worker)
continue;
auto* task = other->steal();
if (!other->empty() && other->sleeps.load(std::memory_order_relaxed))
other->event.Set(); // Wake up, you have work to do!
return task;
if (task)
return task;
--steal_attempts;
}
return nullptr;
}
Expand Down Expand Up @@ -417,7 +423,7 @@ void TaskManager::IncrementTaskJobsCounter(Task& parent)
VERIFY2(prev != std::numeric_limits<decltype(prev)>::max(), "Max jobs overflow. (too much children)");
}

void TaskManager::PushTask(Task& task)
void TaskManager::PushTask(Task& task) const
{
s_tl_worker.push(&task);
WakeUpIfNeeded();
Expand Down Expand Up @@ -455,7 +461,7 @@ bool TaskManager::ExecuteOneTask()

Task* task = s_tl_worker.pop();
if (!task)
task = TryToSteal(&s_tl_worker);
task = TryToSteal();

if (task)
{
Expand Down Expand Up @@ -539,7 +545,7 @@ void TaskManager::GetStats(size_t& allocated, size_t& allocatedWithFallback, siz
finished += s_main_thread_worker->finishedTasks;

ScopeLock scope(&workersLock);
for (TaskWorker* worker : workers)
for (const TaskWorker* worker : workers)
{
allocated += worker->allocatedTasks;
pushed += worker->pushedTasks;
Expand Down
18 changes: 8 additions & 10 deletions src/xrCore/Threading/TaskManager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,10 @@ class XRCORE_API TaskManager final

std::atomic_bool shouldStop{};

CRandom random; // non-atomic intentionally, possible data-races can make it even more random

private:
ICN void TaskWorkerStart();

[[nodiscard]] Task* TryToSteal(TaskWorker* thief);
[[nodiscard]] Task* TryToSteal() const;

static void ExecuteTask(Task& task);
static void FinalizeTask(Task& task);
Expand All @@ -46,7 +44,7 @@ class XRCORE_API TaskManager final

private:
void SetThreadStatus(bool active);
void WakeUpIfNeeded();
void WakeUpIfNeeded() const;

public:
TaskManager();
Expand All @@ -55,18 +53,18 @@ class XRCORE_API TaskManager final
public:
// TaskFunc is at the end for fancy in-place lambdas
// Create a task, but don't run it yet
[[nodiscard]] Task& CreateTask(pcstr name, const Task::TaskFunc& taskFunc, size_t dataSize = 0, void* data = nullptr);
[[nodiscard]] Task& CreateTask(pcstr name, const Task::OnFinishFunc& onFinishCallback, const Task::TaskFunc& taskFunc, size_t dataSize = 0, void* data = nullptr);
[[nodiscard]] static Task& CreateTask(pcstr name, const Task::TaskFunc& taskFunc, size_t dataSize = 0, void* data = nullptr);
[[nodiscard]] static Task& CreateTask(pcstr name, const Task::OnFinishFunc& onFinishCallback, const Task::TaskFunc& taskFunc, size_t dataSize = 0, void* data = nullptr);

// Create a task as child, but don't run it yet
[[nodiscard]] Task& CreateTask(Task& parent, pcstr name, const Task::TaskFunc& taskFunc, size_t dataSize = 0, void* data = nullptr);
[[nodiscard]] Task& CreateTask(Task& parent, pcstr name, const Task::OnFinishFunc& onFinishCallback, const Task::TaskFunc& taskFunc, size_t dataSize = 0, void* data = nullptr);
[[nodiscard]] static Task& CreateTask(Task& parent, pcstr name, const Task::TaskFunc& taskFunc, size_t dataSize = 0, void* data = nullptr);
[[nodiscard]] static Task& CreateTask(Task& parent, pcstr name, const Task::OnFinishFunc& onFinishCallback, const Task::TaskFunc& taskFunc, size_t dataSize = 0, void* data = nullptr);

// Run task in parallel
void PushTask(Task& task);
void PushTask(Task& task) const;

// Run task immediately in this thread
void RunTask(Task& task);
static void RunTask(Task& task);

// Shortcut: create a task and run it immediately
Task& AddTask(pcstr name, const Task::TaskFunc& taskFunc, size_t dataSize = 0, void* data = nullptr);
Expand Down

0 comments on commit 3e8fd72

Please sign in to comment.