diff --git a/src/storage/src/redis_sets.cc b/src/storage/src/redis_sets.cc index a97e6c959..2ec883af8 100644 --- a/src/storage/src/redis_sets.cc +++ b/src/storage/src/redis_sets.cc @@ -280,7 +280,7 @@ rocksdb::Status Redis::SDiffstore(const Slice& destination, const std::vector(members.size())); - batch.Put(handles_[kSetsMetaCF], base_destination.Encode(), meta_value); + batch->Put(kSetsMetaCF, base_destination.Encode(), meta_value); } else if (s.IsNotFound()) { char str[4]; EncodeFixed32(str, members.size()); SetsMetaValue sets_meta_value(Slice(str, sizeof(int32_t))); version = sets_meta_value.UpdateVersion(); - batch.Put(handles_[kSetsMetaCF], base_destination.Encode(), sets_meta_value.Encode()); + batch->Put(kSetsMetaCF, base_destination.Encode(), sets_meta_value.Encode()); } else { return s; } for (const auto& member : members) { SetsMemberKey sets_member_key(destination, version, member); BaseDataValue iter_value(Slice{}); - batch.Put(handles_[kSetsDataCF], sets_member_key.Encode(), iter_value.Encode()); + batch->Put(kSetsDataCF, sets_member_key.Encode(), iter_value.Encode()); } *ret = static_cast(members.size()); - s = db_->Write(default_write_options_, &batch); + s = batch->Commit(); UpdateSpecificKeyStatistics(DataType::kSets, destination.ToString(), statistic); value_to_dest = std::move(members); return s; @@ -461,7 +461,7 @@ rocksdb::Status Redis::SInterstore(const Slice& destination, const std::vector(members.size())); - batch.Put(handles_[kSetsMetaCF], base_destination.Encode(), meta_value); + batch->Put(kSetsMetaCF, base_destination.Encode(), meta_value); } else if (s.IsNotFound()) { char str[4]; EncodeFixed32(str, members.size()); SetsMetaValue sets_meta_value(Slice(str, sizeof(int32_t))); version = sets_meta_value.UpdateVersion(); - batch.Put(handles_[kSetsMetaCF], base_destination.Encode(), sets_meta_value.Encode()); + batch->Put(kSetsMetaCF, base_destination.Encode(), sets_meta_value.Encode()); } else { return s; } for (const auto& member : members) { SetsMemberKey sets_member_key(destination, version, member); BaseDataValue iter_value(Slice{}); - batch.Put(handles_[kSetsDataCF], sets_member_key.Encode(), iter_value.Encode()); + batch->Put(kSetsDataCF, sets_member_key.Encode(), iter_value.Encode()); } *ret = static_cast(members.size()); - s = db_->Write(default_write_options_, &batch); + s = batch->Commit(); UpdateSpecificKeyStatistics(DataType::kSets, destination.ToString(), statistic); value_to_dest = std::move(members); return s; @@ -680,7 +680,7 @@ Status Redis::SMembersWithTTL(const Slice& key, std::vector* member rocksdb::Status Redis::SMove(const Slice& source, const Slice& destination, const Slice& member, int32_t* ret) { *ret = 0; - rocksdb::WriteBatch batch; + auto batch = Batch::CreateBatch(this); rocksdb::ReadOptions read_options; uint64_t version = 0; @@ -713,8 +713,8 @@ rocksdb::Status Redis::SMove(const Slice& source, const Slice& destination, cons return Status::InvalidArgument("set size overflow"); } parsed_sets_meta_value.ModifyCount(-1); - batch.Put(handles_[kSetsMetaCF], base_source.Encode(), meta_value); - batch.Delete(handles_[kSetsDataCF], sets_member_key.Encode()); + batch->Put(kSetsMetaCF, base_source.Encode(), meta_value); + batch->Delete(kSetsDataCF, sets_member_key.Encode()); statistic++; } else if (s.IsNotFound()) { *ret = 0; @@ -737,10 +737,10 @@ rocksdb::Status Redis::SMove(const Slice& source, const Slice& destination, cons if (parsed_sets_meta_value.IsStale() || parsed_sets_meta_value.Count() == 0) { version = parsed_sets_meta_value.InitialMetaValue(); parsed_sets_meta_value.SetCount(1); - batch.Put(handles_[kSetsMetaCF], base_destination.Encode(), meta_value); + batch->Put(kSetsMetaCF, base_destination.Encode(), meta_value); SetsMemberKey sets_member_key(destination, version, member); BaseDataValue i_val(Slice{}); - batch.Put(handles_[kSetsDataCF], sets_member_key.Encode(), i_val.Encode()); + batch->Put(kSetsDataCF, sets_member_key.Encode(), i_val.Encode()); } else { std::string member_value; version = parsed_sets_meta_value.Version(); @@ -752,8 +752,8 @@ rocksdb::Status Redis::SMove(const Slice& source, const Slice& destination, cons } parsed_sets_meta_value.ModifyCount(1); BaseDataValue iter_value(Slice{}); - batch.Put(handles_[kSetsMetaCF], base_destination.Encode(), meta_value); - batch.Put(handles_[kSetsDataCF], sets_member_key.Encode(), iter_value.Encode()); + batch->Put(kSetsMetaCF, base_destination.Encode(), meta_value); + batch->Put(kSetsDataCF, sets_member_key.Encode(), iter_value.Encode()); } else if (!s.ok()) { return s; } @@ -763,14 +763,14 @@ rocksdb::Status Redis::SMove(const Slice& source, const Slice& destination, cons EncodeFixed32(str, 1); SetsMetaValue sets_meta_value(Slice(str, sizeof(int32_t))); version = sets_meta_value.UpdateVersion(); - batch.Put(handles_[kSetsMetaCF], base_destination.Encode(), sets_meta_value.Encode()); + batch->Put(kSetsMetaCF, base_destination.Encode(), sets_meta_value.Encode()); SetsMemberKey sets_member_key(destination, version, member); BaseDataValue iter_value(Slice{}); - batch.Put(handles_[kSetsDataCF], sets_member_key.Encode(), iter_value.Encode()); + batch->Put(kSetsDataCF, sets_member_key.Encode(), iter_value.Encode()); } else { return s; } - s = db_->Write(default_write_options_, &batch); + s = batch->Commit(); UpdateSpecificKeyStatistics(DataType::kSets, source.ToString(), 1); return s; } @@ -779,7 +779,7 @@ rocksdb::Status Redis::SPop(const Slice& key, std::vector* members, std::default_random_engine engine; std::string meta_value; - rocksdb::WriteBatch batch; + auto batch = Batch::CreateBatch(this); ScopeRecordLock l(lock_mgr_, key); uint64_t start_us = pstd::NowMicros(); @@ -802,14 +802,14 @@ rocksdb::Status Redis::SPop(const Slice& key, std::vector* members, auto iter = db_->NewIterator(default_read_options_, handles_[kSetsDataCF]); for (iter->Seek(sets_member_key.EncodeSeekKey()); iter->Valid() && cur_index < size; iter->Next(), cur_index++) { - batch.Delete(handles_[kSetsDataCF], iter->key()); + batch->Delete(kSetsDataCF, iter->key()); ParsedSetsMemberKey parsed_sets_member_key(iter->key()); members->push_back(parsed_sets_member_key.member().ToString()); } // parsed_sets_meta_value.ModifyCount(-cnt); // batch.Put(handles_[kSetsMetaCF], key, meta_value); - batch.Delete(handles_[kSetsMetaCF], base_meta_key.Encode()); + batch->Delete(kSetsMetaCF, base_meta_key.Encode()); delete iter; } else { @@ -839,7 +839,7 @@ rocksdb::Status Redis::SPop(const Slice& key, std::vector* members, } if (sets_index.find(cur_index) != sets_index.end()) { del_count++; - batch.Delete(handles_[kSetsDataCF], iter->key()); + batch->Delete(kSetsDataCF, iter->key()); ParsedSetsMemberKey parsed_sets_member_key(iter->key()); members->push_back(parsed_sets_member_key.member().ToString()); } @@ -849,14 +849,14 @@ rocksdb::Status Redis::SPop(const Slice& key, std::vector* members, return Status::InvalidArgument("set size overflow"); } parsed_sets_meta_value.ModifyCount(static_cast(-cnt)); - batch.Put(handles_[kSetsMetaCF], base_meta_key.Encode(), meta_value); + batch->Put(kSetsMetaCF, base_meta_key.Encode(), meta_value); delete iter; } } } else { return s; } - return db_->Write(default_write_options_, &batch); + return batch->Commit(); } rocksdb::Status Redis::ResetSpopCount(const std::string& key) { return spop_counts_store_->Remove(key); } @@ -1043,7 +1043,7 @@ rocksdb::Status Redis::SUnionstore(const Slice& destination, const std::vector(members.size())); - batch.Put(handles_[kSetsMetaCF], destination, meta_value); + batch->Put(kSetsMetaCF, destination, meta_value); } else if (s.IsNotFound()) { char str[4]; EncodeFixed32(str, members.size()); SetsMetaValue sets_meta_value(Slice(str, sizeof(int32_t))); version = sets_meta_value.UpdateVersion(); - batch.Put(handles_[kSetsMetaCF], base_destination.Encode(), sets_meta_value.Encode()); + batch->Put(kSetsMetaCF, base_destination.Encode(), sets_meta_value.Encode()); } else { return s; } for (const auto& member : members) { SetsMemberKey sets_member_key(destination, version, member); BaseDataValue i_val(Slice{}); - batch.Put(handles_[kSetsDataCF], sets_member_key.Encode(), i_val.Encode()); + batch->Put(kSetsDataCF, sets_member_key.Encode(), i_val.Encode()); } *ret = static_cast(members.size()); - s = db_->Write(default_write_options_, &batch); + s = batch->Commit(); UpdateSpecificKeyStatistics(DataType::kSets, destination.ToString(), statistic); value_to_dest = std::move(members); return s; diff --git a/src/storage/src/redis_strings.cc b/src/storage/src/redis_strings.cc index aac17b200..14d25c81a 100644 --- a/src/storage/src/redis_strings.cc +++ b/src/storage/src/redis_strings.cc @@ -595,13 +595,13 @@ Status Redis::MSet(const std::vector& kvs) { } MultiScopeRecordLock ml(lock_mgr_, keys); - rocksdb::WriteBatch batch; + auto batch = Batch::CreateBatch(this); for (const auto& kv : kvs) { BaseKey base_key(kv.key); StringsValue strings_value(kv.value); - batch.Put(base_key.Encode(), strings_value.Encode()); + batch->Put(kStringsCF, base_key.Encode(), strings_value.Encode()); } - return db_->Write(default_write_options_, &batch); + return batch->Commit(); } Status Redis::MSetnx(const std::vector& kvs, int32_t* ret) { @@ -712,7 +712,9 @@ Status Redis::SetBit(const Slice& key, int64_t offset, int32_t on, int32_t* ret) } StringsValue strings_value(data_value); strings_value.SetEtime(timestamp); - return db_->Put(rocksdb::WriteOptions(), base_key.Encode(), strings_value.Encode()); + auto batch = Batch::CreateBatch(this); + batch->Put(kStringsCF, base_key.Encode(), strings_value.Encode()); + return batch->Commit(); } else { return s; } @@ -730,7 +732,9 @@ Status Redis::Setex(const Slice& key, const Slice& value, uint64_t ttl) { BaseKey base_key(key); ScopeRecordLock l(lock_mgr_, key); - return db_->Put(default_write_options_, base_key.Encode(), strings_value.Encode()); + auto batch = Batch::CreateBatch(this); + batch->Put(kStringsCF, base_key.Encode(), strings_value.Encode()); + return batch->Commit(); } Status Redis::Setnx(const Slice& key, const Slice& value, int32_t* ret, const uint64_t ttl) { diff --git a/tests/consistency_test.go b/tests/consistency_test.go index 26721281b..7d83e033b 100644 --- a/tests/consistency_test.go +++ b/tests/consistency_test.go @@ -186,6 +186,153 @@ var _ = Describe("Consistency", Ordered, func() { } }) + It("SDiffstore & SInterstore & SMove & SPop & SUnionstore Consistency Test", func() { + const testKey1 = "SetsConsistencyTestKey1" + const testKey2 = "SetsConsistencyTestKey2" + testValues1 := []string{"sa", "sb", "sc", "sd"} + testValues1Less := []string{"sa", "sb", "sc"} + testValues2 := []string{"sa", "sb", "sc2", "sd2"} + testValues2More := []string{"sa", "sb", "sc2", "sd", "sd2"} + + { + sadd, err := leader.SAdd(ctx, testKey1, testValues1).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(sadd).To(Equal(int64(len(testValues1)))) + + sadd, err = leader.SAdd(ctx, testKey2, testValues2).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(sadd).To(Equal(int64(len(testValues2)))) + + flag, err := leader.SMove(ctx, testKey1, testKey2, "sd").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(flag).To(Equal(true)) + + readChecker(func(c *redis.Client) { + smembers, err := c.SMembers(ctx, testKey1).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(smembers).To(Equal(testValues1Less)) + + smembers, err = c.SMembers(ctx, testKey2).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(smembers).To(Equal(testValues2More)) + }) + } + + const testKey3 = "SetsConsistencyTestKey3" + testValues3 := []string{"sa", "sb", "sc", "sd"} + { + sadd, err := leader.SAdd(ctx, testKey3, testValues3).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(sadd).To(Equal(int64(len(testValues3)))) + + spop, err := leader.SPop(ctx, testKey3).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(spop).To(BeElementOf(testValues3)) + + spops, err := leader.SPopN(ctx, testKey3, 2).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(spops[0]).To(BeElementOf(testValues3)) + Expect(spops[1]).To(BeElementOf(testValues3)) + + readChecker(func(c *redis.Client) { + smembers, err := c.SMembers(ctx, testKey3).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(smembers)).To(Equal(int(1))) + }) + } + + const testKey4 = "SetsConsistencyTestKey4" + diff := []string{"sc", "sd"} + { + _, err := leader.Del(ctx, testKey1).Result() + Expect(err).NotTo(HaveOccurred()) + + sadd, err := leader.SAdd(ctx, testKey1, testValues1).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(sadd).To(Equal(int64(len(testValues1)))) + + _, err = leader.Del(ctx, testKey2).Result() + Expect(err).NotTo(HaveOccurred()) + + sadd, err = leader.SAdd(ctx, testKey2, testValues2).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(sadd).To(Equal(int64(len(testValues2)))) + + _, err = leader.Del(ctx, testKey4).Result() + Expect(err).NotTo(HaveOccurred()) + + sdiff, err := leader.SDiffStore(ctx, testKey4, testKey1, testKey2).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(sdiff).To(Equal(int64(len(diff)))) + + readChecker(func(c *redis.Client) { + smembers, err := c.SMembers(ctx, testKey4).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(smembers).To(Equal(diff)) + }) + } + + inter := []string{"sa", "sb"} + { + _, err := leader.Del(ctx, testKey1).Result() + Expect(err).NotTo(HaveOccurred()) + + sadd, err := leader.SAdd(ctx, testKey1, testValues1).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(sadd).To(Equal(int64(len(testValues1)))) + + _, err = leader.Del(ctx, testKey2).Result() + Expect(err).NotTo(HaveOccurred()) + + sadd, err = leader.SAdd(ctx, testKey2, testValues2).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(sadd).To(Equal(int64(len(testValues2)))) + + _, err = leader.Del(ctx, testKey4).Result() + Expect(err).NotTo(HaveOccurred()) + + sinter, err := leader.SInterStore(ctx, testKey4, testKey1, testKey2).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(sinter).To(Equal(int64(len(inter)))) + + readChecker(func(c *redis.Client) { + smembers, err := c.SMembers(ctx, testKey4).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(smembers).To(Equal(inter)) + }) + } + + union := []string{"sa", "sb", "sc", "sc2", "sd", "sd2"} + { + _, err := leader.Del(ctx, testKey1).Result() + Expect(err).NotTo(HaveOccurred()) + + sadd, err := leader.SAdd(ctx, testKey1, testValues1).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(sadd).To(Equal(int64(len(testValues1)))) + + _, err = leader.Del(ctx, testKey2).Result() + Expect(err).NotTo(HaveOccurred()) + + sadd, err = leader.SAdd(ctx, testKey2, testValues2).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(sadd).To(Equal(int64(len(testValues2)))) + + _, err = leader.Del(ctx, testKey4).Result() + Expect(err).NotTo(HaveOccurred()) + + sunion, err := leader.SUnionStore(ctx, testKey4, testKey1, testKey2).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(sunion).To(Equal(int64(len(union)))) + + readChecker(func(c *redis.Client) { + smembers, err := c.SMembers(ctx, testKey4).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(smembers).To(Equal(union)) + }) + } + }) + It("LPush & LPop Consistency Test", func() { const testKey = "ListsConsistencyTestKey" testValues := []string{"la", "lb", "lc", "ld"} @@ -394,9 +541,32 @@ var _ = Describe("Consistency", Ordered, func() { } }) - It("Set Consistency Test", func() { + It("SetBit Consistency Test", func() { + const testKey = "StringsConsistencyTestKey" + { + // set write on leader + set, err := leader.SetBit(ctx, testKey, 1, 1).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(set).To(Equal(int64(0))) + + readChecker(func(c *redis.Client) { + get, err := c.GetBit(ctx, testKey, 0).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(get).To(Equal(int64(0))) + + get, err = c.GetBit(ctx, testKey, 1).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(get).To(Equal(int64(1))) + }) + } + }) + + It("Set & SetEx & MSet & MSetNX Consistency Test", func() { const testKey = "StringsConsistencyTestKey" const testValue = "StringsConsistencyTestKey" + const testValueNew = "StringsConsistencyTestKey-new" + const testKey2 = "StringsConsistencyTestKey2" + const testValue2 = "StringsConsistencyTestKey2" { // set write on leader set, err := leader.Set(ctx, testKey, testValue, 0).Result() @@ -410,6 +580,59 @@ var _ = Describe("Consistency", Ordered, func() { Expect(get).To(Equal(testValue)) }) } + { + // set write on leader + set, err := leader.MSet(ctx, testKey, testValue, testKey2, testValue2).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(set).To(Equal("OK")) + + // read check + readChecker(func(c *redis.Client) { + get, err := c.Get(ctx, testKey).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(get).To(Equal(testValue)) + + get, err = c.Get(ctx, testKey2).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(get).To(Equal(testValue2)) + }) + } + { + mSetNX, err := leader.MSetNX(ctx, testKey, testValueNew, testKey2, testValue2).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(mSetNX).To(Equal(false)) + + del, err := leader.Del(ctx, testKey, testKey2).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(del).To(Equal(int64(2))) + + mSetNX, err = leader.MSetNX(ctx, testKey, testValueNew, testKey2, testValue2).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(mSetNX).To(Equal(true)) + + readChecker(func(c *redis.Client) { + get, err := c.Get(ctx, testKey).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(get).To(Equal(testValueNew)) + + get, err = c.Get(ctx, testKey2).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(get).To(Equal(testValue2)) + }) + } + { + // set write on leader + set, err := leader.SetEx(ctx, testKey, testValue, 3).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(set).To(Equal("OK")) + + // read check + time.Sleep(10 * time.Second) + readChecker(func(c *redis.Client) { + _, err := c.Get(ctx, testKey).Result() + Expect(err).To(Equal(redis.Nil)) + }) + } }) It("ThreeNodesClusterConstructionTest", func() {