Skip to content

Commit

Permalink
[r,c++] Support dense nd array write case (#2730)
Browse files Browse the repository at this point in the history
* Support dense nd array write case

* With miscellaneous test updates

* Adjust test command
  • Loading branch information
eddelbuettel authored Jun 20, 2024
1 parent 7814aaf commit 9cfbdca
Show file tree
Hide file tree
Showing 32 changed files with 188 additions and 195 deletions.
24 changes: 14 additions & 10 deletions .github/workflows/r-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -126,26 +126,30 @@ jobs:
# - name: Update Packages
# run: Rscript -e 'update.packages(ask=FALSE)'

# - name: Build Package
# run: cd apis/r && R CMD build --no-build-vignettes --no-manual .
- name: Build Package
run: cd apis/r && R CMD build --no-build-vignettes --no-manual .

# - name: Install Package
# run: cd apis/r && R CMD INSTALL $(ls -1tr *.tar.gz | tail -1)
- name: Install Package
run: cd apis/r && R CMD INSTALL $(ls -1tr *.tar.gz | tail -1)

# - name: Diagnostics
# run: Rscript -e 'print(Sys.info())'

# - name: Downgrade TileDB-R if needed
# run: cd apis/r && Rscript tools/controlled_downgrade.R

#- name: Test
# if: ${{ matrix.covr == 'no' }}
# run: |
# cd apis/r
# Rscript -e "install.packages('devtools')" \
# -e "devtools::install(upgrade = FALSE)" \
# -e "testthat::test_local('tests/testthat', load_package = 'installed')"

- name: Test
if: ${{ matrix.covr == 'no' }}
run: |
cd apis/r
Rscript -e "install.packages('devtools')" \
-e "devtools::install(upgrade = FALSE)" \
-e "testthat::test_local('tests/testthat', load_package = 'installed')"
run: cd apis/r/tests && Rscript testthat.R

- name: Coverage
if: ${{ matrix.os == 'ubuntu-latest' && matrix.covr == 'yes' && github.event_name == 'workflow_dispatch' }}
run: apis/r/tools/r-ci.sh coverage
Expand Down
4 changes: 2 additions & 2 deletions apis/r/R/RcppExports.R
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ createSchemaFromArrow <- function(uri, nasp, nadimap, nadimsp, sparse, datatype,
invisible(.Call(`_tiledbsoma_createSchemaFromArrow`, uri, nasp, nadimap, nadimsp, sparse, datatype, pclst, ctxptr))
}

writeArrayFromArrow <- function(uri, naap, nasp, config = NULL) {
invisible(.Call(`_tiledbsoma_writeArrayFromArrow`, uri, naap, nasp, config))
writeArrayFromArrow <- function(uri, naap, nasp, arraytype = "", config = NULL) {
invisible(.Call(`_tiledbsoma_writeArrayFromArrow`, uri, naap, nasp, arraytype, config))
}

reindex_create <- function() {
Expand Down
2 changes: 1 addition & 1 deletion apis/r/R/SOMADataFrame.R
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ SOMADataFrame <- R6::R6Class(
df <- as.data.frame(values)[schema_names]
arr <- self$object

writeArrayFromArrow(self$uri, naap, nasp)
writeArrayFromArrow(self$uri, naap, nasp, "SOMADataFrame")

invisible(self)
},
Expand Down
18 changes: 14 additions & 4 deletions apis/r/R/SOMADenseNDArray.R
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,7 @@ SOMADenseNDArray <- R6::R6Class(

tbl <- self$read_arrow_table(coords = coords, result_order = result_order, log_level = log_level)
m <- matrix(as.numeric(tbl$GetColumnByName("soma_data")),
nrow = nrow,
ncol = ncol,
nrow = nrow, ncol = ncol,
byrow = result_order == "ROW_MAJOR")

},
Expand All @@ -111,6 +110,7 @@ SOMADenseNDArray <- R6::R6Class(
write = function(values, coords = NULL) {
private$check_open_for_write()

spdl::debug("[SOMADenseNDArray::write] entered")
stopifnot(
"'values' must be a matrix" = is.matrix(values)
)
Expand All @@ -128,11 +128,21 @@ SOMADenseNDArray <- R6::R6Class(

arr <- self$object
tiledb::query_layout(arr) <- "COL_MAJOR"
arr[] <- values
spdl::debug("[SOMADenseNDArray::write] about to call write")
arrsch <- arrow::schema(arrow::field("soma_data", private$.type))
tbl <- arrow::arrow_table(soma_data = values, schema = arrsch)

spdl::debug("[SOMADenseNDArray::write] array created")
naap <- nanoarrow::nanoarrow_allocate_array()
nasp <- nanoarrow::nanoarrow_allocate_schema()
arrow::as_record_batch(tbl)$export_to_c(naap, nasp)
#arr[] <- values
writeArrayFromArrow(self$uri, naap, nasp, "SOMADenseNDArray")
spdl::debug("[SOMADenseNDArray::write] written")

# tiledb-r always closes the array after a write operation so we need to
# manually reopen it until close-on-write is optional
self$open("WRITE", internal_use_only = "allowed_use")
#self$open("WRITE", internal_use_only = "allowed_use")
invisible(self)
}
),
Expand Down
126 changes: 37 additions & 89 deletions apis/r/R/SOMANDArrayBase.R
Original file line number Diff line number Diff line change
Expand Up @@ -29,104 +29,52 @@ SOMANDArrayBase <- R6::R6Class(
"factory method as e.g. 'SOMASparseNDArrayCreate()'."), call. = FALSE)
}

tdb_schema <- private$.build_tiledb_schema(
type = type,
shape = shape,
is_sparse = private$.is_sparse,
platform_config = platform_config
)

# create array
tiledb::tiledb_array_create(uri = self$uri, schema = tdb_schema)
self$open("WRITE", internal_use_only = "allowed_use")
private$write_object_type_metadata()
self
}
),
## .is_sparse field is being set by dense and sparse private initialisers, respectively
private$.type <- type # Arrow schema type of data

