From 20991c02a0f2b9a0cba497496c780bad2527f9f3 Mon Sep 17 00:00:00 2001 From: Uday Patil Date: Tue, 2 Jan 2024 08:44:17 -0600 Subject: [PATCH] Occ iterator fix (#389) ## Describe your changes and provide context This change serves to improve the way we track the values of the keys we iterate over when running iterators. Previously, the iterateset would only track the keys that were iterated, and the behavior of the iterator was thought to not include keys that didn't have values present, OR that the readset would be appropriately updated when reading the value from the iterateset. (I'm not yet 100% sure that updating readset WITHIN the tracked iterator is fully necessary, since the readset modifications alone may have been sufficient to mitigate this issue, but the change is currently in the PR since this is the version of code running on the loadtest cluster for stability testing.) However, in cases when an earlier transaction was writing to the range that would be iterated, it was possible that the stale value was read by the transaction handler, BUT the value that got into the readset was the newer one. I believe this has to do with the readset updating based on directly querying values from underlying stores, and overwriting the prior readset value that indicated that the transaction used a stale value. The fix I have made is that during tx execution, the cache memiterator now reads directly from MVKV instead of individually reading from underlying stores. The key difference here is that IF the key is already in the readset, it will serve that STALE value instead of reading into the underlying store where the value may have since mutated. As a result, the behavior we now expect is that once a key is read, ONLY that value that was read will be utilized for the duration of the transaction. This way, we won't potentially mutate the readset by overwriting the key entry with the later value only to have it incorrectly pass validation. 
Additionally, to more rigorously enforce this behavior, updating the readset now will only update the map IFF the key doesn't already exist in the readset. This should provide better guarantees around catching any stale reads that occur over the lifespan of the transaction execution. ## Testing performed to validate your change Running a lot of iterator-heavy workloads on a loadtest cluster to verify that no nondeterminism remains in the iterator workflow --- store/multiversion/memiterator.go | 86 ++++++++++++++------------- store/multiversion/mergeiterator.go | 4 ++ store/multiversion/mvkv.go | 27 ++++----- store/multiversion/mvkv_test.go | 8 ++- store/multiversion/trackediterator.go | 7 ++- 5 files changed, 71 insertions(+), 61 deletions(-) diff --git a/store/multiversion/memiterator.go b/store/multiversion/memiterator.go index 43e8e306b..32cb257b8 100644 --- a/store/multiversion/memiterator.go +++ b/store/multiversion/memiterator.go @@ -12,19 +12,13 @@ import ( // Implements Iterator. 
type memIterator struct { types.Iterator - - mvStore MultiVersionStore - writeset WriteSet - index int - abortChannel chan occtypes.Abort - ReadsetHandler + mvkv *VersionIndexedStore } func (store *VersionIndexedStore) newMemIterator( start, end []byte, items *dbm.MemDB, ascending bool, - readsetHandler ReadsetHandler, ) *memIterator { var iter types.Iterator var err error @@ -43,40 +37,25 @@ func (store *VersionIndexedStore) newMemIterator( } return &memIterator{ - Iterator: iter, - mvStore: store.multiVersionStore, - index: store.transactionIndex, - abortChannel: store.abortChannel, - writeset: store.GetWriteset(), - ReadsetHandler: readsetHandler, + Iterator: iter, + mvkv: store, } } -// try to get value from the writeset, otherwise try to get from multiversion store, otherwise try to get from parent iterator +// try to get value from the writeset, otherwise try to get from multiversion store, otherwise try to get from parent func (mi *memIterator) Value() []byte { key := mi.Iterator.Key() + // TODO: verify that this is correct + return mi.mvkv.Get(key) +} - // try fetch from writeset - return if exists - if val, ok := mi.writeset[string(key)]; ok { - return val - } - - // get the value from the multiversion store - val := mi.mvStore.GetLatestBeforeIndex(mi.index, key) - - // if we have an estiamte, write to abort channel - if val.IsEstimate() { - mi.abortChannel <- occtypes.NewEstimateAbort(val.Index()) - } +type validationIterator struct { + types.Iterator - // need to update readset - // if we have a deleted value, return nil - if val.IsDeleted() { - defer mi.ReadsetHandler.UpdateReadSet(key, nil) - return nil - } - defer mi.ReadsetHandler.UpdateReadSet(key, val.Value()) - return val.Value() + mvStore MultiVersionStore + writeset WriteSet + index int + abortChannel chan occtypes.Abort } func (store *Store) newMVSValidationIterator( @@ -86,7 +65,7 @@ func (store *Store) newMVSValidationIterator( ascending bool, writeset WriteSet, abortChannel chan 
occtypes.Abort, -) *memIterator { +) *validationIterator { var iter types.Iterator var err error @@ -103,12 +82,35 @@ func (store *Store) newMVSValidationIterator( panic(err) } - return &memIterator{ - Iterator: iter, - mvStore: store, - index: index, - abortChannel: abortChannel, - ReadsetHandler: NoOpHandler{}, - writeset: writeset, + return &validationIterator{ + Iterator: iter, + mvStore: store, + index: index, + abortChannel: abortChannel, + writeset: writeset, + } +} + +// try to get value from the writeset, otherwise try to get from multiversion store, otherwise try to get from parent iterator +func (vi *validationIterator) Value() []byte { + key := vi.Iterator.Key() + + // try fetch from writeset - return if exists + if val, ok := vi.writeset[string(key)]; ok { + return val } + + // get the value from the multiversion store + val := vi.mvStore.GetLatestBeforeIndex(vi.index, key) + + // if we have an estimate, write to abort channel + if val.IsEstimate() { + vi.abortChannel <- occtypes.NewEstimateAbort(val.Index()) + } + + // if we have a deleted value, return nil + if val.IsDeleted() { + return nil + } + return val.Value() } diff --git a/store/multiversion/mergeiterator.go b/store/multiversion/mergeiterator.go index 3b5cee741..1e398cf94 100644 --- a/store/multiversion/mergeiterator.go +++ b/store/multiversion/mergeiterator.go @@ -138,6 +138,8 @@ func (iter *mvsMergeIterator) Value() []byte { // If cache is invalid, get the parent value. 
if !iter.cache.Valid() { value := iter.parent.Value() + // add values read from parent to readset + iter.ReadsetHandler.UpdateReadSet(iter.parent.Key(), value) return value } @@ -148,6 +150,8 @@ func (iter *mvsMergeIterator) Value() []byte { switch cmp { case -1: // parent < cache value := iter.parent.Value() + // add values read from parent to readset + iter.ReadsetHandler.UpdateReadSet(iter.parent.Key(), value) return value case 0, 1: // parent >= cache value := iter.cache.Value() diff --git a/store/multiversion/mvkv.go b/store/multiversion/mvkv.go index 1e8437ad7..13511281f 100644 --- a/store/multiversion/mvkv.go +++ b/store/multiversion/mvkv.go @@ -78,8 +78,6 @@ type VersionIndexedStore struct { iterateset Iterateset // TODO: need to add iterateset here as well - // dirty keys that haven't been sorted yet for iteration - dirtySet map[string]struct{} // used for iterators - populated at the time of iterator instantiation // TODO: when we want to perform iteration, we need to move all the dirty keys (writeset and readset) into the sortedTree and then combine with the iterators for the underlying stores sortedStore *dbm.MemDB // always ascending sorted @@ -102,7 +100,6 @@ func NewVersionIndexedStore(parent types.KVStore, multiVersionStore MultiVersion readset: make(map[string][]byte), writeset: make(map[string][]byte), iterateset: []iterationTracker{}, - dirtySet: make(map[string]struct{}), sortedStore: dbm.NewMemDB(), parent: parent, multiVersionStore: multiVersionStore, @@ -231,7 +228,7 @@ func (store *VersionIndexedStore) Delete(key []byte) { // defer telemetry.MeasureSince(time.Now(), "store", "mvkv", "delete") types.AssertValidKey(key) - store.setValue(key, nil, true, true) + store.setValue(key, nil) } // Has implements types.KVStore. 
@@ -248,7 +245,7 @@ func (store *VersionIndexedStore) Set(key []byte, value []byte) { // defer telemetry.MeasureSince(time.Now(), "store", "mvkv", "set") types.AssertValidKey(key) - store.setValue(key, value, false, true) + store.setValue(key, value) } // Iterator implements types.KVStore. @@ -278,11 +275,15 @@ func (store *VersionIndexedStore) iterator(start []byte, end []byte, ascending b for key := range store.writeset { memDB.Set([]byte(key), []byte{}) } + // also add readset elements such that they fetch from readset instead of parent + for key := range store.readset { + memDB.Set([]byte(key), []byte{}) + } var parent, memIterator types.Iterator // make a memIterator - memIterator = store.newMemIterator(start, end, memDB, ascending, store) + memIterator = store.newMemIterator(start, end, memDB, ascending) if ascending { parent = store.parent.Iterator(start, end) @@ -293,7 +294,7 @@ func (store *VersionIndexedStore) iterator(start []byte, end []byte, ascending b mergeIterator := NewMVSMergeIterator(parent, memIterator, ascending, store) iterationTracker := NewIterationTracker(start, end, ascending, store.writeset) - trackedIterator := NewTrackedIterator(mergeIterator, iterationTracker, store) + trackedIterator := NewTrackedIterator(mergeIterator, iterationTracker, store, store) // mergeIterator return trackedIterator @@ -326,14 +327,11 @@ func (v *VersionIndexedStore) GetWorkingHash() ([]byte, error) { } // Only entrypoint to mutate writeset -func (store *VersionIndexedStore) setValue(key, value []byte, deleted bool, dirty bool) { +func (store *VersionIndexedStore) setValue(key, value []byte) { types.AssertValidKey(key) keyStr := string(key) store.writeset[keyStr] = value - if dirty { - store.dirtySet[keyStr] = struct{}{} - } } func (store *VersionIndexedStore) WriteToMultiVersionStore() { @@ -358,9 +356,10 @@ func (store *VersionIndexedStore) WriteEstimatesToMultiVersionStore() { func (store *VersionIndexedStore) UpdateReadSet(key []byte, value []byte) { // 
add to readset keyStr := string(key) - store.readset[keyStr] = value - // add to dirty set - store.dirtySet[keyStr] = struct{}{} + // TODO: maybe only add if not already existing? + if _, ok := store.readset[keyStr]; !ok { + store.readset[keyStr] = value + } } // Write implements types.CacheWrap so this store can exist on the cache multi store diff --git a/store/multiversion/mvkv_test.go b/store/multiversion/mvkv_test.go index 44304fd50..008a7fa61 100644 --- a/store/multiversion/mvkv_test.go +++ b/store/multiversion/mvkv_test.go @@ -375,15 +375,17 @@ func TestIterator(t *testing.T) { mvs.SetEstimatedWriteset(1, 1, map[string][]byte{ "key2": []byte("value1_b"), }) - + // need to reset readset + abortC2 := make(chan scheduler.Abort) + visNew := multiversion.NewVersionIndexedStore(parentKVStore, mvs, 2, 3, abortC2) go func() { // new iter - iter4 := vis.Iterator([]byte("000"), []byte("key5")) + iter4 := visNew.Iterator([]byte("000"), []byte("key5")) defer iter4.Close() for ; iter4.Valid(); iter4.Next() { } }() - abort := <-abortC // read the abort from the channel + abort := <-abortC2 // read the abort from the channel require.Equal(t, 1, abort.DependentTxIdx) } diff --git a/store/multiversion/trackediterator.go b/store/multiversion/trackediterator.go index 361d848cb..24a1d7a16 100644 --- a/store/multiversion/trackediterator.go +++ b/store/multiversion/trackediterator.go @@ -7,16 +7,18 @@ type trackedIterator struct { types.Iterator iterateset iterationTracker + ReadsetHandler IterateSetHandler } // TODO: test -func NewTrackedIterator(iter types.Iterator, iterationTracker iterationTracker, iterateSetHandler IterateSetHandler) *trackedIterator { +func NewTrackedIterator(iter types.Iterator, iterationTracker iterationTracker, iterateSetHandler IterateSetHandler, readSetHandler ReadsetHandler) *trackedIterator { return &trackedIterator{ Iterator: iter, iterateset: iterationTracker, IterateSetHandler: iterateSetHandler, + ReadsetHandler: readSetHandler, } } @@ -43,9 
+45,10 @@ func (ti *trackedIterator) Key() []byte { // Value calls the iterator.Key() and adds the key to the iterateset, then returns the value from the iterator func (ti *trackedIterator) Value() []byte { key := ti.Iterator.Key() + val := ti.Iterator.Value() // add key to the tracker ti.iterateset.AddKey(key) - return ti.Iterator.Value() + return val } func (ti *trackedIterator) Next() {