Enhancement 1895: Fully parallelise processing in read_batch
alexowens90 committed Oct 31, 2024
1 parent f4f2dda commit ec834d6
Showing 33 changed files with 760 additions and 585 deletions.
1 change: 1 addition & 0 deletions cpp/arcticdb/CMakeLists.txt
@@ -910,6 +910,7 @@ if(${TEST})
         processing/test/test_filter_and_project_sparse.cpp
         processing/test/test_has_valid_type_promotion.cpp
         processing/test/test_operation_dispatch.cpp
+        processing/test/test_parallel_processing.cpp
         processing/test/test_resample.cpp
         processing/test/test_set_membership.cpp
         processing/test/test_signed_unsigned_comparison.cpp
6 changes: 0 additions & 6 deletions cpp/arcticdb/async/async_store.hpp
@@ -251,12 +251,6 @@ folly::Future<std::pair<VariantKey, TimeseriesDescriptor>> read_timeseries_descr
         return read_and_continue(key, library_, opts, DecodeTimeseriesDescriptorTask{});
     }
 
-    folly::Future<std::pair<VariantKey, TimeseriesDescriptor>> read_timeseries_descriptor_for_incompletes(
-            const entity::VariantKey &key,
-            storage::ReadKeyOpts opts = storage::ReadKeyOpts{}) override {
-        return read_and_continue(key, library_, opts, DecodeTimeseriesDescriptorForIncompletesTask{});
-    }
-
     folly::Future<bool> key_exists(entity::VariantKey &&key) {
         return async::submit_io_task(KeyExistsTask{std::move(key), library_});
     }
22 changes: 0 additions & 22 deletions cpp/arcticdb/async/tasks.hpp
@@ -551,28 +551,6 @@ struct DecodeTimeseriesDescriptorTask : BaseTask {
     }
 };
 
-struct DecodeTimeseriesDescriptorForIncompletesTask : BaseTask {
-    ARCTICDB_MOVE_ONLY_DEFAULT(DecodeTimeseriesDescriptorForIncompletesTask)
-
-    DecodeTimeseriesDescriptorForIncompletesTask() = default;
-
-    std::pair<VariantKey, TimeseriesDescriptor> operator()(storage::KeySegmentPair &&ks) const {
-        ARCTICDB_SAMPLE(DecodeTimeseriesDescriptorForIncompletesTask, 0)
-        auto key_seg = std::move(ks);
-        ARCTICDB_DEBUG(
-            log::storage(),
-            "DecodeTimeseriesDescriptorForIncompletesTask decoding segment with key {}",
-            variant_key_view(key_seg.variant_key()));
-
-        auto maybe_desc = decode_timeseries_descriptor_for_incompletes(key_seg.segment());
-
-        util::check(static_cast<bool>(maybe_desc), "Failed to decode timeseries descriptor");
-        return std::make_pair(
-            std::move(key_seg.variant_key()),
-            std::move(*maybe_desc));
-    }
-};
-
 struct DecodeMetadataAndDescriptorTask : BaseTask {
     ARCTICDB_MOVE_ONLY_DEFAULT(DecodeMetadataAndDescriptorTask)
 
12 changes: 6 additions & 6 deletions cpp/arcticdb/column_store/test/ingestion_stress_test.cpp
@@ -125,8 +125,8 @@ TEST_F(IngestionStressStore, ScalarIntAppend) {
     ro.allow_sparse_ = true;
     ro.set_dynamic_schema(true);
     ro.set_incompletes(true);
-    ReadQuery read_query;
-    read_query.row_filter = universal_range();
+    auto read_query = std::make_shared<ReadQuery>();
+    read_query->row_filter = universal_range();
     register_native_handler_data_factory();
     auto handler_data = get_type_handler_data();
     auto read_result = test_store_->read_dataframe_version(symbol, VersionQuery{}, read_query, ro, handler_data);
@@ -213,8 +213,8 @@ TEST_F(IngestionStressStore, ScalarIntDynamicSchema) {
     read_options.set_dynamic_schema(true);
     read_options.set_allow_sparse(true);
     read_options.set_incompletes(true);
-    ReadQuery read_query;
-    read_query.row_filter = universal_range();
+    auto read_query = std::make_shared<ReadQuery>();
+    read_query->row_filter = universal_range();
     register_native_handler_data_factory();
     auto handler_data = get_type_handler_data();
     auto read_result = test_store_->read_dataframe_version_internal(symbol, VersionQuery{}, read_query, read_options, handler_data);
@@ -266,8 +266,8 @@ TEST_F(IngestionStressStore, DynamicSchemaWithStrings) {
     read_options.set_dynamic_schema(true);
     read_options.set_allow_sparse(true);
     read_options.set_incompletes(true);
-    ReadQuery read_query;
-    read_query.row_filter = universal_range();
+    auto read_query = std::make_shared<ReadQuery>();
+    read_query->row_filter = universal_range();
     register_native_handler_data_factory();
     auto handler_data = get_type_handler_data();
     auto read_result = test_store_->read_dataframe_version(symbol, VersionQuery{}, read_query, read_options, handler_data);
2 changes: 1 addition & 1 deletion cpp/arcticdb/pipeline/frame_slice.cpp
@@ -20,7 +20,7 @@ namespace arcticdb::pipelines {
 
 void SliceAndKey::ensure_segment(const std::shared_ptr<Store>& store) const {
     if(!segment_)
-        segment_ = store->read(*key_).get().second;
+        segment_ = store->read_sync(*key_).second;
 }
 
 SegmentInMemory& SliceAndKey::segment(const std::shared_ptr<Store>& store) {
2 changes: 1 addition & 1 deletion cpp/arcticdb/pipeline/pipeline_utils.hpp
@@ -46,7 +46,7 @@ inline ReadResult read_result_from_single_frame(FrameAndDescriptor& frame_and_de
     auto descriptor = std::make_shared<StreamDescriptor>(frame_and_desc.frame_.descriptor());
     pipeline_context->begin()->set_descriptor(std::move(descriptor));
     auto handler_data = TypeHandlerRegistry::instance()->get_handler_data();
-    reduce_and_fix_columns(pipeline_context, frame_and_desc.frame_, ReadOptions{}, handler_data);
+    reduce_and_fix_columns(pipeline_context, frame_and_desc.frame_, ReadOptions{}, handler_data).get();
    apply_type_handlers(frame_and_desc.frame_, handler_data);
     return create_python_read_result(VersionedItem{key}, std::move(frame_and_desc));
 }
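This call site illustrates the pattern the commit applies throughout: `reduce_and_fix_columns` now returns a future, and the one synchronous caller blocks on it with `.get()` at the boundary instead of the helper blocking internally. A minimal folly sketch of the same shape (the names and the executor are illustrative, not ArcticDB's):

```cpp
#include <folly/futures/Future.h>
#include <folly/executors/CPUThreadPoolExecutor.h>

folly::CPUThreadPoolExecutor pool{4};

// Composable form: callers can chain further work without blocking a thread.
folly::Future<folly::Unit> reduce_async() {
    return folly::via(&pool, [] { /* reduce columns here */ });
}

// Thin synchronous wrapper: the only place that blocks is the API boundary.
void reduce_blocking() {
    reduce_async().get();
}

int main() {
    reduce_blocking();
}
```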
41 changes: 27 additions & 14 deletions cpp/arcticdb/pipeline/read_frame.cpp
@@ -630,30 +630,42 @@ struct ReduceColumnTask : async::BaseTask {
     }
 };
 
-void reduce_and_fix_columns(
-        std::shared_ptr<PipelineContext> &context,
-        SegmentInMemory &frame,
-        const ReadOptions& read_options,
-        std::any& handler_data
+folly::Future<folly::Unit> reduce_and_fix_columns(
+        std::shared_ptr<PipelineContext> &context,
+        SegmentInMemory &frame,
+        const ReadOptions& read_options,
+        std::any& handler_data
     ) {
     ARCTICDB_SAMPLE_DEFAULT(ReduceAndFixStringCol)
     ARCTICDB_DEBUG(log::version(), "Reduce and fix columns");
     if(frame.empty())
-        return;
+        return folly::Unit{};
 
     bool dynamic_schema = opt_false(read_options.dynamic_schema_);
     auto slice_map = std::make_shared<FrameSliceMap>(context, dynamic_schema);
     DecodePathData shared_data;
 
-    static const auto batch_size = ConfigsMap::instance()->get_int("ReduceColumns.BatchSize", 100);
-    folly::collect(folly::window(frame.descriptor().fields().size(), [&] (size_t field) {
-        return async::submit_cpu_task(ReduceColumnTask(frame, field, slice_map, context, shared_data, handler_data, dynamic_schema));
-    }, batch_size)).via(&async::io_executor()).get();
+    // This logic mimics that in ReduceColumnTask operator() to identify whether the task will actually do any work
+    // This is to avoid scheduling work that is a no-op
+    std::vector<size_t> fields_to_reduce;
+    for (size_t idx = 0; idx < frame.descriptor().fields().size(); ++idx) {
+        const auto& frame_field = frame.field(idx);
+        if (dynamic_schema ||
+            (slice_map->columns_.contains(frame_field.name()) && is_sequence_type(frame_field.type().data_type()))) {
+            fields_to_reduce.emplace_back(idx);
+        }
+    }
+
+    static const auto batch_size = ConfigsMap::instance()->get_int("ReduceColumns.BatchSize", 100);
+    return folly::collect(
+        folly::window(std::move(fields_to_reduce),
+                      [context, frame, slice_map, shared_data, dynamic_schema, &handler_data] (size_t field) mutable {
+                          return async::submit_cpu_task(ReduceColumnTask(frame, field, slice_map, context, shared_data, handler_data, dynamic_schema));
+                      }, batch_size)).via(&async::io_executor()).unit();
 }
 
-folly::Future<std::vector<VariantKey>> fetch_data(
-        const SegmentInMemory& frame,
+folly::Future<SegmentInMemory> fetch_data(
+        SegmentInMemory&& frame,
         const std::shared_ptr<PipelineContext> &context,
         const std::shared_ptr<stream::StreamSource>& ssource,
         bool dynamic_schema,
@@ -662,7 +674,7 @@
 ) {
     ARCTICDB_SAMPLE_DEFAULT(FetchSlices)
     if (frame.empty())
-        return {std::vector<VariantKey>{}};
+        return frame;
 
     std::vector<std::pair<VariantKey, stream::StreamSource::ReadContinuation>> keys_and_continuations;
     keys_and_continuations.reserve(context->slice_and_keys_.size());
@@ -684,7 +696,8 @@
         }
     }
     ARCTICDB_SUBSAMPLE_DEFAULT(DoBatchReadCompressed)
-    return ssource->batch_read_compressed(std::move(keys_and_continuations), BatchReadArgs{});
+    return ssource->batch_read_compressed(std::move(keys_and_continuations), BatchReadArgs{})
+        .thenValue([frame](auto&&){ return frame; });
 }
 
 } // namespace read
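Both the old and new bodies of `reduce_and_fix_columns` lean on `folly::window`, which keeps at most `batch_size` tasks scheduled at a time; the new version feeds it the pre-filtered field list and returns the collected future rather than blocking on it. A self-contained sketch of the window-plus-collect idiom under assumed simplifications (a thread pool and a squaring task stand in for ArcticDB's executors and `ReduceColumnTask`):

```cpp
#include <folly/futures/Future.h>
#include <folly/executors/CPUThreadPoolExecutor.h>
#include <numeric>
#include <vector>

int main() {
    folly::CPUThreadPoolExecutor pool{4};
    std::vector<int> work(100);
    std::iota(work.begin(), work.end(), 0);

    constexpr size_t batch_size = 8;  // at most 8 tasks in flight at any moment
    auto all_done =
        folly::collect(folly::window(std::move(work), [&pool](int item) {
            return folly::via(&pool, [item] { return item * item; });
        }, batch_size))
        .via(&pool)
        .unit();  // keep only completion, as reduce_and_fix_columns now does

    std::move(all_done).get();  // block once, at the outermost caller
}
```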
6 changes: 3 additions & 3 deletions cpp/arcticdb/pipeline/read_frame.hpp
@@ -70,8 +70,8 @@ void mark_index_slices(
     bool dynamic_schema,
     bool column_groups);
 
-folly::Future<std::vector<VariantKey>> fetch_data(
-        const SegmentInMemory& frame,
+folly::Future<SegmentInMemory> fetch_data(
+        SegmentInMemory&& frame,
         const std::shared_ptr<PipelineContext> &context,
         const std::shared_ptr<stream::StreamSource>& ssource,
         bool dynamic_schema,
@@ -92,7 +92,7 @@ void decode_into_frame_dynamic(
     const DecodePathData& shared_data,
     std::any& handler_data);
 
-void reduce_and_fix_columns(
+folly::Future<folly::Unit> reduce_and_fix_columns(
     std::shared_ptr<PipelineContext> &context,
     SegmentInMemory &frame,
     const ReadOptions& read_options,
25 changes: 25 additions & 0 deletions cpp/arcticdb/processing/clause_utils.cpp
@@ -81,4 +81,29 @@ std::vector<EntityId> flatten_entities(std::vector<std::vector<EntityId>>&& enti
     return res;
 }
 
+std::vector<folly::FutureSplitter<pipelines::SegmentAndSlice>> split_futures(
+        std::vector<folly::Future<pipelines::SegmentAndSlice>>&& segment_and_slice_futures) {
+    std::vector<folly::FutureSplitter<pipelines::SegmentAndSlice>> res;
+    res.reserve(segment_and_slice_futures.size());
+    for (auto&& future: segment_and_slice_futures) {
+        res.emplace_back(folly::splitFuture(std::move(future)));
+    }
+    return res;
+}
+
+std::shared_ptr<std::vector<EntityFetchCount>> generate_segment_fetch_counts(
+        const std::vector<std::vector<size_t>>& processing_unit_indexes,
+        size_t num_segments) {
+    auto res = std::make_shared<std::vector<EntityFetchCount>>(num_segments, 0);
+    for (const auto& list: processing_unit_indexes) {
+        for (auto idx: list) {
+            res->at(idx)++;
+        }
+    }
+    debug::check<ErrorCode::E_ASSERTION_FAILURE>(
+        std::all_of(res->begin(), res->end(), [](const size_t& val) { return val != 0; }),
+        "All segments should be needed by at least one ProcessingUnit");
+    return res;
+}
+
 }
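`folly::FutureSplitter` is the piece that lets one segment read feed several consumers: each call to `getFuture()` hands out an independent future backed by the same underlying computation, so the read itself runs only once. A small illustrative sketch (an integer producer stands in for a `SegmentAndSlice` read):

```cpp
#include <folly/futures/Future.h>
#include <folly/futures/FutureSplitter.h>
#include <folly/executors/CPUThreadPoolExecutor.h>
#include <iostream>

int main() {
    folly::CPUThreadPoolExecutor pool{2};

    // One producer future, split so that multiple consumers can attach to it.
    auto splitter = folly::splitFuture(folly::via(&pool, [] { return 7; }));

    auto doubled = splitter.getFuture().thenValue([](int v) { return v * 2; });
    auto squared = splitter.getFuture().thenValue([](int v) { return v * v; });

    // The producer ran once; both consumers observe its result.
    std::cout << std::move(doubled).get() << " " << std::move(squared).get() << "\n";
}
```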
9 changes: 9 additions & 0 deletions cpp/arcticdb/processing/clause_utils.hpp
@@ -12,6 +12,8 @@
 #include <string>
 #include <unordered_set>
 
+#include <folly/futures/FutureSplitter.h>
+
 #include <arcticdb/pipeline/frame_slice.hpp>
 #include <arcticdb/processing/component_manager.hpp>
 #include <arcticdb/processing/processing_unit.hpp>
@@ -242,4 +244,11 @@ std::vector<EntityId> push_entities(ComponentManager& component_manager, Process
 
 std::vector<EntityId> flatten_entities(std::vector<std::vector<EntityId>>&& entity_ids_vec);
 
+std::vector<folly::FutureSplitter<pipelines::SegmentAndSlice>> split_futures(
+        std::vector<folly::Future<pipelines::SegmentAndSlice>>&& segment_and_slice_futures);
+
+std::shared_ptr<std::vector<EntityFetchCount>> generate_segment_fetch_counts(
+        const std::vector<std::vector<size_t>>& processing_unit_indexes,
+        size_t num_segments);
+
 }//namespace arcticdb
23 changes: 16 additions & 7 deletions cpp/arcticdb/processing/component_manager.cpp
@@ -14,17 +14,26 @@ namespace arcticdb {
 
 std::vector<EntityId> ComponentManager::get_new_entity_ids(size_t count) {
     std::vector<EntityId> ids(count);
-    std::lock_guard<std::mutex> lock(mtx_);
+    std::unique_lock lock(mtx_);
     registry_.create(ids.begin(), ids.end());
     return ids;
 }
 
-void ComponentManager::erase_entity(EntityId id) {
-    // Ideally would call registry_.destroy(id), or at least registry_.erase<std::shared_ptr<SegmentInMemory>>(id)
-    // at this point. However, they are both slower than this, so just decrement the ref count of the only
-    // sizeable component, so that when the shared pointer goes out of scope in the calling function, the
-    // memory is freed
-    registry_.get<std::shared_ptr<SegmentInMemory>>(id).reset();
+void ComponentManager::decrement_entity_fetch_count(EntityId id) {
+    // fetch_sub returns the value *before* the decrement, so 1 means this was the final fetch
+    if (registry_.get<std::atomic<EntityFetchCount>>(id).fetch_sub(1) == 1) {
+        // This entity will never be accessed again
+        // Ideally would call registry_.destroy(id), or at least registry_.erase<std::shared_ptr<SegmentInMemory>>(id)
+        // at this point. However, they are both slower than this, and would require taking a unique_lock on the
+        // shared_mutex, so just decrement the ref count of the only sizeable component, so that when the shared
+        // pointer goes out of scope in the calling function, the memory is freed
+        registry_.get<std::shared_ptr<SegmentInMemory>>(id).reset();
+        debug::check<ErrorCode::E_ASSERTION_FAILURE>(!registry_.get<std::shared_ptr<SegmentInMemory>>(id),
+                                                     "SegmentInMemory memory retained in ComponentManager");
+    }
+}
+
+void ComponentManager::update_entity_fetch_count(EntityId id, EntityFetchCount count) {
+    registry_.get<std::atomic<EntityFetchCount>>(id).store(count);
 }
 
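`decrement_entity_fetch_count` is a release-on-last-use refcount: because `fetch_sub(1)` returns the value before the decrement, the caller that observes 1 performed the final fetch and can safely drop the manager's reference. A stripped-down sketch of the same pattern, with plain structs standing in for the entt registry:

```cpp
#include <atomic>
#include <memory>
#include <vector>

struct Entity {
    std::atomic<int> remaining_fetches;
    std::shared_ptr<std::vector<double>> payload;  // stand-in for SegmentInMemory
};

std::shared_ptr<std::vector<double>> fetch(Entity& e) {
    auto result = e.payload;  // take the caller's reference first
    if (e.remaining_fetches.fetch_sub(1) == 1) {
        // Last fetch: drop the manager's reference so the payload is freed
        // as soon as the caller's copy goes out of scope.
        e.payload.reset();
    }
    return result;
}

int main() {
    Entity e{2, std::make_shared<std::vector<double>>(1000)};
    auto first = fetch(e);   // count 2 -> 1, payload retained by the manager
    auto second = fetch(e);  // count 1 -> 0, manager's reference dropped
}
```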
41 changes: 19 additions & 22 deletions cpp/arcticdb/processing/component_manager.hpp
@@ -8,6 +8,7 @@
 #pragma once
 
 #include <atomic>
+#include <shared_mutex>
 
 #include <entt/entity/registry.hpp>
 
@@ -35,16 +36,16 @@
     // Add a single entity with the components defined by args
     template<class... Args>
     void add_entity(EntityId id, Args... args) {
-        std::lock_guard<std::mutex> lock(mtx_);
+        std::unique_lock lock(mtx_);
         ([&]{
             registry_.emplace<Args>(id, args);
             // Store the initial entity fetch count component as a "first-class" entity, accessible by
             // registry_.get<EntityFetchCount>(id), as this is external facing (used by resample)
             // The remaining entity fetch count below will be decremented each time an entity is fetched, but is never
-            // accessed externally, so make this a named component.
+            // accessed externally. Stored as an atomic to minimise the requirement to take the shared_mutex with a
+            // unique_lock.
             if constexpr (std::is_same_v<Args, EntityFetchCount>) {
-                auto&& remaining_entity_fetch_count_registry = registry_.storage<EntityFetchCount>(remaining_entity_fetch_count_id);
-                remaining_entity_fetch_count_registry.emplace(id, args);
+                registry_.emplace<std::atomic<EntityFetchCount>>(id, args);
             }
         }(), ...);
     }
@@ -55,7 +56,7 @@
     std::vector<EntityId> add_entities(Args... args) {
         std::vector<EntityId> ids;
         size_t entity_count{0};
-        std::lock_guard<std::mutex> lock(mtx_);
+        std::unique_lock lock(mtx_);
         ([&]{
             if (entity_count == 0) {
                 // Reserve memory for the result on the first pass
@@ -70,33 +71,33 @@
             }
             registry_.insert<typename Args::value_type>(ids.cbegin(), ids.cend(), args.begin());
             if constexpr (std::is_same_v<typename Args::value_type, EntityFetchCount>) {
-                auto&& remaining_entity_fetch_count_registry = registry_.storage<EntityFetchCount>(remaining_entity_fetch_count_id);
-                remaining_entity_fetch_count_registry.insert(ids.cbegin(), ids.cend(), args.begin());
+                for (auto&& [idx, id]: folly::enumerate(ids)) {
+                    registry_.emplace<std::atomic<EntityFetchCount>>(id, args[idx]);
+                }
             }
         }(), ...);
         return ids;
     }
 
     template<typename T>
     void replace_entities(const std::vector<EntityId>& ids, T value) {
+        std::unique_lock lock(mtx_);
         for (auto id: ids) {
             registry_.replace<T>(id, value);
             if constexpr (std::is_same_v<T, EntityFetchCount>) {
-                auto&& remaining_entity_fetch_count_registry = registry_.storage<EntityFetchCount>(remaining_entity_fetch_count_id);
-                // For some reason named storages don't have a replace API
-                remaining_entity_fetch_count_registry.patch(id, [value](EntityFetchCount& entity_fetch_count){ entity_fetch_count = value; });
+                update_entity_fetch_count(id, value);
             }
         }
     }
 
     template<typename T>
     void replace_entities(const std::vector<EntityId>& ids, const std::vector<T>& values) {
         internal::check<ErrorCode::E_ASSERTION_FAILURE>(ids.size() == values.size(), "Received vectors of differing lengths in ComponentManager::replace_entities");
+        std::unique_lock lock(mtx_);
         for (auto [idx, id]: folly::enumerate(ids)) {
             registry_.replace<T>(id, values[idx]);
             if constexpr (std::is_same_v<T, EntityFetchCount>) {
-                auto&& remaining_entity_fetch_count_registry = registry_.storage<EntityFetchCount>(remaining_entity_fetch_count_id);
-                // For some reason named storages don't have a replace API
-                remaining_entity_fetch_count_registry.patch(id, [&values, idx](EntityFetchCount& entity_fetch_count){ entity_fetch_count = values[idx]; });
+                update_entity_fetch_count(id, values[idx]);
             }
         }
     }
@@ -107,20 +108,15 @@
         std::vector<std::tuple<Args...>> tuple_res;
         tuple_res.reserve(ids.size());
         {
-            std::lock_guard<std::mutex> lock(mtx_);
-            auto&& remaining_entity_fetch_count_registry = registry_.storage<EntityFetchCount>(remaining_entity_fetch_count_id);
+            std::shared_lock lock(mtx_);
             // Using view.get theoretically and empirically faster than registry_.get
             auto view = registry_.view<const Args...>();
             for (auto id: ids) {
                 tuple_res.emplace_back(std::move(view.get(id)));
             }
             if (decrement_fetch_count) {
                 for (auto id: ids) {
-                    auto& remaining_entity_fetch_count = remaining_entity_fetch_count_registry.get(id);
-                    // This entity will never be accessed again
-                    if (--remaining_entity_fetch_count == 0) {
-                        erase_entity(id);
-                    }
+                    decrement_entity_fetch_count(id);
                 }
             }
         }
@@ -138,10 +134,11 @@
     }
 
 private:
-    void erase_entity(EntityId id);
+    void decrement_entity_fetch_count(EntityId id);
+    void update_entity_fetch_count(EntityId id, EntityFetchCount count);
 
     entt::registry registry_;
-    std::mutex mtx_;
+    std::shared_mutex mtx_;
 };
 
 } // namespace arcticdb
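The locking scheme this header adopts is worth spelling out: structural changes to the registry take a `unique_lock` on the `shared_mutex`, lookups take a `shared_lock`, and the per-entity fetch counts are atomics so the hot read path never needs exclusive access just to decrement a counter. A sketch of that arrangement under assumed simplifications (a plain map rather than an entt registry):

```cpp
#include <atomic>
#include <memory>
#include <shared_mutex>
#include <unordered_map>

class FetchCountTable {
public:
    void add(int id, int fetch_count) {
        std::unique_lock lock(mtx_);  // structural change: writers exclude everyone
        counts_.emplace(id, std::make_unique<std::atomic<int>>(fetch_count));
    }

    // Many readers may run concurrently; the atomic handles the mutation,
    // so no exclusive lock is needed on this hot path.
    int decrement(int id) {
        std::shared_lock lock(mtx_);
        return counts_.at(id)->fetch_sub(1);
    }

private:
    std::shared_mutex mtx_;
    std::unordered_map<int, std::unique_ptr<std::atomic<int>>> counts_;
};

int main() {
    FetchCountTable table;
    table.add(1, 3);
    table.decrement(1);  // returns 3, the value before the decrement
}
```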
[Diff truncated: 21 of the 33 changed files are not shown.]