Skip to content

Commit

Permalink
Merge pull request #27 from umccr/datashare_s3
Browse files Browse the repository at this point in the history
Datashare: initial S3 support
  • Loading branch information
pdiakumis authored Nov 14, 2024
2 parents bc524ac + 7a62cf4 commit 35f4b4a
Show file tree
Hide file tree
Showing 21 changed files with 657 additions and 69 deletions.
1 change: 1 addition & 0 deletions DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ Imports:
purrr,
RAthena,
rlang,
stringr,
tibble,
tidyr,
Suggests:
Expand Down
7 changes: 7 additions & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
# Generated by roxygen2: do not edit by hand

export(datashare_um)
export(datashare_um_s3)
export(datashare_wts)
export(datashare_wts_s3)
export(envvar_defined)
export(jwt_exp)
export(jwt_validate)
Expand All @@ -21,8 +23,13 @@ export(meta_wts_tumor_only)
export(orca_jwt)
export(orca_libid2workflows)
export(orca_prid2wfpayload)
export(orca_prid2wfrid)
export(orca_query_url)
export(orca_wfrid2payload)
export(orca_wfrid2state)
export(orca_workflow_list)
export(pld_umccrise)
export(pld_wts)
export(portaldb_query)
export(portaldb_query_fastqlistrow)
export(portaldb_query_labmetadata)
Expand Down
217 changes: 215 additions & 2 deletions R/datasharing.R
Original file line number Diff line number Diff line change
@@ -1,7 +1,143 @@
#' Datashare umccrise S3 Results
#'
#' Looks up the successful umccrise workflow runs that `libid` is involved in,
#' takes the first one listed, and returns presigned URLs (valid for 7 days)
#' for the umccrise result files, the AMBER/COBALT/signature tables and the
#' tumor/normal BAMs found under the S3 locations recorded in the workflow
#' payload.
#'
#' @param libid LibraryID of WGS tumor.
#' @param token JWT.
#' @param wf_page_size Number of results to return when listing workflows this
#' libid is involved in.
#'
#' @return Tibble with presigned URLs. `libid` is the first column and a
#' `filesystem` column (always `"s3"`) is placed after `lastmodified`.
#' @examples
#' \dontrun{
#' libid <- "L2401591"
#' libid <- "L2401596"
#' token <- orca_jwt() |> jwt_validate()
#' datashare_um_s3(libid, token)
#' }
#' @export
datashare_um_s3 <- function(libid, token, wf_page_size = 50) {
  # Tidy payload (see `pld_umccrise()`) of the umccrise run to share.
  umccrise_pld <- function(libid) {
    # get workflows run for libid
    wf <- orca_libid2workflows(
      libid = libid, token = token, wf_name = NULL, page_size = wf_page_size, stage = "prod"
    )
    wf_um_raw <- wf |>
      dplyr::filter(.data$wf_name == "umccrise", .data$currentStateStatus == "SUCCEEDED")
    n_um_runs <- nrow(wf_um_raw)
    if (n_um_runs == 0) {
      cli::cli_abort("No umccrise results found for {libid}")
    } else if (n_um_runs > 1) {
      # NOTE(review): taking the first row assumes `orca_libid2workflows()`
      # returns runs newest-first -- confirm against that function.
      wf_um_raw <- wf_um_raw |> dplyr::slice_head(n = 1)
      msg <- glue(
        "There are {n_um_runs} > 1 umccrise workflows run for ",
        "{libid};\n",
        "We use the latest run with portal_run_id=\"{wf_um_raw$portalRunId}\" ",
        "with a timestamp of ",
        "{wf_um_raw$currentStateTimestamp}."
      )
      cli::cli_alert_info(msg)
    }
    # now use wfrid to get the payload with umccrise io
    wf_um_raw$orcabusId |>
      orca_wfrid2payload(token = token) |>
      pld_umccrise()
  }
  # List files under `s3_dir` matching `pattern`, presign them and label every
  # row with `label`. `!!` forces the argument value so that an existing `type`
  # column in the listing cannot shadow it.
  list_labelled <- function(s3_dir, pattern, label) {
    s3_dir |>
      dracarys::s3_list_files_filter_relevant(
        presign = TRUE, regexes = tibble::tibble(regex = pattern, fun = "foo"),
        max_objects = 100, expiry_sec = expiry
      ) |>
      dplyr::mutate(type = !!label) |>
      dplyr::select("type", "bname", "size", "path", "presigned_url")
  }
  # regex -> file type label for the umccrise final output directory
  umccrise_files <- dplyr::tribble(
    ~regex, ~fun,
    "multiqc_report\\.html$", "HTML_MultiQC",
    "somatic\\.pcgr\\.html$", "HTML_PCGR",
    "normal\\.cpsr\\.html$", "HTML_CPSR",
    "cancer_report\\.html$", "HTML_CanRep",
    "germline\\.predispose_genes\\.vcf\\.gz$", "VCF_Germline",
    "germline\\.predispose_genes\\.vcf\\.gz\\.tbi$", "VCFi_Germline",
    "somatic-PASS\\.vcf\\.gz$", "VCF_Somatic",
    "somatic-PASS\\.vcf\\.gz\\.tbi$", "VCFi_Somatic",
    "somatic\\.pcgr\\.snvs_indels\\.tiers\\.tsv$", "TSV_SmallVariantsSomatic",
    "manta\\.tsv$", "TSV_StructuralVariantsManta",
    "manta\\.vcf\\.gz$", "VCF_StructuralVariantsManta",
    # the dot before `tbi` is escaped like in every other index pattern
    "manta\\.vcf\\.gz\\.tbi$", "VCFi_StructuralVariantsManta",
    "purple\\.cnv\\.gene\\.tsv$", "TSV_CopyNumberVariantsPurpleGene",
    "purple\\.cnv\\.somatic\\.tsv$", "TSV_CopyNumberVariantsPurpleSegments"
  )
  # regex -> file type label for the DRAGEN somatic (tumor/normal) directory
  tn_files <- dplyr::tribble(
    ~regex, ~fun,
    "tumor\\.bam$", "BAM_tumor",
    "tumor\\.bam\\.bai$", "BAMi_tumor",
    "tumor\\.bam\\.md5sum$", "BAMmd5sum_tumor",
    "normal\\.bam$", "BAM_normal",
    "normal\\.bam\\.bai$", "BAMi_normal",
    # was mislabelled "BAMmd5sum_tumor" (copy-paste slip)
    "normal\\.bam\\.md5sum$", "BAMmd5sum_normal"
  )
  p <- umccrise_pld(libid)
  um_dragen_input_s3 <- p[["input_dragenSomaticOutputUri"]]
  um_final_s3 <- p[["output_outputDirectoryUri"]]
  # the work directory sits next to the final directory: <parent>/work/<name>
  um_work_s3 <- file.path(dirname(um_final_s3), "work", basename(um_final_s3))
  assertthat::assert_that(all(
    grepl("^s3://", c(um_dragen_input_s3, um_final_s3, um_work_s3))
  ))
  amber_dir <- file.path(um_work_s3, "purple/amber")
  cobalt_dir <- file.path(um_work_s3, "purple/cobalt")
  sigs_dir <- file.path(um_final_s3, "cancer_report_tables/sigs")
  expiry <- 604800 # 7days * 24hrs * 60min * 60sec
  d_um_urls1 <- um_final_s3 |>
    dracarys::s3_list_files_filter_relevant(
      presign = TRUE, regexes = umccrise_files, max_objects = 1000, expiry_sec = expiry
    )
  d_um_urls_sigs <- list_labelled(sigs_dir, ".*tsv\\.gz$", "Signatures")
  d_um_urls_amber <- list_labelled(amber_dir, "amber", "AMBER")
  d_um_urls_cobalt <- list_labelled(cobalt_dir, "cobalt", "COBALT")
  d_um_urls2 <- um_dragen_input_s3 |>
    dracarys::s3_list_files_filter_relevant(
      presign = TRUE, regexes = tn_files, max_objects = 500, expiry_sec = expiry
    )
  # scalar condition, so use the short-circuiting `||` rather than `|`
  if (nrow(d_um_urls2) != nrow(tn_files) || nrow(d_um_urls1) != nrow(umccrise_files)) {
    # files were not found? might also have files with the same pattern matching,
    # though I have not encountered any such cases. Also: BAMs get deleted.
    cli::cli_alert_danger(
      text = c(
        "There was not a 1-1 match between files requested and files found. ",
        "Maybe BAMs have been deleted? ",
        "Contact the UMCCR bioinformatics team."
      )
    )
  }
  # result/BAM rows are sorted by type; AMBER, COBALT and signature rows are
  # appended after them in that order
  urls_all <- dplyr::bind_rows(d_um_urls1, d_um_urls2) |>
    dplyr::arrange(.data$type) |>
    dplyr::bind_rows(d_um_urls_amber, d_um_urls_cobalt, d_um_urls_sigs) |>
    dplyr::mutate(
      libid = libid,
      size = trimws(as.character(.data$size)),
      filesystem = "s3"
    ) |>
    dplyr::relocate("libid") |>
    dplyr::relocate("filesystem", .after = "lastmodified")
  urls_all
}

