diff --git a/.gitignore b/.gitignore
index 299f26c..63f11be 100644
--- a/.gitignore
+++ b/.gitignore
@@ -2,3 +2,5 @@
 *.pyc
 splicing_0.1.tar.gz
 pysplicing-0.1.tar.gz
+.project
+.pydevproject
diff --git a/misopy/cluster/__init__.py b/misopy/cluster/__init__.py
new file mode 100644
index 0000000..dce0153
--- /dev/null
+++ b/misopy/cluster/__init__.py
@@ -0,0 +1,455 @@
+'''
+misopy.cluster
+
+Cluster engine factory for the different cluster types
+
+@author: Aaron Kitzmiller
+@copyright: 2016 The Presidents and Fellows of Harvard College. All rights reserved.
+@license: GPL v2.0
+@contact: aaron_kitzmiller@harvard.edu
+'''
+
+import os, subprocess, traceback, time
+
+from misopy.settings import load_settings, Settings
+from misopy import misc_utils
+
+
+def getClusterEngine(clustercmd, settings_fname):
+    '''
+    Return the cluster engine that matches the given submission command.
+    '''
+    ce = None
+    if clustercmd == 'sbatch':
+        ce = SlurmClusterEngine(settings_fname)
+    elif clustercmd == 'bsub':
+        ce = LsfClusterEngine(settings_fname)
+    elif clustercmd == 'qsub':
+        ce = SgeClusterEngine(settings_fname)
+    else:
+        raise Exception('Unknown cluster command %s' % clustercmd)
+
+    return ce
+
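+# Illustrative usage only (the paths below are hypothetical): with a settings
+# file whose [cluster] section sets cluster_command = sbatch, the factory
+# hands back a SlurmClusterEngine:
+#
+#     engine = getClusterEngine('sbatch', '/path/to/miso_settings.txt')
+#     job_id = engine.run_on_cluster(cmd, 'misojob', '/path/to/output')
+#     engine.wait_on_jobs([job_id], 'sbatch')
+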
+ """ + # os.system('ls %s' %(filename)) + if crate_dir == None: + crate_dir = \ + os.path.dirname(os.path.abspath(os.path.expanduser(__file__))) + f = open(filename, 'w') + f.write("#!/bin/bash\n") + f.write("export PATH=$PATH:%s\n" %(crate_dir)) + f.write("source ~/.bash_profile\n") + f.write("cd %s\n" %(crate_dir)) + #write_cluster_preface(f) + f.write(cmd + "\n") + f.close() + os.system('chmod +x \"%s\"' %(filename)) + + + def run_on_cluster(self, cmd, job_name, cluster_output_dir, + cluster_scripts_dir=None, + queue_type=None): + ''' + Composes job script and launches job + ''' + print "Submitting job: %s" %(job_name) + queue_name = None + + # Load command name from settings file + cmd_name = self.settings.get_cluster_command() + + if queue_type == "long": + queue_name = self.settings.get_long_queue_name() + elif queue_type == "short": + queue_name = self.settings.get_short_queue_name() + else: + print "Warning: Unknown queue type: %s" %(queue_type) + queue_name = queue_type + + if queue_type is None: + print " - queue type: unspecified" + else: + print " - queue type: %s" %(queue_type) + if queue_name is None: + print " - queue name unspecified" + else: + print " - queue name: %s" %(queue_name) + + misc_utils.make_dir(cluster_output_dir) + if cluster_scripts_dir == None: + cluster_scripts_dir = os.path.join(cluster_output_dir, + 'cluster_scripts') + misc_utils.make_dir(cluster_scripts_dir) + scripts_output_dir = os.path.join(cluster_output_dir, + 'scripts_output') + misc_utils.make_dir(scripts_output_dir) + scripts_output_dir = os.path.abspath(scripts_output_dir) + cluster_call = 'bsub -o \"%s\" -e \"%s\"' %(scripts_output_dir, + scripts_output_dir) + # Add queue type if given one + if queue_name != None: + cluster_call += ' -q \"%s\"' %(queue_name) + + script_name = os.path.join(cluster_scripts_dir, + '%s_time_%s.sh' \ + %(job_name, + time.strftime("%m-%d-%y_%H_%M_%S"))) + self.make_bash_script(script_name, cmd) + cluster_cmd = cluster_call + ' \"%s\"' %(script_name) + job_id = self.launch_job(cluster_cmd) + return job_id + + + + def wait_on_job(self, job_id, delay=60): + # Handle bsub + while True: + output = subprocess.Popen("bjobs %i" %(job_id), + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE).communicate() + if len(output[0]) > 0: + status = output[0].split()[10] + if status == "DONE": + break + else: + # No jobs available + break + time.sleep(delay) + time.sleep(delay) + + + + def launch_job(self, cluster_cmd): + """ + Execute cluster_cmd and return its job ID if + it can be fetched. + """ + print "Executing: %s" %(cluster_cmd) + proc = subprocess.Popen(cluster_cmd, shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + stdin=subprocess.PIPE) + # Read the job ID if it's a known cluster + # submission system + output = proc.communicate() + job_id = None + if "is submitted to" in output[0]: + job_id = int(output[0].strip().split()[1][1:-1]) + return job_id + + + +class SgeClusterEngine(AbstractClusterEngine): + ''' + Run jobs on an SGE cluster + ''' + + def make_bash_script(self,filename, cmd, crate_dir=None): + """ + Make an executable bash script out of the given command. 
+ """ + # os.system('ls %s' %(filename)) + if crate_dir == None: + crate_dir = \ + os.path.dirname(os.path.abspath(os.path.expanduser(__file__))) + f = open(filename, 'w') + f.write("#!/bin/bash\n") + f.write("export PATH=$PATH:%s\n" %(crate_dir)) + f.write("source ~/.bash_profile\n") + f.write("cd %s\n" %(crate_dir)) + #write_cluster_preface(f) + f.write(cmd + "\n") + f.close() + os.system('chmod +x \"%s\"' %(filename)) + + + def run_on_cluster(self, cmd, job_name, cluster_output_dir, + cluster_scripts_dir=None, + queue_type=None): + ''' + Composes job script and launches job + ''' + print "Submitting job: %s" %(job_name) + queue_name = None + + # Load command name from settings file + cmd_name = self.settings.get_cluster_command() + + if queue_type == "long": + queue_name = self.settings.get_long_queue_name() + elif queue_type == "short": + queue_name = self.settings.get_short_queue_name() + else: + print "Warning: Unknown queue type: %s" %(queue_type) + queue_name = queue_type + + if queue_type is None: + print " - queue type: unspecified" + else: + print " - queue type: %s" %(queue_type) + if queue_name is None: + print " - queue name unspecified" + else: + print " - queue name: %s" %(queue_name) + + misc_utils.make_dir(cluster_output_dir) + if cluster_scripts_dir == None: + cluster_scripts_dir = os.path.join(cluster_output_dir, + 'cluster_scripts') + misc_utils.make_dir(cluster_scripts_dir) + scripts_output_dir = os.path.join(cluster_output_dir, + 'scripts_output') + misc_utils.make_dir(scripts_output_dir) + scripts_output_dir = os.path.abspath(scripts_output_dir) + cluster_call = 'qsub -o \"%s\" -e \"%s\"' %(scripts_output_dir, + scripts_output_dir) + # Add queue type if given one + if queue_name != None: + cluster_call += ' -q \"%s\"' %(queue_name) + + script_name = os.path.join(cluster_scripts_dir, + '%s_time_%s.sh' \ + %(job_name, + time.strftime("%m-%d-%y_%H_%M_%S"))) + self.make_bash_script(script_name, cmd) + cluster_cmd = cluster_call + ' \"%s\"' %(script_name) + job_id = self.launch_job(cluster_cmd) + return job_id + + + + def wait_on_job(self, job_id, delay=60): + # Handle qsub + while True: + output = \ + subprocess.Popen("qstat %i" %(job_id), + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE).communicate() + if "Unknown Job" in output[1]: + break + time.sleep(delay) + time.sleep(delay) + + + + def launch_job(self, cluster_cmd): + """ + Execute cluster_cmd and return its job ID if + it can be fetched. + """ + print "Executing: %s" %(cluster_cmd) + proc = subprocess.Popen(cluster_cmd, shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + stdin=subprocess.PIPE) + # Read the job ID if it's a known cluster + # submission system + output = proc.communicate() + job_id = None + if "." 
in output[0][:-1] and ">" not in output[0]: + job_id = int(output[0].split(".")[0]) + return job_id + + + +class SlurmClusterEngine(AbstractClusterEngine): + ''' + Run jobs on a Slurm cluster + ''' + + def __init__(self,settings_filename): + ''' + Get the slurm job template file and load it + ''' + super(SlurmClusterEngine,self).__init__(settings_filename) + + if not 'slurm_template' in self.settings: + raise Exception('slurm_template must be defined in settings to use Slurm') + + self.squeue_max_attempts = 10 + if 'squeue_max_attempts' in self.settings: + self.squeue_max_attempts = int(self.settings['squeue_max_attempts']) + + template_filename = self.settings['slurm_template'] + if not os.path.exists(template_filename): + raise Exception('Cannot find slurm template file %s' % template_filename) + + with open(template_filename,'r') as f: + self.template = f.read() + + if self.template.strip() == '': + raise Exception('slurm template file %s is empty' % template_filename) + + + + def make_bash_script(self,script_name,cmd): + ''' + Use the template to write out a sbatch submission script + ''' + scripttxt = self.template.format(cmd=cmd) + with open(script_name,'w') as script: + script.write(scripttxt + '\n') + + + + def run_on_cluster(self, cmd, job_name, cluster_output_dir, + cluster_scripts_dir=None, + queue_type=None): + ''' + Composes job script and launches job + ''' + + misc_utils.make_dir(cluster_output_dir) + if cluster_scripts_dir == None: + cluster_scripts_dir = os.path.join(cluster_output_dir, + 'cluster_scripts') + misc_utils.make_dir(cluster_scripts_dir) + + scripts_output_dir = os.path.join(cluster_output_dir, + 'scripts_output') + misc_utils.make_dir(scripts_output_dir) + scripts_output_dir = os.path.abspath(scripts_output_dir) + cluster_call = 'sbatch -D \"%s\"' %(scripts_output_dir) + + script_name = os.path.join(cluster_scripts_dir, + '%s_time_%s.sh' \ + %(job_name, + time.strftime("%m-%d-%y_%H_%M_%S"))) + self.make_bash_script(script_name, cmd) + cluster_cmd = cluster_call + ' \"%s\"' %(script_name) + job_id = self.launch_job(cluster_cmd) + return job_id + + + + def wait_on_job(self, job_id, delay=10): + ''' + Wait until job is done. Uses squeue first, then sacct. + Runs squeue /sacct until either the job is done or until squeue_max_attempts is reached. + Max attempts is needed to ensure that squeue information is available. + ''' + squeue_cmd = 'squeue --noheader --format %%T -j %d' % job_id + sacct_cmd = 'sacct --noheader --format State -j %d.batch' % job_id + + done = False + squeue_attempts = 0 + state = None + + while not done: + + # If we've tried squeue_max_attempts and gotten no information, then quit + squeue_attempts += 1 + if squeue_attempts == self.squeue_max_attempts and state is None: + raise Exception('Attempted to query squeue /sacct %d times and retrieved no result' % squeue_attempts) + + proc = subprocess.Popen(squeue_cmd, shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + stdin=subprocess.PIPE) + output,err = proc.communicate() + + if proc.returncode != 0: + # The whole command failed. Weird. + raise Exception('squeue command %s failed: %s' % (squeue_cmd,err)) + + if output.strip() != '': + state = output.strip() + else: + # Try sacct. The job may be done and so disappeared from squeue + proc = subprocess.Popen(sacct_cmd, shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + stdin=subprocess.PIPE) + output,err = proc.communicate() + + if proc.returncode != 0: + # The whole command failed. Weird. 
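+# Hedged caveat on the terminal-state check below: the whole stripped
+# squeue/sacct output is compared against the state list, but for jobs
+# killed with scancel, sacct's State column can read "CANCELLED by <uid>"
+# rather than a bare "CANCELLED", so the loop would spin (the max-attempts
+# guard only fires while state is still None). A minimal guard, assuming
+# the base state is always the first whitespace-delimited token:
+#
+#     state = output.strip().split()[0]  # "CANCELLED by 1234" -> "CANCELLED"
+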
+                    raise Exception('sacct command %s failed: %s' % (sacct_cmd, err))
+
+                if output.strip() != '':
+                    state = output.strip()
+
+            if state is not None:
+                if state in ["COMPLETED", "COMPLETING", "CANCELLED", "FAILED", "TIMEOUT", "PREEMPTED", "NODE_FAIL"]:
+                    done = True
+                    print state
+
+            time.sleep(delay)
+
+    def launch_job(self, cluster_cmd):
+        '''
+        Runs the cluster command and returns the job id
+        '''
+        print 'Running command %s' % cluster_cmd
+        proc = subprocess.Popen(cluster_cmd, shell=True,
+                                stdout=subprocess.PIPE,
+                                stderr=subprocess.PIPE,
+                                stdin=subprocess.PIPE)
+        output, err = proc.communicate()
+        if proc.returncode != 0 or err.strip() != '':
+            raise Exception('Error launching job with %s: %s' % (cluster_cmd, err))
+
+        # sbatch acknowledges a successful submission with
+        # "Submitted batch job <id>"
+        job_id = output.strip().replace('Submitted batch job ', '')
+        try:
+            job_id = int(job_id)
+        except Exception as e:
+            raise Exception('Returned job id %s is not a number' % job_id)
+
+        return job_id
\ No newline at end of file
diff --git a/misopy/cluster/slurm_template.txt b/misopy/cluster/slurm_template.txt
new file mode 100644
index 0000000..e3b4669
--- /dev/null
+++ b/misopy/cluster/slurm_template.txt
@@ -0,0 +1,13 @@
+#!/bin/bash
+
+#SBATCH -p serial_requeue
+#SBATCH --mem 4000
+#SBATCH -t 0-1:00
+#SBATCH -n 1
+#SBATCH -N 1
+
+source new-modules.sh
+module load python/2.7.6-fasrc01
+
+{cmd}
+
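A note on this template: the partition and module lines are site-specific
(serial_requeue and new-modules.sh are Harvard cluster conventions), and
SlurmClusterEngine.make_bash_script fills the file with Python's str.format
(self.template.format(cmd=cmd)), so {cmd} is the only placeholder. One
consequence for sites editing the template: any literal braces must be
doubled, or str.format treats them as fields. For example:

    >>> "echo ${SLURM_JOB_ID}\n{cmd}\n".format(cmd="miso ...")
    KeyError: 'SLURM_JOB_ID'
    >>> "echo ${{SLURM_JOB_ID}}\n{cmd}\n".format(cmd="miso ...")
    'echo ${SLURM_JOB_ID}\nmiso ...\n'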
diff --git a/misopy/miso.py b/misopy/miso.py
index a062518..82cc0e1 100644
--- a/misopy/miso.py
+++ b/misopy/miso.py
@@ -22,6 +22,7 @@
 from misopy.settings import Settings, load_settings
 from misopy.settings import miso_path as miso_settings_path
 import misopy.cluster_utils as cluster_utils
+from misopy.cluster import getClusterEngine
 
 miso_path = os.path.dirname(os.path.abspath(__file__))
 manual_url = "http://genes.mit.edu/burgelab/miso/docs/"
@@ -245,6 +246,7 @@ def run(self, delay_constant=0.9):
                              chunk=self.chunk_jobs)
             # End SGE case
             return
+
         # All cluster jobs
         cluster_jobs = []
         for batch_num, cmd_info in enumerate(all_miso_cmds):
@@ -265,6 +267,12 @@ def run(self, delay_constant=0.9):
                 print "  - Submitted thread %s" %(thread_id)
                 self.threads[thread_id] = p
             else:
+                # Set up the cluster engine
+                Settings.load(self.settings_fname)
+                clustercmd = Settings.get_cluster_command()
+
+                self.cluster_engine = getClusterEngine(clustercmd, self.settings_fname)
+
                 # Run on cluster
                 if batch_size >= self.long_thresh:
                     queue_type = "long"
@@ -274,11 +282,10 @@ def run(self, delay_constant=0.9):
                 job_name = "gene_psi_batch_%d" %(batch_num)
                 print "Submitting to cluster: %s" %(cmd_to_run)
                 job_id = \
-                    cluster_utils.run_on_cluster(cmd_to_run,
+                    self.cluster_engine.run_on_cluster(cmd_to_run,
                                                  job_name,
                                                  self.output_dir,
-                                                 queue_type=queue_type,
-                                                 settings_fname=self.settings_fname)
+                                                 queue_type=queue_type)
                 if job_id is not None:
                     cluster_jobs.append(job_id)
                 time.sleep(delay_constant)
@@ -297,7 +304,7 @@ def run(self, delay_constant=0.9):
                   "system.")
         # Try to wait on jobs no matter what; though if 'cluster_jobs'
         # is empty here, it will not wait
-        cluster_utils.wait_on_jobs(cluster_jobs,
+        self.cluster_engine.wait_on_jobs(cluster_jobs,
                                    self.cluster_cmd)
     else:
         if self.use_cluster:
@@ -348,7 +355,8 @@
 def compute_all_genes_psi(gff_dir, bam_filename, read_len,
                           job_name="misojob",
                           num_proc=None,
                           prefilter=False,
-                          wait_on_jobs=True):
+                          wait_on_jobs=True,
+                          cluster_type=None):
     """
     Compute Psi values for genes using a GFF and a BAM filename.
@@ -574,6 +582,7 @@ def main():
     if options.overhang_len != None:
         overhang_len = options.overhang_len
 
+    # Whether to wait on cluster jobs or not
     wait_on_jobs = not options.no_wait
 
     compute_all_genes_psi(gff_filename, bam_filename,
diff --git a/misopy/run_events_analysis.py b/misopy/run_events_analysis.py
index 09e0447..1ec78ec 100644
--- a/misopy/run_events_analysis.py
+++ b/misopy/run_events_analysis.py
@@ -19,6 +19,7 @@
 from misopy.settings import Settings, load_settings
 from misopy.settings import miso_path as miso_settings_path
 import misopy.cluster_utils as cluster_utils
+from misopy.miso_sampler import get_logger
 
 miso_path = os.path.dirname(os.path.abspath(__file__))
 manual_url = "http://genes.mit.edu/burgelab/miso/docs/"
@@ -168,11 +169,11 @@ def check_gff_and_bam(gff_dir, bam_filename, main_logger,
     if bam_starts_with_chr != gff_starts_with_chr:
         mismatch_found = True
     if mismatch_found:
-        miso_logger.warning("It looks like your GFF annotation file and your BAM " \
+        main_logger.warning("It looks like your GFF annotation file and your BAM " \
                             "file might not have matching headers (chromosome names.) " \
                             "If this is the case, your run will fail as no reads from " \
                             "the BAM could be matched up with your annotation.")
-        miso_logger.warning("Please see:\n\t%s\n for more information." %(manual_url))
+        main_logger.warning("Please see:\n\t%s\n for more information." %(manual_url))
     # Default: assume BAM starts with chr headers
     chr_containing = "BAM file (%s)" %(bam_filename)
     not_chr_containing = "GFF annotation (%s)" %(gff_dir)
@@ -180,16 +181,16 @@
         # BAM does not start with chr, GFF does
         chr_containing, not_chr_containing = \
             not_chr_containing, chr_containing
-    miso_logger.warning("It looks like your %s contains \'chr\' chromosomes (UCSC-style) " \
+    main_logger.warning("It looks like your %s contains \'chr\' chromosomes (UCSC-style) " \
                         "while your %s does not." %(chr_containing,
                                                     not_chr_containing))
-    miso_logger.warning("The first few BAM chromosomes were: %s" \
+    main_logger.warning("The first few BAM chromosomes were: %s" \
                         %(",".join(bam_chroms.keys())))
     print "BAM references: "
     print bam_file.references
-    miso_logger.warning("The first few GFF chromosomes were: %s" \
+    main_logger.warning("The first few GFF chromosomes were: %s" \
                         %(",".join(gff_chroms.keys())))
-    miso_logger.warning("Run is likely to fail or produce empty output. Proceeding " \
+    main_logger.warning("Run is likely to fail or produce empty output. Proceeding " \
                         "anyway...")
     time.sleep(15)
 
diff --git a/misopy/settings/miso_settings.txt b/misopy/settings/miso_settings.txt
index d9adfa0..1ca01a4 100644
--- a/misopy/settings/miso_settings.txt
+++ b/misopy/settings/miso_settings.txt
@@ -3,7 +3,8 @@ filter_results = True
 min_event_reads = 20
 
 [cluster]
-cluster_command = qsub
+cluster_command = sbatch
+slurm_template = /home/akitzmiller/workspace/MISO/misopy/cluster/slurm_template.txt
 
 [sampler]
 burn_in = 500
diff --git a/setup.py b/setup.py
index 8dbb48e..4ce2fde 100644
--- a/setup.py
+++ b/setup.py
@@ -101,6 +101,7 @@
       # Tell distutils to look for pysplicing in the right directory
       package_dir = {'pysplicing': 'pysplicing/pysplicing'},
       packages = ['misopy',
+                  'misopy.cluster',
                   'misopy.sashimi_plot',
                   'misopy.sashimi_plot.plot_utils',
                   'pysplicing'],
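A closing note on the miso_settings.txt change: the packaged defaults now set
cluster_command = sbatch and point slurm_template at a path in the
developer's home directory, so a deployment would override the [cluster]
section with site-local values, along these lines (paths illustrative;
squeue_max_attempts is the optional override SlurmClusterEngine reads):

    [cluster]
    cluster_command = sbatch
    slurm_template = /etc/miso/slurm_template.txt
    squeue_max_attempts = 20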