From c3a935464d1e48afdda27a39b408d9130ccc1360 Mon Sep 17 00:00:00 2001 From: Qiang Zhang <45829450+qzhang503@users.noreply.github.com> Date: Sat, 18 Mar 2023 14:37:32 -0500 Subject: [PATCH] v1.2.2.1 RAM and CPU management on peptide scoring. --- DESCRIPTION | 2 +- R/scores.R | 236 ++++++++++++++++++++++++++++++++-------------------- 2 files changed, 145 insertions(+), 93 deletions(-) diff --git a/DESCRIPTION b/DESCRIPTION index b6a76a0..8b5e977 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,7 +1,7 @@ Package: proteoM Type: Package Title: Database Searches of Proteomic Mass-spectrometrirc Data -Version: 1.2.2.0 +Version: 1.2.2.1 Authors@R: person(given = "Qiang", family = "Zhang", diff --git a/R/scores.R b/R/scores.R index 40a8a73..3a7ce7f 100644 --- a/R/scores.R +++ b/R/scores.R @@ -634,7 +634,6 @@ calc_pepscores <- function (topn_ms2ions = 100L, type_ms2ions = "by", # Check priors pat_i <- "^ion_matches_" - list_i <- find_targets(out_path, pattern = pat_i)$files len_i <- length(list_i) @@ -683,7 +682,6 @@ calc_pepscores <- function (topn_ms2ions = 100L, type_ms2ions = "by", rm(list = c("cache_pars", "call_pars")) } - n_cores <- detect_cores(16L) d2 <- calc_threeframe_ppm(ppm_ms2) * 1E-6 for (fi in list_i) { @@ -804,25 +802,21 @@ calcpepsc <- function (file, topn_ms2ions = 100L, type_ms2ions = "by", return (dfb) } + tempdir <- create_dir(file.path(out_path, "sc_temp")) + df[["uniq_id"]] <- paste(df[["scan_num"]], df[["raw_file"]], sep = "@") esscols <- c("ms2_moverz", "ms2_int", "matches", "ms2_n", "uniq_id") - path_df2 <- file.path(out_path, "df2_sc_temp.rda") + path_df2 <- file.path(tempdir, "df2_sc_temp.rda") df2 <- df[, -which(names(df) %in% esscols), drop = FALSE] qs::qsave(df2, path_df2, preset = "fast") + df <- df[, esscols, drop = FALSE] rm(list = "df2") gc() - - df <- df[, esscols, drop = FALSE] # otherwise, chunksplit return NULL # -> res[[i]] <- NULL # -> length(res) shortened by 1 - n_cores <- detect_cores(32L) - - ## don't change (RAM) - n_chunks <- n_cores^2 - if (n_rows <= 5000L) { probs <- calc_pepprobs_i( df, @@ -837,37 +831,78 @@ calcpepsc <- function (file, topn_ms2ions = 100L, type_ms2ions = "by", digits = digits) } else { - if (n_rows > 100000L) { - n_cores2 <- ceiling(n_cores * 1.5) - n_chunks2 <- n_cores2^2 + path_df <- file.path(tempdir, "df_sc_temp.rda") + max_rows <- 100000L + + n_cores <- detect_cores(48L) + cl <- parallel::makeCluster(getOption("cl.cores", n_cores)) + parallel::clusterExport(cl, list("calc_pepprobs_i", "scalc_pepprobs", + "calc_probi", "calc_probi_bypep", + "calc_probi_byvmods", "add_seions", + "find_ppm_outer_bycombi", "match_ex2th2", + "add_primatches"), + envir = environment(proteoM:::scalc_pepprobs)) + + if (n_rows > max_rows) { + dfs <- suppressWarnings(chunksplit(df, ceiling(n_rows/max_rows), "row")) + len <- length(dfs) + nms <- paste0("sc", 1:len, ".rds") + mapply(qs::qsave, dfs, file.path(tempdir, nms), MoreArgs = list(preset = "fast")) + rm(list = "dfs") + gc() + + df <- df[, -which(names(df) == "matches"), drop = FALSE] + qs::qsave(df, path_df, preset = "fast") + rm(list = "df", envir = environment()) + gc() + + probs <- vector("list", len) + + for (i in seq_len(len)) { + dfi <- suppressWarnings( + chunksplit(qs::qread(file.path(tempdir, nms[[i]])), n_cores, "row")) + + probs[[i]] <- parallel::clusterApply(cl, dfi, + calc_pepprobs_i, + topn_ms2ions = topn_ms2ions, + type_ms2ions = type_ms2ions, + ppm_ms2 = ppm_ms2, + soft_secions = soft_secions, + out_path = out_path, + min_ms2mass = min_ms2mass, + d2 = d2, + index_mgf_ms2 = index_mgf_ms2, + digits = digits) + probs[[i]] <- dplyr::bind_rows(probs[[i]]) + } + + parallel::stopCluster(cl) + rm(list = c("dfi")) + gc() + + probs <- dplyr::bind_rows(probs) + + df <- qs::qread(path_df) } else { - n_cores2 <- n_cores - n_chunks2 <- n_chunks - } - - dfs <- if (is.null(df)) - list(df) - else - suppressWarnings(chunksplit(df, n_chunks2, "row")) - - path_df <- file.path(out_path, "df_sc_temp.rda") - df <- df[, -which(names(df) == "matches"), drop = FALSE] - qs::qsave(df, path_df, preset = "fast") - rm(list = "df", envir = environment()) - gc() - - if ((length(dfs) >= n_chunks2)) { - cl <- parallel::makeCluster(getOption("cl.cores", n_cores2)) - - parallel::clusterExport(cl, list("calc_pepprobs_i", "scalc_pepprobs", - "calc_probi", "calc_probi_bypep", - "calc_probi_byvmods", "add_seions", - "find_ppm_outer_bycombi", "match_ex2th2", - "add_primatches"), - envir = environment(proteoM:::scalc_pepprobs)) + dfs <- suppressWarnings(chunksplit(df, n_cores, "row")) - probs <- parallel::clusterApplyLB(cl, dfs, + # a case that `chunksplit` did not successfully split + if (is.data.frame(dfs)) { + probs <- calc_pepprobs_i( + dfs, + topn_ms2ions = topn_ms2ions, + type_ms2ions = type_ms2ions, + ppm_ms2 = ppm_ms2, + soft_secions = soft_secions, + out_path = out_path, + min_ms2mass = min_ms2mass, + d2 = d2, + index_mgf_ms2 = index_mgf_ms2, + digits = digits) + } + else { + probs <- parallel::clusterApply(cl, dfs, calc_pepprobs_i, topn_ms2ions = topn_ms2ions, type_ms2ions = type_ms2ions, @@ -878,39 +913,14 @@ calcpepsc <- function (file, topn_ms2ions = 100L, type_ms2ions = "by", d2 = d2, index_mgf_ms2 = index_mgf_ms2, digits = digits) - - parallel::stopCluster(cl) - rm(list = c("cl", "dfs")) - gc() - - probs <- dplyr::bind_rows(probs) - } - else { - # a case that `chunksplit` did not successfully split - if (is.data.frame(dfs)) - dfs <- list(dfs) - - probs <- lapply(dfs, calc_pepprobs_i, - topn_ms2ions = topn_ms2ions, - type_ms2ions = type_ms2ions, - ppm_ms2 = ppm_ms2, - soft_secions = soft_secions, - out_path = out_path, - min_ms2mass = min_ms2mass, - d2 = d2, - index_mgf_ms2 = index_mgf_ms2, - digits = digits) - - probs <- dplyr::bind_rows(probs) - - rm(list = c("dfs")) - gc() + + parallel::stopCluster(cl) + rm(list = c("dfs")) + gc() + + probs <- dplyr::bind_rows(probs) + } } - - df <- qs::qread(path_df) - unlink(path_df) - rm(list = c("path_df", "n_cores2", "n_chunks2")) - gc() } ## Reassemble `df` @@ -920,8 +930,6 @@ calcpepsc <- function (file, topn_ms2ions = 100L, type_ms2ions = "by", gc() df2 <- qs::qread(path_df2) - unlink(path_df2) - df <- dplyr::bind_cols(df, df2) rm(list = c("df2", "path_df2")) gc() @@ -936,25 +944,11 @@ calcpepsc <- function (file, topn_ms2ions = 100L, type_ms2ions = "by", ## Outputs qs::qsave(df[, cols_lt, drop = FALSE], file_lt, preset = "fast") - if (nrow(df) > 10000L) { - np_cores <- 8L - cl <- parallel::makeCluster(getOption("cl.cores", np_cores)) - - df <- parallel::clusterApply( - cl, - suppressWarnings(chunksplit(df, np_cores^2, "row")), - add_primatches, - add_ms2theos = add_ms2theos, - add_ms2theos2 = add_ms2theos2, - add_ms2moverzs = add_ms2moverzs, - add_ms2ints = add_ms2ints, - index_mgf_ms2 = index_mgf_ms2) - - df <- dplyr::bind_rows(df) - - parallel::stopCluster(cl) - } - else { + message("\tAdding theoretical MS2 m/z and intensity values: ", Sys.time()) + + n_rows <- nrow(df) + + if (n_rows <= 10000L) { df <- add_primatches( df = df, add_ms2theos = add_ms2theos, @@ -963,6 +957,62 @@ calcpepsc <- function (file, topn_ms2ions = 100L, type_ms2ions = "by", add_ms2ints = add_ms2ints, index_mgf_ms2 = index_mgf_ms2) } + else { + cl <- parallel::makeCluster(getOption("cl.cores", n_cores)) + parallel::clusterExport(cl, list("add_primatches"), + envir = environment(proteoM:::add_primatches)) + + max_theos <- 500000L + + if (n_rows > max_theos) { + dfs <- suppressWarnings(chunksplit(df, ceiling(n_rows/max_theos), "row")) + rm(list = c("df"), envir = environment()) + gc() + + len <- length(dfs) + nms <- paste0("ms2theos", 1:len, ".rds") + mapply(qs::qsave, dfs, file.path(tempdir, nms), MoreArgs = list(preset = "fast")) + rm(list = c("dfs"), envir = environment()) + gc() + + df <- vector("list", len) + + for (i in seq_len(len)) { + df[[i]] <- suppressWarnings( + chunksplit(qs::qread(file.path(tempdir, nms[[i]])), n_cores, "row")) + + df[[i]] <- parallel::clusterApply(cl, df[[i]], + add_primatches, + add_ms2theos = add_ms2theos, + add_ms2theos2 = add_ms2theos2, + add_ms2moverzs = add_ms2moverzs, + add_ms2ints = add_ms2ints, + index_mgf_ms2 = index_mgf_ms2) + df[[i]] <- dplyr::bind_rows(df[[i]]) + } + + parallel::stopCluster(cl) + gc() + + df <- dplyr::bind_rows(df) + } + else { + df <- parallel::clusterApply(cl, + suppressWarnings(chunksplit(df, n_cores, "row")), + add_primatches, + add_ms2theos = add_ms2theos, + add_ms2theos2 = add_ms2theos2, + add_ms2moverzs = add_ms2moverzs, + add_ms2ints = add_ms2ints, + index_mgf_ms2 = index_mgf_ms2) + parallel::stopCluster(cl) + gc() + + df <- dplyr::bind_rows(df) + } + } + + message("\tCompleted theoretical MS2 m/z and intensity values: ", Sys.time()) df[["pep_isdecoy"]] <- ifelse(is.na(df[["pep_ivmod"]]), TRUE, FALSE) @@ -972,6 +1022,8 @@ calcpepsc <- function (file, topn_ms2ions = 100L, type_ms2ions = "by", df <- df[, cols_sc, drop = FALSE] qs::qsave(df, file_sc, preset = "fast") + unlink(tempdir, recursive = TRUE) + invisible(df) } @@ -1053,7 +1105,7 @@ add_primatches <- function (df, add_ms2theos = FALSE, add_ms2theos2 = FALSE, m2s[[i]] <- mt2$m p1s.[[i]] <- ps1 - # if (i %% 5000L == 0L) gc() + if (i %% 5000L == 0L) gc() } if (index_mgf_ms2) {