Skip to content

Commit

Permalink
Address last comment and fix test for windows
Browse files Browse the repository at this point in the history
  • Loading branch information
ypatia committed Feb 6, 2025
1 parent 1d40d5c commit 2b490ad
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 75 deletions.
6 changes: 3 additions & 3 deletions test/src/unit-sparse-global-order-reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -789,10 +789,10 @@ TEST_CASE_METHOD(
write_1d_fragment(coords, &coords_size, data, &data_size);
}

// Two result tiles (2 * (2736 + 8)) = 5488 will be bigger than the per
// Two result tiles (2 * (2842 + 8)) = 5700 will be bigger than the per
// fragment budget (50000 * 0.11 / 2 fragments = 2750), so only one result
// tile will be loaded each time.
total_budget_ = "50000";
total_budget_ = "60000";
ratio_coords_ = "0.11";
update_config();

Expand Down Expand Up @@ -1313,7 +1313,7 @@ TEST_CASE_METHOD(
write_1d_fragment(coords, &coords_size, data, &data_size);
}

// Two result tiles (2 * (2736 + 8)) = 5488 will be bigger than the per
// Two result tiles (2 * (2842 + 8)) = 5700 will be bigger than the per
// fragment budget (40000 * 0.22 /2 frag = 4400), so only one will be loaded
// each time.
total_budget_ = "40000";
Expand Down
3 changes: 1 addition & 2 deletions tiledb/sm/tile/generic_tile_io.cc
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,7 @@ shared_ptr<Tile> GenericTileIO::read_generic(
filtered_data.data(),
header.persisted_size,
memory_tracker->get_resource(MemoryType::GENERIC_TILE_IO),
ThreadPool::SharedTask(),
true);
std::nullopt);

// Read the tile.
throw_if_not_ok(resources_.vfs().read(
Expand Down
2 changes: 1 addition & 1 deletion tiledb/sm/tile/test/unit_tile.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ TEST_CASE("Tile: Test basic IO", "[Tile][basic_io]") {
nullptr,
0,
tracker,
ThreadPool::SharedTask());
std::nullopt);
CHECK(tile.size() == tile_size);

// Create a buffer to write to the test Tile.
Expand Down
50 changes: 15 additions & 35 deletions tiledb/sm/tile/tile.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ shared_ptr<Tile> Tile::from_generic(
nullptr,
0,
memory_tracker->get_resource(MemoryType::GENERIC_TILE_IO),
ThreadPool::SharedTask());
std::nullopt);
}

