Skip to content

Commit

Permalink
New workflows
Browse files Browse the repository at this point in the history
  * (for both PacBio, ONT) to split a BAM by read group, and reset the alignment
  * deduplicate and reset ONT aligned bam
  * optionally check primrose runs on PacBio input BAMs
  • Loading branch information
SHuang-Broad committed Dec 28, 2023
1 parent b64ef26 commit a105f00
Show file tree
Hide file tree
Showing 5 changed files with 401 additions and 0 deletions.
12 changes: 12 additions & 0 deletions .dockstore.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ workflows:
- name: ONTFlowcellFromMultipleBasecalls
subclass: wdl
primaryDescriptorPath: /wdl/pipelines/ONT/Preprocessing/ONTFlowcellFromMultipleBasecalls.wdl
- name: RemoveDuplicateFromMergedONTBamAndSplitByReadgroup
subclass: wdl
primaryDescriptorPath: /wdl/pipelines/ONT/Preprocessing/RemoveDuplicateFromMergedONTBamAndSplitByReadgroup.wdl
- name: DeduplicateAndResetONTAlignedBam
subclass: wdl
primaryDescriptorPath: /wdl/pipelines/ONT/Preprocessing/DeduplicateAndResetONTAlignedBam.wdl

###################################################
# PacBio
Expand All @@ -81,6 +87,9 @@ workflows:
- name: PBMASIsoSeqDemultiplex
subclass: wdl
primaryDescriptorPath: /wdl/pipelines/PacBio/Utility/PBMASIsoSeqDemultiplex.wdl
- name: SplitMergedPacBioBamByReadgroup
subclass: wdl
primaryDescriptorPath: /wdl/pipelines/PacBio/Utility/SplitMergedPacBioBamByReadgroup.wdl

###################################################
# TechAgnostic - *mics data processing
Expand Down Expand Up @@ -153,3 +162,6 @@ workflows:
- name: SaveFilesToDestination
subclass: wdl
primaryDescriptorPath: /wdl/pipelines/TechAgnostic/Utility/SaveFilesToDestination.wdl
- name: SplitBamByReadgroup
subclass: wdl
primaryDescriptorPath: /wdl/pipelines/TechAgnostic/Utility/SplitBamByReadgroup.wdl
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
version 1.0

import "../../../tasks/Utility/Utils.wdl"
import "../../../tasks/Utility/BAMutils.wdl" as BU

