Skip to content

Commit

Permalink
Revert changes to Import
Browse files Browse the repository at this point in the history
  • Loading branch information
Kbhat1 committed Jul 15, 2024
1 parent 8cacf9a commit a7ea824
Showing 1 changed file with 41 additions and 54 deletions.
95 changes: 41 additions & 54 deletions ss/pebbledb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -496,63 +496,50 @@ func (db *Database) ReverseIterator(storeKey string, version int64, start, end [
// Import loads the initial version of the state in parallel with numWorkers goroutines
// TODO: Potentially add retries instead of panics
func (db *Database) Import(version int64, ch <-chan types.SnapshotNode) error {
// Re route to RawImport
rawCh := make(chan types.RawSnapshotNode, db.config.ImportNumWorkers)
go func() {
defer close(rawCh)
var wg sync.WaitGroup

worker := func() {
defer wg.Done()
batch, err := NewBatch(db.storage, version)
if err != nil {
panic(err)
}

var counter int
for entry := range ch {
rawCh <- types.GetRawSnapshotNode(entry, version)
err := batch.Set(entry.StoreKey, entry.Key, entry.Value)
if err != nil {
panic(err)
}

counter++
if counter%ImportCommitBatchSize == 0 {
if err := batch.Write(); err != nil {
panic(err)
}

batch, err = NewBatch(db.storage, version)
if err != nil {
panic(err)
}
}
}
}()

return db.RawImport(rawCh)

// TODO: Revert. This is just for testing RawImport End to End temporarily

// var wg sync.WaitGroup

// worker := func() {
// defer wg.Done()
// batch, err := NewBatch(db.storage, version)
// if err != nil {
// panic(err)
// }

// var counter int
// for entry := range ch {
// err := batch.Set(entry.StoreKey, entry.Key, entry.Value)
// if err != nil {
// panic(err)
// }

// counter++
// if counter%ImportCommitBatchSize == 0 {
// if err := batch.Write(); err != nil {
// panic(err)
// }

// batch, err = NewBatch(db.storage, version)
// if err != nil {
// panic(err)
// }
// }
// }

// if batch.Size() > 0 {
// if err := batch.Write(); err != nil {
// panic(err)
// }
// }
// }

// wg.Add(db.config.ImportNumWorkers)
// for i := 0; i < db.config.ImportNumWorkers; i++ {
// go worker()
// }

// wg.Wait()

// return nil
if batch.Size() > 0 {
if err := batch.Write(); err != nil {
panic(err)
}
}
}

wg.Add(db.config.ImportNumWorkers)
for i := 0; i < db.config.ImportNumWorkers; i++ {
go worker()
}

wg.Wait()

return nil
}

func (db *Database) RawImport(ch <-chan types.RawSnapshotNode) error {
Expand Down

0 comments on commit a7ea824

Please sign in to comment.