-
Notifications
You must be signed in to change notification settings - Fork 248
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[batch] Add ability to create job groups at top level #14170
Changes from 39 commits
80b664a
0b50b03
6c9c776
284b457
d1fd11a
8ac5425
5ffa578
03cdaa5
904a045
3bdca11
e7fe638
e09c512
b40bff2
1e20595
ed95628
e6ed1f0
853d949
4c2b750
d7d3b53
1dc4ce9
b777802
295c339
166928c
322b01d
f1697c2
0d97818
0f2cc55
5fbd6e8
9b17076
239bd86
9889031
937d501
d32e968
328e7a6
26fe167
0b1b66b
9555687
8a468ba
8aa3bb8
f3b6e4c
c3b825f
36af4f8
7bb3f2b
0802a8e
d631d70
3597a89
1030f59
229d8b6
fc781a6
ef6163c
9fd31c3
4569b3d
4219370
9a9610f
1336950
f66f615
56f6c77
3ffdfae
823da60
df9ebcd
f76070d
ae1b484
51242a0
b138287
c63f961
490cff2
e67594f
e50ab12
2fdfcfc
aacfddb
16187ca
964dcfa
244354a
c4028c1
946ef12
1870039
de473d5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -179,12 +179,13 @@ async def schedule_jobs_loop_body(self): | |
async for record in self.db.select_and_fetchall( | ||
""" | ||
SELECT jobs.*, batches.format_version, batches.userdata, batches.user, attempts.instance_name, time_ready | ||
FROM batches | ||
INNER JOIN jobs ON batches.id = jobs.batch_id | ||
FROM job_groups | ||
LEFT JOIN batches ON batches.id = job_groups.batch_id | ||
LEFT JOIN jobs ON job_groups.batch_id = jobs.batch_id AND job_groups.job_group_id = jobs.job_group_id | ||
LEFT JOIN jobs_telemetry ON jobs.batch_id = jobs_telemetry.batch_id AND jobs.job_id = jobs_telemetry.job_id | ||
LEFT JOIN attempts ON jobs.batch_id = attempts.batch_id AND jobs.job_id = attempts.job_id | ||
LEFT JOIN instances ON attempts.instance_name = instances.name | ||
WHERE batches.state = 'running' | ||
WHERE job_groups.state = 'running' | ||
AND jobs.state = 'Creating' | ||
AND (jobs.always_run OR NOT jobs.cancelled) | ||
AND jobs.inst_coll = %s | ||
|
@@ -349,54 +350,55 @@ async def create_instances_loop_body(self): | |
} | ||
|
||
async def user_runnable_jobs(user, remaining) -> AsyncIterator[Dict[str, Any]]: | ||
async for batch in self.db.select_and_fetchall( | ||
async for job_group in self.db.select_and_fetchall( | ||
""" | ||
SELECT batches.id, job_groups_cancelled.id IS NOT NULL AS cancelled, userdata, user, format_version | ||
FROM batches | ||
SELECT job_groups.batch_id, job_groups.job_group_id, job_groups_cancelled.id IS NOT NULL AS cancelled, userdata, job_groups.user, format_version | ||
FROM job_groups | ||
LEFT JOIN batches ON batches.id = job_groups.batch_id | ||
LEFT JOIN job_groups_cancelled | ||
ON batches.id = job_groups_cancelled.id | ||
WHERE user = %s AND `state` = 'running'; | ||
ON job_groups.batch_id = job_groups_cancelled.id AND job_groups.job_group_id = job_groups_cancelled.job_group_id | ||
WHERE job_groups.user = %s AND job_groups.`state` = 'running'; | ||
""", | ||
(user,), | ||
): | ||
async for record in self.db.select_and_fetchall( | ||
""" | ||
SELECT jobs.batch_id, jobs.job_id, jobs.spec, jobs.cores_mcpu, regions_bits_rep, COALESCE(SUM(instances.state IS NOT NULL AND | ||
(instances.state = 'pending' OR instances.state = 'active')), 0) as live_attempts | ||
(instances.state = 'pending' OR instances.state = 'active')), 0) as live_attempts, jobs.job_group_id | ||
FROM jobs FORCE INDEX(jobs_batch_id_state_always_run_inst_coll_cancelled) | ||
LEFT JOIN attempts ON jobs.batch_id = attempts.batch_id AND jobs.job_id = attempts.job_id | ||
LEFT JOIN instances ON attempts.instance_name = instances.name | ||
WHERE jobs.batch_id = %s AND jobs.state = 'Ready' AND always_run = 1 AND jobs.inst_coll = %s | ||
WHERE jobs.batch_id = %s AND jobs.job_group_id = %s AND jobs.state = 'Ready' AND always_run = 1 AND jobs.inst_coll = %s | ||
GROUP BY jobs.job_id, jobs.spec, jobs.cores_mcpu | ||
HAVING live_attempts = 0 | ||
LIMIT %s; | ||
""", | ||
(batch['id'], self.name, remaining.value), | ||
(job_group['batch_id'], job_group['job_group_id'], self.name, remaining.value), | ||
): | ||
record['batch_id'] = batch['id'] | ||
record['userdata'] = batch['userdata'] | ||
record['user'] = batch['user'] | ||
record['format_version'] = batch['format_version'] | ||
record['batch_id'] = job_group['batch_id'] | ||
record['userdata'] = job_group['userdata'] | ||
record['user'] = job_group['user'] | ||
record['format_version'] = job_group['format_version'] | ||
yield record | ||
if not batch['cancelled']: | ||
if not job_group['cancelled']: | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We should do this filtering in the database. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think this is correct. First we schedule all of the always-run jobs. Then we schedule all jobs in job groups that haven't been cancelled. |
||
async for record in self.db.select_and_fetchall( | ||
""" | ||
SELECT jobs.batch_id, jobs.job_id, jobs.spec, jobs.cores_mcpu, regions_bits_rep, COALESCE(SUM(instances.state IS NOT NULL AND | ||
(instances.state = 'pending' OR instances.state = 'active')), 0) as live_attempts | ||
(instances.state = 'pending' OR instances.state = 'active')), 0) as live_attempts, job_group_id | ||
FROM jobs FORCE INDEX(jobs_batch_id_state_always_run_cancelled) | ||
LEFT JOIN attempts ON jobs.batch_id = attempts.batch_id AND jobs.job_id = attempts.job_id | ||
LEFT JOIN instances ON attempts.instance_name = instances.name | ||
WHERE jobs.batch_id = %s AND jobs.state = 'Ready' AND always_run = 0 AND jobs.inst_coll = %s AND cancelled = 0 | ||
WHERE jobs.batch_id = %s AND jobs.job_group_id = %s AND jobs.state = 'Ready' AND always_run = 0 AND jobs.inst_coll = %s AND cancelled = 0 | ||
GROUP BY jobs.job_id, jobs.spec, jobs.cores_mcpu | ||
HAVING live_attempts = 0 | ||
LIMIT %s | ||
""", | ||
(batch['id'], self.name, remaining.value), | ||
(job_group['batch_id'], job_group['job_group_id'], self.name, remaining.value), | ||
): | ||
record['batch_id'] = batch['id'] | ||
record['userdata'] = batch['userdata'] | ||
record['user'] = batch['user'] | ||
record['format_version'] = batch['format_version'] | ||
record['batch_id'] = job_group['batch_id'] | ||
record['userdata'] = job_group['userdata'] | ||
record['user'] = job_group['user'] | ||
record['format_version'] = job_group['format_version'] | ||
yield record | ||
|
||
waitable_pool = WaitableSharedPool(self.async_worker_pool) | ||
|
@@ -420,6 +422,7 @@ async def user_runnable_jobs(user, remaining) -> AsyncIterator[Dict[str, Any]]: | |
id = (batch_id, job_id) | ||
attempt_id = secret_alnum_string(6) | ||
record['attempt_id'] = attempt_id | ||
job_group_id = record['job_group_id'] | ||
|
||
if n_user_instances_created >= n_allocated_instances: | ||
if random.random() > self.exceeded_shares_counter.rate(): | ||
|
@@ -435,7 +438,7 @@ async def user_runnable_jobs(user, remaining) -> AsyncIterator[Dict[str, Any]]: | |
log.info(f'creating job private instance for job {id}') | ||
|
||
async def create_instance_with_error_handling( | ||
batch_id: int, job_id: int, attempt_id: str, record: dict, id: Tuple[int, int] | ||
batch_id: int, job_id: int, attempt_id: str, job_group_id: int, record: dict, id: Tuple[int, int] | ||
): | ||
try: | ||
batch_format_version = BatchFormatVersion(record['format_version']) | ||
|
@@ -458,6 +461,7 @@ async def create_instance_with_error_handling( | |
await mark_job_errored( | ||
self.app, | ||
batch_id, | ||
job_group_id, | ||
job_id, | ||
attempt_id, | ||
record['user'], | ||
|
@@ -467,7 +471,9 @@ async def create_instance_with_error_handling( | |
except Exception: | ||
log.exception(f'while creating job private instance for job {id}', exc_info=True) | ||
|
||
await waitable_pool.call(create_instance_with_error_handling, batch_id, job_id, attempt_id, record, id) | ||
await waitable_pool.call( | ||
create_instance_with_error_handling, batch_id, job_id, attempt_id, job_group_id, record, id | ||
) | ||
|
||
remaining.value -= 1 | ||
if remaining.value <= 0: | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why does this do a Python-side filter rather than a SQL-side filter like the next query?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are two types of cancelled jobs. One is for cancelled job groups / batches, in which all jobs that are not always-run are cancelled. But then there's cancellation of individual jobs if their parents failed. I'm guessing it was set up this way with two separate queries to optimize speed and to be able to use the `remaining` shared variable. I can see if there's a way to optimize it into one query if that's something you want to see happen.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And this only applies to ready jobs -- not creating or running jobs.