diff --git a/batch/batch/driver/canceller.py b/batch/batch/driver/canceller.py index d438a8519bb..b02d6f24a2f 100644 --- a/batch/batch/driver/canceller.py +++ b/batch/batch/driver/canceller.py @@ -99,7 +99,7 @@ async def user_cancelled_ready_jobs(user, remaining) -> AsyncIterator[Dict[str, SELECT 1 AS cancelled FROM job_group_self_and_ancestors INNER JOIN job_groups_cancelled - ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.id AND + ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.batch_id AND job_group_self_and_ancestors.ancestor_id = job_groups_cancelled.job_group_id WHERE job_groups.batch_id = job_group_self_and_ancestors.batch_id AND job_groups.job_group_id = job_group_self_and_ancestors.job_group_id @@ -204,7 +204,7 @@ async def user_cancelled_creating_jobs(user, remaining) -> AsyncIterator[Dict[st SELECT 1 AS cancelled FROM job_group_self_and_ancestors INNER JOIN job_groups_cancelled - ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.id AND + ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.batch_id AND job_group_self_and_ancestors.ancestor_id = job_groups_cancelled.job_group_id WHERE job_groups.batch_id = job_group_self_and_ancestors.batch_id AND job_groups.job_group_id = job_group_self_and_ancestors.job_group_id @@ -311,7 +311,7 @@ async def user_cancelled_running_jobs(user, remaining) -> AsyncIterator[Dict[str SELECT 1 AS cancelled FROM job_group_self_and_ancestors INNER JOIN job_groups_cancelled - ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.id AND + ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.batch_id AND job_group_self_and_ancestors.ancestor_id = job_groups_cancelled.job_group_id WHERE job_groups.batch_id = job_group_self_and_ancestors.batch_id AND job_groups.job_group_id = job_group_self_and_ancestors.job_group_id diff --git a/batch/batch/driver/instance_collection/job_private.py b/batch/batch/driver/instance_collection/job_private.py index 
c4907ec590b..414a992739c 100644 --- a/batch/batch/driver/instance_collection/job_private.py +++ b/batch/batch/driver/instance_collection/job_private.py @@ -360,7 +360,7 @@ async def user_runnable_jobs(user, remaining) -> AsyncIterator[Dict[str, Any]]: SELECT 1 AS cancelled FROM job_group_self_and_ancestors INNER JOIN job_groups_cancelled - ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.id AND + ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.batch_id AND job_group_self_and_ancestors.ancestor_id = job_groups_cancelled.job_group_id WHERE job_groups.batch_id = job_group_self_and_ancestors.batch_id AND job_groups.job_group_id = job_group_self_and_ancestors.job_group_id diff --git a/batch/batch/driver/instance_collection/pool.py b/batch/batch/driver/instance_collection/pool.py index 39ff5046b60..24d02a53af5 100644 --- a/batch/batch/driver/instance_collection/pool.py +++ b/batch/batch/driver/instance_collection/pool.py @@ -344,7 +344,7 @@ async def regions_to_ready_cores_mcpu_from_estimated_job_queue(self) -> List[Tup SELECT 1 AS cancelled FROM job_group_self_and_ancestors INNER JOIN job_groups_cancelled - ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.id AND + ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.batch_id AND job_group_self_and_ancestors.ancestor_id = job_groups_cancelled.job_group_id WHERE jobs.batch_id = job_group_self_and_ancestors.batch_id AND jobs.job_group_id = job_group_self_and_ancestors.job_group_id @@ -622,7 +622,7 @@ async def user_runnable_jobs(user): SELECT 1 AS cancelled FROM job_group_self_and_ancestors INNER JOIN job_groups_cancelled - ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.id AND + ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.batch_id AND job_group_self_and_ancestors.ancestor_id = job_groups_cancelled.job_group_id WHERE job_groups.batch_id = job_group_self_and_ancestors.batch_id AND job_groups.job_group_id = 
job_group_self_and_ancestors.job_group_id diff --git a/batch/batch/driver/job.py b/batch/batch/driver/job.py index c6d129eef18..84237ca073f 100644 --- a/batch/batch/driver/job.py +++ b/batch/batch/driver/job.py @@ -36,7 +36,7 @@ async def notify_batch_job_complete(db: Database, client_session: httpx.ClientSe SELECT batches.*, cost_t.cost, cost_t.cost_breakdown, - job_groups_cancelled.id IS NOT NULL AS cancelled, + job_groups_cancelled.batch_id IS NOT NULL AS cancelled, job_groups_n_jobs_in_complete_states.n_completed, job_groups_n_jobs_in_complete_states.n_succeeded, job_groups_n_jobs_in_complete_states.n_failed, @@ -56,7 +56,7 @@ async def notify_batch_job_complete(db: Database, client_session: httpx.ClientSe GROUP BY batch_id ) AS cost_t ON TRUE LEFT JOIN job_groups_cancelled - ON batches.id = job_groups_cancelled.id + ON batches.id = job_groups_cancelled.batch_id WHERE batches.id = %s AND NOT deleted AND callback IS NOT NULL AND batches.`state` = 'complete'; """, @@ -123,7 +123,7 @@ async def notify_job_group_on_job_complete( SELECT 1 AS cancelled FROM job_group_self_and_ancestors AS self_and_ancestors INNER JOIN job_groups_cancelled - ON self_and_ancestors.batch_id = job_groups_cancelled.id AND + ON self_and_ancestors.batch_id = job_groups_cancelled.batch_id AND self_and_ancestors.ancestor_id = job_groups_cancelled.job_group_id WHERE self_and_ancestors.batch_id = job_group_self_and_ancestors.batch_id AND self_and_ancestors.job_group_id = job_group_self_and_ancestors.ancestor_id diff --git a/batch/batch/driver/main.py b/batch/batch/driver/main.py index ac6f99d756e..9e5d878fab4 100644 --- a/batch/batch/driver/main.py +++ b/batch/batch/driver/main.py @@ -1034,7 +1034,7 @@ async def check(tx): SELECT 1 AS cancelled FROM job_group_self_and_ancestors INNER JOIN job_groups_cancelled - ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.id AND + ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.batch_id AND 
job_group_self_and_ancestors.ancestor_id = job_groups_cancelled.job_group_id WHERE job_groups.batch_id = job_group_self_and_ancestors.batch_id AND job_groups.job_group_id = job_group_self_and_ancestors.job_group_id @@ -1312,7 +1312,7 @@ async def cancel_fast_failing_job_groups(app): SELECT 1 AS cancelled FROM job_group_self_and_ancestors INNER JOIN job_groups_cancelled - ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.id AND + ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.batch_id AND job_group_self_and_ancestors.ancestor_id = job_groups_cancelled.job_group_id WHERE job_groups.batch_id = job_group_self_and_ancestors.batch_id AND job_groups.job_group_id = job_group_self_and_ancestors.job_group_id @@ -1471,7 +1471,7 @@ async def delete_prev_cancelled_job_group_cancellable_resources_records(db: Data 1 FROM job_group_self_and_ancestors AS descendant INNER JOIN job_groups_cancelled AS cancelled - ON descendant.batch_id = cancelled.id + ON descendant.batch_id = cancelled.batch_id AND descendant.ancestor_id = cancelled.job_group_id WHERE descendant.batch_id = group_resources.batch_id AND descendant.job_group_id = group_resources.job_group_id diff --git a/batch/batch/front_end/front_end.py b/batch/batch/front_end/front_end.py index 7cc6121b187..2aefa5dd4e0 100644 --- a/batch/batch/front_end/front_end.py +++ b/batch/batch/front_end/front_end.py @@ -911,7 +911,7 @@ async def _create_job_group( SELECT 1 AS cancelled FROM job_group_self_and_ancestors INNER JOIN job_groups_cancelled - ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.id AND + ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.batch_id AND job_group_self_and_ancestors.ancestor_id = job_groups_cancelled.job_group_id WHERE job_group_self_and_ancestors.batch_id = %s AND job_group_self_and_ancestors.job_group_id = %s; """, @@ -1857,10 +1857,10 @@ async def update(tx: Transaction): SELECT cancelled_t.cancelled IS NOT NULL AS cancelled FROM batches LEFT JOIN 
( - SELECT id, 1 AS cancelled + SELECT batch_id, 1 AS cancelled FROM job_groups_cancelled - WHERE id = %s AND job_group_id = %s -) AS cancelled_t ON batches.id = cancelled_t.id + WHERE batch_id = %s AND job_group_id = %s +) AS cancelled_t ON batches.id = cancelled_t.batch_id WHERE batches.id = %s AND batches.user = %s AND NOT deleted FOR UPDATE; """, @@ -1936,10 +1936,10 @@ async def _get_batch(app, batch_id): LEFT JOIN job_groups_n_jobs_in_complete_states ON job_groups.batch_id = job_groups_n_jobs_in_complete_states.id AND job_groups.job_group_id = job_groups_n_jobs_in_complete_states.job_group_id LEFT JOIN ( - SELECT id, 1 AS cancelled + SELECT batch_id, 1 AS cancelled FROM job_groups_cancelled - WHERE id = %s AND job_group_id = %s -) AS cancelled_t ON batches.id = cancelled_t.id + WHERE batch_id = %s AND job_group_id = %s +) AS cancelled_t ON batches.id = cancelled_t.batch_id LEFT JOIN LATERAL ( SELECT COALESCE(SUM(`usage` * rate), 0) AS cost, JSON_OBJECTAGG(resources.resource, COALESCE(`usage` * rate, 0)) AS cost_breakdown FROM ( @@ -1984,7 +1984,7 @@ async def _get_job_group(app, batch_id: int, job_group_id: int) -> GetJobGroupRe SELECT 1 AS cancelled FROM job_group_self_and_ancestors INNER JOIN job_groups_cancelled - ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.id AND + ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.batch_id AND job_group_self_and_ancestors.ancestor_id = job_groups_cancelled.job_group_id WHERE job_groups.batch_id = job_group_self_and_ancestors.batch_id AND job_groups.job_group_id = job_group_self_and_ancestors.job_group_id @@ -2086,7 +2086,7 @@ async def close_batch(request, userdata): SELECT 1 AS cancelled FROM job_group_self_and_ancestors INNER JOIN job_groups_cancelled - ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.id AND + ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.batch_id AND job_group_self_and_ancestors.ancestor_id = job_groups_cancelled.job_group_id WHERE 
job_groups.batch_id = job_group_self_and_ancestors.batch_id AND job_groups.job_group_id = job_group_self_and_ancestors.job_group_id @@ -2129,10 +2129,10 @@ async def commit_update(request: web.Request, userdata): FROM batches LEFT JOIN batch_updates ON batches.id = batch_updates.batch_id LEFT JOIN ( - SELECT id, 1 AS cancelled + SELECT batch_id, 1 AS cancelled FROM job_groups_cancelled - WHERE id = %s AND job_group_id = %s -) AS cancelled_t ON batches.id = cancelled_t.id + WHERE batch_id = %s AND job_group_id = %s +) AS cancelled_t ON batches.id = cancelled_t.batch_id WHERE batches.user = %s AND batches.id = %s AND batch_updates.update_id = %s AND NOT deleted; """, (batch_id, ROOT_JOB_GROUP_ID, user, batch_id, update_id), diff --git a/batch/batch/front_end/query/query_v1.py b/batch/batch/front_end/query/query_v1.py index c5cbefdf2f3..4fa36cbe43a 100644 --- a/batch/batch/front_end/query/query_v1.py +++ b/batch/batch/front_end/query/query_v1.py @@ -106,7 +106,7 @@ def parse_list_batches_query_v1(user: str, q: str, last_batch_id: Optional[int]) SELECT 1 AS cancelled FROM job_group_self_and_ancestors INNER JOIN job_groups_cancelled - ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.id AND + ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.batch_id AND job_group_self_and_ancestors.ancestor_id = job_groups_cancelled.job_group_id WHERE job_groups.batch_id = job_group_self_and_ancestors.batch_id AND job_groups.job_group_id = job_group_self_and_ancestors.job_group_id @@ -171,7 +171,7 @@ def parse_list_job_groups_query_v1( SELECT 1 AS cancelled FROM job_group_self_and_ancestors INNER JOIN job_groups_cancelled - ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.id AND + ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.batch_id AND job_group_self_and_ancestors.ancestor_id = job_groups_cancelled.job_group_id WHERE job_groups.batch_id = job_group_self_and_ancestors.batch_id AND job_groups.job_group_id = 
job_group_self_and_ancestors.job_group_id diff --git a/batch/batch/front_end/query/query_v2.py b/batch/batch/front_end/query/query_v2.py index 2eba30a95fa..974292f64e6 100644 --- a/batch/batch/front_end/query/query_v2.py +++ b/batch/batch/front_end/query/query_v2.py @@ -145,7 +145,7 @@ def parse_list_batches_query_v2(user: str, q: str, last_batch_id: Optional[int]) ON job_groups.batch_id = job_groups_n_jobs_in_complete_states.id AND job_groups.job_group_id = job_groups_n_jobs_in_complete_states.job_group_id LEFT JOIN (SELECT *, 1 AS cancelled FROM job_groups_cancelled) AS cancelled_t - ON job_groups.batch_id = cancelled_t.id + ON job_groups.batch_id = cancelled_t.batch_id AND job_groups.job_group_id = cancelled_t.job_group_id INNER JOIN LATERAL ( WITH resource_costs AS ( diff --git a/batch/sql/rename-job-groups-cancelled-column.sql b/batch/sql/rename-job-groups-cancelled-column.sql new file mode 100644 index 00000000000..9c6c9419956 --- /dev/null +++ b/batch/sql/rename-job-groups-cancelled-column.sql @@ -0,0 +1,521 @@ +/* +mysql> SELECT * FROM INFORMATION_SCHEMA.INNODB_SYS_FOREIGN \G + +(mysql 8.x or above) +mysql> SELECT * FROM INFORMATION_SCHEMA.INNODB_FOREIGN \G + +*************************** XXX row *************************** + ID: batches/job_groups_cancelled_ibfk_1 +FOR_NAME: batches/job_groups_cancelled +REF_NAME: batches/batches + N_COLS: 1 + TYPE: 33 +*************************** YYY row *************************** + ID: batches/job_groups_cancelled_ibfk_2 +FOR_NAME: batches/job_groups_cancelled +REF_NAME: batches/job_groups + N_COLS: 2 + TYPE: 33 +*/ + +ALTER TABLE job_groups_cancelled DROP FOREIGN KEY job_groups_cancelled_ibfk_1; +ALTER TABLE job_groups_cancelled DROP FOREIGN KEY job_groups_cancelled_ibfk_2; +ALTER TABLE job_groups_cancelled DROP PRIMARY KEY; +ALTER TABLE job_groups_cancelled CHANGE COLUMN `id` `batch_id` BIGINT NOT NULL; +ALTER TABLE job_groups_cancelled ADD PRIMARY KEY (`batch_id`, `job_group_id`), + ADD FOREIGN KEY (`batch_id`) 
REFERENCES batches(id) ON DELETE CASCADE, + ADD FOREIGN KEY (`batch_id`, `job_group_id`) REFERENCES job_groups (`batch_id`, `job_group_id`) ON DELETE CASCADE; + +DELIMITER $$ + +DROP TRIGGER IF EXISTS jobs_before_insert $$ +CREATE TRIGGER jobs_before_insert BEFORE INSERT ON jobs +FOR EACH ROW +BEGIN + DECLARE job_group_cancelled BOOLEAN; + + SET job_group_cancelled = EXISTS (SELECT TRUE + FROM job_group_self_and_ancestors + INNER JOIN job_groups_cancelled ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.batch_id AND + job_group_self_and_ancestors.ancestor_id = job_groups_cancelled.job_group_id + WHERE job_group_self_and_ancestors.batch_id = NEW.batch_id AND job_group_self_and_ancestors.job_group_id = NEW.job_group_id /* batch_id is qualified: after the rename both joined tables have a batch_id column */ + LOCK IN SHARE MODE); + + IF job_group_cancelled THEN + SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = "job group has already been cancelled"; + END IF; +END $$ + +DROP TRIGGER IF EXISTS jobs_after_update $$ +CREATE TRIGGER jobs_after_update AFTER UPDATE ON jobs +FOR EACH ROW +BEGIN + DECLARE cur_user VARCHAR(100); + DECLARE cur_job_group_cancelled BOOLEAN; + DECLARE cur_n_tokens INT; + DECLARE rand_token INT; + + DECLARE always_run boolean; + DECLARE cores_mcpu bigint; + + DECLARE was_marked_cancelled boolean; + DECLARE was_cancelled boolean; + DECLARE was_cancellable boolean; + + DECLARE now_marked_cancelled boolean; + DECLARE now_cancelled boolean; + DECLARE now_cancellable boolean; + + DECLARE was_ready boolean; + DECLARE now_ready boolean; + + DECLARE was_running boolean; + DECLARE now_running boolean; + + DECLARE was_creating boolean; + DECLARE now_creating boolean; + + DECLARE delta_n_ready_cancellable_jobs int; + DECLARE delta_ready_cancellable_cores_mcpu bigint; + DECLARE delta_n_ready_jobs int; + DECLARE delta_ready_cores_mcpu bigint; + DECLARE delta_n_cancelled_ready_jobs int; + + DECLARE delta_n_running_cancellable_jobs int; + DECLARE delta_running_cancellable_cores_mcpu bigint; + DECLARE delta_n_running_jobs int; + DECLARE delta_running_cores_mcpu bigint; 
+ DECLARE delta_n_cancelled_running_jobs int; + + DECLARE delta_n_creating_cancellable_jobs int; + DECLARE delta_n_creating_jobs int; + DECLARE delta_n_cancelled_creating_jobs int; + + SELECT user INTO cur_user FROM batches WHERE id = NEW.batch_id; + + SET cur_job_group_cancelled = EXISTS (SELECT TRUE + FROM job_group_self_and_ancestors + INNER JOIN job_groups_cancelled ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.batch_id AND + job_group_self_and_ancestors.ancestor_id = job_groups_cancelled.job_group_id + WHERE job_group_self_and_ancestors.batch_id = OLD.batch_id AND job_group_self_and_ancestors.job_group_id = OLD.job_group_id /* batch_id is qualified: after the rename both joined tables have a batch_id column */ + LOCK IN SHARE MODE); + + SELECT n_tokens INTO cur_n_tokens FROM globals LOCK IN SHARE MODE; + SET rand_token = FLOOR(RAND() * cur_n_tokens); + + SET always_run = old.always_run; # always_run is immutable + SET cores_mcpu = old.cores_mcpu; # cores_mcpu is immutable + + SET was_marked_cancelled = old.cancelled OR cur_job_group_cancelled; + SET was_cancelled = NOT always_run AND was_marked_cancelled; + SET was_cancellable = NOT always_run AND NOT was_marked_cancelled; + + SET now_marked_cancelled = new.cancelled or cur_job_group_cancelled; + SET now_cancelled = NOT always_run AND now_marked_cancelled; + SET now_cancellable = NOT always_run AND NOT now_marked_cancelled; + + # NB: was_cancelled => now_cancelled b/c you cannot be uncancelled + + SET was_ready = old.state = 'Ready'; + SET now_ready = new.state = 'Ready'; + SET was_running = old.state = 'Running'; + SET now_running = new.state = 'Running'; + SET was_creating = old.state = 'Creating'; + SET now_creating = new.state = 'Creating'; + + SET delta_n_ready_cancellable_jobs = (-1 * was_ready * was_cancellable ) + (now_ready * now_cancellable ) ; + SET delta_n_ready_jobs = (-1 * was_ready * (NOT was_cancelled)) + (now_ready * (NOT now_cancelled)); + SET delta_n_cancelled_ready_jobs = (-1 * was_ready * was_cancelled ) + (now_ready * now_cancelled ) ; + + SET delta_n_running_cancellable_jobs = (-1 * 
was_running * was_cancellable ) + (now_running * now_cancellable ) ; + SET delta_n_running_jobs = (-1 * was_running * (NOT was_cancelled)) + (now_running * (NOT now_cancelled)); + SET delta_n_cancelled_running_jobs = (-1 * was_running * was_cancelled ) + (now_running * now_cancelled ) ; + + SET delta_n_creating_cancellable_jobs = (-1 * was_creating * was_cancellable ) + (now_creating * now_cancellable ) ; + SET delta_n_creating_jobs = (-1 * was_creating * (NOT was_cancelled)) + (now_creating * (NOT now_cancelled)); + SET delta_n_cancelled_creating_jobs = (-1 * was_creating * was_cancelled ) + (now_creating * now_cancelled ) ; + + SET delta_ready_cancellable_cores_mcpu = delta_n_ready_cancellable_jobs * cores_mcpu; + SET delta_ready_cores_mcpu = delta_n_ready_jobs * cores_mcpu; + + SET delta_running_cancellable_cores_mcpu = delta_n_running_cancellable_jobs * cores_mcpu; + SET delta_running_cores_mcpu = delta_n_running_jobs * cores_mcpu; + + INSERT INTO job_group_inst_coll_cancellable_resources (batch_id, update_id, job_group_id, inst_coll, token, + n_ready_cancellable_jobs, + ready_cancellable_cores_mcpu, + n_creating_cancellable_jobs, + n_running_cancellable_jobs, + running_cancellable_cores_mcpu) + SELECT NEW.batch_id, NEW.update_id, job_group_self_and_ancestors.ancestor_id, NEW.inst_coll, rand_token, + delta_n_ready_cancellable_jobs, + delta_ready_cancellable_cores_mcpu, + delta_n_creating_cancellable_jobs, + delta_n_running_cancellable_jobs, + delta_running_cancellable_cores_mcpu + FROM job_group_self_and_ancestors + WHERE job_group_self_and_ancestors.batch_id = NEW.batch_id AND job_group_self_and_ancestors.job_group_id = NEW.job_group_id + ON DUPLICATE KEY UPDATE + n_ready_cancellable_jobs = n_ready_cancellable_jobs + delta_n_ready_cancellable_jobs, + ready_cancellable_cores_mcpu = ready_cancellable_cores_mcpu + delta_ready_cancellable_cores_mcpu, + n_creating_cancellable_jobs = n_creating_cancellable_jobs + delta_n_creating_cancellable_jobs, + 
n_running_cancellable_jobs = n_running_cancellable_jobs + delta_n_running_cancellable_jobs, + running_cancellable_cores_mcpu = running_cancellable_cores_mcpu + delta_running_cancellable_cores_mcpu; + + INSERT INTO user_inst_coll_resources (user, inst_coll, token, + n_ready_jobs, + n_running_jobs, + n_creating_jobs, + ready_cores_mcpu, + running_cores_mcpu, + n_cancelled_ready_jobs, + n_cancelled_running_jobs, + n_cancelled_creating_jobs + ) + VALUES (cur_user, NEW.inst_coll, rand_token, + delta_n_ready_jobs, + delta_n_running_jobs, + delta_n_creating_jobs, + delta_ready_cores_mcpu, + delta_running_cores_mcpu, + delta_n_cancelled_ready_jobs, + delta_n_cancelled_running_jobs, + delta_n_cancelled_creating_jobs + ) + ON DUPLICATE KEY UPDATE + n_ready_jobs = n_ready_jobs + delta_n_ready_jobs, + n_running_jobs = n_running_jobs + delta_n_running_jobs, + n_creating_jobs = n_creating_jobs + delta_n_creating_jobs, + ready_cores_mcpu = ready_cores_mcpu + delta_ready_cores_mcpu, + running_cores_mcpu = running_cores_mcpu + delta_running_cores_mcpu, + n_cancelled_ready_jobs = n_cancelled_ready_jobs + delta_n_cancelled_ready_jobs, + n_cancelled_running_jobs = n_cancelled_running_jobs + delta_n_cancelled_running_jobs, + n_cancelled_creating_jobs = n_cancelled_creating_jobs + delta_n_cancelled_creating_jobs; +END $$ + +DROP PROCEDURE IF EXISTS cancel_batch $$ +CREATE PROCEDURE cancel_batch( + IN in_batch_id VARCHAR(100) +) +BEGIN + DECLARE cur_user VARCHAR(100); + DECLARE cur_batch_state VARCHAR(40); + DECLARE cur_cancelled BOOLEAN; + DECLARE cur_n_cancelled_ready_jobs INT; + DECLARE cur_cancelled_ready_cores_mcpu BIGINT; + DECLARE cur_n_cancelled_running_jobs INT; + DECLARE cur_cancelled_running_cores_mcpu BIGINT; + DECLARE cur_n_n_cancelled_creating_jobs INT; + + START TRANSACTION; + + SELECT user, `state` INTO cur_user, cur_batch_state FROM batches + WHERE id = in_batch_id + FOR UPDATE; + + SET cur_cancelled = EXISTS (SELECT TRUE + FROM job_groups_cancelled + WHERE batch_id = 
in_batch_id + FOR UPDATE); + + IF cur_batch_state = 'running' AND NOT cur_cancelled THEN + INSERT INTO user_inst_coll_resources (user, inst_coll, token, + n_ready_jobs, ready_cores_mcpu, + n_running_jobs, running_cores_mcpu, + n_creating_jobs, + n_cancelled_ready_jobs, n_cancelled_running_jobs, n_cancelled_creating_jobs) + SELECT user, inst_coll, 0, + -1 * (@n_ready_cancellable_jobs := COALESCE(SUM(n_ready_cancellable_jobs), 0)), + -1 * (@ready_cancellable_cores_mcpu := COALESCE(SUM(ready_cancellable_cores_mcpu), 0)), + -1 * (@n_running_cancellable_jobs := COALESCE(SUM(n_running_cancellable_jobs), 0)), + -1 * (@running_cancellable_cores_mcpu := COALESCE(SUM(running_cancellable_cores_mcpu), 0)), + -1 * (@n_creating_cancellable_jobs := COALESCE(SUM(n_creating_cancellable_jobs), 0)), + COALESCE(SUM(n_ready_cancellable_jobs), 0), + COALESCE(SUM(n_running_cancellable_jobs), 0), + COALESCE(SUM(n_creating_cancellable_jobs), 0) + FROM job_group_inst_coll_cancellable_resources + JOIN batches ON batches.id = job_group_inst_coll_cancellable_resources.batch_id + INNER JOIN batch_updates ON job_group_inst_coll_cancellable_resources.batch_id = batch_updates.batch_id AND + job_group_inst_coll_cancellable_resources.update_id = batch_updates.update_id + WHERE job_group_inst_coll_cancellable_resources.batch_id = in_batch_id AND batch_updates.committed + GROUP BY user, inst_coll + ON DUPLICATE KEY UPDATE + n_ready_jobs = n_ready_jobs - @n_ready_cancellable_jobs, + ready_cores_mcpu = ready_cores_mcpu - @ready_cancellable_cores_mcpu, + n_running_jobs = n_running_jobs - @n_running_cancellable_jobs, + running_cores_mcpu = running_cores_mcpu - @running_cancellable_cores_mcpu, + n_creating_jobs = n_creating_jobs - @n_creating_cancellable_jobs, + n_cancelled_ready_jobs = n_cancelled_ready_jobs + @n_ready_cancellable_jobs, + n_cancelled_running_jobs = n_cancelled_running_jobs + @n_running_cancellable_jobs, + n_cancelled_creating_jobs = n_cancelled_creating_jobs + 
@n_creating_cancellable_jobs; + + # there are no cancellable jobs left, they have been cancelled + DELETE FROM job_group_inst_coll_cancellable_resources WHERE batch_id = in_batch_id; + + # cancel root job group only + INSERT INTO job_groups_cancelled (batch_id, job_group_id) VALUES (in_batch_id, 0); + END IF; + + COMMIT; +END $$ + +DROP PROCEDURE IF EXISTS cancel_job_group $$ +CREATE PROCEDURE cancel_job_group( + IN in_batch_id VARCHAR(100), + IN in_job_group_id INT +) +BEGIN + DECLARE cur_user VARCHAR(100); + DECLARE cur_job_group_state VARCHAR(40); + DECLARE cur_cancelled BOOLEAN; + + START TRANSACTION; + + SELECT user, `state` INTO cur_user, cur_job_group_state + FROM job_groups + WHERE batch_id = in_batch_id AND job_group_id = in_job_group_id + FOR UPDATE; + + SET cur_cancelled = EXISTS (SELECT TRUE + FROM job_group_self_and_ancestors + INNER JOIN job_groups_cancelled ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.batch_id AND + job_group_self_and_ancestors.ancestor_id = job_groups_cancelled.job_group_id + WHERE job_group_self_and_ancestors.batch_id = in_batch_id AND job_group_self_and_ancestors.job_group_id = in_job_group_id /* batch_id is qualified: after the rename both joined tables have a batch_id column */ + FOR UPDATE); + + IF NOT cur_cancelled THEN + INSERT INTO user_inst_coll_resources (user, inst_coll, token, + n_ready_jobs, ready_cores_mcpu, + n_running_jobs, running_cores_mcpu, + n_creating_jobs, + n_cancelled_ready_jobs, n_cancelled_running_jobs, n_cancelled_creating_jobs) + SELECT user, inst_coll, 0, + -1 * (@n_ready_cancellable_jobs := COALESCE(SUM(n_ready_cancellable_jobs), 0)), + -1 * (@ready_cancellable_cores_mcpu := COALESCE(SUM(ready_cancellable_cores_mcpu), 0)), + -1 * (@n_running_cancellable_jobs := COALESCE(SUM(n_running_cancellable_jobs), 0)), + -1 * (@running_cancellable_cores_mcpu := COALESCE(SUM(running_cancellable_cores_mcpu), 0)), + -1 * (@n_creating_cancellable_jobs := COALESCE(SUM(n_creating_cancellable_jobs), 0)), + COALESCE(SUM(n_ready_cancellable_jobs), 0), + COALESCE(SUM(n_running_cancellable_jobs), 0), + 
COALESCE(SUM(n_creating_cancellable_jobs), 0) + FROM job_group_inst_coll_cancellable_resources + INNER JOIN batches ON job_group_inst_coll_cancellable_resources.batch_id = batches.id + INNER JOIN batch_updates ON job_group_inst_coll_cancellable_resources.batch_id = batch_updates.batch_id AND + job_group_inst_coll_cancellable_resources.update_id = batch_updates.update_id + WHERE job_group_inst_coll_cancellable_resources.batch_id = in_batch_id AND + job_group_inst_coll_cancellable_resources.job_group_id = in_job_group_id AND + batch_updates.committed + GROUP BY user, inst_coll + FOR UPDATE + ON DUPLICATE KEY UPDATE + n_ready_jobs = n_ready_jobs - @n_ready_cancellable_jobs, + ready_cores_mcpu = ready_cores_mcpu - @ready_cancellable_cores_mcpu, + n_running_jobs = n_running_jobs - @n_running_cancellable_jobs, + running_cores_mcpu = running_cores_mcpu - @running_cancellable_cores_mcpu, + n_creating_jobs = n_creating_jobs - @n_creating_cancellable_jobs, + n_cancelled_ready_jobs = n_cancelled_ready_jobs + @n_ready_cancellable_jobs, + n_cancelled_running_jobs = n_cancelled_running_jobs + @n_running_cancellable_jobs, + n_cancelled_creating_jobs = n_cancelled_creating_jobs + @n_creating_cancellable_jobs; + + INSERT INTO job_group_inst_coll_cancellable_resources (batch_id, update_id, job_group_id, inst_coll, token, + n_ready_cancellable_jobs, + ready_cancellable_cores_mcpu, + n_creating_cancellable_jobs, + n_running_cancellable_jobs, + running_cancellable_cores_mcpu) + SELECT batch_id, update_id, ancestor_id, inst_coll, 0, + -1 * (@jg_n_ready_cancellable_jobs := old_n_ready_cancellable_jobs), + -1 * (@jg_ready_cancellable_cores_mcpu := old_ready_cancellable_cores_mcpu), + -1 * (@jg_n_creating_cancellable_jobs := old_n_creating_cancellable_jobs), + -1 * (@jg_n_running_cancellable_jobs := old_n_running_cancellable_jobs), + -1 * (@jg_running_cancellable_cores_mcpu := old_running_cancellable_cores_mcpu) + FROM job_group_self_and_ancestors + INNER JOIN LATERAL ( + SELECT update_id, 
inst_coll, COALESCE(SUM(n_ready_cancellable_jobs), 0) AS old_n_ready_cancellable_jobs, + COALESCE(SUM(ready_cancellable_cores_mcpu), 0) AS old_ready_cancellable_cores_mcpu, + COALESCE(SUM(n_creating_cancellable_jobs), 0) AS old_n_creating_cancellable_jobs, + COALESCE(SUM(n_running_cancellable_jobs), 0) AS old_n_running_cancellable_jobs, + COALESCE(SUM(running_cancellable_cores_mcpu), 0) AS old_running_cancellable_cores_mcpu + FROM job_group_inst_coll_cancellable_resources + WHERE job_group_inst_coll_cancellable_resources.batch_id = job_group_self_and_ancestors.batch_id AND + job_group_inst_coll_cancellable_resources.job_group_id = job_group_self_and_ancestors.job_group_id + GROUP BY update_id, inst_coll + FOR UPDATE + ) AS t ON TRUE + WHERE job_group_self_and_ancestors.batch_id = in_batch_id AND job_group_self_and_ancestors.job_group_id = in_job_group_id + ON DUPLICATE KEY UPDATE + n_ready_cancellable_jobs = n_ready_cancellable_jobs - @jg_n_ready_cancellable_jobs, + ready_cancellable_cores_mcpu = ready_cancellable_cores_mcpu - @jg_ready_cancellable_cores_mcpu, + n_creating_cancellable_jobs = n_creating_cancellable_jobs - @jg_n_creating_cancellable_jobs, + n_running_cancellable_jobs = n_running_cancellable_jobs - @jg_n_running_cancellable_jobs, + running_cancellable_cores_mcpu = running_cancellable_cores_mcpu - @jg_running_cancellable_cores_mcpu; + + # Group cancellation, like any operation, must be O(1) time. The number of descendant groups is unbounded, + # so we neither delete rows from job_group_inst_coll_cancellable_resources nor update job_groups_cancelled. + # The former is handled by main.py. In the latter case, group cancellation state is implicitly defined by an + # upwards traversal on the ancestor tree. 
+ + INSERT INTO job_groups_cancelled (batch_id, job_group_id) + VALUES (in_batch_id, in_job_group_id); + END IF; + + COMMIT; +END $$ + +DROP PROCEDURE IF EXISTS schedule_job $$ +CREATE PROCEDURE schedule_job( + IN in_batch_id BIGINT, + IN in_job_id INT, + IN in_attempt_id VARCHAR(40), + IN in_instance_name VARCHAR(100) +) +BEGIN + DECLARE cur_job_state VARCHAR(40); + DECLARE cur_cores_mcpu INT; + DECLARE cur_job_cancel BOOLEAN; + DECLARE cur_instance_state VARCHAR(40); + DECLARE cur_attempt_id VARCHAR(40); + DECLARE delta_cores_mcpu INT; + DECLARE cur_instance_is_pool BOOLEAN; + + START TRANSACTION; + + SELECT state, cores_mcpu, attempt_id + INTO cur_job_state, cur_cores_mcpu, cur_attempt_id + FROM jobs + WHERE batch_id = in_batch_id AND job_id = in_job_id + FOR UPDATE; + + SELECT (jobs.cancelled OR job_groups_cancelled.batch_id IS NOT NULL) AND NOT jobs.always_run + INTO cur_job_cancel + FROM jobs + LEFT JOIN job_groups_cancelled ON job_groups_cancelled.batch_id = jobs.batch_id + WHERE jobs.batch_id = in_batch_id AND jobs.job_id = in_job_id /* columns are qualified: after the rename jobs and job_groups_cancelled both have a batch_id column */ + LOCK IN SHARE MODE; + + SELECT is_pool + INTO cur_instance_is_pool + FROM instances + LEFT JOIN inst_colls ON instances.inst_coll = inst_colls.name + WHERE instances.name = in_instance_name; + + CALL add_attempt(in_batch_id, in_job_id, in_attempt_id, in_instance_name, cur_cores_mcpu, delta_cores_mcpu); + + IF cur_instance_is_pool THEN + IF delta_cores_mcpu = 0 THEN + SET delta_cores_mcpu = cur_cores_mcpu; + ELSE + SET delta_cores_mcpu = 0; + END IF; + END IF; + + SELECT state INTO cur_instance_state FROM instances WHERE name = in_instance_name LOCK IN SHARE MODE; + + IF (cur_job_state = 'Ready' OR cur_job_state = 'Creating') AND NOT cur_job_cancel AND cur_instance_state = 'active' THEN + UPDATE jobs SET state = 'Running', attempt_id = in_attempt_id WHERE batch_id = in_batch_id AND job_id = in_job_id; + COMMIT; + SELECT 0 as rc, in_instance_name, delta_cores_mcpu; + ELSE + COMMIT; + SELECT 1 as rc, + cur_job_state, + cur_job_cancel, + 
cur_instance_state, + in_instance_name, + cur_attempt_id, + delta_cores_mcpu, + 'job not Ready or cancelled or instance not active, but attempt already exists' as message; + END IF; +END $$ + +DROP PROCEDURE IF EXISTS mark_job_creating $$ +CREATE PROCEDURE mark_job_creating( + IN in_batch_id BIGINT, + IN in_job_id INT, + IN in_attempt_id VARCHAR(40), + IN in_instance_name VARCHAR(100), + IN new_start_time BIGINT +) +BEGIN + DECLARE cur_job_state VARCHAR(40); + DECLARE cur_job_cancel BOOLEAN; + DECLARE cur_cores_mcpu INT; + DECLARE cur_instance_state VARCHAR(40); + DECLARE delta_cores_mcpu INT; + + START TRANSACTION; + + SELECT state, cores_mcpu + INTO cur_job_state, cur_cores_mcpu + FROM jobs + WHERE batch_id = in_batch_id AND job_id = in_job_id + FOR UPDATE; + + SELECT (jobs.cancelled OR job_groups_cancelled.batch_id IS NOT NULL) AND NOT jobs.always_run + INTO cur_job_cancel + FROM jobs + LEFT JOIN job_groups_cancelled ON job_groups_cancelled.batch_id = jobs.batch_id + WHERE jobs.batch_id = in_batch_id AND jobs.job_id = in_job_id /* columns are qualified: after the rename jobs and job_groups_cancelled both have a batch_id column */ + LOCK IN SHARE MODE; + + CALL add_attempt(in_batch_id, in_job_id, in_attempt_id, in_instance_name, cur_cores_mcpu, delta_cores_mcpu); + + UPDATE attempts SET start_time = new_start_time, rollup_time = new_start_time + WHERE batch_id = in_batch_id AND job_id = in_job_id AND attempt_id = in_attempt_id; + + SELECT state INTO cur_instance_state FROM instances WHERE name = in_instance_name LOCK IN SHARE MODE; + + IF cur_job_state = 'Ready' AND NOT cur_job_cancel AND cur_instance_state = 'pending' THEN + UPDATE jobs SET state = 'Creating', attempt_id = in_attempt_id WHERE batch_id = in_batch_id AND job_id = in_job_id; + END IF; + + COMMIT; + SELECT 0 as rc, delta_cores_mcpu; +END $$ + +DROP PROCEDURE IF EXISTS mark_job_started $$ +CREATE PROCEDURE mark_job_started( + IN in_batch_id BIGINT, + IN in_job_id INT, + IN in_attempt_id VARCHAR(40), + IN in_instance_name VARCHAR(100), + IN new_start_time BIGINT +) +BEGIN + DECLARE cur_job_state VARCHAR(40); + 
DECLARE cur_job_cancel BOOLEAN; + DECLARE cur_cores_mcpu INT; + DECLARE cur_instance_state VARCHAR(40); + DECLARE delta_cores_mcpu INT; + + START TRANSACTION; + + SELECT state, cores_mcpu + INTO cur_job_state, cur_cores_mcpu + FROM jobs + WHERE batch_id = in_batch_id AND job_id = in_job_id + FOR UPDATE; + + SELECT (jobs.cancelled OR job_groups_cancelled.batch_id IS NOT NULL) AND NOT jobs.always_run + INTO cur_job_cancel + FROM jobs + LEFT JOIN job_groups_cancelled ON job_groups_cancelled.batch_id = jobs.batch_id + WHERE jobs.batch_id = in_batch_id AND jobs.job_id = in_job_id /* columns are qualified: after the rename jobs and job_groups_cancelled both have a batch_id column */ + LOCK IN SHARE MODE; + + CALL add_attempt(in_batch_id, in_job_id, in_attempt_id, in_instance_name, cur_cores_mcpu, delta_cores_mcpu); + + UPDATE attempts SET start_time = new_start_time, rollup_time = new_start_time + WHERE batch_id = in_batch_id AND job_id = in_job_id AND attempt_id = in_attempt_id; + + SELECT state INTO cur_instance_state FROM instances WHERE name = in_instance_name LOCK IN SHARE MODE; + + IF cur_job_state = 'Ready' AND NOT cur_job_cancel AND cur_instance_state = 'active' THEN + UPDATE jobs SET state = 'Running', attempt_id = in_attempt_id WHERE batch_id = in_batch_id AND job_id = in_job_id; + END IF; + + COMMIT; + SELECT 0 as rc, delta_cores_mcpu; +END $$ + +DELIMITER ; diff --git a/build.yaml b/build.yaml index a12528f67e2..3a0838add1a 100644 --- a/build.yaml +++ b/build.yaml @@ -2294,6 +2294,9 @@ steps: - name: fix-mark-job-complete-deadlocks script: /io/sql/fix-mark-job-complete-deadlocks.sql online: true + - name: rename-job-groups-cancelled-column + script: /io/sql/rename-job-groups-cancelled-column.sql + online: false inputs: - from: /repo/batch/sql to: /io/sql