make the dedup verification step resilient to http transient errors
SHuang-Broad committed Jan 17, 2024
1 parent bf50b57 commit e397755
Showing 2 changed files with 62 additions and 23 deletions.
65 changes: 44 additions & 21 deletions wdl/tasks/Utility/BAMutils.wdl
@@ -716,53 +716,76 @@ task GetDuplicateReadnamesInQnameSortedBam {
    }

    parameter_meta {
        qns_bam: {
            description: "Queryname-sorted BAM to be de-duplicated",
            localization_optional: true
        }
        trial_idx: "1-based index of the current attempt; when it reaches trial_max, the BAM is localized and the task uses a persistent SSD instead of a persistent HDD."
        trial_max: "maximum number of attempts that stream the BAM remotely; varying trial_idx across attempts gives each call distinct inputs, so call-caching cannot short-circuit a retry."
    }
    input {
        File qns_bam
        Int trial_idx = 1
        Int trial_max = 3
    }

    output {
        File dup_names_txt = "dup_read_names.txt"
        Boolean result_may_be_corrupted = read_boolean("samtools.failed.txt")
    }

    Boolean localize_bam = trial_idx >= trial_max
    command <<<
        source /opt/re-auth.sh
        set -euxo pipefail

        # verify the assumption that the BAM is queryname-sorted
        sort_order=$(samtools view -H ~{qns_bam} | grep "^@HD" | tr '\t' '\n' | grep "^SO:" | awk -F ':' '{print $2}')
        if [[ "queryname" != "${sort_order}" ]]; then echo -e "Sort order ${sort_order} isn't the expected 'queryname'." && exit 1; fi

        if ~{localize_bam}; then
            # last attempt: localize the BAM, then read it from local disk
            time \
                gcloud storage cp ~{qns_bam} name_does_not_matter.bam

            samtools view name_does_not_matter.bam \
                | awk -F '\t' '{print $1}' \
                | uniq -d \
                > "dup_read_names.txt"
            echo "false" > samtools.failed.txt
        else
            # the way this works is the following:
            # 0) rely on the re-auth.sh script to export the credentials
            # 1) perform the remote sam-view operation in the background
            # 2) watch the PID of the background process, re-authenticating every 1200 seconds

            # grab read names from the remote BAM; duplicates are adjacent because of the queryname sort
            echo "false" > samtools.failed.txt
            samtools view ~{qns_bam} \
                | awk -F '\t' '{print $1}' \
                | uniq -d \
                > "dup_read_names.txt" \
                || { echo "true" > samtools.failed.txt; exit 77; } &
            pid=$!

            set +e
            count=1
            while true; do
                sleep 1200 && date && source /opt/re-auth.sh
                if [[ ${count} -gt 2 ]]; then exit 0; fi
                if ! pgrep -x -P $pid; then exit 0; fi
                count=$(( count+1 ))
            done
        fi
    >>>
    Int disk_size = 5 + (if (localize_bam) then ceil(size(qns_bam, "GiB")) else 0)  # pad 5 GiB over the BAM size when localizing
    String disk_type = if (localize_bam) then "SSD" else "HDD"
    runtime {
        cpu: 1
        memory: "4 GiB"
        disks: "local-disk ~{disk_size} ~{disk_type}"
        preemptible: 2
        maxRetries: 1
        docker: "us.gcr.io/broad-dsp-lrma/lr-gcloud-samtools:0.1.3"
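
The streaming branch rests on one shell pattern: kick off the remote read in the background, remember its PID, then poll it while re-sourcing the auth script so the GCS token never expires mid-stream. Below is a minimal sketch of just that pattern, assuming the same /opt/re-auth.sh helper; the task name is hypothetical, not part of this commit, and the liveness check is simplified to kill -0 instead of the task's pgrep-on-children idiom:

task StreamWithReauthSketch {
    input {
        File remote_bam   # localization_optional in practice, so samtools streams it
    }
    command <<<
        source /opt/re-auth.sh
        set -euxo pipefail

        # start the potentially hour-long remote read in the background
        samtools view -c ~{remote_bam} > read_count.txt &
        pid=$!

        # poll the background job: refresh credentials every 20 minutes so the
        # streamed read survives token expiry; leave the loop once the job ends
        set +e
        while kill -0 "$pid" 2>/dev/null; do
            sleep 1200 && source /opt/re-auth.sh
        done
        wait "$pid"   # propagate the background job's exit status
    >>>
    output {
        Int n_records = read_int("read_count.txt")
    }
}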
20 changes: 18 additions & 2 deletions wdl/tasks/Utility/ONTBamShardResetAndDeduplicate.wdl
@@ -22,8 +22,24 @@ workflow Work {
    call BU.SamtoolsReset as Magic { input: bam = shard_bam }
    call BU.QuerynameSortBamWithPicard as SortUnaligned { input: bam = Magic.res }
    call BU.DeduplicateQuerynameSortedBam as DeQS { input: qnorder_bam = SortUnaligned.qnsort_bam }

    # verify
    call BU.GetDuplicateReadnamesInQnameSortedBam as InitialCheckDedupShard { input: qns_bam = DeQS.dedup_bam, trial_idx = 1 }
    if ( InitialCheckDedupShard.result_may_be_corrupted ) {
        call BU.GetDuplicateReadnamesInQnameSortedBam as RetryCheckDedupShard { input: qns_bam = DeQS.dedup_bam, trial_idx = 2 }
        if ( RetryCheckDedupShard.result_may_be_corrupted ) {
            call BU.GetDuplicateReadnamesInQnameSortedBam as LastCheckDedupShard { input: qns_bam = DeQS.dedup_bam, trial_idx = 3, trial_max = 3 }
        }
    }

    # do not change the order: select_first() returns the first defined value,
    # so the deepest (latest) attempt must be consulted before earlier ones
    Boolean CheckOperationFailed = select_first([LastCheckDedupShard.result_may_be_corrupted,
                                                 RetryCheckDedupShard.result_may_be_corrupted,
                                                 InitialCheckDedupShard.result_may_be_corrupted])
    Array[String] dup_names = read_lines(select_first([LastCheckDedupShard.dup_names_txt,
                                                       RetryCheckDedupShard.dup_names_txt,
                                                       InitialCheckDedupShard.dup_names_txt]))
    if ( CheckOperationFailed || 0 != length(dup_names) ) {
        call Utils.StopWorkflow as DedupShardFail { input: reason = "Deduplication isn't successful for ~{shard_bam}." }
    }
}
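
The "do not change the order" constraint exists because select_first() returns the first defined element of its argument array: outputs of calls inside un-entered if-blocks are undefined, so listing the deepest attempt first lets a later, clean retry override the stale flag from an earlier one. A minimal, self-contained sketch of that semantic (hypothetical values, not part of this commit):

workflow SelectFirstOrderSketch {
    # attempt 1 flagged possible corruption; attempt 2 came back clean,
    # so the third attempt's block was never entered
    Boolean? first_attempt  = true
    Boolean? second_attempt = false
    if (false) {                       # stand-in for the un-entered retry block
        Boolean third_attempt = true   # never evaluated
    }

    # deepest attempt first: the undefined third value is skipped and
    # attempt 2's 'false' wins, which is the intended verdict
    Boolean failed = select_first([third_attempt, second_attempt, first_attempt])

    output {
        Boolean check_failed = failed   # false
    }
}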
