diff --git a/store/multiversion/memiterator.go b/store/multiversion/memiterator.go index 43e8e306b..32cb257b8 100644 --- a/store/multiversion/memiterator.go +++ b/store/multiversion/memiterator.go @@ -12,19 +12,13 @@ import ( // Implements Iterator. type memIterator struct { types.Iterator - - mvStore MultiVersionStore - writeset WriteSet - index int - abortChannel chan occtypes.Abort - ReadsetHandler + mvkv *VersionIndexedStore } func (store *VersionIndexedStore) newMemIterator( start, end []byte, items *dbm.MemDB, ascending bool, - readsetHandler ReadsetHandler, ) *memIterator { var iter types.Iterator var err error @@ -43,40 +37,25 @@ func (store *VersionIndexedStore) newMemIterator( } return &memIterator{ - Iterator: iter, - mvStore: store.multiVersionStore, - index: store.transactionIndex, - abortChannel: store.abortChannel, - writeset: store.GetWriteset(), - ReadsetHandler: readsetHandler, + Iterator: iter, + mvkv: store, } } -// try to get value from the writeset, otherwise try to get from multiversion store, otherwise try to get from parent iterator +// try to get value from the writeset, otherwise try to get from multiversion store, otherwise try to get from parent func (mi *memIterator) Value() []byte { key := mi.Iterator.Key() + // TODO: verify that this is correct + return mi.mvkv.Get(key) +} - // try fetch from writeset - return if exists - if val, ok := mi.writeset[string(key)]; ok { - return val - } - - // get the value from the multiversion store - val := mi.mvStore.GetLatestBeforeIndex(mi.index, key) - - // if we have an estiamte, write to abort channel - if val.IsEstimate() { - mi.abortChannel <- occtypes.NewEstimateAbort(val.Index()) - } +type validationIterator struct { + types.Iterator - // need to update readset - // if we have a deleted value, return nil - if val.IsDeleted() { - defer mi.ReadsetHandler.UpdateReadSet(key, nil) - return nil - } - defer mi.ReadsetHandler.UpdateReadSet(key, val.Value()) - return val.Value() + mvStore MultiVersionStore + writeset WriteSet + index int + abortChannel chan occtypes.Abort } func (store *Store) newMVSValidationIterator( @@ -86,7 +65,7 @@ func (store *Store) newMVSValidationIterator( ascending bool, writeset WriteSet, abortChannel chan occtypes.Abort, -) *memIterator { +) *validationIterator { var iter types.Iterator var err error @@ -103,12 +82,35 @@ func (store *Store) newMVSValidationIterator( panic(err) } - return &memIterator{ - Iterator: iter, - mvStore: store, - index: index, - abortChannel: abortChannel, - ReadsetHandler: NoOpHandler{}, - writeset: writeset, + return &validationIterator{ + Iterator: iter, + mvStore: store, + index: index, + abortChannel: abortChannel, + writeset: writeset, + } +} + +// try to get value from the writeset, otherwise try to get from multiversion store, otherwise try to get from parent iterator +func (vi *validationIterator) Value() []byte { + key := vi.Iterator.Key() + + // try fetch from writeset - return if exists + if val, ok := vi.writeset[string(key)]; ok { + return val } + + // get the value from the multiversion store + val := vi.mvStore.GetLatestBeforeIndex(vi.index, key) + + // if we have an estimate, write to abort channel + if val.IsEstimate() { + vi.abortChannel <- occtypes.NewEstimateAbort(val.Index()) + } + + // if we have a deleted value, return nil + if val.IsDeleted() { + return nil + } + return val.Value() } diff --git a/store/multiversion/mergeiterator.go b/store/multiversion/mergeiterator.go index 3b5cee741..1e398cf94 100644 --- a/store/multiversion/mergeiterator.go +++ b/store/multiversion/mergeiterator.go @@ -138,6 +138,8 @@ func (iter *mvsMergeIterator) Value() []byte { // If cache is invalid, get the parent value. if !iter.cache.Valid() { value := iter.parent.Value() + // add values read from parent to readset + iter.ReadsetHandler.UpdateReadSet(iter.parent.Key(), value) return value } @@ -148,6 +150,8 @@ func (iter *mvsMergeIterator) Value() []byte { switch cmp { case -1: // parent < cache value := iter.parent.Value() + // add values read from parent to readset + iter.ReadsetHandler.UpdateReadSet(iter.parent.Key(), value) return value case 0, 1: // parent >= cache value := iter.cache.Value() diff --git a/store/multiversion/mvkv.go b/store/multiversion/mvkv.go index 1e8437ad7..13511281f 100644 --- a/store/multiversion/mvkv.go +++ b/store/multiversion/mvkv.go @@ -78,8 +78,6 @@ type VersionIndexedStore struct { iterateset Iterateset // TODO: need to add iterateset here as well - // dirty keys that haven't been sorted yet for iteration - dirtySet map[string]struct{} // used for iterators - populated at the time of iterator instantiation // TODO: when we want to perform iteration, we need to move all the dirty keys (writeset and readset) into the sortedTree and then combine with the iterators for the underlying stores sortedStore *dbm.MemDB // always ascending sorted @@ -102,7 +100,6 @@ func NewVersionIndexedStore(parent types.KVStore, multiVersionStore MultiVersion readset: make(map[string][]byte), writeset: make(map[string][]byte), iterateset: []iterationTracker{}, - dirtySet: make(map[string]struct{}), sortedStore: dbm.NewMemDB(), parent: parent, multiVersionStore: multiVersionStore, @@ -231,7 +228,7 @@ func (store *VersionIndexedStore) Delete(key []byte) { // defer telemetry.MeasureSince(time.Now(), "store", "mvkv", "delete") types.AssertValidKey(key) - store.setValue(key, nil, true, true) + store.setValue(key, nil) } // Has implements types.KVStore. @@ -248,7 +245,7 @@ func (store *VersionIndexedStore) Set(key []byte, value []byte) { // defer telemetry.MeasureSince(time.Now(), "store", "mvkv", "set") types.AssertValidKey(key) - store.setValue(key, value, false, true) + store.setValue(key, value) } // Iterator implements types.KVStore. @@ -278,11 +275,15 @@ func (store *VersionIndexedStore) iterator(start []byte, end []byte, ascending b for key := range store.writeset { memDB.Set([]byte(key), []byte{}) } + // also add readset elements such that they fetch from readset instead of parent + for key := range store.readset { + memDB.Set([]byte(key), []byte{}) + } var parent, memIterator types.Iterator // make a memIterator - memIterator = store.newMemIterator(start, end, memDB, ascending, store) + memIterator = store.newMemIterator(start, end, memDB, ascending) if ascending { parent = store.parent.Iterator(start, end) @@ -293,7 +294,7 @@ func (store *VersionIndexedStore) iterator(start []byte, end []byte, ascending b mergeIterator := NewMVSMergeIterator(parent, memIterator, ascending, store) iterationTracker := NewIterationTracker(start, end, ascending, store.writeset) - trackedIterator := NewTrackedIterator(mergeIterator, iterationTracker, store) + trackedIterator := NewTrackedIterator(mergeIterator, iterationTracker, store, store) // mergeIterator return trackedIterator @@ -326,14 +327,11 @@ func (v *VersionIndexedStore) GetWorkingHash() ([]byte, error) { } // Only entrypoint to mutate writeset -func (store *VersionIndexedStore) setValue(key, value []byte, deleted bool, dirty bool) { +func (store *VersionIndexedStore) setValue(key, value []byte) { types.AssertValidKey(key) keyStr := string(key) store.writeset[keyStr] = value - if dirty { - store.dirtySet[keyStr] = struct{}{} - } } func (store *VersionIndexedStore) WriteToMultiVersionStore() { @@ -358,9 +356,10 @@ func (store *VersionIndexedStore) WriteEstimatesToMultiVersionStore() { func (store *VersionIndexedStore) UpdateReadSet(key []byte, value []byte) { // add to readset keyStr := string(key) - store.readset[keyStr] = value - // add to dirty set - store.dirtySet[keyStr] = struct{}{} + // TODO: maybe only add if not already existing? + if _, ok := store.readset[keyStr]; !ok { + store.readset[keyStr] = value + } } // Write implements types.CacheWrap so this store can exist on the cache multi store diff --git a/store/multiversion/mvkv_test.go b/store/multiversion/mvkv_test.go index 44304fd50..008a7fa61 100644 --- a/store/multiversion/mvkv_test.go +++ b/store/multiversion/mvkv_test.go @@ -375,15 +375,17 @@ func TestIterator(t *testing.T) { mvs.SetEstimatedWriteset(1, 1, map[string][]byte{ "key2": []byte("value1_b"), }) - + // need to reset readset + abortC2 := make(chan scheduler.Abort) + visNew := multiversion.NewVersionIndexedStore(parentKVStore, mvs, 2, 3, abortC2) go func() { // new iter - iter4 := vis.Iterator([]byte("000"), []byte("key5")) + iter4 := visNew.Iterator([]byte("000"), []byte("key5")) defer iter4.Close() for ; iter4.Valid(); iter4.Next() { } }() - abort := <-abortC // read the abort from the channel + abort := <-abortC2 // read the abort from the channel require.Equal(t, 1, abort.DependentTxIdx) } diff --git a/store/multiversion/trackediterator.go b/store/multiversion/trackediterator.go index 361d848cb..24a1d7a16 100644 --- a/store/multiversion/trackediterator.go +++ b/store/multiversion/trackediterator.go @@ -7,16 +7,18 @@ type trackedIterator struct { types.Iterator iterateset iterationTracker + ReadsetHandler IterateSetHandler } // TODO: test -func NewTrackedIterator(iter types.Iterator, iterationTracker iterationTracker, iterateSetHandler IterateSetHandler) *trackedIterator { +func NewTrackedIterator(iter types.Iterator, iterationTracker iterationTracker, iterateSetHandler IterateSetHandler, readSetHandler ReadsetHandler) *trackedIterator { return &trackedIterator{ Iterator: iter, iterateset: iterationTracker, IterateSetHandler: iterateSetHandler, + ReadsetHandler: readSetHandler, } } @@ -43,9 +45,10 @@ func (ti *trackedIterator) Key() []byte { // Value calls the iterator.Key() and adds the key to the iterateset, then returns the value from the iterator func (ti *trackedIterator) Value() []byte { key := ti.Iterator.Key() + val := ti.Iterator.Value() // add key to the tracker ti.iterateset.AddKey(key) - return ti.Iterator.Value() + return val } func (ti *trackedIterator) Next() {