Skip to content

Commit

Permalink
[batch] Use job group id in front end and driver queries
Browse files (browse the repository at this point in the history)
  • Loading branch information
jigold committed Nov 30, 2023
1 parent 904a045 commit b16b283
Show file tree
Hide file tree
Showing 12 changed files with 306 additions and 269 deletions.
79 changes: 42 additions & 37 deletions batch/batch/driver/canceller.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -94,39 +94,40 @@ async def cancel_cancelled_ready_jobs_loop_body(self):
}

async def user_cancelled_ready_jobs(user, remaining) -> AsyncIterator[Dict[str, Any]]:
async for batch in self.db.select_and_fetchall(
async for job_group in self.db.select_and_fetchall(
'''
SELECT batches.id, job_groups_cancelled.id IS NOT NULL AS cancelled
FROM batches
SELECT job_groups.batch_id, job_groups.job_group_id, job_groups_cancelled.id IS NOT NULL AS cancelled
FROM job_groups
LEFT JOIN job_groups_cancelled
ON batches.id = job_groups_cancelled.id
ON job_groups.batch_id = job_groups_cancelled.id AND
job_groups.job_group_id = job_groups_cancelled.job_group_id
WHERE user = %s AND `state` = 'running';
''',
(user,),
):
if batch['cancelled']:
async for record in self.db.select_and_fetchall(
if job_group['cancelled']:
async for record in self.db.select_and_fetchall( # FIXME: Do we need a new index again?
'''
SELECT jobs.job_id
FROM jobs FORCE INDEX(jobs_batch_id_state_always_run_cancelled)
WHERE batch_id = %s AND state = 'Ready' AND always_run = 0
WHERE batch_id = %s AND job_group_id = %s AND state = 'Ready' AND always_run = 0
LIMIT %s;
''',
(batch['id'], remaining.value),
(job_group['batch_id'], job_group['job_group_id'], remaining.value),
):
record['batch_id'] = batch['id']
record['batch_id'] = job_group['batch_id']
yield record
else:
async for record in self.db.select_and_fetchall(
async for record in self.db.select_and_fetchall( # FIXME: Do we need a new index again?
'''
SELECT jobs.job_id
FROM jobs FORCE INDEX(jobs_batch_id_state_always_run_cancelled)
WHERE batch_id = %s AND state = 'Ready' AND always_run = 0 AND cancelled = 1
WHERE batch_id = %s AND job_group_id = %s AND state = 'Ready' AND always_run = 0 AND cancelled = 1
LIMIT %s;
''',
(batch['id'], remaining.value),
(job_group['batch_id'], job_group['job_group_id'], remaining.value),
):
record['batch_id'] = batch['id']
record['batch_id'] = job_group['batch_id']
yield record

waitable_pool = WaitableSharedPool(self.async_worker_pool)
Expand Down Expand Up @@ -182,29 +183,31 @@ async def cancel_cancelled_creating_jobs_loop_body(self):
}

async def user_cancelled_creating_jobs(user, remaining) -> AsyncIterator[Dict[str, Any]]:
async for batch in self.db.select_and_fetchall(
async for job_group in self.db.select_and_fetchall(
'''
SELECT batches.id
FROM batches
INNER JOIN job_groups_cancelled
ON batches.id = job_groups_cancelled.id
SELECT job_groups.batch_id, job_groups.job_group_id, job_groups_cancelled.id IS NOT NULL AS cancelled
FROM job_groups
LEFT JOIN job_groups_cancelled
ON job_groups.batch_id = job_groups_cancelled.id AND
job_groups.job_group_id = job_groups_cancelled.job_group_id
WHERE user = %s AND `state` = 'running';
''',
(user,),
):
async for record in self.db.select_and_fetchall(
'''
if job_group['cancelled']:
async for record in self.db.select_and_fetchall(
'''
SELECT jobs.job_id, attempts.attempt_id, attempts.instance_name
FROM jobs FORCE INDEX(jobs_batch_id_state_always_run_cancelled)
STRAIGHT_JOIN attempts
ON attempts.batch_id = jobs.batch_id AND attempts.job_id = jobs.job_id
WHERE jobs.batch_id = %s AND state = 'Creating' AND always_run = 0 AND cancelled = 0
WHERE jobs.batch_id = %s AND jobs.job_group_id = %s AND state = 'Creating' AND always_run = 0 AND cancelled = 0
LIMIT %s;
''',
(batch['id'], remaining.value),
):
record['batch_id'] = batch['id']
yield record
(job_group['batch_id'], job_group['job_group_id'], remaining.value),
):
record['batch_id'] = job_group['batch_id']
yield record

waitable_pool = WaitableSharedPool(self.async_worker_pool)

Expand Down Expand Up @@ -279,29 +282,31 @@ async def cancel_cancelled_running_jobs_loop_body(self):
}

async def user_cancelled_running_jobs(user, remaining) -> AsyncIterator[Dict[str, Any]]:
async for batch in self.db.select_and_fetchall(
async for job_group in self.db.select_and_fetchall(
'''
SELECT batches.id
FROM batches
INNER JOIN job_groups_cancelled
ON batches.id = job_groups_cancelled.id
SELECT job_groups.batch_id, job_groups.job_group_id, job_groups_cancelled.id IS NOT NULL AS cancelled
FROM job_groups
LEFT JOIN job_groups_cancelled
ON job_groups.batch_id = job_groups_cancelled.id AND
job_groups.job_group_id = job_groups_cancelled.job_group_id
WHERE user = %s AND `state` = 'running';
''',
(user,),
):
async for record in self.db.select_and_fetchall(
'''
if job_group['cancelled']:
async for record in self.db.select_and_fetchall(
'''
SELECT jobs.job_id, attempts.attempt_id, attempts.instance_name
FROM jobs FORCE INDEX(jobs_batch_id_state_always_run_cancelled)
STRAIGHT_JOIN attempts
ON attempts.batch_id = jobs.batch_id AND attempts.job_id = jobs.job_id
WHERE jobs.batch_id = %s AND state = 'Running' AND always_run = 0 AND cancelled = 0
WHERE jobs.batch_id = %s AND jobs.job_group_id = %s AND state = 'Running' AND always_run = 0 AND cancelled = 0
LIMIT %s;
''',
(batch['id'], remaining.value),
):
record['batch_id'] = batch['id']
yield record
(job_group['batch_id'], job_group['job_group_id'], remaining.value),
):
record['batch_id'] = job_group['batch_id']
yield record

waitable_pool = WaitableSharedPool(self.async_worker_pool)

Expand Down
56 changes: 31 additions & 25 deletions batch/batch/driver/instance_collection/job_private.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -179,12 +179,13 @@ async def schedule_jobs_loop_body(self):
async for record in self.db.select_and_fetchall(
'''
SELECT jobs.*, batches.format_version, batches.userdata, batches.user, attempts.instance_name, time_ready
FROM batches
INNER JOIN jobs ON batches.id = jobs.batch_id
FROM job_groups
LEFT JOIN batches ON batches.id = job_groups.batch_id
INNER JOIN jobs ON job_groups.batch_id = jobs.batch_id AND job_groups.job_group_id = jobs.job_group_id
LEFT JOIN jobs_telemetry ON jobs.batch_id = jobs_telemetry.batch_id AND jobs.job_id = jobs_telemetry.job_id
LEFT JOIN attempts ON jobs.batch_id = attempts.batch_id AND jobs.job_id = attempts.job_id
LEFT JOIN instances ON attempts.instance_name = instances.name
WHERE batches.state = 'running'
WHERE job_groups.state = 'running'
AND jobs.state = 'Creating'
AND (jobs.always_run OR NOT jobs.cancelled)
AND jobs.inst_coll = %s
Expand Down Expand Up @@ -349,54 +350,55 @@ async def create_instances_loop_body(self):
}

async def user_runnable_jobs(user, remaining) -> AsyncIterator[Dict[str, Any]]:
async for batch in self.db.select_and_fetchall(
async for job_group in self.db.select_and_fetchall(
'''
SELECT batches.id, job_groups_cancelled.id IS NOT NULL AS cancelled, userdata, user, format_version
FROM batches
SELECT job_groups.batch_id, job_groups.job_group_id, job_groups_cancelled.id IS NOT NULL AS cancelled, userdata, job_groups.user, format_version
FROM job_groups
LEFT JOIN batches ON batches.id = job_groups.batch_id
LEFT JOIN job_groups_cancelled
ON batches.id = job_groups_cancelled.id
WHERE user = %s AND `state` = 'running';
ON job_groups.batch_id = job_groups_cancelled.id AND job_groups.job_group_id = job_groups_cancelled.job_group_id
WHERE job_groups.user = %s AND job_groups.`state` = 'running';
''',
(user,),
):
async for record in self.db.select_and_fetchall(
'''
SELECT jobs.batch_id, jobs.job_id, jobs.spec, jobs.cores_mcpu, regions_bits_rep, COALESCE(SUM(instances.state IS NOT NULL AND
(instances.state = 'pending' OR instances.state = 'active')), 0) as live_attempts
(instances.state = 'pending' OR instances.state = 'active')), 0) as live_attempts, jobs.job_group_id
FROM jobs FORCE INDEX(jobs_batch_id_state_always_run_inst_coll_cancelled)
LEFT JOIN attempts ON jobs.batch_id = attempts.batch_id AND jobs.job_id = attempts.job_id
LEFT JOIN instances ON attempts.instance_name = instances.name
WHERE jobs.batch_id = %s AND jobs.state = 'Ready' AND always_run = 1 AND jobs.inst_coll = %s
WHERE jobs.batch_id = %s AND jobs.job_group_id = %s AND jobs.state = 'Ready' AND always_run = 1 AND jobs.inst_coll = %s
GROUP BY jobs.job_id, jobs.spec, jobs.cores_mcpu
HAVING live_attempts = 0
LIMIT %s;
''',
(batch['id'], self.name, remaining.value),
(job_group['batch_id'], job_group['job_group_id'], self.name, remaining.value),
):
record['batch_id'] = batch['id']
record['userdata'] = batch['userdata']
record['user'] = batch['user']
record['format_version'] = batch['format_version']
record['batch_id'] = job_group['batch_id']
record['userdata'] = job_group['userdata']
record['user'] = job_group['user']
record['format_version'] = job_group['format_version']
yield record
if not batch['cancelled']:
if not job_group['cancelled']:
async for record in self.db.select_and_fetchall(
'''
SELECT jobs.batch_id, jobs.job_id, jobs.spec, jobs.cores_mcpu, regions_bits_rep, COALESCE(SUM(instances.state IS NOT NULL AND
(instances.state = 'pending' OR instances.state = 'active')), 0) as live_attempts
(instances.state = 'pending' OR instances.state = 'active')), 0) as live_attempts, job_group_id
FROM jobs FORCE INDEX(jobs_batch_id_state_always_run_cancelled)
LEFT JOIN attempts ON jobs.batch_id = attempts.batch_id AND jobs.job_id = attempts.job_id
LEFT JOIN instances ON attempts.instance_name = instances.name
WHERE jobs.batch_id = %s AND jobs.state = 'Ready' AND always_run = 0 AND jobs.inst_coll = %s AND cancelled = 0
WHERE jobs.batch_id = %s AND jobs.job_group_id = %s AND jobs.state = 'Ready' AND always_run = 0 AND jobs.inst_coll = %s AND cancelled = 0
GROUP BY jobs.job_id, jobs.spec, jobs.cores_mcpu
HAVING live_attempts = 0
LIMIT %s
''',
(batch['id'], self.name, remaining.value),
(job_group['batch_id'], job_group['job_group_id'], self.name, remaining.value),
):
record['batch_id'] = batch['id']
record['userdata'] = batch['userdata']
record['user'] = batch['user']
record['format_version'] = batch['format_version']
record['batch_id'] = job_group['batch_id']
record['userdata'] = job_group['userdata']
record['user'] = job_group['user']
record['format_version'] = job_group['format_version']
yield record

waitable_pool = WaitableSharedPool(self.async_worker_pool)
Expand All @@ -420,6 +422,7 @@ async def user_runnable_jobs(user, remaining) -> AsyncIterator[Dict[str, Any]]:
id = (batch_id, job_id)
attempt_id = secret_alnum_string(6)
record['attempt_id'] = attempt_id
job_group_id = record['job_group_id']

if n_user_instances_created >= n_allocated_instances:
if random.random() > self.exceeded_shares_counter.rate():
Expand All @@ -435,7 +438,7 @@ async def user_runnable_jobs(user, remaining) -> AsyncIterator[Dict[str, Any]]:
log.info(f'creating job private instance for job {id}')

async def create_instance_with_error_handling(
batch_id: int, job_id: int, attempt_id: str, record: dict, id: Tuple[int, int]
batch_id: int, job_id: int, attempt_id: str, job_group_id: int, record: dict, id: Tuple[int, int]
):
try:
batch_format_version = BatchFormatVersion(record['format_version'])
Expand All @@ -460,14 +463,17 @@ async def create_instance_with_error_handling(
batch_id,
job_id,
attempt_id,
job_group_id,
record['user'],
record['format_version'],
traceback.format_exc(),
)
except Exception:
log.exception(f'while creating job private instance for job {id}', exc_info=True)

await waitable_pool.call(create_instance_with_error_handling, batch_id, job_id, attempt_id, record, id)
await waitable_pool.call(
create_instance_with_error_handling, batch_id, job_id, attempt_id, job_group_id, record, id
)

remaining.value -= 1
if remaining.value <= 0:
Expand Down
Loading

0 comments on commit b16b283

Please sign in to comment.