Skip to content

Commit

Permalink
Merge pull request #29 from umccr/pld-update
Browse files Browse the repository at this point in the history
Better support for OrcaBus workflow manager payloads
  • Loading branch information
pdiakumis authored Dec 2, 2024
2 parents c720ba9 + 433eaa2 commit 83be5b3
Show file tree
Hide file tree
Showing 27 changed files with 910 additions and 146 deletions.
1 change: 0 additions & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ LazyData: true
Roxygen: list(markdown = TRUE,
roclets = c("namespace", "rd", "roxytest::testthat_roclet"))
RoxygenNote: 7.3.1
VignetteBuilder: knitr
Depends:
R (>= 4.1)
Remotes:
Expand Down
12 changes: 12 additions & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,20 @@ export(orca_query_url)
export(orca_wfrid2payload)
export(orca_wfrid2state)
export(orca_workflow_list)
export(pld_bclconvert)
export(pld_bclconvertinteropqc)
export(pld_bsshfastqcopy)
export(pld_cttsov2)
export(pld_oawgtsdna)
export(pld_oawgtsdnarna)
export(pld_oawgtsrna)
export(pld_oracompression)
export(pld_pieriandx)
export(pld_rnasum)
export(pld_sash)
export(pld_tumornormal)
export(pld_umccrise)
export(pld_wgtsqc)
export(pld_wts)
export(portaldb_query)
export(portaldb_query_fastqlistrow)
Expand Down
44 changes: 0 additions & 44 deletions R/meta_tso_ctdna_tumor_only.R
Original file line number Diff line number Diff line change
Expand Up @@ -51,47 +51,3 @@ meta_tso_ctdna_tumor_only <- function(pmeta, status = c("Succeeded")) {
cttso_rerun = "rerun"
)
}

#' Payload Tidy tso
#'
#' @param pld List with tso workflow parameters.
#'
#' @return A tidy tibble.
#' @export
pld_cttsov2 <- function(pld) {
payload_okay(pld)
pdata <- pld[["data"]]
id <- pld[["orcabusId"]]
# collapse FastqListRowIds into single string
pdata[["tags"]][["fastqListRowIds"]] <- pdata[["tags"]][["fastqListRowIds"]] |>
paste(collapse = ", ")
tags <- pdata[["tags"]] |>
tibble::as_tibble_row() |>
dplyr::mutate(orcabusId = id)
# ignore verbose inputs$samplesheet
inputs <- pdata[["inputs"]]
inputs[["samplesheet"]] <- NULL
inputs[["fastqListRows"]] <- inputs[["fastqListRows"]] |>
purrr::map(tibble::as_tibble_row) |>
dplyr::bind_rows()
inputs[["fastqListRows"]] <- list(inputs[["fastqListRows"]])
inputs <- inputs |>
tibble::as_tibble_row() |>
rlang::set_names(\(x) glue("input_{x}")) |>
dplyr::mutate(orcabusId = id)
outputs <- pdata[["outputs"]] |>
purrr::map(\(x) x |> stringr::str_replace("/$", "")) |>
tibble::as_tibble_row() |>
rlang::set_names(\(x) glue("output_{x}")) |>
dplyr::mutate(orcabusId = id)
engpar <- pdata[["engineParameters"]] |>
purrr::map(\(x) x |> stringr::str_replace("/$", "")) |>
tibble::as_tibble_row() |>
rlang::set_names(\(x) glue("engparam_{x}")) |>
dplyr::mutate(orcabusId = id)
d <- tags |>
dplyr::left_join(inputs, by = "orcabusId") |>
dplyr::left_join(outputs, by = "orcabusId") |>
dplyr::left_join(engpar, by = "orcabusId")
return(d)
}
41 changes: 0 additions & 41 deletions R/meta_umccrise.R
Original file line number Diff line number Diff line change
Expand Up @@ -97,44 +97,3 @@ meta_umccrise <- function(pmeta, status = "Succeeded") {
"gds_infile_genomes_tar"
)
}

