Improve readers by parallelizing I/O and compute operations (#5401)
Today, when a reader issues I/O requests to the VFS, we block waiting for
all I/O to finish before moving on to unfiltering. We then block again,
waiting for unfiltering to be done for all tiles, before continuing to
process the results.

This PR is part 1 of the effort to minimize wait-all points in the reader
code: it removes the need to wait for all I/O to be done, and uses async
tasks to signal when a tile is done reading so that it can proceed to
unfiltering.

Part 2 will come in a future PR and will use async tasks for unfiltering
as well, removing the need to wait until a tile is done unfiltering before
it can proceed to result processing and copying to the user buffers.

[sc-59605]

---
TYPE: IMPROVEMENT
DESC: Improve readers by parallelizing I/O and compute operations

---------

Co-authored-by: Seth Shelnutt <[email protected]>
3 people authored Feb 10, 2025
1 parent 3c617e3 commit 1ca277c
Showing 23 changed files with 416 additions and 143 deletions.
5 changes: 4 additions & 1 deletion test/src/unit-ReadCellSlabIter.cc
@@ -183,7 +183,10 @@ void set_result_tile_dim(
std::nullopt,
std::nullopt,
std::nullopt);
ResultTile::TileData tile_data{nullptr, nullptr, nullptr};
ResultTile::TileData tile_data{
{nullptr, ThreadPool::SharedTask()},
{nullptr, ThreadPool::SharedTask()},
{nullptr, ThreadPool::SharedTask()}};
result_tile.init_coord_tile(
constants::format_version,
array_schema,
4 changes: 2 additions & 2 deletions test/src/unit-cppapi-consolidation-with-timestamps.cc
@@ -636,7 +636,7 @@ TEST_CASE_METHOD(

// Will only allow to load two tiles out of 3.
Config cfg;
cfg.set("sm.mem.total_budget", "30000");
cfg.set("sm.mem.total_budget", "50000");
cfg.set("sm.mem.reader.sparse_global_order.ratio_coords", "0.15");
ctx_ = Context(cfg);

@@ -685,7 +685,7 @@ TEST_CASE_METHOD(

// Will only allow to load two tiles out of 3.
Config cfg;
cfg.set("sm.mem.total_budget", "30000");
cfg.set("sm.mem.total_budget", "50000");
cfg.set("sm.mem.reader.sparse_global_order.ratio_coords", "0.15");
ctx_ = Context(cfg);

20 changes: 16 additions & 4 deletions test/src/unit-result-tile.cc
@@ -213,7 +213,10 @@ TEST_CASE_METHOD(
0,
std::nullopt,
std::nullopt);
ResultTile::TileData tile_data{nullptr, nullptr, nullptr};
ResultTile::TileData tile_data{
{nullptr, ThreadPool::SharedTask()},
{nullptr, ThreadPool::SharedTask()},
{nullptr, ThreadPool::SharedTask()}};
rt.init_coord_tile(
constants::format_version,
array_schema,
@@ -230,7 +233,10 @@ TEST_CASE_METHOD(
0,
std::nullopt,
std::nullopt);
ResultTile::TileData tile_data{nullptr, nullptr, nullptr};
ResultTile::TileData tile_data{
{nullptr, ThreadPool::SharedTask()},
{nullptr, ThreadPool::SharedTask()},
{nullptr, ThreadPool::SharedTask()}};
rt.init_coord_tile(
constants::format_version,
array_schema,
@@ -326,7 +332,10 @@ TEST_CASE_METHOD(
0,
std::nullopt,
std::nullopt);
ResultTile::TileData tile_data{nullptr, nullptr, nullptr};
ResultTile::TileData tile_data{
{nullptr, ThreadPool::SharedTask()},
{nullptr, ThreadPool::SharedTask()},
{nullptr, ThreadPool::SharedTask()}};
rt.init_coord_tile(
constants::format_version,
array_schema,
@@ -343,7 +352,10 @@ TEST_CASE_METHOD(
0,
std::nullopt,
std::nullopt);
ResultTile::TileData tile_data{nullptr, nullptr, nullptr};
ResultTile::TileData tile_data{
{nullptr, ThreadPool::SharedTask()},
{nullptr, ThreadPool::SharedTask()},
{nullptr, ThreadPool::SharedTask()}};
rt.init_coord_tile(
constants::format_version,
array_schema,
12 changes: 7 additions & 5 deletions test/src/unit-sparse-global-order-reader.cc
@@ -1993,9 +1993,10 @@ TEST_CASE_METHOD(
}

// FIXME: there is no per fragment budget anymore
// Two result tile (2 * (~3000 + 8) will be bigger than the per fragment
// budget (1000).
memory_.total_budget_ = "35000";
// Two result tiles (2 * (2842 + 8)) = 5700 will be bigger than the per
// fragment budget (50000 * 0.11 / 2 fragments = 2750), so only one result
// tile will be loaded each time.
memory_.total_budget_ = "60000";
memory_.ratio_coords_ = "0.11";
update_config();

@@ -2518,8 +2519,9 @@ TEST_CASE_METHOD(
}

// FIXME: there is no per fragment budget anymore
// Two result tile (2 * (~4000 + 8) will be bigger than the per fragment
// budget (1000).
// Two result tiles (2 * (2842 + 8)) = 5700 will be bigger than the per
// fragment budget (40000 * 0.22 /2 frag = 4400), so only one will be loaded
// each time.
memory_.total_budget_ = "40000";
memory_.ratio_coords_ = "0.22";
update_config();
7 changes: 5 additions & 2 deletions test/src/unit-sparse-unordered-with-dups-reader.cc
@@ -1064,9 +1064,12 @@ TEST_CASE_METHOD(

if (one_frag) {
CHECK(1 == loop_num->second);
} else {
CHECK(9 == loop_num->second);
}
/**
* We can't do a similar check for multiple fragments as it is architecture
* dependent how many tiles fit in the memory budget. And thus also
* architecture dependent as to how many internal loops we have.
*/

// Try to read multiple frags without partial tile offset reading. Should
// fail
2 changes: 1 addition & 1 deletion tiledb/sm/filter/compression_filter.cc
@@ -636,7 +636,7 @@ Status CompressionFilter::decompress_var_string_coords(
auto output_view = span<std::byte>(
reinterpret_cast<std::byte*>(output_buffer->data()), uncompressed_size);
auto offsets_view = span<uint64_t>(
offsets_tile->data_as<offsets_t>(), uncompressed_offsets_size);
offsets_tile->data_as_unsafe<offsets_t>(), uncompressed_offsets_size);

if (compressor_ == Compressor::RLE) {
uint8_t rle_len_bytesize, string_len_bytesize;
4 changes: 2 additions & 2 deletions tiledb/sm/filter/filter_pipeline.cc
@@ -461,7 +461,7 @@ Status FilterPipeline::run_reverse(
// If the pipeline is empty, just copy input to output.
if (filters_.empty()) {
void* output_chunk_buffer =
tile->data_as<char>() + chunk_data.chunk_offsets_[i];
tile->data_as_unsafe<char>() + chunk_data.chunk_offsets_[i];
RETURN_NOT_OK(input_data.copy_to(output_chunk_buffer));
continue;
}
@@ -484,7 +484,7 @@
bool last_filter = filter_idx == 0;
if (last_filter) {
void* output_chunk_buffer =
tile->data_as<char>() + chunk_data.chunk_offsets_[i];
tile->data_as_unsafe<char>() + chunk_data.chunk_offsets_[i];
RETURN_NOT_OK(output_data.set_fixed_allocation(
output_chunk_buffer, chunk.unfiltered_data_size_));
reader_stats->add_counter(
3 changes: 2 additions & 1 deletion tiledb/sm/filter/test/filter_test_support.cc
@@ -203,7 +203,8 @@ Tile create_tile_for_unfiltering(
tile->cell_size() * nelts,
tile->filtered_buffer().data(),
tile->filtered_buffer().size(),
tracker};
tracker,
std::nullopt};
}

void run_reverse(
3 changes: 2 additions & 1 deletion tiledb/sm/filter/test/tile_data_generator.h
@@ -99,7 +99,8 @@ class TileDataGenerator {
original_tile_size(),
filtered_buffer.data(),
filtered_buffer.size(),
memory_tracker);
memory_tracker,
std::nullopt);
}

/** Returns the size of the original unfiltered data. */
9 changes: 6 additions & 3 deletions tiledb/sm/metadata/test/unit_metadata.cc
@@ -123,7 +123,8 @@ TEST_CASE(
tile1->size(),
tile1->filtered_buffer().data(),
tile1->filtered_buffer().size(),
tracker);
tracker,
ThreadPool::SharedTask());
memcpy(metadata_tiles[0]->data(), tile1->data(), tile1->size());

metadata_tiles[1] = tdb::make_shared<Tile>(
@@ -135,7 +136,8 @@
tile2->size(),
tile2->filtered_buffer().data(),
tile2->filtered_buffer().size(),
tracker);
tracker,
ThreadPool::SharedTask());
memcpy(metadata_tiles[1]->data(), tile2->data(), tile2->size());

metadata_tiles[2] = tdb::make_shared<Tile>(
@@ -147,7 +149,8 @@
tile3->size(),
tile3->filtered_buffer().data(),
tile3->filtered_buffer().size(),
tracker);
tracker,
ThreadPool::SharedTask());
memcpy(metadata_tiles[2]->data(), tile3->data(), tile3->size());

meta = Metadata::deserialize(metadata_tiles);
59 changes: 32 additions & 27 deletions tiledb/sm/query/readers/dense_reader.cc
@@ -453,6 +453,9 @@ Status DenseReader::dense_read() {
// processing.
if (qc_coords_mode_) {
t_start = t_end;
if (compute_task.valid()) {
throw_if_not_ok(compute_task.wait());
}
continue;
}

Expand Down Expand Up @@ -769,8 +772,8 @@ DenseReader::compute_result_space_tiles(
const auto fragment_num = (unsigned)frag_tile_domains.size();
const auto& tile_coords = subarray.tile_coords();

// Keep track of the required memory to load the result space tiles. Split up
// filtered versus unfiltered. The memory budget is combined for all
// Keep track of the required memory to load the result space tiles. Split
// up filtered versus unfiltered. The memory budget is combined for all
// query condition attributes.
uint64_t required_memory_query_condition_unfiltered = 0;
std::vector<uint64_t> required_memory_unfiltered(
@@ -786,28 +789,28 @@
aggregate_only_field[n - condition_names.size()] = aggregate_only(name);
}

// Here we estimate the size of the tile structures. First, we have to account
// the size of the space tile structure. We could go deeper in the class to
// account for other things but for now we keep it simpler. Second, we try to
// account for the tile subarray (DenseTileSubarray). This class will have a
// vector of ranges per dimensions, so 1 + dim_num * sizeof(vector). Here we
// choose 32 for the size of the vector to anticipate the conversion to a PMR
// vector. We also add dim_num * 2 * sizeof(DimType) to account for at least
// one range per dimension (this should be improved by accounting for the
// exact number of ranges). Finally for the original range index member, we
// have to add 1 + dim_num * sizeof(vector) as well and one uint64_t per
// dimension (this can also be improved by accounting for the
// exact number of ranges).
// Here we estimate the size of the tile structures. First, we have to
// account the size of the space tile structure. We could go deeper in the
// class to account for other things but for now we keep it simpler. Second,
// we try to account for the tile subarray (DenseTileSubarray). This class
// will have a vector of ranges per dimensions, so 1 + dim_num *
// sizeof(vector). Here we choose 32 for the size of the vector to
// anticipate the conversion to a PMR vector. We also add dim_num * 2 *
// sizeof(DimType) to account for at least one range per dimension (this
// should be improved by accounting for the exact number of ranges). Finally
// for the original range index member, we have to add 1 + dim_num *
// sizeof(vector) as well and one uint64_t per dimension (this can also be
// improved by accounting for the exact number of ranges).
uint64_t est_tile_structs_size =
sizeof(ResultSpaceTile<DimType>) + (1 + dim_num) * 2 * 32 +
dim_num * (2 * sizeof(DimType) + sizeof(uint64_t));

// Create the vector of result tiles to operate on. We stop once we reach
// the end or the memory budget. We either reach the tile upper memory limit,
// which is only for unfiltered data, or the limit of the available budget,
// which is for filtered data, unfiltered data and the tile structs. We try to
// process two tile batches at a time so the available memory is half of what
// we have available.
// the end or the memory budget. We either reach the tile upper memory
// limit, which is only for unfiltered data, or the limit of the available
// budget, which is for filtered data, unfiltered data and the tile structs.
// We try to process two tile batches at a time so the available memory is
// half of what we have available.
uint64_t t_end = t_start;
bool wait_compute_task_before_read = false;
bool done = false;
@@ -895,8 +898,8 @@
uint64_t tile_memory_filtered = 0;
uint64_t r_idx = n - condition_names.size();

// We might not need to load this tile into memory at all for aggregation
// only.
// We might not need to load this tile into memory at all for
// aggregation only.
if (aggregate_only_field[r_idx] &&
can_aggregate_tile_with_frag_md(
names[n], result_space_tile, tiles_cell_num[t_end])) {
@@ -953,13 +956,14 @@
required_memory_unfiltered[r_idx] +
est_tile_structs_size;

// Disable the multiple iterations if the tiles don't fit in the iteration
// budget.
// Disable the multiple iterations if the tiles don't fit in the
// iteration budget.
if (total_memory > available_memory_iteration) {
wait_compute_task_before_read = true;
}

// If a single tile doesn't fit in the available memory, we can't proceed.
// If a single tile doesn't fit in the available memory, we can't
// proceed.
if (total_memory > available_memory) {
throw DenseReaderException(
"Cannot process a single tile requiring " +
@@ -1003,7 +1007,8 @@ std::vector<ResultTile*> DenseReader::result_tiles_to_load(
const auto& tile_coords = subarray.tile_coords();
const bool agg_only = name.has_value() && aggregate_only(name.value());

// If the result is already loaded in query condition, return the empty list;
// If the result is already loaded in query condition, return the empty
// list;
std::vector<ResultTile*> ret;
if (name.has_value() && condition_names.count(name.value()) != 0) {
return ret;
@@ -1033,8 +1038,8 @@

/**
* Apply the query condition. The computation will be pushed on the compute
* thread pool in `compute_task`. Callers should wait on this task before using
* the results of the query condition.
* thread pool in `compute_task`. Callers should wait on this task before
* using the results of the query condition.
*/
template <class DimType, class OffType>
Status DenseReader::apply_query_condition(