private = list(
.is_sparse = NULL,
#spdl::warn("[SOMANDArrayBase::create] type cached as {}", private$.type)

.build_tiledb_schema = function(
type,
shape,
is_sparse,
platform_config = NULL
) {

stopifnot(
"'type' must be a valid Arrow type" =
is_arrow_data_type(type),
"'shape' must be a vector of positive integers" =
is.vector(shape) && all(shape > 0),
"'is_sparse' must be a scalar logical" = is_scalar_logical(is_sparse)
)
dom_ext_tbl <- get_domain_and_extent_array(shape)

# Parse the tiledb/create/ subkeys of the platform_config into a handy,
# typed, queryable data structure.
tiledb_create_options <- TileDBCreateOptions$new(platform_config)
##print(str(tiledb_create_options$to_list(FALSE)))

## we transfer to the arrow table via a pair of array and schema pointers
dnaap <- nanoarrow::nanoarrow_allocate_array()
dnasp <- nanoarrow::nanoarrow_allocate_schema()
arrow::as_record_batch(dom_ext_tbl)$export_to_c(dnaap, dnasp)

## we need a schema pointer to transfer the schema information
## so we first embed the (single column) 'type' into a schema and
## combine it with domain schema
schema <- arrow::unify_schemas(arrow::schema(dom_ext_tbl),
arrow::schema(arrow::field("soma_data", type)))
nasp <- nanoarrow::nanoarrow_allocate_schema()
schema$export_to_c(nasp)

## create array
ctxptr <- super$tiledbsoma_ctx$context()
createSchemaFromArrow(uri = self$uri, nasp, dnaap, dnasp,
private$.is_sparse, "SOMADenseNDArray",
tiledb_create_options$to_list(FALSE), ctxptr@ptr)

# Default zstd filter to use if none is specified in platform config
default_zstd_filter <- list(
name = "ZSTD",
COMPRESSION_LEVEL = tiledb_create_options$dataframe_dim_zstd_level()
)

# create array dimensions
tdb_dims <- vector(mode = "list", length = length(shape))
for (i in seq_along(shape)) {
dim_info <- private$.dim_capacity_and_extent(
name = paste0("soma_dim_", i - 1L),
shape = shape[i],
create_options = tiledb_create_options
)

tdb_dims[[i]] <- tiledb::tiledb_dim(
name = dim_info$name,
domain = bit64::as.integer64(c(0L, dim_info$capacity - 1L)),
tile = bit64::as.integer64(dim_info$extent),
type = "INT64",
filter_list = tiledb::tiledb_filter_list(
filters = tiledb_create_options$dim_filters(
dim_name = dim_info$name,
default = list(default_zstd_filter)
)
)
)
}
self$open("WRITE", internal_use_only = "allowed_use")
private$write_object_type_metadata()
self
},

# attribute filters
tdb_attr_filters <- tiledb::tiledb_filter_list(
filters = tiledb_create_options$attr_filters(
attr_name = "soma_data",
default = list(default_zstd_filter)
))

# create array attribute
tdb_attr <- tiledb::tiledb_attr(
name = "soma_data",
type = tiledb_type_from_arrow_type(type, is_dim=FALSE),
filter_list = tdb_attr_filters
)
## needed eg after open() to set (Arrow) tyoe
set_data_type = function(type) {
spdl::debug("[SOMANDArrayBase::set_data_type] caching type {}", type$ToString())
private$.type <- type
}
),

