[batch] Add Job Groups to Batch #14282
Changes from all commits
```diff
@@ -94,39 +94,44 @@ async def cancel_cancelled_ready_jobs_loop_body(self):
             }

         async def user_cancelled_ready_jobs(user, remaining) -> AsyncIterator[Dict[str, Any]]:
-            async for batch in self.db.select_and_fetchall(
+            async for job_group in self.db.select_and_fetchall(
                 """
-SELECT batches.id, job_groups_cancelled.id IS NOT NULL AS cancelled
-FROM batches
-LEFT JOIN job_groups_cancelled
-       ON batches.id = job_groups_cancelled.id
+SELECT job_groups.batch_id, job_groups.job_group_id, t.cancelled IS NOT NULL AS cancelled
+FROM job_groups
+LEFT JOIN LATERAL (
+    SELECT 1 AS cancelled
+    FROM job_group_self_and_ancestors
+    INNER JOIN job_groups_cancelled
+        ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.id AND
+           job_group_self_and_ancestors.ancestor_id = job_groups_cancelled.job_group_id
+    WHERE job_groups.batch_id = job_group_self_and_ancestors.batch_id AND
+          job_groups.job_group_id = job_group_self_and_ancestors.job_group_id
+) AS t ON TRUE
```
**Review comment:** In `estimated-current.sql` there is this comment:

```sql
# inserting all cancelled job groups is not performant with many children job groups
INSERT INTO job_groups_cancelled (id, job_group_id)
VALUES (in_batch_id, in_job_group_id);
```

But at the top of `cancel_job_group`, and also here in this query, we aggregate over all the ancestors. Suppose we had a depth of five: that will insert at most 32 records into `job_groups_cancelled`. That's a pretty small number of rows; MySQL should be able to do that just fine. In the current system, it seems to me we need to aggregate over all of our ancestors anywhere we want to know whether our job group is cancelled, right? For example, in […]

**Reply:** You've got it backwards. Traversing the tree upwards is cheap: if we make the max depth 5, then that's 5 records. Traversing down the tree, however, is slow. Say we're trying to cancel the root job group of a massive tree with 100K direct children, each of which has additional children. Then an INSERT statement that inserts all of those records at once is going to be extremely slow. It's the same reason we don't just update all of the jobs records at once when we cancel, and instead have these checks on whether the job group has been cancelled. With regard to all of the checks about ancestors being cancelled: yes, I tried to put them in all the correct places. Please let me know if you disagree with this reasoning, as its implications change a lot of your other comments and code in other places.

**Reply:** You're right; I was thinking job groups were binary trees, but there's an arbitrary number of children. I think if someone creates 100,000 job groups, that's going to create other problems in our system. We make assumptions that the number of batches is relatively small. For example, in `user_runnable_jobs` there's no limit on the number of job groups we fetch.

The options seem to be: […]

Since we've decided to limit the depth but not limit the width, it seems to me the right choice is 2. We should consider options for avoiding the mass duplication of this lateral-join code, though. It'd be great not to have to re-understand it every time I see it.

**Reply:** We might be able to do this with a SQL function. Then it would just be […]

**Reply:** I tried replacing all of these lateral joins with a stored function, and it mostly worked except there was some sort of deadlock in the […]

**Reply:** Sounds good, let's create an issue and link it in a comment here.
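The upward-traversal check debated above can be reproduced outside MySQL. The sketch below is a toy sqlite3 model, not Hail's code: SQLite has no `LATERAL`, so an `EXISTS` subquery plays the role of the lateral join, and the three-group tree is invented data. Table and column names follow the query in the diff.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE job_groups (batch_id INT, job_group_id INT);
CREATE TABLE job_group_self_and_ancestors (batch_id INT, job_group_id INT, ancestor_id INT);
CREATE TABLE job_groups_cancelled (id INT, job_group_id INT);  -- id is the batch id

-- batch 1: root group 0, child group 1, grandchild group 2
INSERT INTO job_groups VALUES (1, 0), (1, 1), (1, 2);
INSERT INTO job_group_self_and_ancestors VALUES
  (1, 0, 0),
  (1, 1, 1), (1, 1, 0),
  (1, 2, 2), (1, 2, 1), (1, 2, 0);

-- cancelling group 1 writes ONE row; its descendants are found by walking up
INSERT INTO job_groups_cancelled VALUES (1, 1);
""")

# A group is cancelled iff any of its self-and-ancestors rows matches a
# job_groups_cancelled record -- the EXISTS mirrors the LATERAL join above.
rows = conn.execute("""
SELECT jg.batch_id, jg.job_group_id,
       EXISTS (
         SELECT 1
         FROM job_group_self_and_ancestors a
         JOIN job_groups_cancelled c
           ON a.batch_id = c.id AND a.ancestor_id = c.job_group_id
         WHERE a.batch_id = jg.batch_id AND a.job_group_id = jg.job_group_id
       ) AS cancelled
FROM job_groups jg
ORDER BY jg.job_group_id
""").fetchall()

print(rows)  # root group 0 is not cancelled; groups 1 and 2 are
```

This illustrates the trade-off from the thread: cancellation writes a single row, and every reader pays a small bounded-depth lookup instead.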
```diff
 WHERE user = %s AND `state` = 'running';
 """,
                 (user,),
             ):
-                if batch['cancelled']:
+                if job_group['cancelled']:
```
**Review comment:** Continuing https://github.com/hail-is/hail/pull/14170/files#r1476847018 — OK, let's not change anything in this PR so as to keep it as simple and direct as possible. This is my concrete critique: the two SQL queries are nearly the same, but they are long enough to […]

```sql
SELECT jobs.batch_id, jobs.job_id
FROM jobs FORCE INDEX(jobs_batch_id_state_always_run_cancelled)
WHERE batch_id = %s AND job_group_id = %s AND state = 'Ready' AND always_run = 0
LIMIT %s;
```

```sql
SELECT jobs.batch_id, jobs.job_id
FROM jobs FORCE INDEX(jobs_batch_id_state_always_run_cancelled)
WHERE batch_id = %s AND job_group_id = %s AND state = 'Ready' AND always_run = 0 AND cancelled = 1
LIMIT %s;
```

They only differ in the cancelled condition. As a reader, I'd prefer code that revealed that:

```python
if job_group['cancelled']:
    where_job_needs_cancelling = ''  # every job in a cancelled group needs cancelling
else:
    where_job_needs_cancelling = 'AND jobs.cancelled'  # jobs.cancelled means child of a failed job
query_for_jobs_to_be_cancelled = f"""
SELECT jobs.batch_id, jobs.job_id
FROM jobs FORCE INDEX(jobs_batch_id_state_always_run_cancelled)
WHERE batch_id = %s
  AND job_group_id = %s
  AND state = 'Ready'
  AND NOT always_run
  {where_job_needs_cancelling}
LIMIT %s;
"""
async for record in self.db.select_and_fetchall(
    query_for_jobs_to_be_cancelled,
    (job_group['batch_id'], job_group['job_group_id'], remaining.value),
):
    yield record
```

**Reply:** Ok. I added this to the list of issues to create.
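The suggested refactor can be distilled into a standalone, testable sketch. This is not Hail's code: `query_for_jobs_to_be_cancelled` as a function is hypothetical, and the `FORCE INDEX` hint is dropped to keep it generic; only the shape of the pattern (one template, one named varying predicate) is the point.

```python
def query_for_jobs_to_be_cancelled(job_group_is_cancelled: bool) -> str:
    """Build the ready-jobs cancellation query, varying only the cancelled predicate."""
    if job_group_is_cancelled:
        where_job_needs_cancelling = ''  # every job in a cancelled group needs cancelling
    else:
        where_job_needs_cancelling = 'AND jobs.cancelled'  # cancelled = child of a failed job
    return f"""
SELECT jobs.batch_id, jobs.job_id
FROM jobs
WHERE batch_id = %s
  AND job_group_id = %s
  AND state = 'Ready'
  AND NOT always_run
  {where_job_needs_cancelling}
LIMIT %s;
"""

# The two variants differ only in the one predicate, which the reader can
# now see at a glance instead of diffing two long string literals.
q_cancelled_group = query_for_jobs_to_be_cancelled(True)
q_running_group = query_for_jobs_to_be_cancelled(False)
assert 'AND jobs.cancelled' not in q_cancelled_group
assert 'AND jobs.cancelled' in q_running_group
```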
```diff
                     async for record in self.db.select_and_fetchall(
                         """
-SELECT jobs.job_id
+SELECT jobs.batch_id, jobs.job_id, jobs.job_group_id
 FROM jobs FORCE INDEX(jobs_batch_id_state_always_run_cancelled)
-WHERE batch_id = %s AND state = 'Ready' AND always_run = 0
+WHERE batch_id = %s AND job_group_id = %s AND state = 'Ready' AND always_run = 0
 LIMIT %s;
 """,
-                        (batch['id'], remaining.value),
+                        (job_group['batch_id'], job_group['job_group_id'], remaining.value),
                     ):
-                        record['batch_id'] = batch['id']
```
**Review comment:** I'm somewhat indifferent on this change. More data is transmitted (potentially a lot more, because jobs >> groups), but it's a bit more direct and clear as now written. However: why include it in this PR? Is something changing about how batch_ids work that required this change?

**Reply:** No, I was trying to clean up everything to avoid more comments about unreadability and confusion from unneeded lines of code. I feel like I am trying to read your mind on minimizing code changes while, at the same time, a lot of comments are about cleaning up the code.

**Reply:** The way I think of it: a PR presents a set of changes. The reviewer reads the PR to understand the proposed change. For that purpose, we want the PR as slim as possible because it aids understanding. The resulting conversation about the change may reveal related simplifications that the author and reviewer decide, as a pair, either to include in the current PR or to defer to follow-up PRs. In general, if a comment I make seems unrelated, I'm 100% OK with it becoming a separate PR. If an author notices poor code while preparing a PR, those changes should be made in independent PRs or, if the PR depends on them, should be made first. For example, the sync tool PR (#14248) spawned a few PRs (#14139, #14162, #14176, #14181) that were independent but related (some of which needed to merge before the sync tool could be reviewed).

I recognize this process does take some time, but it ensures the reviewers can fully grok each change and incorporate it into their mental model of the system.
```diff
                         yield record
                 else:
```
```diff
                     async for record in self.db.select_and_fetchall(
                         """
-SELECT jobs.job_id
+SELECT jobs.batch_id, jobs.job_id, jobs.job_group_id
 FROM jobs FORCE INDEX(jobs_batch_id_state_always_run_cancelled)
-WHERE batch_id = %s AND state = 'Ready' AND always_run = 0 AND cancelled = 1
+WHERE batch_id = %s AND job_group_id = %s AND state = 'Ready' AND always_run = 0 AND cancelled = 1
 LIMIT %s;
 """,
-                        (batch['id'], remaining.value),
+                        (job_group['batch_id'], job_group['job_group_id'], remaining.value),
                     ):
-                        record['batch_id'] = batch['id']
```
**Review comment:** Same comment here about the addition of batch_id. I'm ambivalent, but not sure why it's here: did something change that I'm missing?

**Reply:** See the response to #14282 (comment).
```diff
                         yield record

             waitable_pool = WaitableSharedPool(self.async_worker_pool)
```
```diff
@@ -137,18 +142,30 @@ async def user_cancelled_ready_jobs(user, remaining) -> AsyncIterator[Dict[str,
             async for record in user_cancelled_ready_jobs(user, remaining):
                 batch_id = record['batch_id']
                 job_id = record['job_id']
+                job_group_id = record['job_group_id']
                 id = (batch_id, job_id)
                 log.info(f'cancelling job {id}')

-                async def cancel_with_error_handling(app, batch_id, job_id, id):
+                async def cancel_with_error_handling(app, batch_id, job_id, job_group_id, id):
                     try:
                         await mark_job_complete(
-                            app, batch_id, job_id, None, None, 'Cancelled', None, None, None, 'cancelled', []
+                            app,
+                            batch_id,
+                            job_id,
+                            None,
+                            job_group_id,
+                            None,
+                            'Cancelled',
+                            None,
+                            None,
+                            None,
+                            'cancelled',
+                            [],
                         )
                     except Exception:
                         log.info(f'error while cancelling job {id}', exc_info=True)

-                await waitable_pool.call(cancel_with_error_handling, self.app, batch_id, job_id, id)
+                await waitable_pool.call(cancel_with_error_handling, self.app, batch_id, job_id, job_group_id, id)

                 remaining.value -= 1
                 if remaining.value <= 0:
```
```diff
@@ -182,28 +199,34 @@ async def cancel_cancelled_creating_jobs_loop_body(self):
             }

         async def user_cancelled_creating_jobs(user, remaining) -> AsyncIterator[Dict[str, Any]]:
-            async for batch in self.db.select_and_fetchall(
+            async for job_group in self.db.select_and_fetchall(
                 """
-SELECT batches.id
-FROM batches
-INNER JOIN job_groups_cancelled
-       ON batches.id = job_groups_cancelled.id
+SELECT job_groups.batch_id, job_groups.job_group_id
+FROM job_groups
+INNER JOIN LATERAL (
+    SELECT 1 AS cancelled
+    FROM job_group_self_and_ancestors
+    INNER JOIN job_groups_cancelled
+        ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.id AND
+           job_group_self_and_ancestors.ancestor_id = job_groups_cancelled.job_group_id
+    WHERE job_groups.batch_id = job_group_self_and_ancestors.batch_id AND
+          job_groups.job_group_id = job_group_self_and_ancestors.job_group_id
+) AS t ON TRUE
```
**Review comment:** Same comment here as above: we're walking up the group tree every time we run the cancel query. It seems better to update all those children once and then benefit from it on all these periodic queries.

**Reply:** See #14282 (comment).
```diff
 WHERE user = %s AND `state` = 'running';
 """,
                 (user,),
             ):
                 async for record in self.db.select_and_fetchall(
                     """
-SELECT jobs.job_id, attempts.attempt_id, attempts.instance_name
+SELECT jobs.batch_id, jobs.job_id, attempts.attempt_id, attempts.instance_name, jobs.job_group_id
 FROM jobs FORCE INDEX(jobs_batch_id_state_always_run_cancelled)
 STRAIGHT_JOIN attempts
        ON attempts.batch_id = jobs.batch_id AND attempts.job_id = jobs.job_id
-WHERE jobs.batch_id = %s AND state = 'Creating' AND always_run = 0 AND cancelled = 0
+WHERE jobs.batch_id = %s AND jobs.job_group_id = %s AND state = 'Creating' AND always_run = 0 AND cancelled = 0
 LIMIT %s;
 """,
-                    (batch['id'], remaining.value),
+                    (job_group['batch_id'], job_group['job_group_id'], remaining.value),
                 ):
-                    record['batch_id'] = batch['id']
```
**Review comment:** Same comment about batch_id. Did something change that I'm missing, or is this an unrelated cosmetic change?

**Reply:** See the response to #14282 (comment).
```diff
                     yield record

             waitable_pool = WaitableSharedPool(self.async_worker_pool)
```
```diff
@@ -215,17 +238,21 @@ async def user_cancelled_creating_jobs(user, remaining) -> AsyncIterator[Dict[st
                 batch_id = record['batch_id']
                 job_id = record['job_id']
                 attempt_id = record['attempt_id']
+                job_group_id = record['job_group_id']
                 instance_name = record['instance_name']
                 id = (batch_id, job_id)

-                async def cancel_with_error_handling(app, batch_id, job_id, attempt_id, instance_name, id):
+                async def cancel_with_error_handling(
+                    app, batch_id, job_id, attempt_id, job_group_id, instance_name, id
+                ):
                     try:
                         end_time = time_msecs()
                         await mark_job_complete(
                             app,
                             batch_id,
                             job_id,
                             attempt_id,
+                            job_group_id,
                             instance_name,
                             'Cancelled',
                             None,

@@ -246,7 +273,7 @@ async def cancel_with_error_handling(app, batch_id, job_id, attempt_id, instance
                         log.info(f'cancelling creating job {id} on instance {instance_name}', exc_info=True)

                 await waitable_pool.call(
-                    cancel_with_error_handling, self.app, batch_id, job_id, attempt_id, instance_name, id
+                    cancel_with_error_handling, self.app, batch_id, job_id, attempt_id, job_group_id, instance_name, id
                 )

                 remaining.value -= 1
```
```diff
@@ -279,28 +306,34 @@ async def cancel_cancelled_running_jobs_loop_body(self):
             }

         async def user_cancelled_running_jobs(user, remaining) -> AsyncIterator[Dict[str, Any]]:
-            async for batch in self.db.select_and_fetchall(
+            async for job_group in self.db.select_and_fetchall(
                 """
-SELECT batches.id
-FROM batches
-INNER JOIN job_groups_cancelled
-       ON batches.id = job_groups_cancelled.id
+SELECT job_groups.batch_id, job_groups.job_group_id
+FROM job_groups
+INNER JOIN LATERAL (
+    SELECT 1 AS cancelled
+    FROM job_group_self_and_ancestors
+    INNER JOIN job_groups_cancelled
+        ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.id AND
+           job_group_self_and_ancestors.ancestor_id = job_groups_cancelled.job_group_id
+    WHERE job_groups.batch_id = job_group_self_and_ancestors.batch_id AND
+          job_groups.job_group_id = job_group_self_and_ancestors.job_group_id
+) AS t ON TRUE
```
**Review comment:** Same comment about the cost of walking up the tree on every query.

**Reply:** See #14282 (comment).
```diff
 WHERE user = %s AND `state` = 'running';
 """,
                 (user,),
             ):
                 async for record in self.db.select_and_fetchall(
                     """
-SELECT jobs.job_id, attempts.attempt_id, attempts.instance_name
+SELECT jobs.batch_id, jobs.job_id, attempts.attempt_id, attempts.instance_name
 FROM jobs FORCE INDEX(jobs_batch_id_state_always_run_cancelled)
 STRAIGHT_JOIN attempts
        ON attempts.batch_id = jobs.batch_id AND attempts.job_id = jobs.job_id
-WHERE jobs.batch_id = %s AND state = 'Running' AND always_run = 0 AND cancelled = 0
+WHERE jobs.batch_id = %s AND jobs.job_group_id = %s AND state = 'Running' AND always_run = 0 AND cancelled = 0
 LIMIT %s;
 """,
-                    (batch['id'], remaining.value),
+                    (job_group['batch_id'], job_group['job_group_id'], remaining.value),
                 ):
-                    record['batch_id'] = batch['id']
```
**Review comment:** Same comment about the batch_id column.

**Reply:** See the response to #14282 (comment).
```diff
                     yield record

             waitable_pool = WaitableSharedPool(self.async_worker_pool)
```
**Review comment:** When is `n_cancelled > 0` but `cancelled` is False?

**Reply:** This happens when there are jobs that have been cancelled due to failed dependencies (`jobs.cancelled = 1`) or the batch / job group was cancelled.

**Reply:** If a job is cancelled because a parent failed, wouldn't the first `if` branch fire? When is `record['n_failed']` True, `record['cancelled']` False, but `record['n_cancelled'] > 0`?

**Reply:** I think that is the intention. This code is the same as the Batch response, which has been there forever. I think this guards against child dependencies being cancelled causing the state to be cancelled, while allowing for cancelled empty batches and batches that have been cancelled with jobs.
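The guard being described can be made concrete. The sketch below is not the actual Batch code, only one plausible reading of the branching discussed in this thread: the function name, the `'success'`/`'running'` fallbacks, and the `n_completed`/`n_jobs` fields are assumptions; the point is that the `n_failed` branch fires first, so jobs cancelled because a dependency failed never flip the state to `cancelled` on their own, while a cancelled empty group still reads as cancelled.

```python
def job_group_state(record):
    # Hypothetical state resolution. A failure anywhere wins; that is what
    # keeps failed-dependency cancellations (n_cancelled > 0, cancelled False)
    # from surfacing as 'cancelled' -- they always arrive with n_failed > 0.
    if record['n_failed'] > 0:
        return 'failure'
    # Group-level cancellation, including a cancelled batch with zero jobs.
    if record['cancelled'] or record['n_cancelled'] > 0:
        return 'cancelled'
    if record['n_completed'] == record['n_jobs']:
        return 'success'
    return 'running'

# A child cancelled by its failed parent: the failure branch fires first.
print(job_group_state({'n_failed': 1, 'cancelled': False, 'n_cancelled': 3,
                       'n_completed': 4, 'n_jobs': 4}))  # failure
# A cancelled empty batch still reads as cancelled.
print(job_group_state({'n_failed': 0, 'cancelled': True, 'n_cancelled': 0,
                       'n_completed': 0, 'n_jobs': 0}))  # cancelled
```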