Skip to content

Commit b04b4ba

Browse files
authored
Merge pull request #74 from sei-protocol/ReadKvFilesUpdate
Add upper limit on file parsing goroutines
2 parents 22fde3c + 353d4ea commit b04b4ba

File tree

2 files changed

+30
-18
lines changed

2 files changed

+30
-18
lines changed

tools/dbbackend/benchmark.go

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -86,7 +86,7 @@ func writeToDBConcurrently(db types.StateStore, allKVs []utils.KeyValuePair, con
8686
// Given an input dir containing all the raw kv data, it writes to the db one version after another
8787
func BenchmarkDBWrite(db types.StateStore, inputKVDir string, numVersions int, concurrency int, batchSize int) {
8888
startLoad := time.Now()
89-
kvData, err := utils.LoadAndShuffleKV(inputKVDir)
89+
kvData, err := utils.LoadAndShuffleKV(inputKVDir, concurrency)
9090
if err != nil {
9191
panic(err)
9292
}
@@ -180,7 +180,7 @@ func readFromDBConcurrently(db types.StateStore, allKVs []utils.KeyValuePair, nu
180180
// BenchmarkDBRead measures random read performance of the db
181181
// Given an input dir containing all the raw kv data, it generates random read load and measures performance.
182182
func BenchmarkDBRead(db types.StateStore, inputKVDir string, numVersions int, concurrency int, maxOps int64) {
183-
kvData, err := utils.LoadAndShuffleKV(inputKVDir)
183+
kvData, err := utils.LoadAndShuffleKV(inputKVDir, concurrency)
184184
if err != nil {
185185
panic(err)
186186
}
@@ -279,7 +279,7 @@ func forwardIterateDBConcurrently(db types.StateStore, allKVs []utils.KeyValuePa
279279
// BenchmarkDBForwardIteration measures forward iteration performance of the db
280280
// Given an input dir containing all the raw kv data, it selects a random key, forward iterates and measures performance.
281281
func BenchmarkDBForwardIteration(db types.StateStore, inputKVDir string, numVersions int, concurrency int, maxOps int64, iterationSteps int) {
282-
kvData, err := utils.LoadAndShuffleKV(inputKVDir)
282+
kvData, err := utils.LoadAndShuffleKV(inputKVDir, concurrency)
283283
if err != nil {
284284
panic(err)
285285
}
@@ -371,7 +371,7 @@ func reverseIterateDBConcurrently(db types.StateStore, allKVs []utils.KeyValuePa
371371
// BenchmarkDBReverseIteration measures reverse iteration performance of the db
372372
// Given an input dir containing all the raw kv data, it selects a random key, reverse iterates and measures performance.
373373
func BenchmarkDBReverseIteration(db types.StateStore, inputKVDir string, numVersions int, concurrency int, maxOps int64, iterationSteps int) {
374-
kvData, err := utils.LoadAndShuffleKV(inputKVDir)
374+
kvData, err := utils.LoadAndShuffleKV(inputKVDir, concurrency)
375375
if err != nil {
376376
panic(err)
377377
}

tools/utils/utils.go

Lines changed: 26 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -220,33 +220,45 @@ func ListAllFiles(dir string) ([]string, error) {
220220
return fileNames, nil
221221
}
222222

223-
func LoadAndShuffleKV(inputDir string) ([]KeyValuePair, error) {
223+
func LoadAndShuffleKV(inputDir string, concurrency int) ([]KeyValuePair, error) {
224224
var allKVs []KeyValuePair
225225
mu := &sync.Mutex{}
226-
wg := &sync.WaitGroup{}
227226

228227
allFiles, err := ListAllFiles(inputDir)
229228
if err != nil {
230229
log.Fatalf("Failed to list all files: %v", err)
231230
}
232231

233-
for _, file := range allFiles {
232+
filesChan := make(chan string)
233+
wg := &sync.WaitGroup{}
234+
235+
// Start worker goroutines
236+
for i := 0; i < concurrency; i++ {
234237
wg.Add(1)
235-
go func(id string, selectedFile string) {
238+
go func() {
236239
defer wg.Done()
237-
238-
kvEntries, err := ReadKVEntriesFromFile(filepath.Join(id, selectedFile))
239-
if err != nil {
240-
panic(err)
240+
for selectedFile := range filesChan {
241+
kvEntries, err := ReadKVEntriesFromFile(filepath.Join(inputDir, selectedFile))
242+
if err != nil {
243+
panic(err)
244+
}
245+
246+
// Safely append the kvEntries to allKVs
247+
mu.Lock()
248+
allKVs = append(allKVs, kvEntries...)
249+
fmt.Printf("Done processing file %+v\n", filepath.Join(inputDir, selectedFile))
250+
mu.Unlock()
241251
}
252+
}()
253+
}
242254

243-
// Safely append the kvEntries to allKVs
244-
mu.Lock()
245-
allKVs = append(allKVs, kvEntries...)
246-
fmt.Printf("Done processing file %+v\n", filepath.Join(id, selectedFile))
247-
mu.Unlock()
248-
}(inputDir, file)
255+
// Send file names to filesChan
256+
for _, file := range allFiles {
257+
filesChan <- file
249258
}
259+
close(filesChan)
260+
261+
// Wait for all workers to finish
250262
wg.Wait()
251263

252264
rand.Shuffle(len(allKVs), func(i, j int) {

0 commit comments

Comments
 (0)