diff --git a/include/rocksdb/utilities/cache_dump_load.h b/include/rocksdb/utilities/cache_dump_load.h index fde03db7e68..c73c87d8c22 100644 --- a/include/rocksdb/utilities/cache_dump_load.h +++ b/include/rocksdb/utilities/cache_dump_load.h @@ -68,6 +68,10 @@ class CacheDumpReader { // dump or load process related control variables can be added here. struct CacheDumpOptions { SystemClock* clock; + // Deadline for dumper or loader in microseconds + std::chrono::microseconds deadline = std::chrono::microseconds::zero(); + // Max size bytes for dumper or loader + uint64_t max_size_bytes = 0; }; // NOTE that: this class is EXPERIMENTAL! May be changed in the future! @@ -110,6 +114,10 @@ class CacheDumpedLoader { return IOStatus::NotSupported( "RestoreCacheEntriesToSecondaryCache is not supported"); } + virtual IOStatus RestoreCacheEntriesToCache() { + return IOStatus::NotSupported( + "RestoreCacheEntriesToCache is not supported"); + } }; // Get the writer which stores all the metadata and data sequentially to a file @@ -136,7 +144,8 @@ Status NewDefaultCacheDumpedLoader( const BlockBasedTableOptions& toptions, const std::shared_ptr& secondary_cache, std::unique_ptr&& reader, - std::unique_ptr* cache_dump_loader); + std::unique_ptr* cache_dump_loader, + const std::shared_ptr& cache); } // namespace ROCKSDB_NAMESPACE #endif // ROCKSDB_LITE diff --git a/utilities/cache_dump_load.cc b/utilities/cache_dump_load.cc index 9a7c7679875..cab00195193 100644 --- a/utilities/cache_dump_load.cc +++ b/utilities/cache_dump_load.cc @@ -59,9 +59,10 @@ Status NewDefaultCacheDumpedLoader( const BlockBasedTableOptions& toptions, const std::shared_ptr& secondary_cache, std::unique_ptr&& reader, - std::unique_ptr* cache_dump_loader) { + std::unique_ptr* cache_dump_loader, + const std::shared_ptr& cache) { cache_dump_loader->reset(new CacheDumpedLoaderImpl( - dump_options, toptions, secondary_cache, std::move(reader))); + dump_options, toptions, secondary_cache, std::move(reader), cache)); return Status::OK(); } diff --git a/utilities/cache_dump_load_impl.cc b/utilities/cache_dump_load_impl.cc index 221634ea036..fbccc4a8ad1 100644 --- a/utilities/cache_dump_load_impl.cc +++ b/utilities/cache_dump_load_impl.cc @@ -7,6 +7,9 @@ #include "table/block_based/block_based_table_reader.h" #ifndef ROCKSDB_LITE +#include +#include + #include "utilities/cache_dump_load_impl.h" #include "cache/cache_entry_roles.h" @@ -17,6 +20,7 @@ #include "rocksdb/utilities/ldb_cmd.h" #include "table/format.h" #include "util/crc32c.h" +#include "memory/memory_allocator.h" namespace ROCKSDB_NAMESPACE { @@ -26,6 +30,7 @@ namespace ROCKSDB_NAMESPACE { // requirement. Status CacheDumperImpl::SetDumpFilter(std::vector db_list) { Status s = Status::OK(); + dump_all_keys_ = false; for (size_t i = 0; i < db_list.size(); i++) { assert(i < db_list.size()); TablePropertiesCollection ptc; @@ -68,6 +73,7 @@ IOStatus CacheDumperImpl::DumpCacheEntriesToWriter() { return IOStatus::InvalidArgument("System clock is null"); } clock_ = options_.clock; + deadline_ = options_.deadline; // We copy the Cache Deleter Role Map as its member. role_map_ = CopyCacheDeleterRoleMap(); // Set the sequence number @@ -112,6 +118,19 @@ CacheDumperImpl::DumpOneBlockCallBack() { return [&](const Slice& key, void* value, size_t /*charge*/, Cache::DeleterFn deleter) { // Step 1: get the type of the block from role_map_ + if (options_.max_size_bytes > 0 && + dumped_size_bytes_ > options_.max_size_bytes) { + return; + } + + uint64_t timestamp = clock_->NowMicros(); + if (deadline_.count()) { + std::chrono::microseconds now = std::chrono::microseconds(timestamp); + if (now >= deadline_) { + return; + } + } + auto e = role_map_.find(deleter); CacheEntryRole role; CacheDumpUnitType type = CacheDumpUnitType::kBlockTypeMax; @@ -123,7 +142,7 @@ CacheDumperImpl::DumpOneBlockCallBack() { bool filter_out = false; // Step 2: based on the key prefix, check if the block should be filter out. - if (ShouldFilterOut(key)) { + if (!dump_all_keys_ && ShouldFilterOut(key)) { filter_out = true; } @@ -175,8 +194,9 @@ CacheDumperImpl::DumpOneBlockCallBack() { // Step 4: if the block should not be filter out, write the block to the // CacheDumpWriter if (!filter_out && block_start != nullptr) { - WriteBlock(type, key, Slice(block_start, block_len)) + WriteBlock(type, key, Slice(block_start, block_len), timestamp) .PermitUncheckedError(); + dumped_size_bytes_ += block_len; } }; } @@ -190,8 +210,7 @@ CacheDumperImpl::DumpOneBlockCallBack() { // First, we write the metadata first, which is a fixed size string. Then, we // Append the dump unit string to the writer. IOStatus CacheDumperImpl::WriteBlock(CacheDumpUnitType type, const Slice& key, - const Slice& value) { - uint64_t timestamp = clock_->NowMicros(); + const Slice& value, uint64_t timestamp) { uint32_t value_checksum = crc32c::Value(value.data(), value.size()); // First, serialize the block information in a string @@ -241,7 +260,8 @@ IOStatus CacheDumperImpl::WriteHeader() { "block_size, block_data, block_checksum> cache_value\n"; std::string header_value(s.str()); CacheDumpUnitType type = CacheDumpUnitType::kHeader; - return WriteBlock(type, header_key, header_value); + uint64_t timestamp = clock_->NowMicros(); + return WriteBlock(type, header_key, header_value, timestamp); } // Write the footer after all the blocks are stored to indicate the ending. @@ -249,7 +269,8 @@ IOStatus CacheDumperImpl::WriteFooter() { std::string footer_key = "footer"; std::string footer_value("cache dump completed"); CacheDumpUnitType type = CacheDumpUnitType::kFooter; - return WriteBlock(type, footer_key, footer_value); + uint64_t timestamp = clock_->NowMicros(); + return WriteBlock(type, footer_key, footer_value, timestamp); } // This is the main function to restore the cache entries to secondary cache. @@ -368,6 +389,151 @@ IOStatus CacheDumpedLoaderImpl::RestoreCacheEntriesToSecondaryCache() { } } +// This is the main function to restore the cache entries to secondary cache. +// First, we check if all the arguments are valid. Then, we read the block +// sequentially from the reader and insert them to the secondary cache. +IOStatus CacheDumpedLoaderImpl::RestoreCacheEntriesToCache() { + // TODO: remove this line when options are used in the loader + (void)options_; + // Step 1: we check if all the arguments are valid + if (!cache_) { + return IOStatus::InvalidArgument("Cache is null"); + } + if (reader_ == nullptr) { + return IOStatus::InvalidArgument("CacheDumpReader is null"); + } + // we copy the Cache Deleter Role Map as its member. + role_map_ = CopyCacheDeleterRoleMap(); + + clock_ = options_.clock; + deadline_ = options_.deadline; + // Step 2: read the header + // TODO: we need to check the cache dump format version and RocksDB version + // after the header is read out. + IOStatus io_s; + DumpUnit dump_unit; + std::string data; + io_s = ReadHeader(&data, &dump_unit); + if (!io_s.ok()) { + return io_s; + } + + // Step 3: read out the rest of the blocks from the reader. The loop will stop + // either I/O status is not ok or we reach to the the end. + while (io_s.ok() && dump_unit.type != CacheDumpUnitType::kFooter) { + dump_unit.reset(); + data.clear(); + uint64_t timestamp = clock_->NowMicros(); + if (deadline_.count()) { + std::chrono::microseconds now = std::chrono::microseconds(timestamp); + if (now >= deadline_) { + break; + } + } + if (loaded_size_bytes_ > options_.max_size_bytes) { + break; + } + // read the content and store in the dump_unit + io_s = ReadCacheBlock(&data, &dump_unit); + if (!io_s.ok()) { + break; + } + loaded_size_bytes_ += dump_unit.key.size(); + loaded_size_bytes_ += dump_unit.value_len; + // Create the uncompressed_block based on the information in the dump_unit + // (There is no block trailer here compatible with block-based SST file.) + CacheAllocationPtr cache_ptr = AllocateBlock(dump_unit.value_len, nullptr); + std::copy_n((char*)dump_unit.value, dump_unit.value_len, cache_ptr.get()); + BlockContents uncompressed_block(std::move(cache_ptr), dump_unit.value_len); + size_t charge = uncompressed_block.ApproximateMemoryUsage(); + Cache::CacheItemHelper* helper = nullptr; + Statistics* statistics = nullptr; + Status s = Status::OK(); + // according to the block type, get the helper callback function and create + // the corresponding block + switch (dump_unit.type) { + case CacheDumpUnitType::kFilter: { + helper = BlocklikeTraits::GetCacheItemHelper( + BlockType::kFilter); + std::unique_ptr block_holder; + block_holder.reset(BlocklikeTraits::Create( + std::move(uncompressed_block), toptions_.read_amp_bytes_per_bit, + statistics, false, toptions_.filter_policy.get())); + if (helper != nullptr) { + s = cache_->Insert(dump_unit.key, + (void*)(block_holder.get()), helper, charge); + if (s.ok()) { + block_holder.release(); + } + } + break; + } + case CacheDumpUnitType::kData: { + helper = BlocklikeTraits::GetCacheItemHelper(BlockType::kData); + std::unique_ptr block_holder; + block_holder.reset(BlocklikeTraits::Create( + std::move(uncompressed_block), toptions_.read_amp_bytes_per_bit, + statistics, false, toptions_.filter_policy.get())); + if (helper != nullptr) { + s = cache_->Insert(dump_unit.key, + (void*)(block_holder.get()), helper, charge); + if (s.ok()) { + block_holder.release(); + } + } + break; + } + case CacheDumpUnitType::kIndex: { + helper = BlocklikeTraits::GetCacheItemHelper(BlockType::kIndex); + std::unique_ptr block_holder; + block_holder.reset(BlocklikeTraits::Create( + std::move(uncompressed_block), 0, statistics, false, + toptions_.filter_policy.get())); + if (helper != nullptr) { + s = cache_->Insert(dump_unit.key, + (void*)(block_holder.get()), helper, charge); + if (s.ok()) { + block_holder.release(); + } + } + break; + } + case CacheDumpUnitType::kFilterMetaBlock: { + helper = BlocklikeTraits::GetCacheItemHelper( + BlockType::kFilterPartitionIndex); + std::unique_ptr block_holder; + block_holder.reset(BlocklikeTraits::Create( + std::move(uncompressed_block), toptions_.read_amp_bytes_per_bit, + statistics, false, toptions_.filter_policy.get())); + if (helper != nullptr) { + s = cache_->Insert(dump_unit.key, + (void*)(block_holder.get()), helper, charge); + if (s.ok()) { + block_holder.release(); + } + } + break; + } + case CacheDumpUnitType::kFooter: + break; + case CacheDumpUnitType::kDeprecatedFilterBlock: + // Obsolete + break; + default: + continue; + } + if (!s.ok()) { + io_s = status_to_io_status(std::move(s)); + } + } + if (dump_unit.type == CacheDumpUnitType::kFooter) { + return IOStatus::OK(); + } else { + return io_s; + } +} + + // Read and copy the dump unit metadata to std::string data, decode and create // the unit metadata based on the string IOStatus CacheDumpedLoaderImpl::ReadDumpUnitMeta(std::string* data, diff --git a/utilities/cache_dump_load_impl.h b/utilities/cache_dump_load_impl.h index f45b3360b1e..35819d0dc45 100644 --- a/utilities/cache_dump_load_impl.h +++ b/utilities/cache_dump_load_impl.h @@ -97,14 +97,16 @@ class CacheDumperImpl : public CacheDumper { CacheDumperImpl(const CacheDumpOptions& dump_options, const std::shared_ptr& cache, std::unique_ptr&& writer) - : options_(dump_options), cache_(cache), writer_(std::move(writer)) {} + : options_(dump_options), cache_(cache), writer_(std::move(writer)) { + dumped_size_bytes_ = 0; + } ~CacheDumperImpl() { writer_.reset(); } Status SetDumpFilter(std::vector db_list) override; IOStatus DumpCacheEntriesToWriter() override; private: IOStatus WriteBlock(CacheDumpUnitType type, const Slice& key, - const Slice& value); + const Slice& value, uint64_t timestamp); IOStatus WriteHeader(); IOStatus WriteFooter(); bool ShouldFilterOut(const Slice& key); @@ -122,6 +124,10 @@ class CacheDumperImpl : public CacheDumper { // improvement can be applied like BloomFilter or others to speedup the // filtering. std::set prefix_filter_; + // Deadline for dumper in microseconds. + std::chrono::microseconds deadline_; + uint64_t dumped_size_bytes_; + bool dump_all_keys_ = true; }; // The default implementation of CacheDumpedLoader @@ -130,13 +136,16 @@ class CacheDumpedLoaderImpl : public CacheDumpedLoader { CacheDumpedLoaderImpl(const CacheDumpOptions& dump_options, const BlockBasedTableOptions& toptions, const std::shared_ptr& secondary_cache, - std::unique_ptr&& reader) + std::unique_ptr&& reader, + const std::shared_ptr& cache) : options_(dump_options), toptions_(toptions), secondary_cache_(secondary_cache), - reader_(std::move(reader)) {} + reader_(std::move(reader)), + cache_(cache) {} ~CacheDumpedLoaderImpl() {} IOStatus RestoreCacheEntriesToSecondaryCache() override; + IOStatus RestoreCacheEntriesToCache() override; private: IOStatus ReadDumpUnitMeta(std::string* data, DumpUnitMeta* unit_meta); @@ -149,6 +158,10 @@ class CacheDumpedLoaderImpl : public CacheDumpedLoader { std::shared_ptr secondary_cache_; std::unique_ptr reader_; UnorderedMap role_map_; + std::shared_ptr cache_; + SystemClock* clock_; + std::chrono::microseconds deadline_; + uint64_t loaded_size_bytes_ = 0; }; // The default implementation of CacheDumpWriter. We write the blocks to a file