# array schema
cell_tile_orders <- tiledb_create_options$cell_tile_orders()
tiledb::tiledb_array_schema(
domain = tiledb::tiledb_domain(tdb_dims),
attrs = tdb_attr,
sparse = private$.is_sparse,
cell_order = cell_tile_orders["cell_order"],
tile_order = cell_tile_orders["tile_order"],
capacity = tiledb_create_options$capacity(),
allows_dups = tiledb_create_options$allows_duplicates(),
offsets_filter_list = tiledb::tiledb_filter_list(
tiledb_create_options$offsets_filters()
),
validity_filter_list = tiledb::tiledb_filter_list(
tiledb_create_options$validity_filters()
)
)
},
private = list(
.is_sparse = NULL,
.type = NULL,

# Given a user-specified shape along a particular dimension, returns a named
# list containing name, capacity, and extent elements.
Expand Down
7 changes: 7 additions & 0 deletions apis/r/R/SOMASparseNDArray.R
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,13 @@ SOMASparseNDArray <- R6::R6Class(
if (!is.null(private$tiledb_timestamp)) {
arr@timestamp <- private$tiledb_timestamp
}
## spdl::debug("[SOMASparseNDArray] '.write_coo_dataframe' layout '{}' is_sparse '{}' ",
## tiledb::query_layout(arr), private$.is_sparse)
## if (!private$.is_sparse && tiledb::query_layout(arr) == "UNORDERED") {
## tiledb::query_layout(arr) <- "GLOBAL_ORDER"
## cat("*********", tiledb::query_layout(arr), "*****\n")
## print(arr)
## }
arr[] <- values
},

Expand Down
5 changes: 5 additions & 0 deletions apis/r/R/TileDBArray.R
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ TileDBArray <- R6::R6Class(
private$.tiledb_array <- tiledb::tiledb_array_open_at(self$object, type = mode,
timestamp = private$tiledb_timestamp)
}

## TODO -- cannot do here while needed for array case does not work for data frame case
#tdbtype <- tiledb::datatype(tiledb::attrs(tiledb::schema(private$.tiledb_array))[[1]])
#private$.type <- arrow_type_from_tiledb_type(tdbtype)

private$update_metadata_cache()
self
},
Expand Down
9 changes: 5 additions & 4 deletions apis/r/src/RcppExports.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,16 @@ BEGIN_RCPP
END_RCPP
}
// writeArrayFromArrow
void writeArrayFromArrow(const std::string& uri, naxpArray naap, naxpSchema nasp, Rcpp::Nullable<Rcpp::CharacterVector> config);
RcppExport SEXP _tiledbsoma_writeArrayFromArrow(SEXP uriSEXP, SEXP naapSEXP, SEXP naspSEXP, SEXP configSEXP) {
void writeArrayFromArrow(const std::string& uri, naxpArray naap, naxpSchema nasp, const std::string arraytype, Rcpp::Nullable<Rcpp::CharacterVector> config);
RcppExport SEXP _tiledbsoma_writeArrayFromArrow(SEXP uriSEXP, SEXP naapSEXP, SEXP naspSEXP, SEXP arraytypeSEXP, SEXP configSEXP) {
BEGIN_RCPP
Rcpp::RNGScope rcpp_rngScope_gen;
Rcpp::traits::input_parameter< const std::string& >::type uri(uriSEXP);
Rcpp::traits::input_parameter< naxpArray >::type naap(naapSEXP);
Rcpp::traits::input_parameter< naxpSchema >::type nasp(naspSEXP);
Rcpp::traits::input_parameter< const std::string >::type arraytype(arraytypeSEXP);
Rcpp::traits::input_parameter< Rcpp::Nullable<Rcpp::CharacterVector> >::type config(configSEXP);
writeArrayFromArrow(uri, naap, nasp, config);
writeArrayFromArrow(uri, naap, nasp, arraytype, config);
return R_NilValue;
END_RCPP
}
Expand Down Expand Up @@ -308,7 +309,7 @@ END_RCPP

static const R_CallMethodDef CallEntries[] = {
{"_tiledbsoma_createSchemaFromArrow", (DL_FUNC) &_tiledbsoma_createSchemaFromArrow, 8},
{"_tiledbsoma_writeArrayFromArrow", (DL_FUNC) &_tiledbsoma_writeArrayFromArrow, 4},
{"_tiledbsoma_writeArrayFromArrow", (DL_FUNC) &_tiledbsoma_writeArrayFromArrow, 5},
{"_tiledbsoma_reindex_create", (DL_FUNC) &_tiledbsoma_reindex_create, 0},
{"_tiledbsoma_reindex_map", (DL_FUNC) &_tiledbsoma_reindex_map, 2},
{"_tiledbsoma_reindex_lookup", (DL_FUNC) &_tiledbsoma_reindex_lookup, 2},
Expand Down
24 changes: 17 additions & 7 deletions apis/r/src/arrow.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ void createSchemaFromArrow(const std::string& uri,
//ctx_wrap_t* ctxwrap_p = new ContextWrapper(ctxsp); // create wrapper struct
//ctxptr = make_xptr<ctx_wrap_t>(ctxwrap_p, false); // and create and assign extptr
}

// create the ArraySchema
auto as = tdbs::ArrowAdapter::tiledb_schema_from_arrow_schema(ctxsp, std::move(schema),
std::pair(std::move(dimarr),
Expand All @@ -110,6 +109,7 @@ void createSchemaFromArrow(const std::string& uri,

// [[Rcpp::export]]
void writeArrayFromArrow(const std::string& uri, naxpArray naap, naxpSchema nasp,
const std::string arraytype = "",
Rcpp::Nullable<Rcpp::CharacterVector> config = R_NilValue) {

//struct ArrowArray* ap = (struct ArrowArray*) R_ExternalPtrAddr(naap);
Expand All @@ -123,6 +123,14 @@ void writeArrayFromArrow(const std::string& uri, naxpArray naap, naxpSchema nasp
nanoarrow::UniqueArray ap{nanoarrow_array_from_xptr(naap)};
nanoarrow::UniqueSchema sp{nanoarrow_schema_from_xptr(nasp)};

// now move nanoarrow unique arrays (created from objects handed from R) into
// proper unique pointers to arrow schema and array
auto schema = std::make_unique<ArrowSchema>();
sp.move(schema.get());
auto array = std::make_unique<ArrowArray>();
ap.move(array.get());

// if we hae a coonfig, use it
std::shared_ptr<tdbs::SOMAContext> somactx;
if (config.isNotNull()) {
std::map<std::string, std::string> smap;
Expand All @@ -138,14 +146,16 @@ void writeArrayFromArrow(const std::string& uri, naxpArray naap, naxpSchema nasp
somactx = std::make_shared<tdbs::SOMAContext>();
}

auto arrup = tdbs::SOMADataFrame::open(OpenMode::write, uri, somactx);

auto schema = std::make_unique<ArrowSchema>();
sp.move(schema.get());
auto array = std::make_unique<ArrowArray>();
ap.move(array.get());
std::shared_ptr<tdbs::SOMAArray> arrup;
if (arraytype == "SOMADataFrame") {
arrup = tdbs::SOMADataFrame::open(OpenMode::write, uri, somactx);
} else if (arraytype == "SOMADenseNDArray") {
arrup = tdbs::SOMADenseNDArray::open(OpenMode::write, uri, somactx,
"unnamed", {}, "auto", ResultOrder::colmajor);
}

arrup.get()->set_array_data(std::move(schema), std::move(array));
arrup.get()->write();
arrup.get()->close();

}
2 changes: 1 addition & 1 deletion apis/r/tests/testthat.R
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ library(testthat)
library(tiledbsoma)

tiledbsoma::show_package_versions()
test_check("tiledbsoma", reporter=default_reporter())
test_check("tiledbsoma", reporter=ParallelProgressReporter)
3 changes: 3 additions & 0 deletions apis/r/tests/testthat/helper-test-soma-objects.R
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ create_and_populate_var <- function(uri, nrows = 10L, seed = 1, mode = NULL) {
)
)

dname <- dirname(uri)
if (!dir.exists(dname)) dir.create(dname)

sdf <- SOMADataFrameCreate(uri, tbl$schema, index_column_names = "soma_joinid")
sdf$write(tbl)

Expand Down
2 changes: 1 addition & 1 deletion apis/r/tests/testthat/test-EphemeralMeasurement.R
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
test_that("Ephemeral Measurement mechanics", {
# Create the measurement
uri <- withr::local_tempdir("ephemeral-ms")
uri <- tempfile(pattern="ephemeral-ms")
expect_warning(EphemeralMeasurement$new(uri))
expect_no_condition(measurement <- EphemeralMeasurement$new())
expect_true(grepl('^ephemeral-collection:0x[[:digit:]a-f]{6,32}$', measurement$uri))
Expand Down
2 changes: 1 addition & 1 deletion apis/r/tests/testthat/test-Factory.R
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ test_that("SparseNDArray Factory", {
expect_equal(s3$mode(), "CLOSED")
})

test_that("SparseNDArray Factory", {
test_that("DenseNDArray Factory", {
skip_if(!extended_tests())
uri <- tempfile()

Expand Down
Loading

0 comments on commit 9cfbdca

Please sign in to comment.