workflow DeduplicateAndResetONTAlignedBam {
    meta {
        description: "Removes duplicate records from an aligned ONT bam, while resetting the alignment information."
    }

    parameter_meta {
        aligned_bam: "Aligned ONT BAM to be deduplicated and stripped of alignment information."
        aligned_bai: "(optional) index accompanying the input BAM"
        scatter_scheme: "A txt file holding how to scatter the WGS bam. Example (this example size-balance among the shards): ...\nchr5,chr19\nchr6,chrY,chrM\n..."
    }

    input {
        File aligned_bam
        File? aligned_bai
        File scatter_scheme
    }

    output {
        File result = Merge.res
    }

    # step 1: split the input bam per the size-balanced scatter scheme;
    # unmapped reads are collected separately by the sharding task (don't forget them)
    call BU.ShardAlignedBam { input: aligned_bam = aligned_bam, aligned_bai = aligned_bai, scatter_scheme = scatter_scheme }

    # step 2, for each mapped shard: reset alignment info (samtools), queryname-sort (picard), deduplicate
    scatter (shard_bam in ShardAlignedBam.split_bams) {
        call BU.SamtoolsReset as Magic { input: bam = shard_bam }
        call BU.QuerynameSortBamWithPicard as SortUnaligned { input: bam = Magic.res }
        call BU.DeduplicateQuerynameSortedBam as DeQS { input: qnorder_bam = SortUnaligned.qnsort_bam }
        String base_for_each = basename(DeQS.dedup_bam)

        # defensive check: a successfully deduplicated shard must contain no duplicate read names
        call BU.GetDuplicateReadnamesInQnameSortedBam as CheckDedupShard { input: qns_bam = DeQS.dedup_bam }
        if ( CheckDedupShard.result_may_be_corrupted || 0 != length(read_lines(CheckDedupShard.dup_names_txt)) ) {
            call Utils.StopWorkflow as DedupShardFail { input: reason = "Deduplication isn't successful for ~{shard_bam}."}
        }
    }

    # step 2, for unmapped reads: same treatment
    call BU.SamtoolsReset as RemoveDict { input: bam = ShardAlignedBam.unmapped_reads } # consistent processing with the mapped ones (practically, take away the @SQ lines in the header)
    call BU.QuerynameSortBamWithPicard as QNSortUnmappedReads { input: bam = RemoveDict.res }
    call BU.DeduplicateQuerynameSortedBam as DeU { input: qnorder_bam = QNSortUnmappedReads.qnsort_bam }
    String base_for_unmap = basename(DeU.dedup_bam)
    call BU.GetDuplicateReadnamesInQnameSortedBam as CheckDedupUnmapped { input: qns_bam = DeU.dedup_bam }
    if ( CheckDedupUnmapped.result_may_be_corrupted || 0 != length(read_lines(CheckDedupUnmapped.dup_names_txt)) ) {
        call Utils.StopWorkflow as DedupUnmappedFail { input: reason = "Deduplication isn't successful for ~{DeU.dedup_bam}."}
    }

    # step 3: gather the fixed shards back into a single queryname-sorted BAM
    Array[File] fixed_shards = flatten([[DeU.dedup_bam], DeQS.dedup_bam])
    Array[String] basenames_for_merging = flatten([[base_for_unmap], base_for_each])
    call Utils.ComputeAllowedLocalSSD as SizeIt { input: intended_gb = 50 + ceil(size(fixed_shards, "GB"))}
    call BU.MergeBamsQuerynameSortedWithPicard as Merge { input:
        qns_bams = fixed_shards, base_names = basenames_for_merging,
        out_prefix = basename(aligned_bam, ".bam") + ".DSP-fixed", num_ssds = SizeIt.numb_of_local_ssd
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
version 1.0

import "DeduplicateAndResetONTAlignedBam.wdl" as FixAndReset

import "../../TechAgnostic/Utility/SplitBamByReadgroup.wdl" as Major

import "../../../tasks/Utility/Utils.wdl"
import "../../../tasks/Utility/BAMutils.wdl" as BU
import "../../../tasks/Utility/ONTUtils.wdl" as OU

workflow RemoveDuplicateFromMergedONTBamAndSplitByReadgroup {

    meta {
        description: "Remove duplicate records from an ONT aligned BAM, drop alignment information, and split the bam by read groups."
    }
    parameter_meta {
        input_bam: "Coordinate-sorted ONT BAM (merged from multiple read groups) to be deduplicated, reset, and split."
        input_bai: "(optional) BAI accompanying the BAM"
        fix_bam_header: "Sometimes, the bam given to us contains a specific error mode. We fix it here."
        scatter_scheme: "A txt file holding how to scatter the WGS bam. Example (this example size-balance among the shards): ...\nchr5,chr19\nchr6,chrY,chrM\n..."
        gcs_out_root_dir: "place to store the result files"
    }

    input {
        File input_bam
        File? input_bai

        Boolean fix_bam_header
        File scatter_scheme

        String gcs_out_root_dir
    }

    output {
        Map[String, String]? runid_2_ont_basecall_model = GetBasecallModel.runid_2_model

        Map[String, String] rgid_2_bam = WORKHORSE.rgid_2_bam
        Map[String, String] rgid_2_PU = WORKHORSE.rgid_2_PU
        Map[String, String]? rgid_2_ubam_emptyness = WORKHORSE.rgid_2_ubam_emptyness
        Boolean rgid_2_bam_are_aligned = WORKHORSE.rgid_2_bam_are_aligned

        String last_processing_date = WORKHORSE.last_processing_date
    }

    ##############################################################################################################################
    # input file validation and fixing
    call BU.GatherBamMetadata {
        input: bam = input_bam
    }
    if ('coordinate' != GatherBamMetadata.sort_order) {
        call Utils.StopWorkflow { input: reason = "Input bam isn't coordinate-sorted, but rather sorted by ~{GatherBamMetadata.sort_order}" }
    }

    # reality of life--submitted files sometimes need fixings in their headers
    if (fix_bam_header) {
        call FixParticularBamHeaderIssue { input: bam = input_bam }
    }

    # deduplicate and reset alignment information (header-fixed version takes precedence when it exists)
    call FixAndReset.DeduplicateAndResetONTAlignedBam as Dedup { input:
        aligned_bam = select_first([FixParticularBamHeaderIssue.fixed, input_bam]), aligned_bai = input_bai, scatter_scheme = scatter_scheme
    }

    File ok_input_bam = Dedup.result

    ##############################################################################################################################
    # delegate the per-readgroup splitting to the tech-agnostic workhorse
    call Major.SplitBamByReadgroup as WORKHORSE {
        input:
            input_bam = ok_input_bam,

            unmap_bam = false, # already done above
            convert_to_fq = false, # no need for ONT data, usually
            validate_output_bams = true,

            gcs_out_root_dir = gcs_out_root_dir,
            debug_mode = false
    }

    call OU.GetBasecallModel { input: bam = ok_input_bam }
}

task FixParticularBamHeaderIssue {
    meta {
        description: "Someone submitted to us BAMs with irregular headers. Fix that here."
    }
    parameter_meta {
        bam: "BAM whose header carries a lone '@RG<TAB>SM:...' line; that line is removed and its SM tag is appended to the proper @RG lines."
    }
    input {
        File bam
    }
    output {
        File fixed = "~{prefix}.rg.fixed.bam"
    }

    Int disk_size = 100 + 2 * ceil(size(bam, 'GiB'))

    String prefix = basename(bam, '.bam')

    command <<<
        set -eux

        # dump the current header (and echo it to the log for debugging)
        samtools view -H ~{bam} > original.header.txt
        cat original.header.txt

        # strip away the unwanted @RG line (one whose first tag after '@RG' is 'SM:')
        grep -vE "^@RG[[:space:]]SM:" original.header.txt > to.add.sample.name.header.txt
        diff original.header.txt to.add.sample.name.header.txt || true

        # add back the sample name to the correct @RG lines
        # (pull the 'SM:<name>' token off the irregular line; assumes exactly one such line — TODO confirm)
        formatted_sample_name=$(grep -E "^@RG[[:space:]]SM:" original.header.txt | tr '\t' '\n' | grep "^SM:")
        TAB=$'\t' # sed doesn't officially recognize \t as tab
        # append the SM tag to every remaining @RG line, editing in place line-by-line
        for line_num in `grep -n "^@RG" to.add.sample.name.header.txt | awk -F ':' '{print $1}'`
        do
        echo "${line_num}"
        sed -i.bak "${line_num}s/$/""${TAB}""${formatted_sample_name}""/" to.add.sample.name.header.txt
        done
        cat to.add.sample.name.header.txt
        mv to.add.sample.name.header.txt fixed.header.txt

        # swap the repaired header onto the original BAM (read data untouched)
        samtools reheader fixed.header.txt ~{bam} > ~{prefix}.rg.fixed.bam
    >>>

    runtime {
        cpu: 1
        memory: "4 GiB"
        disks: "local-disk ~{disk_size} LOCAL"
        preemptible: 2
        maxRetries: 1
        docker: "us.gcr.io/broad-dsp-lrma/lr-gcloud-samtools:0.1.3"
    }
}
77 changes: 77 additions & 0 deletions wdl/pipelines/PacBio/Utility/SplitMergedPacBioBamByReadgroup.wdl
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
version 1.0

import "../../TechAgnostic/Utility/SplitBamByReadgroup.wdl" as Major

import "../../../tasks/Utility/Utils.wdl"
import "../../../tasks/Utility/BAMutils.wdl" as BU
import "../../../tasks/Utility/PBUtils.wdl"

workflow SplitMergedPacBioBamByReadgroup {
    meta {
        description: "Split a BAM file that was aggregated, for the same sample, into pieces by read group."
    }

    input {
        File input_bam
        File? input_bai

        Boolean unmap_bam
        Boolean convert_to_fq = false
        Boolean disable_primrose_check = false

        String gcs_out_root_dir
    }

    output {
        Map[String, String] rgid_2_bam = WORKHORSE.rgid_2_bam
        Map[String, String] rgid_2_PU = WORKHORSE.rgid_2_PU
        Map[String, String]? rgid_2_ubam_emptyness = WORKHORSE.rgid_2_ubam_emptyness
        Boolean rgid_2_bam_are_aligned = WORKHORSE.rgid_2_bam_are_aligned
        Map[String, String]? rgid_2_fastq = WORKHORSE.rgid_2_fastq

        String last_processing_date = WORKHORSE.last_processing_date
    }

    # note: the 'platform' entry from the tech-agnostic template was dropped here --
    # this workflow has no such input, and documenting a non-existent input fails validation
    parameter_meta {
        input_bam: "BAM to be split by read group; must be coordinate-sorted (validated below)"
        input_bai: "(optional) BAI accompanying the BAM"
        unmap_bam: "when true, drop alignment information (unmap) in the per-readgroup output BAMs"
        convert_to_fq: "user option to convert to FASTQ (gz) or not"
        gcs_out_root_dir: "place to store the result files"
        disable_primrose_check: "when true, disable a QC check making sure primrose is run on all readgroups; only do this when you know what you're doing"
    }

    ##############################################################################################################################
    # input BAM most basic QC check
    call BU.GatherBamMetadata {
        input: bam = input_bam
    }
    if ('coordinate' != GatherBamMetadata.sort_order) {
        call Utils.StopWorkflow { input: reason = "Input bam isn't coordinate-sorted, but rather sorted by ~{GatherBamMetadata.sort_order}" }
    }

    # this guarantees that there are no read groups missing primrose runs
    if (!disable_primrose_check) {
        call PBUtils.VerifyPacBioBamHasAppropriatePrimroseRuns as PrimroseCheck { input: bam = input_bam }
        if (0 != length(PrimroseCheck.readgroups_missing_primrose)) {
            call Utils.StopWorkflow as MissingPrimrose { input: reason = "Input BAM file has some of its read groups missing primrose calls."}
        }
    }

    # delegate the actual splitting to the tech-agnostic workhorse
    String workflow_name = 'SplitMergedPacBioBamByReadgroup'
    call Major.SplitBamByReadgroup as WORKHORSE {
        input:
            input_bam = input_bam,
            input_bai = input_bai,

            unmap_bam = unmap_bam,
            convert_to_fq = convert_to_fq,

            validate_output_bams = true,

            gcs_out_root_dir = gcs_out_root_dir,
            override_workflow_name = workflow_name,
            debug_mode = false
    }
}
Loading

0 comments on commit a105f00

Please sign in to comment.