Skip to content

Commit

Permalink
resumability test
Browse files Browse the repository at this point in the history
  • Loading branch information
makalaaneesh committed Jan 23, 2025
1 parent 1ec7e7c commit a5b0527
Showing 1 changed file with 57 additions and 0 deletions.
57 changes: 57 additions & 0 deletions yb-voyager/cmd/importDataFileTaskImporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,60 @@ func TestBasicTaskImport(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, int64(2), rowCount)
}

func TestTaskImportResumable(t *testing.T) {
ldataDir, lexportDir, state, err := setupExportDirAndImportDependencies(2, 1024)
testutils.FatalIfError(t, err)

if ldataDir != "" {
defer os.RemoveAll(fmt.Sprintf("%s/", ldataDir))
}
if lexportDir != "" {
defer os.RemoveAll(fmt.Sprintf("%s/", lexportDir))
}
setupYugabyteTestDb(t)

// file import
fileContents := `id,val
1, "hello"
2, "world"
3, "foo"
4, "bar"`
_, task, err := createFileAndTask(lexportDir, fileContents, ldataDir, "test_table")
testutils.FatalIfError(t, err)

progressReporter := NewImportDataProgressReporter(true)
workerPool := pool.New().WithMaxGoroutines(2)
taskImporter, err := NewFileTaskImporter(task, state, workerPool, progressReporter)
testutils.FatalIfError(t, err)
// for !taskImporter.AllBatchesSubmitted() {
// err := taskImporter.SubmitNextBatch()
// assert.NoError(t, err)
// }

// submit 1 batch
err = taskImporter.SubmitNextBatch()
assert.NoError(t, err)

// simulate restart
workerPool.Wait()
// check that the first batch was imported
var rowCount int64
err = tdb.QueryRow("SELECT count(*) FROM test_table").Scan(&rowCount)
assert.NoError(t, err)
assert.Equal(t, int64(2), rowCount)

progressReporter = NewImportDataProgressReporter(true)
workerPool = pool.New().WithMaxGoroutines(2)
taskImporter, err = NewFileTaskImporter(task, state, workerPool, progressReporter)

// submit second batch, not first batch again as it was already imported
err = taskImporter.SubmitNextBatch()
assert.NoError(t, err)

assert.Equal(t, true, taskImporter.AllBatchesSubmitted())
workerPool.Wait()
err = tdb.QueryRow("SELECT count(*) FROM test_table").Scan(&rowCount)
assert.NoError(t, err)
assert.Equal(t, int64(4), rowCount)
}

0 comments on commit a5b0527

Please sign in to comment.