From 19481489976b31afb3a3a7e5a89e0c86b6fa83d4 Mon Sep 17 00:00:00 2001
From: abriz97
Date: Wed, 25 Sep 2024 14:42:29 +0100
Subject: [PATCH] [+] Improve implementation of multiple runners.

- Addressed different path specifications for different runners.
  In summary, a `gsub` is performed to replace paths written in the PBS files.
- Path specifications are to be written in the jobs_runner.csv file.
- Fix a couple of small bugs and misnamed variables in different scripts.
---
 .../Rk1521_03_make_deep_sequence_alignments.R | 32 ++++++++--
 .../Rk1521_04_make_trees.R                    | 59 ++++++++++++++-----
 misc_data_analysis_RCCS1521/jobs_runner.csv   |  5 ++
 misc_data_analysis_RCCS1521/runall_1521.sh    | 20 +++++--
 misc_data_analysis_RCCS1521/runall_dryrun.sh  | 20 +++++--
 misc_data_analysis_RCCS1521/utility.R         | 35 +++++++++++
 6 files changed, 137 insertions(+), 34 deletions(-)
 create mode 100644 misc_data_analysis_RCCS1521/jobs_runner.csv

diff --git a/misc_data_analysis_RCCS1521/Rk1521_03_make_deep_sequence_alignments.R b/misc_data_analysis_RCCS1521/Rk1521_03_make_deep_sequence_alignments.R
index e10f013..4c65a9e 100644
--- a/misc_data_analysis_RCCS1521/Rk1521_03_make_deep_sequence_alignments.R
+++ b/misc_data_analysis_RCCS1521/Rk1521_03_make_deep_sequence_alignments.R
@@ -625,7 +625,7 @@ if(args$walltime_idx == 3 & args$pqeelab){
     }]
 }
 
-if (args$split_jobs_by_n == 1 | nrow(djob) <= 1000 ){
+if (split_jobs_by_n == 1 | nrow(djob) <= 1000 ){
     ids <- submit_jobs_from_djob(djob, output_type = "id")
     # qsub alignment step again,
     # to check whether everything has run...
@@ -638,13 +638,33 @@ if (args$split_jobs_by_n == 1 | nrow(djob) <= 1000 ){
 }else{
     # split djob in split_jobs_by_n data.tables
     # then perform the above individually
-    person_to_run <- rep(1:args$split_jobs_by_n, length.out = nrow(djob))
+    person_to_run <- rep(1:split_jobs_by_n, length.out = nrow(djob))
+    submit_user_script <- NA_character_
 
-    for (person in 1:args$split_djobs_list){
-        # Submit job for each person, and write a sh file they can run to submit
+    for (person in 1:split_jobs_by_n){
+
+        # Subset to jobs for specific person
         djob_person <- djob[person_to_run == person]
+        # Adapt the job specifications according to the person/runner
+        djob_person <- adapt_jobspecs_to_runner(
+            djob_person,
+            drunners,
+            idx = person
+        )
+        # write the pbs files
         pbs_file_person <- submit_jobs_from_djob(djob_person, output_type = "outfile")
-        write(pbs_file_person,
-              file = file.path(args$out.dir.work, paste0('submit_jobs_', person, '.sh')))
+        # Append the pbs files to the script that each user can submit to queue them
+        submit_user_script <- append_pbs_file_person(
+            script = submit_user_script,
+            pbs = pbs_file_person,
+            usr = drunners[index == person, user_name]
+        )
     }
+
+    # Write the script that each user can submit
+    outfile = file.path(args$out.dir.work, 'submit_user_readali.sh')
+    write( submit_user_script, file = outfile)
+
 }
+
+cat("End of script\n")
diff --git a/misc_data_analysis_RCCS1521/Rk1521_04_make_trees.R b/misc_data_analysis_RCCS1521/Rk1521_04_make_trees.R
index c64f048..338ae63 100644
--- a/misc_data_analysis_RCCS1521/Rk1521_04_make_trees.R
+++ b/misc_data_analysis_RCCS1521/Rk1521_04_make_trees.R
@@ -94,11 +94,11 @@ option_list <- list(
         dest = "walltime_idx"
     ),
     optparse::make_option(
-        "--split_jobs_by_n",
-        type = "integer",
-        default = 0L,
-        help = "Number of people that can run scripts. This will create at least `split_jobs_by_n` scripts (unless remaining tasks < 500)",
-        dest = "split_jobs_by_n"
+        "--csv-runners",
+        type = "character",
+        default = NA_character_,
+        help = "Path to csv file containing the specifications for each person running the job. If NA, the jobs will be run by the user running this script. (defaults to NA)",
+        dest = "runners"
     ),
     optparse::make_option(
         "--dryrun",
@@ -351,7 +351,15 @@ djob[, JOB_ID := rep(1:n_jobs, each = max.per.run)[idx] ]
 
 djob2 <- djob[, .(CMD=.write.job(.SD)), by=JOB_ID]
 
-if ( args$split_jobs_by_n == 1 | nrow(djob) <= 1000 )
+# Load runners specifications
+split_jobs_by_n <- 1
+if(!is.na(args$runners))
+{
+    drunners <- fread(args$runners)
+    split_jobs_by_n <- nrow(drunners)
+}
+
+if ( split_jobs_by_n == 1 | nrow(djob) <= 1000 )
 {
     ids <- submit_jobs_from_djob(djob2, output_type = "id", prefix = "srx")
 
@@ -366,21 +374,40 @@ if ( args$split_jobs_by_n == 1 | nrow(djob) <= 1000 )
 
     # split djob in split_jobs_by_n data.tables
     # then perform the above individually
-    person_to_run <- rep(1:args$split_jobs_by_n, length.out = nrow(djob2))
+    # by first taking care of different path specifications among runners
 
-    for (person in 1:args$split_djobs_list){
-        # Submit job for each person, and write a sh file they can run to submit
-        djob_person <- djob2[person_to_run == person]
-        pbs_file_person <- submit_jobs_from_djob(djob_person, output_type = "outfile", prefix = "srx")
+    person_to_run <- rep(1:split_jobs_by_n, length.out = nrow(djob2))
+    submit_user_script <- NA_character_
 
-        # TODO: add something that changes the PBS specifications for each person.
+    for (person in 1:split_jobs_by_n){
+
+        # Subset to jobs for specific person
+        djob_person <- djob2[person_to_run == person]
+        # Adapt the specifications according to the person/runner
+        djob_person <- adapt_jobspecs_to_runner(
+            djob_person,
+            drunners,
+            idx = person
+        )
+        # write the pbs files
+        pbs_file_person <- submit_jobs_from_djob(
+            djob_person,
+            output_type = "outfile",
+            prefix = "srx"
+        )
+        # Append the pbs files to the script that each user can submit to queue them
+        submit_user_script <- append_pbs_file_person(
+            script = submit_user_script,
+            pbs = pbs_file_person,
+            usr = drunners[index == person, user_name]
+        )
     }
 
-        write(pbs_file_person,
-              file = file.path(args$out.dir.work, paste0('submit_jobs_', person, '.sh')))
+    # Write the script that each user can submit
+    outfile = file.path(args$out.dir.work, 'submit_user_srx.sh')
+    write( submit_user_script, file = outfile)
+}
 
+cat("End of script\n")
 
-}
diff --git a/misc_data_analysis_RCCS1521/jobs_runner.csv b/misc_data_analysis_RCCS1521/jobs_runner.csv
new file mode 100644
index 0000000..de4bfff
--- /dev/null
+++ b/misc_data_analysis_RCCS1521/jobs_runner.csv
@@ -0,0 +1,5 @@
+index,user_name,path_software,path_phyloscanner
+1,ab1820,/rds/general/user/ab1820/home/git/Phyloscanner.R.utilities,/rds/general/user/ab1820/home/git/phyloscanner
+2,ablenkin,/rds/general/user/ablenkin/TODO/Phyloscanner.R.utilities,/rds/general/user/TODO/phyloscanner
+3,xx4515,/rds/general/user/xx4515/TODO/Phyloscanner.R.utilities,/rds/general/user/TODO/phyloscanner
+4,or105,/rds/general/user/or105/TODO/Phyloscanner.R.utilities,/rds/general/user/TODO/phyloscanner
diff --git a/misc_data_analysis_RCCS1521/runall_1521.sh b/misc_data_analysis_RCCS1521/runall_1521.sh
index c33ea5f..57c1a00 100644
--- a/misc_data_analysis_RCCS1521/runall_1521.sh
+++ b/misc_data_analysis_RCCS1521/runall_1521.sh
@@ -14,13 +14,14 @@ then
     echo "OPTIONs:"
     echo " STEP : one of ali, btr, atr [default: none]"
     echo " RES: determines resources fr pbs jobs (1 to 3) [default: 1]"
-    echo " N_JOB_RUNNER: number of people that can submit jobs [default: 1]"
+    echo " MULTI_RUNNER: whether multiple job runners are required, as specified in jobs_runner.csv [default: FALSE]"
     exit 1
 fi
 
 ${RES:=1}
 ${REDO:=0}
-${N_JOB_RUNNER:=1}
+${MULTI_RUNNER:=FALSE}
+
 echo "running '${STEP:=none}' analysis"
 
 # This includes all code necessary to run PHSC pipeline to produce TSI estimates
@@ -38,6 +39,13 @@ controller="$software_path/$PBS_JOBNAME" #current script location
 # For TSI's 25 is fine, for networks 10:
 sliding_width=10
 
+if [ "$MULTI_RUNNER" = "TRUE" ]; then
+    runner_cmd="--csv-runners $software_path/jobs_runner.csv"
+else
+    runner_cmd=""
+fi
+
+
 CLUSIZE='50'
 DATE='2024-09-23'
 
@@ -73,8 +81,8 @@ case $STEP in
             --controller $controller \
             --walltime_idx $RES \
             --phsc-runs $input_samples \
+            $runner_cmd \
-            --tsi_analysis FALSE \
-            --split_jobs_by_n $N_JOB_RUNNER
+            --tsi_analysis FALSE
         else
             Rscript $software_path/Rk1521_03_make_deep_sequence_alignments.R \
                 --out_dir_base $out_dir_base \
@@ -92,8 +100,8 @@ case $STEP in
                 --walltime_idx $RES \
                 --phsc-runs $input_samples \
                 --date $DATE \
-                --tsi_analysis FALSE \
-                --split_jobs_by_n $N_JOB_RUNNER
+                $runner_cmd \
+                --tsi_analysis FALSE
         fi
         ;;
 
@@ -108,8 +116,8 @@ case $STEP in
             --env_name "phylostan" \
             --date $DATE \
             --controller $controller \
+            $runner_cmd \
-            --walltime_idx $RES \
-            --split_jobs_by_n $N_JOB_RUNNER
+            --walltime_idx $RES
         ;;
 
     # DOUBLE CHECK HERE AGAINST ORIGINAL!!!
diff --git a/misc_data_analysis_RCCS1521/runall_dryrun.sh b/misc_data_analysis_RCCS1521/runall_dryrun.sh
index 0ec79fd..65eace7 100644
--- a/misc_data_analysis_RCCS1521/runall_dryrun.sh
+++ b/misc_data_analysis_RCCS1521/runall_dryrun.sh
@@ -13,13 +13,14 @@ then
     echo "OPTIONs:"
     echo " STEP : one of ali, btr, atr [default: none]"
     echo " RES: determines resources fr pbs jobs (1 to 3) [default: 1]"
-    echo " N_JOB_RUNNER: number of people that can submit jobs [default: 1]"
+    echo " MULTI_RUNNER: whether multiple job runners are required, as specified in jobs_runner.csv [default: FALSE]"
     exit 1
 fi
 
 ${RES:=1}
 ${REDO:=0}
-${N_JOB_RUNNER:=1}
+${MULTI_RUNNER:=FALSE}
+
 echo "running '${STEP:=none}' analysis"
 
 # This includes all code necessary to run PHSC pipeline to produce TSI estimates
@@ -37,6 +38,13 @@ controller="$software_path/$PBS_JOBNAME" #current script location
 # For TSI's 25 is fine, for networks 10:
 sliding_width=10
 
+if [ "$MULTI_RUNNER" = "TRUE" ]; then
+    runner_cmd="--csv-runners $software_path/jobs_runner.csv"
+else
+    runner_cmd=""
+fi
+
+
 CLUSIZE='50'
 DATE='2024-09-23'
 
@@ -72,8 +80,8 @@ case $STEP in
             --controller $controller \
             --walltime_idx $RES \
             --tsi_analysis FALSE \
-            --runs $input_samples \
-            --split_jobs_by_n $N_JOB_RUNNER \
+            --phsc-runs $input_samples \
+            $runner_cmd \
             --dryrun
 
         else
@@ -93,8 +101,8 @@ case $STEP in
                 --walltime_idx $RES \
                 --date $DATE \
                 --tsi_analysis FALSE \
-                --runs $input_samples \
-                --split_jobs_by_n $N_JOB_RUNNER \
+                --phsc-runs $input_samples \
+                $runner_cmd \
                 --dryrun
         fi
         ;;
diff --git a/misc_data_analysis_RCCS1521/utility.R b/misc_data_analysis_RCCS1521/utility.R
index 0b32d42..3c0f704 100644
--- a/misc_data_analysis_RCCS1521/utility.R
+++ b/misc_data_analysis_RCCS1521/utility.R
@@ -505,3 +505,38 @@ submit_jobs_from_djob <- function(DT, output_type="id", prefix = "readali"){
     cat('Submitted job ids are:', pbs_files, '...\n')
     return(pbs_files)
 }
+
+
+adapt_jobspecs_to_runner <- function(DT, DR, idx)
+{
+    # DT usually a djob;
+    # DR usually a data.table with the runner specs
+    # idx is the index of the runner for which to adapt the job specs
+
+    values_to_replace <- names(DR)[names(DR) %like% 'path']
+
+    for (name in values_to_replace){
+        pat <- DR[index == 1, get(name)]
+        repl <- DR[index == idx, get(name)]
+        DT$CMD <- gsub(
+            x = DT$CMD,
+            pattern = pat,
+            replacement = repl, fixed = TRUE)
+    }
+
+    return(DT)
+}
+
+append_pbs_file_person <- function(script, pbs, usr)
+{
+    stopifnot("Only one user allowed in `append_pbs_file_person`" = length(usr) == 1)
+    pbs <- paste("qsub", pbs)
+    pbs <- paste(pbs, collapse = '\n')
+    to_append <- sprintf(
+        'if [ "$USER" == "%s" ]; then
+            %s
+        fi',
+        usr, pbs)
+    new_script <- if (is.na(script)) to_append else paste(script, to_append, sep = '\n\n')
+    return(new_script)
+}
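
A minimal sketch of the path adaptation performed by adapt_jobspecs_to_runner(): the paths registered for runner 1 in jobs_runner.csv are swapped, as literal strings, for the paths of runner `idx` inside every PBS command. The snippet assumes data.table is attached and that the helpers from utility.R above have been sourced; the runner names, paths and commands are hypothetical placeholders, not values from the repository.

    library(data.table)

    # hypothetical runner table in the style of jobs_runner.csv
    drunners <- data.table(
        index = 1:2,
        user_name = c("user_a", "user_b"),
        path_software = c("/rds/user_a/Phyloscanner.R.utilities",
                          "/rds/user_b/Phyloscanner.R.utilities")
    )

    # hypothetical job table: commands are written with runner 1's paths
    djob <- data.table(
        CMD = sprintf("Rscript /rds/user_a/Phyloscanner.R.utilities/script_%d.R", 1:2)
    )

    # rewrite runner 1's paths into runner 2's paths
    djob_user_b <- adapt_jobspecs_to_runner(djob, drunners, idx = 2)
    djob_user_b$CMD
    # e.g. "Rscript /rds/user_b/Phyloscanner.R.utilities/script_1.R" ...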
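
A similar sketch for append_pbs_file_person(), which assembles the single shared submission script: each call wraps a batch of PBS files in a $USER guard, so any of the runners can execute the same script and only the qsub calls matching their own user name fire. The PBS file names and user names below are hypothetical; only base R and the sourced helper are needed.

    # start from the NA placeholder used by the calling scripts
    submit_user_script <- NA_character_

    submit_user_script <- append_pbs_file_person(
        script = submit_user_script,
        pbs = c("readali_job1.pbs", "readali_job2.pbs"),
        usr = "user_a"
    )
    submit_user_script <- append_pbs_file_person(
        script = submit_user_script,
        pbs = "readali_job3.pbs",
        usr = "user_b"
    )

    # prints one $USER-guarded block of qsub calls per runner,
    # e.g. a block starting with: if [ "$USER" == "user_a" ]; then
    cat(submit_user_script)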