Task scheduling (#8676)
* Adding task scheduling

* Initialising tw_name with 'any' to fix issue #8618

* Changing 'any' to 'crab-prod-tw01:testing'

* Final changes: fixing key names

* Removing configreq reference

* Fixing EOL

* Lazy logging

* Changing the _selectWork comment and making use of its return value

* Adding an Adhoc section in the configuration and using it to set the selection_limit

* Removing hardcoded tw_name value from Task.py and making use of SQL bindings

* Adding logging capability in task scheduling

* Adding a dry_run option under Adhoc to make user and task count logging conditional

* Making round-robin task scheduling conditional on dry_run; all waiting tasks are returned if dry_run is False

* Changing Adhoc to TaskScheduling and logging a per-username table of waiting and selected tasks

* Default dry_run for TaskScheduling set to True (see the configuration sketch after the change summary below)

---------

Co-authored-by: Vijay Chakravarty <[email protected]>
Co-authored-by: Vijay Chakravarty <[email protected]>
3 people committed Sep 4, 2024
1 parent 6063580 commit 9199c3a
Showing 6 changed files with 135 additions and 5 deletions.
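
The bullets above introduce a TaskScheduling configuration section with selection_limit and dry_run options. A minimal sketch of what that section amounts to, using only the attribute names and defaults hard-coded in TaskWorker/Main.py further down; the import and object name follow the WMCore-style configuration the TaskWorker already uses, and the whole snippet is illustrative rather than the project's actual configuration file:

# Illustrative only: the section and attribute names come from this commit,
# the surrounding scaffolding is assumed.
from WMCore.Configuration import Configuration

config = Configuration()
config.section_('TaskScheduling')
config.TaskScheduling.selection_limit = 10  # how many WAITING tasks to fetch per selection cycle
config.TaskScheduling.dry_run = True        # True: apply round-robin selection; False: return all waiting tasks

Note that, as committed, Main.py assigns these two values after loading the configuration file, so they behave as hard-coded settings rather than overridable defaults.
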
3 changes: 2 additions & 1 deletion src/python/CRABInterface/DataWorkflow.py
@@ -175,7 +175,7 @@ def submit(self, workflow, activity, jobtype, jobsw, jobarch, use_parent, second
self.api.modify(self.Task.New_sql,
task_name = [workflow],
task_activity = [activity],
task_status = ['NEW'],
task_status = ['WAITING'],
task_command = ['SUBMIT'],
task_failure = [''],
job_sw = [jobsw],
@@ -212,6 +212,7 @@ def submit(self, workflow, activity, jobtype, jobsw, jobarch, use_parent, second
job_type = [jobtype],
arguments = [dbSerializer(arguments)],
save_logs = ['T' if savelogsflag else 'F'],
tw_name = ['NotKnownYet'],
user_infiles = [dbSerializer(adduserfiles)],
maxjobruntime = [maxjobruntime],
numcores = [numcores],
4 changes: 3 additions & 1 deletion src/python/CRABUtils/TaskUtils.py
@@ -53,7 +53,9 @@ def updateTaskStatus(crabserver=None, taskName=None, status=None, logger=None):
""" change task status in the DB """
msg = f"Will set to {status} task {taskName}"
logger.info(msg)
if status == 'NEW':
if status == 'WAITING':
command = 'SUBMIT'
elif status == 'NEW':
command = 'SUBMIT'
elif status == 'SUBMITREFUSED':
command = 'SUBMIT'
4 changes: 2 additions & 2 deletions src/python/Databases/TaskDB/Oracle/Task/Task.py
@@ -50,7 +50,7 @@ class Task(object):
tm_split_algo, tm_split_args, tm_totalunits, tm_user_sandbox, tm_debug_files, tm_cache_url, tm_username, tm_user_dn, \
tm_user_vo, tm_user_role, tm_user_group, tm_publish_name, tm_asyncdest, tm_dbs_url, tm_publish_dbs_url, \
tm_publication, tm_outfiles, tm_tfile_outfiles, tm_edm_outfiles, tm_job_type, tm_generator, tm_arguments, \
tm_save_logs, tm_user_infiles, tm_maxjobruntime, tm_numcores, tm_maxmemory, tm_priority, \
tm_save_logs, tw_name, tm_user_infiles, tm_maxjobruntime, tm_numcores, tm_maxmemory, tm_priority, \
tm_scriptexe, tm_scriptargs, tm_extrajdl, tm_events_per_lumi, tm_collector, tm_schedd, tm_dry_run, \
tm_user_files, tm_transfer_outputs, tm_output_lfn, tm_ignore_locality, tm_fail_limit, tm_one_event_mode, tm_submitter_ip_addr, tm_ignore_global_blacklist, \
tm_user_config) \
@@ -59,7 +59,7 @@ class Task(object):
:split_algo, :split_args, :total_units, :user_sandbox, :debug_files, :cache_url, :username, :user_dn, \
:user_vo, :user_role, :user_group, :publish_name, :asyncdest, :dbs_url, :publish_dbs_url, \
:publication, :outfiles, :tfile_outfiles, :edm_outfiles, :job_type, :generator, :arguments, \
:save_logs, :user_infiles, :maxjobruntime, :numcores, :maxmemory, :priority, \
:save_logs, :tw_name, :user_infiles, :maxjobruntime, :numcores, :maxmemory, :priority, \
:scriptexe, :scriptargs, :extrajdl, :events_per_lumi, :collector, :schedd_name, :dry_run, \
:user_files, :transfer_outputs, :output_lfn, :ignore_locality, :fail_limit, :one_event_mode, :submitter_ip_addr, :ignore_global_blacklist, \
:user_config)"
2 changes: 1 addition & 1 deletion src/python/ServerUtilities.py
@@ -91,7 +91,7 @@
MAX_TB_TO_RECALL_AT_A_SINGLE_SITE = 1000 # effectively no limit. See https://github.com/dmwm/CRABServer/issues/7610

# These are all possible statuses of a task in the TaskDB.
TASKDBSTATUSES_TMP = ['NEW', 'HOLDING', 'QUEUED', 'TAPERECALL', 'KILLRECALL']
TASKDBSTATUSES_TMP = ['WAITING', 'NEW', 'HOLDING', 'QUEUED', 'TAPERECALL', 'KILLRECALL']
TASKDBSTATUSES_FAILURES = ['SUBMITFAILED', 'KILLFAILED', 'RESUBMITFAILED', 'FAILED']
TASKDBSTATUSES_FINAL = ['UPLOADED', 'SUBMITTED', 'KILLED'] + TASKDBSTATUSES_FAILURES
TASKDBSTATUSES = TASKDBSTATUSES_TMP + TASKDBSTATUSES_FINAL
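
With WAITING added to TASKDBSTATUSES_TMP, the task lifecycle implied by this commit becomes, roughly (a sketch pieced together from the hunks on this page, not an authoritative state diagram):

# 'WAITING'  - DataWorkflow.submit() now inserts new tasks as WAITING instead of NEW
# 'NEW'      - MasterWorker._selectWork() (added below) promotes a fair-share selection of WAITING tasks
# 'HOLDING'  - the pre-existing _lockWork(getstatus='NEW', setstatus='HOLDING') step, unchanged here
# 'QUEUED'   - tasks queued for the worker slaves, as before this commit
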
3 changes: 3 additions & 0 deletions src/python/TaskWorker/Main.py
@@ -71,6 +71,9 @@ def main():
raise ConfigException("Configuration not found")

configuration = loadConfigurationFile(os.path.abspath(options.config))
configuration.section_('TaskScheduling')
configuration.TaskScheduling.selection_limit = 10
configuration.TaskScheduling.dry_run = True
status, msg = validateConfig(configuration)
if not status:
raise ConfigException(msg)
124 changes: 124 additions & 0 deletions src/python/TaskWorker/MasterWorker.py
@@ -23,6 +23,8 @@
from TaskWorker.WorkerExceptions import ConfigException
from TaskWorker.Actions.Recurring.BaseRecurringAction import handleRecurring
from TaskWorker.Actions.Handler import handleResubmit, handleNewTask, handleKill
from CRABUtils.TaskUtils import getTasks, updateTaskStatus
import random

## NOW placing this here, then to be verified if going into Action.Handler, or TSM
## The meaning of the elements in the 3-tuples are as follows:
@@ -245,6 +247,123 @@ def getRecurringActionInst(self, actionName):
return getattr(mod, actionName)(self.config.TaskWorker.logsDir)


def _externalScheduling(self, limit):
"""
External scheduling method using round-robin algorithm to get tasks
in waiting status and consider resource utilization for fair share.
"""
self.logger.info("Starting external scheduling.")

try:
# Retrieve tasks with 'WAITING' status
waiting_tasks = getTasks(crabserver=self.crabserver, status='WAITING', logger=self.logger, limit=limit)

if not waiting_tasks:
self.logger.info("No tasks in 'WAITING' status found.")
return []

# Organize tasks by user
tasks_by_user = {}
for task in waiting_tasks:
user = task['tm_username']
if user not in tasks_by_user:
tasks_by_user[user] = []
tasks_by_user[user].append(task)

# Perform round-robin selection among users
users = list(tasks_by_user.keys())
random.shuffle(users) # To ensure fair round-robin each time
selected_tasks = []

for user in users:
user_tasks = tasks_by_user[user]
selected_tasks.extend(user_tasks[:limit // len(users)])

# Create and populate task_count dictionary
task_count = {'selected': {}, 'waiting': {}}

for status, tasks in [('selected', selected_tasks), ('waiting', waiting_tasks)]:
for task in tasks:
username = task['tm_username']
task_count[status][username] = task_count[status].get(username, 0) + 1

# Prepare table headers and rows
headers = ['Username', 'Waiting', 'Selected']
rows = []

# Collect all usernames to ensure every user appears in the table
all_usernames = set(task_count['selected'].keys()).union(task_count['waiting'].keys())

for username in all_usernames:
waiting_count = task_count['waiting'].get(username, 0)
selected_count = task_count['selected'].get(username, 0)
rows.append([username, waiting_count, selected_count])

# Determine the width of each column for formatting
widths = [max(len(header) for header in headers)] + [max(len(str(row[i])) for row in rows) for i in range(1, len(headers))]

# Prepare formatted table string
table_header = ' | '.join(f'{header:<{width}}' for header, width in zip(headers, widths))
table_separator = '-|-'.join('-' * width for width in widths)
table_rows = '\n'.join(' | '.join(f'{str(cell):<{width}}' for cell, width in zip(row, widths)) for row in rows)

# Combine header, separator, and rows into one string
table = f"{table_header}\n{table_separator}\n{table_rows}"

# Log the formatted table
self.logger.info('\n%s', table)

if self.config.TaskScheduling.dry_run:
return selected_tasks #dry_run True (with Task Scheduling)
else:
return waiting_tasks #dry_run False (without Task Scheduling)

except Exception as e:
self.logger.exception("Exception occurred during external scheduling: %s", str(e))
return []

def _pruneTaskQueue(self):
self.logger.info("Pruning the queue if required...logic tbd")

def _reportQueueStatus(self):
self.logger.info("Report Queue status... logic tbd")


def _selectWork(self, limit):
"""This function calls external scheduling and updates task status for the selected tasks"""
self.logger.info("Starting work selection process.")

# Call the external scheduling method
selected_tasks = self._externalScheduling(limit)

if not selected_tasks:
return False

try:
# Update the status of each selected task to 'NEW'
for task in selected_tasks:
task_name = task['tm_taskname']
updateTaskStatus(crabserver=self.crabserver, taskName=task_name, status='NEW', logger=self.logger)
self.logger.info("Task %s status updated to 'NEW'.", task_name)

# Prune the task queue if necessary
self._pruneTaskQueue()

# Report queue status
self._reportQueueStatus()

except HTTPException as hte:
msg = "HTTP Error during _selectWork: %s\n" % str(hte)
msg += "HTTP Headers are %s: " % hte.headers
self.logger.error(msg)
return False

except Exception: #pylint: disable=broad-except
self.logger.exception("Server could not process the _selectWork request.")
return False

return True

def _lockWork(self, limit, getstatus, setstatus):
"""Today this is always returning true, because we do not want the worker to die if
the server endpoint is not available.
@@ -400,6 +519,11 @@ def algorithm(self):
self.restartQueuedTasks()
self.logger.debug("Master Worker Starting Main Cycle.")
while not self.STOP:
selection_limit = self.config.TaskScheduling.selection_limit
if not self._selectWork(limit=selection_limit):
self.logger.warning("Selection of work failed.")
else:
self.logger.info("Work selected successfully.")
limit = self.slaves.queueableTasks()
if not self._lockWork(limit=limit, getstatus='NEW', setstatus='HOLDING'):
time.sleep(self.config.TaskWorker.polling)
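
The fair-share step in _externalScheduling above gives each user at most limit // len(users) of their waiting tasks, with the user order shuffled on every cycle. A standalone toy illustration of that quota, with invented usernames and task lists (not CRAB code):

# Toy sketch of the per-user quota applied in _externalScheduling.
import random

limit = 10
tasks_by_user = {'alice': ['a1', 'a2', 'a3', 'a4', 'a5'],
                 'bob':   ['b1', 'b2'],
                 'carol': ['c1', 'c2', 'c3', 'c4']}

users = list(tasks_by_user.keys())
random.shuffle(users)            # fair round-robin order across cycles
quota = limit // len(users)      # 10 // 3 == 3 waiting tasks per user
selected = []
for user in users:
    selected.extend(tasks_by_user[user][:quota])

print(len(selected))  # 8: alice and carol contribute 3 each, bob only has 2 waiting
# Note: if there are more users than 'limit', the quota is 0 and nothing gets selected.
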
