Skip to content

Commit d62b98e

Browse files
committed
New workflows
New workflows:
* (for both PacBio and ONT) split a BAM by read group, and reset the alignment
* deduplicate and reset an ONT aligned BAM
* optionally check primrose runs on PacBio input BAMs
1 parent ce90a22 commit d62b98e

File tree

6 files changed

+431
-0
lines changed

6 files changed

+431
-0
lines changed

.dockstore.yml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,12 @@ workflows:
6060
- name: ONTFlowcellFromMultipleBasecalls
6161
subclass: wdl
6262
primaryDescriptorPath: /wdl/pipelines/ONT/Preprocessing/ONTFlowcellFromMultipleBasecalls.wdl
63+
- name: RemoveDuplicateFromMergedONTBamAndSplitByReadgroup
64+
subclass: wdl
65+
primaryDescriptorPath: /wdl/pipelines/ONT/Preprocessing/RemoveDuplicateFromMergedONTBamAndSplitByReadgroup.wdl
66+
- name: DeduplicateAndResetONTAlignedBam
67+
subclass: wdl
68+
primaryDescriptorPath: /wdl/pipelines/ONT/Preprocessing/DeduplicateAndResetONTAlignedBam.wdl
6369

6470
###################################################
6571
# PacBio
@@ -81,6 +87,9 @@ workflows:
8187
- name: PBMASIsoSeqDemultiplex
8288
subclass: wdl
8389
primaryDescriptorPath: /wdl/pipelines/PacBio/Utility/PBMASIsoSeqDemultiplex.wdl
90+
- name: SplitMergedPacBioBamByReadgroup
91+
subclass: wdl
92+
primaryDescriptorPath: /wdl/pipelines/PacBio/Utility/SplitMergedPacBioBamByReadgroup.wdl
8493

8594
###################################################
8695
# TechAgnostic - *mics data processing
@@ -153,3 +162,6 @@ workflows:
153162
- name: SaveFilesToDestination
154163
subclass: wdl
155164
primaryDescriptorPath: /wdl/pipelines/TechAgnostic/Utility/SaveFilesToDestination.wdl
165+
- name: SplitBamByReadgroup
166+
subclass: wdl
167+
primaryDescriptorPath: /wdl/pipelines/TechAgnostic/Utility/SplitBamByReadgroup.wdl
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
version 1.0

import "../../../tasks/Utility/Utils.wdl"
import "../../../tasks/Utility/BAMutils.wdl" as BU

import "../../../tasks/Utility/ONTBamShardResetAndDeduplicate.wdl" as CleanOneShard

workflow DeduplicateAndResetONTAlignedBam {
    meta {
        # key fixed from "desciption" so standard WDL tooling surfaces it
        description: "Removes duplicate records from an aligned ONT bam, while resetting the alignment information."
    }

    parameter_meta {
        aligned_bam: "Aligned ONT BAM to be deduplicated and stripped of alignment information."
        aligned_bai: "(optional) index accompanying the input BAM"
        scatter_scheme: "A txt file holding how to scatter the WGS bam. Example (this example size-balance among the shards): ...\nchr5,chr19\nchr6,chrY,chrM\n..."
    }

    input {
        File aligned_bam
        File? aligned_bai
        File scatter_scheme
    }

    output {
        # queryname-sorted, deduplicated, alignment-reset BAM
        File result = Merge.res
    }

    # step 1, split input bam by the T2T size-balanced scheme, then for each (don't forget unmapped) shard
    call BU.ShardAlignedBam { input: aligned_bam = aligned_bam, aligned_bai = aligned_bai, scatter_scheme = scatter_scheme }

    # step 2: for each shard (including the unmapped-reads shard), reset alignment and deduplicate
    scatter (shard_bam in ShardAlignedBam.split_bams) {
        call CleanOneShard.Work as DeShard { input: shard_bam = shard_bam }
    }
    call CleanOneShard.Work as DeUmap { input: shard_bam = ShardAlignedBam.unmapped_reads }

    # step 3: gather the cleaned shards and merge back into one queryname-sorted BAM
    Array[File] fixed_shards = flatten([[DeUmap.clean_bam], DeShard.clean_bam])
    Array[String] basenames_for_merging = flatten([[DeUmap.basename_of_clean_bam], DeShard.basename_of_clean_bam])
    call Utils.ComputeAllowedLocalSSD as SizeIt { input: intended_gb = 50 + ceil(size(fixed_shards, "GB")) }
    call BU.MergeBamsQuerynameSortedWithPicard as Merge { input:
        qns_bams = fixed_shards, base_names = basenames_for_merging,
        out_prefix = basename(aligned_bam, ".bam") + ".DSP-fixed", num_ssds = SizeIt.numb_of_local_ssd
    }
}
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
version 1.0

