From bb462a63ef4b3c5abc6a6e901cc1c6464ac6595f Mon Sep 17 00:00:00 2001 From: Vuk Manojlovic Date: Tue, 21 May 2024 14:04:17 +0200 Subject: [PATCH] CTX-5939: Changed qiime multithreading to allow for exact thread count for command executions --- coretex/bioinformatics/ctx_qiime2/__init__.py | 71 ++++++++++++------- 1 file changed, 46 insertions(+), 25 deletions(-) diff --git a/coretex/bioinformatics/ctx_qiime2/__init__.py b/coretex/bioinformatics/ctx_qiime2/__init__.py index fb4df077..ade31333 100644 --- a/coretex/bioinformatics/ctx_qiime2/__init__.py +++ b/coretex/bioinformatics/ctx_qiime2/__init__.py @@ -18,6 +18,8 @@ from typing import Optional, Union from pathlib import Path +import os + from .utils import compressGzip, createSample, getDemuxSamples, getDenoisedSamples, \ getFastqDPSamples, getFastqMPSamples, getImportedSamples, getMetadata, getPhylogeneticTreeSamples, \ isDemultiplexedSample, isDenoisedSample, isFastqDPSample, isFastqMPSample, \ @@ -89,8 +91,6 @@ def demuxEmpPaired( Output path to the resulting feature table errorCorretctionDetailsPath : Union[str, Path] Output path to the statistics of the denoising - multithreading : bool - Whether to use multithreading. One thread per CPU core """ if isinstance(sequencesPath, Path): @@ -131,7 +131,7 @@ def dada2DenoiseSingle( representativeSequencesPath: Union[str, Path], tablePath: Union[str, Path], denoisingStatsPath: Union[str, Path], - multithreading: bool = True + threads: Optional[int] = -1 ) -> None: """ Wrapper for QIIME's DADA2 denoise-single, which performs denoising on the input paired-end reads. @@ -159,8 +159,9 @@ def dada2DenoiseSingle( Output path to the resulting feature table denoisingStatsPath : Union[str, Path] Output path to the statistics of the denoising - multithreading : bool - Whether to use multithreading. One thread per CPU core + threads : Optional[int] + Number of threads to use for the process. If None, multithreading will not be used. 
+ Set to -1 (default) to use a thread for each CPU core """ if isinstance(inputPath, Path): @@ -185,8 +186,10 @@ def dada2DenoiseSingle( "--o-denoising-stats", denoisingStatsPath ] - if multithreading: + if threads == -1: args.extend(["--p-n-threads", "0"]) + elif threads is not None: + args.extend(["--p-n-threads", str(threads)]) command(args) @@ -200,7 +203,7 @@ def dada2DenoisePaired( representativeSequencesPath: Union[str, Path], tablePath: Union[str, Path], denoisingStatsPath: Union[str, Path], - multithreading: bool = True + threads: Optional[int] = -1 ) -> None: """ Wrapper for QIIME's DADA2 denoise-paired, which performs denoising on the input paired-end reads. @@ -239,8 +242,9 @@ def dada2DenoisePaired( Output path to the resulting feature table denoisingStatsPath : Union[str, Path] Output path to the statistics of the denoising - multithreading : bool - Whether to use multithreading. One thread per CPU core + threads : Optional[int] + Number of threads to use for the process. If None, multithreading will not be used. 
+ Set to -1 (default) to use a thread for each CPU core """ if isinstance(inputPath, Path): @@ -267,8 +271,10 @@ def dada2DenoisePaired( "--o-denoising-stats", denoisingStatsPath ] - if multithreading: + if threads == -1: args.extend(["--p-n-threads", "0"]) + elif threads is not None: + args.extend(["--p-n-threads", str(threads)]) command(args) @@ -304,7 +310,7 @@ def phylogenyAlignToTreeMafftFasttree( maskedAligmentPath: str, unrootedTreePath: str, rootedTreePath: str, - multithreading: bool = True + threads: Optional[int] = -1 ) -> None: args = [ @@ -316,8 +322,10 @@ def phylogenyAlignToTreeMafftFasttree( "--o-rooted-tree", rootedTreePath ] - if multithreading: + if threads == -1: args.extend(["--p-n-threads", "auto"]) + elif threads is not None: + args.extend(["--p-n-threads", str(threads)]) command(args) @@ -328,7 +336,7 @@ def diversityCoreMetricsPhylogenetic( samplingDepth: int, metadataPath: str, outputDir: str, - multithreading: bool = True + threads: Optional[int] = -1 ) -> None: args = [ @@ -340,8 +348,10 @@ def diversityCoreMetricsPhylogenetic( "--output-dir", outputDir ] - if multithreading: + if threads == -1: args.extend(["--p-n-jobs-or-threads", "auto"]) + elif threads is not None: + args.extend(["--p-n-jobs-or-threads", str(threads)]) command(args) @@ -416,7 +426,7 @@ def featureClassifierClassifySklearn( classifierPath: str, readsPath: str, classificationPath: str, - multithreading: bool = True + threads: Optional[int] = -1 ) -> None: args = [ @@ -426,8 +436,10 @@ def featureClassifierClassifySklearn( "--o-classification", classificationPath ] - if multithreading: + if threads == -1: args.extend(["--p-n-jobs", "-1"]) + elif threads is not None: + args.extend(["--p-n-jobs", str(threads)]) command(args) @@ -509,7 +521,7 @@ def vsearchClusterDeNovo( percIdentity: float, clusteredTablePath: Union[str, Path], clusteredSequencesPath: Union[str, Path], - multithreading: bool = True + threads: Optional[int] = -1 ) -> None: """ Wrapper for QIIME2's vsearch 
de novo clusteing command. @@ -527,7 +539,8 @@ def vsearchClusterDeNovo( clusteredSequencesPath : Union[str, Path] Path to the output clustered sequences multithreading : bool - Whether to use multithreading. One thread per CPU core + Number of threads to use for the process. If None, multithreading will not be used. + Set to -1 (default) to use a thread for each CPU core """ if isinstance(tablePath, Path): @@ -551,8 +564,10 @@ def vsearchClusterDeNovo( "--o-clustered-sequences", clusteredSequencesPath ] - if multithreading: + if threads == -1: args.extend(["--p-threads", "0"]) + elif threads is not None: + args.extend(["--p-threads", str(threads)]) command(args) @@ -565,7 +580,7 @@ def vsearchClusterClosedReference( clusteredTablePath: Union[str, Path], clusteredSequencesPath: Union[str, Path], unmatchedSequencesPath: Union[str, Path], - multithreading: bool = True + threads: Optional[int] = -1 ) -> None: """ Wrapper for QIIME2's vsearch closed reference clusteing command. @@ -587,7 +602,8 @@ def vsearchClusterClosedReference( unmatchedSequencesPath : Union[str, Path] Path to the output unmatched sequences multithreading : bool - Whether to use multithreading. One thread per CPU core + Number of threads to use for the process. If None, multithreading will not be used. + Set to -1 (default) to use a thread for each CPU core """ if isinstance(tablePath, Path): @@ -619,8 +635,10 @@ def vsearchClusterClosedReference( "--o-unmatched-sequences", unmatchedSequencesPath ] - if multithreading: + if threads == -1: args.extend(["--p-threads", "0"]) + elif threads is not None: + args.extend(["--p-threads", str(threads)]) command(args) @@ -633,7 +651,7 @@ def vsearchClusterOpenReference( clusteredTablePath: Union[str, Path], clusteredSequencesPath: Union[str, Path], newReferenceSequencesPath: Union[str, Path], - multithreading: bool = True + threads: Optional[int] = -1 ) -> None: """ Wrapper for QIIME2's vsearch open reference clusteing command. 
@@ -655,7 +673,8 @@ def vsearchClusterOpenReference( newReferenceSequencesPath : Union[str, Path] Path to the output new reference sequences multithreading : bool - Whether to use multithreading. One thread per CPU core + Number of threads to use for the process. If None, multithreading will not be used. + Set to -1 (default) to use a thread for each CPU core """ if isinstance(tablePath, Path): @@ -687,7 +706,9 @@ def vsearchClusterOpenReference( "--o-new-reference-sequences", newReferenceSequencesPath ] - if multithreading: + if threads == -1: args.extend(["--p-threads", "0"]) + elif threads is not None: + args.extend(["--p-threads", str(threads)]) command(args)