#' Payload Tidy umccrise
#'
#' @param pld List with umccrise workflow parameters.
#'
#' @return A tidy tibble.
#' @export
pld_umccrise <- function(pld) {
payload_okay(pld)
pdata <- pld[["data"]]
id <- pld[["orcabusId"]]
# collapse FastqListRowIds into single string
pdata[["tags"]][["tumorFastqListRowIds"]] <- pdata[["tags"]][["tumorFastqListRowIds"]] |>
paste(collapse = ", ")
pdata[["tags"]][["normalFastqListRowIds"]] <- pdata[["tags"]][["normalFastqListRowIds"]] |>
paste(collapse = ", ")
tags <- pdata[["tags"]] |>
tibble::as_tibble_row() |>
dplyr::mutate(orcabusId = id)
# remove trailing slashes from S3 directories
inputs <- pdata[["inputs"]] |>
purrr::map(\(x) x |> stringr::str_replace("/$", "")) |>
tibble::as_tibble_row() |>
rlang::set_names(\(x) glue("input_{x}")) |>
dplyr::mutate(orcabusId = id)
outputs <- pdata[["outputs"]] |>
purrr::map(\(x) x |> stringr::str_replace("/$", "")) |>
tibble::as_tibble_row() |>
rlang::set_names(\(x) glue("output_{x}")) |>
dplyr::mutate(orcabusId = id)
engpar <- pdata[["engineParameters"]] |>
purrr::map(\(x) x |> stringr::str_replace("/$", "")) |>
tibble::as_tibble_row() |>
rlang::set_names(\(x) glue("engparam_{x}")) |>
dplyr::mutate(orcabusId = id)
d <- tags |>
dplyr::left_join(inputs, by = "orcabusId") |>
dplyr::left_join(outputs, by = "orcabusId") |>
dplyr::left_join(engpar, by = "orcabusId")
return(d)
}
37 changes: 0 additions & 37 deletions R/meta_wts_tumor_only.R
Original file line number Diff line number Diff line change
Expand Up @@ -61,40 +61,3 @@ meta_wts_tumor_only <- function(pmeta, status = "Succeeded") {
"gds_outdir_qualimap"
)
}

