diff --git a/cmake/rediscache.cmake b/cmake/rediscache.cmake index cdf943c..d9567cc 100644 --- a/cmake/rediscache.cmake +++ b/cmake/rediscache.cmake @@ -14,7 +14,7 @@ ExternalProject_Add( #URL https://github.com/pikiwidb/rediscache/archive/refs/tags/v1.0.7.tar.gz #URL_HASH MD5=02c8aadc018dd8d4d3803cc420d1d75b #temp used - GIT_REPOSITORY git@github.com:hahahashen/rediscache.git + GIT_REPOSITORY https://github.com/hahahashen/rediscache.git GIT_TAG feat/removeUseTcMallocMacroDefinition CMAKE_ARGS -DCMAKE_BUILD_TYPE=Debug diff --git a/src/cache/redisCache.h b/src/cache/redisCache.h index 9aac92d..399f626 100644 --- a/src/cache/redisCache.h +++ b/src/cache/redisCache.h @@ -104,46 +104,46 @@ class RedisCache { Status RPushx(std::string &key, std::vector &values); // // Set Commands - // Status SAdd(std::string& key, std::vector &members); - // Status SCard(const std::string& key, uint64_t *len); - // Status SIsmember(std::string& key, std::string& member); - // Status SMembers(std::string& key, std::vector *members); - // Status SRem(std::string& key, std::vector &members); - // Status SRandmember(std::string& key, int64_t count, std::vector *members); - - // // Zset Commands - // Status ZAdd(std::string& key, std::vector &score_members); - // Status ZCard(const std::string& key, uint64_t *len); - // Status ZCount(std::string& key, std::string &min, std::string &max, uint64_t *len); - // Status ZIncrby(std::string& key, std::string& member, double increment); - // Status ZRange(std::string& key, - // int64_t start, int64_t stop, - // std::vector *score_members); - // Status ZRangebyscore(std::string& key, - // std::string &min, std::string &max, - // std::vector *score_members, - // int64_t offset = 0, int64_t count = -1); - // Status ZRank(std::string& key, std::string& member, int64_t *rank); - // Status ZRem(std::string& key, std::vector &members); - // Status ZRemrangebyrank(std::string& key, std::string &min, std::string &max); - // Status ZRemrangebyscore(std::string& key, std::string &min, std::string &max); - // Status ZRevrange(std::string& key, - // int64_t start, int64_t stop, - // std::vector *score_members); - // Status ZRevrangebyscore(std::string& key, - // std::string &min, std::string &max, - // std::vector *score_members, - // int64_t offset = 0, int64_t count = -1); - // Status ZRevrangebylex(std::string& key, - // std::string &min, std::string &max, - // std::vector *members); - // Status ZRevrank(std::string& key, std::string& member, int64_t *rank); - // Status ZScore(std::string& key, std::string& member, double *score); - // Status ZRangebylex(std::string& key, - // std::string &min, std::string &max, - // std::vector *members); - // Status ZLexcount(std::string& key, std::string &min, std::string &max, uint64_t *len); - // Status ZRemrangebylex(std::string& key, std::string &min, std::string &max); + Status SAdd(std::string& key, std::vector &members); + Status SCard(const std::string& key, uint64_t *len); + Status SIsmember(std::string& key, std::string& member); + Status SMembers(std::string& key, std::vector *members); + Status SRem(std::string& key, std::vector &members); + Status SRandmember(std::string& key, int64_t count, std::vector *members); + + // Zset Commands + Status ZAdd(std::string& key, std::vector &score_members); + Status ZCard(const std::string& key, uint64_t *len); + Status ZCount(std::string& key, std::string &min, std::string &max, uint64_t *len); + Status ZIncrby(std::string& key, std::string& member, double increment); + Status ZRange(std::string& key, + int64_t start, int64_t stop, + std::vector *score_members); + Status ZRangebyscore(std::string& key, + std::string &min, std::string &max, + std::vector *score_members, + int64_t offset = 0, int64_t count = -1); + Status ZRank(std::string& key, std::string& member, int64_t *rank); + Status ZRem(std::string& key, std::vector &members); + Status ZRemrangebyrank(std::string& key, std::string &min, std::string &max); + Status ZRemrangebyscore(std::string& key, std::string &min, std::string &max); + Status ZRevrange(std::string& key, + int64_t start, int64_t stop, + std::vector *score_members); + Status ZRevrangebyscore(std::string& key, + std::string &min, std::string &max, + std::vector *score_members, + int64_t offset = 0, int64_t count = -1); + Status ZRevrangebylex(std::string& key, + std::string &min, std::string &max, + std::vector *members); + Status ZRevrank(std::string& key, std::string& member, int64_t *rank); + Status ZScore(std::string& key, std::string& member, double *score); + Status ZRangebylex(std::string& key, + std::string &min, std::string &max, + std::vector *members); + Status ZLexcount(std::string& key, std::string &min, std::string &max, uint64_t *len); + Status ZRemrangebylex(std::string& key, std::string &min, std::string &max); // // Bit Commands // Status SetBit(std::string& key, size_t offset, int64_t value); diff --git a/src/cache/set.cc b/src/cache/set.cc new file mode 100644 index 0000000..e968f2b --- /dev/null +++ b/src/cache/set.cc @@ -0,0 +1,136 @@ +// Copyright (c) 2024-present, Qihoo, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#include "pstd_defer.h" +#include "redisCache.h" + +namespace cache { + +Status RedisCache::SAdd(std::string& key, std::vector &members) { + int ret = RcFreeMemoryIfNeeded(cache_); + if (C_OK != ret) { + return Status::Corruption("[error] Free memory faild !"); + } + + robj *kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size())); + robj **vals = (robj **)zcallocate(sizeof(robj *) * members.size()); + for (unsigned int i = 0; i < members.size(); ++i) { + vals[i] = createObject(OBJ_STRING, sdsnewlen(members[i].data(), members[i].size())); + } + DEFER { + FreeObjectList(vals, members.size()); + DecrObjectsRefCount(kobj); + }; + int res = RcSAdd(cache_, kobj, vals, members.size()); + if (C_OK != res) { + return Status::Corruption("RcSAdd failed"); + } + + return Status::OK(); +} + +Status RedisCache::SCard(const std::string& key, uint64_t *len) { + robj *kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size())); + DEFER { + DecrObjectsRefCount(kobj); + }; + int ret = RcSCard(cache_, kobj, reinterpret_cast(len)); + if (C_OK != ret) { + if (REDIS_KEY_NOT_EXIST == ret) { + return Status::NotFound("key not in cache"); + } + return Status::Corruption("RcSCard failed"); + } + + return Status::OK(); +} + +Status RedisCache::SIsmember(std::string& key, std::string& member) { + int is_member = 0; + robj *kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size())); + robj *mobj = createObject(OBJ_STRING, sdsnewlen(member.data(), member.size())); + DEFER { + DecrObjectsRefCount(kobj, mobj); + }; + int ret = RcSIsmember(cache_, kobj, mobj, &is_member); + if (C_OK != ret) { + if (REDIS_KEY_NOT_EXIST == ret) { + return Status::NotFound("key not in cache"); + } + return Status::Corruption("SIsmember failed"); + } + + return is_member ? Status::OK() : Status::NotFound("member not exist"); +} + +Status RedisCache::SMembers(std::string& key, std::vector *members) { + sds *vals = nullptr; + unsigned long vals_size = 0; + robj *kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size())); + DEFER { + DecrObjectsRefCount(kobj); + }; + int ret = RcSMembers(cache_, kobj, &vals, &vals_size); + if (C_OK != ret) { + if (REDIS_KEY_NOT_EXIST == ret) { + return Status::NotFound("key not in cache"); + } + return Status::Corruption("RcSMembers failed"); + } + + for (unsigned long i = 0; i < vals_size; ++i) { + members->push_back(std::string(vals[i], sdslen(vals[i]))); + } + + FreeSdsList(vals, vals_size); + return Status::OK(); +} + +Status RedisCache::SRem(std::string& key, std::vector &members) { + robj *kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size())); + robj **vals = (robj **)zcallocate(sizeof(robj *) * members.size()); + for (unsigned int i = 0; i < members.size(); ++i) { + vals[i] = createObject(OBJ_STRING, sdsnewlen(members[i].data(), members[i].size())); + } + DEFER { + FreeObjectList(vals, members.size()); + DecrObjectsRefCount(kobj); + }; + + int ret = RcSRem(cache_, kobj, vals, members.size()); + if (C_OK != ret) { + if (REDIS_KEY_NOT_EXIST == ret) { + return Status::NotFound("key not in cache"); + } + return Status::Corruption("RcSRem failed"); + } + + return Status::OK(); +} + +Status RedisCache::SRandmember(std::string& key, int64_t count, std::vector *members) { + sds *vals = nullptr; + unsigned long vals_size = 0; + robj *kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size())); + DEFER { + DecrObjectsRefCount(kobj); + }; + int ret = RcSRandmember(cache_, kobj, count, &vals, &vals_size); + if (C_OK != ret) { + if (REDIS_KEY_NOT_EXIST == ret) { + return Status::NotFound("key not in cache"); + } + return Status::Corruption("RcSRandmember failed"); + } + + for (unsigned long i = 0; i < vals_size; ++i) { + members->push_back(std::string(vals[i], sdslen(vals[i]))); + } + + FreeSdsList(vals, vals_size); + return Status::OK(); +} + +} // namespace cache \ No newline at end of file diff --git a/src/cmd_set.cc b/src/cmd_set.cc index 896e448..83c23de 100644 --- a/src/cmd_set.cc +++ b/src/cmd_set.cc @@ -14,7 +14,7 @@ namespace pikiwidb { SIsMemberCmd::SIsMemberCmd(const std::string& name, int16_t arity) - : BaseCmd(name, arity, kCmdFlagsReadonly, kAclCategoryRead | kAclCategorySet) {} + : BaseCmd(name, arity, kCmdFlagsReadonly|kCmdFlagsSet |kCmdFlagsDoThroughDB | kCmdFlagsReadCache | kCmdFlagsUpdateCache, kAclCategoryRead | kAclCategorySet) {} bool SIsMemberCmd::DoInitial(PClient* client) { client->SetKey(client->argv_[1]); @@ -22,17 +22,42 @@ bool SIsMemberCmd::DoInitial(PClient* client) { } void SIsMemberCmd::DoCmd(PClient* client) { int32_t reply_Num = 0; // only change to 1 if ismember . key not exist it is 0 - auto s = + s_ = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->SIsmember(client->Key(), client->argv_[2], &reply_Num); - if (s.IsInvalidArgument()) { + if (s_.IsInvalidArgument()) { client->SetRes(CmdRes::kMultiKey); return; } client->AppendInteger(reply_Num); } +void SIsMemberCmd::ReadCache(PClient *client) { + auto key=client->Key(); + auto s = PSTORE.GetBackend(client->GetCurrentDB())->GetCache()->SIsmember(key, client->argv_[2]); + if (s.ok()) { + client->AppendContent(":1"); + } else if (s.IsNotFound()) { + client->SetRes(CmdRes::kCacheMiss); + } else { + client->SetRes(CmdRes::kErrOther, s.ToString()); + } +} + + +void SIsMemberCmd::DoThroughDB(PClient *client) { + client->Clear(); + DoCmd(client); +} + +void SIsMemberCmd::DoUpdateCache(PClient *client) { + if (s_.ok()) { + auto key=client->Key(); + PSTORE.GetBackend(client->GetCurrentDB())->GetCache()->PushKeyToAsyncLoadQueue(KEY_TYPE_SET, key, client); + } +} + SAddCmd::SAddCmd(const std::string& name, int16_t arity) - : BaseCmd(name, arity, kCmdFlagsWrite, kAclCategoryWrite | kAclCategorySet) {} + : BaseCmd(name, arity, kCmdFlagsWrite|kCmdFlagsSet |kCmdFlagsDoThroughDB | kCmdFlagsUpdateCache, kAclCategoryWrite | kAclCategorySet) {} bool SAddCmd::DoInitial(PClient* client) { client->SetKey(client->argv_[1]); @@ -43,18 +68,30 @@ bool SAddCmd::DoInitial(PClient* client) { void SAddCmd::DoCmd(PClient* client) { const std::vector members(client->argv_.begin() + 2, client->argv_.end()); int32_t ret = 0; - storage::Status s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->SAdd(client->Key(), members, &ret); - if (s.ok()) { + s_ = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->SAdd(client->Key(), members, &ret); + if (s_.ok()) { client->AppendInteger(ret); - } else if (s.IsInvalidArgument()) { + } else if (s_.IsInvalidArgument()) { client->SetRes(CmdRes::kMultiKey); } else { client->SetRes(CmdRes::kSyntaxErr, "sadd cmd error"); } } +void SAddCmd::DoThroughDB(PClient* client) { + DoCmd(client); +} + +void SAddCmd::DoUpdateCache(PClient* client) { + if (s_.ok()) { + auto key=client->Key(); + std::vector members(client->argv_.begin() + 2, client->argv_.end()); + PSTORE.GetBackend(client->GetCurrentDB())->GetCache()->SAddIfKeyExist(key, members); + } +} + SUnionStoreCmd::SUnionStoreCmd(const std::string& name, int16_t arity) - : BaseCmd(name, arity, kCmdFlagsWrite, kAclCategoryWrite | kAclCategorySet) {} + : BaseCmd(name, arity, kCmdFlagsWrite|kCmdFlagsSet | kCmdFlagsDoThroughDB | kCmdFlagsUpdateCache, kAclCategoryWrite | kAclCategorySet) {} bool SUnionStoreCmd::DoInitial(PClient* client) { std::vector keys(client->argv_.begin() + 1, client->argv_.end()); @@ -66,11 +103,11 @@ void SUnionStoreCmd::DoCmd(PClient* client) { std::vector keys(client->Keys().begin() + 1, client->Keys().end()); std::vector value_to_dest; int32_t ret = 0; - storage::Status s = PSTORE.GetBackend(client->GetCurrentDB()) + s_ = PSTORE.GetBackend(client->GetCurrentDB()) ->GetStorage() ->SUnionstore(client->Keys().at(0), keys, value_to_dest, &ret); - if (!s.ok()) { - if (s.IsInvalidArgument()) { + if (!s_.ok()) { + if (s_.IsInvalidArgument()) { client->SetRes(CmdRes::kMultiKey); return; } @@ -78,6 +115,19 @@ void SUnionStoreCmd::DoCmd(PClient* client) { } client->AppendInteger(ret); } + +void SUnionStoreCmd::DoThroughDB(PClient* client) { + DoCmd(client); +} + +void SUnionStoreCmd::DoUpdateCache(PClient* client) { + if (s_.ok()) { + std::vector v; + v.emplace_back(client->Keys().at(0)); + PSTORE.GetBackend(client->GetCurrentDB())->GetCache()->Del(v); + } +} + SInterCmd::SInterCmd(const std::string& name, int16_t arity) : BaseCmd(name, arity, kCmdFlagsReadonly, kAclCategoryRead | kAclCategorySet) {} @@ -103,7 +153,7 @@ void SInterCmd::DoCmd(PClient* client) { } SRemCmd::SRemCmd(const std::string& name, int16_t arity) - : BaseCmd(name, arity, kCmdFlagsWrite, kAclCategoryWrite | kAclCategorySet) {} + : BaseCmd(name, arity, kCmdFlagsWrite|kCmdFlagsSet |kCmdFlagsDoThroughDB | kCmdFlagsUpdateCache, kAclCategoryWrite | kAclCategorySet) {} bool SRemCmd::DoInitial(PClient* client) { client->SetKey(client->argv_[1]); @@ -112,18 +162,30 @@ bool SRemCmd::DoInitial(PClient* client) { void SRemCmd::DoCmd(PClient* client) { std::vector to_delete_members(client->argv_.begin() + 2, client->argv_.end()); - int32_t reply_num = 0; - storage::Status s = - PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->SRem(client->Key(), to_delete_members, &reply_num); - if (!s.ok()) { - if (s.IsInvalidArgument()) { + + s_ = + PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->SRem(client->Key(), to_delete_members, &deleted_num); + if (!s_.ok()) { + if (s_.IsInvalidArgument()) { client->SetRes(CmdRes::kMultiKey); } else { client->SetRes(CmdRes::kErrOther, "srem cmd error"); } return; } - client->AppendInteger(reply_num); + client->AppendInteger(deleted_num); +} + +void SRemCmd::DoThroughDB(PClient* client) { + DoCmd(client); +} + +void SRemCmd::DoUpdateCache(PClient* client) { + if (s_.ok() && deleted_num > 0) { + auto key=client->Key(); + std::vector to_delete_members(client->argv_.begin() + 2, client->argv_.end()); + PSTORE.GetBackend(client->GetCurrentDB())->GetCache()->SRem(key, to_delete_members); + } } SUnionCmd::SUnionCmd(const std::string& name, int16_t arity) @@ -150,7 +212,7 @@ void SUnionCmd::DoCmd(PClient* client) { } SInterStoreCmd::SInterStoreCmd(const std::string& name, int16_t arity) - : BaseCmd(name, arity, kCmdFlagsWrite, kAclCategoryWrite | kAclCategorySet) {} + : BaseCmd(name, arity, kCmdFlagsWrite|kCmdFlagsSet | kCmdFlagsDoThroughDB | kCmdFlagsUpdateCache, kAclCategoryWrite | kAclCategorySet) {} bool SInterStoreCmd::DoInitial(PClient* client) { client->SetKey(client->argv_[1]); @@ -162,11 +224,11 @@ void SInterStoreCmd::DoCmd(PClient* client) { int32_t reply_num = 0; std::vector inter_keys(client->argv_.begin() + 2, client->argv_.end()); - storage::Status s = PSTORE.GetBackend(client->GetCurrentDB()) + s_ = PSTORE.GetBackend(client->GetCurrentDB()) ->GetStorage() ->SInterstore(client->Key(), inter_keys, value_to_dest, &reply_num); - if (!s.ok()) { - if (s.IsInvalidArgument()) { + if (!s_.ok()) { + if (s_.IsInvalidArgument()) { client->SetRes(CmdRes::kMultiKey); } else { client->SetRes(CmdRes::kSyntaxErr, "sinterstore cmd error"); @@ -176,8 +238,20 @@ void SInterStoreCmd::DoCmd(PClient* client) { client->AppendInteger(reply_num); } +void SInterStoreCmd::DoThroughDB(PClient* client) { + DoCmd(client); +} + +void SInterStoreCmd::DoUpdateCache(PClient* client) { + if (s_.ok()) { + std::vector v; + v.emplace_back(client->Key()); + PSTORE.GetBackend(client->GetCurrentDB())->GetCache()->Del(v); + } +} + SCardCmd::SCardCmd(const std::string& name, int16_t arity) - : BaseCmd(name, arity, kCmdFlagsReadonly, kAclCategoryRead | kAclCategorySet) {} + : BaseCmd(name, arity, kCmdFlagsReadonly|kCmdFlagsSet | kCmdFlagsDoThroughDB | kCmdFlagsReadCache | kCmdFlagsUpdateCache, kAclCategoryRead | kAclCategorySet) {} bool SCardCmd::DoInitial(PClient* client) { client->SetKey(client->argv_[1]); @@ -185,36 +259,61 @@ bool SCardCmd::DoInitial(PClient* client) { } void SCardCmd::DoCmd(PClient* client) { int32_t reply_Num = 0; - storage::Status s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->SCard(client->Key(), &reply_Num); - if (!s.ok()) { - if (s.IsInvalidArgument()) { + s_ = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->SCard(client->Key(), &reply_Num); + if (!s_.ok()) { + if (s_.IsInvalidArgument()) { client->SetRes(CmdRes::kMultiKey); } else { client->SetRes(CmdRes::kSyntaxErr, "scard cmd error"); } return; } - if (s.ok() || s.IsNotFound()) { + if (s_.ok() || s_.IsNotFound()) { client->AppendInteger(reply_Num); return; } client->SetRes(CmdRes::kSyntaxErr, "scard cmd error"); } +void SCardCmd::ReadCache(PClient* client) { + uint64_t card = 0; + auto key=client->Key(); + auto s = PSTORE.GetBackend(client->GetCurrentDB())->GetCache()->SCard(key, &card); + if (s.ok()) { + client->AppendInteger(card); + } else if (s.IsNotFound()) { + client->SetRes(CmdRes::kCacheMiss); + } else { + client->SetRes(CmdRes::kErrOther, "scard error"); + } +} + +void SCardCmd::DoThroughDB(PClient* client) { + client->Clear(); + DoCmd(client); +} + +void SCardCmd::DoUpdateCache(PClient* client) { + if (s_.ok()) { + auto key=client->Key(); + PSTORE.GetBackend(client->GetCurrentDB())->GetCache()->PushKeyToAsyncLoadQueue(KEY_TYPE_SET, key, client); + } +} + SMoveCmd::SMoveCmd(const std::string& name, int16_t arity) - : BaseCmd(name, arity, kCmdFlagsWrite, kAclCategoryWrite | kAclCategorySet) {} + : BaseCmd(name, arity, kCmdFlagsWrite|kCmdFlagsSet | kCmdFlagsDoThroughDB | kCmdFlagsUpdateCache, kAclCategoryWrite | kAclCategorySet) {} bool SMoveCmd::DoInitial(PClient* client) { return true; } void SMoveCmd::DoCmd(PClient* client) { int32_t reply_num = 0; - storage::Status s = PSTORE.GetBackend(client->GetCurrentDB()) + s_ = PSTORE.GetBackend(client->GetCurrentDB()) ->GetStorage() ->SMove(client->argv_[1], client->argv_[2], client->argv_[3], &reply_num); - if (s.ok() || s.IsNotFound()) { + if (s_.ok() || s_.IsNotFound()) { client->AppendInteger(reply_num); } else { - if (s.IsInvalidArgument()) { + if (s_.IsInvalidArgument()) { client->SetRes(CmdRes::kMultiKey); } else { client->SetRes(CmdRes::kErrOther, "smove cmd error"); @@ -223,8 +322,21 @@ void SMoveCmd::DoCmd(PClient* client) { } } +void SMoveCmd::DoThroughDB(PClient* client) { + DoCmd(client); +} + +void SMoveCmd::DoUpdateCache(PClient* client) { + if (s_.ok()) { + std::vector members; + members.emplace_back(client->argv_[3]); + PSTORE.GetBackend(client->GetCurrentDB())->GetCache()->SRem(client->argv_[1], members); + PSTORE.GetBackend(client->GetCurrentDB())->GetCache()->SAddIfKeyExist(client->argv_[2], members); + } +} + SRandMemberCmd::SRandMemberCmd(const std::string& name, int16_t arity) - : BaseCmd(name, arity, kCmdFlagsReadonly, kAclCategoryRead | kAclCategorySet) {} + : BaseCmd(name, arity, kCmdFlagsReadonly|kCmdFlagsSet | kCmdFlagsDoThroughDB | kCmdFlagsReadCache | kCmdFlagsUpdateCache, kAclCategoryRead | kAclCategorySet) {} bool SRandMemberCmd::DoInitial(PClient* client) { if (client->argv_.size() > 3) { @@ -264,8 +376,35 @@ void SRandMemberCmd::DoCmd(PClient* client) { client->AppendString(""); } +void SRandMemberCmd::ReadCache(PClient* client) { + std::vector vec_ret; + auto s = PSTORE.GetBackend(client->GetCurrentDB())->GetCache()->SRandmember(client->argv_[1], this->num_rand, &vec_ret); + if (s.ok()) { + if (client->argv_.size() == 3) { + client->AppendStringVector(vec_ret); + } else if (client->argv_.size() == 2) { // srand only needs to return one element + client->AppendString(vec_ret[0]); + } + } else if (s.IsNotFound()) { + client->SetRes(CmdRes::kCacheMiss); + } else { + client->SetRes(CmdRes::kErrOther, s.ToString()); + } +} + +void SRandMemberCmd::DoThroughDB(PClient* client) { + client->Clear(); + DoCmd(client); +} + +void SRandMemberCmd::DoUpdateCache(PClient* client) { + if (s_.ok()) { + PSTORE.GetBackend(client->GetCurrentDB())->GetCache()->PushKeyToAsyncLoadQueue(KEY_TYPE_SET, client->argv_[1], client); + } +} + SPopCmd::SPopCmd(const std::string& name, int16_t arity) - : BaseCmd(name, arity, kCmdFlagsWrite, kAclCategoryWrite | kAclCategorySet) {} + : BaseCmd(name, arity, kCmdFlagsWrite|kCmdFlagsSet | kCmdFlagsDoThroughDB | kCmdFlagsUpdateCache, kAclCategoryWrite | kAclCategorySet) {} bool SPopCmd::DoInitial(PClient* client) { client->SetKey(client->argv_[1]); @@ -273,12 +412,10 @@ bool SPopCmd::DoInitial(PClient* client) { } void SPopCmd::DoCmd(PClient* client) { - std::vector delete_members; if ((client->argv_.size()) == 2) { int64_t cnt = 1; - std::vector delete_member; storage::Status s = - PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->SPop(client->Key(), &delete_member, cnt); + PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->SPop(client->Key(), &deleted_members_, cnt); if (!s.ok()) { if (s.IsInvalidArgument()) { client->SetRes(CmdRes::kMultiKey); @@ -287,17 +424,16 @@ void SPopCmd::DoCmd(PClient* client) { } return; } - client->AppendString(delete_member[0]); + client->AppendString(deleted_members_[0]); } else if ((client->argv_.size()) == 3) { - std::vector delete_members; int64_t cnt = 1; if (client->argv_[2].find(".") != std::string::npos || !pstd::String2int(client->argv_[2], &cnt)) { client->SetRes(CmdRes::kInvalidInt); return; } storage::Status s = - PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->SPop(client->Key(), &delete_members, cnt); + PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->SPop(client->Key(), &deleted_members_, cnt); if (!s.ok()) { if (s.IsInvalidArgument()) { client->SetRes(CmdRes::kMultiKey); @@ -306,7 +442,7 @@ void SPopCmd::DoCmd(PClient* client) { } return; } - client->AppendStringVector(delete_members); + client->AppendStringVector(deleted_members_); } else { client->SetRes(CmdRes::kWrongNum, "spop"); @@ -314,8 +450,19 @@ void SPopCmd::DoCmd(PClient* client) { } } +void SPopCmd::DoThroughDB(PClient* client) { + DoCmd(client); +} + +void SPopCmd::DoUpdateCache(PClient* client) { + if (s_.ok()) { + auto key=client->Key(); + PSTORE.GetBackend(client->GetCurrentDB())->GetCache()->SRem(key, deleted_members_); + } +} + SMembersCmd::SMembersCmd(const std::string& name, int16_t arity) - : BaseCmd(name, arity, kCmdFlagsReadonly, kAclCategoryRead | kAclCategorySet) {} + : BaseCmd(name, arity, kCmdFlagsReadonly|kCmdFlagsSet | kCmdFlagsDoThroughDB | kCmdFlagsReadCache | kCmdFlagsUpdateCache, kAclCategoryRead | kAclCategorySet) {} bool SMembersCmd::DoInitial(PClient* client) { client->SetKey(client->argv_[1]); @@ -324,9 +471,9 @@ bool SMembersCmd::DoInitial(PClient* client) { void SMembersCmd::DoCmd(PClient* client) { std::vector delete_members; - storage::Status s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->SMembers(client->Key(), &delete_members); - if (!s.ok()) { - if (s.IsInvalidArgument()) { + s_ = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->SMembers(client->Key(), &delete_members); + if (!s_.ok()) { + if (s_.IsInvalidArgument()) { client->SetRes(CmdRes::kMultiKey); } else { client->SetRes(CmdRes::kSyntaxErr, "smembers cmd error"); @@ -336,6 +483,35 @@ void SMembersCmd::DoCmd(PClient* client) { client->AppendStringVector(delete_members); } +void SMembersCmd::ReadCache(PClient* client) { + std::vector members; + auto key=client->Key(); + auto s = PSTORE.GetBackend(client->GetCurrentDB())->GetCache()->SMembers(key, &members); + if (s.ok()) { + client->AppendArrayLen(members.size()); + for (const auto& member : members) { + client->AppendStringLen(member.size()); + client->AppendContent(member); + } + } else if (s.IsNotFound()) { + client->SetRes(CmdRes::kCacheMiss); + } else { + client->SetRes(CmdRes::kErrOther, s.ToString()); + } +} + +void SMembersCmd::DoThroughDB(PClient* client) { + client->Clear(); + DoCmd(client); +} + +void SMembersCmd::DoUpdateCache(PClient* client) { + if (s_.ok()) { + auto key=client->Key(); + PSTORE.GetBackend(client->GetCurrentDB())->GetCache()->PushKeyToAsyncLoadQueue(KEY_TYPE_SET, key, client); + } +} + SDiffCmd::SDiffCmd(const std::string& name, int16_t arity) : BaseCmd(name, arity, kCmdFlagsReadonly, kAclCategoryRead | kAclCategorySet) {} @@ -360,7 +536,7 @@ void SDiffCmd::DoCmd(PClient* client) { } SDiffstoreCmd::SDiffstoreCmd(const std::string& name, int16_t arity) - : BaseCmd(name, arity, kCmdFlagsWrite, kAclCategoryWrite | kAclCategorySet) {} + : BaseCmd(name, arity, kCmdFlagsWrite|kCmdFlagsSet |kCmdFlagsDoThroughDB | kCmdFlagsUpdateCache, kAclCategoryWrite | kAclCategorySet) {} bool SDiffstoreCmd::DoInitial(PClient* client) { client->SetKey(client->argv_[1]); @@ -371,11 +547,11 @@ void SDiffstoreCmd::DoCmd(PClient* client) { std::vector value_to_dest; int32_t reply_num = 0; std::vector diffstore_keys(client->argv_.begin() + 2, client->argv_.end()); - storage::Status s = PSTORE.GetBackend(client->GetCurrentDB()) + s_ = PSTORE.GetBackend(client->GetCurrentDB()) ->GetStorage() ->SDiffstore(client->Key(), diffstore_keys, value_to_dest, &reply_num); - if (!s.ok()) { - if (s.IsInvalidArgument()) { + if (!s_.ok()) { + if (s_.IsInvalidArgument()) { client->SetRes(CmdRes::kMultiKey); } else { client->SetRes(CmdRes::kSyntaxErr, "sdiffstore cmd error"); @@ -385,6 +561,18 @@ void SDiffstoreCmd::DoCmd(PClient* client) { client->AppendInteger(reply_num); } +void SDiffstoreCmd::DoThroughDB(PClient* client) { + DoCmd(client); +} + +void SDiffstoreCmd::DoUpdateCache(PClient* client) { + if (s_.ok()) { + std::vector v; + v.emplace_back(client->Key()); + PSTORE.GetBackend(client->GetCurrentDB())->GetCache()->Del(v); + } +} + SScanCmd::SScanCmd(const std::string& name, int16_t arity) : BaseCmd(name, arity, kCmdFlagsReadonly, kAclCategoryRead | kAclCategorySet) {} diff --git a/src/cmd_set.h b/src/cmd_set.h index 8f6253b..9bbfc7b 100644 --- a/src/cmd_set.h +++ b/src/cmd_set.h @@ -19,6 +19,10 @@ class SIsMemberCmd : public BaseCmd { private: void DoCmd(PClient *client) override; + void DoThroughDB(PClient *client) override; + void DoUpdateCache(PClient *client) override; + void ReadCache(PClient *client) override; + storage::Status s_; }; class SAddCmd : public BaseCmd { @@ -30,6 +34,9 @@ class SAddCmd : public BaseCmd { private: void DoCmd(PClient *client) override; + void DoThroughDB(PClient *client) override; + void DoUpdateCache(PClient *client) override; + storage::Status s_; }; class SUnionStoreCmd : public BaseCmd { @@ -41,6 +48,9 @@ class SUnionStoreCmd : public BaseCmd { private: void DoCmd(PClient *client) override; + void DoThroughDB(PClient *client) override; + void DoUpdateCache(PClient *client) override; + storage::Status s_; }; class SRemCmd : public BaseCmd { @@ -52,6 +62,10 @@ class SRemCmd : public BaseCmd { private: void DoCmd(PClient *client) override; + void DoThroughDB(PClient *client) override; + void DoUpdateCache(PClient *client) override; + storage::Status s_; + int32_t deleted_num = 0; }; class SUnionCmd : public BaseCmd { @@ -85,6 +99,9 @@ class SInterStoreCmd : public BaseCmd { private: void DoCmd(PClient *client) override; + void DoThroughDB(PClient *client) override; + void DoUpdateCache(PClient *client) override; + storage::Status s_; }; class SCardCmd : public BaseCmd { @@ -96,6 +113,10 @@ class SCardCmd : public BaseCmd { private: void DoCmd(PClient *client) override; + void DoThroughDB(PClient *client) override; + void DoUpdateCache(PClient *client) override; + void ReadCache(PClient *client) override; + storage::Status s_; }; class SMoveCmd : public BaseCmd { @@ -107,6 +128,9 @@ class SMoveCmd : public BaseCmd { private: void DoCmd(PClient *client) override; + void DoThroughDB(PClient *client) override; + void DoUpdateCache(PClient *client) override; + storage::Status s_; }; class SRandMemberCmd : public BaseCmd { @@ -119,6 +143,10 @@ class SRandMemberCmd : public BaseCmd { private: void DoCmd(PClient *client) override; int num_rand = 1; + void DoThroughDB(PClient *client) override; + void DoUpdateCache(PClient *client) override; + void ReadCache(PClient *client) override; + storage::Status s_; }; class SPopCmd : public BaseCmd { @@ -130,6 +158,10 @@ class SPopCmd : public BaseCmd { private: void DoCmd(PClient *client) override; + void DoThroughDB(PClient *client) override; + void DoUpdateCache(PClient *client) override; + storage::Status s_; + std::vector deleted_members_; }; class SMembersCmd : public BaseCmd { @@ -141,6 +173,10 @@ class SMembersCmd : public BaseCmd { private: void DoCmd(PClient *client) override; + void DoThroughDB(PClient *client) override; + void DoUpdateCache(PClient *client) override; + void ReadCache(PClient *client) override; + storage::Status s_; }; class SDiffCmd : public BaseCmd { @@ -163,6 +199,9 @@ class SDiffstoreCmd : public BaseCmd { private: void DoCmd(PClient *client) override; + void DoThroughDB(PClient *client) override; + void DoUpdateCache(PClient *client) override; + storage::Status s_; }; class SScanCmd : public BaseCmd { diff --git a/src/pcache.cc b/src/pcache.cc index 08075c5..10f2b89 100644 --- a/src/pcache.cc +++ b/src/pcache.cc @@ -564,73 +564,73 @@ Status PCache::RPushnxWithoutTTL(std::string &key, std::vector &val // /*----------------------------------------------------------------------------- // * Set Commands // *----------------------------------------------------------------------------*/ -// Status PCache::SAdd(std::string& key, std::vector &members) { -// int cache_index = CacheIndex(key); -// std::lock_guard lm(*cache_mutexs_[cache_index]); -// return caches_[cache_index]->SAdd(key, members); -// } +Status PCache::SAdd(std::string& key, std::vector &members) { + int cache_index = CacheIndex(key); + std::lock_guard lm(*cache_mutexs_[cache_index]); + return caches_[cache_index]->SAdd(key, members); +} -// Status PCache::SAddIfKeyExist(std::string& key, std::vector &members) { -// int cache_index = CacheIndex(key); -// std::lock_guard lm(*cache_mutexs_[cache_index]); -// if (caches_[cache_index]->Exists(key)) { -// return caches_[cache_index]->SAdd(key, members); -// } -// return Status::NotFound("key not exist"); -// } +Status PCache::SAddIfKeyExist(std::string& key, std::vector &members) { + int cache_index = CacheIndex(key); + std::lock_guard lm(*cache_mutexs_[cache_index]); + if (caches_[cache_index]->Exists(key)) { + return caches_[cache_index]->SAdd(key, members); + } + return Status::NotFound("key not exist"); +} -// Status PCache::SAddnx(std::string& key, std::vector &members, int64_t ttl) { -// int cache_index = CacheIndex(key); -// std::lock_guard lm(*cache_mutexs_[cache_index]); -// if (!caches_[cache_index]->Exists(key)) { -// caches_[cache_index]->SAdd(key, members); -// caches_[cache_index]->Expire(key, ttl); -// return Status::OK(); -// } else { -// return Status::NotFound("key exist"); -// } -// } +Status PCache::SAddnx(std::string& key, std::vector &members, int64_t ttl) { + int cache_index = CacheIndex(key); + std::lock_guard lm(*cache_mutexs_[cache_index]); + if (!caches_[cache_index]->Exists(key)) { + caches_[cache_index]->SAdd(key, members); + caches_[cache_index]->Expire(key, ttl); + return Status::OK(); + } else { + return Status::NotFound("key exist"); + } +} -// Status PCache::SAddnxWithoutTTL(std::string& key, std::vector &members) { -// int cache_index = CacheIndex(key); -// std::lock_guard lm(*cache_mutexs_[cache_index]); -// if (!caches_[cache_index]->Exists(key)) { -// caches_[cache_index]->SAdd(key, members); -// return Status::OK(); -// } else { -// return Status::NotFound("key exist"); -// } -// } +Status PCache::SAddnxWithoutTTL(std::string& key, std::vector &members) { + int cache_index = CacheIndex(key); + std::lock_guard lm(*cache_mutexs_[cache_index]); + if (!caches_[cache_index]->Exists(key)) { + caches_[cache_index]->SAdd(key, members); + return Status::OK(); + } else { + return Status::NotFound("key exist"); + } +} -// Status PCache::SCard(std::string& key, uint64_t *len) { -// int cache_index = CacheIndex(key); -// std::lock_guard lm(*cache_mutexs_[cache_index]); -// return caches_[cache_index]->SCard(key, len); -// } +Status PCache::SCard(std::string& key, uint64_t *len) { + int cache_index = CacheIndex(key); + std::lock_guard lm(*cache_mutexs_[cache_index]); + return caches_[cache_index]->SCard(key, len); +} -// Status PCache::SIsmember(std::string& key, std::string& member) { -// int cache_index = CacheIndex(key); -// std::lock_guard lm(*cache_mutexs_[cache_index]); -// return caches_[cache_index]->SIsmember(key, member); -// } +Status PCache::SIsmember(std::string& key, std::string& member) { + int cache_index = CacheIndex(key); + std::lock_guard lm(*cache_mutexs_[cache_index]); + return caches_[cache_index]->SIsmember(key, member); +} -// Status PCache::SMembers(std::string& key, std::vector *members) { -// int cache_index = CacheIndex(key); -// std::lock_guard lm(*cache_mutexs_[cache_index]); -// return caches_[cache_index]->SMembers(key, members); -// } +Status PCache::SMembers(std::string& key, std::vector *members) { + int cache_index = CacheIndex(key); + std::lock_guard lm(*cache_mutexs_[cache_index]); + return caches_[cache_index]->SMembers(key, members); +} -// Status PCache::SRem(std::string& key, std::vector &members) { -// int cache_index = CacheIndex(key); -// std::lock_guard lm(*cache_mutexs_[cache_index]); -// return caches_[cache_index]->SRem(key, members); -// } +Status PCache::SRem(std::string& key, std::vector &members) { + int cache_index = CacheIndex(key); + std::lock_guard lm(*cache_mutexs_[cache_index]); + return caches_[cache_index]->SRem(key, members); +} -// Status PCache::SRandmember(std::string& key, int64_t count, std::vector *members) { -// int cache_index = CacheIndex(key); -// std::lock_guard lm(*cache_mutexs_[cache_index]); -// return caches_[cache_index]->SRandmember(key, count, members); -// } +Status PCache::SRandmember(std::string& key, int64_t count, std::vector *members) { + int cache_index = CacheIndex(key); + std::lock_guard lm(*cache_mutexs_[cache_index]); + return caches_[cache_index]->SRandmember(key, count, members); +} // /*----------------------------------------------------------------------------- // * ZSet Commands @@ -1528,18 +1528,18 @@ Status PCache::WriteListToCache(std::string &key, std::vector &valu return Status::OK(); } -// Status PCache::WriteSetToCache(std::string& key, std::vector &members, int64_t ttl) { -// if (0 >= ttl) { -// if (PIKA_TTL_NONE == ttl) { -// return SAddnxWithoutTTL(key, members); -// } else { -// return Del({key}); -// } -// } else { -// return SAddnx(key, members, ttl); -// } -// return Status::OK(); -// } +Status PCache::WriteSetToCache(std::string& key, std::vector &members, int64_t ttl) { + if (0 >= ttl) { + if (PCache_TTL_NONE == ttl) { + return SAddnxWithoutTTL(key, members); + } else { + return Del({key}); + } + } else { + return SAddnx(key, members, ttl); + } + return Status::OK(); +} // Status PCache::WriteZSetToCache(std::string& key, std::vector &score_members, int64_t ttl) { // if (0 >= ttl) { diff --git a/src/pcache.h b/src/pcache.h index 30f345e..8efafe4 100644 --- a/src/pcache.h +++ b/src/pcache.h @@ -124,15 +124,15 @@ class PCache : public pstd::noncopyable, public std::enable_shared_from_this& values); // Set Commands - // rocksdb::Status SAdd(std::string& key, std::vector& members); - // rocksdb::Status SAddIfKeyExist(std::string& key, std::vector& members); - // rocksdb::Status SAddnx(std::string& key, std::vector& members, int64_t ttl); - // rocksdb::Status SAddnxWithoutTTL(std::string& key, std::vector& members); - // rocksdb::Status SCard(std::string& key, uint64_t* len); - // rocksdb::Status SIsmember(std::string& key, std::string& member); - // rocksdb::Status SMembers(std::string& key, std::vector* members); - // rocksdb::Status SRem(std::string& key, std::vector& members); - // rocksdb::Status SRandmember(std::string& key, int64_t count, std::vector* members); + rocksdb::Status SAdd(std::string& key, std::vector& members); + rocksdb::Status SAddIfKeyExist(std::string& key, std::vector& members); + rocksdb::Status SAddnx(std::string& key, std::vector& members, int64_t ttl); + rocksdb::Status SAddnxWithoutTTL(std::string& key, std::vector& members); + rocksdb::Status SCard(std::string& key, uint64_t* len); + rocksdb::Status SIsmember(std::string& key, std::string& member); + rocksdb::Status SMembers(std::string& key, std::vector* members); + rocksdb::Status SRem(std::string& key, std::vector& members); + rocksdb::Status SRandmember(std::string& key, int64_t count, std::vector* members); // ZSet Commands // rocksdb::Status ZAdd(std::string& key, std::vector& score_members); diff --git a/src/pcache_load_thread.cc b/src/pcache_load_thread.cc index 8cece45..0d1f076 100644 --- a/src/pcache_load_thread.cc +++ b/src/pcache_load_thread.cc @@ -66,23 +66,23 @@ bool PCacheLoadThread::LoadKV(std::string& key, PClient* client) { return true; } -// bool PCacheLoadThread::LoadHash(std::string& key, PClient* client) { -// int32_t len = 0; -// db->storage()->HLen(key, &len); -// if (0 >= len || CACHE_VALUE_ITEM_MAX_SIZE < len) { -// return false; -// } +bool PCacheLoadThread::LoadHash(std::string& key, PClient* client) { + int32_t len = 0; + PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->HLen(key, &len); + if (0 >= len || CACHE_VALUE_ITEM_MAX_SIZE < len) { + return false; + } -// std::vector fvs; -// int64_t ttl = -1; -// rocksdb::Status s = db->storage()->HGetallWithTTL(key, &fvs, &ttl); -// if (!s.ok()) { -// LOG(WARNING) << "load hash failed, key=" << key; -// return false; -// } -// db->cache()->WriteHashToCache(key, fvs, ttl); -// return true; -// } + std::vector fvs; + int64_t ttl = -1; + rocksdb::Status s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->HGetallWithTTL(key, &fvs, &ttl); + if (!s.ok()) { + WARN("load hash failed, key={}",key); + return false; + } + PSTORE.GetBackend(client->GetCurrentDB())->GetCache()->WriteHashToCache(key, fvs, ttl); + return true; +} bool PCacheLoadThread::LoadList(std::string& key, PClient* client) { uint64_t len = 0; @@ -103,31 +103,30 @@ bool PCacheLoadThread::LoadList(std::string& key, PClient* client) { return true; } -// bool PCacheLoadThread::LoadSet(std::string& key,PClient* client) { -// int32_t len = 0; -// db->storage()->SCard(key, &len); -// if (0 >= len || CACHE_VALUE_ITEM_MAX_SIZE < len) { -// LOG(WARNING) << "can not load key, because item size:" << len -// << " beyond max item size:" << CACHE_VALUE_ITEM_MAX_SIZE; -// return false; -// } +bool PCacheLoadThread::LoadSet(std::string& key,PClient* client) { + int32_t len = 0; + PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->SCard(key, &len); + if (0 >= len || CACHE_VALUE_ITEM_MAX_SIZE < len) { + WARN("can not load key, because item size:{} beyond max item size:{}",len,CACHE_VALUE_ITEM_MAX_SIZE); + return false; + } -// std::vector values; -// int64_t ttl = -1; -// rocksdb::Status s = db->storage()->SMembersWithTTL(key, &values, &ttl); -// if (!s.ok()) { -// LOG(WARNING) << "load set failed, key=" << key; -// return false; -// } -// db->cache()->WriteSetToCache(key, values, ttl); -// return true; -// } + std::vector values; + int64_t ttl = -1; + rocksdb::Status s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->SMembersWithTTL(key, &values, &ttl); + if (!s.ok()) { + WARN("load set failed, key={}",key); + return false; + } + PSTORE.GetBackend(client->GetCurrentDB())->GetCache()->WriteSetToCache(key, values, ttl); + return true; +} // bool PCacheLoadThread::LoadZset(std::string& key, PClient* client) { // int32_t len = 0; // int start_index = 0; // int stop_index = -1; -// db->storage()->ZCard(key, &len); +// PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->ZCard(key, &len); // if (0 >= len) { // return false; // } @@ -165,12 +164,12 @@ bool PCacheLoadThread::LoadKey(const char key_type, std::string& key, PClient* c switch (key_type) { case 'k': return LoadKV(key, client); - // case 'h': - // return LoadHash(key, client); + case 'h': + return LoadHash(key, client); case 'l': return LoadList(key, client); - // case 's': - // return LoadSet(key, client); + case 's': + return LoadSet(key, client); // case 'z': // return LoadZset(key, client); default: diff --git a/src/pcache_load_thread.h b/src/pcache_load_thread.h index 49b494f..dff316a 100644 --- a/src/pcache_load_thread.h +++ b/src/pcache_load_thread.h @@ -29,9 +29,9 @@ class PCacheLoadThread : public Thread { private: bool LoadKV(std::string& key, PClient* client); - // bool LoadHash(std::string& key, PClient* client); + bool LoadHash(std::string& key, PClient* client); bool LoadList(std::string& key, PClient* client); - // bool LoadSet(std::string& key, PClient* client); + bool LoadSet(std::string& key, PClient* client); // bool LoadZset(std::string& key, PClient* client); bool LoadKey(const char key_type, std::string& key, PClient* client); virtual void* ThreadMain() override;