Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reuse Bioluigi definitions for RSEM tasks #70

Open
wants to merge 1 commit into
base: development
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 0 additions & 13 deletions rnaseq_pipeline/targets.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,6 @@
from requests.auth import HTTPBasicAuth
from .gemma import GemmaApi

class RsemReference(luigi.Target):
"""
Represents the target of rsem-prepare-reference script.
"""
def __init__(self, prefix, taxon):
self.prefix = prefix
self.taxon = taxon

def exists(self):
exts = ['grp', 'ti', 'seq', 'chrlist']
return all(exists(join(self.prefix, '{}_0.{}'.format(self.taxon, ext)))
for ext in exts)

class GemmaDatasetPlatform(luigi.Target):
"""
Represents a platform associated to a Gemma dataset.
Expand Down
126 changes: 37 additions & 89 deletions rnaseq_pipeline/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import requests
import yaml
from bioluigi.scheduled_external_program import ScheduledExternalProgramTask
from bioluigi.tasks import fastqc, multiqc
from bioluigi.tasks import fastqc, multiqc, rsem
from bioluigi.tasks.utils import DynamicTaskWithOutputMixin, TaskWithOutputMixin, DynamicWrapperTask
from luigi.task import flatten, flatten_output, WrapperTask
from luigi.util import requires
Expand All @@ -22,7 +22,7 @@
from .sources.geo import DownloadGeoSample, DownloadGeoSeries, ExtractGeoSeriesBatchInfo
from .sources.local import DownloadLocalSample, DownloadLocalExperiment
from .sources.sra import DownloadSraProject, DownloadSraExperiment, ExtractSraProjectBatchInfo
from .targets import GemmaDatasetPlatform, GemmaDatasetFactor, RsemReference
from .targets import GemmaDatasetPlatform, GemmaDatasetFactor
from .utils import no_retry, IlluminaFastqHeader, TaskWithPriorityMixin, RerunnableTaskMixin, remove_task_output
from .gemma import GemmaTask

Expand Down Expand Up @@ -179,110 +179,58 @@ def run(self):
for dst in download_sample_tasks]

@no_retry
class PrepareReference(ScheduledExternalProgramTask):
"""
Prepare a STAR/RSEM reference.

:param taxon: Taxon
:param reference_id: Reference annotation build to use (i.e. ensembl98, hg38_ncbi)
@requires(TrimSample)
class AlignSample(luigi.Task):
"""
taxon = luigi.Parameter(default='human')
reference_id = luigi.Parameter(default='hg38_ncbi')

cpus = 16
memory = 32

def input(self):
genome_dir = join(cfg.OUTPUT_DIR, cfg.GENOMES, self.reference_id)
gtf_files = glob(join(genome_dir, '*.gtf'))
fasta_files = glob(join(genome_dir, '*.fa'))
if len(gtf_files) != 1:
raise ValueError('Only one GTF file is expected in {}.'.format(genome_dir))
return [luigi.LocalTarget(gtf_files[0]),
[luigi.LocalTarget(f) for f in fasta_files]]

def program_args(self):
gtf, genome_fasta = self.input()
args = [join(cfg.RSEM_DIR, 'rsem-prepare-reference')]

args.extend(['--gtf', gtf.path])
Prepare a STAR/RSEM reference and align the sample against it.

args.extend([
'--star',
'-p', self.cpus])

args.extend([t.path for t in genome_fasta])

args.append(join(self.output().prefix))

return args

def run(self):
os.makedirs(os.path.dirname(self.output().prefix), exist_ok=True)
return super().run()

def output(self):
return RsemReference(join(cfg.OUTPUT_DIR, cfg.REFERENCES, self.reference_id), self.taxon)

@no_retry
@requires(TrimSample, PrepareReference)
class AlignSample(ScheduledExternalProgramTask):
"""
The output of the task is a pair of isoform and gene quantification results
processed by STAR and RSEM.

