Skip to content

Commit

Permalink
feat: support zpopmin/zpopmax/zunionstore/zinterstore
Browse files Browse the repository at this point in the history
issue: OpenAtomFoundation#30
issue: OpenAtomFoundation#280

Signed-off-by: HappyUncle <[email protected]>
  • Loading branch information
happy-v587 committed May 26, 2024
1 parent 128d408 commit 83f3215
Show file tree
Hide file tree
Showing 9 changed files with 517 additions and 24 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ Run this command, compare with redis use pipeline commands, try it.
- sadd scard srem sismember smembers sdiff sdiffstore sinter sinterstore sunion sunionstore smove spop srandmember sscan

#### sorted set commands
- zadd zcard zrank zrevrank zrem zincrby zscore zrange zrevrange zrangebyscore zrevrangebyscore zremrangebyrank zremrangebyscore
- zadd zcard zrank zrevrank zrem zincrby zscore zrange zrevrange zrangebyscore zrevrangebyscore zremrangebyrank zremrangebyscore zpopmin zpopmax zunionstore zinterstore

#### pubsub commands
- subscribe unsubscribe publish psubscribe punsubscribe pubsub
Expand Down
2 changes: 1 addition & 1 deletion README_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ RocksDB 可以配置为 PikiwiDB 的持久化存储引擎,可以存储更多

#### sorted set commands

- zadd zcard zrank zrevrank zrem zincrby zscore zrange zrevrange zrangebyscore zrevrangebyscore zremrangebyrank zremrangebyscore
- zadd zcard zrank zrevrank zrem zincrby zscore zrange zrevrange zrangebyscore zrevrangebyscore zremrangebyrank zremrangebyscore zpopmin zpopmax zunionstore zinterstore

#### pubsub commands

Expand Down
4 changes: 4 additions & 0 deletions src/base_cmd.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,10 @@ const std::string kCmdNameRPoplpush = "rpoplpush";

// zset cmd
const std::string kCmdNameZAdd = "zadd";
const std::string kCmdNameZPopMin = "zpopmin";
const std::string kCmdNameZPopMax = "zpopmax";
const std::string kCmdNameZInterstore = "zinterstore";
const std::string kCmdNameZUnionstore = "zunionstore";
const std::string kCmdNameZRevrange = "zrevrange";
const std::string kCmdNameZRangebyscore = "zrangebyscore";
const std::string kCmdNameZRemrangebyscore = "zremrangebyscore";
Expand Down
4 changes: 4 additions & 0 deletions src/cmd_table_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,10 @@ void CmdTableManager::InitCmdTable() {

// zset
ADD_COMMAND(ZAdd, -4);
ADD_COMMAND(ZPopMin, -2);
ADD_COMMAND(ZPopMax, -2);
ADD_COMMAND(ZInterstore, -4);
ADD_COMMAND(ZUnionstore, -4);
ADD_COMMAND(ZRevrange, -4);
ADD_COMMAND(ZRangebyscore, -4);
ADD_COMMAND(ZRemrangebyscore, 4);
Expand Down
181 changes: 181 additions & 0 deletions src/cmd_zset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,187 @@ void ZAddCmd::DoCmd(PClient* client) {
}
}

ZPopMinCmd::ZPopMinCmd(const std::string& name, int16_t arity)
: BaseCmd(name, arity, kCmdFlagsWrite, kAclCategoryWrite | kAclCategorySortedSet) {}

bool ZPopMinCmd::DoInitial(PClient* client) {
if (client->argv_.size() > 3) {
client->SetRes(CmdRes::kWrongNum, client->CmdName());
return false;
}

client->SetKey(client->argv_[1]);
return true;
}