import "DeduplicateAndResetONTAlignedBam.wdl" as FixAndReset

import "../../TechAgnostic/Utility/SplitBamByReadgroup.wdl" as Major

import "../../../tasks/Utility/Utils.wdl"
import "../../../tasks/Utility/BAMutils.wdl" as BU
import "../../../tasks/Utility/ONTUtils.wdl" as OU

workflow RemoveDuplicateFromMergedONTBamAndSplitByReadgroup {

    meta {
        # key fixed from "desciption"; wording typos ("alinged", "split by the bam by") fixed
        description: "Remove duplicate records from an ONT aligned BAM, drop alignment information, and split the bam by read groups."
    }
    parameter_meta {
        input_bam: "Coordinate-sorted, merged ONT BAM to deduplicate and split by read group."
        input_bai: "(optional) index accompanying the input BAM"
        fix_bam_header: "Sometimes, the bam given to us contains a specific error mode. We fix it here."
        scatter_scheme: "A txt file holding how to scatter the WGS bam. Example (this example size-balance among the shards): ...\nchr5,chr19\nchr6,chrY,chrM\n..."
        gcs_out_root_dir: "place to store the result files"
    }

    input {
        File input_bam
        File? input_bai

        Boolean fix_bam_header
        File scatter_scheme

        String gcs_out_root_dir
    }

    output {
        Map[String, String]? runid_2_ont_basecall_model = GetBasecallModel.runid_2_model

        Map[String, String] rgid_2_bam = WORKHORSE.rgid_2_bam
        Map[String, String] rgid_2_PU = WORKHORSE.rgid_2_PU
        Map[String, String]? rgid_2_ubam_emptyness = WORKHORSE.rgid_2_ubam_emptyness
        Boolean rgid_2_bam_are_aligned = WORKHORSE.rgid_2_bam_are_aligned

        String last_processing_date = WORKHORSE.last_processing_date
    }

    ##############################################################################################################################
    # input file validation and fixing
    call BU.GatherBamMetadata {
        input: bam = input_bam
    }
    if ('coordinate' != GatherBamMetadata.sort_order) {
        call Utils.StopWorkflow { input: reason = "Input bam isn't coordinate-sorted, but rather sorted by ~{GatherBamMetadata.sort_order}" }
    }

    # reality of life--submitted files sometimes need fixings in their headers
    if (fix_bam_header) {
        call FixParticularBamHeaderIssue { input: bam = input_bam }
    }

    call FixAndReset.DeduplicateAndResetONTAlignedBam as Dedup { input:
        aligned_bam = select_first([FixParticularBamHeaderIssue.fixed, input_bam]), aligned_bai = input_bai, scatter_scheme = scatter_scheme
    }

    File ok_input_bam = Dedup.result

    ##############################################################################################################################
    # delegate
    call Major.SplitBamByReadgroup as WORKHORSE {
        input:
            input_bam = ok_input_bam,

            unmap_bam = false, # already done above
            convert_to_fq = false, # no need for ONT data, usually

            validate_output_bams = true,

            gcs_out_root_dir = gcs_out_root_dir,
            debug_mode = false
    }

    call OU.GetBasecallModel { input: bam = ok_input_bam }
}
80+
task FixParticularBamHeaderIssue {
81+
meta {
82+
description: "Someone submitted to us BAMs with irregular headers. Fix that here."
83+
}
84+
input {
85+
File bam
86+
}
87+
output {
88+
File fixed = "~{prefix}.rg.fixed.bam"
89+
}
90+
91+
Int disk_size = 100 + 2 * ceil(size(bam, 'GiB'))
92+
93+
String prefix = basename(bam, '.bam')
94+
95+
command <<<
96+
set -eux
97+
98+
samtools view -H ~{bam} > original.header.txt
99+
cat original.header.txt
100+
101+
# strip away the unwanted @RG line
102+
grep -vE "^@RG[[:space:]]SM:" original.header.txt > to.add.sample.name.header.txt
103+
diff original.header.txt to.add.sample.name.header.txt || true
104+
105+
# add back the sample name to the correct @RG lines
106+
formatted_sample_name=$(grep -E "^@RG[[:space:]]SM:" original.header.txt | tr '\t' '\n' | grep "^SM:")
107+
TAB=$'\t' # sed doesn't officially recoganize \t as tab
108+
for line_num in `grep -n "^@RG" to.add.sample.name.header.txt | awk -F ':' '{print $1}'`
109+
do
110+
echo "${line_num}"
111+
sed -i.bak "${line_num}s/$/""${TAB}""${formatted_sample_name}""/" to.add.sample.name.header.txt
112+
done
113+
cat to.add.sample.name.header.txt
114+
mv to.add.sample.name.header.txt fixed.header.txt
115+
116+
samtools reheader fixed.header.txt ~{bam} > ~{prefix}.rg.fixed.bam
117+
>>>
118+
119+
runtime {
120+
cpu: 1
121+
memory: "4 GiB"
122+
disks: "local-disk ~{disk_size} LOCAL"
123+
preemptible: 2
124+
maxRetries: 1
125+
docker: "us.gcr.io/broad-dsp-lrma/lr-gcloud-samtools:0.1.3"
126+
}
127+
}
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
version 1.0

