-
Notifications
You must be signed in to change notification settings - Fork 1
/
SlurmConnect.jl
115 lines (94 loc) · 3.66 KB
/
SlurmConnect.jl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
VERSION >= v"0.4.0-dev+6521" && __precompile__(true)
module SlurmConnect
using Compat
export launch, manage, kill, init_worker, connect
import Base: launch, manage, kill, init_worker, connect
export SlurmManager, addprocs_slurm
# Flag appended to the worker command line in `launch`. This cookie-less form
# is only a placeholder; `__init__` replaces it when the module is loaded.
worker_arg = `--worker`

# Module initializer: rebuild `worker_arg` so that it carries the cluster
# cookie reported by `Base.cluster_cookie()` at load time.
function __init__()
    global worker_arg
    cookie = Base.cluster_cookie()
    worker_arg = `--worker=$cookie`
end
# ClusterManager for Slurm.
# `np` is the number of worker tasks to start; `launch` passes it to `srun`
# as the `-n` argument and waits for that many worker output files.
immutable SlurmManager <: ClusterManager
np::Integer
end
# Split the first line of a worker's output file, expected to look like
# `<tag>:<port>#<host>`, into `[port, host]` (both as strings).
# Returns an empty vector when the line is empty, truncated or not of that
# form, so the caller can keep polling instead of failing on a bad index.
function worker_fields(line::AbstractString)
    m = match(r"^[^:]*:\s*(\d+)#\s*(\S+)", line)
    return m === nothing ? String[] : String[m.captures[1], m.captures[2]]
end

"""
    launch(manager::SlurmManager, params::Dict, instances_arr::Array, c::Condition)

Start `manager.np` Julia workers with a single `srun` invocation and register
every worker that announces itself.

`params[:dir]`, `params[:exename]` and `params[:exeflags]` select the working
directory, the Julia executable and its flags. Every other entry of `params`
that is not a standard `addprocs` parameter is forwarded to `srun`:
one-character keys become `-k [value]`, longer keys become `--key[=value]`
with underscores replaced by dashes. An empty value yields a bare flag.

`srun` writes each task's output to `job<NNNN>.out` inside `params[:dir]`.
For each task the file is polled for up to `60 + np` seconds; once its first
line contains `<tag>:<port>#<host>`, a `WorkerConfig` with that port and host
is pushed onto `instances_arr` and `c` is notified. Tasks that never produce
such a line are dropped with a warning.

Any exception is rethrown after printing a short notice.
"""
function launch(manager::SlurmManager, params::Dict, instances_arr::Array, c::Condition)
    try
        exehome = params[:dir]
        exename = params[:exename]
        exeflags = params[:exeflags]

        # Everything that is not a standard addprocs parameter is an srun option.
        stdkeys = keys(Base.Distributed.default_addprocs_params())
        p = filter((x,y) -> !(x in stdkeys), params)

        srunargs = String[]
        for (k, val) in p
            if length(string(k)) == 1
                # Short option: "-k" optionally followed by its value.
                push!(srunargs, "-$k")
                if length(val) > 0
                    push!(srunargs, "$(val)")
                end
            else
                # Long option: "--some-key" or "--some-key=value".
                k2 = replace(string(k), "_", "-")
                if length(val) > 0
                    push!(srunargs, "--$(k2)=$(val)")
                else
                    push!(srunargs, "--$(k2)")
                end
            end
        end
        println("p is: $p")
        println("srun args: $srunargs")
        println("params: $params")
        println("\n\n")

        # Clean up output files left by a previous run. `readdir` returns bare
        # file names, so they must be joined with `exehome` before removal;
        # otherwise `rm` would act on the current working directory. The
        # pattern is anchored so that only srun's `job<NNNN>.out` files match.
        for t in readdir(exehome)
            if ismatch(r"^job\d+\.out$", t)
                rm(joinpath(exehome, t))
            end
        end

        np = manager.np
        jobname = "julia-$(getpid())"
        srun_cmd = `srun -J $jobname -n $np -o "job%4t.out" -D $exehome $(srunargs) $exename $exeflags $worker_arg`
        println(jobname)
        println(srun_cmd)
        out, srun_proc = open(srun_cmd)
        println("out: $out")
        println("srun_proc: $srun_proc")

        timeout = 60 + np
        for i = 0:np - 1
            print("connecting to worker $(i + 1) out of $np\r")
            local w = String[]
            fn = joinpath(exehome, "job$(lpad(i, 4, "0")).out")
            t0 = time()
            while true
                if time() > t0 + timeout
                    warn("dropping worker: no worker address in $fn after $timeout seconds")
                    break
                end
                sleep(0.01)
                if isfile(fn) && filesize(fn) > 0
                    w = open(fn) do f
                        worker_fields(readline(f))
                    end
                    # An empty result means the first line is not (yet) a
                    # complete announcement; keep polling until the timeout.
                    isempty(w) || break
                end
            end
            print("w is: $w")
            if !isempty(w)
                config = WorkerConfig()
                config.port = parse(Int, w[1])
                config.host = w[2]
                # Keep a reference to the proc, so it's properly closed once
                # the last worker exits.
                config.userdata = srun_proc
                push!(instances_arr, config)
                notify(c)
            end
        end
    catch
        println("Error launching Slurm job:")
        rethrow()
    end
end
"""
    manage(manager::SlurmManager, id::Integer, config::WorkerConfig, op::Symbol)

Cluster-manager callback for worker `id`. The method has to exist, but no
action is taken for any `op`; it always returns `nothing`.
"""
function manage(manager::SlurmManager, id::Integer, config::WorkerConfig, op::Symbol)
    # Deliberately a no-op: nothing is done for any event so far.
    return nothing
end
"""
    addprocs_slurm(np::Integer; kwargs...)

Add `np` workers through a `SlurmManager`. All keyword arguments are passed
on unchanged to `addprocs`, and its result is returned.
"""
function addprocs_slurm(np::Integer; kwargs...)
    slurm = SlurmManager(np)
    return addprocs(slurm; kwargs...)
end
end