From 8014e216e6a993dd8df3d8f51f11d9b8a491c4a2 Mon Sep 17 00:00:00 2001 From: Oleg Pipikin Date: Mon, 9 Dec 2024 15:51:48 +0000 Subject: [PATCH] Update plugins api to load cached model with mmap buffer --- .../openvino/runtime/internal_properties.hpp | 7 + src/inference/src/cache_manager.hpp | 6 +- src/inference/src/dev/core_impl.cpp | 5 +- .../tests/functional/caching_test.cpp | 136 ++++++++++++++++++ src/plugins/intel_cpu/src/plugin.cpp | 5 + src/plugins/intel_cpu/src/utils/serialize.cpp | 13 +- src/plugins/intel_cpu/src/utils/serialize.hpp | 7 +- 7 files changed, 169 insertions(+), 10 deletions(-) diff --git a/src/inference/dev_api/openvino/runtime/internal_properties.hpp b/src/inference/dev_api/openvino/runtime/internal_properties.hpp index 60d6b66cfda897..bec304104581ac 100644 --- a/src/inference/dev_api/openvino/runtime/internal_properties.hpp +++ b/src/inference/dev_api/openvino/runtime/internal_properties.hpp @@ -9,6 +9,7 @@ #pragma once +#include "openvino/runtime/aligned_buffer.hpp" #include "openvino/runtime/properties.hpp" #include "openvino/runtime/threading/istreams_executor.hpp" @@ -36,6 +37,12 @@ static constexpr Property, PropertyMutability::RO> cac */ static constexpr Property caching_with_mmap{"CACHING_WITH_MMAP"}; +/** + * @brief Property to get a ov::AlignedBuffer with cached model + * @ingroup ov_dev_api_plugin_api + */ +static constexpr Property, PropertyMutability::RW> cached_model_buffer{"CACHED_MODEL_BUFFER"}; + /** * @brief Allow to create exclusive_async_requests with one executor * @ingroup ov_dev_api_plugin_api diff --git a/src/inference/src/cache_manager.hpp b/src/inference/src/cache_manager.hpp index c441811c3cfd02..82813e5dd4788f 100644 --- a/src/inference/src/cache_manager.hpp +++ b/src/inference/src/cache_manager.hpp @@ -69,7 +69,7 @@ class ICacheManager { /** * @brief Function passing created input stream */ - using StreamReader = std::function; + using StreamReader = std::function)>; /** * @brief Callback when OpenVINO intends to read model from cache @@ -143,10 +143,10 @@ class FileStorageCacheManager final : public ICacheManager { std::make_shared>>(mmap->data(), mmap->size(), mmap); OwningSharedStreamBuffer buf(shared_buffer); std::istream stream(&buf); - reader(stream); + reader(stream, shared_buffer); } else { std::ifstream stream(blob_file_name, std::ios_base::binary); - reader(stream); + reader(stream, nullptr); } } } diff --git a/src/inference/src/dev/core_impl.cpp b/src/inference/src/dev/core_impl.cpp index 244d27b5eebb67..673f6fd569a11e 100644 --- a/src/inference/src/dev/core_impl.cpp +++ b/src/inference/src/dev/core_impl.cpp @@ -1413,7 +1413,7 @@ ov::SoPtr ov::CoreImpl::load_model_from_cache( cacheContent.blobId, coreConfig.get_enable_mmap() && ov::util::contains(plugin.get_property(ov::internal::supported_properties), ov::internal::caching_with_mmap), - [&](std::istream& networkStream) { + [&](std::istream& networkStream, std::shared_ptr model_buffer) { OV_ITT_SCOPE(FIRST_INFERENCE, ov::itt::domains::LoadTime, "Core::load_model_from_cache::ReadStreamAndImport"); @@ -1459,6 +1459,9 @@ ov::SoPtr ov::CoreImpl::load_model_from_cache( update_config[ov::weights_path.name()] = weights_path; } } + if (model_buffer) { + update_config[ov::internal::cached_model_buffer.name()] = model_buffer; + } compiled_model = context ? plugin.import_model(networkStream, context, update_config) : plugin.import_model(networkStream, update_config); }); diff --git a/src/inference/tests/functional/caching_test.cpp b/src/inference/tests/functional/caching_test.cpp index 5b01af9a22cde8..6b1c7f938ae731 100644 --- a/src/inference/tests/functional/caching_test.cpp +++ b/src/inference/tests/functional/caching_test.cpp @@ -2424,6 +2424,142 @@ TEST_P(CachingTest, Load_threads) { std::cout << "Caching Load multiple threads test completed. Tried " << index << " times" << std::endl; } +TEST_P(CachingTest, Load_mmap) { + ON_CALL(*mockPlugin, import_model(_, _)).WillByDefault(Invoke([&](std::istream& istr, const ov::AnyMap& config) { + if (m_checkConfigCb) { + m_checkConfigCb(config); + } + std::shared_ptr model_buffer; + if (config.count(ov::internal::cached_model_buffer.name())) + model_buffer = config.at(ov::internal::cached_model_buffer.name()).as>(); + EXPECT_TRUE(model_buffer); + + std::string name; + istr >> name; + char space; + istr.read(&space, 1); + std::lock_guard lock(mock_creation_mutex); + return create_mock_compiled_model(m_models[name], mockPlugin); + })); + + ON_CALL(*mockPlugin, get_property(ov::internal::supported_properties.name(), _)) + .WillByDefault(Invoke([&](const std::string&, const ov::AnyMap&) { + return std::vector{ov::internal::caching_properties.name(), + ov::internal::caching_with_mmap.name()}; + })); + EXPECT_CALL(*mockPlugin, get_property(_, _)).Times(AnyNumber()); + EXPECT_CALL(*mockPlugin, query_model(_, _)).Times(AnyNumber()); + EXPECT_CALL(*mockPlugin, get_property(ov::device::architecture.name(), _)).Times(AnyNumber()); + EXPECT_CALL(*mockPlugin, get_property(ov::internal::caching_properties.name(), _)).Times(AnyNumber()); + if (m_remoteContext) { + return; // skip the remote Context test for Multi plugin + } + int index = 0; + m_post_mock_net_callbacks.emplace_back([&](MockICompiledModelImpl& net) { + EXPECT_CALL(net, export_model(_)).Times(1); + }); + MkDirGuard guard(m_cacheDir); + EXPECT_CALL(*mockPlugin, compile_model(_, _, _)).Times(0); + EXPECT_CALL(*mockPlugin, compile_model(A&>(), _)).Times(1); + EXPECT_CALL(*mockPlugin, import_model(_, _, _)).Times(0); + EXPECT_CALL(*mockPlugin, import_model(_, _)).Times(1); + testLoad([&](ov::Core& core) { + core.set_property({{ov::cache_dir.name(), m_cacheDir}}); + m_testFunction(core); + m_testFunction(core); + }); + std::cout << "Caching Load multiple threads test completed. Tried " << index << " times" << std::endl; +} + +TEST_P(CachingTest, Load_mmap_is_disabled) { + ON_CALL(*mockPlugin, import_model(_, _)).WillByDefault(Invoke([&](std::istream& istr, const ov::AnyMap& config) { + if (m_checkConfigCb) { + m_checkConfigCb(config); + } + std::shared_ptr model_buffer; + if (config.count(ov::internal::cached_model_buffer.name())) + model_buffer = config.at(ov::internal::cached_model_buffer.name()).as>(); + EXPECT_FALSE(model_buffer); + + std::string name; + istr >> name; + char space; + istr.read(&space, 1); + std::lock_guard lock(mock_creation_mutex); + return create_mock_compiled_model(m_models[name], mockPlugin); + })); + ON_CALL(*mockPlugin, get_property(ov::internal::supported_properties.name(), _)) + .WillByDefault(Invoke([&](const std::string&, const ov::AnyMap&) { + return std::vector{ov::internal::caching_properties.name(), + ov::internal::caching_with_mmap.name()}; + })); + EXPECT_CALL(*mockPlugin, get_property(_, _)).Times(AnyNumber()); + EXPECT_CALL(*mockPlugin, query_model(_, _)).Times(AnyNumber()); + EXPECT_CALL(*mockPlugin, get_property(ov::device::architecture.name(), _)).Times(AnyNumber()); + EXPECT_CALL(*mockPlugin, get_property(ov::internal::caching_properties.name(), _)).Times(AnyNumber()); + if (m_remoteContext) { + return; // skip the remote Context test for Multi plugin + } + int index = 0; + m_post_mock_net_callbacks.emplace_back([&](MockICompiledModelImpl& net) { + EXPECT_CALL(net, export_model(_)).Times(1); + }); + MkDirGuard guard(m_cacheDir); + EXPECT_CALL(*mockPlugin, compile_model(_, _, _)).Times(0); + EXPECT_CALL(*mockPlugin, compile_model(A&>(), _)).Times(1); + EXPECT_CALL(*mockPlugin, import_model(_, _, _)).Times(0); + EXPECT_CALL(*mockPlugin, import_model(_, _)).Times(1); + testLoad([&](ov::Core& core) { + core.set_property({{ov::cache_dir.name(), m_cacheDir}}); + core.set_property({ov::enable_mmap(false)}); + m_testFunction(core); + m_testFunction(core); + }); + std::cout << "Caching Load multiple threads test completed. Tried " << index << " times" << std::endl; +} + +TEST_P(CachingTest, Load_mmap_is_not_supported_by_plugin) { + ON_CALL(*mockPlugin, import_model(_, _)).WillByDefault(Invoke([&](std::istream& istr, const ov::AnyMap& config) { + if (m_checkConfigCb) { + m_checkConfigCb(config); + } + std::shared_ptr model_buffer; + if (config.count(ov::internal::cached_model_buffer.name())) + model_buffer = config.at(ov::internal::cached_model_buffer.name()).as>(); + EXPECT_FALSE(model_buffer); + + std::string name; + istr >> name; + char space; + istr.read(&space, 1); + std::lock_guard lock(mock_creation_mutex); + return create_mock_compiled_model(m_models[name], mockPlugin); + })); + EXPECT_CALL(*mockPlugin, get_property(_, _)).Times(AnyNumber()); + EXPECT_CALL(*mockPlugin, query_model(_, _)).Times(AnyNumber()); + EXPECT_CALL(*mockPlugin, get_property(ov::device::architecture.name(), _)).Times(AnyNumber()); + EXPECT_CALL(*mockPlugin, get_property(ov::internal::caching_properties.name(), _)).Times(AnyNumber()); + if (m_remoteContext) { + return; // skip the remote Context test for Multi plugin + } + int index = 0; + m_post_mock_net_callbacks.emplace_back([&](MockICompiledModelImpl& net) { + EXPECT_CALL(net, export_model(_)).Times(1); + }); + MkDirGuard guard(m_cacheDir); + EXPECT_CALL(*mockPlugin, compile_model(_, _, _)).Times(0); + EXPECT_CALL(*mockPlugin, compile_model(A&>(), _)).Times(1); + EXPECT_CALL(*mockPlugin, import_model(_, _, _)).Times(0); + EXPECT_CALL(*mockPlugin, import_model(_, _)).Times(1); + testLoad([&](ov::Core& core) { + core.set_property({{ov::cache_dir.name(), m_cacheDir}}); + core.set_property({ov::enable_mmap(true)}); + m_testFunction(core); + m_testFunction(core); + }); + std::cout << "Caching Load multiple threads test completed. Tried " << index << " times" << std::endl; +} + #if defined(ENABLE_OV_IR_FRONTEND) static std::string getTestCaseName(const testing::TestParamInfo>& obj) { diff --git a/src/plugins/intel_cpu/src/plugin.cpp b/src/plugins/intel_cpu/src/plugin.cpp index 6fdbf7a4ea4dee..264c7a4a038243 100644 --- a/src/plugins/intel_cpu/src/plugin.cpp +++ b/src/plugins/intel_cpu/src/plugin.cpp @@ -565,8 +565,13 @@ std::shared_ptr Plugin::import_model(std::istream& model_str decript_from_string = true; } + std::shared_ptr model_buffer; + if (config.count(ov::internal::cached_model_buffer.name())) + model_buffer = config.at(ov::internal::cached_model_buffer.name()).as>(); + ModelDeserializer deserializer( model_stream, + model_buffer, [this](const std::shared_ptr& model, const std::shared_ptr& weights) { return get_core()->read_model(model, weights); }, diff --git a/src/plugins/intel_cpu/src/utils/serialize.cpp b/src/plugins/intel_cpu/src/utils/serialize.cpp index 814e8d19311a8c..33d8140fbe4a84 100644 --- a/src/plugins/intel_cpu/src/utils/serialize.cpp +++ b/src/plugins/intel_cpu/src/utils/serialize.cpp @@ -30,8 +30,12 @@ void ModelSerializer::operator<<(const std::shared_ptr& model) { ////////// ModelDeserializer ////////// -ModelDeserializer::ModelDeserializer(std::istream& model_stream, ModelBuilder fn, const CacheDecrypt& decrypt_fn, bool decript_from_string) - : m_istream(model_stream), m_model_builder(std::move(fn)), m_decript_from_string(decript_from_string) { +ModelDeserializer::ModelDeserializer(std::istream& model_stream, + std::shared_ptr model_buffer, + ModelBuilder fn, + const CacheDecrypt& decrypt_fn, + bool decript_from_string) + : m_istream(model_stream), m_model_builder(std::move(fn)), m_decript_from_string(decript_from_string), m_model_buffer(model_buffer) { if (m_decript_from_string) { m_cache_decrypt.m_decrypt_str = decrypt_fn.m_decrypt_str; } else { @@ -42,9 +46,8 @@ ModelDeserializer::ModelDeserializer(std::istream& model_stream, ModelBuilder fn void ModelDeserializer::set_info(pugi::xml_node& root, std::shared_ptr& model) {} void ModelDeserializer::operator>>(std::shared_ptr& model) { - if (auto mmap_buffer = dynamic_cast(m_istream.rdbuf())) { - auto buffer = mmap_buffer->get_buffer(); - process_mmap(model, buffer); + if (m_model_buffer) { + process_mmap(model, m_model_buffer); } else { process_stream(model); } diff --git a/src/plugins/intel_cpu/src/utils/serialize.hpp b/src/plugins/intel_cpu/src/utils/serialize.hpp index 897a2c2e52f092..4dfdd6b22afbd4 100644 --- a/src/plugins/intel_cpu/src/utils/serialize.hpp +++ b/src/plugins/intel_cpu/src/utils/serialize.hpp @@ -31,7 +31,11 @@ class ModelDeserializer { public: typedef std::function(const std::shared_ptr&, const std::shared_ptr&)> ModelBuilder; - ModelDeserializer(std::istream& model, ModelBuilder fn, const CacheDecrypt& encrypt_fn, bool decript_from_string); + ModelDeserializer(std::istream& model, + std::shared_ptr model_buffer, + ModelBuilder fn, + const CacheDecrypt& encrypt_fn, + bool decript_from_string); virtual ~ModelDeserializer() = default; @@ -48,6 +52,7 @@ class ModelDeserializer { ModelBuilder m_model_builder; CacheDecrypt m_cache_decrypt; bool m_decript_from_string; + std::shared_ptr m_model_buffer; }; } // namespace intel_cpu