
Commit fd59405

types and cleanups

1 parent c932df0 commit fd59405

File tree: 5 files changed, +142 -72 lines changed


src/toil/batchSystems/singleMachine.py

Lines changed: 8 additions & 2 deletions
@@ -405,8 +405,14 @@ def _runDebugJob(self, jobCommand, jobID, environment):
             # We can actually run in this thread
             jobName, jobStoreLocator, jobStoreID = jobCommand.split()[1:4] # Parse command
             jobStore = Toil.resumeJobStore(jobStoreLocator)
-            toil_worker.workerScript(jobStore, jobStore.config, jobName, jobStoreID, None,
-                                     redirectOutputToLogFile=not self.debugWorker) # Call the worker
+            toil_worker.workerScript(
+                jobStore,
+                jobStore.config,
+                jobName,
+                jobStoreID,
+                None,
+                redirectOutputToLogFile=not self.debugWorker,
+            ) # Call the worker
         else:
             # Run synchronously. If starting or running the command fails, let the exception stop us.
             subprocess.check_call(jobCommand,

src/toil/leader.py

Lines changed: 15 additions & 10 deletions
@@ -664,8 +664,8 @@ def innerLoop(self):
 
         # Consistency check the toil state
         assert self.toilState.updatedJobs == {}
-        #assert self.toilState.successorCounts == {}
-        #assert self.toilState.successorJobStoreIDToPredecessorJobs == {}
+        # assert self.toilState.successorCounts == {}
+        # assert self.toilState.successorJobStoreIDToPredecessorJobs == {}
         assert self.toilState.serviceJobStoreIDToPredecessorJob == {}
         assert self.toilState.servicesIssued == {}
         # assert self.toilState.jobsToBeScheduledWithMultiplePredecessors # These are not properly emptied yet
@@ -749,17 +749,22 @@ def issueJob(self, jobNode):
         for context in self.batchSystem.getWorkerContexts():
             # For each context manager hook the batch system wants to run in
             # the worker, serialize and send it.
-            workerCommand.append('--context')
-            workerCommand.append(base64.b64encode(pickle.dumps(context)).decode('utf-8'))
+            workerCommand.append("--context")
+            workerCommand.append(
+                base64.b64encode(pickle.dumps(context)).decode("utf-8")
+            )
 
         # add the toilState as a pickle
-        workerCommand.append('--toilState')
-        workerCommand.append(base64.b64encode(pickle.dumps(self.toilState)).decode('utf-8'))
-
-        jobNode.command = ' '.join(workerCommand)
+        workerCommand.append("--toilState")
+        workerCommand.append(
+            base64.b64encode(pickle.dumps(self.toilState)).decode("utf-8")
+        )
 
-        omp_threads = os.environ.get('OMP_NUM_THREADS') \
-            or str(max(1, int(jobNode.cores))) # make sure OMP_NUM_THREADS is a positive integer
+        jobNode.command = " ".join(workerCommand)
+
+        omp_threads = os.environ.get("OMP_NUM_THREADS") or str(
+            max(1, int(jobNode.cores))
+        )  # make sure OMP_NUM_THREADS is a positive integer
 
         job_environment = {
             # Set the number of cores used by OpenMP applications
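
For context on what the new --toilState flag carries: a minimal sketch, not part of the commit, of the pickle/base64 round trip this diff sets up. The leader encodes its ToilState for the worker command line and the worker decodes it back; the `state` dict below is only a stand-in for the real ToilState object.

import base64
import pickle

state = {"updatedJobs": {}}  # stand-in for the leader's ToilState

# Leader side (as in issueJob above): pickle, then base64-encode for the command line.
encoded = base64.b64encode(pickle.dumps(state)).decode("utf-8")

# Worker side (as in workerScript below): reverse the encoding to rebuild the object.
decoded = pickle.loads(base64.b64decode(encoded.encode("utf-8")))

assert decoded == state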

src/toil/test/cwl/cwlTest.py

Lines changed: 33 additions & 16 deletions
@@ -814,11 +814,11 @@ def test_download_structure(self) -> None:
 class CWLToilOptimizeTests(ToilTest):
     def setUp(self):
         """Runs anew before each test to create farm fresh temp dirs."""
-        self.outDir = f'/tmp/toil-cwl-test-{str(uuid.uuid4())}'
+        self.outDir = f"/tmp/toil-cwl-test-{str(uuid.uuid4())}"
         os.makedirs(self.outDir)
         self.rootDir = self._projectRootPath()
-        self.jobDir = os.path.join(self.outDir, 'jobStore')
-        self.statDir = os.path.join(self.jobDir, 'stats')
+        self.jobDir = os.path.join(self.outDir, "jobStore")
+        self.statDir = os.path.join(self.jobDir, "stats")
 
     def tearDown(self):
         """Clean up outputs."""
@@ -828,36 +828,53 @@ def tearDown(self):
 
     def _tester(self, cwlfile, jobfile, expect, main_args=[]):
         from toil.cwl import cwltoil
+
         st = StringIO()
         main_args = main_args[:]
-        main_args.extend(['--logDebug','--stats','--outdir', self.outDir, '--jobStore', self.jobDir,
-                          os.path.join(self.rootDir, cwlfile), os.path.join(self.rootDir, jobfile)])
+        main_args.extend(
+            [
+                "--logDebug",
+                "--stats",
+                "--outdir",
+                self.outDir,
+                "--jobStore",
+                self.jobDir,
+                os.path.join(self.rootDir, cwlfile),
+                os.path.join(self.rootDir, jobfile),
+            ]
+        )
         cwltoil.main(main_args, stdout=st)
         out = self._extract_job_lists()
         self.assertEqual(out, expect)
 
     def _match_extract_string(self, stringin):
         import re
-        search_pattern = re.compile('^.* (\w*) kind-CWLJob/instance-.*$')
+
+        search_pattern = re.compile("^.* (\w*) kind-CWLJob/instance-.*$")
         if search_pattern.match(stringin):
-            return(search_pattern.sub(r'\1',stringin))
+            return search_pattern.sub(r"\1", stringin)
         else:
-            return(None)
+            return None
 
     def _extract_job_lists(self):
         worker_list = []
         for filename in os.listdir(self.statDir):
-            with open(os.path.join(self.statDir,filename)) as f:
+            with open(os.path.join(self.statDir, filename)) as f:
                 test_json = json.load(f)
-            if 'workers' in test_json.keys() and len(test_json['jobs']) > 0:
-                job_list = [self._match_extract_string(x) for x in test_json['logs']['names']]
+            if "workers" in test_json.keys() and len(test_json["jobs"]) > 0:
+                job_list = [
+                    self._match_extract_string(x)
+                    for x in test_json["logs"]["names"]
+                ]
                 if not all(x == None for x in job_list):
                     worker_list.append(job_list)
         worker_list.sort()
-        return(worker_list)
+        return worker_list
 
     def test_biobb_fail(self):
-        self._tester('src/toil/test/cwl/md_list_reduced.cwl',
-                     'src/toil/test/cwl/md_list_reduced.json',
-                     [['genion', 'grompp', 'pdb2gmx', 'editconf', 'solvate']],
-                     main_args=[])
+        self._tester(
+            "src/toil/test/cwl/md_list_reduced.cwl",
+            "src/toil/test/cwl/md_list_reduced.json",
+            [["genion", "grompp", "pdb2gmx", "editconf", "solvate"]],
+            main_args=[],
+        )
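
A quick, hedged illustration of what _match_extract_string pulls out of the stats log names; the pattern below mirrors the one in the diff (written as a raw string here), and the input line is hypothetical, shaped only to satisfy that regex.

import re

search_pattern = re.compile(r"^.* (\w*) kind-CWLJob/instance-.*$")

line = "INFO pdb2gmx kind-CWLJob/instance-abc123"  # hypothetical stats log name
if search_pattern.match(line):
    # sub() replaces the whole matched line with the captured job name
    print(search_pattern.sub(r"\1", line))  # -> pdb2gmx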

src/toil/utils/toilDebugJob.py

Lines changed: 1 addition & 1 deletion
@@ -46,5 +46,5 @@ def main() -> None:
 
     jobID = options.jobID[0]
     logger.debug(f"Running the following job locally: {jobID}")
-    workerScript(jobStore, config, jobID, jobID, redirectOutputToLogFile=False)
+    workerScript(jobStore, config, jobID, jobID, None, redirectOutputToLogFile=False)
     logger.debug(f"Finished running: {jobID}")

src/toil/worker.py

Lines changed: 85 additions & 43 deletions
@@ -56,35 +56,52 @@
 logger = logging.getLogger(__name__)
 
 
-def checkSuccessorReadyToRunMultiplePredecessors(successor, predecessor, jobStore, toilState):
+def checkSuccessorReadyToRunMultiplePredecessors(
+    successor: JobDescription,
+    predecessor: JobDescription,
+    jobStore: AbstractJobStore,
+    toilState: ToilState,
+) -> bool:
     """
     Handle the special cases of checking if a successor job is
     ready to run when there are multiple predecessors.
 
-    :param toil.job.JobDescription successor: The successor which has failed.
-    :param toil.job.JobDescription predecessor: The job which the successor comes after.
+    :param successor: The successor which has failed.
+    :param predecessor: The job which the successor comes after.
 
     """
     # See implementation note at the top of this file for discussion of multiple predecessors
-    logger.debug("Successor job: %s of job: %s has multiple "
-                 "predecessors", successor, predecessor)
-    logger.debug("Already finished predecessors are: %s", successor.predecessorsFinished)
+    logger.debug(
+        "Successor job: %s of job: %s has multiple " "predecessors",
+        successor,
+        predecessor,
+    )
+    logger.debug(
+        "Already finished predecessors are: %s", successor.predecessorsFinished
+    )
 
     # Get the successor JobDescription, which is cached
     if successor.jobStoreID not in toilState.jobsToBeScheduledWithMultiplePredecessors:
         # TODO: We're loading from the job store in an ad-hoc way!
         loaded = jobStore.load(successor.jobStoreID)
-        toilState.jobsToBeScheduledWithMultiplePredecessors[successor.jobStoreID] = loaded
+        toilState.jobsToBeScheduledWithMultiplePredecessors[
+            successor.jobStoreID
+        ] = loaded
         # TODO: we're clobbering a JobDescription we're passing around by value.
-        successor = toilState.jobsToBeScheduledWithMultiplePredecessors[successor.jobStoreID]
+        successor = toilState.jobsToBeScheduledWithMultiplePredecessors[
+            successor.jobStoreID
+        ]
 
-    logger.debug("Already finished predecessors are (2) : %s", successor.predecessorsFinished)
+    logger.debug(
+        "Already finished predecessors are (2) : %s", successor.predecessorsFinished
+    )
 
     # Add the predecessor as a finished predecessor to the successor
     successor.predecessorsFinished.add(predecessor.jobStoreID)
 
-
-    logger.debug("Already finished predecessors are (3) : %s", successor.predecessorsFinished)
+    logger.debug(
+        "Already finished predecessors are (3) : %s", successor.predecessorsFinished
+    )
 
     # If the successor job's predecessors have all not all completed then
     # ignore the successor as is not yet ready to run
@@ -97,33 +114,43 @@ def checkSuccessorReadyToRunMultiplePredecessors(successor, predecessor, jobStor
     return True
 
 
-
-def nextChainable(predecessor: JobDescription, jobStore: AbstractJobStore, toilState, config: Config) -> Optional[JobDescription]:
+def nextChainable(
+    predecessor: JobDescription,
+    jobStore: AbstractJobStore,
+    toilState: ToilState,
+    config: Config,
+) -> Optional[JobDescription]:
     """
     Returns the next chainable job's JobDescription after the given predecessor
     JobDescription, if one exists, or None if the chain must terminate.
 
     :param predecessor: The job to chain from
     :param jobStore: The JobStore to fetch JobDescriptions from.
     :param config: The configuration for the current run.
-    :param toil.toilState.ToilState toilState: A local toilState, for providing a mutatable stack
+    :param toil.toilState.ToilState toilState: A local toilState, for providing a mutatable stack
     """
     #If no more jobs to run or services not finished, quit
     if len(predecessor.stack) == 0 or len(predecessor.services) > 0 or (isinstance(predecessor, CheckpointJobDescription) and predecessor.checkpoint != None):
         logger.debug("Stopping running chain of jobs: length of stack: %s, services: %s, checkpoint: %s",
                      len(predecessor.stack), len(predecessor.services), (isinstance(predecessor, CheckpointJobDescription) and predecessor.checkpoint != None))
         return None
 
-    # logger.debug("Length of stack: %s",len(predecessor.stack))
-    # logger.debug("Number of : %s",len(predecessor.stack))
-    if len(predecessor.stack) > 1 and len(predecessor.stack[-1]) > 0 and len(predecessor.stack[-2]) > 0:
+    # logger.debug("Length of stack: %s",len(predecessor.stack))
+    # logger.debug("Number of : %s",len(predecessor.stack))
+    if (
+        len(predecessor.stack) > 1
+        and len(predecessor.stack[-1]) > 0
+        and len(predecessor.stack[-2]) > 0
+    ):
         # TODO: Without a real stack list we can freely mutate, we can't chain
         # to a child, which may branch, and then go back and do the follow-ons
         # of the original job.
         # TODO: Go back to a free-form stack list and require some kind of
         # stack build phase?
-        #logger.debug("Job has both children and follow-ons - let's see if this breaks")
-        logger.debug("Stopping running chain of jobs because job has both children and follow-ons")
+        # logger.debug("Job has both children and follow-ons - let's see if this breaks")
+        logger.debug(
+            "Stopping running chain of jobs because job has both children and follow-ons"
+        )
         return None
 
     #Get the next set of jobs to run
@@ -135,17 +162,17 @@ def nextChainable(predecessor: JobDescription, jobStore: AbstractJobStore, toilS
 
     #If there are 2 or more jobs to run in parallel we quit
     if len(jobs) >= 2:
-        logger.debug("No more jobs can run in series by this worker,"
-                     " it's got %i children", len(jobs))
+        logger.debug(
+            "No more jobs can run in series by this worker," " it's got %i children",
+            len(jobs),
+        )
         return None
 
     # Grab the only job that should be there.
     successorID = next(iter(jobs))
 
     # Load the successor JobDescription
     successor = jobStore.load(successorID)
-
-    #testresult = checkSuccessorReadyToRunMultiplePredecessors(successor, predecessor, jobStore, toilState)
 
     #We check the requirements of the successor to see if we can run it
     #within the current worker
@@ -161,14 +188,19 @@ def nextChainable(predecessor: JobDescription, jobStore: AbstractJobStore, toilS
     if successor.preemptable != predecessor.preemptable:
         logger.debug("Preemptability is different for the next job, returning to the leader")
         return None
-    # if (successor.predecessorNumber - len(successor.predecessorsFinished)) > 1:
-    if not checkSuccessorReadyToRunMultiplePredecessors(successor, predecessor, jobStore, toilState):
-        logger.debug("The next job has %i predecessors that are not yet "
-                     "recorded as finished; we must return to the leader.", successor.predecessorNumber)
+    # if (successor.predecessorNumber - len(successor.predecessorsFinished)) > 1:
+    if not checkSuccessorReadyToRunMultiplePredecessors(
+        successor, predecessor, jobStore, toilState
+    ):
+        logger.debug(
+            "The next job has %i predecessors that are not yet "
+            "recorded as finished; we must return to the leader.",
+            successor.predecessorNumber,
+        )
         logger.debug(successor.predecessorsFinished)
         return None
     else:
-        logger.debug('all predecessors are finished, we can chain to the successor')
+        logger.debug("all predecessors are finished, we can chain to the successor")
 
     if len(successor.services) > 0:
         logger.debug("The next job requires services that will not yet be started; we must return to the leader.")
@@ -182,7 +214,15 @@ def nextChainable(predecessor: JobDescription, jobStore: AbstractJobStore, toilS
     # Made it through! This job is chainable.
     return successor
 
-def workerScript(jobStore: AbstractJobStore, config: Config, jobName: str, jobStoreID: str, parentToilState, redirectOutputToLogFile: bool = True) -> int:
+
+def workerScript(
+    jobStore: AbstractJobStore,
+    config: Config,
+    jobName: str,
+    jobStoreID: str,
+    parentToilState: Optional[str],
+    redirectOutputToLogFile: bool = True,
+) -> int:
     """
     Worker process script, runs a job.
 
@@ -191,9 +231,9 @@ def workerScript(jobStore: AbstractJobStore, config: Config, jobName: str, jobSt
     :param jobName: The "job name" (a user friendly name) of the job to be run
     :param jobStoreID: The job store ID of the job to be run
 
-    :param str parentToilState: Pickle containing the parent toilState
+    :param parentToilState: Pickle containing the parent toilState
 
-    :return int: 1 if a job failed, or 0 if all jobs succeeded
+    :return 1 if a job failed, or 0 if all jobs succeeded
     """
 
     configure_root_logger()
@@ -357,7 +397,7 @@ def workerScript(jobStore: AbstractJobStore, config: Config, jobName: str, jobSt
     if parentToilState:
         logger.debug(parentToilState)
     else:
-        logger.debug('parentToilState empty')
+        logger.debug("parentToilState empty")
 
     ##########################################
     #Connect to the deferred function system
@@ -424,16 +464,14 @@ def workerScript(jobStore: AbstractJobStore, config: Config, jobName: str, jobSt
 
         startTime = time.time()
 
-
-        logger.debug(jobStore)
-        # Get a snap shot of the current state of the jobs in the jobStore
+        logger.debug(jobStore)
+        # Get a snap shot of the current state of the jobs in the jobStore
         # - creating a local version of the leader's ToilState
         if parentToilState:
-            toilState = pickle.loads(base64.b64decode(parentToilState.encode('utf-8')))
+            toilState = pickle.loads(base64.b64decode(parentToilState.encode("utf-8")))
         else:
             toilState = ToilState(jobStore, jobDesc, jobCache=None)
-
-
+
         while True:
             ##########################################
             #Run the job body, if there is one
@@ -725,9 +763,12 @@ def parse_args(args: List[str]) -> argparse.Namespace:
                             that the worker can then run before/after the job on the batch
                             system's behalf.""")
 
-    parser.add_argument("--toilState", default=None, type=str,
-                        help="""Pickled, base64-encoded copy of the Toul leader's toilState.""")
-
+    parser.add_argument(
+        "--toilState",
+        default=None,
+        type=str,
+        help="""Pickled, base64-encoded copy of the Toul leader's toilState.""",
+    )
 
     return parser.parse_args(args)
 
@@ -779,10 +820,11 @@ def main(argv: Optional[List[str]] = None) -> None:
     jobStore = Toil.resumeJobStore(options.jobStoreLocator)
     config = jobStore.config
 
-
     with in_contexts(options.context):
         # Call the worker
-        exit_code = workerScript(jobStore, config, options.jobName, options.jobStoreID, options.toilState)
+        exit_code = workerScript(
+            jobStore, config, options.jobName, options.jobStoreID, options.toilState
+        )
 
     # Exit with its return value
     sys.exit(exit_code)
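
For callers of workerScript, the practical effect of the new signature is that parentToilState is now an explicit positional argument typed Optional[str]. Below is a minimal sketch, assuming a file job store locator and placeholder job identifiers (both hypothetical); local callers with no pickled leader state pass None, as singleMachine.py and toilDebugJob.py do above.

from toil.common import Toil
from toil.worker import workerScript

jobStore = Toil.resumeJobStore("file:/tmp/example-jobstore")  # hypothetical locator
exit_code = workerScript(
    jobStore,
    jobStore.config,
    "exampleJobName",     # hypothetical job name
    "exampleJobStoreID",  # hypothetical job store ID
    None,                 # no pickled leader ToilState; the worker builds its own
    redirectOutputToLogFile=False,
)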
