Skip to content

Commit

Permalink
fix: let mset/msetnx/setx/setbit/SDiffstore/SInterstore/SMove/SPop/SU…
Browse files Browse the repository at this point in the history
…nionstore command use new batch

issue: OpenAtomFoundation#280

Signed-off-by: HappyUncle <[email protected]>
  • Loading branch information
happy-v587 committed May 17, 2024
1 parent 2f3b7fe commit 74184f5
Show file tree
Hide file tree
Showing 3 changed files with 264 additions and 37 deletions.
62 changes: 31 additions & 31 deletions src/storage/src/redis_sets.cc
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ rocksdb::Status Redis::SDiffstore(const Slice& destination, const std::vector<st
return rocksdb::Status::Corruption("SDiffsotre invalid parameter, no keys");
}

rocksdb::WriteBatch batch;
auto batch = Batch::CreateBatch(this);
rocksdb::ReadOptions read_options;
const rocksdb::Snapshot* snapshot;

Expand Down Expand Up @@ -354,23 +354,23 @@ rocksdb::Status Redis::SDiffstore(const Slice& destination, const std::vector<st
return Status::InvalidArgument("set size overflow");
}
parsed_sets_meta_value.SetCount(static_cast<int32_t>(members.size()));
batch.Put(handles_[kSetsMetaCF], base_destination.Encode(), meta_value);
batch->Put(kSetsMetaCF, base_destination.Encode(), meta_value);
} else if (s.IsNotFound()) {
char str[4];
EncodeFixed32(str, members.size());
SetsMetaValue sets_meta_value(Slice(str, sizeof(int32_t)));
version = sets_meta_value.UpdateVersion();
batch.Put(handles_[kSetsMetaCF], base_destination.Encode(), sets_meta_value.Encode());
batch->Put(kSetsMetaCF, base_destination.Encode(), sets_meta_value.Encode());
} else {
return s;
}
for (const auto& member : members) {
SetsMemberKey sets_member_key(destination, version, member);
BaseDataValue iter_value(Slice{});
batch.Put(handles_[kSetsDataCF], sets_member_key.Encode(), iter_value.Encode());
batch->Put(kSetsDataCF, sets_member_key.Encode(), iter_value.Encode());
}
*ret = static_cast<int32_t>(members.size());
s = db_->Write(default_write_options_, &batch);
s = batch->Commit();
UpdateSpecificKeyStatistics(DataType::kSets, destination.ToString(), statistic);
value_to_dest = std::move(members);
return s;
Expand Down Expand Up @@ -460,7 +460,7 @@ rocksdb::Status Redis::SInterstore(const Slice& destination, const std::vector<s
return rocksdb::Status::Corruption("SInterstore invalid parameter, no keys");
}

rocksdb::WriteBatch batch;
auto batch = Batch::CreateBatch(this);
rocksdb::ReadOptions read_options;
const rocksdb::Snapshot* snapshot;