void ZPopMinCmd::DoCmd(PClient* client) {
int32_t count = 1;
if (client->argv_.size() == 3) {
if (pstd::String2int(client->argv_[2].data(), client->argv_[2].size(), &count) == 0) {
client->SetRes(CmdRes::kInvalidInt);
return;
}
}

std::vector<storage::ScoreMember> score_members;
storage::Status s =
PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->ZPopMin(client->Key(), count, &score_members);
if (s.ok()) {
char buf[32];
int64_t len = 0;
client->AppendArrayLen(static_cast<int64_t>(score_members.size()) * 2);
for (auto& score_member : score_members) {
client->AppendStringLenUint64(score_member.member.size());
client->AppendContent(score_member.member);
len = pstd::D2string(buf, sizeof(buf), score_member.score);
client->AppendStringLen(len);
client->AppendContent(buf);
}
} else {
client->SetRes(CmdRes::kErrOther, s.ToString());
}
}

ZPopMaxCmd::ZPopMaxCmd(const std::string& name, int16_t arity)
: BaseCmd(name, arity, kCmdFlagsWrite, kAclCategoryWrite | kAclCategorySortedSet) {}

bool ZPopMaxCmd::DoInitial(PClient* client) {
if (client->argv_.size() > 3) {
client->SetRes(CmdRes::kWrongNum, client->CmdName());
return false;
}

client->SetKey(client->argv_[1]);
return true;
}

void ZPopMaxCmd::DoCmd(PClient* client) {
int32_t count = 1;
if (client->argv_.size() == 3) {
if (pstd::String2int(client->argv_[2].data(), client->argv_[2].size(), &count) == 0) {
client->SetRes(CmdRes::kInvalidInt);
return;
}
}

std::vector<storage::ScoreMember> score_members;
storage::Status s =
PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->ZPopMax(client->Key(), count, &score_members);
if (s.ok()) {
char buf[32];
int64_t len = 0;
client->AppendArrayLen(static_cast<int64_t>(score_members.size()) * 2);
for (auto& score_member : score_members) {
client->AppendStringLenUint64(score_member.member.size());
client->AppendContent(score_member.member);
len = pstd::D2string(buf, sizeof(buf), score_member.score);
client->AppendStringLen(len);
client->AppendContent(buf);
}
} else {
client->SetRes(CmdRes::kErrOther, s.ToString());
}
}

ZsetUIstoreParentCmd::ZsetUIstoreParentCmd(const std::string& name, int16_t arity)
: BaseCmd(name, arity, kCmdFlagsWrite, kAclCategoryWrite | kAclCategorySortedSet) {}

// ZINTERSTORE destination numkeys key [key ...] [WEIGHTS weight [weight ...]] [AGGREGATE <SUM | MIN | MAX>]
// ZUNIONSTORE destination numkeys key [key ...] [WEIGHTS weight [weight ...]] [AGGREGATE <SUM | MIN | MAX>]
bool ZsetUIstoreParentCmd::DoInitial(PClient* client) {
auto argv_ = client->argv_;
dest_key_ = argv_[1];
if (pstd::String2int(argv_[2].data(), argv_[2].size(), &num_keys_) == 0) {
client->SetRes(CmdRes::kInvalidInt);
return false;
}
if (num_keys_ < 1) {
client->SetRes(CmdRes::kErrOther, "at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE");
return false;
}
auto argc = argv_.size();
if (argc < num_keys_ + 3) {
client->SetRes(CmdRes::kSyntaxErr);
return false;
}
keys_.assign(argv_.begin() + 3, argv_.begin() + 3 + num_keys_);
weights_.assign(num_keys_, 1);
auto index = num_keys_ + 3;
while (index < argc) {
if (strcasecmp(argv_[index].data(), "weights") == 0) {
index++;
if (argc < index + num_keys_) {
client->SetRes(CmdRes::kSyntaxErr);
return false;
}
double weight;
auto base = index;
for (; index < base + num_keys_; index++) {
if (pstd::String2d(argv_[index].data(), argv_[index].size(), &weight) == 0) {
client->SetRes(CmdRes::kErrOther, "weight value is not a float");
return false;
}
weights_[index - base] = weight;
}
} else if (strcasecmp(argv_[index].data(), "aggregate") == 0) {
index++;
if (argc < index + 1) {
client->SetRes(CmdRes::kSyntaxErr);
return false;
}
if (strcasecmp(argv_[index].data(), "sum") == 0) {
aggregate_ = storage::SUM;
} else if (strcasecmp(argv_[index].data(), "min") == 0) {
aggregate_ = storage::MIN;
} else if (strcasecmp(argv_[index].data(), "max") == 0) {
aggregate_ = storage::MAX;
} else {
client->SetRes(CmdRes::kSyntaxErr);
return false;
}
index++;
} else {
client->SetRes(CmdRes::kSyntaxErr);
return false;
}
}
return true;
}

