From a7ea824bb9484a64902985c2f4c8cf365f4b43fe Mon Sep 17 00:00:00 2001 From: kbhat1 Date: Mon, 15 Jul 2024 09:10:52 -0400 Subject: [PATCH] Revert changes to Import --- ss/pebbledb/db.go | 95 ++++++++++++++++++++--------------------------- 1 file changed, 41 insertions(+), 54 deletions(-) diff --git a/ss/pebbledb/db.go b/ss/pebbledb/db.go index 156cc21..6f6ffe0 100644 --- a/ss/pebbledb/db.go +++ b/ss/pebbledb/db.go @@ -496,63 +496,50 @@ func (db *Database) ReverseIterator(storeKey string, version int64, start, end [ // Import loads the initial version of the state in parallel with numWorkers goroutines // TODO: Potentially add retries instead of panics func (db *Database) Import(version int64, ch <-chan types.SnapshotNode) error { - // Re route to RawImport - rawCh := make(chan types.RawSnapshotNode, db.config.ImportNumWorkers) - go func() { - defer close(rawCh) + var wg sync.WaitGroup + + worker := func() { + defer wg.Done() + batch, err := NewBatch(db.storage, version) + if err != nil { + panic(err) + } + + var counter int for entry := range ch { - rawCh <- types.GetRawSnapshotNode(entry, version) + err := batch.Set(entry.StoreKey, entry.Key, entry.Value) + if err != nil { + panic(err) + } + + counter++ + if counter%ImportCommitBatchSize == 0 { + if err := batch.Write(); err != nil { + panic(err) + } + + batch, err = NewBatch(db.storage, version) + if err != nil { + panic(err) + } + } } - }() - return db.RawImport(rawCh) - - // TODO: Revert. This is just for testing RawImport End to End temporarily - - // var wg sync.WaitGroup - - // worker := func() { - // defer wg.Done() - // batch, err := NewBatch(db.storage, version) - // if err != nil { - // panic(err) - // } - - // var counter int - // for entry := range ch { - // err := batch.Set(entry.StoreKey, entry.Key, entry.Value) - // if err != nil { - // panic(err) - // } - - // counter++ - // if counter%ImportCommitBatchSize == 0 { - // if err := batch.Write(); err != nil { - // panic(err) - // } - - // batch, err = NewBatch(db.storage, version) - // if err != nil { - // panic(err) - // } - // } - // } - - // if batch.Size() > 0 { - // if err := batch.Write(); err != nil { - // panic(err) - // } - // } - // } - - // wg.Add(db.config.ImportNumWorkers) - // for i := 0; i < db.config.ImportNumWorkers; i++ { - // go worker() - // } - - // wg.Wait() - - // return nil + if batch.Size() > 0 { + if err := batch.Write(); err != nil { + panic(err) + } + } + } + + wg.Add(db.config.ImportNumWorkers) + for i := 0; i < db.config.ImportNumWorkers; i++ { + go worker() + } + + wg.Wait() + + return nil } func (db *Database) RawImport(ch <-chan types.RawSnapshotNode) error {