shared_ptr<WriterTile> WriterTile::from_generic(
Expand Down Expand Up @@ -109,15 +109,13 @@ TileBase::TileBase(
const Datatype type,
const uint64_t cell_size,
const uint64_t size,
tdb::pmr::memory_resource* resource,
const bool skip_waiting_on_io_task)
tdb::pmr::memory_resource* resource)
: resource_(resource)
, data_(tdb::pmr::make_unique<std::byte>(resource_, size))
, size_(size)
, cell_size_(cell_size)
, format_version_(format_version)
, type_(type)
, skip_waiting_on_io_task_(skip_waiting_on_io_task) {
, type_(type) {
/*
* We can check for a bad allocation after initialization without risk
* because none of the other member variables use its value for their own
Expand All @@ -137,8 +135,7 @@ Tile::Tile(
void* filtered_data,
uint64_t filtered_size,
shared_ptr<MemoryTracker> memory_tracker,
ThreadPool::SharedTask data_io_task,
const bool skip_waiting_on_io_task)
std::optional<ThreadPool::SharedTask> data_io_task)
: Tile(
format_version,
type,
Expand All @@ -148,8 +145,7 @@ Tile::Tile(
filtered_data,
filtered_size,
memory_tracker->get_resource(MemoryType::TILE_DATA),
std::move(data_io_task),
skip_waiting_on_io_task) {
std::move(data_io_task)) {
}

Tile::Tile(
Expand All @@ -161,15 +157,8 @@ Tile::Tile(
void* filtered_data,
uint64_t filtered_size,
tdb::pmr::memory_resource* resource,
ThreadPool::SharedTask filtered_data_io_task,
const bool skip_waiting_on_io_task)
: TileBase(
format_version,
type,
cell_size,
size,
resource,
skip_waiting_on_io_task)
std::optional<ThreadPool::SharedTask> filtered_data_io_task)
: TileBase(format_version, type, cell_size, size, resource)
, zipped_coords_dim_num_(zipped_coords_dim_num)
, filtered_data_(filtered_data)
, filtered_size_(filtered_size)
Expand All @@ -181,15 +170,13 @@ WriterTile::WriterTile(
const Datatype type,
const uint64_t cell_size,
const uint64_t size,
shared_ptr<MemoryTracker> memory_tracker,
const bool skip_waiting_on_io_task)
shared_ptr<MemoryTracker> memory_tracker)
: TileBase(
format_version,
type,
cell_size,
size,
memory_tracker->get_resource(MemoryType::WRITER_TILE_DATA),
skip_waiting_on_io_task)
memory_tracker->get_resource(MemoryType::WRITER_TILE_DATA))
, filtered_buffer_(0) {
}

Expand All @@ -198,15 +185,8 @@ WriterTile::WriterTile(
const Datatype type,
const uint64_t cell_size,
const uint64_t size,
tdb::pmr::memory_resource* resource,
const bool skip_waiting_on_io_task)
: TileBase(
format_version,
type,
cell_size,
size,
resource,
skip_waiting_on_io_task)
tdb::pmr::memory_resource* resource)
: TileBase(format_version, type, cell_size, size, resource)
, filtered_buffer_(0) {
}

Expand Down Expand Up @@ -306,10 +286,10 @@ uint64_t Tile::load_chunk_data(
ChunkData& unfiltered_tile, uint64_t expected_original_size) {
assert(filtered());

if (!skip_waiting_on_io_task_) {
std::scoped_lock<std::recursive_mutex> lock{filtered_data_io_task_mtx_};
if (filtered_data_io_task_.valid()) {
throw_if_not_ok(filtered_data_io_task_.wait());
std::scoped_lock<std::recursive_mutex> lock{filtered_data_io_task_mtx_};
if (filtered_data_io_task_.has_value()) {
if (filtered_data_io_task_.value().valid()) {
throw_if_not_ok(filtered_data_io_task_.value().wait());
} else {
throw std::future_error(std::future_errc::no_state);
}
Expand Down
54 changes: 20 additions & 34 deletions tiledb/sm/tile/tile.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,7 @@ class TileBase {
const Datatype type,
const uint64_t cell_size,
const uint64_t size,
tdb::pmr::memory_resource* resource,
const bool skip_waiting_on_io_task);
tdb::pmr::memory_resource* resource);

DISABLE_COPY_AND_COPY_ASSIGN(TileBase);
DISABLE_MOVE_AND_MOVE_ASSIGN(TileBase);
Expand Down Expand Up @@ -182,11 +181,6 @@ class TileBase {

/** The tile data type. */
Datatype type_;

/**
* Whether to block waiting for io data to be ready before accessing data()
*/
const bool skip_waiting_on_io_task_;
};

/**
Expand Down Expand Up @@ -220,9 +214,6 @@ class Tile : public TileBase {
* @param filtered_size The filtered size to allocate.
* @param memory_tracker The memory resource to use.
* @param filtered_data_io_task The I/O task to wait on for data to be valid.
* @param skip_waiting_on_io_task whether to skip waiting on I/O tasks and
* directly access data() or block. By default is false, so by default we
* block waiting. Used when we create generic tiles or in testing.
*/
Tile(
const format_version_t format_version,
Expand All @@ -233,8 +224,7 @@ class Tile : public TileBase {
void* filtered_data,
uint64_t filtered_size,
shared_ptr<MemoryTracker> memory_tracker,
ThreadPool::SharedTask filtered_data_io_task,
const bool skip_waiting_on_io_task = false);
std::optional<ThreadPool::SharedTask> filtered_data_io_task);

/**
* Constructor.
Expand All @@ -249,9 +239,6 @@ class Tile : public TileBase {
* @param filtered_size The filtered size to allocate.
* @param resource The memory resource to use.
* @param filtered_data_io_task The I/O task to wait on for data to be valid.
* @param skip_waiting_on_io_task whether to skip waiting on I/O tasks and
* directly access data() or block. By default is false, so by default we
* block waiting. Used when we create generic tiles or in testing.
*/
Tile(
const format_version_t format_version,
Expand All @@ -262,8 +249,7 @@ class Tile : public TileBase {
void* filtered_data,
uint64_t filtered_size,
tdb::pmr::memory_resource* resource,
ThreadPool::SharedTask filtered_data_io_task,
const bool skip_waiting_on_io_task = false);
std::optional<ThreadPool::SharedTask> filtered_data_io_task);

DISABLE_MOVE_AND_MOVE_ASSIGN(Tile);
DISABLE_COPY_AND_COPY_ASSIGN(Tile);
Expand Down Expand Up @@ -295,10 +281,10 @@ class Tile : public TileBase {

/** Returns the buffer that contains the filtered, on-disk format. */
inline char* filtered_data() {
if (!skip_waiting_on_io_task_) {
std::scoped_lock<std::recursive_mutex> lock{filtered_data_io_task_mtx_};
if (filtered_data_io_task_.valid()) {
throw_if_not_ok(filtered_data_io_task_.wait());
std::scoped_lock<std::recursive_mutex> lock{filtered_data_io_task_mtx_};
if (filtered_data_io_task_.has_value()) {
if (filtered_data_io_task_.value().valid()) {
throw_if_not_ok(filtered_data_io_task_.value().wait());
} else {
throw std::future_error(std::future_errc::no_state);
}
Expand All @@ -310,21 +296,23 @@ class Tile : public TileBase {
template <class T>
inline T* filtered_data_as() {
std::scoped_lock<std::recursive_mutex> lock{filtered_data_io_task_mtx_};
if (filtered_data_io_task_.valid()) {
throw_if_not_ok(filtered_data_io_task_.wait());
} else {
throw std::future_error(std::future_errc::no_state);
if (filtered_data_io_task_.has_value()) {
if (filtered_data_io_task_.value().valid()) {
throw_if_not_ok(filtered_data_io_task_.value().wait());
} else {
throw std::future_error(std::future_errc::no_state);
}
}

return static_cast<T*>(filtered_data_);
}

/** Clears the filtered buffer. */
void clear_filtered_buffer() {
if (!skip_waiting_on_io_task_) {
std::scoped_lock<std::recursive_mutex> lock{filtered_data_io_task_mtx_};
if (filtered_data_io_task_.valid()) {
throw_if_not_ok(filtered_data_io_task_.wait());
std::scoped_lock<std::recursive_mutex> lock{filtered_data_io_task_mtx_};
if (filtered_data_io_task_.has_value()) {
if (filtered_data_io_task_.value().valid()) {
throw_if_not_ok(filtered_data_io_task_.value().wait());
} else {
throw std::future_error(std::future_errc::no_state);
}
Expand Down Expand Up @@ -425,7 +413,7 @@ class Tile : public TileBase {
uint64_t filtered_size_;

/** I/O task to check and block on if filtered data is ready. */
mutable ThreadPool::SharedTask filtered_data_io_task_;
mutable std::optional<ThreadPool::SharedTask> filtered_data_io_task_;

/**
* Lock for checking task, since this tile can be used by multiple threads.
Expand Down Expand Up @@ -484,8 +472,7 @@ class WriterTile : public TileBase {
const Datatype type,
const uint64_t cell_size,
const uint64_t size,
shared_ptr<MemoryTracker> memory_tracker,
const bool skip_waiting_on_io_task = false);
shared_ptr<MemoryTracker> memory_tracker);

/**
* Constructor.
Expand All @@ -501,8 +488,7 @@ class WriterTile : public TileBase {
const Datatype type,
const uint64_t cell_size,
const uint64_t size,
tdb::pmr::memory_resource* resource,
const bool skip_waiting_on_io_task = false);
tdb::pmr::memory_resource* resource);

DISABLE_COPY_AND_COPY_ASSIGN(WriterTile);
DISABLE_MOVE_AND_MOVE_ASSIGN(WriterTile);
Expand Down

0 comments on commit 2b490ad

Please sign in to comment.