Skip to content

Commit

Permalink
feat: add bpop cmd
Browse files Browse the repository at this point in the history
  • Loading branch information
SamuelSze1 committed Sep 21, 2024
1 parent ff31360 commit d525c40
Show file tree
Hide file tree
Showing 9 changed files with 206 additions and 5 deletions.
87 changes: 86 additions & 1 deletion src/base_cmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@

#include "common.h"
#include "config.h"
#include "log.h"
#include "kiwi.h"
#include "log.h"
#include "praft/praft.h"

namespace kiwi {
Expand Down Expand Up @@ -106,6 +106,91 @@ BaseCmd* BaseCmdGroup::GetSubCmd(const std::string& cmdName) {
return subCmd->second.get();
}

void BaseCmd::BlockThisClientToWaitLRPush(std::vector<std::string>& keys, int64_t expire_time, PClient* client,
BlockedConnNode::Type type) {
std::unique_lock<std::shared_mutex> latch(g_kiwi->GetBlockMtx());
auto& key_to_conns = g_kiwi->GetMapFromKeyToConns();
std::shared_ptr<std::atomic<bool>> is_done = std::make_shared<std::atomic<bool>>(false);
for (auto key : keys) {
kiwi::BlockKey blpop_key{client->GetCurrentDB(), key};

auto it = key_to_conns.find(blpop_key);
if (it == key_to_conns.end()) {
key_to_conns.emplace(blpop_key, std::make_unique<std::list<BlockedConnNode>>());
it = key_to_conns.find(blpop_key);
}
auto& wait_list_of_this_key = it->second;
wait_list_of_this_key->emplace_back(expire_time, client, type, is_done);
}
}

void BaseCmd::ServeAndUnblockConns(PClient* client) {
kiwi::BlockKey key{client->GetCurrentDB(), client->Key()};

std::shared_lock<std::shared_mutex> read_latch(g_kiwi->GetBlockMtx());
auto& key_to_conns = g_kiwi->GetMapFromKeyToConns();
auto it = key_to_conns.find(key);
if (it == key_to_conns.end()) {
// no client is waitting for this key
return;
}
read_latch.unlock();

std::unique_lock<std::shared_mutex> write_lock(g_kiwi->GetBlockMtx());
auto& waitting_list = it->second;
std::vector<std::string> elements;
storage::Status s;

// traverse this list from head to tail(in the order of adding sequence) ,means "first blocked, first get served“
for (auto conn_blocked = waitting_list->begin(); conn_blocked != waitting_list->end();) {
if (conn_blocked->is_done_->exchange(true)) {
conn_blocked = waitting_list->erase(conn_blocked);
continue;
}

PClient* BlockedClient = (*conn_blocked).GetBlockedClient();

if (BlockedClient->State() == ClientState::kClosed) {
conn_blocked = waitting_list->erase(conn_blocked);
continue;
}

switch (conn_blocked->GetCmdType()) {
case BlockedConnNode::Type::BLPop:
s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->LPop(client->Key(), 1, &elements);
break;
case BlockedConnNode::Type::BRPop:
s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->RPop(client->Key(), 1, &elements);
break;
}

if (s.ok()) {
BlockedClient->AppendArrayLen(2);
BlockedClient->AppendString(client->Key());
BlockedClient->AppendString(elements[0]);
} else if (s.IsNotFound()) {
// this key has no more elements to serve more blocked conn.
break;
} else {
BlockedClient->SetRes(CmdRes::kErrOther, s.ToString());
}
BlockedClient->SendPacket();
conn_blocked = waitting_list->erase(conn_blocked); // remove this conn from current waiting list
}
}

bool BlockedConnNode::IsExpired() {
if (expire_time_ == 0) {
return false;
}
auto now = std::chrono::system_clock::now();
int64_t now_in_ms = std::chrono::time_point_cast<std::chrono::milliseconds>(now).time_since_epoch().count();
if (expire_time_ <= now_in_ms) {
return true;
}
return false;
}

bool BaseCmdGroup::DoInitial(PClient* client) {
client->SetSubCmdName(client->argv_[1]);
if (!subCmds_.contains(client->SubCmdName())) {
Expand Down
36 changes: 36 additions & 0 deletions src/base_cmd.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ const std::string kCmdNameRPush = "rpush";
const std::string kCmdNameRPushx = "rpushx";
const std::string kCmdNameLPop = "lpop";
const std::string kCmdNameRPop = "rpop";
const std::string kCmdNameBLPop = "blpop";
const std::string kCmdNameBRPop = "brpop";
const std::string kCmdNameLRem = "lrem";
const std::string kCmdNameLRange = "lrange";
const std::string kCmdNameLTrim = "ltrim";
Expand Down Expand Up @@ -210,6 +212,23 @@ enum AclCategory {
kAclCategoryRaft = (1 << 21),
};

class BlockedConnNode {
public:
enum Type { BLPop = 0, BRPop };
virtual ~BlockedConnNode() {}
BlockedConnNode(int64_t expire_time, PClient* client, Type type, std::shared_ptr<std::atomic<bool>> is_done)
: expire_time_(expire_time), client_(client), type_(type), is_done_(is_done) {}
bool IsExpired();
PClient* GetBlockedClient() { return client_; }
std::shared_ptr<std::atomic<bool>> is_done_;
Type GetCmdType() { return type_; }

private:
Type type_;
int64_t expire_time_;
PClient* client_;
};

/**
* @brief Base class for all commands
* BaseCmd, as the base class for all commands, mainly implements some common functions
Expand Down Expand Up @@ -273,6 +292,11 @@ class BaseCmd : public std::enable_shared_from_this<BaseCmd> {

uint32_t GetCmdID() const;

void ServeAndUnblockConns(PClient* client);

void BlockThisClientToWaitLRPush(std::vector<std::string>& keys, int64_t expire_time, PClient* client,
BlockedConnNode::Type type);

protected:
// Execute a specific command
virtual void DoCmd(PClient* client) = 0;
Expand Down Expand Up @@ -312,4 +336,16 @@ class BaseCmdGroup : public BaseCmd {
private:
std::map<std::string, std::unique_ptr<BaseCmd>> subCmds_;
};

struct BlockKey { // this data struct is made for the scenario of multi dbs in pika.
int db_id;
std::string key;
bool operator==(const BlockKey& p) const { return p.db_id == db_id && p.key == key; }
};
struct BlockKeyHash {
std::size_t operator()(const BlockKey& k) const {
return std::hash<int>{}(k.db_id) ^ std::hash<std::string>{}(k.key);
}
};

} // namespace kiwi
2 changes: 2 additions & 0 deletions src/cmd_admin.cc
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,8 @@ void SortCmd::DoCmd(PClient* client) {
storage::Status s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->RPush(store_key_, ret_, &reply_num);
if (s.ok()) {
client->AppendInteger(reply_num);
client->SetKey(store_key_);
ServeAndUnblockConns(client);
} else {
client->SetRes(CmdRes::kErrOther, s.ToString());
}
Expand Down
2 changes: 2 additions & 0 deletions src/cmd_keys.cc
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,8 @@ void RenameCmd::DoCmd(PClient* client) {
storage::Status s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->Rename(client->Key(), client->argv_[2]);
if (s.ok()) {
client->SetRes(CmdRes::kOK);
client->SetKey(client->argv_[2]);
ServeAndUnblockConns(client);
} else if (s.IsNotFound()) {
client->SetRes(CmdRes::kNotFound, s.ToString());
} else {
Expand Down
4 changes: 4 additions & 0 deletions src/cmd_list.cc
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ void LPushCmd::DoCmd(PClient* client) {
PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->LPush(client->Key(), list_values, &reply_num);
if (s.ok()) {
client->AppendInteger(reply_num);
ServeAndUnblockConns(client);
} else if (s.IsInvalidArgument()) {
client->SetRes(CmdRes::kMultiKey);
} else {
Expand Down Expand Up @@ -74,6 +75,8 @@ void RPoplpushCmd::DoCmd(PClient* client) {
storage::Status s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->RPoplpush(source_, receiver_, &value);
if (s.ok()) {
client->AppendString(value);
client->SetKey(receiver_);
ServeAndUnblockConns(client);
} else if (s.IsNotFound()) {
client->AppendStringLen(-1);
} else if (s.IsInvalidArgument()) {
Expand All @@ -98,6 +101,7 @@ void RPushCmd::DoCmd(PClient* client) {
PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->RPush(client->Key(), list_values, &reply_num);
if (s.ok()) {
client->AppendInteger(reply_num);
ServeAndUnblockConns(client);
} else if (s.IsInvalidArgument()) {
client->SetRes(CmdRes::kMultiKey);
} else {
Expand Down
25 changes: 25 additions & 0 deletions src/cmd_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,31 @@ class RPopCmd : public BaseCmd {
private:
void DoCmd(PClient* client) override;
};

class BLPopCmd : public BaseCmd {
public:
BLPopCmd(const std::string& name, int16_t arity);

protected:
bool DoInitial(PClient* client) override;

private:
void DoCmd(PClient* client) override;
int64_t expire_time_{0};
};

class BRPopCmd : public BaseCmd {
public:
BRPopCmd(const std::string& name, int16_t arity);

protected:
bool DoInitial(PClient* client) override;

private:
void DoCmd(PClient* client) override;
int64_t expire_time_{0};
};

class LRangeCmd : public BaseCmd {
public:
LRangeCmd(const std::string& name, int16_t arity);
Expand Down
2 changes: 2 additions & 0 deletions src/cmd_table_manager.cc
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ void CmdTableManager::InitCmdTable() {
ADD_COMMAND(LPush, -3);
ADD_COMMAND(RPush, -3);
ADD_COMMAND(RPop, 2);
ADD_COMMAND(BLPop, -3);
ADD_COMMAND(BRPop, -3);
ADD_COMMAND(LRem, 4);
ADD_COMMAND(LRange, 4);
ADD_COMMAND(LTrim, 4);
Expand Down
26 changes: 25 additions & 1 deletion src/kiwi.cc
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,26 @@ void KiwiDB::OnNewConnection(uint64_t connId, std::shared_ptr<kiwi::PClient>& cl
client->OnConnect();
}

void KiwiDB::ScanEvictedBlockedConnsOfBlrpop() {
std::unique_lock<std::shared_mutex> latch(block_mtx_);
auto& key_to_blocked_conns = g_kiwi->GetMapFromKeyToConns();
for (auto& it : key_to_blocked_conns) {
auto& conns_list = it.second;
for (auto conn_node = conns_list->begin(); conn_node != conns_list->end();) {
if (conn_node->is_done_->exchange(true) || conn_node->GetBlockedClient()->State() == ClientState::kClosed) {
conn_node = conns_list->erase(conn_node);
} else if (conn_node->IsExpired()) {
PClient* conn_ptr = conn_node->GetBlockedClient();
conn_ptr->AppendString("");
conn_ptr->SendPacket();
conn_node = conns_list->erase(conn_node);
} else {
conn_node++;
}
}
}
}

bool KiwiDB::Init() {
char runid[kRunidSize + 1] = "";
getRandomHexChars(runid, kRunidSize);
Expand Down Expand Up @@ -201,7 +221,7 @@ bool KiwiDB::Init() {
PREPL.SetMasterAddr(g_config.master_ip.ToString().c_str(), g_config.master_port.load());
}

event_server_ =std::make_unique<net::EventServer<std::shared_ptr<PClient>>>(num);
event_server_ = std::make_unique<net::EventServer<std::shared_ptr<PClient>>>(num);

event_server_->SetRwSeparation(true);

Expand Down Expand Up @@ -232,6 +252,10 @@ bool KiwiDB::Init() {
timerTask->SetCallback([]() { PREPL.Cron(); });
event_server_->AddTimerTask(timerTask);

auto BLRPopTimerTask = std::make_shared<net::CommonTimerTask>(250);
BLRPopTimerTask->SetCallback(std::bind(&KiwiDB::ScanEvictedBlockedConnsOfBlrpop, this));
event_server_->AddTimerTask(BLRPopTimerTask);

time(&start_time_s_);

return true;
Expand Down
27 changes: 24 additions & 3 deletions src/kiwi.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,19 @@ class KiwiDB final {
event_server_->SendPacket(client, std::move(msg));
}

std::unordered_map<kiwi::BlockKey, std::unique_ptr<std::list<kiwi::BlockedConnNode>>, kiwi::BlockKeyHash>&
GetMapFromKeyToConns() {
return key_to_blocked_conns_;
}

std::shared_mutex& GetBlockMtx() { return block_mtx_; };

void ScanEvictedBlockedConnsOfBlrpop();
inline void SendPacket2Client(const std::shared_ptr<kiwi::PClient>& client, std::string&& msg) {
event_server_->SendPacket(client, std::move(msg));
}

inline void CloseConnection(const std::shared_ptr<kiwi::PClient>& client) {
event_server_->CloseConnection(client);
}
inline void CloseConnection(const std::shared_ptr<kiwi::PClient>& client) { event_server_->CloseConnection(client); }

void TCPConnect(
const net::SocketAddr& addr,
Expand Down Expand Up @@ -88,6 +94,21 @@ class KiwiDB final {
uint32_t cmd_id_ = 0;

time_t start_time_s_ = 0;

/*
* Blpop/BRpop used
*/
/* key_to_blocked_conns_:
* mapping from key to a list that stored the nodes of client-connections that
* were blocked by command blpop/brpop with key.
*/
std::unordered_map<kiwi::BlockKey, std::unique_ptr<std::list<kiwi::BlockedConnNode>>, kiwi::BlockKeyHash>
key_to_blocked_conns_;

/*
* latch of above map.
*/
std::shared_mutex block_mtx_;
};

extern std::unique_ptr<KiwiDB> g_kiwi;

0 comments on commit d525c40

Please sign in to comment.