#' Datashare umccrise Results
#'
#' @param sid SubjectID.
#' @param lid LibraryID of WTS tumor.
#' @param lid LibraryID of WGS tumor.
#' @param token_ica ICA_ACCESS_TOKEN.
#'
#' @return Tibble with presigned URLs.
Expand Down Expand Up @@ -209,7 +345,6 @@ datashare_wts <- function(sid, lid, wfrn_prefix = "umccr__automated__wts_tumor_o
"ORDER BY \"start\" DESC;"
)
d_wts_raw <- portaldb_query_workflow(query_wts)

n_wts_runs <- nrow(d_wts_raw)
if (n_wts_runs == 0) {
cli::cli_abort("No WTS results found for {sid_lid}")
Expand Down Expand Up @@ -242,3 +377,81 @@ datashare_wts <- function(sid, lid, wfrn_prefix = "umccr__automated__wts_tumor_o
dplyr::relocate("sbjid_libid")
d_wts_urls
}

#' Datashare WTS S3 Results
#'
#' Looks up the successful WTS workflow runs that `libid` is involved in,
#' takes the first one listed, and returns presigned URLs (valid for 7 days)
#' for the DRAGEN transcriptome and Arriba result files found under the S3
#' locations recorded in the workflow payload.
#'
#' @param libid LibraryID of WTS tumor.
#' @param token JWT.
#' @param wf_page_size Number of results to return when listing workflows this
#' libid is involved in.
#'
#' @return Tibble with presigned URLs. `libid` is the first column and a
#' `filesystem` column (always `"s3"`) is placed after `lastmodified`.
#' @examples
#' \dontrun{
#' libid <- "L2401585"
#' token <- orca_jwt() |> jwt_validate()
#' datashare_wts_s3(libid, token)
#' }
#' @export
datashare_wts_s3 <- function(libid, token, wf_page_size = 50) {
  # Tidy payload (see `pld_wts()`) of the WTS run to share.
  wts_pld <- function(libid) {
    # get workflows run for libid
    wf <- orca_libid2workflows(
      libid = libid, token = token, wf_name = NULL, page_size = wf_page_size, stage = "prod"
    )
    wf_raw <- wf |>
      dplyr::filter(.data$wf_name == "wts", .data$currentStateStatus == "SUCCEEDED")
    n_runs <- nrow(wf_raw)
    if (n_runs == 0) {
      cli::cli_abort("No WTS results found for {libid}")
    } else if (n_runs > 1) {
      # NOTE(review): taking the first row assumes `orca_libid2workflows()`
      # returns runs newest-first -- confirm against that function.
      wf_raw <- wf_raw |> dplyr::slice_head(n = 1)
      msg <- glue(
        "There are {n_runs} > 1 WTS workflows run for ",
        "{libid};\n",
        "We use the latest run with portal_run_id=\"{wf_raw$portalRunId}\" ",
        "with a timestamp of ",
        "{wf_raw$currentStateTimestamp}."
      )
      cli::cli_alert_info(msg)
    }
    # now use wfrid to get the payload with wts io; returned visibly as the
    # last expression rather than through a trailing assignment
    wf_raw$orcabusId |>
      orca_wfrid2payload(token = token) |>
      pld_wts()
  }
  # regex -> file type label for the DRAGEN transcriptome output directory
  wts_files <- dplyr::tribble(
    ~regex, ~fun,
    "\\.bam$", "BAM_WTS_tumor",
    "\\.bam\\.bai$", "BAMi_WTS_tumor",
    "\\.bam\\.md5sum$", "BAMmd5sum_WTS_tumor",
    "fusion_candidates\\.final$", "TSV_WTS_FusionCandidatesDragen",
    "quant\\.genes\\.sf$", "TSV_WTS_QuantificationGenes",
    # anchored like the other patterns so that only names ending in
    # `quant.sf` match
    "quant\\.sf$", "TSV_WTS_Quantification"
  )
  # regex -> file type label for the Arriba output directory
  wts_arriba_files <- dplyr::tribble(
    ~regex, ~fun,
    "fusions\\.pdf$", "PDF_WTS_FusionsArriba",
    "fusions\\.tsv$", "TSV_WTS_FusionsArriba"
  )
  p <- wts_pld(libid)
  expiry <- 604800 # 7days * 24hrs * 60min * 60sec
  d_wts_urls1 <- p[["output_dragenTranscriptomeOutputUri"]] |>
    dracarys::s3_list_files_filter_relevant(
      presign = TRUE, regexes = wts_files, max_objects = 500, expiry_sec = expiry
    )
  d_wts_urls2 <- p[["output_arribaOutputUri"]] |>
    dracarys::s3_list_files_filter_relevant(
      presign = TRUE, regexes = wts_arriba_files, max_objects = 500, expiry_sec = expiry
    )
  urls_all <- dplyr::bind_rows(d_wts_urls1, d_wts_urls2) |>
    dplyr::arrange(.data$type) |>
    dplyr::mutate(
      libid = libid,
      size = trimws(as.character(.data$size)),
      filesystem = "s3"
    ) |>
    dplyr::relocate("libid") |>
    dplyr::relocate("filesystem", .after = "lastmodified")
  urls_all
}
41 changes: 41 additions & 0 deletions R/meta_umccrise.R
Original file line number Diff line number Diff line change
Expand Up @@ -97,3 +97,44 @@ meta_umccrise <- function(pmeta, status = "Succeeded") {
"gds_infile_genomes_tar"
)
}