import "../../TechAgnostic/Utility/SplitBamByReadgroup.wdl" as Major

import "../../../tasks/Utility/Utils.wdl"
import "../../../tasks/Utility/BAMutils.wdl" as BU
import "../../../tasks/Utility/PBUtils.wdl"

workflow SplitMergedPacBioBamByReadgroup {
    meta {
        description: "Split a BAM file that was aggregated, for the same sample, into pieces by read group."
    }

    input {
        File input_bam
        File? input_bai

        Boolean unmap_bam
        Boolean convert_to_fq = false
        Boolean disable_primrose_check = false

        String gcs_out_root_dir
    }

    output {
        Map[String, String] rgid_2_bam = WORKHORSE.rgid_2_bam
        Map[String, String] rgid_2_PU = WORKHORSE.rgid_2_PU
        Map[String, String]? rgid_2_ubam_emptyness = WORKHORSE.rgid_2_ubam_emptyness
        Boolean rgid_2_bam_are_aligned = WORKHORSE.rgid_2_bam_are_aligned
        Map[String, String]? rgid_2_fastq = WORKHORSE.rgid_2_fastq

        String last_processing_date = WORKHORSE.last_processing_date
    }

    # NOTE: the stale "platform" entry was removed — this workflow has no such input
    parameter_meta {
        input_bam: "BAM to be split by read group; doesn't necessarily need to be aligned or sorted."
        input_bai: "(optional) BAI accompanying the BAM"
        unmap_bam: "when true, strip alignment information from the per-read-group BAMs"
        convert_to_fq: "user option to convert to FASTQ (gz) or not"
        gcs_out_root_dir: "place to store the result files"
        disable_primrose_check: "when true, disable a QC check making sure primrose is run on all readgroups; only do this when you know what you're doing"
    }

    ##############################################################################################################################
    # input BAM most basic QC check
    call BU.GatherBamMetadata {
        input: bam = input_bam
    }
    if ('coordinate' != GatherBamMetadata.sort_order) {
        call Utils.StopWorkflow { input: reason = "Input bam isn't coordinate-sorted, but rather sorted by ~{GatherBamMetadata.sort_order}" }
    }

    # this guarantees that there are no read groups missing primrose runs
    if (!disable_primrose_check) {
        call PBUtils.VerifyPacBioBamHasAppropriatePrimroseRuns as PrimroseCheck { input: bam = input_bam }
        if (0 != length(PrimroseCheck.readgroups_missing_primrose)) {
            call Utils.StopWorkflow as MissingPrimrose { input: reason = "Input BAM file has some of its read groups missing primrose calls."}
        }
    }

    # delegate
    String workflow_name = 'SplitMergedPacBioBamByReadgroup'
    call Major.SplitBamByReadgroup as WORKHORSE {
        input:
            input_bam = input_bam,
            input_bai = input_bai,

            unmap_bam = unmap_bam,
            convert_to_fq = convert_to_fq,

            validate_output_bams = true,

            gcs_out_root_dir = gcs_out_root_dir,
            override_workflow_name = workflow_name,
            debug_mode = false
    }
}

0 commit comments

Comments
 (0)