:attr strand_specific: Indicate if the RNA-Seq data is stranded
"""
taxon = luigi.Parameter(default='human',positional=False, description='Taxon')
reference_id = luigi.Parameter(default='hg38_ncbi', positional=False, description='Reference annotation build to use (i.e. ensembl98, hg38_ncbi)')

# TODO: handle strand-specific reads
strand_specific = luigi.BoolParameter(default=False, positional=False)
strand_specific = luigi.BoolParameter(default=False, positional=False, description='Indicate if the RNA-Seq data is strand-specific.')

scope = luigi.Parameter(default='genes', positional=False)

cpus = 8
memory = 32
walltime = datetime.timedelta(days=1)

# cleanup unused shared memory objects before and after the task is run
scheduler_extra_args = ['--task-prolog', abspath(cfg.STAR_CLEANUP_SCRIPT), '--task-epilog', abspath(cfg.STAR_CLEANUP_SCRIPT)]

def run(self):
self.output().makedirs()
return super().run()

def _get_output_prefix(self):
return join(cfg.OUTPUT_DIR, cfg.ALIGNDIR, self.reference_id, self.experiment_id, self.sample_id)

def program_args(self):
args = [join(cfg.RSEM_DIR, 'rsem-calculate-expression'), '-p', self.cpus]

args.extend([
'--time',
'--star',
'--star-gzipped-read-file',
'--no-bam-output'])

if self.strand_specific:
args.append('--strand-specific')

sample_run, reference = self.input()

def run(self):
sample_run = self.input()
genome_dir = join(cfg.OUTPUT_DIR, cfg.GENOMES, self.reference_id)
gtf_files = glob(join(genome_dir, '*.gtf'))
fasta_files = glob(join(genome_dir, '*.fna'))
reference_name = join(cfg.OUTPUT_DIR, cfg.REFERENCES, self.reference_id, f'{self.taxon}_0')
fastqs = [mate.path for mate in sample_run]
sample_name = self._get_output_prefix()
extra_args = ['--time', '--no-bam-output']
if self.strand_specific:
extra_args.append('--strand-specific')

if len(fastqs) == 1:
args.append(fastqs[0])
elif len(fastqs) == 2:
args.append('--paired-end')
args.extend(fastqs)
else:
raise NotImplementedError('Alignment of more than two input FASTQs is not supported.')
if len(fasta_files) < 1:
raise ValueError('At least one FASTA file is expected in {}.'.format(genome_dir))

# reference for alignments and quantifications
args.append(join(reference.prefix, '{}_0'.format(self.taxon)))
if len(gtf_files) != 1:
raise ValueError('Exactly one GTF file is expected in {}.'.format(genome_dir))

# output prefix
args.append(self._get_output_prefix())
self.output().makedirs()

return args
yield rsem.CalculateExpression(
gtf_files[0],
fasta_files,
reference_name,
fastqs,
sample_name,
aligner='star',
extra_args=extra_args,
cpus=8,
memory=32,
walltime=datetime.timedelta(days=1),
# cleanup unused shared memory objects before and after the task is run
scheduler_extra_args=['--task-prolog', abspath(cfg.STAR_CLEANUP_SCRIPT), '--task-epilog', abspath(cfg.STAR_CLEANUP_SCRIPT)])

def output(self):
return luigi.LocalTarget(self._get_output_prefix() + f'.{self.scope}.results')
Expand Down
5 changes: 4 additions & 1 deletion tests/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ def test_platform_retrieval_by_name_when_unknown_instrument():
def test_align_sample_task():
task = AlignSample('GSE', 'GSM', reference_id='hg38_ncbi', scope='genes')
assert task.output().path == join(cfg.OUTPUT_DIR, cfg.ALIGNDIR, 'hg38_ncbi', 'GSE', 'GSM.genes.results')
assert task.walltime == datetime.timedelta(days=1)
yielded_calculate_expression_task = next(task.run())
assert yielded_calculate_expression_task.cpus == 8
assert yielded_calculate_expression_task.memory == 32
assert yielded_calculate_expression_task.walltime == datetime.timedelta(days=1)

def test_gemma_task():
gemma_task = GemmaTask('GSE110256')
Expand Down