#' Payload for umccrise workflow
#'
#' Flattens the `tags`, `inputs`, `outputs` and `engineParameters` sections
#' found under `pld[["data"]]` into a single tibble keyed by
#' `pld[["orcabusId"]]`. Columns coming from the last three sections are
#' prefixed with `input_`, `output_` and `engparam_` respectively, and a
#' trailing `/` is removed from their values.
#'
#' @param pld List with umccrise workflow parameters.
#'
#' @return A tidy tibble.
#' @export
pld_umccrise <- function(pld) {
  # NOTE(review): presumably validates the payload structure -- defined
  # elsewhere in the package.
  payload_okay(pld)
  pdata <- pld[["data"]]
  id <- pld[["orcabusId"]]

  # Turn one payload section into a row: strip trailing slashes from the S3
  # directories, prefix the column names, and attach the join key.
  section_row <- function(section, prefix) {
    no_slash <- purrr::map(
      pdata[[section]],
      \(x) stringr::str_replace(x, "/$", "")
    )
    row <- tibble::as_tibble_row(no_slash)
    row <- rlang::set_names(row, \(x) glue("{prefix}_{x}"))
    dplyr::mutate(row, orcabusId = id)
  }

  # FastqListRowIds may hold several ids; collapse each into a single string
  pdata[["tags"]][["tumorFastqListRowIds"]] <- paste(
    pdata[["tags"]][["tumorFastqListRowIds"]],
    collapse = ", "
  )
  pdata[["tags"]][["normalFastqListRowIds"]] <- paste(
    pdata[["tags"]][["normalFastqListRowIds"]],
    collapse = ", "
  )
  tags <- dplyr::mutate(tibble::as_tibble_row(pdata[["tags"]]), orcabusId = id)
  inputs <- section_row("inputs", "input")
  outputs <- section_row("outputs", "output")
  engpar <- section_row("engineParameters", "engparam")

  # tags first, then each section joined on the workflow run id
  purrr::reduce(
    list(tags, inputs, outputs, engpar),
    \(acc, nxt) dplyr::left_join(acc, nxt, by = "orcabusId")
  )
}
37 changes: 37 additions & 0 deletions R/meta_wts_tumor_only.R
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,40 @@ meta_wts_tumor_only <- function(pmeta, status = "Succeeded") {
"gds_outdir_qualimap"
)
}

