Skip to content

Commit

Permalink
Fixes for makePSOCK cluster, including placeholder event on slave procs
Browse files Browse the repository at this point in the history
  • Loading branch information
DylanKierans committed Mar 26, 2024
1 parent c0d80d7 commit b0daaef
Show file tree
Hide file tree
Showing 11 changed files with 235 additions and 106 deletions.
1 change: 1 addition & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ export(instrumentation_disable)
export(instrumentation_enable)
export(instrumentation_finalize)
export(instrumentation_init)
export(instrumentation_wrapper)
export(is_instrumentation_enabled)
export(print_function_from_index)
export(skip_function)
Expand Down
37 changes: 22 additions & 15 deletions R/RcppExports.R
Original file line number Diff line number Diff line change
Expand Up @@ -121,21 +121,6 @@ print_errnos <- function() {
.Call('_rTrace_print_errnos', PACKAGE = 'rTrace')
}

#' get_pid
get_pid <- function() {
.Call('_rTrace_get_pid', PACKAGE = 'rTrace')
}

#' get_tid
get_tid <- function() {
.Call('_rTrace_get_tid', PACKAGE = 'rTrace')
}

#' get_ppid
get_ppid <- function() {
.Call('_rTrace_get_ppid', PACKAGE = 'rTrace')
}

#' get_regionRef_array_master
#' @description Signal to server to send regionRef array to new procs
#' @param nprocs Number of new procs to update
Expand All @@ -144,6 +129,13 @@ get_regionRef_array_master <- function(nprocs) {
.Call('_rTrace_get_regionRef_array_master', PACKAGE = 'rTrace', nprocs)
}

#' stopCluster_master
#' @description Signal to end cluster
#' @return R_NilValue
stopCluster_master <- function() {
.Call('_rTrace_stopCluster_master', PACKAGE = 'rTrace')
}

#' get_regionRef_array_slave
#' @description Requests regionRef array from logger proc
#' @param num_funcs Total number of functions in R namespace
Expand All @@ -152,3 +144,18 @@ get_regionRef_array_slave <- function(num_funcs) {
.Call('_rTrace_get_regionRef_array_slave', PACKAGE = 'rTrace', num_funcs)
}

#' get_pid
get_pid <- function() {
.Call('_rTrace_get_pid', PACKAGE = 'rTrace')
}

#' get_tid
get_tid <- function() {
.Call('_rTrace_get_tid', PACKAGE = 'rTrace')
}

#' get_ppid
get_ppid <- function() {
.Call('_rTrace_get_ppid', PACKAGE = 'rTrace')
}

5 changes: 5 additions & 0 deletions R/r_exception_list.R
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ get_function_exception_list <- function() {
get('body<-'),
tryCatch, # Clobbering trace results
append)

# These functions contain on.exit() and blocks instrumentation insert
on.exit_functions <- c(R.utils::read.table)
function_exception_list <- append(function_exception_list, on.exit_functions)

