This repository has been archived by the owner on Feb 11, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathdask-cluster.jl
85 lines (61 loc) · 1.6 KB
/
dask-cluster.jl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# based on http://matthewrocklin.com/blog/work/2017/01/24/dask-custom
if !isempty(ARGS)
addprocs(parse(Int, ARGS[1]))
else
addprocs()
end
using Dispatcher
using LightGraphs
using Memento
const LOG_LEVEL = "info" # could also be "debug", "notice", "warn", etc
Memento.config(LOG_LEVEL; fmt="[{level} | {name}]: {msg}")
const logger = get_logger(current_module())
@everywhere function load(address)
sleep(rand() / 2)
return 1
end
@everywhere function load_from_sql(address)
sleep(rand() / 2)
return 1
end
@everywhere function process(data, reference)
sleep(rand() / 2)
return 1
end
@everywhere function roll(a, b, c)
sleep(rand() / 5)
return 1
end
@everywhere function compare(a, b)
sleep(rand() / 10)
return 1
end
@everywhere function reduction(seq)
sleep(rand() / 1)
return 1
end
function main()
filenames = ["mydata-$d.dat" for d in 1:100]
data = [(@op load(filename)) for filename in filenames]
reference = @op load_from_sql("sql://mytable")
processed = [(@op process(d, reference)) for d in data]
rolled = map(1:(length(processed) - 2)) do i
a = processed[i]
b = processed[i + 1]
c = processed[i + 2]
roll_result = @op roll(a, b, c)
return roll_result
end
compared = map(1:200) do i
a = rand(rolled)
b = rand(rolled)
compare_result = @op compare(a, b)
return compare_result
end
best = @op reduction(CollectNode(compared))
executor = ParallelExecutor()
(run_best,) = run!(executor, [best])
return run_best
end
main()
@time main()