Skip to content

Commit

Permalink
Add concurrency for restores
Browse files Browse the repository at this point in the history
  • Loading branch information
ericvolp12 committed Nov 8, 2023
1 parent 78ed95f commit a436d76
Showing 1 changed file with 48 additions and 16 deletions.
64 changes: 48 additions & 16 deletions cmd/literiver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,21 @@ func main() {
&cli.StringFlag{
Name: "cloned-bucket-root",
Usage: "local directory that contains a clone of the s3 bucket",
EnvVars: []string{"LITERIVER_CLONED_BUCKET_ROOT"},
EnvVars: []string{"LITERIVER_RESTORE_CLONED_BUCKET_ROOT"},
Required: true,
},
&cli.StringFlag{
Name: "target-directory",
Usage: "local directory to restore the databases to",
EnvVars: []string{"LITERIVER_TARGET_DIRECTORY"},
EnvVars: []string{"LITERIVER_RESTORE_TARGET_DIRECTORY"},
Required: true,
},
&cli.IntFlag{
Name: "concurrency",
Usage: "number of concurrent restores to run.",
Value: 10,
EnvVars: []string{"LITERIVER_RESTORE_CONCURRENCY"},
},
},
},
{
Expand Down Expand Up @@ -116,6 +122,12 @@ func Restore(cctx *cli.Context) (err error) {
ctx := cctx.Context
start := time.Now()

logLvl := new(slog.LevelVar)
logLvl.UnmarshalText([]byte(cctx.String("log-level")))
slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{
Level: logLvl,
})))

// Walk all databases in the directory
// For each database, register a new DB using the path as a local replica target

Expand All @@ -131,7 +143,7 @@ func Restore(cctx *cli.Context) (err error) {
return fmt.Errorf("failed to resolve directory path (%s): %w", targetDir, err)
}

slog.Info("starting restore", "cloned_bucket_root", clonedBucketRoot, "target_directory", targetDir)
slog.Warn("starting restore", "cloned_bucket_root", clonedBucketRoot, "target_directory", targetDir, "concurrency", cctx.Int("concurrency"))

// Check if clonedBucketRoot exists
if _, err := os.Stat(clonedBucketRoot); os.IsNotExist(err) {
Expand All @@ -140,7 +152,7 @@ func Restore(cctx *cli.Context) (err error) {

// Create targetDir if it doesn't exist
if _, err := os.Stat(targetDir); os.IsNotExist(err) {
slog.Info("creating target directory", "target_directory", targetDir)
slog.Warn("creating target directory", "target_directory", targetDir)
if err := os.MkdirAll(targetDir, 0755); err != nil {
return fmt.Errorf("failed to create target directory: %w", err)
}
Expand All @@ -164,7 +176,7 @@ func Restore(cctx *cli.Context) (err error) {
return fmt.Errorf("error walking the path %q: %v", clonedBucketRoot, err)
}

slog.Info("found databases", "databases", dbPaths)
slog.Warn("found databases", "databases", dbPaths)

replicas := []replicaOpt{}

Expand Down Expand Up @@ -195,21 +207,41 @@ func Restore(cctx *cli.Context) (err error) {
})
}

replicaRestoreErrors := []error{}
taskCh := make(chan replicaOpt, len(replicas))
defer close(taskCh)
errorCh := make(chan error, len(replicas))

// Restore all databases
for i, item := range replicas {
log := slog.With("dest_path", item.opt.OutputPath, "generation", item.opt.Generation, "source_path", item.source)
log.Info("restoring replica")
if err := item.replica.Restore(ctx, *item.opt); err != nil {
replicaRestoreErrors = append(replicaRestoreErrors, fmt.Errorf("failed to restore replica %s: %w", item.opt.OutputPath, err))
log.Info("failed to restore replica", "error", err)
continue
numRoutines := cctx.Int("concurrency")
for i := 0; i < numRoutines; i++ {
go func() {
for task := range taskCh {
log := slog.With("dest_path", task.opt.OutputPath, "generation", task.opt.Generation, "source_path", task.source)
log.Warn("restoring replica")
if err := task.replica.Restore(ctx, *task.opt); err != nil {
errorCh <- fmt.Errorf("failed to restore replica %s: %w", task.opt.OutputPath, err)
log.Error("failed to restore replica", "error", err)
continue
}
log.Warn("restored replica")
errorCh <- nil
}
}()
}

// Send tasks to the worker goroutines
for _, item := range replicas {
taskCh <- item
}

replicaRestoreErrors := []error{}
for range replicas {
err := <-errorCh
if err != nil {
replicaRestoreErrors = append(replicaRestoreErrors, err)
}
log.Info("restored replica", "replicas_restored", i+1, "replicas_total", len(replicas))
}

slog.Info("restore complete",
slog.Warn("restore complete",
"source_dbs_discovered", len(dbPaths),
"successfully_restored_replicas", len(replicas)-len(replicaRestoreErrors),
"unsuccessfully_initialized_replicas", len(replicaInitErrors),
Expand Down

0 comments on commit a436d76

Please sign in to comment.