-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathworker-manager.js
113 lines (89 loc) · 2.5 KB
/
worker-manager.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
/*******************
* Helper Functions
*******************/
/**
 * Estimates how many logical cores are available to the page.
 *
 * Uses `navigator.hardwareConcurrency` when the browser reports a truthy
 * value. Otherwise it guesses from the physical screen resolution.
 *
 * @returns {number} the reported core count, or 8 / 4 from the fallback guess
 */
function getCoreCount() {
  const reported = window.navigator.hardwareConcurrency
  if (reported) {
    return reported
  }

  // Fallback for browsers that do not report a value (the original author
  // calls out Safari): a deliberately crude guess based on physical pixels.
  const { width, height } = window.screen
  const ratio = window.devicePixelRatio
  const is_retina_mbp_15 = width * ratio === 2880 && height * ratio === 1800 // MacBook Pro 15" retina
  return is_retina_mbp_15 ? 8 : 4
}
/**
 * Builds a job runner for `args`: given a worker, it calls `worker.run(args)`,
 * logs any failure, marks the worker as idle again and reports the outcome.
 *
 * The returned promise never rejects because of the worker. A failed run is
 * logged with `console.error` and yields `result: undefined`.
 *
 * @param {*} args - value handed to `worker.run`
 * @returns {function(Object): Promise<{args: *, worker: Object, result: *}>}
 *   function taking a worker (needs `run(args)` and `is_busy(value)`)
 */
const run_f = args => worker => {
  let pending
  try {
    // Promise.resolve also adopts a plain (non-promise) return value from run().
    pending = Promise.resolve(worker.run(args))
  } catch (e) {
    // A synchronous throw from run() is treated exactly like a rejected run,
    // so the worker is still released below instead of staying busy forever.
    pending = Promise.reject(e)
  }
  return pending
    // the show must go on no matter what.
    .catch(e => console.error(e))
    .then(result => {
      worker.is_busy(false)
      return {
        args,
        worker,
        result
      }
    })
}
/***********************
* End Helper Functions
***********************/
/**
 * Creates an empty manager: no workers, no pending jobs, no processed results.
 *
 * Workers placed in `worker_pool` are expected to look like:
 *
 *   interface AbstractWorker<T> {
 *     is_busy: flyd.Stream(Boolean),
 *     destroy: () => {},
 *     run: args => Promise(T)
 *   }
 *
 * Fields:
 *   worker_pool - the registered workers
 *   queue       - pending jobs in FIFO order; each entry is a function that is
 *                 called with the worker chosen to run it
 *   processed   - outcomes of jobs that have finished
 */
function WorkerManager() {
  // TODO: support fallbacks when worker errors out
  Object.assign(this, {
    worker_pool: [],
    queue: [],
    processed: []
  })
}
/**
 * Hands the oldest queued job to the first idle worker in the pool.
 *
 * Does nothing when every worker is busy or when the queue is empty.
 * The queue is only touched once an idle worker has been found.
 */
WorkerManager.prototype.queue_pluck_one = function () {
  let idle_worker
  for (const candidate of this.worker_pool) {
    if (!candidate.is_busy()) {
      idle_worker = candidate
      break
    }
  }
  if (idle_worker === undefined) {
    return
  }

  const next_job = this.queue.shift()
  if (next_job === undefined) {
    return
  }

  // Mark the worker busy before starting so it cannot be picked twice.
  idle_worker.is_busy(true)
  next_job(idle_worker)
}
/**
 * Queues a job for `args` and immediately tries to dispatch it.
 *
 * Once a worker has run the job, the outcome record produced by `run_f`
 * (`{ args, worker, result }`) is appended to `this.processed`, the next
 * queued job is dispatched, and the returned promise resolves with that record.
 *
 * @param {*} args - value forwarded to the worker through `run_f`
 * @returns {Promise<{args: *, worker: Object, result: *}>} settles when the job
 *   has been processed; rejects if the post-processing chain itself fails
 */
WorkerManager.prototype.queue_push = function (args) {
  // Promises automatically unnest themselves
  return new Promise((resolve, reject) => {
    const linked_job = worker =>
      run_f(args)(worker)
        .then(x => {
          this.processed.push(x)
          return x
        })
        .then(x => {
          this.queue_pluck_one()
          return x
        })
        // Forward failures as well: without the rejection handler an error
        // thrown in the steps above would leave the caller's promise pending
        // forever and surface only as an unhandled rejection.
        .then(resolve, reject)
    this.queue.push(linked_job)
    this.queue_pluck_one()
  })
}
/**
 * Registers a worker with the manager.
 *
 * Subscribes to the falsy values of the worker's `is_busy` stream. On each
 * one, the oldest queued job (if any) is started on this same worker. The
 * worker is then appended to `worker_pool`.
 *
 * @param {Object} worker - object exposing an `is_busy` flyd stream
 */
WorkerManager.prototype.worker_add = function(worker) {
  // Only the "not busy" values of the stream are of interest.
  const idle_events = flyd.filter(busy => !busy, worker.is_busy)

  const take_next_job = _idle => {
    const next_job = this.queue.shift()
    if (next_job === undefined) {
      return
    }
    worker.is_busy(true)
    next_job(worker)
  }

  flyd.on(take_next_job, idle_events)
  this.worker_pool.push(worker)
}