-
Notifications
You must be signed in to change notification settings - Fork 0
/
executor.hpp
94 lines (79 loc) · 1.99 KB
/
executor.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
#include <bits/stdc++.h>
#include "taskflow.hpp"
using namespace std;
class Executor {
// friend class task;
// friend class taskflow;
public:
Executor(){};
mutex mtx, cv_mtx;
condition_variable cond_var;
// Taskflow current_taskflow;
vector<Task*> task_list;
int total_task=0;
static void execute_thread(Executor* exe){
bool flag=true;
do {
exe->mtx.lock();
if(exe->task_list.size()==0){
exe->mtx.unlock();
return;
//sleep // sleep->
}
else{
Task* new_task = exe->task_list[exe->task_list.size()-1];
exe->task_list.pop_back();
if(new_task->dependency>0){
exe->task_list.push_back(new_task);
exe->mtx.unlock();
unique_lock<mutex> lck(exe->cv_mtx);
exe->cond_var.wait(lck);
//sleep
}
else{
exe->mtx.unlock();
new_task->execute_task();
for(auto suc_task: new_task->successor){
suc_task->dependency--;
}
unique_lock<mutex> lck(exe->cv_mtx);
// cout<<"Notify them stupid\n";
exe->cond_var.notify_all();
// notify all
}
}
} while(flag!=false);
// function execute
// dependency remove
};
void start_threads(int num_of_threads){
vector<thread> all_threads;
for(int i=0;i<num_of_threads;i++){
thread th(execute_thread, this);
all_threads.push_back(move(th));
}
for(int i=0;i<num_of_threads;i++){
all_threads[i].join();
}
return;
}
void run(vector<Taskflow> taskflow_list){
// graph build
for(auto current_taskflow: taskflow_list){
current_taskflow.build();
for(Task* task: current_taskflow.topological_sort_list){
task_list.push_back(task);
total_task++;
}
}
// cout<<"Top list:";
// for(auto task: task_list){
// cout<<task->id+1<<" ";
// }
// cout<<"\n";
// cout<<"Size after build"<<task_list[2]->successor.size()<<"\n";
// algorith start -- threads
int num_of_threads = NUMBER_OF_THREADS;
this->start_threads(num_of_threads);
}
};