Expand Down Expand Up @@ -549,23 +549,23 @@ rocksdb::Status Redis::SInterstore(const Slice& destination, const std::vector<s
return Status::InvalidArgument("set size overflow");
}
parsed_sets_meta_value.SetCount(static_cast<int32_t>(members.size()));
batch.Put(handles_[kSetsMetaCF], base_destination.Encode(), meta_value);
batch->Put(kSetsMetaCF, base_destination.Encode(), meta_value);
} else if (s.IsNotFound()) {
char str[4];
EncodeFixed32(str, members.size());
SetsMetaValue sets_meta_value(Slice(str, sizeof(int32_t)));
version = sets_meta_value.UpdateVersion();
batch.Put(handles_[kSetsMetaCF], base_destination.Encode(), sets_meta_value.Encode());
batch->Put(kSetsMetaCF, base_destination.Encode(), sets_meta_value.Encode());
} else {
return s;
}
for (const auto& member : members) {
SetsMemberKey sets_member_key(destination, version, member);
BaseDataValue iter_value(Slice{});
batch.Put(handles_[kSetsDataCF], sets_member_key.Encode(), iter_value.Encode());
batch->Put(kSetsDataCF, sets_member_key.Encode(), iter_value.Encode());
}
*ret = static_cast<int32_t>(members.size());
s = db_->Write(default_write_options_, &batch);
s = batch->Commit();
UpdateSpecificKeyStatistics(DataType::kSets, destination.ToString(), statistic);
value_to_dest = std::move(members);
return s;
Expand Down Expand Up @@ -679,7 +679,7 @@ Status Redis::SMembersWithTTL(const Slice& key, std::vector<std::string>* member

rocksdb::Status Redis::SMove(const Slice& source, const Slice& destination, const Slice& member, int32_t* ret) {
*ret = 0;
rocksdb::WriteBatch batch;
auto batch = Batch::CreateBatch(this);
rocksdb::ReadOptions read_options;

uint64_t version = 0;
Expand Down Expand Up @@ -712,8 +712,8 @@ rocksdb::Status Redis::SMove(const Slice& source, const Slice& destination, cons
return Status::InvalidArgument("set size overflow");
}
parsed_sets_meta_value.ModifyCount(-1);
batch.Put(handles_[kSetsMetaCF], base_source.Encode(), meta_value);
batch.Delete(handles_[kSetsDataCF], sets_member_key.Encode());
batch->Put(kSetsMetaCF, base_source.Encode(), meta_value);
batch->Delete(kSetsDataCF, sets_member_key.Encode());
statistic++;
} else if (s.IsNotFound()) {
*ret = 0;
Expand All @@ -736,10 +736,10 @@ rocksdb::Status Redis::SMove(const Slice& source, const Slice& destination, cons
if (parsed_sets_meta_value.IsStale() || parsed_sets_meta_value.Count() == 0) {
version = parsed_sets_meta_value.InitialMetaValue();
parsed_sets_meta_value.SetCount(1);
batch.Put(handles_[kSetsMetaCF], base_destination.Encode(), meta_value);
batch->Put(kSetsMetaCF, base_destination.Encode(), meta_value);
SetsMemberKey sets_member_key(destination, version, member);
BaseDataValue i_val(Slice{});
batch.Put(handles_[kSetsDataCF], sets_member_key.Encode(), i_val.Encode());
batch->Put(kSetsDataCF, sets_member_key.Encode(), i_val.Encode());
} else {
std::string member_value;
version = parsed_sets_meta_value.Version();
Expand All @@ -751,8 +751,8 @@ rocksdb::Status Redis::SMove(const Slice& source, const Slice& destination, cons
}
parsed_sets_meta_value.ModifyCount(1);
BaseDataValue iter_value(Slice{});
batch.Put(handles_[kSetsMetaCF], base_destination.Encode(), meta_value);
batch.Put(handles_[kSetsDataCF], sets_member_key.Encode(), iter_value.Encode());
batch->Put(kSetsMetaCF, base_destination.Encode(), meta_value);
batch->Put(kSetsDataCF, sets_member_key.Encode(), iter_value.Encode());
} else if (!s.ok()) {
return s;
}
Expand All @@ -762,14 +762,14 @@ rocksdb::Status Redis::SMove(const Slice& source, const Slice& destination, cons
EncodeFixed32(str, 1);
SetsMetaValue sets_meta_value(Slice(str, sizeof(int32_t)));
version = sets_meta_value.UpdateVersion();
batch.Put(handles_[kSetsMetaCF], base_destination.Encode(), sets_meta_value.Encode());
batch->Put(kSetsMetaCF, base_destination.Encode(), sets_meta_value.Encode());
SetsMemberKey sets_member_key(destination, version, member);
BaseDataValue iter_value(Slice{});
batch.Put(handles_[kSetsDataCF], sets_member_key.Encode(), iter_value.Encode());
batch->Put(kSetsDataCF, sets_member_key.Encode(), iter_value.Encode());
} else {
return s;
}
s = db_->Write(default_write_options_, &batch);
s = batch->Commit();
UpdateSpecificKeyStatistics(DataType::kSets, source.ToString(), 1);
return s;
}
Expand All @@ -778,7 +778,7 @@ rocksdb::Status Redis::SPop(const Slice& key, std::vector<std::string>* members,
std::default_random_engine engine;

std::string meta_value;
rocksdb::WriteBatch batch;
auto batch = Batch::CreateBatch(this);
ScopeRecordLock l(lock_mgr_, key);

uint64_t start_us = pstd::NowMicros();
Expand All @@ -801,14 +801,14 @@ rocksdb::Status Redis::SPop(const Slice& key, std::vector<std::string>* members,
auto iter = db_->NewIterator(default_read_options_, handles_[kSetsDataCF]);
for (iter->Seek(sets_member_key.EncodeSeekKey()); iter->Valid() && cur_index < size;
iter->Next(), cur_index++) {
batch.Delete(handles_[kSetsDataCF], iter->key());
batch->Delete(kSetsDataCF, iter->key());
ParsedSetsMemberKey parsed_sets_member_key(iter->key());
members->push_back(parsed_sets_member_key.member().ToString());
}

// parsed_sets_meta_value.ModifyCount(-cnt);
// batch.Put(handles_[kSetsMetaCF], key, meta_value);
batch.Delete(handles_[kSetsMetaCF], base_meta_key.Encode());
batch->Delete(kSetsMetaCF, base_meta_key.Encode());
delete iter;

} else {
Expand Down Expand Up @@ -838,7 +838,7 @@ rocksdb::Status Redis::SPop(const Slice& key, std::vector<std::string>* members,
}
if (sets_index.find(cur_index) != sets_index.end()) {
del_count++;
batch.Delete(handles_[kSetsDataCF], iter->key());
batch->Delete(kSetsDataCF, iter->key());
ParsedSetsMemberKey parsed_sets_member_key(iter->key());
members->push_back(parsed_sets_member_key.member().ToString());
}
Expand All @@ -848,14 +848,14 @@ rocksdb::Status Redis::SPop(const Slice& key, std::vector<std::string>* members,
return Status::InvalidArgument("set size overflow");
}
parsed_sets_meta_value.ModifyCount(static_cast<int32_t>(-cnt));
batch.Put(handles_[kSetsMetaCF], base_meta_key.Encode(), meta_value);
batch->Put(kSetsMetaCF, base_meta_key.Encode(), meta_value);
delete iter;
}
}
} else {
return s;
}
return db_->Write(default_write_options_, &batch);
return batch->Commit();
}

