Skip to content

Commit c932df0

Browse files
committed
Merge remote-tracking branch 'origin/master' into jobstack
2 parents a4ac602 + 2cc02d3 commit c932df0

31 files changed

+729
-169
lines changed

.github/PULL_REQUEST_TEMPLATE.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ To be copied to the [draft changelog](https://github.com/DataBiosphere/toil/wiki
1919
* [ ] New functions without [type hints](https://docs.python.org/3/library/typing.html).
2020
* [ ] New functions or classes without informative docstrings.
2121
* [ ] Changes to semantics not reflected in the relevant docstrings.
22-
* [ ] New or changed command line options for Toil workflows that are not reflected in `docs/running/cliOptions.rst`
22+
* [ ] New or changed command line options for Toil workflows that are not reflected in `docs/running/{cliOptions,cwl,wdl}.rst`
2323
* [ ] New features without tests.
2424
* [ ] Comment on the lines of code where problems exist with a review comment. You can shift-click the line numbers in the diff to select multiple lines.
2525
* [ ] Finish the review with an overall description of your opinion.

docs/appendices/environment_vars.rst

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,5 +174,9 @@ There are several environment variables that affect the way Toil runs.
174174
| | in the Toil Docker container to use as a mirror |
175175
| | for Docker Hub. |
176176
+----------------------------------+----------------------------------------------------+
177+
| OMP_NUM_THREADS | The number of cores set for OpenMP applications in |
178+
| | the workers. If not set, Toil will use the number |
179+
| | of job threads. |
180+
+----------------------------------+----------------------------------------------------+
177181

178182
.. _standard temporary directory: https://docs.python.org/3/library/tempfile.html#tempfile.gettempdir

docs/running/cwl.rst

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,10 @@ printed to the stdout stream after workflow execution.
9595
``--stats``: Save resources usages in json files that can be collected with the
9696
``toil stats`` command after the workflow is done.
9797

98+
``--disable-streaming``: Does not allow streaming of input files. This is enabled
99+
by default for files marked with ``streamable`` flag True and only for remote files
100+
when the jobStore is not on local machine.
101+
98102
Running CWL in the Cloud
99103
------------------------
100104

setup.py

100644100755
Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818
from setuptools import find_packages, setup
1919

2020

21+
cwltool_version = '3.1.20210816212154'
22+
23+
2124
def run_setup():
2225
"""
2326
Calls setup(). This function exists so the setup() invocation preceded more internal
@@ -33,7 +36,7 @@ def run_setup():
3336
gcs = 'google-cloud-storage==1.6.0'
3437
gcs_oauth2_boto_plugin = 'gcs_oauth2_boto_plugin==1.14'
3538
apacheLibcloud = 'apache-libcloud==2.2.1'
36-
cwltool = 'cwltool==3.1.20210616134059'
39+
cwltool = f'cwltool=={cwltool_version}'
3740
galaxyToolUtil = 'galaxy-tool-util'
3841
htcondor = 'htcondor>=8.6.0'
3942
kubernetes = 'kubernetes>=12.0.1, <13'
@@ -168,7 +171,10 @@ def import_version():
168171
# Use the template to generate src/toil/version.py
169172
import version_template
170173
with NamedTemporaryFile(mode='w', dir='src/toil', prefix='version.py.', delete=False) as f:
171-
f.write(version_template.expand_())
174+
f.write(version_template.expand_(others={
175+
# expose the dependency versions that we may need to access in Toil
176+
'cwltool_version': cwltool_version,
177+
}))
172178
os.rename(f.name, 'src/toil/version.py')
173179

174180
# Unfortunately, we can't use a straight import here because that would also load the stuff

src/toil/batchSystems/abstractBatchSystem.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,11 +110,13 @@ def setUserScript(self, userScript):
110110
raise NotImplementedError()
111111

112112
@abstractmethod
113-
def issueBatchJob(self, jobDesc):
113+
def issueBatchJob(self, jobDesc, job_environment: Optional[Dict[str, str]] = None):
114114
"""
115115
Issues a job with the specified command to the batch system and returns a unique jobID.
116116
117117
:param jobDesc a toil.job.JobDescription
118+
:param job_environment: a collection of job-specific environment variables
119+
to be set on the worker.
118120
119121
:return: a unique jobID that can be used to reference the newly issued job
120122
:rtype: int
@@ -443,6 +445,7 @@ def shutdownLocal(self): # type: () -> None
443445
"""To be called from shutdown()"""
444446
self.localBatch.shutdown()
445447

448+
446449
class BatchSystemCleanupSupport(BatchSystemLocalSupport):
447450
"""
448451
Adds cleanup support when the last running job leaves a node, for batch

src/toil/batchSystems/abstractGridEngineBatchSystem.py

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
from datetime import datetime
1818
from queue import Empty, Queue
1919
from threading import Lock, Thread
20-
from typing import Any, List, Union
20+
from typing import Any, List, Dict, Union, Optional
2121

2222
from toil.batchSystems.abstractBatchSystem import (BatchJobExitReason,
2323
BatchSystemCleanupSupport,
@@ -107,10 +107,10 @@ def createJobs(self, newJob: Any) -> bool:
107107
while len(self.waitingJobs) > 0 and \
108108
len(self.runningJobs) < int(self.boss.config.maxLocalJobs):
109109
activity = True
110-
jobID, cpu, memory, command, jobName = self.waitingJobs.pop(0)
110+
jobID, cpu, memory, command, jobName, environment = self.waitingJobs.pop(0)
111111

112112
# prepare job submission command
113-
subLine = self.prepareSubmission(cpu, memory, jobID, command, jobName)
113+
subLine = self.prepareSubmission(cpu, memory, jobID, command, jobName, environment)
114114
logger.debug("Running %r", subLine)
115115
batchJobID = self.boss.with_retries(self.submitJob, subLine)
116116
logger.debug("Submitted job %s", str(batchJobID))
@@ -255,29 +255,35 @@ def run(self):
255255
logger.error("GridEngine like batch system failure", exc_info=ex)
256256
raise
257257

258-
@abstractmethod
259258
def coalesce_job_exit_codes(self, batch_job_id_list: list) -> list:
260259
"""
261260
Returns exit codes for a list of jobs.
262261
Implementation-specific; called by
263262
AbstractGridEngineWorker.checkOnJobs()
264-
:param string batchjobIDList: List of batch system job ID
263+
:param string batch_job_id_list: List of batch system job ID
265264
"""
266265
raise NotImplementedError()
267266

268267
@abstractmethod
269-
def prepareSubmission(self, cpu, memory, jobID, command, jobName):
268+
def prepareSubmission(self,
269+
cpu: int,
270+
memory: int,
271+
jobID: int,
272+
command: str,
273+
jobName: str,
274+
job_environment: Optional[Dict[str, str]] = None) -> List[str]:
270275
"""
271276
Preparation in putting together a command-line string
272277
for submitting to batch system (via submitJob().)
273278
274-
:param: string cpu
275-
:param: string memory
276-
:param: string jobID : Toil job ID
279+
:param: int cpu
280+
:param: int memory
281+
:param: int jobID: Toil job ID
277282
:param: string subLine: the command line string to be called
278283
:param: string jobName: the name of the Toil job, to provide metadata to batch systems if desired
284+
:param: dict job_environment: the environment variables to be set on the worker
279285
280-
:rtype: string
286+
:rtype: List[str]
281287
"""
282288
raise NotImplementedError()
283289

@@ -354,7 +360,7 @@ def supportsWorkerCleanup(cls):
354360
def supportsAutoDeployment(cls):
355361
return False
356362

357-
def issueBatchJob(self, jobDesc):
363+
def issueBatchJob(self, jobDesc, job_environment: Optional[Dict[str, str]] = None):
358364
# Avoid submitting internal jobs to the batch queue, handle locally
359365
localID = self.handleLocalJob(jobDesc)
360366
if localID:
@@ -363,7 +369,8 @@ def issueBatchJob(self, jobDesc):
363369
self.checkResourceRequest(jobDesc.memory, jobDesc.cores, jobDesc.disk)
364370
jobID = self.getNextJobID()
365371
self.currentJobs.add(jobID)
366-
self.newJobsQueue.put((jobID, jobDesc.cores, jobDesc.memory, jobDesc.command, jobDesc.jobName))
372+
self.newJobsQueue.put((jobID, jobDesc.cores, jobDesc.memory, jobDesc.command, jobDesc.jobName,
373+
job_environment))
367374
logger.debug("Issued the job command: %s with job id: %s and job name %s", jobDesc.command, str(jobID),
368375
jobDesc.jobName)
369376
return jobID

src/toil/batchSystems/gridengine.py

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import os
1717
import time
1818
from pipes import quote
19+
from typing import Optional, List, Dict
1920

2021
from toil.batchSystems.abstractGridEngineBatchSystem import AbstractGridEngineBatchSystem
2122
from toil.lib.misc import CalledProcessErrorStderr, call_command
@@ -48,8 +49,14 @@ def getRunningJobIDs(self):
4849
def killJob(self, jobID):
4950
call_command(['qdel', self.getBatchSystemID(jobID)])
5051

51-
def prepareSubmission(self, cpu, memory, jobID, command, jobName):
52-
return self.prepareQsub(cpu, memory, jobID) + [command]
52+
def prepareSubmission(self,
53+
cpu: int,
54+
memory: int,
55+
jobID: int,
56+
command: str,
57+
jobName: str,
58+
job_environment: Optional[Dict[str, str]] = None):
59+
return self.prepareQsub(cpu, memory, jobID, job_environment) + [command]
5360

5461
def submitJob(self, subLine):
5562
stdout = call_command(subLine)
@@ -94,14 +101,22 @@ def getJobExitCode(self, sgeJobID):
94101
"""
95102
Implementation-specific helper methods
96103
"""
97-
def prepareQsub(self, cpu: int, mem: int, jobID: int) -> List[str]:
104+
def prepareQsub(self,
105+
cpu: int,
106+
mem: int,
107+
jobID: int,
108+
job_environment: Optional[Dict[str, str]] = None) -> List[str]:
98109
qsubline = ['qsub', '-V', '-b', 'y', '-terse', '-j', 'y', '-cwd',
99110
'-N', 'toil_job_' + str(jobID)]
100111

101-
if self.boss.environment:
112+
environment = self.boss.environment.copy()
113+
if job_environment:
114+
environment.update(job_environment)
115+
116+
if environment:
102117
qsubline.append('-v')
103118
qsubline.append(','.join(k + '=' + quote(os.environ[k] if v is None else v)
104-
for k, v in self.boss.environment.items()))
119+
for k, v in environment.items()))
105120

106121
reqline = list()
107122
sgeArgs = os.getenv('TOIL_GRIDENGINE_ARGS')

src/toil/batchSystems/htcondor.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
import math
1717
import os
1818
import time
19-
from typing import Any
19+
from typing import Any, Optional, Dict
2020

2121
import htcondor
2222

@@ -277,12 +277,12 @@ def connectSchedd(self):
277277
return schedd
278278

279279
def getEnvString(self):
280-
'''Build an environment string that a HTCondor Submit object can use.
280+
"""
281+
Build an environment string that a HTCondor Submit object can use.
281282
282283
For examples of valid strings, see:
283284
http://research.cs.wisc.edu/htcondor/manual/current/condor_submit.html#man-condor-submit-environment
284-
285-
'''
285+
"""
286286

287287
env_items = []
288288
if self.boss.environment:
@@ -302,7 +302,7 @@ def getEnvString(self):
302302
return '"' + ' '.join(env_items) + '"'
303303

304304
# Override the issueBatchJob method so HTCondor can be given the disk request
305-
def issueBatchJob(self, jobNode):
305+
def issueBatchJob(self, jobNode, job_environment: Optional[Dict[str, str]] = None):
306306
# Avoid submitting internal jobs to the batch queue, handle locally
307307
localID = self.handleLocalJob(jobNode)
308308
if localID:
@@ -313,6 +313,7 @@ def issueBatchJob(self, jobNode):
313313
self.currentJobs.add(jobID)
314314

315315
# Add the jobNode.disk and jobNode.jobName to the job tuple
316-
self.newJobsQueue.put((jobID, jobNode.cores, jobNode.memory, jobNode.disk, jobNode.jobName, jobNode.command))
316+
self.newJobsQueue.put((jobID, jobNode.cores, jobNode.memory, jobNode.disk, jobNode.jobName, jobNode.command,
317+
job_environment))
317318
logger.debug("Issued the job command: %s with job id: %s ", jobNode.command, str(jobID))
318319
return jobID

src/toil/batchSystems/kubernetes.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import tempfile
3333
import time
3434
import uuid
35+
from typing import Optional, Dict
3536

3637
import kubernetes
3738
import pytz
@@ -362,15 +363,23 @@ def _create_affinity(self, preemptable: bool) -> kubernetes.client.V1Affinity:
362363
# Make the node affinity into an overall affinity
363364
return kubernetes.client.V1Affinity(node_affinity=node_affinity)
364365

365-
def _create_pod_spec(self, jobDesc: JobDescription) -> kubernetes.client.V1PodSpec:
366+
def _create_pod_spec(
367+
self,
368+
jobDesc: JobDescription,
369+
job_environment: Optional[Dict[str, str]] = None
370+
) -> kubernetes.client.V1PodSpec:
366371
"""
367372
Make the specification for a pod that can execute the given job.
368373
"""
369374

375+
environment = self.environment.copy()
376+
if job_environment:
377+
environment.update(job_environment)
378+
370379
# Make a job dict to send to the executor.
371380
# First just wrap the command and the environment to run it in
372381
job = {'command': jobDesc.command,
373-
'environment': self.environment.copy()}
382+
'environment': environment}
374383
# TODO: query customDockerInitCmd to respect TOIL_CUSTOM_DOCKER_INIT_COMMAND
375384

376385
if self.userScript is not None:
@@ -456,8 +465,7 @@ def _create_pod_spec(self, jobDesc: JobDescription) -> kubernetes.client.V1PodSp
456465

457466
return pod_spec
458467

459-
460-
def issueBatchJob(self, jobDesc):
468+
def issueBatchJob(self, jobDesc, job_environment: Optional[Dict[str, str]] = None):
461469
# TODO: get a sensible self.maxCores, etc. so we can checkResourceRequest.
462470
# How do we know if the cluster will autoscale?
463471

@@ -473,7 +481,7 @@ def issueBatchJob(self, jobDesc):
473481
self.checkResourceRequest(jobDesc.memory, jobDesc.cores, jobDesc.disk)
474482

475483
# Make a pod that describes running the job
476-
pod_spec = self._create_pod_spec(jobDesc)
484+
pod_spec = self._create_pod_spec(jobDesc, job_environment=job_environment)
477485

478486
# Make a batch system scope job ID
479487
jobID = self.getNextJobID()

src/toil/batchSystems/lsf.py

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import subprocess
2626
from datetime import datetime
2727
from random import randint
28-
from typing import List, Union
28+
from typing import Union, Optional, List, Dict
2929

3030
from dateutil.parser import parse
3131
from dateutil.tz import tzlocal
@@ -85,12 +85,23 @@ def fallbackRunningJobIDs(self, currentjobs):
8585
def killJob(self, jobID):
8686
call_command(['bkill', self.getBatchSystemID(jobID)])
8787

88-
def prepareSubmission(self, cpu, memory, jobID, command, jobName):
89-
return self.prepareBsub(cpu, memory, jobID) + [command]
88+
def prepareSubmission(self,
89+
cpu: int,
90+
memory: int,
91+
jobID: int,
92+
command: str,
93+
jobName: str,
94+
job_environment: Optional[Dict[str, str]] = None):
95+
return (self.prepareBsub(cpu, memory, jobID) + [command],
96+
job_environment) # pass job_environment to .submitJob()
9097

9198
def submitJob(self, subLine):
99+
subLine, job_environment = subLine
92100
combinedEnv = self.boss.environment
93101
combinedEnv.update(os.environ)
102+
if job_environment:
103+
combinedEnv.update(job_environment)
104+
94105
stdout = call_command(subLine, env=combinedEnv)
95106
# Example success: Job <39605914> is submitted to default queue <general>.
96107
# Example fail: Service class does not exist. Job not submitted.
@@ -102,7 +113,7 @@ def submitJob(self, subLine):
102113
else:
103114
logger.error("Could not submit job\nReason: {}".format(stdout))
104115
temp_id = randint(10000000, 99999999)
105-
#Flag this job to be handled by getJobExitCode
116+
# Flag this job to be handled by getJobExitCode
106117
result = "NOT_SUBMITTED_{}".format(temp_id)
107118
return result
108119

@@ -229,7 +240,6 @@ def parse_bjobs_record(self, bjobs_record: dict, job: int) -> Union[int, None]:
229240

230241
return self.getJobExitCodeBACCT(job)
231242

232-
233243
def getJobExitCodeBACCT(self,job):
234244
# if not found in bjobs, then try bacct (slower than bjobs)
235245
logger.debug("bjobs failed to detect job - trying bacct: "
@@ -318,7 +328,7 @@ def prepareBsub(self, cpu: int, mem: int, jobID: int) -> List[str]:
318328
bsubline.extend(lsfArgs.split())
319329
return bsubline
320330

321-
def parseBjobs(self,bjobs_output_str):
331+
def parseBjobs(self, bjobs_output_str):
322332
"""
323333
Parse records from bjobs json type output
324334
params:

0 commit comments

Comments
 (0)