ZInterstoreCmd::ZInterstoreCmd(const std::string& name, int16_t arity) : ZsetUIstoreParentCmd(name, arity) {}

bool ZInterstoreCmd::DoInitial(PClient* client) { return ZsetUIstoreParentCmd::DoInitial(client); }

void ZInterstoreCmd::DoCmd(PClient* client) {
int32_t count = 0;
std::vector<storage::ScoreMember> value_to_dest_;
storage::Status s = PSTORE.GetBackend(client->GetCurrentDB())
->GetStorage()
->ZInterstore(dest_key_, keys_, weights_, aggregate_, value_to_dest_, &count);
if (s.ok()) {
client->AppendInteger(count);
} else {
client->SetRes(CmdRes::kErrOther, s.ToString());
}
}

ZUnionstoreCmd::ZUnionstoreCmd(const std::string& name, int16_t arity) : ZsetUIstoreParentCmd(name, arity) {}

bool ZUnionstoreCmd::DoInitial(PClient* client) { return ZsetUIstoreParentCmd::DoInitial(client); }

void ZUnionstoreCmd::DoCmd(PClient* client) {
int32_t count = 0;
std::map<std::string, double> value_to_dest;
storage::Status s = PSTORE.GetBackend(client->GetCurrentDB())
->GetStorage()
->ZUnionstore(dest_key_, keys_, weights_, aggregate_, value_to_dest, &count);
if (s.ok()) {
client->AppendInteger(count);
} else {
client->SetRes(CmdRes::kErrOther, s.ToString());
}
}

ZRevrangeCmd::ZRevrangeCmd(const std::string& name, int16_t arity)
: BaseCmd(name, arity, kCmdFlagsReadonly, kAclCategoryRead | kAclCategorySortedSet) {}

Expand Down
58 changes: 58 additions & 0 deletions src/cmd_zset.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,64 @@ class ZAddCmd : public BaseCmd {
void DoCmd(PClient *client) override;
};

class ZPopMinCmd : public BaseCmd {
public:
ZPopMinCmd(const std::string &name, int16_t arity);

protected:
bool DoInitial(PClient *client) override;

private:
void DoCmd(PClient *client) override;
};

class ZPopMaxCmd : public BaseCmd {
public:
ZPopMaxCmd(const std::string &name, int16_t arity);

protected:
bool DoInitial(PClient *client) override;

private:
void DoCmd(PClient *client) override;
};

class ZsetUIstoreParentCmd : public BaseCmd {
public:
ZsetUIstoreParentCmd(const std::string &name, int16_t arity);

protected:
bool DoInitial(PClient *client) override;

std::string dest_key_;
int64_t num_keys_ = 0;
storage::AGGREGATE aggregate_{storage::SUM};
std::vector<std::string> keys_;
std::vector<double> weights_;
};

class ZInterstoreCmd : public ZsetUIstoreParentCmd {
public:
ZInterstoreCmd(const std::string &name, int16_t arity);

protected:
bool DoInitial(PClient *client) override;

private:
void DoCmd(PClient *client) override;
};

class ZUnionstoreCmd : public ZsetUIstoreParentCmd {
public:
ZUnionstoreCmd(const std::string &name, int16_t arity);

protected:
bool DoInitial(PClient *client) override;

private:
void DoCmd(PClient *client) override;
};

class ZRevrangeCmd : public BaseCmd {
public:
ZRevrangeCmd(const std::string &name, int16_t arity);
Expand Down
Loading

0 comments on commit 83f3215

Please sign in to comment.