From f6b85ed436e968a624063c691c4c29f7efe979d6 Mon Sep 17 00:00:00 2001 From: Daniel Lipovetsky Date: Thu, 18 Apr 2024 11:39:18 -0700 Subject: [PATCH] feat: Restore zstd-compressed database backup The database backup is compressed using zstd. This allows us to load the backup without having to decompress it beforehand. --- pkg/sloop/ingress/dbrestore.go | 22 ++++- pkg/sloop/ingress/dbrestore_test.go | 131 ++++++++++++++++++++++++++++ 2 files changed, 151 insertions(+), 2 deletions(-) create mode 100644 pkg/sloop/ingress/dbrestore_test.go diff --git a/pkg/sloop/ingress/dbrestore.go b/pkg/sloop/ingress/dbrestore.go index 463a6c2a..21016dc2 100644 --- a/pkg/sloop/ingress/dbrestore.go +++ b/pkg/sloop/ingress/dbrestore.go @@ -1,9 +1,12 @@ package ingress import ( + "io" "os" "runtime" + "github.com/golang/glog" + "github.com/klauspost/compress/zstd" "github.com/pkg/errors" "github.com/salesforce/sloop/pkg/sloop/store/untyped/badgerwrap" @@ -17,10 +20,25 @@ func DatabaseRestore(db badgerwrap.DB, filename string) error { } defer file.Close() - err = db.Load(file, runtime.NumCPU()) + zr, err := zstd.NewReader(file) if err != nil { - return errors.Wrapf(err, "failed to restore database from file: %q", filename) + return errors.Wrapf(err, "failed to initialize zstd reader") } + defer zr.Close() + + err = db.Load(zr, runtime.NumCPU()) + if errors.Is(err, zstd.ErrMagicMismatch) { + glog.V(2).Infof("database file is not compressed with zstd, will load without decompressing") + // We already loaded the database, advancing the file offset. To load the database again, + // we must reset the offset to the start. + if _, err := file.Seek(0, io.SeekStart); err != nil { + return errors.Wrapf(err, "failed to to rewind to start of database restore file: %q", filename) + } + err = db.Load(file, runtime.NumCPU()) + } + if err != nil { + return errors.Wrapf(err, "failed to restore database from file: %q", filename) + } return nil } diff --git a/pkg/sloop/ingress/dbrestore_test.go b/pkg/sloop/ingress/dbrestore_test.go new file mode 100644 index 00000000..29e362c5 --- /dev/null +++ b/pkg/sloop/ingress/dbrestore_test.go @@ -0,0 +1,131 @@ +package ingress + +import ( + "fmt" + "os" + "path/filepath" + "testing" + "time" + + "github.com/DataDog/zstd" + "github.com/golang/protobuf/ptypes/timestamp" + "github.com/pkg/errors" + "github.com/salesforce/sloop/pkg/sloop/store/typed" + "github.com/salesforce/sloop/pkg/sloop/store/untyped" + "github.com/salesforce/sloop/pkg/sloop/store/untyped/badgerwrap" +) + +func TestDatabaseRestore(t *testing.T) { + tests := []struct { + name string + backupFn func(db badgerwrap.DB, path string) error + wantErr error + }{ + { + name: "restore uncompressed database backup", + backupFn: backupUncompressed, + }, + { + name: "restore zstd-compressed database backup", + backupFn: backupZstdCompressed, + }, + } + + tmpDir, err := os.MkdirTemp("", "") + if err != nil { + t.Fatalf("failed to create temporary directory: %s", err) + } + + for i, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + db, err := createExampleDatabase() + if err != nil { + t.Fatalf("failed to create example database: %s", err) + } + defer db.Close() + + dbPath := filepath.Join(tmpDir, fmt.Sprintf("test-%d.db", i)) + err = tt.backupFn(db, dbPath) + if err != nil { + t.Fatalf("failed to backup example database: %s", err) + } + + if err := DatabaseRestore(db, dbPath); err != tt.wantErr { + t.Errorf("DatabaseRestore() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + +func backupUncompressed(db badgerwrap.DB, path string) error { + w, err := os.Create(path) + if err != nil { + return errors.Wrapf(err, "failed to create file") + } + defer w.Close() + + _, err = db.Backup(w, 0) + if err != nil { + return errors.Wrapf(err, "failed to backup database") + } + + return nil +} + +func backupZstdCompressed(db badgerwrap.DB, path string) error { + cf, err := os.Create(path) + if err != nil { + return errors.Wrapf(err, "failed to create file") + } + defer cf.Close() + + zw := zstd.NewWriter(cf) + _, err = db.Backup(zw, 0) + if err != nil { + return errors.Wrapf(err, "failed to backup database") + } + zw.Close() + + return nil +} + +func createExampleDatabase() (badgerwrap.DB, error) { + rootPath, err := os.MkdirTemp("", "") + if err != nil { + return nil, errors.Wrapf(err, "failed to create temporary directory for database:") + } + + factory := &badgerwrap.BadgerFactory{} + storeConfig := &untyped.Config{ + RootPath: rootPath, + ConfigPartitionDuration: time.Hour, + } + db, err := untyped.OpenStore(factory, storeConfig) + if err != nil { + return nil, errors.Wrapf(err, "failed to init untyped store:") + } + + wt := typed.OpenKubeWatchResultTable() + err = db.Update(func(txn badgerwrap.Txn) error { + txerr := wt.Set(txn, + typed.NewWatchTableKey( + "somePartition", + "someKind", + "someNamespace", + "someName", + time.UnixMicro(0)).String(), + &typed.KubeWatchResult{Timestamp: ×tamp.Timestamp{ + Seconds: 0, + Nanos: 0, + }, Kind: "test", Payload: "test"}) + if txerr != nil { + return txerr + } + return nil + }) + if err != nil { + return nil, errors.Wrapf(err, "error updating database:") + } + + return db, nil +}