#' Payload Tidy wts
#'
#' @param pld List with wts workflow parameters.
#'
#' @return A tidy tibble.
#' @export
pld_wts <- function(pld) {
payload_okay(pld)
pdata <- pld[["data"]]
id <- pld[["orcabusId"]]
# collapse FastqListRowIds into single string
pdata[["tags"]][["tumorFastqListRowIds"]] <- pdata[["tags"]][["tumorFastqListRowIds"]] |>
paste(collapse = ", ")
tags <- pdata[["tags"]] |>
tibble::as_tibble_row() |>
dplyr::mutate(orcabusId = id)
inputs <- pdata[["inputs"]] |>
tibble::as_tibble_row() |>
rlang::set_names(\(x) glue("input_{x}")) |>
dplyr::mutate(orcabusId = id)
outputs <- pdata[["outputs"]] |>
purrr::map(\(x) x |> stringr::str_replace("/$", "")) |>
tibble::as_tibble_row() |>
rlang::set_names(\(x) glue("output_{x}")) |>
dplyr::mutate(orcabusId = id)
engpar <- pdata[["engineParameters"]] |>
purrr::map(\(x) x |> stringr::str_replace("/$", "")) |>
tibble::as_tibble_row() |>
rlang::set_names(\(x) glue("engparam_{x}")) |>
dplyr::mutate(orcabusId = id)
d <- tags |>
dplyr::left_join(inputs, by = "orcabusId") |>
dplyr::left_join(outputs, by = "orcabusId") |>
dplyr::left_join(engpar, by = "orcabusId")
return(d)
}
54 changes: 42 additions & 12 deletions R/orcabus.R
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ orca_query_url <- function(url, token = NULL) {
#' @return Workflow run ID.
#' @examples
#' \dontrun{
#' token <- orca_jwt() |> jwt_validate()
#' prid <- "20241110c01a1c76"
#' prid <- "202409303ed604f4"
#' token <- orca_jwt() |> jwt_validate()
#' wfrid <- orca_prid2wfrid(prid = prid, token = token)
#' }
#'
Expand Down Expand Up @@ -85,6 +86,15 @@ orca_wfrid2state <- function(wfrid, token, stage = "prod") {
#' wfrid <- "wfr.01JCARAVTXKG5581SRA1HKBTD3"
#' wfrid <- "wfr.01JCA5DZFD0T4MFQX0HHEEFBCH" # wts
#' wfrid <- "wfr.01JBX361HKV0V9WS96RAFG135T" # cttsov2
#' wfrid <- "wfr.01JD20PQGQDAEAWD5S9E6M10J6" # cttsov2
#' wfrid <- "wfr.01JD20PQGQTY8SNSEZTZ8XF5N9" # wgs-tn
#' wfrid <- "wfr.01JE08ZV50519JTVH3Z4N8BQV2" # bsshfastqcopy
#' wfrid <- "wfr.01JDK5068AEV2RDNMH970B5W71" # bclconvert-interop-qc
#' wfrid <- "wfr.01JDE02YAQT441W1D26F0DXZ8J" # oncoanalyser-wgts-dna-rna
#' wfrid <- "wfr.01JDCCGRN6D34XCNE8N2XMQ91Q" # oncoanalyser-wgts-rna
#' wfrid <- "wfr.01JDBW5T3T1SDMCW5GN3SK10F8" # ora-compression
#' wfrid <- "wfr.01JDCPRG7DGH1KA9X90Y4YBYZX" # pieriandx
#' wfrid <- "wfr.01JDGB7G746GBXYPKNH7PRQ39S" # rnasum
#' p <- orca_wfrid2payload(wfrid = wfrid, token = token)
#' }
#'
Expand All @@ -94,6 +104,8 @@ orca_wfrid2payload <- function(wfrid, token, stage = "prod") {
pld <- states |>
dplyr::filter(.data$status == "SUCCEEDED") |>
dplyr::pull("payload")
msg <- glue("For {wfrid} we had {nrow(states)} states and {length(pld)} SUCCEEDED statuses.")
assertthat::assert_that(length(pld) == 1, msg = msg)
ep <- glue("https://workflow.{stage}.umccr.org/api/v1/payload/{pld}")
orca_query_url(ep, token)
}
Expand All @@ -109,9 +121,16 @@ orca_wfrid2payload <- function(wfrid, token, stage = "prod") {
#' @return List of workflow payload.
#' @examples
#' \dontrun{
#' prid <- "20241110c01a1c76" # cttso
#' prid <- "2024111638b77605" # sash
#' prid <- "20241116dfee1aef" # oa-wgts-dna
#' prid <- "2024111514abc96a" # wgs-tn
#' prid <- "20241115cbfdaeea" # wgs-qc-rna
#' prid <- "202411154e2c74f3" # wgs-qc-dna
#' prid <- "202411231acb8163" # wgs-qc-dna
#' prid <- "2024111507e8ca78" # bclconvert
#' prid <- "202411152feba98c" # bclconvert-interopqc
#' token <- orca_jwt() |> jwt_validate()
#' # e.g. for a cttsov2 workflow
#' prid <- "20241110c01a1c76"
#' p <- orca_prid2wfpayload(prid = prid, token = token)
#' }
#'
Expand All @@ -136,12 +155,16 @@ orca_prid2wfpayload <- function(prid, token, stage = "prod") {
#' @return Tibble with results.
#' @examples
#' \dontrun{
#' token <- orca_jwt() |> jwt_validate()
#' libid <- "L2401591" # wgs
#' libid <- "L2401074" # wts # nothing
#' libid <- "L2401577" # wts
#' libid <- "L2401558" # cttsov2
#' libid <- "L2401608" # wgs
#' libid <- "L2401603" # wgs
#' libid <- "L2401610" # oa
#' libid <- "L2401445" # cttsov2
#' wf_name <- NULL
#' token <- orca_jwt() |> jwt_validate()
#' d <- orca_libid2workflows(libid = libid, token = token, wf_name = wf_name, page_size = 20)
#' }
#' @export
Expand Down Expand Up @@ -179,37 +202,44 @@ orca_libid2workflows <- function(libid, token, wf_name = NULL, page_size = 10, s
#' @param token JWT.
#' @param page_size Maximum number of rows to return.
#' @param stage Environment where API is deployed (prod, stg or dev).
#' @param status Run status.
#'
#' @examples
#' \dontrun{
#' token <- orca_jwt() |> jwt_validate()
#' wf_name <- NULL
#' wf_name <- "umccrise"
#' orca_workflow_list(wf_name = wf_name, token = token)
#' token <- orca_jwt() |> jwt_validate()
#' wfs <- orca_workflow_list(wf_name = wf_name, token = token, page_size = 500)
#' }
#' @return Tibble with results.
#'
#' @export
orca_workflow_list <- function(wf_name = NULL, token, page_size = 10, stage = "prod") {
orca_workflow_list <- function(wf_name = NULL, status = "SUCCEEDED", token, page_size = 10, stage = "prod") {
assertthat::assert_that(stage %in% orca_stages())
wf_name_qstring <- ""
if (!is.null(wf_name)) {
wf_name_qstring <- glue("&workflow__workflowName={wf_name}")
}
status_qstring <- ""
if (!is.null(status)) {
status_qstring <- glue("&status={status}")
}
ordering <- "-portal_run_id"
ep <- glue("https://workflow.{stage}.umccr.org/api/v1/workflowrun/")
url <- glue("{ep}?rowsPerPage={page_size}{wf_name_qstring}")
url <- glue("{ep}?rowsPerPage={page_size}&ordering={ordering}{wf_name_qstring}{status_qstring}")
cli::cli_alert_info("Query {url}")
x <- orca_query_url(url, token)
res <- x[["results"]]
d <- tibble::tibble(
orcabusId = res |> purrr::map_chr("orcabusId", .default = NA),
portalRunId = res |> purrr::map_chr("portalRunId", .default = NA),
executionId = res |> purrr::map_chr("executionId", .default = NA),
wfr_name = res |> purrr::map_chr("workflowRunName", .default = NA),
workflowRunName = res |> purrr::map_chr("workflowRunName", .default = NA),
comment = res |> purrr::map_chr("comment", .default = NA),
analysisRun = res |> purrr::map_chr("analysisRun", .default = NA),
wf_id = res |> purrr::map_chr(list("workflow", "orcabusId"), .default = NA),
wf_name = res |> purrr::map_chr(list("workflow", "workflowName"), .default = NA),
wf_version = res |> purrr::map_chr(list("workflow", "workflowVersion"), .default = NA),
workflowId = res |> purrr::map_chr(list("workflow", "orcabusId"), .default = NA),
workflowName = res |> purrr::map_chr(list("workflow", "workflowName"), .default = NA),
workflowVersion = res |> purrr::map_chr(list("workflow", "workflowVersion"), .default = NA),
currentStateOrcabusId = res |> purrr::map_chr(list("currentState", "orcabusId"), .default = NA),
currentStateStatus = res |> purrr::map_chr(list("currentState", "status"), .default = NA),
currentStateTimestamp = res |> purrr::map_chr(list("currentState", "timestamp"), .default = NA)
Expand Down
Loading

0 comments on commit 83be5b3

Please sign in to comment.