#' Payload for WTS workflow
#'
#' Flattens the `tags`, `inputs`, `outputs` and `engineParameters` sections
#' found under `pld[["data"]]` into a single tibble keyed by
#' `pld[["orcabusId"]]`. Columns coming from the last three sections are
#' prefixed with `input_`, `output_` and `engparam_` respectively; a trailing
#' `/` is removed from the output and engine parameter values (inputs are
#' used as they are).
#'
#' @param pld List with WTS workflow parameters.
#'
#' @return A tidy tibble.
#' @export
pld_wts <- function(pld) {
  # NOTE(review): presumably validates the payload structure -- defined
  # elsewhere in the package.
  payload_okay(pld)
  pdata <- pld[["data"]]
  id <- pld[["orcabusId"]]

  # remove a trailing slash from every element of a payload section
  drop_trailing_slash <- function(section) {
    purrr::map(section, \(x) stringr::str_replace(x, "/$", ""))
  }
  # one row per section, with prefixed column names and the join key attached
  prefixed_row <- function(section, prefix) {
    row <- tibble::as_tibble_row(section)
    row <- rlang::set_names(row, \(x) glue("{prefix}_{x}"))
    dplyr::mutate(row, orcabusId = id)
  }

  # FastqListRowIds may hold several ids; collapse them into a single string
  pdata[["tags"]][["tumorFastqListRowIds"]] <- paste(
    pdata[["tags"]][["tumorFastqListRowIds"]],
    collapse = ", "
  )
  tags <- dplyr::mutate(tibble::as_tibble_row(pdata[["tags"]]), orcabusId = id)
  inputs <- prefixed_row(pdata[["inputs"]], "input")
  outputs <- prefixed_row(drop_trailing_slash(pdata[["outputs"]]), "output")
  engpar <- prefixed_row(
    drop_trailing_slash(pdata[["engineParameters"]]),
    "engparam"
  )

  # tags first, then each section joined on the workflow run id
  purrr::reduce(
    list(tags, inputs, outputs, engpar),
    \(acc, nxt) dplyr::left_join(acc, nxt, by = "orcabusId")
  )
}
Loading

0 comments on commit 35f4b4a

Please sign in to comment.