From d525c40df8239cf79738ef0279f0a86a0dc5850d Mon Sep 17 00:00:00 2001 From: SamuelSze1 Date: Sat, 21 Sep 2024 16:47:23 +0800 Subject: [PATCH] feat: add bpop cmd --- src/base_cmd.cc | 87 +++++++++++++++++++++++++++++++++++++++- src/base_cmd.h | 36 +++++++++++++++++ src/cmd_admin.cc | 2 + src/cmd_keys.cc | 2 + src/cmd_list.cc | 4 ++ src/cmd_list.h | 25 ++++++++++++ src/cmd_table_manager.cc | 2 + src/kiwi.cc | 26 +++++++++++- src/kiwi.h | 27 +++++++++++-- 9 files changed, 206 insertions(+), 5 deletions(-) mode change 100644 => 100755 src/cmd_admin.cc mode change 100644 => 100755 src/cmd_keys.cc mode change 100644 => 100755 src/cmd_list.cc mode change 100644 => 100755 src/cmd_table_manager.cc diff --git a/src/base_cmd.cc b/src/base_cmd.cc index 6ed3715..3032451 100644 --- a/src/base_cmd.cc +++ b/src/base_cmd.cc @@ -15,8 +15,8 @@ #include "common.h" #include "config.h" -#include "log.h" #include "kiwi.h" +#include "log.h" #include "praft/praft.h" namespace kiwi { @@ -106,6 +106,91 @@ BaseCmd* BaseCmdGroup::GetSubCmd(const std::string& cmdName) { return subCmd->second.get(); } +void BaseCmd::BlockThisClientToWaitLRPush(std::vector& keys, int64_t expire_time, PClient* client, + BlockedConnNode::Type type) { + std::unique_lock latch(g_kiwi->GetBlockMtx()); + auto& key_to_conns = g_kiwi->GetMapFromKeyToConns(); + std::shared_ptr> is_done = std::make_shared>(false); + for (auto key : keys) { + kiwi::BlockKey blpop_key{client->GetCurrentDB(), key}; + + auto it = key_to_conns.find(blpop_key); + if (it == key_to_conns.end()) { + key_to_conns.emplace(blpop_key, std::make_unique>()); + it = key_to_conns.find(blpop_key); + } + auto& wait_list_of_this_key = it->second; + wait_list_of_this_key->emplace_back(expire_time, client, type, is_done); + } +} + +void BaseCmd::ServeAndUnblockConns(PClient* client) { + kiwi::BlockKey key{client->GetCurrentDB(), client->Key()}; + + std::shared_lock read_latch(g_kiwi->GetBlockMtx()); + auto& key_to_conns = g_kiwi->GetMapFromKeyToConns(); + auto it = key_to_conns.find(key); + if (it == key_to_conns.end()) { + // no client is waitting for this key + return; + } + read_latch.unlock(); + + std::unique_lock write_lock(g_kiwi->GetBlockMtx()); + auto& waitting_list = it->second; + std::vector elements; + storage::Status s; + + // traverse this list from head to tail(in the order of adding sequence) ,means "first blocked, first get served“ + for (auto conn_blocked = waitting_list->begin(); conn_blocked != waitting_list->end();) { + if (conn_blocked->is_done_->exchange(true)) { + conn_blocked = waitting_list->erase(conn_blocked); + continue; + } + + PClient* BlockedClient = (*conn_blocked).GetBlockedClient(); + + if (BlockedClient->State() == ClientState::kClosed) { + conn_blocked = waitting_list->erase(conn_blocked); + continue; + } + + switch (conn_blocked->GetCmdType()) { + case BlockedConnNode::Type::BLPop: + s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->LPop(client->Key(), 1, &elements); + break; + case BlockedConnNode::Type::BRPop: + s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->RPop(client->Key(), 1, &elements); + break; + } + + if (s.ok()) { + BlockedClient->AppendArrayLen(2); + BlockedClient->AppendString(client->Key()); + BlockedClient->AppendString(elements[0]); + } else if (s.IsNotFound()) { + // this key has no more elements to serve more blocked conn. + break; + } else { + BlockedClient->SetRes(CmdRes::kErrOther, s.ToString()); + } + BlockedClient->SendPacket(); + conn_blocked = waitting_list->erase(conn_blocked); // remove this conn from current waiting list + } +} + +bool BlockedConnNode::IsExpired() { + if (expire_time_ == 0) { + return false; + } + auto now = std::chrono::system_clock::now(); + int64_t now_in_ms = std::chrono::time_point_cast(now).time_since_epoch().count(); + if (expire_time_ <= now_in_ms) { + return true; + } + return false; +} + bool BaseCmdGroup::DoInitial(PClient* client) { client->SetSubCmdName(client->argv_[1]); if (!subCmds_.contains(client->SubCmdName())) { diff --git a/src/base_cmd.h b/src/base_cmd.h index 5329583..626708f 100644 --- a/src/base_cmd.h +++ b/src/base_cmd.h @@ -135,6 +135,8 @@ const std::string kCmdNameRPush = "rpush"; const std::string kCmdNameRPushx = "rpushx"; const std::string kCmdNameLPop = "lpop"; const std::string kCmdNameRPop = "rpop"; +const std::string kCmdNameBLPop = "blpop"; +const std::string kCmdNameBRPop = "brpop"; const std::string kCmdNameLRem = "lrem"; const std::string kCmdNameLRange = "lrange"; const std::string kCmdNameLTrim = "ltrim"; @@ -210,6 +212,23 @@ enum AclCategory { kAclCategoryRaft = (1 << 21), }; +class BlockedConnNode { + public: + enum Type { BLPop = 0, BRPop }; + virtual ~BlockedConnNode() {} + BlockedConnNode(int64_t expire_time, PClient* client, Type type, std::shared_ptr> is_done) + : expire_time_(expire_time), client_(client), type_(type), is_done_(is_done) {} + bool IsExpired(); + PClient* GetBlockedClient() { return client_; } + std::shared_ptr> is_done_; + Type GetCmdType() { return type_; } + + private: + Type type_; + int64_t expire_time_; + PClient* client_; +}; + /** * @brief Base class for all commands * BaseCmd, as the base class for all commands, mainly implements some common functions @@ -273,6 +292,11 @@ class BaseCmd : public std::enable_shared_from_this { uint32_t GetCmdID() const; + void ServeAndUnblockConns(PClient* client); + + void BlockThisClientToWaitLRPush(std::vector& keys, int64_t expire_time, PClient* client, + BlockedConnNode::Type type); + protected: // Execute a specific command virtual void DoCmd(PClient* client) = 0; @@ -312,4 +336,16 @@ class BaseCmdGroup : public BaseCmd { private: std::map> subCmds_; }; + +struct BlockKey { // this data struct is made for the scenario of multi dbs in pika. + int db_id; + std::string key; + bool operator==(const BlockKey& p) const { return p.db_id == db_id && p.key == key; } +}; +struct BlockKeyHash { + std::size_t operator()(const BlockKey& k) const { + return std::hash{}(k.db_id) ^ std::hash{}(k.key); + } +}; + } // namespace kiwi diff --git a/src/cmd_admin.cc b/src/cmd_admin.cc old mode 100644 new mode 100755 index 3e214d6..12da844 --- a/src/cmd_admin.cc +++ b/src/cmd_admin.cc @@ -592,6 +592,8 @@ void SortCmd::DoCmd(PClient* client) { storage::Status s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->RPush(store_key_, ret_, &reply_num); if (s.ok()) { client->AppendInteger(reply_num); + client->SetKey(store_key_); + ServeAndUnblockConns(client); } else { client->SetRes(CmdRes::kErrOther, s.ToString()); } diff --git a/src/cmd_keys.cc b/src/cmd_keys.cc old mode 100644 new mode 100755 index 9003e63..bcbc763 --- a/src/cmd_keys.cc +++ b/src/cmd_keys.cc @@ -250,6 +250,8 @@ void RenameCmd::DoCmd(PClient* client) { storage::Status s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->Rename(client->Key(), client->argv_[2]); if (s.ok()) { client->SetRes(CmdRes::kOK); + client->SetKey(client->argv_[2]); + ServeAndUnblockConns(client); } else if (s.IsNotFound()) { client->SetRes(CmdRes::kNotFound, s.ToString()); } else { diff --git a/src/cmd_list.cc b/src/cmd_list.cc old mode 100644 new mode 100755 index 64a4dbe..383d514 --- a/src/cmd_list.cc +++ b/src/cmd_list.cc @@ -27,6 +27,7 @@ void LPushCmd::DoCmd(PClient* client) { PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->LPush(client->Key(), list_values, &reply_num); if (s.ok()) { client->AppendInteger(reply_num); + ServeAndUnblockConns(client); } else if (s.IsInvalidArgument()) { client->SetRes(CmdRes::kMultiKey); } else { @@ -74,6 +75,8 @@ void RPoplpushCmd::DoCmd(PClient* client) { storage::Status s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->RPoplpush(source_, receiver_, &value); if (s.ok()) { client->AppendString(value); + client->SetKey(receiver_); + ServeAndUnblockConns(client); } else if (s.IsNotFound()) { client->AppendStringLen(-1); } else if (s.IsInvalidArgument()) { @@ -98,6 +101,7 @@ void RPushCmd::DoCmd(PClient* client) { PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->RPush(client->Key(), list_values, &reply_num); if (s.ok()) { client->AppendInteger(reply_num); + ServeAndUnblockConns(client); } else if (s.IsInvalidArgument()) { client->SetRes(CmdRes::kMultiKey); } else { diff --git a/src/cmd_list.h b/src/cmd_list.h index 409681f..906090c 100644 --- a/src/cmd_list.h +++ b/src/cmd_list.h @@ -43,6 +43,31 @@ class RPopCmd : public BaseCmd { private: void DoCmd(PClient* client) override; }; + +class BLPopCmd : public BaseCmd { + public: + BLPopCmd(const std::string& name, int16_t arity); + + protected: + bool DoInitial(PClient* client) override; + + private: + void DoCmd(PClient* client) override; + int64_t expire_time_{0}; +}; + +class BRPopCmd : public BaseCmd { + public: + BRPopCmd(const std::string& name, int16_t arity); + + protected: + bool DoInitial(PClient* client) override; + + private: + void DoCmd(PClient* client) override; + int64_t expire_time_{0}; +}; + class LRangeCmd : public BaseCmd { public: LRangeCmd(const std::string& name, int16_t arity); diff --git a/src/cmd_table_manager.cc b/src/cmd_table_manager.cc old mode 100644 new mode 100755 index 589ccd9..02b91d2 --- a/src/cmd_table_manager.cc +++ b/src/cmd_table_manager.cc @@ -154,6 +154,8 @@ void CmdTableManager::InitCmdTable() { ADD_COMMAND(LPush, -3); ADD_COMMAND(RPush, -3); ADD_COMMAND(RPop, 2); + ADD_COMMAND(BLPop, -3); + ADD_COMMAND(BRPop, -3); ADD_COMMAND(LRem, 4); ADD_COMMAND(LRange, 4); ADD_COMMAND(LTrim, 4); diff --git a/src/kiwi.cc b/src/kiwi.cc index b9f6353..04a8777 100644 --- a/src/kiwi.cc +++ b/src/kiwi.cc @@ -165,6 +165,26 @@ void KiwiDB::OnNewConnection(uint64_t connId, std::shared_ptr& cl client->OnConnect(); } +void KiwiDB::ScanEvictedBlockedConnsOfBlrpop() { + std::unique_lock latch(block_mtx_); + auto& key_to_blocked_conns = g_kiwi->GetMapFromKeyToConns(); + for (auto& it : key_to_blocked_conns) { + auto& conns_list = it.second; + for (auto conn_node = conns_list->begin(); conn_node != conns_list->end();) { + if (conn_node->is_done_->exchange(true) || conn_node->GetBlockedClient()->State() == ClientState::kClosed) { + conn_node = conns_list->erase(conn_node); + } else if (conn_node->IsExpired()) { + PClient* conn_ptr = conn_node->GetBlockedClient(); + conn_ptr->AppendString(""); + conn_ptr->SendPacket(); + conn_node = conns_list->erase(conn_node); + } else { + conn_node++; + } + } + } +} + bool KiwiDB::Init() { char runid[kRunidSize + 1] = ""; getRandomHexChars(runid, kRunidSize); @@ -201,7 +221,7 @@ bool KiwiDB::Init() { PREPL.SetMasterAddr(g_config.master_ip.ToString().c_str(), g_config.master_port.load()); } - event_server_ =std::make_unique>>(num); + event_server_ = std::make_unique>>(num); event_server_->SetRwSeparation(true); @@ -232,6 +252,10 @@ bool KiwiDB::Init() { timerTask->SetCallback([]() { PREPL.Cron(); }); event_server_->AddTimerTask(timerTask); + auto BLRPopTimerTask = std::make_shared(250); + BLRPopTimerTask->SetCallback(std::bind(&KiwiDB::ScanEvictedBlockedConnsOfBlrpop, this)); + event_server_->AddTimerTask(BLRPopTimerTask); + time(&start_time_s_); return true; diff --git a/src/kiwi.h b/src/kiwi.h index 6921d84..3fd68c6 100644 --- a/src/kiwi.h +++ b/src/kiwi.h @@ -54,13 +54,19 @@ class KiwiDB final { event_server_->SendPacket(client, std::move(msg)); } + std::unordered_map>, kiwi::BlockKeyHash>& + GetMapFromKeyToConns() { + return key_to_blocked_conns_; + } + + std::shared_mutex& GetBlockMtx() { return block_mtx_; }; + + void ScanEvictedBlockedConnsOfBlrpop(); inline void SendPacket2Client(const std::shared_ptr& client, std::string&& msg) { event_server_->SendPacket(client, std::move(msg)); } - inline void CloseConnection(const std::shared_ptr& client) { - event_server_->CloseConnection(client); - } + inline void CloseConnection(const std::shared_ptr& client) { event_server_->CloseConnection(client); } void TCPConnect( const net::SocketAddr& addr, @@ -88,6 +94,21 @@ class KiwiDB final { uint32_t cmd_id_ = 0; time_t start_time_s_ = 0; + + /* + * Blpop/BRpop used + */ + /* key_to_blocked_conns_: + * mapping from key to a list that stored the nodes of client-connections that + * were blocked by command blpop/brpop with key. + */ + std::unordered_map>, kiwi::BlockKeyHash> + key_to_blocked_conns_; + + /* + * latch of above map. + */ + std::shared_mutex block_mtx_; }; extern std::unique_ptr g_kiwi;