rocksdb::Status Redis::ResetSpopCount(const std::string& key) { return spop_counts_store_->Remove(key); }
Expand Down Expand Up @@ -1042,7 +1042,7 @@ rocksdb::Status Redis::SUnionstore(const Slice& destination, const std::vector<s
return rocksdb::Status::Corruption("SUnionstore invalid parameter, no keys");
}

rocksdb::WriteBatch batch;
auto batch = Batch::CreateBatch(this);
rocksdb::ReadOptions read_options;
const rocksdb::Snapshot* snapshot;

Expand Down Expand Up @@ -1097,23 +1097,23 @@ rocksdb::Status Redis::SUnionstore(const Slice& destination, const std::vector<s
return Status::InvalidArgument("set size overflow");
}
parsed_sets_meta_value.SetCount(static_cast<int32_t>(members.size()));
batch.Put(handles_[kSetsMetaCF], destination, meta_value);
batch->Put(kSetsMetaCF, destination, meta_value);
} else if (s.IsNotFound()) {
char str[4];
EncodeFixed32(str, members.size());
SetsMetaValue sets_meta_value(Slice(str, sizeof(int32_t)));
version = sets_meta_value.UpdateVersion();
batch.Put(handles_[kSetsMetaCF], base_destination.Encode(), sets_meta_value.Encode());
batch->Put(kSetsMetaCF, base_destination.Encode(), sets_meta_value.Encode());
} else {
return s;
}
for (const auto& member : members) {
SetsMemberKey sets_member_key(destination, version, member);
BaseDataValue i_val(Slice{});
batch.Put(handles_[kSetsDataCF], sets_member_key.Encode(), i_val.Encode());
batch->Put(kSetsDataCF, sets_member_key.Encode(), i_val.Encode());
}
*ret = static_cast<int32_t>(members.size());
s = db_->Write(default_write_options_, &batch);
s = batch->Commit();
UpdateSpecificKeyStatistics(DataType::kSets, destination.ToString(), statistic);
value_to_dest = std::move(members);
return s;
Expand Down
14 changes: 9 additions & 5 deletions src/storage/src/redis_strings.cc
Original file line number Diff line number Diff line change
Expand Up @@ -595,13 +595,13 @@ Status Redis::MSet(const std::vector<KeyValue>& kvs) {
}

MultiScopeRecordLock ml(lock_mgr_, keys);
rocksdb::WriteBatch batch;
auto batch = Batch::CreateBatch(this);
for (const auto& kv : kvs) {
BaseKey base_key(kv.key);
StringsValue strings_value(kv.value);
batch.Put(base_key.Encode(), strings_value.Encode());
batch->Put(kStringsCF, base_key.Encode(), strings_value.Encode());
}
return db_->Write(default_write_options_, &batch);
return batch->Commit();
}

Status Redis::MSetnx(const std::vector<KeyValue>& kvs, int32_t* ret) {
Expand Down Expand Up @@ -712,7 +712,9 @@ Status Redis::SetBit(const Slice& key, int64_t offset, int32_t on, int32_t* ret)
}
StringsValue strings_value(data_value);
strings_value.SetEtime(timestamp);
return db_->Put(rocksdb::WriteOptions(), base_key.Encode(), strings_value.Encode());
auto batch = Batch::CreateBatch(this);
batch->Put(kStringsCF, base_key.Encode(), strings_value.Encode());
return batch->Commit();
} else {
return s;
}
Expand All @@ -730,7 +732,9 @@ Status Redis::Setex(const Slice& key, const Slice& value, uint64_t ttl) {

BaseKey base_key(key);
ScopeRecordLock l(lock_mgr_, key);
return db_->Put(default_write_options_, base_key.Encode(), strings_value.Encode());
auto batch = Batch::CreateBatch(this);
batch->Put(kStringsCF, base_key.Encode(), strings_value.Encode());
return batch->Commit();
}

Status Redis::Setnx(const Slice& key, const Slice& value, int32_t* ret, const uint64_t ttl) {
Expand Down
Loading

0 comments on commit 74184f5

Please sign in to comment.