-
Notifications
You must be signed in to change notification settings - Fork 2
/
run_speculative_backfilling.py
106 lines (85 loc) · 3.79 KB
/
run_speculative_backfilling.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
import sys
sys.path.append("./ScheduleFlow_v1.1")
import ScheduleFlow
import SpeculativeBackfill
import Workload
import SpeculativeSubmission
import numpy as np
import sys
num_jobs_large = 70
num_jobs_small = 30
def req_time_large(walltimes):
return [i+np.random.randint(3600, 7200) for i in walltimes]
def req_time_small(walltimes):
return [i+np.random.randint(1800, 5400) for i in walltimes]
def req_procs(walltimes):
distr = Workload.BetaDistr(2, 2)
sequence = distr.random_sample(len(walltimes))
return [max(1, int(i*10)) for i in sequence]
def generate_workload(sequence, procs, wd_large, wd_small,
distr_large, distr_small):
if sequence == "TOptimal":
sw = Workload.TOptimalSequence(distr_large, discret_samples=127)
wd_large.set_request_time(
request_sequence=sw.compute_request_sequence())
elif sequence == "MASI":
wd_large.set_request_time(request_function=req_time_large)
elif sequence == "HPC":
request_sequence = Workload.ConstantDistr(
wd_large.upper_bound).random_sample(1)
wd_large.set_request_time(request_sequence=request_sequence)
apl = wd_large.generate_workload()
if sequence == "TOptimal":
sw = Workload.TOptimalSequence(distr_small, discret_samples=127)
wd_small.set_request_time(
request_sequence=sw.compute_request_sequence())
elif sequence == "MASI":
wd_small.set_request_time(request_function=req_time_small)
elif sequence == "HPC":
request_sequence = Workload.ConstantDistr(
wd_small.upper_bound).random_sample(1)
wd_small.set_request_time(request_sequence=request_sequence)
apl_small = wd_small.generate_workload()
for job in apl_small:
job.job_id = job.job_id + num_jobs_large
job.submission_time = 10
return apl + apl_small
if __name__ == '__main__':
print("Generating the workload...")
scenario_list = ["TOptimal", "HPC"]
procs_list = ["beta", "full"]
distr_large = Workload.TruncNormalDistr(1, 8, 4, 2)
wd_large = Workload.Workload(distr_large, num_jobs_large)
distr_small = Workload.TruncNormalDistr(0.1, 6, 1, 1)
wd_small = Workload.Workload(distr_small, num_jobs_small)
for procs in procs_list:
if procs == "full":
wd_large.set_processing_units(
distribution=Workload.ConstantDistr(10))
wd_small.set_processing_units(
distribution=Workload.ConstantDistr(10))
elif procs == "beta":
wd_large.set_processing_units(procs_function=req_procs)
wd_small.set_processing_units(procs_function=req_procs)
outf = sys.stdout
for sequence in scenario_list:
#outf = open("backfill_%s_%s_%d" % (sequence.lower(),
# procs,
# num_jobs_small), "a")
simulation = ScheduleFlow.Simulator(check_correctness=True,
output_file_handler=outf)
apl = generate_workload(sequence, procs, wd_large,
wd_small, distr_large, distr_small)
print(sequence, procs)
print("Running the new backfilling scheme...")
sch = SpeculativeBackfill.SpeculativeBatchScheduler(
ScheduleFlow.System(10))
simulation.create_scenario("speculative", sch)
simulation.add_applications(apl)
ret = simulation.run()
print("Running the classic HPC backfilling scheme...")
sch = ScheduleFlow.BatchScheduler(ScheduleFlow.System(10))
simulation.create_scenario("classic", sch)
simulation.add_applications(apl)
ret = simulation.run()
#outf.close()