Skip to content

Commit

Permalink
v1.2.2.1
Browse files Browse the repository at this point in the history
RAM and CPU management on peptide scoring.
  • Loading branch information
qzhang503 committed Mar 18, 2023
1 parent 27eb93b commit c3a9354
Show file tree
Hide file tree
Showing 2 changed files with 145 additions and 93 deletions.
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Package: proteoM
Type: Package
Title: Database Searches of Proteomic Mass-spectrometrirc Data
Version: 1.2.2.0
Version: 1.2.2.1
Authors@R:
person(given = "Qiang",
family = "Zhang",
Expand Down
236 changes: 144 additions & 92 deletions R/scores.R
Original file line number Diff line number Diff line change
Expand Up @@ -634,7 +634,6 @@ calc_pepscores <- function (topn_ms2ions = 100L, type_ms2ions = "by",

# Check priors
pat_i <- "^ion_matches_"

list_i <- find_targets(out_path, pattern = pat_i)$files
len_i <- length(list_i)

Expand Down Expand Up @@ -683,7 +682,6 @@ calc_pepscores <- function (topn_ms2ions = 100L, type_ms2ions = "by",
rm(list = c("cache_pars", "call_pars"))
}

n_cores <- detect_cores(16L)
d2 <- calc_threeframe_ppm(ppm_ms2) * 1E-6

for (fi in list_i) {
Expand Down Expand Up @@ -804,25 +802,21 @@ calcpepsc <- function (file, topn_ms2ions = 100L, type_ms2ions = "by",
return (dfb)
}

tempdir <- create_dir(file.path(out_path, "sc_temp"))

df[["uniq_id"]] <- paste(df[["scan_num"]], df[["raw_file"]], sep = "@")
esscols <- c("ms2_moverz", "ms2_int", "matches", "ms2_n", "uniq_id")
path_df2 <- file.path(out_path, "df2_sc_temp.rda")
path_df2 <- file.path(tempdir, "df2_sc_temp.rda")
df2 <- df[, -which(names(df) %in% esscols), drop = FALSE]
qs::qsave(df2, path_df2, preset = "fast")
df <- df[, esscols, drop = FALSE]
rm(list = "df2")
gc()

df <- df[, esscols, drop = FALSE]

# otherwise, chunksplit return NULL
# -> res[[i]] <- NULL
# -> length(res) shortened by 1

n_cores <- detect_cores(32L)

## don't change (RAM)
n_chunks <- n_cores^2

if (n_rows <= 5000L) {
probs <- calc_pepprobs_i(
df,
Expand All @@ -837,37 +831,78 @@ calcpepsc <- function (file, topn_ms2ions = 100L, type_ms2ions = "by",
digits = digits)
}
else {
if (n_rows > 100000L) {
n_cores2 <- ceiling(n_cores * 1.5)
n_chunks2 <- n_cores2^2
path_df <- file.path(tempdir, "df_sc_temp.rda")
max_rows <- 100000L

n_cores <- detect_cores(48L)
cl <- parallel::makeCluster(getOption("cl.cores", n_cores))
parallel::clusterExport(cl, list("calc_pepprobs_i", "scalc_pepprobs",
"calc_probi", "calc_probi_bypep",
"calc_probi_byvmods", "add_seions",
"find_ppm_outer_bycombi", "match_ex2th2",
"add_primatches"),
envir = environment(proteoM:::scalc_pepprobs))

if (n_rows > max_rows) {
dfs <- suppressWarnings(chunksplit(df, ceiling(n_rows/max_rows), "row"))
len <- length(dfs)
nms <- paste0("sc", 1:len, ".rds")
mapply(qs::qsave, dfs, file.path(tempdir, nms), MoreArgs = list(preset = "fast"))
rm(list = "dfs")
gc()

df <- df[, -which(names(df) == "matches"), drop = FALSE]
qs::qsave(df, path_df, preset = "fast")
rm(list = "df", envir = environment())
gc()

probs <- vector("list", len)

for (i in seq_len(len)) {
dfi <- suppressWarnings(
chunksplit(qs::qread(file.path(tempdir, nms[[i]])), n_cores, "row"))

probs[[i]] <- parallel::clusterApply(cl, dfi,
calc_pepprobs_i,
topn_ms2ions = topn_ms2ions,
type_ms2ions = type_ms2ions,
ppm_ms2 = ppm_ms2,
soft_secions = soft_secions,
out_path = out_path,
min_ms2mass = min_ms2mass,
d2 = d2,
index_mgf_ms2 = index_mgf_ms2,
digits = digits)
probs[[i]] <- dplyr::bind_rows(probs[[i]])
}

parallel::stopCluster(cl)
rm(list = c("dfi"))
gc()

probs <- dplyr::bind_rows(probs)

df <- qs::qread(path_df)
}
else {
n_cores2 <- n_cores
n_chunks2 <- n_chunks
}

dfs <- if (is.null(df))
list(df)
else
suppressWarnings(chunksplit(df, n_chunks2, "row"))

path_df <- file.path(out_path, "df_sc_temp.rda")
df <- df[, -which(names(df) == "matches"), drop = FALSE]
qs::qsave(df, path_df, preset = "fast")
rm(list = "df", envir = environment())
gc()

if ((length(dfs) >= n_chunks2)) {
cl <- parallel::makeCluster(getOption("cl.cores", n_cores2))

parallel::clusterExport(cl, list("calc_pepprobs_i", "scalc_pepprobs",
"calc_probi", "calc_probi_bypep",
"calc_probi_byvmods", "add_seions",
"find_ppm_outer_bycombi", "match_ex2th2",
"add_primatches"),
envir = environment(proteoM:::scalc_pepprobs))
dfs <- suppressWarnings(chunksplit(df, n_cores, "row"))

probs <- parallel::clusterApplyLB(cl, dfs,
# a case that `chunksplit` did not successfully split
if (is.data.frame(dfs)) {
probs <- calc_pepprobs_i(
dfs,
topn_ms2ions = topn_ms2ions,
type_ms2ions = type_ms2ions,
ppm_ms2 = ppm_ms2,
soft_secions = soft_secions,
out_path = out_path,
min_ms2mass = min_ms2mass,
d2 = d2,
index_mgf_ms2 = index_mgf_ms2,
digits = digits)
}
else {
probs <- parallel::clusterApply(cl, dfs,
calc_pepprobs_i,
topn_ms2ions = topn_ms2ions,
type_ms2ions = type_ms2ions,
Expand All @@ -878,39 +913,14 @@ calcpepsc <- function (file, topn_ms2ions = 100L, type_ms2ions = "by",
d2 = d2,
index_mgf_ms2 = index_mgf_ms2,
digits = digits)

parallel::stopCluster(cl)
rm(list = c("cl", "dfs"))
gc()

probs <- dplyr::bind_rows(probs)
}
else {
# a case that `chunksplit` did not successfully split
if (is.data.frame(dfs))
dfs <- list(dfs)

probs <- lapply(dfs, calc_pepprobs_i,
topn_ms2ions = topn_ms2ions,
type_ms2ions = type_ms2ions,
ppm_ms2 = ppm_ms2,
soft_secions = soft_secions,
out_path = out_path,
min_ms2mass = min_ms2mass,
d2 = d2,
index_mgf_ms2 = index_mgf_ms2,
digits = digits)

probs <- dplyr::bind_rows(probs)

rm(list = c("dfs"))
gc()

parallel::stopCluster(cl)
rm(list = c("dfs"))
gc()

probs <- dplyr::bind_rows(probs)
}
}

df <- qs::qread(path_df)
unlink(path_df)
rm(list = c("path_df", "n_cores2", "n_chunks2"))
gc()
}

## Reassemble `df`
Expand All @@ -920,8 +930,6 @@ calcpepsc <- function (file, topn_ms2ions = 100L, type_ms2ions = "by",
gc()

df2 <- qs::qread(path_df2)
unlink(path_df2)

df <- dplyr::bind_cols(df, df2)
rm(list = c("df2", "path_df2"))
gc()
Expand All @@ -936,25 +944,11 @@ calcpepsc <- function (file, topn_ms2ions = 100L, type_ms2ions = "by",
## Outputs
qs::qsave(df[, cols_lt, drop = FALSE], file_lt, preset = "fast")

if (nrow(df) > 10000L) {
np_cores <- 8L
cl <- parallel::makeCluster(getOption("cl.cores", np_cores))

df <- parallel::clusterApply(
cl,
suppressWarnings(chunksplit(df, np_cores^2, "row")),
add_primatches,
add_ms2theos = add_ms2theos,
add_ms2theos2 = add_ms2theos2,
add_ms2moverzs = add_ms2moverzs,
add_ms2ints = add_ms2ints,
index_mgf_ms2 = index_mgf_ms2)

df <- dplyr::bind_rows(df)

parallel::stopCluster(cl)
}
else {
message("\tAdding theoretical MS2 m/z and intensity values: ", Sys.time())

n_rows <- nrow(df)

if (n_rows <= 10000L) {
df <- add_primatches(
df = df,
add_ms2theos = add_ms2theos,
Expand All @@ -963,6 +957,62 @@ calcpepsc <- function (file, topn_ms2ions = 100L, type_ms2ions = "by",
add_ms2ints = add_ms2ints,
index_mgf_ms2 = index_mgf_ms2)
}
else {
cl <- parallel::makeCluster(getOption("cl.cores", n_cores))
parallel::clusterExport(cl, list("add_primatches"),
envir = environment(proteoM:::add_primatches))

max_theos <- 500000L

if (n_rows > max_theos) {
dfs <- suppressWarnings(chunksplit(df, ceiling(n_rows/max_theos), "row"))
rm(list = c("df"), envir = environment())
gc()

len <- length(dfs)
nms <- paste0("ms2theos", 1:len, ".rds")
mapply(qs::qsave, dfs, file.path(tempdir, nms), MoreArgs = list(preset = "fast"))
rm(list = c("dfs"), envir = environment())
gc()

df <- vector("list", len)

for (i in seq_len(len)) {
df[[i]] <- suppressWarnings(
chunksplit(qs::qread(file.path(tempdir, nms[[i]])), n_cores, "row"))

df[[i]] <- parallel::clusterApply(cl, df[[i]],
add_primatches,
add_ms2theos = add_ms2theos,
add_ms2theos2 = add_ms2theos2,
add_ms2moverzs = add_ms2moverzs,
add_ms2ints = add_ms2ints,
index_mgf_ms2 = index_mgf_ms2)
df[[i]] <- dplyr::bind_rows(df[[i]])
}

parallel::stopCluster(cl)
gc()

df <- dplyr::bind_rows(df)
}
else {
df <- parallel::clusterApply(cl,
suppressWarnings(chunksplit(df, n_cores, "row")),
add_primatches,
add_ms2theos = add_ms2theos,
add_ms2theos2 = add_ms2theos2,
add_ms2moverzs = add_ms2moverzs,
add_ms2ints = add_ms2ints,
index_mgf_ms2 = index_mgf_ms2)
parallel::stopCluster(cl)
gc()

df <- dplyr::bind_rows(df)
}
}

message("\tCompleted theoretical MS2 m/z and intensity values: ", Sys.time())

df[["pep_isdecoy"]] <- ifelse(is.na(df[["pep_ivmod"]]), TRUE, FALSE)

Expand All @@ -972,6 +1022,8 @@ calcpepsc <- function (file, topn_ms2ions = 100L, type_ms2ions = "by",
df <- df[, cols_sc, drop = FALSE]
qs::qsave(df, file_sc, preset = "fast")

unlink(tempdir, recursive = TRUE)

invisible(df)
}

Expand Down Expand Up @@ -1053,7 +1105,7 @@ add_primatches <- function (df, add_ms2theos = FALSE, add_ms2theos2 = FALSE,
m2s[[i]] <- mt2$m
p1s.[[i]] <- ps1

# if (i %% 5000L == 0L) gc()
if (i %% 5000L == 0L) gc()
}

if (index_mgf_ms2) {
Expand Down

0 comments on commit c3a9354

Please sign in to comment.