[batch] Add Job Groups to Batch #14282

Merged: 149 commits, Feb 26, 2024

Commits (149)
80b664a
[batch] Finalize job groups in database
jigold Oct 16, 2023
0b50b03
fix
jigold Nov 9, 2023
6c9c776
fix for ambig column
jigold Nov 13, 2023
284b457
fix foreign key constraint
jigold Nov 13, 2023
d1fd11a
dont lock primary key updates
jigold Nov 13, 2023
8ac5425
fix cancel job group
jigold Nov 13, 2023
5ffa578
last fix?
jigold Nov 13, 2023
03cdaa5
get rid of extra complexity
jigold Nov 30, 2023
904a045
fixup estimated-current.sql
jigold Nov 30, 2023
3bdca11
fix cancel child job groups
jigold Nov 30, 2023
e7fe638
add new index
jigold Dec 1, 2023
e09c512
add back batch updates fields
jigold Jan 12, 2024
b40bff2
[batch] Use job group id in front end and driver queries
jigold Oct 18, 2023
1e20595
address comments
jigold Dec 1, 2023
ed95628
get rid of exposing job group id to worker
jigold Dec 1, 2023
e6ed1f0
address comments
jigold Dec 1, 2023
853d949
delint
jigold Dec 1, 2023
4c2b750
Merge commit '24525adb9c09a73a1ae820c9945acd35299878ed' into thread-j…
jigold Jan 12, 2024
d7d3b53
Merge commit 'fa2ef0f2c76654d0c037ff6db60ccb8842fb8539' into thread-j…
jigold Jan 12, 2024
1dc4ce9
partial ruff apply
jigold Jan 12, 2024
b777802
partial ruff apply
jigold Jan 12, 2024
295c339
[batch] Add job group in client and capability to list and get job gr…
jigold Oct 16, 2023
166928c
wip
jigold Nov 30, 2023
322b01d
fix
jigold Nov 30, 2023
f1697c2
delint
jigold Jan 9, 2024
0d97818
delint
jigold Jan 12, 2024
0f2cc55
[batch] Add ability to create job groups at top level only
jigold Jan 16, 2024
5fbd6e8
minor fixes
jigold Jan 18, 2024
9b17076
minor fixes
jigold Jan 18, 2024
239bd86
bad rebase fix
jigold Feb 1, 2024
9889031
fixing bad rebase
jigold Feb 1, 2024
937d501
finish fixing rebase
jigold Feb 1, 2024
d32e968
addressed most of front end comments
jigold Feb 1, 2024
328e7a6
refactored bunching
jigold Feb 1, 2024
26fe167
add update id default to 1
jigold Feb 2, 2024
0b1b66b
more front end changes
jigold Feb 2, 2024
9555687
more changes
jigold Feb 2, 2024
8a468ba
addressing more comments
jigold Feb 5, 2024
8aa3bb8
lots of comments addressed
jigold Feb 5, 2024
f3b6e4c
add ability to create jobs
jigold Feb 5, 2024
c3b825f
fix tests
jigold Feb 5, 2024
36af4f8
more fixes
jigold Feb 6, 2024
7bb3f2b
ruff check
jigold Feb 6, 2024
0802a8e
ruff format
jigold Feb 6, 2024
d631d70
delint client
jigold Feb 6, 2024
3597a89
final delint
jigold Feb 6, 2024
1030f59
fix index and various bugs
jigold Feb 6, 2024
229d8b6
fix database error in commit_batch_update
jigold Feb 6, 2024
fc781a6
attempt to fix mjc
jigold Feb 6, 2024
ef6163c
fix ambig field
jigold Feb 6, 2024
9fd31c3
wip
jigold Feb 7, 2024
4569b3d
cleanup db code
jigold Feb 7, 2024
4219370
fix mjc missing var
jigold Feb 7, 2024
9a9610f
turn off updating attempts to try and debug
jigold Feb 7, 2024
1336950
process of elimination
jigold Feb 7, 2024
f66f615
actually have new triggers in database
jigold Feb 7, 2024
56f6c77
fix build.yaml
jigold Feb 7, 2024
3ffdfae
modify commit_batch_update
jigold Feb 7, 2024
823da60
recursive job group state n_jobs and no migration transaction
jigold Feb 8, 2024
df9ebcd
fix cancel_job_group
jigold Feb 8, 2024
f76070d
more fixes
jigold Feb 8, 2024
ae1b484
fix python front end icr
jigold Feb 8, 2024
51242a0
fix bad global var collision
jigold Feb 8, 2024
b138287
fix cancel
jigold Feb 8, 2024
c63f961
fix cancel
jigold Feb 8, 2024
490cff2
turn off unschedule job in canceller
jigold Feb 9, 2024
e67594f
get rid of committed check
jigold Feb 9, 2024
e50ab12
dont unschedule jobs in canceller
jigold Feb 9, 2024
2fdfcfc
recursive populate jg_inst_coll_cancellable_resources
jigold Feb 9, 2024
aacfddb
test_job_group_cancel_after_n_failures_does_not_cancel_higher_up_jobs…
jigold Feb 9, 2024
16187ca
fix test
jigold Feb 9, 2024
964dcfa
in sync sql
jigold Feb 9, 2024
244354a
delint
jigold Feb 9, 2024
c4028c1
get state right in commit
jigold Feb 9, 2024
946ef12
get rid of unsed columns in staging table
jigold Feb 9, 2024
1870039
more fixes
jigold Feb 9, 2024
de473d5
add nested job groups
jigold Feb 9, 2024
205c081
add back job group id col in staging
jigold Feb 9, 2024
6514bae
rework cancellation
jigold Feb 11, 2024
233c70f
fix merge conflict
jigold Feb 11, 2024
a9f3fc8
get rid of debug message
jigold Feb 11, 2024
f1a69a6
fixes
jigold Feb 11, 2024
009d490
lock selects before inserts
jigold Feb 12, 2024
75d7aa1
actually lock table
jigold Feb 12, 2024
88a9bb5
fix cancel check
jigold Feb 12, 2024
0656872
fix sql
jigold Feb 12, 2024
f7a0a50
fix resource aggregation test to be recursive
jigold Feb 12, 2024
0b85932
fix test
jigold Feb 12, 2024
d00cbdd
delint
jigold Feb 12, 2024
cfb1dd4
more debugging
jigold Feb 12, 2024
348eb6f
delint
jigold Feb 12, 2024
3117df5
fix sql query in aggregation test
jigold Feb 12, 2024
c76fb7e
cleanup
jigold Feb 12, 2024
80ad0dd
add more for updates
jigold Feb 12, 2024
9989692
get rid of locks and fix trigger
jigold Feb 12, 2024
dae8607
address comments
jigold Feb 13, 2024
3e9cb7c
delint
jigold Feb 13, 2024
1d3a12b
lock everything
jigold Feb 13, 2024
263b695
fix syntax with for update
jigold Feb 13, 2024
336f362
fix test and start to fix callback test
jigold Feb 13, 2024
ad3b7a7
fix
jigold Feb 13, 2024
da8bf1f
traceback was not helpful
jigold Feb 13, 2024
cc98b5e
Revert "traceback was not helpful"
jigold Feb 13, 2024
cfe45f2
traceback
jigold Feb 13, 2024
0a6ece6
actually fix the test
jigold Feb 13, 2024
61152a3
replace ugly check for cancelled with function
jigold Feb 13, 2024
f4efc8c
format traceback
jigold Feb 13, 2024
4f33623
fix db error
jigold Feb 13, 2024
e5fdddd
Revert "fix db error"
jigold Feb 13, 2024
3800da8
Revert "replace ugly check for cancelled with function"
jigold Feb 13, 2024
2e82052
callback for job groups
jigold Feb 13, 2024
bd386ca
add batch callback back
jigold Feb 13, 2024
479461c
fix test
jigold Feb 13, 2024
0dd01bc
add new tests
jigold Feb 13, 2024
b158487
delint
jigold Feb 13, 2024
4d4c23d
missing AND
jigold Feb 13, 2024
aeb4d53
fix the test
jigold Feb 13, 2024
60a9c53
delint
jigold Feb 13, 2024
0339dd3
fix join on ancestor id
jigold Feb 14, 2024
283b9a9
add missing auth endpoint tests
jigold Feb 14, 2024
0440cc5
add tests for uncommitted update
jigold Feb 14, 2024
01dc316
attempt to fix callback query
jigold Feb 14, 2024
ad9d912
lower maximum depth to 2
jigold Feb 14, 2024
a86a282
fix test
jigold Feb 14, 2024
3e87e36
fix type
jigold Feb 14, 2024
cd99dd0
delint
jigold Feb 14, 2024
d4cf3e7
attempt to fix callback
jigold Feb 14, 2024
87aa8b7
revert back to max depth of 5
jigold Feb 14, 2024
41332a0
fix tests
jigold Feb 14, 2024
035ac78
maybe fix all tests
jigold Feb 14, 2024
5f348f8
delint
jigold Feb 14, 2024
e2f1aeb
last fixes
jigold Feb 14, 2024
3f9751a
fix missing field in group by
jigold Feb 14, 2024
5bd320c
address comments
jigold Feb 15, 2024
57ea2d9
address more comments
jigold Feb 15, 2024
4a62bb0
fix join query
jigold Feb 15, 2024
ed49d1c
fix query
jigold Feb 15, 2024
7526471
fix rebase conflicts
jigold Feb 15, 2024
ed4cacf
dont update nonexistant v2 tables
jigold Feb 15, 2024
c37308c
fix tests for new max depth
jigold Feb 15, 2024
909d29e
fix tests for new max depth
jigold Feb 15, 2024
b620b31
move constants to hailtop
jigold Feb 22, 2024
92e9a6d
address some comments
jigold Feb 22, 2024
1f72766
rest of changes
jigold Feb 22, 2024
1ff7e3f
get rid of fixmes
jigold Feb 22, 2024
0911cab
test fixes and delint
jigold Feb 22, 2024
d5574c1
fix test
jigold Feb 22, 2024
c82e0c4
fix merge conflict
jigold Feb 23, 2024
1449b00
get rid of transaction in migration
jigold Feb 23, 2024
128 changes: 90 additions & 38 deletions batch/batch/batch.py
@@ -1,18 +1,25 @@
import json
import logging
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, cast

from gear import transaction
from hailtop.batch_client.types import CostBreakdownEntry, JobListEntryV1Alpha
from hailtop.batch_client.globals import ROOT_JOB_GROUP_ID
from hailtop.batch_client.types import CostBreakdownEntry, GetJobGroupResponseV1Alpha, JobListEntryV1Alpha
from hailtop.utils import humanize_timedelta_msecs, time_msecs_str

from .batch_format_version import BatchFormatVersion
from .exceptions import NonExistentBatchError, OpenBatchError
from .exceptions import NonExistentJobGroupError
from .utils import coalesce

log = logging.getLogger('batch')


def _maybe_time_msecs_str(t: Optional[int]) -> Optional[str]:
if t is not None:
return time_msecs_str(t)
return None


def cost_breakdown_to_dict(cost_breakdown: Dict[str, float]) -> List[CostBreakdownEntry]:
return [{'resource': resource, 'cost': cost} for resource, cost in cost_breakdown.items()]

@@ -30,14 +37,9 @@ def batch_record_to_dict(record: Dict[str, Any]) -> Dict[str, Any]:
else:
state = 'running'

def _time_msecs_str(t):
if t:
return time_msecs_str(t)
return None

time_created = _time_msecs_str(record['time_created'])
time_closed = _time_msecs_str(record['time_closed'])
time_completed = _time_msecs_str(record['time_completed'])
time_created = _maybe_time_msecs_str(record['time_created'])
time_closed = _maybe_time_msecs_str(record['time_closed'])
time_completed = _maybe_time_msecs_str(record['time_completed'])

if record['time_created'] and record['time_completed']:
duration_ms = record['time_completed'] - record['time_created']
@@ -49,7 +51,7 @@ def _time_msecs_str(t):
if record['cost_breakdown'] is not None:
record['cost_breakdown'] = cost_breakdown_to_dict(json.loads(record['cost_breakdown']))

d = {
batch_response = {
'id': record['id'],
'user': record['user'],
'billing_project': record['billing_project'],
@@ -74,9 +76,55 @@ def _time_msecs_str(t):

attributes = json.loads(record['attributes'])
if attributes:
d['attributes'] = attributes
batch_response['attributes'] = attributes

return batch_response


def job_group_record_to_dict(record: Dict[str, Any]) -> GetJobGroupResponseV1Alpha:
if record['n_failed'] > 0:
state = 'failure'
elif record['cancelled'] or record['n_cancelled'] > 0:
state = 'cancelled'
Contributor: When is n_cancelled > 0 but cancelled is False?

Contributor Author: This happens when there are jobs that have been cancelled due to failed dependencies (jobs.cancelled = 1) or the batch / job group was cancelled.

Contributor: If a job is cancelled because a parent failed, wouldn't the first if branch fire? When is record['n_failed'] 0, record['cancelled'] False, but record['n_cancelled'] > 0?

Contributor Author: I think that is the intention. This code is the same as the Batch response, which has been there forever. I think it guards against cancelled child dependencies forcing the state to 'cancelled', while still allowing for cancelled empty batches and batches that were cancelled while holding jobs. (A runnable distillation of this branch logic follows the end of this hunk.)

elif record['state'] == 'complete':
assert record['n_succeeded'] == record['n_jobs']
state = 'success'
else:
state = 'running'

return d
time_created = _maybe_time_msecs_str(record['time_created'])
time_completed = _maybe_time_msecs_str(record['time_completed'])

if record['time_created'] and record['time_completed']:
duration_ms = record['time_completed'] - record['time_created']
else:
duration_ms = None

if record['cost_breakdown'] is not None:
record['cost_breakdown'] = cost_breakdown_to_dict(json.loads(record['cost_breakdown']))

job_group_response = {
'batch_id': record['batch_id'],
'job_group_id': record['job_group_id'],
'state': state,
'complete': record['state'] == 'complete',
'n_jobs': record['n_jobs'],
'n_completed': record['n_completed'],
'n_succeeded': record['n_succeeded'],
'n_failed': record['n_failed'],
'n_cancelled': record['n_cancelled'],
'time_created': time_created,
'time_completed': time_completed,
'duration': duration_ms,
'cost': coalesce(record['cost'], 0),
'cost_breakdown': record['cost_breakdown'],
}

attributes = json.loads(record['attributes'])
if attributes:
job_group_response['attributes'] = attributes

return cast(GetJobGroupResponseV1Alpha, job_group_response)
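
To ground the review thread above: a minimal, runnable distillation of this hunk's state logic. The helper name and the example records are illustrative, not code from the PR.

from typing import Any, Dict

def derive_job_group_state(record: Dict[str, Any]) -> str:
    # Branch order mirrors job_group_record_to_dict above:
    # failure wins over cancellation, which wins over success.
    if record['n_failed'] > 0:
        return 'failure'
    if record['cancelled'] or record['n_cancelled'] > 0:
        return 'cancelled'
    if record['state'] == 'complete':
        assert record['n_succeeded'] == record['n_jobs']
        return 'success'
    return 'running'

# A failed job whose dependents were cancelled: n_failed wins, so the
# group reports 'failure' even though n_cancelled > 0.
assert derive_job_group_state(
    {'n_failed': 1, 'n_cancelled': 2, 'cancelled': False,
     'state': 'complete', 'n_succeeded': 0, 'n_jobs': 3}
) == 'failure'

# The branch asked about in the thread: no failures, the group not itself
# cancelled, but n_cancelled > 0 still yields 'cancelled'.
assert derive_job_group_state(
    {'n_failed': 0, 'n_cancelled': 3, 'cancelled': False,
     'state': 'complete', 'n_succeeded': 0, 'n_jobs': 3}
) == 'cancelled'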


def job_record_to_dict(record: Dict[str, Any], name: Optional[str]) -> JobListEntryV1Alpha:
@@ -93,38 +141,42 @@ def job_record_to_dict(record: Dict[str, Any], name: Optional[str]) -> JobListEntryV1Alpha:
if record['cost_breakdown'] is not None:
record['cost_breakdown'] = cost_breakdown_to_dict(json.loads(record['cost_breakdown']))

return {
'batch_id': record['batch_id'],
'job_id': record['job_id'],
'name': name,
'user': record['user'],
'billing_project': record['billing_project'],
'state': record['state'],
'exit_code': exit_code,
'duration': duration,
'cost': coalesce(record['cost'], 0),
'msec_mcpu': record['msec_mcpu'],
'cost_breakdown': record['cost_breakdown'],
}


async def cancel_batch_in_db(db, batch_id):
return cast(
JobListEntryV1Alpha,
{
'batch_id': record['batch_id'],
'job_id': record['job_id'],
'name': name,
'user': record['user'],
'billing_project': record['billing_project'],
'state': record['state'],
'exit_code': exit_code,
'duration': duration,
'cost': coalesce(record['cost'], 0),
'msec_mcpu': record['msec_mcpu'],
'cost_breakdown': record['cost_breakdown'],
},
)


async def cancel_job_group_in_db(db, batch_id, job_group_id):
@transaction(db)
async def cancel(tx):
record = await tx.execute_and_fetchone(
"""
SELECT `state` FROM batches
WHERE id = %s AND NOT deleted
SELECT 1
FROM job_groups
LEFT JOIN batches ON batches.id = job_groups.batch_id
LEFT JOIN batch_updates ON job_groups.batch_id = batch_updates.batch_id AND
job_groups.update_id = batch_updates.update_id
WHERE job_groups.batch_id = %s AND job_groups.job_group_id = %s AND NOT deleted AND (batch_updates.committed OR job_groups.job_group_id = %s)
FOR UPDATE;
""",
(batch_id,),
(batch_id, job_group_id, ROOT_JOB_GROUP_ID),
)
if not record:
raise NonExistentBatchError(batch_id)

if record['state'] == 'open':
raise OpenBatchError(batch_id)
raise NonExistentJobGroupError(batch_id, job_group_id)

await tx.just_execute('CALL cancel_batch(%s);', (batch_id,))
await tx.just_execute('CALL cancel_job_group(%s, %s);', (batch_id, job_group_id))

await cancel()
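
A hypothetical call site for the new helper, sketched assuming an aiohttp-style front-end handler; the route wiring, handler name, and response shape are assumptions, not code from this PR.

from aiohttp import web

async def cancel_job_group_handler(request: web.Request) -> web.Response:
    batch_id = int(request.match_info['batch_id'])
    job_group_id = int(request.match_info['job_group_id'])
    db = request.app['db']  # assumption: the app stores its Database here
    # Raises NonExistentJobGroupError when the job group is missing, the
    # batch is deleted, or the group's update is uncommitted (the root
    # job group is exempt from the committed check, per the query above).
    await cancel_job_group_in_db(db, batch_id, job_group_id)
    return web.json_response({})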
1 change: 0 additions & 1 deletion batch/batch/constants.py

This file was deleted.

107 changes: 70 additions & 37 deletions batch/batch/driver/canceller.py
@@ -94,39 +94,44 @@ async def cancel_cancelled_ready_jobs_loop_body(self):
}

async def user_cancelled_ready_jobs(user, remaining) -> AsyncIterator[Dict[str, Any]]:
async for batch in self.db.select_and_fetchall(
async for job_group in self.db.select_and_fetchall(
"""
SELECT batches.id, job_groups_cancelled.id IS NOT NULL AS cancelled
FROM batches
LEFT JOIN job_groups_cancelled
ON batches.id = job_groups_cancelled.id
SELECT job_groups.batch_id, job_groups.job_group_id, t.cancelled IS NOT NULL AS cancelled
FROM job_groups
LEFT JOIN LATERAL (
SELECT 1 AS cancelled
FROM job_group_self_and_ancestors
INNER JOIN job_groups_cancelled
ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.id AND
job_group_self_and_ancestors.ancestor_id = job_groups_cancelled.job_group_id
WHERE job_groups.batch_id = job_group_self_and_ancestors.batch_id AND
job_groups.job_group_id = job_group_self_and_ancestors.job_group_id
) AS t ON TRUE
Contributor: In estimated-current.sql there is this comment:

    # inserting all cancelled job groups is not performant with many children job groups
    INSERT INTO job_groups_cancelled (id, job_group_id)
    VALUES (in_batch_id, in_job_group_id);

But at the top of cancel_job_group, and also here in this query, we aggregate over all the ancestors.

Suppose we had a depth of five: that will insert at most 32 records into job_groups_cancelled. That's a pretty small number of rows. MySQL should be able to do that just fine.

In the current system, it seems to me we need to aggregate over all of our ancestors anywhere we want to know whether our job group is cancelled, right? For example, in schedule_job, we really ought to check not just whether our direct job group is cancelled but whether any transitive ancestor is cancelled. It doesn't make sense to me to prevent scheduling a job when its direct group is cancelled but not when some transitive ancestor group is.

Contributor Author: You've got it backwards. Traversing the tree upwards is cheap: if we make the max depth 5, that's 5 records. Traversing down the tree, however, is slow. Say we're trying to cancel the root job group and there's a massive tree with 100K direct child job groups, each of which has additional children. Then this INSERT statement, inserting all of the records at once, is going to be extremely slow. It's the same reason we don't just update all of the jobs records at once when we cancel, and instead have these checks on whether the job group has been cancelled.

With regard to all of the checks about ancestors being cancelled: yes, I tried to put them in all the correct places.

Please let me know if you disagree with this reasoning, as its implications change a lot of your other comments and code in other places.

Contributor (danking, Feb 13, 2024): You're right, I was thinking job groups were binary trees, but there's an arbitrary number of children.

I think if someone creates 100,000 job groups, that's going to create other problems in our system. We make assumptions that the number of batches is relatively small. For example, in user_runnable_jobs, there's no limit on the number of job groups we fetch.

The options seem to be:

  1. Set all the children eagerly. This makes cancellation $O(N_{\text{descendant groups}})$. (I suspect this is fine at least up to 10,000 children. I just inserted 10,000 records into a table and it took 0.14s.)
  2. Never set the children. This makes many parts of our code base $O(\text{group depth})$, which is currently fixed at five. SQL code is duplicated in many places.
  3. Set the children iteratively, like jobs, but eagerly remove resources. This means a group will appear not cancelled even though the scheduler and autoscaler are not considering its resources. Seems confusing.
  4. Set the children iteratively, like jobs, and do not eagerly remove resources. This means that when a group has many descendant groups, the descendants will keep running for some time even though the user intends them to be cancelled.

Since we've decided to limit the depth but not the width, the right choice seems to be 2. We should consider options for avoiding the mass duplication of this lateral-join code, though. It'd be great not to have to re-understand it every time I see it.

Contributor Author: We might be able to do this with a SQL function. Then it would just be is_cancelled(job_groups.batch_id, job_groups.job_group_id) AS cancelled.

SELECT batches.*,
  is_cancelled(job_groups.batch_id, job_groups.job_group_id) AS cancelled,
  job_groups_n_jobs_in_complete_states.n_completed,
  job_groups_n_jobs_in_complete_states.n_succeeded,
  job_groups_n_jobs_in_complete_states.n_failed,
  job_groups_n_jobs_in_complete_states.n_cancelled,
  cost_t.*
FROM job_groups
LEFT JOIN batches ON batches.id = job_groups.batch_id
LEFT JOIN job_groups_n_jobs_in_complete_states
       ON job_groups.batch_id = job_groups_n_jobs_in_complete_states.id AND job_groups.job_group_id = job_groups_n_jobs_in_complete_states.job_group_id
LEFT JOIN LATERAL (
  SELECT COALESCE(SUM(`usage` * rate), 0) AS cost, JSON_OBJECTAGG(resources.resource, COALESCE(`usage` * rate, 0)) AS cost_breakdown
  FROM (
    SELECT resource_id, CAST(COALESCE(SUM(`usage`), 0) AS SIGNED) AS `usage`
    FROM aggregated_job_group_resources_v3
    WHERE job_groups.batch_id = aggregated_job_group_resources_v3.batch_id AND job_groups.job_group_id = aggregated_job_group_resources_v3.job_group_id
    GROUP BY resource_id
  ) AS usage_t
  LEFT JOIN resources ON usage_t.resource_id = resources.resource_id
) AS cost_t ON TRUE
WHERE job_groups.batch_id = %s AND job_groups.job_group_id = %s AND NOT deleted;

Contributor Author: I tried replacing all of these lateral joins with a stored function and it mostly worked, except there was some sort of deadlock in the cancel_job_group stored procedure. I think we should file an issue to replace the lateral joins with an is_job_group_cancelled function as a separate project. At least we know the lateral joins should perform well; I wasn't 100% convinced the functions had the same performance.

Contributor: Sounds good, let's create an issue and link it in a comment here.
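
To make the deferred proposal concrete, here is a minimal sketch of what such a function might look like, assuming it simply wraps the ancestor walk the lateral joins above perform. This is hypothetical, not part of this PR; the function name follows the thread, and the parameter types are assumptions about the schema.

# Hypothetical MySQL stored function, kept as a string the way this
# codebase ships SQL; it reuses the exact join from the lateral queries.
IS_JOB_GROUP_CANCELLED_SQL = '''
CREATE FUNCTION is_job_group_cancelled(in_batch_id BIGINT, in_job_group_id INT)
RETURNS BOOLEAN
READS SQL DATA
BEGIN
  RETURN EXISTS(
    SELECT 1
    FROM job_group_self_and_ancestors
    INNER JOIN job_groups_cancelled
      ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.id AND
         job_group_self_and_ancestors.ancestor_id = job_groups_cancelled.job_group_id
    WHERE job_group_self_and_ancestors.batch_id = in_batch_id AND
          job_group_self_and_ancestors.job_group_id = in_job_group_id
  );
END
'''

Call sites would then shrink to is_job_group_cancelled(job_groups.batch_id, job_groups.job_group_id) AS cancelled, as in the example above; the deadlock the author observed in cancel_job_group would still need to be understood before adopting this.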

WHERE user = %s AND `state` = 'running';
""",
(user,),
):
if batch['cancelled']:
if job_group['cancelled']:
Contributor: Continuing https://github.com/hail-is/hail/pull/14170/files#r1476847018

OK, let's not change anything in this PR so as to keep it as simple and direct as possible.

This is my concrete critique: the two SQL queries are nearly the same, but they are long enough to make seeing that challenging.

SELECT jobs.batch_id, jobs.job_id
FROM jobs FORCE INDEX(jobs_batch_id_state_always_run_cancelled)
WHERE batch_id = %s AND job_group_id = %s AND state = 'Ready' AND always_run = 0
LIMIT %s;

SELECT jobs.batch_id, jobs.job_id
FROM jobs FORCE INDEX(jobs_batch_id_state_always_run_cancelled)
WHERE batch_id = %s AND job_group_id = %s AND state = 'Ready' AND always_run = 0 AND cancelled = 1
LIMIT %s;

They differ only in the cancelled condition. As a reader, I'd prefer code that revealed that similarity and used names to suggest what the difference is doing, plus a comment, if there is no better way to say it, indicating what you indicated in your GitHub comment:

if job_group['cancelled']:
    where_job_needs_cancelling = ''  # every job in a cancelled group needs cancelling
else:
    where_job_needs_cancelling = 'AND jobs.cancelled'  # jobs.cancelled means child of a failed job
query_for_jobs_to_be_cancelled = f"""
SELECT jobs.batch_id, jobs.job_id
FROM jobs FORCE INDEX(jobs_batch_id_state_always_run_cancelled)
WHERE batch_id = %s
  AND job_group_id = %s
  AND state = 'Ready'
  AND NOT always_run
  {where_job_needs_cancelling}
LIMIT %s;
"""
async for record in self.db.select_and_fetchall(
    query_for_jobs_to_be_cancelled,
    (job_group['batch_id'], job_group['job_group_id'], remaining.value),
):
    yield record

Contributor Author: OK. I added this to the list of issues to create.

async for record in self.db.select_and_fetchall(
"""
SELECT jobs.job_id
SELECT jobs.batch_id, jobs.job_id, jobs.job_group_id
FROM jobs FORCE INDEX(jobs_batch_id_state_always_run_cancelled)
WHERE batch_id = %s AND state = 'Ready' AND always_run = 0
WHERE batch_id = %s AND job_group_id = %s AND state = 'Ready' AND always_run = 0
LIMIT %s;
""",
(batch['id'], remaining.value),
(job_group['batch_id'], job_group['job_group_id'], remaining.value),
):
record['batch_id'] = batch['id']
Contributor: I'm somewhat indifferent on this change. More data is transmitted (potentially a lot more, because jobs >> groups), but it's a bit more direct and clear as now written. However: why include it in this PR? Is there something changing about how batch_ids work that required this change?

Contributor Author: No, I was trying to clean up everything to avoid more comments about unreadability and confusion from unneeded lines of code. I feel like I am trying to read your mind on minimizing code changes while, at the same time, a lot of comments are about cleaning up the code.

Contributor: The way I think of it: a PR presents a set of changes. The reviewer reads the PR to understand the proposed change. For that purpose, we want the PR as slim as possible, because that aids understanding. The resulting conversation about the change may reveal related simplifications that the author and reviewer decide, as a pair, either to include in the current PR or to defer to follow-up PRs. In general, if a comment I make seems unrelated, I'm 100% OK with it becoming a separate PR.

If an author notices poor code while preparing a PR, those changes should be made in independent PRs or, if the PR depends on them, should be made first.

For example, the sync tool PR (#14248) spawned a few PRs (#14139, #14162, #14176, #14181) that were independent but related (some of which needed to merge before the sync tool could be reviewed). I recognize this process takes some time, but it ensures the reviewers can fully grok each change and incorporate it into their mental model of the system.

yield record
else:
async for record in self.db.select_and_fetchall(
"""
SELECT jobs.job_id
SELECT jobs.batch_id, jobs.job_id, jobs.job_group_id
FROM jobs FORCE INDEX(jobs_batch_id_state_always_run_cancelled)
WHERE batch_id = %s AND state = 'Ready' AND always_run = 0 AND cancelled = 1
WHERE batch_id = %s AND job_group_id = %s AND state = 'Ready' AND always_run = 0 AND cancelled = 1
LIMIT %s;
""",
(batch['id'], remaining.value),
(job_group['batch_id'], job_group['job_group_id'], remaining.value),
):
record['batch_id'] = batch['id']
Contributor: Same comment here about the addition of batch_id. I'm ambivalent, but not sure why it's here: did something change that I'm missing?

Contributor Author: See response to #14282 (comment)

yield record

waitable_pool = WaitableSharedPool(self.async_worker_pool)
@@ -137,18 +142,30 @@ async def user_cancelled_ready_jobs(user, remaining) -> AsyncIterator[Dict[str, Any]]:
async for record in user_cancelled_ready_jobs(user, remaining):
batch_id = record['batch_id']
job_id = record['job_id']
job_group_id = record['job_group_id']
id = (batch_id, job_id)
log.info(f'cancelling job {id}')

async def cancel_with_error_handling(app, batch_id, job_id, id):
async def cancel_with_error_handling(app, batch_id, job_id, job_group_id, id):
try:
await mark_job_complete(
app, batch_id, job_id, None, None, 'Cancelled', None, None, None, 'cancelled', []
app,
batch_id,
job_id,
None,
job_group_id,
None,
'Cancelled',
None,
None,
None,
'cancelled',
[],
)
except Exception:
log.info(f'error while cancelling job {id}', exc_info=True)

await waitable_pool.call(cancel_with_error_handling, self.app, batch_id, job_id, id)
await waitable_pool.call(cancel_with_error_handling, self.app, batch_id, job_id, job_group_id, id)

remaining.value -= 1
if remaining.value <= 0:
@@ -182,28 +199,34 @@ async def cancel_cancelled_creating_jobs_loop_body(self):
}

async def user_cancelled_creating_jobs(user, remaining) -> AsyncIterator[Dict[str, Any]]:
async for batch in self.db.select_and_fetchall(
async for job_group in self.db.select_and_fetchall(
"""
SELECT batches.id
FROM batches
INNER JOIN job_groups_cancelled
ON batches.id = job_groups_cancelled.id
SELECT job_groups.batch_id, job_groups.job_group_id
FROM job_groups
INNER JOIN LATERAL (
SELECT 1 AS cancelled
FROM job_group_self_and_ancestors
INNER JOIN job_groups_cancelled
ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.id AND
job_group_self_and_ancestors.ancestor_id = job_groups_cancelled.job_group_id
WHERE job_groups.batch_id = job_group_self_and_ancestors.batch_id AND
job_groups.job_group_id = job_group_self_and_ancestors.job_group_id
) AS t ON TRUE
Contributor: Same comment here as above: we're walking up the group tree every time we run the cancel query. It seems better to update all those children once, and then benefit from it on all these periodic queries.

WHERE user = %s AND `state` = 'running';
""",
(user,),
):
async for record in self.db.select_and_fetchall(
"""
SELECT jobs.job_id, attempts.attempt_id, attempts.instance_name
SELECT jobs.batch_id, jobs.job_id, attempts.attempt_id, attempts.instance_name, jobs.job_group_id
FROM jobs FORCE INDEX(jobs_batch_id_state_always_run_cancelled)
STRAIGHT_JOIN attempts
ON attempts.batch_id = jobs.batch_id AND attempts.job_id = jobs.job_id
WHERE jobs.batch_id = %s AND state = 'Creating' AND always_run = 0 AND cancelled = 0
WHERE jobs.batch_id = %s AND jobs.job_group_id = %s AND state = 'Creating' AND always_run = 0 AND cancelled = 0
LIMIT %s;
""",
(batch['id'], remaining.value),
(job_group['batch_id'], job_group['job_group_id'], remaining.value),
):
record['batch_id'] = batch['id']
Contributor: Same comment about batch id. Did something change that I'm missing, or is this an unrelated cosmetic change?

Contributor Author: See response to #14282 (comment)

yield record

waitable_pool = WaitableSharedPool(self.async_worker_pool)
@@ -215,17 +238,21 @@ async def user_cancelled_creating_jobs(user, remaining) -> AsyncIterator[Dict[str, Any]]:
batch_id = record['batch_id']
job_id = record['job_id']
attempt_id = record['attempt_id']
job_group_id = record['job_group_id']
instance_name = record['instance_name']
id = (batch_id, job_id)

async def cancel_with_error_handling(app, batch_id, job_id, attempt_id, instance_name, id):
async def cancel_with_error_handling(
app, batch_id, job_id, attempt_id, job_group_id, instance_name, id
):
try:
end_time = time_msecs()
await mark_job_complete(
app,
batch_id,
job_id,
attempt_id,
job_group_id,
instance_name,
'Cancelled',
None,
Expand All @@ -246,7 +273,7 @@ async def cancel_with_error_handling(app, batch_id, job_id, attempt_id, instance
log.info(f'cancelling creating job {id} on instance {instance_name}', exc_info=True)

await waitable_pool.call(
cancel_with_error_handling, self.app, batch_id, job_id, attempt_id, instance_name, id
cancel_with_error_handling, self.app, batch_id, job_id, attempt_id, job_group_id, instance_name, id
)

remaining.value -= 1
@@ -279,28 +306,34 @@ async def cancel_cancelled_running_jobs_loop_body(self):
}

async def user_cancelled_running_jobs(user, remaining) -> AsyncIterator[Dict[str, Any]]:
async for batch in self.db.select_and_fetchall(
async for job_group in self.db.select_and_fetchall(
"""
SELECT batches.id
FROM batches
INNER JOIN job_groups_cancelled
ON batches.id = job_groups_cancelled.id
SELECT job_groups.batch_id, job_groups.job_group_id
FROM job_groups
INNER JOIN LATERAL (
SELECT 1 AS cancelled
FROM job_group_self_and_ancestors
INNER JOIN job_groups_cancelled
ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.id AND
job_group_self_and_ancestors.ancestor_id = job_groups_cancelled.job_group_id
WHERE job_groups.batch_id = job_group_self_and_ancestors.batch_id AND
job_groups.job_group_id = job_group_self_and_ancestors.job_group_id
) AS t ON TRUE
Contributor: Same comment about the cost of walking up the tree on every query.

WHERE user = %s AND `state` = 'running';
""",
(user,),
):
async for record in self.db.select_and_fetchall(
"""
SELECT jobs.job_id, attempts.attempt_id, attempts.instance_name
SELECT jobs.batch_id, jobs.job_id, attempts.attempt_id, attempts.instance_name
FROM jobs FORCE INDEX(jobs_batch_id_state_always_run_cancelled)
STRAIGHT_JOIN attempts
ON attempts.batch_id = jobs.batch_id AND attempts.job_id = jobs.job_id
WHERE jobs.batch_id = %s AND state = 'Running' AND always_run = 0 AND cancelled = 0
WHERE jobs.batch_id = %s AND jobs.job_group_id = %s AND state = 'Running' AND always_run = 0 AND cancelled = 0
LIMIT %s;
""",
(batch['id'], remaining.value),
(job_group['batch_id'], job_group['job_group_id'], remaining.value),
):
record['batch_id'] = batch['id']
Contributor: Same comment about the batch_id column.

Contributor Author: See response to #14282 (comment)

yield record

waitable_pool = WaitableSharedPool(self.async_worker_pool)