Skip to content

Commit

Permalink
Add restore function (works after rcloning a bucket)
Browse files Browse the repository at this point in the history
  • Loading branch information
ericvolp12 committed Nov 8, 2023
1 parent eac3b05 commit 78ed95f
Show file tree
Hide file tree
Showing 2 changed files with 183 additions and 33 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,6 @@ sonar_cursor.json
out/
state.json
netsync-out/

dbs/
dbs_bak/
213 changes: 180 additions & 33 deletions cmd/literiver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ import (
"os"
"os/signal"
"path/filepath"
"strings"
"syscall"
"time"

"github.com/benbjohnson/litestream"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/urfave/cli/v2"
Expand All @@ -22,36 +24,6 @@ func main() {
Name: "literiver",
Usage: "Replicate SQLite databases in a directory to S3",
Flags: []cli.Flag{
&cli.StringSliceFlag{
Name: "dir",
Usage: "Directories to monitor (can be specified multiple times)",
EnvVars: []string{"LITERIVER_DIR"},
Required: true,
},
&cli.StringFlag{
Name: "replica-root",
Usage: "S3 Bucket URL for Replication (https://{s3_url}.com/{bucket_name})",
EnvVars: []string{"LITERIVER_REPLICA_ROOT"},
Required: true,
},
&cli.DurationFlag{
Name: "db-ttl",
Usage: "Time to live for a database before it is closed",
EnvVars: []string{"LITERIVER_DB_TTL"},
Value: 2 * time.Minute,
},
&cli.DurationFlag{
Name: "sync-interval",
Usage: "How frequently active DBs should be synced",
EnvVars: []string{"LITERIVER_SYNC_INTERVAL"},
Value: 5 * time.Second,
},
&cli.IntFlag{
Name: "max-active-dbs",
Usage: "Maximum number of active databases to keep open, least recently used will be closed first",
EnvVars: []string{"LITERIVER_MAX_ACTIVE_DBS"},
Value: 2_000,
},
&cli.StringFlag{
Name: "addr",
Usage: "Address to serve metrics on",
Expand All @@ -65,7 +37,65 @@ func main() {
Value: "warn",
},
},
Action: Run,
Commands: []*cli.Command{
{
Name: "restore",
Usage: "Restore a local copy of databases from an S3 bucket",
Action: Restore,
Flags: []cli.Flag{
&cli.StringFlag{
Name: "cloned-bucket-root",
Usage: "local directory that contains a clone of the s3 bucket",
EnvVars: []string{"LITERIVER_CLONED_BUCKET_ROOT"},
Required: true,
},
&cli.StringFlag{
Name: "target-directory",
Usage: "local directory to restore the databases to",
EnvVars: []string{"LITERIVER_TARGET_DIRECTORY"},
Required: true,
},
},
},
{
Name: "replicate",
Usage: "Replicate databases from a directory to S3",
Action: Replicate,
Flags: []cli.Flag{
&cli.StringSliceFlag{
Name: "dir",
Usage: "Directories to monitor (can be specified multiple times)",
EnvVars: []string{"LITERIVER_DIR"},
Required: true,
},
&cli.StringFlag{
Name: "replica-root",
Usage: "S3 Bucket URL for Replication (https://{s3_url}.com/{bucket_name})",
EnvVars: []string{"LITERIVER_REPLICA_ROOT"},
Required: true,
},
&cli.DurationFlag{
Name: "db-ttl",
Usage: "Time to live for a database before it is closed",
EnvVars: []string{"LITERIVER_DB_TTL"},
Value: 2 * time.Minute,
},
&cli.DurationFlag{
Name: "sync-interval",
Usage: "How frequently active DBs should be synced",
EnvVars: []string{"LITERIVER_SYNC_INTERVAL"},
Value: 5 * time.Second,
},
&cli.IntFlag{
Name: "max-active-dbs",
Usage: "Maximum number of active databases to keep open, least recently used will be closed first",
EnvVars: []string{"LITERIVER_MAX_ACTIVE_DBS"},
Value: 2_000,
},
},
},
},
DefaultCommand: "replicate",
}

if err := app.Run(os.Args); err != nil {
Expand All @@ -75,8 +105,125 @@ func main() {
slog.Info("literiver exiting")
}

// Run loads all databases specified in the configuration.
func Run(cctx *cli.Context) (err error) {
// replicaOpt bundles everything Restore needs to restore a single database:
// the replica to read from, the options to restore with, and where it came from.
type replicaOpt struct {
// replica is the litestream replica whose Restore method is invoked.
replica *litestream.Replica
// opt holds the restore options (output path and generation) passed to replica.Restore.
opt *litestream.RestoreOptions
// source is the path of the replica directory inside the cloned bucket; used for logging.
source string
}

// Restore runs on an rclone'd directory and restores all databases from S3.
//
// It walks the directory given by the "cloned-bucket-root" flag, treats every
// directory whose name ends in ".sqlite" as a replica of one database, and
// restores each of them to the same relative location underneath the
// directory given by the "target-directory" flag (created if it is missing).
//
// Restoration is best-effort: a database that fails to initialize or restore
// is recorded and skipped so the remaining databases are still processed.
// A summary is logged at the end, and a non-nil error is returned if the
// inputs are invalid or if any database could not be restored, so that the
// process does not exit successfully after a partial restore.
func Restore(cctx *cli.Context) (err error) {
	ctx := cctx.Context
	start := time.Now()

	// Report the user-supplied value on failure; the result of a failed
	// filepath.Abs call is empty and would make the message useless.
	rootArg := cctx.String("cloned-bucket-root")
	clonedBucketRoot, err := filepath.Abs(rootArg)
	if err != nil {
		return fmt.Errorf("failed to resolve directory path (%s): %w", rootArg, err)
	}

	targetArg := cctx.String("target-directory")
	targetDir, err := filepath.Abs(targetArg)
	if err != nil {
		return fmt.Errorf("failed to resolve directory path (%s): %w", targetArg, err)
	}

	slog.Info("starting restore", "cloned_bucket_root", clonedBucketRoot, "target_directory", targetDir)

	// The cloned bucket root must exist and be a directory. Stat errors other
	// than "not exist" (e.g. permission denied) are surfaced instead of ignored.
	rootInfo, err := os.Stat(clonedBucketRoot)
	switch {
	case os.IsNotExist(err):
		return fmt.Errorf("cloned bucket root does not exist: %s", clonedBucketRoot)
	case err != nil:
		return fmt.Errorf("failed to stat cloned bucket root %s: %w", clonedBucketRoot, err)
	case !rootInfo.IsDir():
		return fmt.Errorf("cloned bucket root is not a directory: %s", clonedBucketRoot)
	}

	// Create targetDir if it doesn't exist.
	if _, err := os.Stat(targetDir); os.IsNotExist(err) {
		slog.Info("creating target directory", "target_directory", targetDir)
		if err := os.MkdirAll(targetDir, 0755); err != nil {
			return fmt.Errorf("failed to create target directory: %w", err)
		}
	} else if err != nil {
		return fmt.Errorf("failed to stat target directory %s: %w", targetDir, err)
	}

	// Collect every directory whose name ends with ".sqlite"; each one is
	// treated as the replica of a single database.
	var dbPaths []string
	err = filepath.Walk(clonedBucketRoot, func(path string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}

		if info.IsDir() && strings.HasSuffix(info.Name(), ".sqlite") {
			dbPaths = append(dbPaths, path)
			// Do not descend into the replica itself: its contents are
			// replica data, not further databases, and may hold many files.
			return filepath.SkipDir
		}

		return nil
	})
	if err != nil {
		return fmt.Errorf("error walking the path %q: %w", clonedBucketRoot, err)
	}

	slog.Info("found databases", "databases", dbPaths)

	replicas := []replicaOpt{}
	replicaInitErrors := []error{}

	// Phase 1: build a replica and resolve the generation to restore for each database.
	for _, dbPath := range dbPaths {
		relPath, err := filepath.Rel(clonedBucketRoot, dbPath)
		if err != nil {
			replicaInitErrors = append(replicaInitErrors, fmt.Errorf("failed to compute relative path for %s: %w", dbPath, err))
			continue
		}

		opt := litestream.NewRestoreOptions()
		// Mirror the database's location relative to the bucket root under the target directory.
		opt.OutputPath = filepath.Join(targetDir, relPath)

		syncInterval := litestream.DefaultSyncInterval
		r, err := NewReplicaFromConfig(&ReplicaConfig{
			Path:         dbPath,
			SyncInterval: &syncInterval,
		}, nil)
		if err != nil {
			replicaInitErrors = append(replicaInitErrors, fmt.Errorf("failed to create replica for %s: %w", dbPath, err))
			continue
		}

		opt.Generation, _, err = r.CalcRestoreTarget(ctx, opt)
		if err != nil {
			replicaInitErrors = append(replicaInitErrors, fmt.Errorf("failed to calculate restore target for %s: %w", dbPath, err))
			continue
		}

		replicas = append(replicas, replicaOpt{
			replica: r,
			opt:     &opt,
			source:  dbPath,
		})
	}

	replicaRestoreErrors := []error{}

	// Phase 2: restore all databases, counting only the ones that actually succeed.
	restored := 0
	for _, item := range replicas {
		log := slog.With("dest_path", item.opt.OutputPath, "generation", item.opt.Generation, "source_path", item.source)
		log.Info("restoring replica")
		if err := item.replica.Restore(ctx, *item.opt); err != nil {
			replicaRestoreErrors = append(replicaRestoreErrors, fmt.Errorf("failed to restore replica %s: %w", item.opt.OutputPath, err))
			log.Error("failed to restore replica", "error", err)
			continue
		}
		restored++
		log.Info("restored replica", "replicas_restored", restored, "replicas_total", len(replicas))
	}

	slog.Info("restore complete",
		"source_dbs_discovered", len(dbPaths),
		"successfully_restored_replicas", restored,
		"unsuccessfully_initialized_replicas", len(replicaInitErrors),
		"unsuccessfully_restored_replicas", len(replicaRestoreErrors),
		"duration", time.Since(start).String(),
		"replica_init_errors", replicaInitErrors,
		"replica_restore_errors", replicaRestoreErrors,
	)

	// All databases have been attempted at this point; report failure to the
	// caller so a partial restore is not mistaken for a complete one.
	if failed := len(replicaInitErrors) + len(replicaRestoreErrors); failed > 0 {
		return fmt.Errorf("restore incomplete: %d of %d databases failed", failed, len(dbPaths))
	}

	return nil
}

// Replicate loads all databases specified in the configuration.
func Replicate(cctx *cli.Context) (err error) {
logLvl := new(slog.LevelVar)
logLvl.UnmarshalText([]byte(cctx.String("log-level")))
slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{
Expand Down

0 comments on commit 78ed95f

Please sign in to comment.