diff --git a/cmd/literiver/main.go b/cmd/literiver/main.go index 679a2b47b..7e88944f0 100644 --- a/cmd/literiver/main.go +++ b/cmd/literiver/main.go @@ -46,15 +46,21 @@ func main() { &cli.StringFlag{ Name: "cloned-bucket-root", Usage: "local directory that contains a clone of the s3 bucket", - EnvVars: []string{"LITERIVER_CLONED_BUCKET_ROOT"}, + EnvVars: []string{"LITERIVER_RESTORE_CLONED_BUCKET_ROOT"}, Required: true, }, &cli.StringFlag{ Name: "target-directory", Usage: "local directory to restore the databases to", - EnvVars: []string{"LITERIVER_TARGET_DIRECTORY"}, + EnvVars: []string{"LITERIVER_RESTORE_TARGET_DIRECTORY"}, Required: true, }, + &cli.IntFlag{ + Name: "concurrency", + Usage: "number of concurrent restores to run.", + Value: 10, + EnvVars: []string{"LITERIVER_RESTORE_CONCURRENCY"}, + }, }, }, { @@ -116,6 +122,12 @@ func Restore(cctx *cli.Context) (err error) { ctx := cctx.Context start := time.Now() + logLvl := new(slog.LevelVar) + logLvl.UnmarshalText([]byte(cctx.String("log-level"))) + slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{ + Level: logLvl, + }))) + // Walk all databases in the directory // For each database, register a new DB using the path as a local replica target @@ -131,7 +143,7 @@ func Restore(cctx *cli.Context) (err error) { return fmt.Errorf("failed to resolve directory path (%s): %w", targetDir, err) } - slog.Info("starting restore", "cloned_bucket_root", clonedBucketRoot, "target_directory", targetDir) + slog.Warn("starting restore", "cloned_bucket_root", clonedBucketRoot, "target_directory", targetDir, "concurrency", cctx.Int("concurrency")) // Check if clonedBucketRoot exists if _, err := os.Stat(clonedBucketRoot); os.IsNotExist(err) { @@ -140,7 +152,7 @@ func Restore(cctx *cli.Context) (err error) { // Create targetDir if it doesn't exist if _, err := os.Stat(targetDir); os.IsNotExist(err) { - slog.Info("creating target directory", "target_directory", targetDir) + slog.Warn("creating target directory", "target_directory", targetDir) if err := os.MkdirAll(targetDir, 0755); err != nil { return fmt.Errorf("failed to create target directory: %w", err) } @@ -164,7 +176,7 @@ func Restore(cctx *cli.Context) (err error) { return fmt.Errorf("error walking the path %q: %v", clonedBucketRoot, err) } - slog.Info("found databases", "databases", dbPaths) + slog.Warn("found databases", "databases", dbPaths) replicas := []replicaOpt{} @@ -195,21 +207,41 @@ func Restore(cctx *cli.Context) (err error) { }) } - replicaRestoreErrors := []error{} + taskCh := make(chan replicaOpt, len(replicas)) + defer close(taskCh) + errorCh := make(chan error, len(replicas)) - // Restore all databases - for i, item := range replicas { - log := slog.With("dest_path", item.opt.OutputPath, "generation", item.opt.Generation, "source_path", item.source) - log.Info("restoring replica") - if err := item.replica.Restore(ctx, *item.opt); err != nil { - replicaRestoreErrors = append(replicaRestoreErrors, fmt.Errorf("failed to restore replica %s: %w", item.opt.OutputPath, err)) - log.Info("failed to restore replica", "error", err) - continue + numRoutines := cctx.Int("concurrency") + for i := 0; i < numRoutines; i++ { + go func() { + for task := range taskCh { + log := slog.With("dest_path", task.opt.OutputPath, "generation", task.opt.Generation, "source_path", task.source) + log.Warn("restoring replica") + if err := task.replica.Restore(ctx, *task.opt); err != nil { + errorCh <- fmt.Errorf("failed to restore replica %s: %w", task.opt.OutputPath, err) + log.Error("failed to restore replica", "error", err) + continue + } + log.Warn("restored replica") + errorCh <- nil + } + }() + } + + // Send tasks to the worker goroutines + for _, item := range replicas { + taskCh <- item + } + + replicaRestoreErrors := []error{} + for range replicas { + err := <-errorCh + if err != nil { + replicaRestoreErrors = append(replicaRestoreErrors, err) } - log.Info("restored replica", "replicas_restored", i+1, "replicas_total", len(replicas)) } - slog.Info("restore complete", + slog.Warn("restore complete", "source_dbs_discovered", len(dbPaths), "successfully_restored_replicas", len(replicas)-len(replicaRestoreErrors), "unsuccessfully_initialized_replicas", len(replicaInitErrors),