Skip to content

Commit

Permalink
Tmp files for PSOCK cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
DylanKierans committed Mar 18, 2024
1 parent 1478129 commit 721f21b
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 6 deletions.
70 changes: 70 additions & 0 deletions R/makePSOCKcluster.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# Functions for makePSOCKcluster, and potential makeSOCKcluster

insert_instrumentation_on_new_proc <- function()
{
flag_debug <- FALSE

func_ptrs <- get_function_list()
num_func_ptrs <- sum(get_num_functions())

function_exception_list <- get_function_exception_list()
function_methods_exception_list <- get_function_methods(function_exception_list)

assign_regionRef_array_client(num_function_ptrs)
open_otf2_regionRef_sockets()

## Starting new here
for (func_index in 1:num_func_ptrs){

func_ptr <- func_ptrs[[func_index]]
func_name <- names(func_ptrs)[[func_index]]
#package_name <- names(func_ptrs)[[func_index]]
env <- environment(func_ptrs[[func_index]])
package_name <- environmentName(env)

## DEBUGGING - Display current function (before checks)
if (flag_debug) {
# print("#######################################")
# print(paste0("func_index: ", func_index))
print(func_ptr)
print(func_name)
print(env)
print(paste0("package: ", package_name, ", function: ", func_name))
}

flag_user_function=FALSE
if (env==.GlobalEnv){flag_user_function=TRUE}

#if (env == NULL){ print(paste0("NULL env, func_name: ", func_name)) }
print(paste0("func_name: ", func_name))

## Test if function should be skipped
if ( skip_function(func_ptr, func_name, env, function_exception_list, function_methods_exception_list)) {
print(paste0("Skipping: ", func_name))
next; # skip to next loop
}

## Get otf2 regionRef
regionRef <- get_regionReg_from_array_client(func_index)
if (pkg.env$PRINT_INSTRUMENTS) {
print(paste0("INSTRUMENTING: function `", func_name,"`",
", regionRef: ", regionRef))
}

## Wrap function with debug info
insert_instrumentation(func_ptr, func_name, func_index,
regionRef, package_name,
env_is_locked=!pkg.env$UNLOCK_ENVS,
flag_user_function=flag_user_function)

}
close_otf2_regionRef_sockets()
}


#if (FALSE){


# ## Label as instrumented in instrumentation dataframe
# pkg.env$PROFILE_INSTRUMENTATION_DF[["function_instrumented"]][func_global_index] <- TRUE
#
12 changes: 6 additions & 6 deletions R/r_instrument_ll.R
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,12 @@ skip_function <- function(func_ptr, func_name, env,
return(TRUE)
}

## 3 - Skip if primitive function - DEBUGGING (some are problematic)
if ( is.primitive(func_ptr) ) {
if (pkg.env$PRINT_SKIPS) print(paste0("SKIPPING: function `", func_name, "` is PRIMITVE function"))
return(TRUE)
}

## Skip if function not defined in current package
if ( !exists(func_name, envir = env, inherits=T)) {
if (pkg.env$PRINT_SKIPS) print(paste0("SKIPPING: function `", func_name, "` DOES NOT exist in package env: ", env))
Expand All @@ -455,12 +461,6 @@ skip_function <- function(func_ptr, func_name, env,
return(TRUE)
}

## 3 - Skip if primitive function - DEBUGGING (some are problematic)
if ( is.primitive(func_ptr) ) {
if (pkg.env$PRINT_SKIPS) print(paste0("SKIPPING: function `", func_name, "` is PRIMITVE function"))
return(TRUE)
}

## 4 - Skip if not language body - DEBUGGING (symbol in na.null() was causing issues)
if ( typeof(body(func_ptr)) != "language" ) {
if (pkg.env$PRINT_SKIPS) print(paste0("SKIPPING: function `", func_name, "` body is type: ", typeof(body(func_ptr))))
Expand Down
38 changes: 38 additions & 0 deletions src/makePSOCKcluster.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
void *context;
void *regionRef_socket_client;
void *regionRef_socket_server;
int *regionReg_array;

//int *regionReg_array = malloc(num_func * sizeof(*region_ref_vs_func_index));
//free(regionReg_array);

// Confirm num_functions, then send all regionRef in order
void open_otf2_regionRef_sockets_server(){
context = zmq_ctx_new ();
regionRef_socket_server = zmq_socket (context, ZMQ_PUSH);
int rc = zmq_bind (regionRef_socket_server, "tcp://*:5558");

int zmq_ret = zmq_send(regionRef_socket_server, regionReg_array, num_functions*sizeof(*region_ref_vs_func_index));
}

RcppExport SEXP open_otf2_regionRef_sockets_clients(){
context = zmq_ctx_new ();
regionRef_socket_client = zmq_socket (context, ZMQ_PULL);
zmq_bind (regionRef_socket_client, "tcp://localhost:5558");

int zmq_ret = zmq_recv();
}

RcppExport SEXP assign_regionRef_array_client(int num_function_ptrs){
regionReg_array = malloc(num_function_ptrs*sizeof(*regionRef_array));
return(R_NilValue);
}

RcppExport SEXP get_regionRef_from_array_client(int func_index){
return(regionRef_array[func_index]);
}

RcppExport SEXP free_regionRef_array_client(){
free(regionReg_array);
return(R_NilValue);
}

0 comments on commit 721f21b

Please sign in to comment.