From 7af2bdb8077ca27d16a92999d8a6678ae98a6c49 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Leszczy=C5=84ski?= <2000michal@wp.pl> Date: Mon, 7 Oct 2024 09:34:56 +0200 Subject: [PATCH] feat(restore): filter restored manifests by dc Fixes #3829 s: filter by dc --- pkg/service/restore/index.go | 21 +++++++++++++++------ pkg/service/restore/model.go | 1 + pkg/service/restore/schema_worker.go | 10 ++++++++++ pkg/service/restore/tables_worker.go | 2 +- pkg/service/restore/worker.go | 10 ++++++++++ 5 files changed, 37 insertions(+), 7 deletions(-) diff --git a/pkg/service/restore/index.go b/pkg/service/restore/index.go index dd7b7b72c..585498d08 100644 --- a/pkg/service/restore/index.go +++ b/pkg/service/restore/index.go @@ -10,6 +10,7 @@ import ( "github.com/scylladb/scylla-manager/v3/pkg/metrics" . "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec" "github.com/scylladb/scylla-manager/v3/pkg/sstable" + "github.com/scylladb/scylla-manager/v3/pkg/util/inexlist/dcfilter" ) // LocationWorkload represents aggregated restore workload @@ -56,10 +57,10 @@ type SSTable struct { } // IndexWorkload returns sstables to be restored aggregated by location, table and remote sstable dir. -func (w *tablesWorker) IndexWorkload(ctx context.Context, locations []Location) ([]LocationWorkload, error) { +func (w *tablesWorker) IndexWorkload(ctx context.Context, locations []Location, dcFilters []string) ([]LocationWorkload, error) { var workload []LocationWorkload for _, l := range locations { - lw, err := w.indexLocationWorkload(ctx, l) + lw, err := w.indexLocationWorkload(ctx, l, dcFilters) if err != nil { return nil, errors.Wrapf(err, "index workload in %s", l) } @@ -68,8 +69,8 @@ func (w *tablesWorker) IndexWorkload(ctx context.Context, locations []Location) return workload, nil } -func (w *tablesWorker) indexLocationWorkload(ctx context.Context, location Location) (LocationWorkload, error) { - rawWorkload, err := w.createRemoteDirWorkloads(ctx, location) +func (w *tablesWorker) indexLocationWorkload(ctx context.Context, location Location, dcFilters []string) (LocationWorkload, error) { + rawWorkload, err := w.createRemoteDirWorkloads(ctx, location, dcFilters) if err != nil { return LocationWorkload{}, errors.Wrap(err, "create remote dir workloads") } @@ -84,9 +85,17 @@ func (w *tablesWorker) indexLocationWorkload(ctx context.Context, location Locat return workload, nil } -func (w *tablesWorker) createRemoteDirWorkloads(ctx context.Context, location Location) ([]RemoteDirWorkload, error) { +func (w *tablesWorker) createRemoteDirWorkloads(ctx context.Context, location Location, dcFilters []string) ([]RemoteDirWorkload, error) { + dcs, err := dcfilter.NewFilter(dcFilters) + if err != nil { + return nil, errors.Wrapf(err, "create dc filter") + } + var rawWorkload []RemoteDirWorkload - err := w.forEachManifest(ctx, location, func(m ManifestInfoWithContent) error { + err = w.forEachManifest(ctx, location, func(m ManifestInfoWithContent) error { + if !dcs.Check(m.DC) { + return nil + } return m.ForEachIndexIterWithError(nil, func(fm FilesMeta) error { if !unitsContainTable(w.run.Units, fm.Keyspace, fm.Table) { return nil diff --git a/pkg/service/restore/model.go b/pkg/service/restore/model.go index 9ed446d15..cd0e85870 100644 --- a/pkg/service/restore/model.go +++ b/pkg/service/restore/model.go @@ -25,6 +25,7 @@ import ( type Target struct { Location []Location `json:"location"` Keyspace []string `json:"keyspace,omitempty"` + Datacenter []string `json:"datacenter,omitempty"` SnapshotTag string `json:"snapshot_tag"` BatchSize int `json:"batch_size,omitempty"` Parallel int `json:"parallel,omitempty"` diff --git a/pkg/service/restore/schema_worker.go b/pkg/service/restore/schema_worker.go index d60417b99..31f5e284b 100644 --- a/pkg/service/restore/schema_worker.go +++ b/pkg/service/restore/schema_worker.go @@ -15,6 +15,7 @@ import ( "github.com/pkg/errors" "github.com/scylladb/gocqlx/v2" "github.com/scylladb/scylla-manager/v3/pkg/scyllaclient" + "github.com/scylladb/scylla-manager/v3/pkg/util/inexlist/dcfilter" "github.com/scylladb/scylla-manager/v3/pkg/util/query" "go.uber.org/atomic" @@ -205,7 +206,16 @@ func (w *schemaWorker) locationDownloadHandler(ctx context.Context, location Loc return w.workFunc(ctx, fm) } + dcs, err := dcfilter.NewFilter(w.target.Datacenter) + if err != nil { + return errors.Wrapf(err, "create dc filter") + } + manifestDownloadHandler := func(miwc ManifestInfoWithContent) error { + if !dcs.Check(miwc.DC) { + return nil + } + w.logger.Info(ctx, "Downloading schema from manifest", "manifest", miwc.ManifestInfo) defer w.logger.Info(ctx, "Downloading schema from manifest finished", "manifest", miwc.ManifestInfo) diff --git a/pkg/service/restore/tables_worker.go b/pkg/service/restore/tables_worker.go index e79d8b70d..c4d60776d 100644 --- a/pkg/service/restore/tables_worker.go +++ b/pkg/service/restore/tables_worker.go @@ -161,7 +161,7 @@ func (w *tablesWorker) stageRestoreData(ctx context.Context) error { w.logger.Info(ctx, "Started restoring tables") defer w.logger.Info(ctx, "Restoring tables finished") - workload, err := w.IndexWorkload(ctx, w.target.Location) + workload, err := w.IndexWorkload(ctx, w.target.Location, w.target.Datacenter) if err != nil { return err } diff --git a/pkg/service/restore/worker.go b/pkg/service/restore/worker.go index 8984a934a..b8cf32fcd 100644 --- a/pkg/service/restore/worker.go +++ b/pkg/service/restore/worker.go @@ -22,6 +22,7 @@ import ( "github.com/scylladb/scylla-manager/v3/pkg/schema/table" "github.com/scylladb/scylla-manager/v3/pkg/scyllaclient" . "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec" + "github.com/scylladb/scylla-manager/v3/pkg/util/inexlist/dcfilter" "github.com/scylladb/scylla-manager/v3/pkg/util/query" "github.com/scylladb/scylla-manager/v3/pkg/util/retry" "github.com/scylladb/scylla-manager/v3/pkg/util/timeutc" @@ -336,9 +337,18 @@ func (w *worker) initUnits(ctx context.Context) error { unitMap = make(map[string]Unit) ) + dcs, err := dcfilter.NewFilter(w.target.Datacenter) + if err != nil { + return errors.Wrapf(err, "create dc filter") + } + var foundManifest bool for _, l := range w.target.Location { manifestHandler := func(miwc ManifestInfoWithContent) error { + if !dcs.Check(miwc.DC) { + return nil + } + foundManifest = true filesHandler := func(fm FilesMeta) {