From a6c0002d7919e6fc1071a72286f6ee0e316aed6e Mon Sep 17 00:00:00 2001 From: Alex Owens <73388657+alexowens90@users.noreply.github.com> Date: Thu, 7 Nov 2024 09:33:24 +0000 Subject: [PATCH] Enhancement 1895: Fully parallelise processing in read_batch (#1950) #### Reference Issues/PRs Closes #1895 Fixes https://github.com/man-group/arcticdb-man/issues/171 Fixes #1936 Fixes #1939 Fixes #1940 #### What does this implement or fix? Schedules all work asynchronously in batch reads when processing is involved, as well as when all symbols are being read directly. Previously, symbols were processed sequentially, leading to idle CPUs when processing lots of smaller symbols. This works by making `read_frame_for_version` schedule work and return futures, rather than actually performing the processing. This implementation can then be used for all 4 combinations of batch/non-batch and direct/with processing reads, significantly simplifying the code and removing the now redundant `async_read_direct` (the fact that there were two different implementations to achieve effectively the same thing is what led to 2 of the bugs in the first place). Several bugs that were discovered during the implementation (flagged above) have also been fixed. Further work in this area covered in #1968 --- cpp/arcticdb/CMakeLists.txt | 1 + cpp/arcticdb/async/async_store.hpp | 6 - cpp/arcticdb/async/tasks.hpp | 22 - .../test/ingestion_stress_test.cpp | 12 +- cpp/arcticdb/pipeline/frame_slice.cpp | 2 +- cpp/arcticdb/pipeline/pipeline_utils.hpp | 2 +- cpp/arcticdb/pipeline/read_frame.cpp | 41 +- cpp/arcticdb/pipeline/read_frame.hpp | 6 +- cpp/arcticdb/processing/clause_utils.cpp | 25 ++ cpp/arcticdb/processing/clause_utils.hpp | 9 + cpp/arcticdb/processing/component_manager.cpp | 23 +- cpp/arcticdb/processing/component_manager.hpp | 43 +- .../test/test_parallel_processing.cpp | 166 ++++++++ cpp/arcticdb/storage/file/file_store.hpp | 9 +- cpp/arcticdb/storage/test/in_memory_store.hpp | 47 ++- .../storage/test/test_memory_storage.cpp | 2 +- cpp/arcticdb/stream/append_map.cpp | 9 +- cpp/arcticdb/stream/stream_source.hpp | 5 - .../version/local_versioned_engine.cpp | 200 +++------ .../version/local_versioned_engine.hpp | 12 +- cpp/arcticdb/version/python_bindings.cpp | 10 +- cpp/arcticdb/version/test/test_sparse.cpp | 46 +-- .../version/test/test_version_store.cpp | 19 +- cpp/arcticdb/version/version_core.cpp | 390 ++++++++---------- cpp/arcticdb/version/version_core.hpp | 38 +- .../version/version_map_batch_methods.cpp | 11 +- cpp/arcticdb/version/version_store_api.cpp | 12 +- cpp/arcticdb/version/version_store_api.hpp | 8 +- cpp/arcticdb/version/versioned_engine.hpp | 2 +- python/arcticdb/version_store/_store.py | 9 +- python/tests/integration/arcticdb/test_s3.py | 7 + .../arcticdb/test_unicode_strings.py | 22 + .../version_store/test_basic_version_store.py | 92 +++-- .../version_store/test_incompletes.py | 48 +++ 34 files changed, 769 insertions(+), 587 deletions(-) create mode 100644 cpp/arcticdb/processing/test/test_parallel_processing.cpp create mode 100644 python/tests/unit/arcticdb/version_store/test_incompletes.py diff --git a/cpp/arcticdb/CMakeLists.txt b/cpp/arcticdb/CMakeLists.txt index 86436146d7..4519233faf 100644 --- a/cpp/arcticdb/CMakeLists.txt +++ b/cpp/arcticdb/CMakeLists.txt @@ -910,6 +910,7 @@ if(${TEST}) processing/test/test_filter_and_project_sparse.cpp processing/test/test_has_valid_type_promotion.cpp processing/test/test_operation_dispatch.cpp + processing/test/test_parallel_processing.cpp 
processing/test/test_resample.cpp processing/test/test_set_membership.cpp processing/test/test_signed_unsigned_comparison.cpp diff --git a/cpp/arcticdb/async/async_store.hpp b/cpp/arcticdb/async/async_store.hpp index aea666e72c..bd712b8f06 100644 --- a/cpp/arcticdb/async/async_store.hpp +++ b/cpp/arcticdb/async/async_store.hpp @@ -251,12 +251,6 @@ folly::Future> read_timeseries_descr return read_and_continue(key, library_, opts, DecodeTimeseriesDescriptorTask{}); } -folly::Future> read_timeseries_descriptor_for_incompletes( - const entity::VariantKey &key, - storage::ReadKeyOpts opts = storage::ReadKeyOpts{}) override { - return read_and_continue(key, library_, opts, DecodeTimeseriesDescriptorForIncompletesTask{}); -} - folly::Future key_exists(entity::VariantKey &&key) { return async::submit_io_task(KeyExistsTask{std::move(key), library_}); } diff --git a/cpp/arcticdb/async/tasks.hpp b/cpp/arcticdb/async/tasks.hpp index a15e5098b5..a24ab7f23f 100644 --- a/cpp/arcticdb/async/tasks.hpp +++ b/cpp/arcticdb/async/tasks.hpp @@ -551,28 +551,6 @@ struct DecodeTimeseriesDescriptorTask : BaseTask { } }; -struct DecodeTimeseriesDescriptorForIncompletesTask : BaseTask { - ARCTICDB_MOVE_ONLY_DEFAULT(DecodeTimeseriesDescriptorForIncompletesTask) - - DecodeTimeseriesDescriptorForIncompletesTask() = default; - - std::pair operator()(storage::KeySegmentPair &&ks) const { - ARCTICDB_SAMPLE(DecodeTimeseriesDescriptorForIncompletesTask, 0) - auto key_seg = std::move(ks); - ARCTICDB_DEBUG( - log::storage(), - "DecodeTimeseriesDescriptorForIncompletesTask decoding segment with key {}", - variant_key_view(key_seg.variant_key())); - - auto maybe_desc = decode_timeseries_descriptor_for_incompletes(key_seg.segment()); - - util::check(static_cast(maybe_desc), "Failed to decode timeseries descriptor"); - return std::make_pair( - std::move(key_seg.variant_key()), - std::move(*maybe_desc)); - } -}; - struct DecodeMetadataAndDescriptorTask : BaseTask { ARCTICDB_MOVE_ONLY_DEFAULT(DecodeMetadataAndDescriptorTask) diff --git a/cpp/arcticdb/column_store/test/ingestion_stress_test.cpp b/cpp/arcticdb/column_store/test/ingestion_stress_test.cpp index 3fad155a9a..105c9e37ae 100644 --- a/cpp/arcticdb/column_store/test/ingestion_stress_test.cpp +++ b/cpp/arcticdb/column_store/test/ingestion_stress_test.cpp @@ -125,8 +125,8 @@ TEST_F(IngestionStressStore, ScalarIntAppend) { ro.allow_sparse_ = true; ro.set_dynamic_schema(true); ro.set_incompletes(true); - ReadQuery read_query; - read_query.row_filter = universal_range(); + auto read_query = std::make_shared(); + read_query->row_filter = universal_range(); register_native_handler_data_factory(); auto handler_data = get_type_handler_data(); auto read_result = test_store_->read_dataframe_version(symbol, VersionQuery{}, read_query, ro, handler_data); @@ -213,8 +213,8 @@ TEST_F(IngestionStressStore, ScalarIntDynamicSchema) { read_options.set_dynamic_schema(true); read_options.set_allow_sparse(true); read_options.set_incompletes(true); - ReadQuery read_query; - read_query.row_filter = universal_range(); + auto read_query = std::make_shared(); + read_query->row_filter = universal_range(); register_native_handler_data_factory(); auto handler_data = get_type_handler_data(); auto read_result = test_store_->read_dataframe_version_internal(symbol, VersionQuery{}, read_query, read_options, handler_data); @@ -266,8 +266,8 @@ TEST_F(IngestionStressStore, DynamicSchemaWithStrings) { read_options.set_dynamic_schema(true); read_options.set_allow_sparse(true); read_options.set_incompletes(true); - 
ReadQuery read_query; - read_query.row_filter = universal_range(); + auto read_query = std::make_shared(); + read_query->row_filter = universal_range(); register_native_handler_data_factory(); auto handler_data = get_type_handler_data(); auto read_result = test_store_->read_dataframe_version(symbol, VersionQuery{}, read_query, read_options, handler_data); diff --git a/cpp/arcticdb/pipeline/frame_slice.cpp b/cpp/arcticdb/pipeline/frame_slice.cpp index a7cb0dff26..d59a66372d 100644 --- a/cpp/arcticdb/pipeline/frame_slice.cpp +++ b/cpp/arcticdb/pipeline/frame_slice.cpp @@ -20,7 +20,7 @@ namespace arcticdb::pipelines { void SliceAndKey::ensure_segment(const std::shared_ptr& store) const { if(!segment_) - segment_ = store->read(*key_).get().second; + segment_ = store->read_sync(*key_).second; } SegmentInMemory& SliceAndKey::segment(const std::shared_ptr& store) { diff --git a/cpp/arcticdb/pipeline/pipeline_utils.hpp b/cpp/arcticdb/pipeline/pipeline_utils.hpp index d07bcd93ae..ae2666ed66 100644 --- a/cpp/arcticdb/pipeline/pipeline_utils.hpp +++ b/cpp/arcticdb/pipeline/pipeline_utils.hpp @@ -46,7 +46,7 @@ inline ReadResult read_result_from_single_frame(FrameAndDescriptor& frame_and_de auto descriptor = std::make_shared(frame_and_desc.frame_.descriptor()); pipeline_context->begin()->set_descriptor(std::move(descriptor)); auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(); - reduce_and_fix_columns(pipeline_context, frame_and_desc.frame_, ReadOptions{}, handler_data); + reduce_and_fix_columns(pipeline_context, frame_and_desc.frame_, ReadOptions{}, handler_data).get(); apply_type_handlers(frame_and_desc.frame_, handler_data); return create_python_read_result(VersionedItem{key}, std::move(frame_and_desc)); } diff --git a/cpp/arcticdb/pipeline/read_frame.cpp b/cpp/arcticdb/pipeline/read_frame.cpp index 471313cc36..de65763748 100644 --- a/cpp/arcticdb/pipeline/read_frame.cpp +++ b/cpp/arcticdb/pipeline/read_frame.cpp @@ -630,30 +630,42 @@ struct ReduceColumnTask : async::BaseTask { } }; - void reduce_and_fix_columns( - std::shared_ptr &context, - SegmentInMemory &frame, - const ReadOptions& read_options, - std::any& handler_data +folly::Future reduce_and_fix_columns( + std::shared_ptr &context, + SegmentInMemory &frame, + const ReadOptions& read_options, + std::any& handler_data ) { ARCTICDB_SAMPLE_DEFAULT(ReduceAndFixStringCol) ARCTICDB_DEBUG(log::version(), "Reduce and fix columns"); if(frame.empty()) - return; + return folly::Unit{}; bool dynamic_schema = opt_false(read_options.dynamic_schema_); auto slice_map = std::make_shared(context, dynamic_schema); DecodePathData shared_data; - static const auto batch_size = ConfigsMap::instance()->get_int("ReduceColumns.BatchSize", 100); - folly::collect(folly::window(frame.descriptor().fields().size(), [&] (size_t field) { - return async::submit_cpu_task(ReduceColumnTask(frame, field, slice_map, context, shared_data, handler_data, dynamic_schema)); - }, batch_size)).via(&async::io_executor()).get(); + // This logic mimics that in ReduceColumnTask operator() to identify whether the task will actually do any work + // This is to avoid scheduling work that is a no-op + std::vector fields_to_reduce; + for (size_t idx=0; idxcolumns_.contains(frame_field.name()) && is_sequence_type(frame_field.type().data_type()))) { + fields_to_reduce.emplace_back(idx); + } + } + static const auto batch_size = ConfigsMap::instance()->get_int("ReduceColumns.BatchSize", 100); + return folly::collect( + folly::window(std::move(fields_to_reduce), + [context, 
frame, slice_map, shared_data, dynamic_schema, &handler_data] (size_t field) mutable { + return async::submit_cpu_task(ReduceColumnTask(frame, field, slice_map, context, shared_data, handler_data, dynamic_schema)); + }, batch_size)).via(&async::io_executor()).unit(); } -folly::Future> fetch_data( - const SegmentInMemory& frame, +folly::Future fetch_data( + SegmentInMemory&& frame, const std::shared_ptr &context, const std::shared_ptr& ssource, bool dynamic_schema, @@ -662,7 +674,7 @@ folly::Future> fetch_data( ) { ARCTICDB_SAMPLE_DEFAULT(FetchSlices) if (frame.empty()) - return {std::vector{}}; + return frame; std::vector> keys_and_continuations; keys_and_continuations.reserve(context->slice_and_keys_.size()); @@ -684,7 +696,8 @@ folly::Future> fetch_data( } } ARCTICDB_SUBSAMPLE_DEFAULT(DoBatchReadCompressed) - return ssource->batch_read_compressed(std::move(keys_and_continuations), BatchReadArgs{}); + return ssource->batch_read_compressed(std::move(keys_and_continuations), BatchReadArgs{}) + .thenValue([frame](auto&&){ return frame; }); } } // namespace read diff --git a/cpp/arcticdb/pipeline/read_frame.hpp b/cpp/arcticdb/pipeline/read_frame.hpp index d178b2a6dd..b05868533b 100644 --- a/cpp/arcticdb/pipeline/read_frame.hpp +++ b/cpp/arcticdb/pipeline/read_frame.hpp @@ -70,8 +70,8 @@ void mark_index_slices( bool dynamic_schema, bool column_groups); -folly::Future> fetch_data( - const SegmentInMemory& frame, +folly::Future fetch_data( + SegmentInMemory&& frame, const std::shared_ptr &context, const std::shared_ptr& ssource, bool dynamic_schema, @@ -92,7 +92,7 @@ void decode_into_frame_dynamic( const DecodePathData& shared_data, std::any& handler_data); -void reduce_and_fix_columns( +folly::Future reduce_and_fix_columns( std::shared_ptr &context, SegmentInMemory &frame, const ReadOptions& read_options, diff --git a/cpp/arcticdb/processing/clause_utils.cpp b/cpp/arcticdb/processing/clause_utils.cpp index 854ad5e39b..6b40a64e67 100644 --- a/cpp/arcticdb/processing/clause_utils.cpp +++ b/cpp/arcticdb/processing/clause_utils.cpp @@ -81,4 +81,29 @@ std::vector flatten_entities(std::vector>&& enti return res; } +std::vector> split_futures( + std::vector>&& segment_and_slice_futures) { + std::vector> res; + res.reserve(segment_and_slice_futures.size()); + for (auto&& future: segment_and_slice_futures) { + res.emplace_back(folly::splitFuture(std::move(future))); + } + return res; +} + +std::shared_ptr> generate_segment_fetch_counts( + const std::vector>& processing_unit_indexes, + size_t num_segments) { + auto res = std::make_shared>(num_segments, 0); + for (const auto& list: processing_unit_indexes) { + for (auto idx: list) { + res->at(idx)++; + } + } + debug::check( + std::all_of(res->begin(), res->end(), [](const size_t& val) { return val != 0; }), + "All segments should be needed by at least one ProcessingUnit"); + return res; +} + } diff --git a/cpp/arcticdb/processing/clause_utils.hpp b/cpp/arcticdb/processing/clause_utils.hpp index 7bf93221d0..b0d7c044ec 100644 --- a/cpp/arcticdb/processing/clause_utils.hpp +++ b/cpp/arcticdb/processing/clause_utils.hpp @@ -12,6 +12,8 @@ #include #include +#include + #include #include #include @@ -242,4 +244,11 @@ std::vector push_entities(ComponentManager& component_manager, Process std::vector flatten_entities(std::vector>&& entity_ids_vec); +std::vector> split_futures( + std::vector>&& segment_and_slice_futures); + +std::shared_ptr> generate_segment_fetch_counts( + const std::vector>& processing_unit_indexes, + size_t num_segments); + }//namespace arcticdb 
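A minimal usage sketch of the two helpers added to clause_utils above (illustrative only, not part of the patch): it assumes the caller already holds one SegmentAndSlice future per segment plus the per-ProcessingUnit index lists, and shows how splitting the futures lets several processing units wait on the same storage read while the per-segment fetch counts tell the ComponentManager when each segment can be released. The function name futures_per_processing_unit is hypothetical.

#include <arcticdb/processing/clause_utils.hpp>  // split_futures, generate_segment_fetch_counts
#include <folly/futures/Future.h>

namespace arcticdb {
std::vector<std::vector<folly::Future<pipelines::SegmentAndSlice>>> futures_per_processing_unit(
        std::vector<folly::Future<pipelines::SegmentAndSlice>>&& segment_and_slice_futures,
        const std::vector<std::vector<size_t>>& processing_unit_indexes) {
    // One FutureSplitter per segment: each getFuture() call hands out an independent future
    // fulfilled by the same underlying read, so a segment shared by several units is only
    // fetched from storage once.
    auto splitters = split_futures(std::move(segment_and_slice_futures));
    // How many processing units will fetch each segment; the real scheduler would pass these
    // counts to the ComponentManager so segment memory is freed after the last fetch.
    // Unused in this sketch.
    [[maybe_unused]] auto segment_fetch_counts =
            generate_segment_fetch_counts(processing_unit_indexes, splitters.size());
    std::vector<std::vector<folly::Future<pipelines::SegmentAndSlice>>> res;
    res.reserve(processing_unit_indexes.size());
    for (const auto& indexes : processing_unit_indexes) {
        std::vector<folly::Future<pipelines::SegmentAndSlice>> unit_futures;
        unit_futures.reserve(indexes.size());
        for (auto idx : indexes)
            unit_futures.emplace_back(splitters[idx].getFuture());
        res.emplace_back(std::move(unit_futures));
    }
    return res;
}
} // namespace arcticdb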
diff --git a/cpp/arcticdb/processing/component_manager.cpp b/cpp/arcticdb/processing/component_manager.cpp index 5b413b6477..187ef90d1b 100644 --- a/cpp/arcticdb/processing/component_manager.cpp +++ b/cpp/arcticdb/processing/component_manager.cpp @@ -14,17 +14,26 @@ namespace arcticdb { std::vector ComponentManager::get_new_entity_ids(size_t count) { std::vector ids(count); - std::lock_guard lock(mtx_); + std::unique_lock lock(mtx_); registry_.create(ids.begin(), ids.end()); return ids; } -void ComponentManager::erase_entity(EntityId id) { - // Ideally would call registry_.destroy(id), or at least registry_.erase>(id) - // at this point. However, they are both slower than this, so just decrement the ref count of the only - // sizeable component, so that when the shared pointer goes out of scope in the calling function, the - // memory is freed - registry_.get>(id).reset(); +void ComponentManager::decrement_entity_fetch_count(EntityId id) { + if (registry_.get>(id).fetch_sub(1) == 1) { + // This entity will never be accessed again + // Ideally would call registry_.destroy(id), or at least registry_.erase>(id) + // at this point. However, they are both slower than this, and would require taking a unique_lock on the + // shared_mutex, so just decrement the ref count of the only sizeable component, so that when the shared pointer + // goes out of scope in the calling function, the memory is freed + registry_.get>(id).reset(); + debug::check(!registry_.get>(id), + "SegmentInMemory memory retained in ComponentManager"); + } +} + +void ComponentManager::update_entity_fetch_count(EntityId id, EntityFetchCount count) { + registry_.get>(id).store(count); } diff --git a/cpp/arcticdb/processing/component_manager.hpp b/cpp/arcticdb/processing/component_manager.hpp index 406df05c10..df204f443a 100644 --- a/cpp/arcticdb/processing/component_manager.hpp +++ b/cpp/arcticdb/processing/component_manager.hpp @@ -8,6 +8,7 @@ #pragma once #include +#include #include @@ -23,8 +24,6 @@ using bucket_id = uint8_t; using namespace entt::literals; -constexpr auto remaining_entity_fetch_count_id = "remaining_entity_fetch_count"_hs; - class ComponentManager { public: ComponentManager() = default; @@ -35,16 +34,16 @@ class ComponentManager { // Add a single entity with the components defined by args template void add_entity(EntityId id, Args... args) { - std::lock_guard lock(mtx_); + std::unique_lock lock(mtx_); ([&]{ registry_.emplace(id, args); // Store the initial entity fetch count component as a "first-class" entity, accessible by // registry_.get(id), as this is external facing (used by resample) // The remaining entity fetch count below will be decremented each time an entity is fetched, but is never - // accessed externally, so make this a named component. + // accessed externally. Stored as an atomic to minimise the requirement to take the shared_mutex with a + // unique_lock. if constexpr (std::is_same_v) { - auto&& remaining_entity_fetch_count_registry = registry_.storage(remaining_entity_fetch_count_id); - remaining_entity_fetch_count_registry.emplace(id, args); + registry_.emplace>(id, args); } }(), ...); } @@ -55,7 +54,7 @@ class ComponentManager { std::vector add_entities(Args... 
args) { std::vector ids; size_t entity_count{0}; - std::lock_guard lock(mtx_); + std::unique_lock lock(mtx_); ([&]{ if (entity_count == 0) { // Reserve memory for the result on the first pass @@ -70,8 +69,9 @@ class ComponentManager { } registry_.insert(ids.cbegin(), ids.cend(), args.begin()); if constexpr (std::is_same_v) { - auto&& remaining_entity_fetch_count_registry = registry_.storage(remaining_entity_fetch_count_id); - remaining_entity_fetch_count_registry.insert(ids.cbegin(), ids.cend(), args.begin()); + for (auto&& [idx, id]: folly::enumerate(ids)) { + registry_.emplace>(id, args[idx]); + } } }(), ...); return ids; @@ -79,24 +79,23 @@ class ComponentManager { template void replace_entities(const std::vector& ids, T value) { + std::unique_lock lock(mtx_); for (auto id: ids) { registry_.replace(id, value); if constexpr (std::is_same_v) { - auto&& remaining_entity_fetch_count_registry = registry_.storage(remaining_entity_fetch_count_id); - // For some reason named storages don't have a replace API - remaining_entity_fetch_count_registry.patch(id, [value](EntityFetchCount& entity_fetch_count){ entity_fetch_count = value; }); + update_entity_fetch_count(id, value); } } } template void replace_entities(const std::vector& ids, const std::vector& values) { + internal::check(ids.size() == values.size(), "Received vectors of differing lengths in ComponentManager::replace_entities"); + std::unique_lock lock(mtx_); for (auto [idx, id]: folly::enumerate(ids)) { registry_.replace(id, values[idx]); if constexpr (std::is_same_v) { - auto&& remaining_entity_fetch_count_registry = registry_.storage(remaining_entity_fetch_count_id); - // For some reason named storages don't have a replace API - remaining_entity_fetch_count_registry.patch(id, [&values, idx](EntityFetchCount& entity_fetch_count){ entity_fetch_count = values[idx]; }); + update_entity_fetch_count(id, values[idx]); } } } @@ -107,8 +106,7 @@ class ComponentManager { std::vector> tuple_res; tuple_res.reserve(ids.size()); { - std::lock_guard lock(mtx_); - auto&& remaining_entity_fetch_count_registry = registry_.storage(remaining_entity_fetch_count_id); + std::shared_lock lock(mtx_); // Using view.get theoretically and empirically faster than registry_.get auto view = registry_.view(); for (auto id: ids) { @@ -116,11 +114,7 @@ class ComponentManager { } if (decrement_fetch_count) { for (auto id: ids) { - auto& remaining_entity_fetch_count = remaining_entity_fetch_count_registry.get(id); - // This entity will never be accessed again - if (--remaining_entity_fetch_count == 0) { - erase_entity(id); - } + decrement_entity_fetch_count(id); } } } @@ -138,10 +132,11 @@ class ComponentManager { } private: - void erase_entity(EntityId id); + void decrement_entity_fetch_count(EntityId id); + void update_entity_fetch_count(EntityId id, EntityFetchCount count); entt::registry registry_; - std::mutex mtx_; + std::shared_mutex mtx_; }; } // namespace arcticdb \ No newline at end of file diff --git a/cpp/arcticdb/processing/test/test_parallel_processing.cpp b/cpp/arcticdb/processing/test/test_parallel_processing.cpp new file mode 100644 index 0000000000..5d2cb54a1b --- /dev/null +++ b/cpp/arcticdb/processing/test/test_parallel_processing.cpp @@ -0,0 +1,166 @@ +/* Copyright 2024 Man Group Operations Limited + * + * Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt. 
+ * + * As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0. + */ + +#include +#include +#include +#include +#include + +using namespace arcticdb; +using namespace arcticdb::pipelines; + +struct RowSliceClause { + // Simple clause that accepts and produces segments partitioned by row-slice, which is representative of a lot of + // the real clauses we support. In place of doing any processing, the process method just sleeps for a random amount + // of time and then increments the stream id of each input segment. + ClauseInfo clause_info_; + std::shared_ptr component_manager_; + + RowSliceClause() = default; + ARCTICDB_MOVE_COPY_DEFAULT(RowSliceClause) + + [[nodiscard]] std::vector> structure_for_processing(std::vector& ranges_and_keys) { + return structure_by_row_slice(ranges_and_keys); + } + + [[nodiscard]] std::vector> structure_for_processing(std::vector>&& entity_ids_vec) { + log::version().warn("RowSliceClause::structure_for_processing called"); + return structure_by_row_slice(*component_manager_, std::move(entity_ids_vec)); + } + + [[nodiscard]] std::vector process(std::vector&& entity_ids) const { + std::mt19937_64 eng{std::random_device{}()}; + std::uniform_int_distribution<> dist{10, 100}; + auto sleep_ms = dist(eng); + log::version().warn("RowSliceClause::process sleeping for {}ms", sleep_ms); + std::this_thread::sleep_for(std::chrono::milliseconds{sleep_ms}); + if (entity_ids.empty()) { + return {}; + } + auto proc = gather_entities, std::shared_ptr, std::shared_ptr>(*component_manager_, std::move(entity_ids)); + for (const auto& segment: proc.segments_.value()) { + auto id = std::get(segment->descriptor().id()); + ++id; + segment->descriptor().set_id(id); + } + return push_entities(*component_manager_, std::move(proc)); + } + + [[nodiscard]] const ClauseInfo& clause_info() const { + return clause_info_; + } + + void set_processing_config(ARCTICDB_UNUSED const ProcessingConfig&) {} + + void set_component_manager(std::shared_ptr component_manager) { + component_manager_ = component_manager; + } +}; + +struct RestructuringClause { + // Simple clause that accepts non row-slice structured segments to stress the restructuring process (fan-in/fan-out) + // process method is the same as the RowSliceClause above + ClauseInfo clause_info_; + std::shared_ptr component_manager_; + + RestructuringClause() { + clause_info_.input_structure_ = ProcessingStructure::ALL; + }; + ARCTICDB_MOVE_COPY_DEFAULT(RestructuringClause) + + [[nodiscard]] std::vector> structure_for_processing(std::vector& ranges_and_keys) { + return structure_by_row_slice(ranges_and_keys); + } + + [[nodiscard]] std::vector> structure_for_processing(std::vector>&& entity_ids_vec) { + log::version().warn("RestructuringClause::structure_for_processing called"); + return structure_by_row_slice(*component_manager_, std::move(entity_ids_vec)); + } + + [[nodiscard]] std::vector process(std::vector&& entity_ids) const { + std::mt19937_64 eng{std::random_device{}()}; + std::uniform_int_distribution<> dist{10, 100}; + auto sleep_ms = dist(eng); + log::version().warn("RestructuringClause::process sleeping for {}ms", sleep_ms); + std::this_thread::sleep_for(std::chrono::milliseconds{sleep_ms}); + if (entity_ids.empty()) { + return {}; + } + auto proc = gather_entities, std::shared_ptr, std::shared_ptr>(*component_manager_, std::move(entity_ids)); + for (const auto& segment: proc.segments_.value()) { + auto id = 
std::get(segment->descriptor().id()); + ++id; + segment->descriptor().set_id(id); + } + return push_entities(*component_manager_, std::move(proc)); + } + + [[nodiscard]] const ClauseInfo& clause_info() const { + return clause_info_; + } + + void set_processing_config(ARCTICDB_UNUSED const ProcessingConfig&) {} + + void set_component_manager(std::shared_ptr component_manager) { + component_manager_ = component_manager; + } +}; + +TEST(Clause, ScheduleClauseProcessingStress) { + // Extensible stress test of schedule_clause_processing. Useful for ensuring a lack of deadlock when running on + // threadpools with 1 or multiple cores. Dummy clauses provided above used to stress the fan-in/fan-out behaviour. + // Could be extended to profile and compare different scheduling algorithms and threadpool implementations if we + // want to move away from folly. + using namespace arcticdb::version_store; + auto num_clauses = 5; + std::mt19937_64 eng{std::random_device{}()}; + std::uniform_int_distribution<> dist{0, 1}; + + auto clauses = std::make_shared>>(); + for (auto unused=0; unusedemplace_back(std::make_shared(RowSliceClause())); + } else { + clauses->emplace_back(std::make_shared(RestructuringClause())); + } + } + + auto component_manager = std::make_shared(); + for (auto& clause: *clauses) { + clause->set_component_manager(component_manager); + } + + size_t num_segments{2}; + std::vector> segment_and_slice_promises(num_segments); + std::vector> segment_and_slice_futures; + std::vector> processing_unit_indexes; + for (size_t idx = 0; idx < num_segments; ++idx) { + segment_and_slice_futures.emplace_back(segment_and_slice_promises[idx].getFuture()); + processing_unit_indexes.emplace_back(std::vector{idx}); + } + + auto processed_entity_ids_fut = schedule_clause_processing(component_manager, + std::move(segment_and_slice_futures), + std::move(processing_unit_indexes), + clauses); + + for (size_t idx = 0; idx < segment_and_slice_promises.size(); ++idx) { + SegmentInMemory segment; + segment.descriptor().set_id(static_cast(idx)); + segment_and_slice_promises[idx].setValue(SegmentAndSlice(RangesAndKey({idx, idx+1}, {0, 1}, {}), std::move(segment))); + } + + auto processed_entity_ids = std::move(processed_entity_ids_fut).get(); + auto proc = gather_entities, std::shared_ptr, std::shared_ptr>(*component_manager, std::move(processed_entity_ids)); + ASSERT_EQ(proc.segments_.value().size(), num_segments); + NumericId start_id{0}; + for (const auto& segment: proc.segments_.value()) { + auto id = std::get(segment->descriptor().id()); + ASSERT_EQ(id, start_id++ + num_clauses); + } +} diff --git a/cpp/arcticdb/storage/file/file_store.hpp b/cpp/arcticdb/storage/file/file_store.hpp index 32b1f59743..4b6e04e436 100644 --- a/cpp/arcticdb/storage/file/file_store.hpp +++ b/cpp/arcticdb/storage/file/file_store.hpp @@ -107,9 +107,10 @@ void write_dataframe_to_file_internal( version_store::ReadVersionOutput read_dataframe_from_file_internal( const StreamId& stream_id, const std::string& path, - ReadQuery& read_query, + const std::shared_ptr& read_query, const ReadOptions& read_options, - const arcticdb::proto::encoding::VariantCodec &codec_opts) { + const arcticdb::proto::encoding::VariantCodec &codec_opts, + std::any& handler_data) { auto config = storage::file::pack_config(path, codec_opts); storage::LibraryPath lib_path{std::string{"file"}, fmt::format("{}", stream_id)}; auto library = create_library(lib_path, storage::OpenMode::WRITE, {std::move(config)}); @@ -126,8 +127,6 @@ version_store::ReadVersionOutput 
read_dataframe_from_file_internal( const auto header_offset = key_data.key_offset_ + key_data.key_size_; ARCTICDB_DEBUG(log::storage(), "Got header offset at {}", header_offset); single_file_storage->load_header(header_offset, data_end - header_offset); - auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(); - auto frame_and_descriptor = version_store::read_frame_for_version(store, versioned_item, read_query, read_options, handler_data); - return {std::move(versioned_item), std::move(frame_and_descriptor)}; + return version_store::read_frame_for_version(store, versioned_item, read_query, read_options, handler_data).get(); } } //namespace arcticdb \ No newline at end of file diff --git a/cpp/arcticdb/storage/test/in_memory_store.hpp b/cpp/arcticdb/storage/test/in_memory_store.hpp index 68951f4397..d351e19905 100644 --- a/cpp/arcticdb/storage/test/in_memory_store.hpp +++ b/cpp/arcticdb/storage/test/in_memory_store.hpp @@ -206,15 +206,35 @@ namespace arcticdb { }); } - folly::Future read_compressed(const entity::VariantKey&, storage::ReadKeyOpts) override { - throw std::runtime_error("Not implemented"); + folly::Future read_compressed(const entity::VariantKey& key, storage::ReadKeyOpts opts) override { + // Anything read_compressed_sync() throws should be returned inside the Future, so: + return folly::makeFutureWith([&](){ return read_compressed_sync(key, opts); }); } storage::KeySegmentPair read_compressed_sync( - const entity::VariantKey&, + const entity::VariantKey& key, storage::ReadKeyOpts ) override { - throw std::runtime_error("Not implemented"); + StorageFailureSimulator::instance()->go(FailureType::READ); + std::lock_guard lock{mutex_}; + auto segment_in_memory = util::variant_match( + key, + [&] (const RefKey& ref_key) { + auto it = seg_by_ref_key_.find(ref_key); + if (it == seg_by_ref_key_.end()) + throw storage::KeyNotFoundException(Composite(ref_key)); + ARCTICDB_DEBUG(log::storage(), "Mock store returning compressed ref key {}", ref_key); + return it->second->clone(); + }, + [&] (const AtomKey& atom_key) { + auto it = seg_by_atom_key_.find(atom_key); + if (it == seg_by_atom_key_.end()) + throw storage::KeyNotFoundException(Composite(atom_key)); + ARCTICDB_DEBUG(log::storage(), "Mock store returning compressed atom key {}", atom_key); + return it->second->clone(); + }); + auto key_copy = key; + return storage::KeySegmentPair(std::move(key_copy), ::arcticdb::encode_dispatch(std::move(segment_in_memory), codec_, EncodingVersion::V1)); } folly::Future> read(const VariantKey& key, storage::ReadKeyOpts opts) override { @@ -432,24 +452,6 @@ namespace arcticdb { }); } - folly::Future, arcticdb::TimeseriesDescriptor>> - read_timeseries_descriptor_for_incompletes(const entity::VariantKey& key, - storage::ReadKeyOpts /*opts*/) override { - return util::variant_match(key, [&](const AtomKey &ak) { - auto it = seg_by_atom_key_.find(ak); - if (it == seg_by_atom_key_.end()) - throw storage::KeyNotFoundException(Composite(ak)); - ARCTICDB_DEBUG(log::storage(), "Mock store read for atom key {}", ak); - auto tsd = it->second->index_descriptor(); - tsd.set_stream_descriptor(it->second->descriptor()); - return std::make_pair(key, it->second->index_descriptor()); - }, - [&](const RefKey&) { - util::raise_rte("Not implemented"); - return std::make_pair(key, TimeseriesDescriptor{}); - }); - } - void set_failure_sim(const arcticdb::proto::storage::VersionStoreConfig::StorageFailureSimulator &) override {} std::string name() const override { @@ -475,6 +477,7 @@ namespace arcticdb { 
std::recursive_mutex mutex_; // Allow iterate_type() to be re-entrant std::unordered_map> seg_by_atom_key_; std::unordered_map> seg_by_ref_key_; + arcticdb::proto::encoding::VariantCodec codec_; }; } //namespace arcticdb \ No newline at end of file diff --git a/cpp/arcticdb/storage/test/test_memory_storage.cpp b/cpp/arcticdb/storage/test/test_memory_storage.cpp index 6e095f100f..894076118a 100644 --- a/cpp/arcticdb/storage/test/test_memory_storage.cpp +++ b/cpp/arcticdb/storage/test/test_memory_storage.cpp @@ -27,7 +27,7 @@ TEST(InMemory, ReadTwice) { auto test_frame = get_test_frame(symbol, fields, num_rows, start_val); version_store.write_versioned_dataframe_internal(symbol, std::move(test_frame.frame_), false, false, false); - ReadQuery read_query; + auto read_query = std::make_shared(); register_native_handler_data_factory(); auto handler_data = get_type_handler_data(); auto read_result1 = version_store.read_dataframe_version_internal(symbol, VersionQuery{}, read_query, ReadOptions{}, handler_data); diff --git a/cpp/arcticdb/stream/append_map.cpp b/cpp/arcticdb/stream/append_map.cpp index 77df44dff2..c61267b122 100644 --- a/cpp/arcticdb/stream/append_map.cpp +++ b/cpp/arcticdb/stream/append_map.cpp @@ -5,6 +5,7 @@ * As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0. */ +#include #include #include #include @@ -379,11 +380,13 @@ std::pair> get_descriptor_a bool load_data, storage::ReadKeyOpts opts = storage::ReadKeyOpts{}) { if(load_data) { - auto [key, seg] = store->read_sync(k, opts); + auto seg = store->read_sync(k, opts).second; return std::make_pair(seg.index_descriptor(), std::make_optional(seg)); } else { - auto [key, tsd] = store->read_timeseries_descriptor_for_incompletes(k, opts).get(); - return std::make_pair(std::move(tsd), std::nullopt); + auto seg_ptr = store->read_compressed_sync(k, opts).segment_ptr(); + auto tsd = decode_timeseries_descriptor_for_incompletes(*seg_ptr); + internal::check(tsd.has_value(), "Failed to decode timeseries descriptor"); + return std::make_pair(std::move(*tsd), std::nullopt); } } diff --git a/cpp/arcticdb/stream/stream_source.hpp b/cpp/arcticdb/stream/stream_source.hpp index 9fa6aa8488..401fa8a4ac 100644 --- a/cpp/arcticdb/stream/stream_source.hpp +++ b/cpp/arcticdb/stream/stream_source.hpp @@ -82,11 +82,6 @@ struct StreamSource { virtual folly::Future> read_timeseries_descriptor(const entity::VariantKey& key, storage::ReadKeyOpts opts = storage::ReadKeyOpts{}) = 0; - - virtual folly::Future> - read_timeseries_descriptor_for_incompletes(const entity::VariantKey& key, - storage::ReadKeyOpts opts = storage::ReadKeyOpts{}) = 0; - }; } // namespace arcticdb::stream \ No newline at end of file diff --git a/cpp/arcticdb/version/local_versioned_engine.cpp b/cpp/arcticdb/version/local_versioned_engine.cpp index 3cd88a7258..da52561a14 100644 --- a/cpp/arcticdb/version/local_versioned_engine.cpp +++ b/cpp/arcticdb/version/local_versioned_engine.cpp @@ -360,14 +360,13 @@ std::variant get_version_identifier( ReadVersionOutput LocalVersionedEngine::read_dataframe_version_internal( const StreamId &stream_id, const VersionQuery& version_query, - ReadQuery& read_query, + const std::shared_ptr& read_query, const ReadOptions& read_options, std::any& handler_data) { py::gil_scoped_release release_gil; auto version = get_version_to_read(stream_id, version_query); const auto identifier = get_version_identifier(stream_id, version_query, read_options, 
version); - auto frame_and_descriptor = read_frame_for_version(store(), identifier, read_query, read_options, handler_data); - return ReadVersionOutput{version.value_or(VersionedItem{}), std::move(frame_and_descriptor)}; + return read_frame_for_version(store(), identifier, read_query, read_options, handler_data).get(); } folly::Future LocalVersionedEngine::get_descriptor( @@ -406,14 +405,14 @@ folly::Future LocalVersionedEngine::get_descriptor( } folly::Future LocalVersionedEngine::get_descriptor_async( - folly::Future>&& version_fut, + folly::Future>&& opt_index_key_fut, const StreamId& stream_id, const VersionQuery& version_query){ - return std::move(version_fut) - .thenValue([this, &stream_id, &version_query](std::optional&& key){ - missing_data::check(key.has_value(), + return std::move(opt_index_key_fut) + .thenValue([this, &stream_id, &version_query](std::optional&& opt_index_key){ + missing_data::check(opt_index_key.has_value(), "Unable to retrieve descriptor data. {}@{}: version not found", stream_id, version_query); - return get_descriptor(std::move(key.value())); + return get_descriptor(std::move(*opt_index_key)); }).via(&async::cpu_executor()); } @@ -437,11 +436,11 @@ std::vector> LocalVersionedEngine::batch internal::check(read_options.batch_throw_on_error_.has_value(), "ReadOptions::batch_throw_on_error_ should always be set here"); - auto version_futures = batch_get_versions_async(store(), version_map(), stream_ids, version_queries); + auto opt_index_key_futs = batch_get_versions_async(store(), version_map(), stream_ids, version_queries); std::vector> descriptor_futures; - for (auto&& [idx, version_fut]: folly::enumerate(version_futures)) { + for (auto&& [idx, opt_index_key_fut]: folly::enumerate(opt_index_key_futs)) { descriptor_futures.emplace_back( - get_descriptor_async(std::move(version_fut), stream_ids[idx], version_queries[idx])); + get_descriptor_async(std::move(opt_index_key_fut), stream_ids[idx], version_queries[idx])); } auto descriptors = folly::collectAll(descriptor_futures).get(); std::vector> descriptors_or_errors; @@ -1050,77 +1049,55 @@ VersionedItem LocalVersionedEngine::defragment_symbol_data(const StreamId& strea return versioned_item; } -folly::Future async_read_direct( - const std::shared_ptr& store, - const VariantKey& index_key, - SegmentInMemory&& index_segment, - const std::shared_ptr& read_query, - DecodePathData shared_data, - std::any& handler_data, - const ReadOptions& read_options) { - return async_read_direct_impl(store, index_key, std::move(index_segment), read_query, shared_data, handler_data, read_options); -} - std::vector LocalVersionedEngine::batch_read_keys(const std::vector &keys) { auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(); py::gil_scoped_release release_gil; - std::vector>> index_futures; - for (auto &index_key: keys) { - index_futures.emplace_back(store()->read(index_key)); - } - auto indexes = folly::collect(index_futures).get(); - std::vector> results_fut; - auto i = 0u; - for (auto&& [index_key, index_segment] : indexes) { - DecodePathData shared_data; - results_fut.emplace_back(async_read_direct(store(), - keys[i], - std::move(index_segment), - std::make_shared(), - shared_data, - handler_data, - ReadOptions{})); - ++i; + std::vector> res; + res.reserve(keys.size()); + for (const auto& index_key: keys) { + res.emplace_back(read_frame_for_version(store(), {index_key}, std::make_shared(), ReadOptions{}, handler_data)); } Allocator::instance()->trim(); - return folly::collect(results_fut).get(); + 
return folly::collect(res).get(); } -std::vector> LocalVersionedEngine::temp_batch_read_internal_direct( - const std::vector &stream_ids, - const std::vector &version_queries, - std::vector> &read_queries, - const ReadOptions &read_options) { - auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(); +std::vector> LocalVersionedEngine::batch_read_internal( + const std::vector& stream_ids, + const std::vector& version_queries, + std::vector>& read_queries, + const ReadOptions& read_options, + std::any& handler_data) { py::gil_scoped_release release_gil; - - auto versions = batch_get_versions_async(store(), version_map(), stream_ids, version_queries); + // This read option should always be set when calling batch_read + internal::check(read_options.batch_throw_on_error_.has_value(), + "ReadOptions::batch_throw_on_error_ should always be set here"); + auto opt_index_key_futs = batch_get_versions_async(store(), version_map(), stream_ids, version_queries); std::vector> read_versions_futs; - for (auto&& [idx, version] : folly::enumerate(versions)) { - read_versions_futs.emplace_back(std::move(version) - .thenValue([store = store()](auto&& maybe_index_key) { - missing_data::check( - maybe_index_key.has_value(), - "Version not found for symbol"); - return store->read(*maybe_index_key); - }) - .thenValue([store = store(), - read_query = read_queries.empty() ? std::make_shared(): read_queries[idx], - read_options, - &handler_data](auto&& key_segment_pair) { - auto [index_key, index_segment] = std::move(key_segment_pair); - return async_read_direct(store, - index_key, - std::move(index_segment), - read_query, - DecodePathData{}, - handler_data, - read_options); - }) + for (auto&& [idx, opt_index_key_fut] : folly::enumerate(opt_index_key_futs)) { + read_versions_futs.emplace_back( + std::move(opt_index_key_fut).thenValue([store = store(), + idx, + &stream_ids, + &version_queries, + read_query = read_queries.empty() ? 
std::make_shared(): read_queries[idx], + &read_options, + &handler_data](auto&& opt_index_key) { + std::variant version_info; + if (opt_index_key.has_value()) { + version_info = VersionedItem(std::move(*opt_index_key)); + } else { + if (opt_false(read_options.incompletes_)) { + log::version().warn("No index: Key not found for {}, will attempt to use incomplete segments.", stream_ids[idx]); + version_info = stream_ids[idx]; + } else { + missing_data::raise( + "batch_read_internal: version matching query '{}' not found for symbol '{}'", version_queries[idx], stream_ids[idx]); + } + } + return read_frame_for_version(store, version_info, read_query, read_options, handler_data); + }) ); } - // TODO: https://github.com/man-group/ArcticDB/issues/241 - // Move everything from here to the end of the function out into batch_read_internal as part of #241 auto read_versions = folly::collectAll(read_versions_futs).get(); std::vector> read_versions_or_errors; read_versions_or_errors.reserve(read_versions.size()); @@ -1135,7 +1112,8 @@ std::vector> LocalVersionedEngine::te DataError data_error(stream_ids[idx], exception.what().toStdString(), version_queries[idx].content_); if (exception.is_compatible_with()) { data_error.set_error_code(ErrorCode::E_NO_SUCH_VERSION); - } else if (exception.is_compatible_with()) { + } else if (exception.is_compatible_with() || + exception.is_compatible_with()) { data_error.set_error_code(ErrorCode::E_KEY_NOT_FOUND); } read_versions_or_errors.emplace_back(std::move(data_error)); @@ -1145,68 +1123,6 @@ std::vector> LocalVersionedEngine::te return read_versions_or_errors; } -std::vector> LocalVersionedEngine::batch_read_internal( - const std::vector& stream_ids, - const std::vector& version_queries, - std::vector>& read_queries, - const ReadOptions& read_options, - std::any& handler_data) { - // This read option should always be set when calling batch_read - internal::check(read_options.batch_throw_on_error_.has_value(), - "ReadOptions::batch_throw_on_error_ should always be set here"); - - if(std::none_of(std::begin(read_queries), std::end(read_queries), [] (const auto& read_query) { - return !read_query->clauses_.empty(); - })) { - return temp_batch_read_internal_direct(stream_ids, version_queries, read_queries, read_options); - } - - std::vector> read_versions_or_errors; - read_versions_or_errors.reserve(stream_ids.size()); - for (size_t i=0; i < stream_ids.size(); ++i) { - auto version_query = version_queries.size() > i ? version_queries[i] : VersionQuery{}; - auto read_query = read_queries.size() > i ? 
*read_queries[i] : ReadQuery{}; - // TODO: https://github.com/man-group/ArcticDB/issues/241 - // Remove this try-catch in favour of the implementation in temp_batch_read_internal_direct as part of #241 - try { - auto read_version = read_dataframe_version_internal(stream_ids[i], version_query, read_query, read_options, handler_data); - read_versions_or_errors.emplace_back(std::move(read_version)); - } catch (const NoSuchVersionException& e) { - if (*read_options.batch_throw_on_error_) { - throw; - } - read_versions_or_errors.emplace_back(DataError(stream_ids[i], - e.what(), - version_query.content_, - ErrorCode::E_NO_SUCH_VERSION)); - } catch (const storage::NoDataFoundException& e) { - if (*read_options.batch_throw_on_error_) { - throw; - } - read_versions_or_errors.emplace_back(DataError(stream_ids[i], - e.what(), - version_query.content_, - ErrorCode::E_KEY_NOT_FOUND)); - } catch (const storage::KeyNotFoundException& e) { - if (*read_options.batch_throw_on_error_) { - throw; - } - read_versions_or_errors.emplace_back(DataError(stream_ids[i], - e.what(), - version_query.content_, - ErrorCode::E_KEY_NOT_FOUND)); - } catch (const std::exception& e) { - if (*read_options.batch_throw_on_error_) { - throw; - } - read_versions_or_errors.emplace_back(DataError(stream_ids[i], - e.what(), - version_query.content_)); - } - } - return read_versions_or_errors; -} - void LocalVersionedEngine::write_version_and_prune_previous( bool prune_previous_versions, const AtomKey& new_version, @@ -1593,15 +1509,15 @@ folly::Future, std::optional>> LocalVersionedEngine::get_metadata_async( - folly::Future>&& version_fut, + folly::Future>&& opt_index_key_fut, const StreamId& stream_id, const VersionQuery& version_query ) { - return std::move(version_fut) - .thenValue([this, &stream_id, &version_query](std::optional&& key){ - missing_data::check(key.has_value(), + return std::move(opt_index_key_fut) + .thenValue([this, &stream_id, &version_query](std::optional&& opt_index_key){ + missing_data::check(opt_index_key.has_value(), "Unable to retrieve metadata. 
{}@{}: version not found", stream_id, version_query); - return get_metadata(std::move(key)); + return get_metadata(std::move(*opt_index_key)); }) .thenValue([](auto&& metadata){ auto&& [opt_key, meta_proto] = metadata; @@ -1617,10 +1533,10 @@ std::vector(read_options.batch_throw_on_error_.has_value(), "ReadOptions::batch_throw_on_error_ should always be set here"); - auto version_futures = batch_get_versions_async(store(), version_map(), stream_ids, version_queries); + auto opt_index_key_futs = batch_get_versions_async(store(), version_map(), stream_ids, version_queries); std::vector>>> metadata_futures; - for (auto&& [idx, version]: folly::enumerate(version_futures)) { - metadata_futures.emplace_back(get_metadata_async(std::move(version), stream_ids[idx], version_queries[idx])); + for (auto&& [idx, opt_index_key_fut]: folly::enumerate(opt_index_key_futs)) { + metadata_futures.emplace_back(get_metadata_async(std::move(opt_index_key_fut), stream_ids[idx], version_queries[idx])); } auto metadatas = folly::collectAll(metadata_futures).get(); diff --git a/cpp/arcticdb/version/local_versioned_engine.hpp b/cpp/arcticdb/version/local_versioned_engine.hpp index d450549394..da140ceec4 100644 --- a/cpp/arcticdb/version/local_versioned_engine.hpp +++ b/cpp/arcticdb/version/local_versioned_engine.hpp @@ -141,7 +141,7 @@ class LocalVersionedEngine : public VersionedEngine { ReadVersionOutput read_dataframe_version_internal( const StreamId &stream_id, const VersionQuery& version_query, - ReadQuery& read_query, + const std::shared_ptr& read_query, const ReadOptions& read_options, std::any& handler_data) override; @@ -192,7 +192,7 @@ class LocalVersionedEngine : public VersionedEngine { std::optional&& key); folly::Future>> get_metadata_async( - folly::Future>&& version_fut, + folly::Future>&& opt_index_key_fut, const StreamId& stream_id, const VersionQuery& version_query); @@ -200,7 +200,7 @@ class LocalVersionedEngine : public VersionedEngine { AtomKey&& key); folly::Future get_descriptor_async( - folly::Future>&& version_fut, + folly::Future>&& opt_index_key_fut, const StreamId& stream_id, const VersionQuery& version_query); @@ -286,12 +286,6 @@ class LocalVersionedEngine : public VersionedEngine { const ReadOptions& read_options, std::any& handler_data); - std::vector> temp_batch_read_internal_direct( - const std::vector& stream_ids, - const std::vector& version_queries, - std::vector>& read_queries, - const ReadOptions& read_options); - std::vector> batch_read_descriptor_internal( const std::vector& stream_ids, const std::vector& version_queries, diff --git a/cpp/arcticdb/version/python_bindings.cpp b/cpp/arcticdb/version/python_bindings.cpp index bd837a4e46..00a5398629 100644 --- a/cpp/arcticdb/version/python_bindings.cpp +++ b/cpp/arcticdb/version/python_bindings.cpp @@ -179,8 +179,9 @@ void register_bindings(py::module &version, py::exception read_query){ + auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(); + return adapt_read_df(read_dataframe_from_file(sid, path, read_query, handler_data)); }); using FrameDataWrapper = arcticdb::pipelines::FrameDataWrapper; @@ -615,7 +616,7 @@ void register_bindings(py::module &version, py::exception(), "Write a specific version of this dataframe to the store") .def("read_dataframe_version", - [&](PythonVersionStore& v, StreamId sid, const VersionQuery& version_query, ReadQuery& read_query, const ReadOptions& read_options) { + [&](PythonVersionStore& v, StreamId sid, const VersionQuery& version_query, std::shared_ptr read_query, const 
ReadOptions& read_options) { auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(); return adapt_read_df(v.read_dataframe_version(sid, version_query, read_query, read_options, handler_data)); }, @@ -715,7 +716,8 @@ void register_bindings(py::module &version, py::exception& version_queries, std::vector>& read_queries, const ReadOptions& read_options){ - return python_util::adapt_read_dfs(v.batch_read(stream_ids, version_queries, read_queries, read_options)); + auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(); + return python_util::adapt_read_dfs(v.batch_read(stream_ids, version_queries, read_queries, read_options, handler_data)); }, py::call_guard(), "Read a dataframe from the store") .def("batch_read_keys", diff --git a/cpp/arcticdb/version/test/test_sparse.cpp b/cpp/arcticdb/version/test/test_sparse.cpp index 92241eaa72..4182fae934 100644 --- a/cpp/arcticdb/version/test/test_sparse.cpp +++ b/cpp/arcticdb/version/test/test_sparse.cpp @@ -84,8 +84,8 @@ TEST_F(SparseTestStore, SimpleRoundtrip) { ReadOptions read_options; read_options.set_dynamic_schema(true); read_options.set_incompletes(true); - pipelines::ReadQuery read_query; - read_query.row_filter = universal_range(); + auto read_query = std::make_shared(); + read_query->row_filter = universal_range(); register_native_handler_data_factory(); auto handler_data = get_type_handler_data(); auto read_result = test_store_->read_dataframe_version(stream_id, pipelines::VersionQuery{}, read_query, read_options, handler_data); @@ -176,8 +176,8 @@ TEST_F(SparseTestStore, SimpleRoundtripBackwardsCompat) { ReadOptions read_options; read_options.set_dynamic_schema(true); read_options.set_incompletes(true); - pipelines::ReadQuery read_query; - read_query.row_filter = universal_range(); + auto read_query = std::make_shared(); + read_query->row_filter = universal_range(); register_native_handler_data_factory(); auto handler_data = get_type_handler_data(); auto read_result = test_store_->read_dataframe_version(stream_id, pipelines::VersionQuery{}, read_query, read_options, handler_data); @@ -227,8 +227,8 @@ TEST_F(SparseTestStore, DenseToSparse) { ReadOptions read_options; read_options.set_dynamic_schema(true); read_options.set_incompletes(true); - pipelines::ReadQuery read_query; - read_query.row_filter = universal_range(); + auto read_query = std::make_shared(); + read_query->row_filter = universal_range(); register_native_handler_data_factory(); auto handler_data = get_type_handler_data(); auto read_result = test_store_->read_dataframe_version(stream_id, pipelines::VersionQuery{}, read_query, read_options, handler_data); @@ -276,8 +276,8 @@ TEST_F(SparseTestStore, SimpleRoundtripStrings) { ReadOptions read_options; read_options.set_dynamic_schema(true); read_options.set_incompletes(true); - pipelines::ReadQuery read_query; - read_query.row_filter = universal_range(); + auto read_query = std::make_shared(); + read_query->row_filter = universal_range(); auto handler_data = get_type_handler_data(); auto read_result = test_store_->read_dataframe_version(stream_id, pipelines::VersionQuery{}, read_query, read_options, handler_data); const auto& frame = read_result.frame_data.frame();; @@ -330,8 +330,8 @@ TEST_F(SparseTestStore, Multiblock) { ReadOptions read_options; read_options.set_dynamic_schema(true); read_options.set_incompletes(true); - pipelines::ReadQuery read_query; - read_query.row_filter = universal_range(); + auto read_query = std::make_shared(); + read_query->row_filter = universal_range(); 
register_native_handler_data_factory(); auto handler_data = get_type_handler_data(); auto read_result = test_store_->read_dataframe_version(stream_id, pipelines::VersionQuery{}, read_query, read_options, handler_data); @@ -383,8 +383,8 @@ TEST_F(SparseTestStore, Segment) { ReadOptions read_options; read_options.set_dynamic_schema(true); read_options.set_incompletes(true); - pipelines::ReadQuery read_query; - read_query.row_filter = universal_range(); + auto read_query = std::make_shared(); + read_query->row_filter = universal_range(); register_native_handler_data_factory(); auto handler_data = get_type_handler_data(); auto read_result = test_store_->read_dataframe_version(stream_id, pipelines::VersionQuery{}, read_query, read_options, handler_data); @@ -443,8 +443,8 @@ TEST_F(SparseTestStore, SegmentWithExistingIndex) { ReadOptions read_options; read_options.set_dynamic_schema(true); read_options.set_incompletes(true); - pipelines::ReadQuery read_query; - read_query.row_filter = universal_range(); + auto read_query = std::make_shared(); + read_query->row_filter = universal_range(); register_native_handler_data_factory(); auto handler_data = get_type_handler_data(); auto read_result = test_store_->read_dataframe_version(stream_id, pipelines::VersionQuery{}, read_query, read_options, handler_data); @@ -503,9 +503,9 @@ TEST_F(SparseTestStore, SegmentAndFilterColumn) { ReadOptions read_options; read_options.set_dynamic_schema(true); read_options.set_incompletes(true); - pipelines::ReadQuery read_query; - read_query.columns = {"time", "first"}; - read_query.row_filter = universal_range(); + auto read_query = std::make_shared(); + read_query->columns = {"time", "first"}; + read_query->row_filter = universal_range(); register_native_handler_data_factory(); auto handler_data = get_type_handler_data(); auto read_result = test_store_->read_dataframe_version(stream_id, pipelines::VersionQuery{}, read_query, read_options, handler_data); @@ -560,8 +560,8 @@ TEST_F(SparseTestStore, SegmentWithRangeFilter) { ReadOptions read_options; read_options.set_dynamic_schema(true); read_options.set_incompletes(true); - pipelines::ReadQuery read_query; - read_query.row_filter = IndexRange(timestamp{3000}, timestamp{6999}); + auto read_query = std::make_shared(); + read_query->row_filter = IndexRange(timestamp{3000}, timestamp{6999}); register_native_handler_data_factory(); auto handler_data = get_type_handler_data(); auto read_result = test_store_->read_dataframe_version(stream_id, pipelines::VersionQuery{}, read_query, read_options, handler_data); @@ -614,8 +614,8 @@ TEST_F(SparseTestStore, Compact) { ReadOptions read_options; read_options.set_dynamic_schema(true); - pipelines::ReadQuery read_query; - read_query.row_filter = universal_range(); + auto read_query = std::make_shared(); + read_query->row_filter = universal_range(); register_native_handler_data_factory(); auto handler_data = get_type_handler_data(); auto read_result = test_store_->read_dataframe_version(stream_id, pipelines::VersionQuery{}, read_query, read_options, handler_data); @@ -673,8 +673,8 @@ TEST_F(SparseTestStore, CompactWithStrings) { ReadOptions read_options; read_options.set_dynamic_schema(true); - pipelines::ReadQuery read_query; - read_query.row_filter = universal_range(); + auto read_query = std::make_shared(); + read_query->row_filter = universal_range(); auto handler_data = get_type_handler_data(); auto read_result = test_store_->read_dataframe_version(stream_id, pipelines::VersionQuery{}, read_query, read_options, handler_data); 
const auto& frame = read_result.frame_data.frame(); diff --git a/cpp/arcticdb/version/test/test_version_store.cpp b/cpp/arcticdb/version/test/test_version_store.cpp index 4e2ef14ff5..060d3813a2 100644 --- a/cpp/arcticdb/version/test/test_version_store.cpp +++ b/cpp/arcticdb/version/test/test_version_store.cpp @@ -256,7 +256,7 @@ TEST_F(VersionStoreTest, CompactIncompleteDynamicSchema) { } auto vit = test_store_->compact_incomplete(symbol, false, false, true, false); - ReadQuery read_query; + auto read_query = std::make_shared(); register_native_handler_data_factory(); auto handler_data = get_type_handler_data(); auto read_result = test_store_->read_dataframe_version(symbol, VersionQuery{}, read_query, ReadOptions{}, handler_data); @@ -366,8 +366,9 @@ TEST_F(VersionStoreTest, StressBatchReadUncompressed) { std::vector> read_queries; ReadOptions read_options; register_native_handler_data_factory(); + auto handler_data = get_type_handler_data(); read_options.set_batch_throw_on_error(true); - auto latest_versions = test_store_->batch_read(symbols, std::vector(10), read_queries, read_options); + auto latest_versions = test_store_->batch_read(symbols, std::vector(10), read_queries, read_options, handler_data); for(auto&& [idx, version] : folly::enumerate(latest_versions)) { auto expected = get_test_simple_frame(std::get(version).item.symbol(), 10, idx); bool equal = expected.segment_ == std::get(version).frame_data.frame(); @@ -531,7 +532,7 @@ TEST(VersionStore, UpdateWithin) { auto update_frame = get_test_frame(symbol, fields, update_range.diff(), update_range.first, update_val); version_store.update_internal(symbol, UpdateQuery{}, std::move(update_frame.frame_), false, false, false); - ReadQuery read_query; + auto read_query = std::make_shared(); register_native_handler_data_factory(); auto handler_data = get_type_handler_data(); auto read_result = version_store.read_dataframe_version_internal(symbol, VersionQuery{}, read_query, ReadOptions{}, handler_data); @@ -574,7 +575,7 @@ TEST(VersionStore, UpdateBefore) { auto update_frame = get_test_frame(symbol, fields, update_range.diff(), update_range.first, update_val); version_store.update_internal(symbol, UpdateQuery{}, std::move(update_frame.frame_), false, false, false); - ReadQuery read_query; + auto read_query = std::make_shared(); register_native_handler_data_factory(); auto handler_data = get_type_handler_data(); auto read_result = version_store.read_dataframe_version_internal(symbol, VersionQuery{}, read_query, ReadOptions{}, handler_data); @@ -617,7 +618,7 @@ TEST(VersionStore, UpdateAfter) { auto update_frame = get_test_frame(symbol, fields, update_range.diff(), update_range.first, update_val); version_store.update_internal(symbol, UpdateQuery{}, std::move(update_frame.frame_), false, false, false); - ReadQuery read_query; + auto read_query = std::make_shared(); register_native_handler_data_factory(); auto handler_data = get_type_handler_data(); auto read_result = version_store.read_dataframe_version_internal(symbol, VersionQuery{}, read_query, ReadOptions{}, handler_data); @@ -661,7 +662,7 @@ TEST(VersionStore, UpdateIntersectBefore) { get_test_frame(symbol, fields, update_range.diff(), update_range.first, update_val); version_store.update_internal(symbol, UpdateQuery{}, std::move(update_frame.frame_), false, false, false); - ReadQuery read_query; + auto read_query = std::make_shared(); register_native_handler_data_factory(); auto handler_data = get_type_handler_data(); auto read_result = 
version_store.read_dataframe_version_internal(symbol, VersionQuery{}, read_query, ReadOptions{}, handler_data); @@ -705,7 +706,7 @@ TEST(VersionStore, UpdateIntersectAfter) { get_test_frame(symbol, fields, update_range.diff(), update_range.first, update_val); version_store.update_internal(symbol, UpdateQuery{}, std::move(update_frame.frame_), false, false, false); - ReadQuery read_query; + auto read_query = std::make_shared(); register_native_handler_data_factory(); auto handler_data = get_type_handler_data(); auto read_result = version_store.read_dataframe_version_internal(symbol, VersionQuery{}, read_query, ReadOptions{}, handler_data); @@ -759,7 +760,7 @@ TEST(VersionStore, UpdateWithinSchemaChange) { ReadOptions read_options; read_options.set_dynamic_schema(true); - ReadQuery read_query; + auto read_query = std::make_shared(); register_native_handler_data_factory(); auto handler_data = get_type_handler_data(); auto read_result = version_store.read_dataframe_version_internal(symbol, VersionQuery{}, read_query, read_options, handler_data); @@ -822,7 +823,7 @@ TEST(VersionStore, UpdateWithinTypeAndSchemaChange) { ReadOptions read_options; read_options.set_dynamic_schema(true); - ReadQuery read_query; + auto read_query = std::make_shared(); register_native_handler_data_factory(); auto handler_data = get_type_handler_data(); auto read_result = version_store.read_dataframe_version_internal(symbol, VersionQuery{}, read_query, read_options, handler_data); diff --git a/cpp/arcticdb/version/version_core.cpp b/cpp/arcticdb/version/version_core.cpp index 14d3fbb193..0be39e11b1 100644 --- a/cpp/arcticdb/version/version_core.cpp +++ b/cpp/arcticdb/version/version_core.cpp @@ -404,54 +404,7 @@ void set_row_id_for_empty_columns_set( } } -// This is a parallelizable direct read (no processing pipeline) that is used -// for things that need to get multiple objects in their entirety, as with -// recursive normalizers. 
Outside those specific situtations it's probably -// not what you want -folly::Future async_read_direct_impl( - const std::shared_ptr& store, - const VariantKey& index_key, - SegmentInMemory&& index_segment, - const std::shared_ptr& read_query, - DecodePathData shared_data, - std::any& handler_data, - const ReadOptions& read_options) { - auto index_segment_reader = std::make_shared(std::move(index_segment)); - const auto& tsd = index_segment_reader->tsd(); - check_column_and_date_range_filterable(*index_segment_reader, *read_query); - add_index_columns_to_query(*read_query, tsd); - read_query->calculate_row_filter(static_cast(tsd.total_rows())); - - auto pipeline_context = std::make_shared(StreamDescriptor{tsd.as_stream_descriptor()}); - pipeline_context->set_selected_columns(read_query->columns); - - const bool dynamic_schema = opt_false(read_options.dynamic_schema_); - const bool bucketize_dynamic = index_segment_reader->bucketize_dynamic(); - - auto queries = get_column_bitset_and_query_functions( - *read_query, - pipeline_context, - dynamic_schema, - bucketize_dynamic); - - pipeline_context->slice_and_keys_ = filter_index(*index_segment_reader, combine_filter_functions(queries)); - - generate_filtered_field_descriptors(pipeline_context, read_query->columns); - mark_index_slices(pipeline_context, dynamic_schema, bucketize_dynamic); - auto frame = allocate_frame(pipeline_context); - - return fetch_data(frame, pipeline_context, store, dynamic_schema, shared_data, handler_data).thenValue( - [pipeline_context, frame, read_options, &handler_data](auto &&) mutable { - reduce_and_fix_columns(pipeline_context, frame, read_options, handler_data); - }).thenValue( - [index_segment_reader, frame, index_key, shared_data, read_query, pipeline_context](auto &&) mutable { - set_row_id_for_empty_columns_set(*read_query, *pipeline_context, frame, index_segment_reader->tsd().total_rows() - 1); - return ReadVersionOutput{VersionedItem{to_atom(index_key)}, - FrameAndDescriptor{frame, std::move(index_segment_reader->mutable_tsd()), {}, shared_data.buffers()}}; - }); -} - -FrameAndDescriptor read_multi_key( +folly::Future read_multi_key( const std::shared_ptr& store, const SegmentInMemory& index_key_seg, std::any& handler_data) { @@ -461,61 +414,69 @@ FrameAndDescriptor read_multi_key( } AtomKey dup{keys[0]}; - ReadQuery read_query; VersionedItem versioned_item{std::move(dup)}; - auto res = read_frame_for_version(store, versioned_item, read_query, ReadOptions{}, handler_data); TimeseriesDescriptor multi_key_desc{index_key_seg.index_descriptor()}; - multi_key_desc.mutable_proto().mutable_normalization()->CopyFrom(res.desc_.proto().normalization()); - return {res.frame_, multi_key_desc, keys, std::shared_ptr{}}; + return read_frame_for_version(store, versioned_item, std::make_shared(), ReadOptions{}, handler_data) + .thenValue([multi_key_desc=std::move(multi_key_desc), keys=std::move(keys)](ReadVersionOutput&& read_version_output) mutable { + multi_key_desc.mutable_proto().mutable_normalization()->CopyFrom(read_version_output.frame_and_descriptor_.desc_.proto().normalization()); + read_version_output.frame_and_descriptor_.desc_ = std::move(multi_key_desc); + read_version_output.frame_and_descriptor_.keys_ = std::move(keys); + read_version_output.frame_and_descriptor_.buffers_ = std::make_shared(); + return std::move(read_version_output); + }); } -std::vector process_clauses( - std::shared_ptr component_manager, - std::vector>&& segment_and_slice_futures, - std::vector>&& processing_unit_indexes, - 
std::shared_ptr>> clauses) { - // There are some odd looking choices in this method: - // - clauses being shared_ptr> - // - segment_proc_unit_counts, entity_added_mtx, and entity_added created as shared pointers rather than just on the - // stack - // Both are for the same reason. folly::collect short-circuits and throws an exception the first time a task - // finishes due to an exception rather than cleanly exiting. However, other tasks that have already been enqueued - // continue executing, and so any variables from this scope that they depend on must be kept alive by the tasks - // themselves. - // It was considered to make the type of ReadQuery::clauses_ std::shared_ptr>. However, this - // makes all the other uses of clauses_ much less clean, so the compromise is an odd function signature here. - - std::vector> segment_and_slice_future_splitters; - segment_and_slice_future_splitters.reserve(segment_and_slice_futures.size()); - for (auto&& future: segment_and_slice_futures) { - segment_and_slice_future_splitters.emplace_back(folly::splitFuture(std::move(future))); +size_t generate_scheduling_iterations(const std::vector>& clauses) { + size_t res{1}; + auto it = std::next(clauses.cbegin()); + while (it != clauses.cend()) { + auto prev_it = std::prev(it); + if ((*prev_it)->clause_info().output_structure_ != (*it)->clause_info().input_structure_) { + ++res; + } + ++it; } + return res; +} - // Map from index in segment_and_slice_future_splitters to the number of processing units that require that segment - auto segment_proc_unit_counts = std::make_shared>(segment_and_slice_futures.size(), 0); - for (const auto& list: processing_unit_indexes) { - for (auto idx: list) { - internal::check( - idx < segment_proc_unit_counts->size(), - "Index {} in processing_unit_indexes out of bounds >{}", idx, segment_proc_unit_counts->size() - 1); - (*segment_proc_unit_counts)[idx]++; +void remove_processed_clauses(std::vector>& clauses) { + // Erase all the clauses we have already scheduled to run + auto it = std::next(clauses.cbegin()); + while (it != clauses.cend()) { + auto prev_it = std::prev(it); + if ((*prev_it)->clause_info().output_structure_ == (*it)->clause_info().input_structure_) { + ++it; + } else { + break; } } } - internal::check( - std::all_of(segment_proc_unit_counts->begin(), segment_proc_unit_counts->end(), [](const size_t& val) { return val != 0; }), - "All segments should be needed by at least one ProcessingUnit"); - // Map from position in segment_and_slice_futures to entity ids + clauses.erase(clauses.cbegin(), it); +} + +folly::Future> schedule_clause_processing( + std::shared_ptr component_manager, + std::vector>&& segment_and_slice_futures, + std::vector>&& processing_unit_indexes, + std::shared_ptr>> clauses) { + // All the shared pointers as arguments to this function and created within it are to ensure that resources are + // correctly kept alive after this function returns its future + auto num_segments = segment_and_slice_futures.size(); + auto segment_and_slice_future_splitters = split_futures(std::move(segment_and_slice_futures)); + + // Map from index in segment_and_slice_future_splitters to the number of calls to process in the first clause that + // will require that segment + auto segment_fetch_counts = generate_segment_fetch_counts(processing_unit_indexes, num_segments); + // Map from position in segment_and_slice_future_splitters to entity ids std::vector pos_to_id; // Map from entity id to position in segment_and_slice_futures auto id_to_pos = std::make_shared>(); -
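The two helpers above act as a pair. generate_scheduling_iterations counts how many scheduling passes the loop further down needs: one, plus one more for every boundary where a clause's input structure differs from the previous clause's output structure. remove_processed_clauses then drops the leading run of clauses that were scheduled together once a pass has been collected, after which the next clause's structure_for_processing is run on the intermediate entities. A toy model of that boundary logic, using invented ToyClause/Structure types rather than ArcticDB's real clause classes:

// Toy model, not ArcticDB code: ToyClause and Structure are invented so the
// iteration-counting and clause-removal logic above can be exercised in isolation.
#include <cassert>
#include <cstddef>
#include <iterator>
#include <vector>

enum class Structure { ROW_SLICE, HASH_BUCKETED };

struct ToyClause {
    Structure input_structure_;
    Structure output_structure_;
};

// Mirrors generate_scheduling_iterations: one pass, plus one more for every
// boundary where a clause's input structure differs from its predecessor's output.
size_t toy_scheduling_iterations(const std::vector<ToyClause>& clauses) {
    size_t res{1};
    for (auto it = std::next(clauses.cbegin()); it != clauses.cend(); ++it) {
        if (std::prev(it)->output_structure_ != it->input_structure_)
            ++res;
    }
    return res;
}

// Mirrors remove_processed_clauses: drop the leading run of clauses that could be
// scheduled together, i.e. everything before the first structure change.
void toy_remove_processed_clauses(std::vector<ToyClause>& clauses) {
    auto it = std::next(clauses.cbegin());
    while (it != clauses.cend() && std::prev(it)->output_structure_ == it->input_structure_)
        ++it;
    clauses.erase(clauses.cbegin(), it);
}

int main() {
    std::vector<ToyClause> clauses{
        {Structure::ROW_SLICE, Structure::ROW_SLICE},          // e.g. a filter-style clause
        {Structure::ROW_SLICE, Structure::ROW_SLICE},          // e.g. a projection-style clause
        {Structure::HASH_BUCKETED, Structure::HASH_BUCKETED},  // e.g. an aggregation-style clause
    };
    assert(toy_scheduling_iterations(clauses) == 2);  // one restructure boundary

    toy_remove_processed_clauses(clauses);  // the first two were scheduled together
    assert(clauses.size() == 1);            // only the aggregation-style clause remains
    return 0;
}

The asserts document the expected behaviour: the filter- and projection-style clauses share one scheduling pass, and the structure change before the aggregation-style clause forces a second one.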
pos_to_id.reserve(segment_and_slice_futures.size()); - auto ids = component_manager->get_new_entity_ids(segment_and_slice_futures.size()); + pos_to_id.reserve(num_segments); + auto ids = component_manager->get_new_entity_ids(num_segments); for (auto&& [idx, id]: folly::enumerate(ids)) { pos_to_id.emplace_back(id); id_to_pos->emplace(id, idx); } - // Give this a more descriptive name as we modify it between clauses std::vector> entity_ids_vec; entity_ids_vec.reserve(processing_unit_indexes.size()); for (const auto& indexes: processing_unit_indexes) { @@ -527,23 +488,21 @@ std::vector process_clauses( } // Used to make sure each entity is only added into the component manager once - auto slice_added_mtx = std::make_shared>(segment_and_slice_futures.size()); - auto slice_added = std::make_shared>(segment_and_slice_futures.size(), false); - std::vector>> futures; - bool first_clause{true}; - while (!clauses->empty()) { - for (auto&& entity_ids: entity_ids_vec) { - if (first_clause) { - std::vector> local_futs; - local_futs.reserve(entity_ids.size()); - for (auto id: entity_ids) { - local_futs.emplace_back(segment_and_slice_future_splitters[id_to_pos->at(id)].getFuture()); - } - futures.emplace_back( - folly::collect(local_futs) + auto slice_added_mtx = std::make_shared>(num_segments); + auto slice_added = std::make_shared>(num_segments, false); + auto futures = std::make_shared>>>(); + + for (auto&& entity_ids: entity_ids_vec) { + std::vector> local_futs; + local_futs.reserve(entity_ids.size()); + for (auto id: entity_ids) { + local_futs.emplace_back(segment_and_slice_future_splitters[id_to_pos->at(id)].getFuture()); + } + futures->emplace_back( + folly::collect(local_futs) .via(&async::cpu_executor()) .thenValue([component_manager, - segment_proc_unit_counts, + segment_fetch_counts, id_to_pos, slice_added_mtx, slice_added, @@ -560,41 +519,44 @@ std::vector process_clauses( std::make_shared(std::move(segment_and_slice.ranges_and_key_.row_range_)), std::make_shared(std::move(segment_and_slice.ranges_and_key_.col_range_)), std::make_shared(std::move(segment_and_slice.ranges_and_key_.key_)), - (*segment_proc_unit_counts)[pos] - ); + (*segment_fetch_counts)[pos] + ); (*slice_added)[pos] = true; } } return async::MemSegmentProcessingTask(*clauses, std::move(entity_ids))(); })); - } else { - futures.emplace_back( - async::submit_cpu_task( - async::MemSegmentProcessingTask(*clauses, - std::move(entity_ids)) - ) - ); - } - } - first_clause = false; - entity_ids_vec = folly::collect(futures).get(); - futures.clear(); - // Erase all the clauses we have already called process on - auto it = std::next(clauses->cbegin()); - while (it != clauses->cend()) { - auto prev_it = std::prev(it); - if ((*prev_it)->clause_info().output_structure_ == (*it)->clause_info().input_structure_) { - ++it; - } else { - break; - } - } - clauses->erase(clauses->cbegin(), it); - if (!clauses->empty()) { - entity_ids_vec = clauses->front()->structure_for_processing(std::move(entity_ids_vec)); + } + + auto entity_ids_vec_fut = folly::Future>>::makeEmpty(); + // The number of iterations we need to pass through the following loop to get all the work scheduled + auto scheduling_iterations = generate_scheduling_iterations(*clauses); + for (size_t i=0; i work_scheduled(folly::Unit{}); + if (i > 0) { + work_scheduled = entity_ids_vec_fut.via(&async::cpu_executor()).thenValue([clauses, futures](std::vector>&& entity_ids_vec) { + futures->clear(); + for (auto&& entity_ids: entity_ids_vec) { + 
futures->emplace_back(async::submit_cpu_task(async::MemSegmentProcessingTask(*clauses, std::move(entity_ids)))); + } + return folly::Unit{}; + }); } + + entity_ids_vec_fut = work_scheduled.via(&async::cpu_executor()).thenValue([clauses, futures](auto&&) { + return folly::collect(*futures).via(&async::cpu_executor()).thenValue([clauses](std::vector>&& entity_ids_vec) { + remove_processed_clauses(*clauses); + if (clauses->empty()) { + return entity_ids_vec; + } else { + return clauses->front()->structure_for_processing(std::move(entity_ids_vec)); + } + }); + }); } - return flatten_entities(std::move(entity_ids_vec)); + return entity_ids_vec_fut.via(&async::cpu_executor()).thenValue([](std::vector>&& entity_ids_vec) { + return flatten_entities(std::move(entity_ids_vec)); + }); } void set_output_descriptors( @@ -730,15 +692,15 @@ std::vector generate_ranges_and_keys(const std::vector read_and_process( +folly::Future> read_and_process( const std::shared_ptr& store, const std::shared_ptr& pipeline_context, - const ReadQuery& read_query, + const std::shared_ptr& read_query, const ReadOptions& read_options ) { auto component_manager = std::make_shared(); ProcessingConfig processing_config{opt_false(read_options.dynamic_schema_), pipeline_context->rows_}; - for (auto& clause: read_query.clauses_) { + for (auto& clause: read_query->clauses_) { clause->set_processing_config(processing_config); clause->set_component_manager(component_manager); } @@ -749,39 +711,27 @@ std::vector read_and_process( // Each element of the vector corresponds to one processing unit containing the list of indexes in ranges_and_keys required for that processing unit // i.e. if the first processing unit needs ranges_and_keys[0] and ranges_and_keys[1], and the second needs ranges_and_keys[2] and ranges_and_keys[3] // then the structure will be {{0, 1}, {2, 3}} - std::vector> processing_unit_indexes = read_query.clauses_[0]->structure_for_processing(ranges_and_keys); + std::vector> processing_unit_indexes = read_query->clauses_[0]->structure_for_processing(ranges_and_keys); // Start reading as early as possible auto segment_and_slice_futures = store->batch_read_uncompressed(std::move(ranges_and_keys), columns_to_decode(pipeline_context)); - auto processed_entity_ids = process_clauses(component_manager, - std::move(segment_and_slice_futures), - std::move(processing_unit_indexes), - std::make_shared>>(read_query.clauses_)); - auto proc = gather_entities, std::shared_ptr, std::shared_ptr>(*component_manager, std::move(processed_entity_ids)); - - if (std::any_of(read_query.clauses_.begin(), read_query.clauses_.end(), [](const std::shared_ptr& clause) { - return clause->clause_info().modifies_output_descriptor_; - })) { - set_output_descriptors(proc, read_query.clauses_, pipeline_context); - } - return collect_segments(std::move(proc)); -} - -SegmentInMemory read_direct(const std::shared_ptr& store, - const std::shared_ptr& pipeline_context, - DecodePathData shared_data, - const ReadOptions& read_options, - std::any& handler_data) { - ARCTICDB_DEBUG(log::version(), "Allocating frame"); - ARCTICDB_SAMPLE_DEFAULT(ReadDirect) - auto frame = allocate_frame(pipeline_context); - util::print_total_mem_usage(__FILE__, __LINE__, __FUNCTION__); - - ARCTICDB_DEBUG(log::version(), "Fetching frame data"); - fetch_data(frame, pipeline_context, store, opt_false(read_options.dynamic_schema_), shared_data, handler_data).get(); - util::print_total_mem_usage(__FILE__, __LINE__, __FUNCTION__); - return frame; + return 
schedule_clause_processing(component_manager, + std::move(segment_and_slice_futures), + std::move(processing_unit_indexes), + std::make_shared>>( + read_query->clauses_)) + .via(&async::cpu_executor()) + .thenValue([component_manager, read_query, pipeline_context](auto&& processed_entity_ids) { + auto proc = gather_entities, std::shared_ptr, std::shared_ptr>(*component_manager, std::move(processed_entity_ids)); + + if (std::any_of(read_query->clauses_.begin(), read_query->clauses_.end(), [](const std::shared_ptr& clause) { + return clause->clause_info().modifies_output_descriptor_; + })) { + set_output_descriptors(proc, read_query->clauses_, pipeline_context); + } + return collect_segments(std::move(proc)); + }); } void add_index_columns_to_query(const ReadQuery& read_query, const TimeseriesDescriptor& desc) { @@ -918,7 +868,7 @@ bool read_incompletes_to_pipeline( // In order to have the right normalization metadata and descriptor we need to find the first non-empty segment. - // Picking an empty segment when there are non-empty ones will impact the index type and column namings. - // If all segments are empty we will procede as if were appending/writing and empty dataframe. + // If all segments are empty we will proceed as if we were appending/writing an empty dataframe. debug::check(!incomplete_segments.empty(), "Incomplete segments must be non-empty"); const auto first_non_empty_seg = std::find_if(incomplete_segments.begin(), incomplete_segments.end(), [&](auto& slice){ return slice.segment(store).row_count() > 0; @@ -1130,7 +1080,7 @@ void copy_frame_data_to_buffer( struct CopyToBufferTask : async::BaseTask { SegmentInMemory&& source_segment_; - SegmentInMemory& target_segment_; + SegmentInMemory target_segment_; FrameSlice frame_slice_; DecodePathData shared_data_; std::any& handler_data_; @@ -1138,7 +1088,7 @@ struct CopyToBufferTask : async::BaseTask { CopyToBufferTask( SegmentInMemory&& source_segment, - SegmentInMemory& target_segment, + const SegmentInMemory& target_segment, FrameSlice frame_slice, DecodePathData shared_data, std::any& handler_data, @@ -1178,10 +1128,10 @@ struct CopyToBufferTask : async::BaseTask { } }; -void copy_segments_to_frame( +folly::Future copy_segments_to_frame( const std::shared_ptr& store, const std::shared_ptr& pipeline_context, - SegmentInMemory& frame, + SegmentInMemory frame, std::any& handler_data) { std::vector> copy_tasks; DecodePathData shared_data; @@ -1198,10 +1148,10 @@ void copy_segments_to_frame( handler_data, context_row->fetch_index()})); } - folly::collect(copy_tasks).get(); + return folly::collect(copy_tasks).via(&async::cpu_executor()).unit(); } -SegmentInMemory prepare_output_frame( +folly::Future prepare_output_frame( std::vector&& items, const std::shared_ptr& pipeline_context, const std::shared_ptr& store, @@ -1224,9 +1174,7 @@ SegmentInMemory prepare_output_frame( } auto frame = allocate_frame(pipeline_context); - copy_segments_to_frame(store, pipeline_context, frame, handler_data); - - return frame; + return copy_segments_to_frame(store, pipeline_context, frame, handler_data).thenValue([frame](auto&&){ return frame; }); } AtomKey index_key_to_column_stats_key(const IndexTypeKey& index_key) { @@ -1251,7 +1199,7 @@ void create_column_stats_impl( log::version().warn("Cannot create empty column stats"); return; } - ReadQuery read_query({std::make_shared(std::move(*clause))}); + auto read_query = std::make_shared(std::vector>{std::make_shared(std::move(*clause))}); auto column_stats_key =
index_key_to_column_stats_key(versioned_item.key_); std::optional old_segment; @@ -1270,7 +1218,7 @@ void create_column_stats_impl( auto pipeline_context = std::make_shared(); pipeline_context->stream_id_ = versioned_item.key_.id(); - read_indexed_keys_to_pipeline(store, pipeline_context, versioned_item, read_query, read_options); + read_indexed_keys_to_pipeline(store, pipeline_context, versioned_item, *read_query, read_options); schema::check( !pipeline_context->multi_key_, @@ -1281,7 +1229,7 @@ void create_column_stats_impl( "Cannot create column stats on pickled data" ); - auto segs = read_and_process(store, pipeline_context, read_query, read_options); + auto segs = read_and_process(store, pipeline_context, read_query, read_options).get(); schema::check(!segs.empty(), "Cannot create column stats for nonexistent columns"); // Convert SliceAndKey vector into SegmentInMemory vector @@ -1377,26 +1325,29 @@ ColumnStats get_column_stats_info_impl( } } -SegmentInMemory do_direct_read_or_process( +folly::Future do_direct_read_or_process( const std::shared_ptr& store, - ReadQuery& read_query, + const std::shared_ptr& read_query, const ReadOptions& read_options, const std::shared_ptr& pipeline_context, const DecodePathData& shared_data, std::any& handler_data) { - SegmentInMemory frame; - if(!read_query.clauses_.empty()) { + if(!read_query->clauses_.empty()) { ARCTICDB_SAMPLE(RunPipelineAndOutput, 0) util::check_rte(!pipeline_context->is_pickled(),"Cannot filter pickled data"); - auto segs = read_and_process(store, pipeline_context, read_query, read_options); - frame = prepare_output_frame(std::move(segs), pipeline_context, store, read_options, handler_data); + return read_and_process(store, pipeline_context, read_query, read_options) + .thenValue([store, pipeline_context, &read_options, &handler_data](auto&& segs) { + return prepare_output_frame(std::move(segs), pipeline_context, store, read_options, handler_data); + }); } else { ARCTICDB_SAMPLE(MarkAndReadDirect, 0) - util::check_rte(!(pipeline_context->is_pickled() && std::holds_alternative(read_query.row_filter)), "Cannot use head/tail/row_range with pickled data, use plain read instead"); + util::check_rte(!(pipeline_context->is_pickled() && std::holds_alternative(read_query->row_filter)), "Cannot use head/tail/row_range with pickled data, use plain read instead"); mark_index_slices(pipeline_context, opt_false(read_options.dynamic_schema_), pipeline_context->bucketize_dynamic_); - frame = read_direct(store, pipeline_context, shared_data, read_options, handler_data); + auto frame = allocate_frame(pipeline_context); + util::print_total_mem_usage(__FILE__, __LINE__, __FUNCTION__); + ARCTICDB_DEBUG(log::version(), "Fetching frame data"); + return fetch_data(std::move(frame), pipeline_context, store, opt_false(read_options.dynamic_schema_), shared_data, handler_data); } - return frame; } VersionedItem collate_and_write( @@ -1496,14 +1447,14 @@ VersionedItem sort_merge_impl( auto pipeline_context = std::make_shared(); pipeline_context->stream_id_ = stream_id; pipeline_context->version_id_ = update_info.next_version_id_; - ReadQuery read_query; + auto read_query = std::make_shared(); std::optional previous_sorted_value; const IncompleteKeysRAII incomplete_keys_raii = options.delete_staged_data_on_failure_ ? 
IncompleteKeysRAII{pipeline_context, store, &options} : IncompleteKeysRAII{}; if(options.append_ && update_info.previous_index_key_.has_value()) { - read_indexed_keys_to_pipeline(store, pipeline_context, *(update_info.previous_index_key_), read_query, ReadOptions{}); + read_indexed_keys_to_pipeline(store, pipeline_context, *(update_info.previous_index_key_), *read_query, ReadOptions{}); if (!write_options.dynamic_schema) { user_input::check( pipeline_context->slice_and_keys_.front().slice().columns() == pipeline_context->slice_and_keys_.back().slice().columns(), @@ -1517,7 +1468,7 @@ VersionedItem sort_merge_impl( const bool has_incomplete_segments = read_incompletes_to_pipeline( store, pipeline_context, - read_query, + *read_query, ReadOptions{}, options.convert_int_to_float_, options.via_iteration_, @@ -1534,19 +1485,19 @@ VersionedItem sort_merge_impl( auto index = stream::index_type_from_descriptor(pipeline_context->descriptor()); util::variant_match(index, [&](const stream::TimeseriesIndex &timeseries_index) { - read_query.clauses_.emplace_back(std::make_shared(SortClause{timeseries_index.name(), pipeline_context->incompletes_after()})); - read_query.clauses_.emplace_back(std::make_shared(RemoveColumnPartitioningClause{})); + read_query->clauses_.emplace_back(std::make_shared(SortClause{timeseries_index.name(), pipeline_context->incompletes_after()})); + read_query->clauses_.emplace_back(std::make_shared(RemoveColumnPartitioningClause{})); //const auto split_size = ConfigsMap::instance()->get_int("Split.RowCount", 10000); - //read_query.clauses_.emplace_back(std::make_shared(SplitClause{static_cast(split_size)})); + //read_query->clauses_.emplace_back(std::make_shared(SplitClause{static_cast(split_size)})); - read_query.clauses_.emplace_back(std::make_shared(MergeClause{ + read_query->clauses_.emplace_back(std::make_shared(MergeClause{ timeseries_index, SparseColumnPolicy{}, stream_id, pipeline_context->descriptor(), write_options.dynamic_schema })); - auto segments = read_and_process(store, pipeline_context, read_query, ReadOptions{}); + auto segments = read_and_process(store, pipeline_context, read_query, ReadOptions{}).get(); if (options.append_ && update_info.previous_index_key_ && !segments.empty()) { const timestamp last_index_on_disc = update_info.previous_index_key_->end_time() - 1; const timestamp incomplete_start = @@ -1715,8 +1666,8 @@ PredefragmentationInfo get_pre_defragmentation_info( pipeline_context->stream_id_ = stream_id; pipeline_context->version_id_ = update_info.next_version_id_; - ReadQuery read_query; - read_indexed_keys_to_pipeline(store, pipeline_context, *(update_info.previous_index_key_), read_query, defragmentation_read_options_generator(options)); + auto read_query = std::make_shared(); + read_indexed_keys_to_pipeline(store, pipeline_context, *(update_info.previous_index_key_), *read_query, defragmentation_read_options_generator(options)); using CompactionStartInfo = std::pair;//row, segment_append_after std::vector first_col_segment_idx; @@ -1776,8 +1727,8 @@ VersionedItem defragment_symbol_data_impl( util::variant_match(std::move(policies), [ &fut_vec, &slices, &store, &options, &pre_defragmentation_info, segment_size=segment_size] (auto &&idx, auto &&schema) { - pre_defragmentation_info.read_query.clauses_.emplace_back(std::make_shared(RemoveColumnPartitioningClause{pre_defragmentation_info.append_after.value()})); - auto segments = read_and_process(store, pre_defragmentation_info.pipeline_context, pre_defragmentation_info.read_query,
defragmentation_read_options_generator(options)); + pre_defragmentation_info.read_query->clauses_.emplace_back(std::make_shared(RemoveColumnPartitioningClause{pre_defragmentation_info.append_after.value()})); + auto segments = read_and_process(store, pre_defragmentation_info.pipeline_context, pre_defragmentation_info.read_query, defragmentation_read_options_generator(options)).get(); using IndexType = std::remove_reference_t; using SchemaType = std::remove_reference_t; do_compact( @@ -1817,33 +1768,44 @@ void set_row_id_if_index_only( // This is the main user-facing read method that either returns all or // part of a dataframe as-is, or transforms it via a processing pipeline -FrameAndDescriptor read_frame_for_version( +folly::Future read_frame_for_version( const std::shared_ptr& store, const std::variant& version_info, - ReadQuery& read_query, + const std::shared_ptr& read_query , const ReadOptions& read_options, std::any& handler_data) { using namespace arcticdb::pipelines; auto pipeline_context = std::make_shared(); + VersionedItem res_versioned_item; if(std::holds_alternative(version_info)) { pipeline_context->stream_id_ = std::get(version_info); + // This isn't ideal. It would be better if the version() and timestamp() methods on the C++ VersionedItem class + // returned optionals, but this change would bubble up to the Python VersionedItem class defined in _store.py. + // This class is very hard to change at this point, as users do things like pickling them to pass them around. + // This at least gets the symbol attribute of VersionedItem correct. The creation timestamp will be zero, which + // corresponds to 1970, and so with this obviously ridiculous version ID, it should be clear to users that these + // values are meaningless before an indexed version exists. 
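read_frame_for_version now returns a folly::Future of the versioned output and, like read_multi_key above, composes its stages with thenValue rather than blocking inside each one; only the outermost caller decides where (or whether) to block. Below is a minimal, self-contained sketch of that schedule-and-chain pattern; FakeFrame, schedule_read and the executor setup are invented for illustration and are not the ArcticDB implementation.

// Schedule-and-chain sketch, not ArcticDB code: FakeFrame and schedule_read are
// invented stand-ins for the future-returning read path in this patch.
#include <folly/executors/CPUThreadPoolExecutor.h>
#include <folly/futures/Future.h>
#include <iostream>
#include <string>
#include <utility>
#include <vector>

struct FakeFrame {
    std::string symbol;
    size_t rows{0};
};

// Stands in for a read_frame_for_version-style API: schedule the work, return
// immediately with a future, never block inside the stage itself.
folly::Future<FakeFrame> schedule_read(folly::Executor* exec, std::string symbol) {
    return folly::via(exec, [symbol = std::move(symbol)] {
        return FakeFrame{symbol, 42};  // pretend the decode/processing happened here
    });
}

int main() {
    folly::CPUThreadPoolExecutor exec{4};
    std::vector<folly::Future<FakeFrame>> futs;
    for (const char* sym : {"a", "b", "c"}) {
        // Post-processing is chained rather than executed eagerly, so every
        // symbol can be in flight at the same time.
        futs.push_back(schedule_read(&exec, sym).thenValue([](FakeFrame&& frame) {
            frame.rows += 1;  // stand-in for a reduce_and_fix_columns-style fixup
            return std::move(frame);
        }));
    }
    // Only the top-level caller blocks, once, for the whole batch.
    auto frames = folly::collect(std::move(futs)).get();
    for (const auto& frame : frames)
        std::cout << frame.symbol << ": " << frame.rows << " rows\n";
    return 0;
}

Because every stage returns a future instead of calling .get(), all three symbols are in flight at once and the caller blocks exactly once for the whole batch.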
+ res_versioned_item = VersionedItem(AtomKeyBuilder() + .version_id(std::numeric_limits::max()) + .build(std::get(version_info))); } else { pipeline_context->stream_id_ = std::get(version_info).key_.id(); - read_indexed_keys_to_pipeline(store, pipeline_context, std::get(version_info), read_query, read_options); + read_indexed_keys_to_pipeline(store, pipeline_context, std::get(version_info), *read_query, read_options); + res_versioned_item = std::get(version_info); } if(pipeline_context->multi_key_) { - check_multi_key_is_not_index_only(*pipeline_context, read_query); + check_multi_key_is_not_index_only(*pipeline_context, *read_query); return read_multi_key(store, *pipeline_context->multi_key_, handler_data); } if(opt_false(read_options.incompletes_)) { - util::check(std::holds_alternative(read_query.row_filter), "Streaming read requires date range filter"); - const auto& query_range = std::get(read_query.row_filter); + util::check(std::holds_alternative(read_query->row_filter), "Streaming read requires date range filter"); + const auto& query_range = std::get(read_query->row_filter); const auto existing_range = pipeline_context->index_range(); if(!existing_range.specified_ || query_range.end_ > existing_range.end_) - read_incompletes_to_pipeline(store, pipeline_context, read_query, read_options, false, false, false, opt_false(read_options.dynamic_schema_)); + read_incompletes_to_pipeline(store, pipeline_context, *read_query, read_options, false, false, false, opt_false(read_options.dynamic_schema_)); } if(std::holds_alternative(version_info) && !pipeline_context->incompletes_after_) { @@ -1852,16 +1814,24 @@ FrameAndDescriptor read_frame_for_version( } modify_descriptor(pipeline_context, read_options); - generate_filtered_field_descriptors(pipeline_context, read_query.columns); + generate_filtered_field_descriptors(pipeline_context, read_query->columns); ARCTICDB_DEBUG(log::version(), "Fetching data to frame"); DecodePathData shared_data; - auto frame = version_store::do_direct_read_or_process(store, read_query, read_options, pipeline_context, shared_data, handler_data); - - ARCTICDB_DEBUG(log::version(), "Reduce and fix columns"); - reduce_and_fix_columns(pipeline_context, frame, read_options, handler_data); - set_row_id_if_index_only(*pipeline_context, frame, read_query); - return {frame, timeseries_descriptor_from_pipeline_context(pipeline_context, {}, pipeline_context->bucketize_dynamic_), {}, shared_data.buffers()}; + return version_store::do_direct_read_or_process(store, read_query, read_options, pipeline_context, shared_data, handler_data) + .thenValue([res_versioned_item, pipeline_context, &read_options, &handler_data, read_query, shared_data](auto&& frame) mutable { + ARCTICDB_DEBUG(log::version(), "Reduce and fix columns"); + return reduce_and_fix_columns(pipeline_context, frame, read_options, handler_data) + .via(&async::cpu_executor()) + .thenValue([res_versioned_item, pipeline_context, frame, read_query, shared_data](auto&&) mutable { + set_row_id_if_index_only(*pipeline_context, frame, *read_query); + return ReadVersionOutput{std::move(res_versioned_item), + {frame, + timeseries_descriptor_from_pipeline_context(pipeline_context, {}, pipeline_context->bucketize_dynamic_), + {}, + shared_data.buffers()}}; + }); + }); } } //namespace arcticdb::version_store diff --git a/cpp/arcticdb/version/version_core.hpp b/cpp/arcticdb/version/version_core.hpp index a465b9c576..7e04f29e7f 100644 --- a/cpp/arcticdb/version/version_core.hpp +++ b/cpp/arcticdb/version/version_core.hpp @@ -125,11 
+125,17 @@ ColumnStats get_column_stats_info_impl( const std::shared_ptr& store, const VersionedItem& versioned_item); -FrameAndDescriptor read_multi_key( +folly::Future read_multi_key( const std::shared_ptr& store, const SegmentInMemory& index_key_seg, std::any& handler_data); +folly::Future> schedule_clause_processing( + std::shared_ptr component_manager, + std::vector>&& segment_and_slice_futures, + std::vector>&& processing_unit_indexes, + std::shared_ptr>> clauses); + FrameAndDescriptor read_segment_impl( const std::shared_ptr& store, const VariantKey& key); @@ -148,35 +154,11 @@ VersionedItem compact_incomplete_impl( struct PredefragmentationInfo{ std::shared_ptr pipeline_context; - ReadQuery read_query; + std::shared_ptr read_query; size_t segments_need_compaction; std::optional append_after; }; -folly::Future async_read_direct_impl( - const std::shared_ptr& store, - const VariantKey& index_key, - SegmentInMemory&& index_segment, - const std::shared_ptr& read_query, - DecodePathData shared_data, - std::any& handler_data, - const ReadOptions& read_options); - -SegmentInMemory prepare_output_frame( - std::vector&& items, - const std::shared_ptr& pipeline_context, - const std::shared_ptr& store, - const ReadOptions& read_options, - std::any& handler_data); - -SegmentInMemory do_direct_read_or_process( - const std::shared_ptr& store, - ReadQuery& read_query, - const ReadOptions& read_options, - const std::shared_ptr& pipeline_context, - const DecodePathData& shared_data, - std::any& handler_data); - PredefragmentationInfo get_pre_defragmentation_info( const std::shared_ptr& store, const StreamId& stream_id, @@ -217,10 +199,10 @@ void add_index_columns_to_query( const ReadQuery& read_query, const TimeseriesDescriptor& desc); -FrameAndDescriptor read_frame_for_version( +folly::Future read_frame_for_version( const std::shared_ptr& store, const std::variant& version_info, - ReadQuery& read_query, + const std::shared_ptr& read_query, const ReadOptions& read_options, std::any& handler_data ); diff --git a/cpp/arcticdb/version/version_map_batch_methods.cpp b/cpp/arcticdb/version/version_map_batch_methods.cpp index a6d0b31ddb..6010b2a784 100644 --- a/cpp/arcticdb/version/version_map_batch_methods.cpp +++ b/cpp/arcticdb/version/version_map_batch_methods.cpp @@ -267,14 +267,19 @@ std::vector>> batch_get_versions_async( }); output.push_back(std::move(version_entry_fut) + .via(&async::cpu_executor()) .thenValue([vq = version_query, sid = *symbol](auto version_or_snapshot) { return util::variant_match(version_or_snapshot, [&vq](const std::shared_ptr &version_map_entry) { return get_key_for_version_query(version_map_entry, vq); }, - [&sid](std::optional snapshot) { - if (!snapshot) - return std::make_optional(); + [&vq, &sid](std::optional snapshot) -> std::optional { + missing_data::check( + snapshot, + "batch_get_versions_async: version matching query '{}' not found for symbol '{}'", + vq, + sid + ); auto [snap_key, snap_segment] = std::move(*snapshot); auto opt_id = row_id_for_stream_in_snapshot_segment( diff --git a/cpp/arcticdb/version/version_store_api.cpp b/cpp/arcticdb/version/version_store_api.cpp index aa394087e8..bbb1d10b5e 100644 --- a/cpp/arcticdb/version/version_store_api.cpp +++ b/cpp/arcticdb/version/version_store_api.cpp @@ -769,8 +769,8 @@ std::vector> PythonVersionStore::batch_read( const std::vector& stream_ids, const std::vector& version_queries, std::vector>& read_queries, - const ReadOptions& read_options) { - auto handler_data = 
TypeHandlerRegistry::instance()->get_handler_data(); + const ReadOptions& read_options, + std::any& handler_data) { auto read_versions_or_errors = batch_read_internal(stream_ids, version_queries, read_queries, read_options, handler_data); std::vector> res; for (auto&& [idx, read_version_or_error]: folly::enumerate(read_versions_or_errors)) { @@ -834,7 +834,7 @@ void PythonVersionStore::delete_snapshot_sync(const SnapshotId& snap_name, const ReadResult PythonVersionStore::read_dataframe_version( const StreamId &stream_id, const VersionQuery& version_query, - ReadQuery& read_query, + const std::shared_ptr& read_query, const ReadOptions& read_options, std::any& handler_data) { @@ -1151,13 +1151,15 @@ void write_dataframe_to_file( ReadResult read_dataframe_from_file( const StreamId &stream_id, const std::string& path, - ReadQuery& read_query) { + const std::shared_ptr& read_query, + std::any& handler_data) { auto opt_version_and_frame = read_dataframe_from_file_internal( stream_id, path, read_query, ReadOptions{}, - codec::default_lz4_codec()); + codec::default_lz4_codec(), + handler_data); return create_python_read_result(opt_version_and_frame.versioned_item_, std::move(opt_version_and_frame.frame_and_descriptor_)); } diff --git a/cpp/arcticdb/version/version_store_api.hpp b/cpp/arcticdb/version/version_store_api.hpp index d1607eca27..0ace64bf73 100644 --- a/cpp/arcticdb/version/version_store_api.hpp +++ b/cpp/arcticdb/version/version_store_api.hpp @@ -167,7 +167,7 @@ class PythonVersionStore : public LocalVersionedEngine { ReadResult read_dataframe_version( const StreamId &stream_id, const VersionQuery& version_query, - ReadQuery& read_query, + const std::shared_ptr& read_query, const ReadOptions& read_options, std::any& handler_data); @@ -297,7 +297,8 @@ class PythonVersionStore : public LocalVersionedEngine { const std::vector& stream_ids, const std::vector& version_queries, std::vector>& read_queries, - const ReadOptions& read_options); + const ReadOptions& read_options, + std::any& handler_data); std::vector, DataError>> batch_read_metadata( const std::vector& stream_ids, @@ -349,7 +350,8 @@ void write_dataframe_to_file( ReadResult read_dataframe_from_file( const StreamId &stream_id, const std::string& path, - ReadQuery& read_query); + const std::shared_ptr& read_query, + std::any& handler_data); struct ManualClockVersionStore : PythonVersionStore { ManualClockVersionStore(const std::shared_ptr& library) : diff --git a/cpp/arcticdb/version/versioned_engine.hpp b/cpp/arcticdb/version/versioned_engine.hpp index 2e47689dd5..f38f302a02 100644 --- a/cpp/arcticdb/version/versioned_engine.hpp +++ b/cpp/arcticdb/version/versioned_engine.hpp @@ -101,7 +101,7 @@ class VersionedEngine { virtual ReadVersionOutput read_dataframe_version_internal( const StreamId &stream_id, const VersionQuery& version_query, - ReadQuery& read_query, + const std::shared_ptr& read_query, const ReadOptions& read_options, std::any& handler_data) = 0; diff --git a/python/arcticdb/version_store/_store.py b/python/arcticdb/version_store/_store.py index f3e4dbc313..b13b577fd0 100644 --- a/python/arcticdb/version_store/_store.py +++ b/python/arcticdb/version_store/_store.py @@ -94,13 +94,18 @@ class VersionedItem: For data retrieval (read) operations, contains the data read. For data modification operations, the value might not be populated. version: int - For data retrieval operations, the version the `as_of` argument resolved to. + For data retrieval operations, the version the `as_of` argument resolved to. 
In the special case where no + versions have been written yet, but data is being read exclusively from incomplete segments, this will be + 2^64-1. For data modification operations, the version the data has been written under. metadata: Any The metadata saved alongside `data`. Availability depends on the method used and may be different from that of `data`. host: Optional[str] Informational / for backwards compatibility. + timestamp: Optional[int] + The time in nanoseconds since epoch that this version was written. In the special case where no versions have + been written yet, but data is being read exclusively from incomplete segments, this will be 0. """ symbol: str = attr.ib() @@ -1859,7 +1864,7 @@ def _post_process_dataframe(self, read_result, read_query, implement_read_index= original_data = Flattener().create_original_obj_from_metastruct_new(meta_struct, key_map) return VersionedItem( - symbol=vitem.symbol, + symbol=meta_struct["symbol"], library=vitem.library, data=original_data, version=vitem.version, diff --git a/python/tests/integration/arcticdb/test_s3.py b/python/tests/integration/arcticdb/test_s3.py index 0494f60c9c..d33e4205ac 100644 --- a/python/tests/integration/arcticdb/test_s3.py +++ b/python/tests/integration/arcticdb/test_s3.py @@ -9,6 +9,7 @@ import pytest import pandas as pd +import sys from arcticdb_ext.exceptions import StorageException from arcticdb_ext import set_config_string @@ -18,6 +19,12 @@ from arcticdb.storage_fixtures.s3 import MotoS3StorageFixtureFactory +pytestmark = pytest.mark.skipif( + sys.version_info.major == 3 and sys.version_info.minor == 6 and sys.platform == "linux", + reason="Test setup segfaults" +) + + def test_s3_storage_failures(mock_s3_store_with_error_simulation): lib = mock_s3_store_with_error_simulation symbol_fail_write = "symbol#Failure_Write_99_0" diff --git a/python/tests/integration/arcticdb/test_unicode_strings.py b/python/tests/integration/arcticdb/test_unicode_strings.py index 020add623f..771ab50a54 100644 --- a/python/tests/integration/arcticdb/test_unicode_strings.py +++ b/python/tests/integration/arcticdb/test_unicode_strings.py @@ -1,8 +1,12 @@ +import copy import os from pandas.testing import assert_frame_equal import pandas as pd import numpy as np +from arcticdb import QueryBuilder + + def read_strings(): script_directory = os.path.dirname(os.path.abspath(__file__)) file_path = "{}/blns.txt".format(script_directory) @@ -65,6 +69,24 @@ def test_update_blns(lmdb_version_store): assert_frame_equal(df, vit.data) +def test_batch_read_blns(lmdb_version_store): + lib = lmdb_version_store + strings = read_strings() + num_symbols = 10 + symbols = [f"blns_batch_read_{idx}" for idx in range(num_symbols)] + dfs = [create_dataframe(strings) for _ in range(num_symbols)] + lib.batch_write(symbols, dfs) + q = QueryBuilder() + q = q[q["ints"] > 50] + qbs = (num_symbols // 2) * [None, copy.deepcopy(q)] + res = lib.batch_read(symbols, query_builder=qbs) + for idx, sym in enumerate(symbols): + expected = dfs[idx] + if idx % 2 == 1: + expected = expected[expected["ints"] > 50] + assert_frame_equal(expected, res[sym].data) + + def assert_dicts_of_dfs_equal(dict1, dict2): assert dict1.keys() == dict2.keys(), "Dictionary keys do not match" diff --git a/python/tests/integration/arcticdb/version_store/test_basic_version_store.py b/python/tests/integration/arcticdb/version_store/test_basic_version_store.py index 0ead498da6..963ec7b371 100644 --- a/python/tests/integration/arcticdb/version_store/test_basic_version_store.py +++ 
b/python/tests/integration/arcticdb/version_store/test_basic_version_store.py @@ -1274,7 +1274,8 @@ def equals(x, y): assert x == y -def test_recursively_written_data(basic_store): +@pytest.mark.parametrize("batch", (True, False)) +def test_recursively_written_data(basic_store, batch): samples = [ {"a": np.arange(5), "b": np.arange(8)}, # dict of np arrays (np.arange(5), np.arange(6)), # tuple of np arrays @@ -1283,41 +1284,64 @@ def test_recursively_written_data(basic_store): ] for idx, sample in enumerate(samples): - basic_store.write("sym_recursive" + str(idx), sample, recursive_normalizers=True) - basic_store.write("sym_pickle" + str(idx), sample) # pickled writes - recursive_data = basic_store.read("sym_recursive" + str(idx)).data - pickled_data = basic_store.read("sym_pickle" + str(idx)).data - equals(sample, recursive_data) - equals(pickled_data, recursive_data) + recursive_sym = "sym_recursive" + str(idx) + pickled_sym = "sym_pickled" + str(idx) + basic_store.write(recursive_sym, sample, recursive_normalizers=True) + basic_store.write(pickled_sym, sample) # pickled writes + if batch: + recursive_vit = basic_store.batch_read([recursive_sym])[recursive_sym] + pickled_vit = basic_store.batch_read([pickled_sym])[pickled_sym] + else: + recursive_vit = basic_store.read(recursive_sym) + pickled_vit = basic_store.read(pickled_sym) + equals(sample, recursive_vit.data) + equals(pickled_vit.data, recursive_vit.data) + assert recursive_vit.symbol == recursive_sym + assert pickled_vit.symbol == pickled_sym -def test_recursively_written_data_with_metadata(basic_store): +@pytest.mark.parametrize("batch", (True, False)) +def test_recursively_written_data_with_metadata(basic_store, batch): samples = [ {"a": np.arange(5), "b": np.arange(8)}, # dict of np arrays (np.arange(5), np.arange(6)), # tuple of np arrays ] for idx, sample in enumerate(samples): - vit = basic_store.write( - "sym_recursive" + str(idx), sample, metadata={"something": 1}, recursive_normalizers=True - ) - recursive_data = basic_store.read("sym_recursive" + str(idx)).data - equals(sample, recursive_data) - assert vit.metadata == {"something": 1} + sym = "sym_recursive" + str(idx) + metadata = {"something": 1} + basic_store.write(sym, sample, metadata=metadata, recursive_normalizers=True) + if batch: + vit = basic_store.batch_read([sym])[sym] + else: + vit = basic_store.read(sym) + equals(sample, vit.data) + assert vit.symbol == sym + assert vit.metadata == metadata -def test_recursively_written_data_with_nones(basic_store): +@pytest.mark.parametrize("batch", (True, False)) +def test_recursively_written_data_with_nones(basic_store, batch): sample = {"a": np.arange(5), "b": np.arange(8), "c": None} - - basic_store.write("sym_recursive", sample, recursive_normalizers=True) - basic_store.write("sym_pickle", sample) # pickled writes - recursive_data = basic_store.read("sym_recursive").data - pickled_data = basic_store.read("sym_recursive").data - equals(sample, recursive_data) - equals(pickled_data, recursive_data) + recursive_sym = "sym_recursive" + pickled_sym = "sym_pickled" + basic_store.write(recursive_sym, sample, recursive_normalizers=True) + basic_store.write(pickled_sym, sample) # pickled writes + if batch: + recursive_vit = basic_store.batch_read([recursive_sym])[recursive_sym] + pickled_vit = basic_store.batch_read([pickled_sym])[pickled_sym] + else: + recursive_vit = basic_store.read(recursive_sym) + pickled_vit = basic_store.read(pickled_sym) + equals(sample, recursive_vit.data) + equals(pickled_vit.data, 
recursive_vit.data) + assert recursive_vit.symbol == recursive_sym + assert pickled_vit.symbol == pickled_sym -def test_recursive_nested_data(basic_store): +@pytest.mark.parametrize("batch", (True, False)) +def test_recursive_nested_data(basic_store, batch): + sym = "test_recursive_nested_data" sample_data = {"a": {"b": {"c": {"d": np.arange(24)}}}} fl = Flattener() assert fl.can_flatten(sample_data) @@ -1326,8 +1350,13 @@ def test_recursive_nested_data(basic_store): assert len(to_write) == 1 equals(list(to_write.values())[0], np.arange(24)) - basic_store.write("s", sample_data, recursive_normalizers=True) - equals(basic_store.read("s").data, sample_data) + basic_store.write(sym, sample_data, recursive_normalizers=True) + if batch: + vit = basic_store.batch_read([sym])[sym] + else: + vit = basic_store.read(sym) + equals(vit.data, sample_data) + assert vit.symbol == sym def test_named_tuple_flattening_rejected(): @@ -1374,13 +1403,20 @@ def test_recursive_normalizer_with_custom_class(): assert fl.is_normalizable_to_nested_structure(list_like_obj) -def test_really_large_symbol_for_recursive_data(basic_store): +@pytest.mark.parametrize("batch", (True, False)) +def test_really_large_symbol_for_recursive_data(basic_store, batch): + sym = "s" * 100 data = {"a" * 100: {"b" * 100: {"c" * 1000: {"d": np.arange(5)}}}} - basic_store.write("s" * 100, data, recursive_normalizers=True) + basic_store.write(sym, data, recursive_normalizers=True) fl = Flattener() metastruct, to_write = fl.create_meta_structure(data, "s" * 100) assert len(list(to_write.keys())[0]) < fl.MAX_KEY_LENGTH - equals(basic_store.read("s" * 100).data, data) + if batch: + vit = basic_store.batch_read([sym])[sym] + else: + vit = basic_store.read(sym) + equals(vit.data, data) + assert vit.symbol == sym def test_too_much_recursive_metastruct_data(monkeypatch, lmdb_version_store_v1): diff --git a/python/tests/unit/arcticdb/version_store/test_incompletes.py b/python/tests/unit/arcticdb/version_store/test_incompletes.py new file mode 100644 index 0000000000..e9ed2bacf6 --- /dev/null +++ b/python/tests/unit/arcticdb/version_store/test_incompletes.py @@ -0,0 +1,48 @@ +""" +Copyright 2024 Man Group Operations Limited + +Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt. + +As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0. 
+""" +import numpy as np +import pandas as pd +import pytest +from arcticdb.util.test import assert_frame_equal + + +@pytest.mark.parametrize("batch", (True, False)) +def test_read_incompletes_with_indexed_data(lmdb_version_store_v1, batch): + lib = lmdb_version_store_v1 + lib_tool = lib.library_tool() + sym = "test_read_incompletes_with_indexed_data" + num_rows = 10 + df = pd.DataFrame({"col": np.arange(num_rows)}, pd.date_range("2024-01-01", periods=num_rows)) + lib.write(sym, df.iloc[:num_rows // 2]) + for idx in range(num_rows // 2, num_rows): + lib_tool.append_incomplete(sym, df.iloc[idx: idx+1]) + assert lib.has_symbol(sym) + if batch: + received_vit = lib.batch_read([sym], date_ranges=[(df.index[1], df.index[-2])], incomplete=True)[sym] + else: + received_vit = lib.read(sym, date_range=(df.index[1], df.index[-2]), incomplete=True) + assert received_vit.symbol == sym + assert_frame_equal(df.iloc[1:-1], received_vit.data) + + +@pytest.mark.parametrize("batch", (True, False)) +def test_read_incompletes_no_indexed_data(lmdb_version_store_v1, batch): + lib = lmdb_version_store_v1 + lib_tool = lib.library_tool() + sym = "test_read_incompletes_no_indexed_data" + num_rows = 10 + df = pd.DataFrame({"col": np.arange(num_rows)}, pd.date_range("2024-01-01", periods=num_rows)) + for idx in range(num_rows): + lib_tool.append_incomplete(sym, df.iloc[idx: idx+1]) + assert not lib.has_symbol(sym) + if batch: + received_vit = lib.batch_read([sym], date_ranges=[(df.index[1], df.index[-2])], incomplete=True)[sym] + else: + received_vit = lib.read(sym, date_range=(df.index[1], df.index[-2]), incomplete=True) + assert received_vit.symbol == sym + assert_frame_equal(df.iloc[1:-1], received_vit.data)