if (R.utils::isPackageLoaded("R.utils")){
package_function_exception_list <- c(R.utils::isPackageLoaded)
function_exception_list <- append(function_exception_list, package_function_exception_list)
Expand Down
42 changes: 25 additions & 17 deletions R/r_fork_functions.R
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# description: Unique functions wrappers for functions such as makeForkCluster which
# spawn new forked R procs. Extra work required to duplicate zeromq objects safely
# across multiple proces.
# TODO: massively clean up functions and naming convention


#' get_fork_function_list
Expand Down Expand Up @@ -207,8 +208,8 @@ get_fork_wrapper_expression <- function() {

# Renable instrumentation if necessary
if (INSTRUMENTATION_ENABLED_BEFORE){
instrumentation_enable(flag_ignore_depth=TRUE);
clusterEvalQ(cl, instrumentation_enable(flag_ignore_depth=TRUE));
instrumentation_enable();
clusterEvalQ(cl, instrumentation_enable(flag_reset_depth=TRUE));

pkg.env$FUNCTION_DEPTH <- pkg.env$FUNCTION_DEPTH - 1
if ( pkg.env$FUNCTION_DEPTH < pkg.env$MAX_FUNCTION_DEPTH){
Expand All @@ -227,7 +228,7 @@ get_fork_wrapper_expression <- function() {

if (pkg.env$INSTRUMENTATION_ENABLED) {
NULL
## Append to depth counter
## Increment depth counter
pkg.env$FUNCTION_DEPTH <- pkg.env$FUNCTION_DEPTH + 1

if (pkg.env$FUNCTION_DEPTH <= pkg.env$MAX_FUNCTION_DEPTH )
Expand All @@ -236,7 +237,7 @@ get_fork_wrapper_expression <- function() {
evtWriter_Write_client(X_regionRef_X,T)
}

instrumentation_disable(flag_ignore_depth=TRUE)
instrumentation_disable(flag_check_depth=F)
}

# Close socket on master before forking
Expand All @@ -250,21 +251,21 @@ get_fork_wrapper_expression <- function() {
#' get_end_fork_wrapper_expression
#' @description Returns wrapper expression
get_end_fork_wrapper_expression <- function() {
entry_exp <- expression( { ; # Sneaky ; here for debugging
entry_exp <- expression( {
# Save instrumentation state
INSTRUMENTATION_ENABLED_BEFORE <- is_instrumentation_enabled()

if (pkg.env$INSTRUMENTATION_ENABLED) {
# Append to depth counter
# Increment depth counter
pkg.env$FUNCTION_DEPTH <- pkg.env$FUNCTION_DEPTH + 1

if (pkg.env$FUNCTION_DEPTH <= pkg.env$MAX_FUNCTION_DEPTH ) {
evtWriter_Write_client(X_regionRef_X,T) # OTF2 Enter event
}

## Disable instrumentation on all procs
clusterEvalQ(cl, { instrumentation_disable(flag_ignore_depth=TRUE) } )
instrumentation_disable(flag_ignore_depth=TRUE)
clusterEvalQ(cl, { instrumentation_disable(flag_update_measurement=F) } )
instrumentation_disable(flag_check_depth=F)

}

Expand All @@ -274,15 +275,18 @@ get_end_fork_wrapper_expression <- function() {
} )

exit_exp <- expression( {
on.exit( { ; # Sneaky ; here for debugging
on.exit( {
# Reopen sockets on Master clientside
open_EvtWriterSocket_client()

# End slave placeholder event
stopCluster_master()

# Restore instrumentation state
if (INSTRUMENTATION_ENABLED_BEFORE){
instrumentation_enable(flag_ignore_depth=TRUE)
instrumentation_enable()

# Deduct from depth counter
# Decrement depth counter
pkg.env$FUNCTION_DEPTH <- pkg.env$FUNCTION_DEPTH - 1

if (pkg.env$FUNCTION_DEPTH <= pkg.env$MAX_FUNCTION_DEPTH ) {
Expand All @@ -308,24 +312,25 @@ get_psock_wrapper_expression <- function() {
INSTRUMENTATION_ENABLED_BEFORE <- is_instrumentation_enabled()

if (pkg.env$INSTRUMENTATION_ENABLED) {
## Append to depth counter
## Increment depth counter
pkg.env$FUNCTION_DEPTH <- pkg.env$FUNCTION_DEPTH + 1

if (pkg.env$FUNCTION_DEPTH <= pkg.env$MAX_FUNCTION_DEPTH ) {
evtWriter_Write_client(X_regionRef_X,T)
}

instrumentation_disable(flag_ignore_depth=TRUE)
instrumentation_disable(flag_check_depth=F)
}
} )

exit_exp <- expression( {
on.exit({
# NOTE: Keep this in exit expression
nnodes <- length(names)

## DEBUGGING
print(paste0("makePSOCKcluster nnodes: ", nnodes))
#print(paste0("makePSOCKcluster names: ", names))
#print(paste0("makePSOCKcluster nnodes: ", nnodes))
#clusterEvalQ(cl, { print(paste0("FORK makeCluster - pid: ", get_pid(), ", tid: ", get_tid(), ", locationRef id: ", get_locationRef())) })

# Import required packages on slave
Expand All @@ -348,11 +353,13 @@ get_psock_wrapper_expression <- function() {

# Renable instrumentation if necessary
if (INSTRUMENTATION_ENABLED_BEFORE){
instrumentation_enable(flag_ignore_depth=TRUE);
clusterEvalQ(cl, instrumentation_enable(flag_ignore_depth=TRUE));
#instrumentation_enable(flag_reset_depth=TRUE);
instrumentation_enable();
clusterEvalQ(cl, instrumentation_enable(flag_reset_depth=TRUE));

# Decrement depth
pkg.env$FUNCTION_DEPTH <- pkg.env$FUNCTION_DEPTH - 1
clusterEvalQ(cl, pkg.env$FUNCTION_DEPTH <- pkg.env$FUNCTION_DEPTH-1);
#clusterEvalQ(cl, pkg.env$FUNCTION_DEPTH <- pkg.env$FUNCTION_DEPTH-1);

if ( pkg.env$FUNCTION_DEPTH < pkg.env$MAX_FUNCTION_DEPTH){
evtWriter_Write_client(X_regionRef_X,F)
Expand Down Expand Up @@ -398,6 +405,7 @@ master_init_slave <- function(cl) {
parallel::clusterExport(cl, c("vars"), envir=environment())
parallel::clusterExport(cl, vars, envir=pkg.env)
parallel::clusterEvalQ(cl, {
FUNCTION_DEPTH <- 0 # Reset function depth
unlock_envs("rTrace")
for(n in vars) { assign(n, get(n, .GlobalEnv), pkg.env) }
lock_envs("rTrace")
Expand Down
45 changes: 34 additions & 11 deletions R/r_instrument_hl.R
Original file line number Diff line number Diff line change
Expand Up @@ -4,37 +4,42 @@
# SECTION - HIGH LEVEL, ENABLE/DISABLE INSTRUMENTATION
#######################################################################

# @TODO: zmq this
#' instrumentation_enable
#' @param flag_ignore_depth Boolean - Intended for developers, suppress depth warning
#' @param flag_reset_depth Boolean - Intended for developers, suppress depth warning
#' @description Enable instrumentation and reset function depth
#' @export
instrumentation_enable <- function(flag_ignore_depth=FALSE){
instrumentation_enable <- function(flag_reset_depth=FALSE){
if (is_instrumentation_enabled()){
message("Instrumentation already enabled!")
warning("Warning: Instrumentation already enabled!")
} else {
if (!flag_ignore_depth){ pkg.env$FUNCTION_DEPTH <- 0 }
evtWriter_MeasurementOnOff_client(TRUE)
}

# Reset depth counter if specified
if (flag_reset_depth){ pkg.env$FUNCTION_DEPTH <- 0 }

pkg.env$INSTRUMENTATION_ENABLED <- TRUE
invisible(NULL)
}

# @TODO: zmq this

#' instrumentation_disable
#' @description Disable instrumentation
#' @param flag_ignore_depth Boolean - Intended for developers, suppress depth warning
#' @param flag_update_measurement Boolean - Intended for developers, update measurement mode
#' @param flag_check_depth Boolean - Intended for developers, suppress depth warning if false
#' @export
instrumentation_disable <- function(flag_ignore_depth=FALSE){
instrumentation_disable <- function(flag_check_depth=T, flag_update_measurement=TRUE){
if (!is_instrumentation_enabled()){
warning("Warning: Instrumentation already disabled!")
}
else {
if ( (pkg.env$FUNCTION_DEPTH != 0 ) && !flag_ignore_depth ){
if ( (pkg.env$FUNCTION_DEPTH != 0) && flag_check_depth) {
warning(paste0("Warning: Function depth non-zero relative to start region. Depth: ", pkg.env$FUNCTION_DEPTH) )
}
pkg.env$INSTRUMENTATION_ENABLED <- FALSE
evtWriter_MeasurementOnOff_client(FALSE)
if (flag_update_measurement){
evtWriter_MeasurementOnOff_client(FALSE)
}
}
invisible(NULL)
}
Expand All @@ -49,7 +54,6 @@ is_instrumentation_enabled <- function() {
}


# @TODONE: zmq this
#' instrumentation_init
#' @description Create otf2 objs for instrumentation, and initiate global vars
#' @param flag_user_functions Boolean - TRUE to include user functions in dataframe
Expand Down Expand Up @@ -125,3 +129,22 @@ instrumentation_debug <- function(print_func_indexes = pkg.env$PRINT_FUNC_INDEXE
pkg.env$UNLOCK_ENVS <- unlock_env
invisible()
}

#' instrumentation_wrapper
#' @description Simple function to provider wrapper for instrumenting a single function call.
#' Ideal use if program contained in main(). Not intended to be used multiple times in one script
#' @param func Object - Function to call
#' @param ... Args - Function args
#' @export
instrumentation_wrapper <- function(func, ...)
{
instrumentation_init()
instrument_all_functions()

instrumentation_enable()
ret <- func(...) # Call actual function
instrumentation_disable()

instrumentation_finalize()
ret
}
6 changes: 4 additions & 2 deletions man/instrumentation_disable.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions man/instrumentation_enable.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 17 additions & 0 deletions man/instrumentation_wrapper.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions man/stopCluster_master.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit b0daaef

Please sign in to comment.