diff --git a/docs/source/sctool/partials/sctool_restore.yaml b/docs/source/sctool/partials/sctool_restore.yaml index d0efdc23d..e67c27d1f 100644 --- a/docs/source/sctool/partials/sctool_restore.yaml +++ b/docs/source/sctool/partials/sctool_restore.yaml @@ -26,6 +26,8 @@ options: usage: | Task schedule as a cron `expression`. It supports the extended syntax including @monthly, @weekly, @daily, @midnight, @hourly, @every X[h|m|s]. + - name: dc-mapping + usage: "Specifies mapping between DCs from the backup and DCs in the restored(target) cluster.\n\nThe Syntax is \"source_dc1=>target_dc1;source_dc2=>target_dc2\" where multiple mappings are separated by semicolons (;)\nand source and target DCs are separated by arrow (=>).\n\nExample: \"dc1=>dc3;dc2=>dc4\" - data from dc1 should be restored to dc3 and data from dc2 should be restored to dc4.\n\nOnly works with tables restoration (--restore-tables=true). \nNote: Only DCs that are provided in mappings will be restored.\n" - name: dry-run default_value: "false" usage: | @@ -90,6 +92,7 @@ options: The `` parameter is optional. It allows you to specify the datacenter whose nodes will be used to restore the data from this location in a multi-dc setting, it must match Scylla nodes datacenter. By default, all live nodes are used to restore data from specified locations. + If `--dc-mapping` is used, then `` parameter will be ignored. Note that specifying datacenters closest to backup locations might reduce download time of restored data. The supported storage ''s are 'azure', 'gcs', 's3'. diff --git a/docs/source/sctool/partials/sctool_restore_update.yaml b/docs/source/sctool/partials/sctool_restore_update.yaml index 3f0794607..56eb5a07e 100644 --- a/docs/source/sctool/partials/sctool_restore_update.yaml +++ b/docs/source/sctool/partials/sctool_restore_update.yaml @@ -24,6 +24,8 @@ options: usage: | Task schedule as a cron `expression`. It supports the extended syntax including @monthly, @weekly, @daily, @midnight, @hourly, @every X[h|m|s]. + - name: dc-mapping + usage: "Specifies mapping between DCs from the backup and DCs in the restored(target) cluster.\n\nThe Syntax is \"source_dc1=>target_dc1;source_dc2=>target_dc2\" where multiple mappings are separated by semicolons (;)\nand source and target DCs are separated by arrow (=>).\n\nExample: \"dc1=>dc3;dc2=>dc4\" - data from dc1 should be restored to dc3 and data from dc2 should be restored to dc4.\n\nOnly works with tables restoration (--restore-tables=true). \nNote: Only DCs that are provided in mappings will be restored.\n" - name: dry-run default_value: "false" usage: | @@ -88,6 +90,7 @@ options: The `` parameter is optional. It allows you to specify the datacenter whose nodes will be used to restore the data from this location in a multi-dc setting, it must match Scylla nodes datacenter. By default, all live nodes are used to restore data from specified locations. + If `--dc-mapping` is used, then `` parameter will be ignored. Note that specifying datacenters closest to backup locations might reduce download time of restored data. The supported storage ''s are 'azure', 'gcs', 's3'. 
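As an aside, a minimal sketch of how a --dc-mapping value documented above ends up as the "dc_mapping" task property (assuming the dcMappings type added in pkg/command/restore/dcmappings.go below; error handling and imports of encoding/json and fmt omitted):

	var m dcMappings
	_ = m.Set("dc1=>dc3;dc2=>dc4") // same syntax as in the help text above

	props := map[string]any{"dc_mapping": m}
	b, _ := json.Marshal(props)
	fmt.Println(string(b))
	// {"dc_mapping":[{"source":"dc1","target":"dc3"},{"source":"dc2","target":"dc4"}]}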
diff --git a/pkg/command/restore/cmd.go b/pkg/command/restore/cmd.go index 834f132a8..8cd85daf2 100644 --- a/pkg/command/restore/cmd.go +++ b/pkg/command/restore/cmd.go @@ -37,6 +37,7 @@ type command struct { restoreTables bool dryRun bool showTables bool + dcMapping dcMappings } func NewCommand(client *managerclient.Client) *cobra.Command { @@ -90,6 +91,7 @@ func (cmd *command) init() { w.Unwrap().BoolVar(&cmd.restoreTables, "restore-tables", false, "") w.Unwrap().BoolVar(&cmd.dryRun, "dry-run", false, "") w.Unwrap().BoolVar(&cmd.showTables, "show-tables", false, "") + w.Unwrap().Var(&cmd.dcMapping, "dc-mapping", "") } func (cmd *command) run(args []string) error { @@ -182,6 +184,13 @@ func (cmd *command) run(args []string) error { props["restore_tables"] = cmd.restoreTables ok = true } + if cmd.Flag("dc-mapping").Changed { + if cmd.Update() { + return wrapper("dc-mapping") + } + props["dc_mapping"] = cmd.dcMapping + ok = true + } if cmd.dryRun { res, err := cmd.client.GetRestoreTarget(cmd.Context(), cmd.cluster, task) diff --git a/pkg/command/restore/dcmappings.go b/pkg/command/restore/dcmappings.go new file mode 100644 index 000000000..7ae085c71 --- /dev/null +++ b/pkg/command/restore/dcmappings.go @@ -0,0 +1,59 @@ +// Copyright (C) 2025 ScyllaDB + +package restore + +import ( + "strings" + + "github.com/pkg/errors" +) + +type dcMappings []dcMapping + +type dcMapping struct { + Source string `json:"source"` + Target string `json:"target"` +} + +// Set parses --dc-mapping flag, where the syntax is following: +// ; - used to split different mappings +// => - used to split source => target DCs. +func (dcm *dcMappings) Set(v string) error { + mappingParts := strings.Split(v, ";") + for _, dcMapPart := range mappingParts { + sourceTargetParts := strings.Split(dcMapPart, "=>") + if len(sourceTargetParts) != 2 { + return errors.New("invalid syntax, mapping should be in a format of sourceDcs=>targetDcs, but got: " + dcMapPart) + } + if sourceTargetParts[0] == "" || sourceTargetParts[1] == "" { + return errors.New("invalid syntax, mapping should be in a format of sourceDcs=>targetDcs, but got: " + dcMapPart) + } + + var mapping dcMapping + mapping.Source = strings.TrimSpace(sourceTargetParts[0]) + mapping.Target = strings.TrimSpace(sourceTargetParts[1]) + + *dcm = append(*dcm, mapping) + } + return nil +} + +// String builds --dc-mapping flag back from struct. +func (dcm *dcMappings) String() string { + if dcm == nil { + return "" + } + var res strings.Builder + for i, mapping := range *dcm { + res.WriteString(mapping.Source + "=>" + mapping.Target) + if i != len(*dcm)-1 { + res.WriteString(";") + } + } + return res.String() +} + +// Type implements pflag.Value interface. 
+func (dcm *dcMappings) Type() string { + return "dc-mapping" +} diff --git a/pkg/command/restore/dcmappings_test.go b/pkg/command/restore/dcmappings_test.go new file mode 100644 index 000000000..09ab81b24 --- /dev/null +++ b/pkg/command/restore/dcmappings_test.go @@ -0,0 +1,110 @@ +// Copyright (C) 2025 ScyllaDB +package restore + +import ( + "fmt" + "slices" + "testing" +) + +func TestSetDCMapping(t *testing.T) { + testCases := []struct { + input string + expectedErr bool + expectedMappings dcMappings + }{ + { + input: "dc1=>dc2", + expectedMappings: dcMappings{ + {Source: "dc1", Target: "dc2"}, + }, + }, + { + input: " dc1 => dc1 ", + expectedMappings: dcMappings{ + {Source: "dc1", Target: "dc1"}, + }, + }, + { + input: "dc1=>dc3;dc2=>dc4", + expectedMappings: dcMappings{ + {Source: "dc1", Target: "dc3"}, + {Source: "dc2", Target: "dc4"}, + }, + }, + { + input: "dc1=>dc3=>dc2=>dc4", + expectedMappings: dcMappings{}, + expectedErr: true, + }, + { + input: ";", + expectedMappings: dcMappings{}, + expectedErr: true, + }, + { + input: "=>", + expectedMappings: dcMappings{}, + expectedErr: true, + }, + { + input: "dc1=>", + expectedMappings: dcMappings{}, + expectedErr: true, + }, + { + input: "dc1=>;", + expectedMappings: dcMappings{}, + expectedErr: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.input, func(t *testing.T) { + var mappings dcMappings + + err := mappings.Set(tc.input) + if tc.expectedErr && err == nil { + t.Fatal("Expected err, but got nil") + } + if !tc.expectedErr && err != nil { + t.Fatalf("Unexpected err: %v", err) + } + if !slices.Equal(mappings, tc.expectedMappings) { + t.Fatalf("Expected %v, but got %v", tc.expectedMappings, mappings) + } + }) + } + +} + +func TestDCMappingString(t *testing.T) { + testCases := []struct { + mappings dcMappings + expected string + }{ + { + mappings: dcMappings{ + {Source: "dc1", Target: "dc2"}, + }, + expected: "dc1=>dc2", + }, + { + mappings: dcMappings{ + {Source: "dc1", Target: "dc2"}, + {Source: "dc3", Target: "dc4"}, + }, + expected: "dc1=>dc2;dc3=>dc4", + }, + {}, + } + + for i, tc := range testCases { + t.Run(fmt.Sprintf("case %d", i), func(t *testing.T) { + actual := tc.mappings.String() + if actual != tc.expected { + t.Fatalf("Expected %q, but got %q", tc.expected, actual) + } + }) + } +} diff --git a/pkg/command/restore/res.yaml b/pkg/command/restore/res.yaml index eaeb4bd56..17dae45a4 100644 --- a/pkg/command/restore/res.yaml +++ b/pkg/command/restore/res.yaml @@ -15,6 +15,7 @@ location: | The `` parameter is optional. It allows you to specify the datacenter whose nodes will be used to restore the data from this location in a multi-dc setting, it must match Scylla nodes datacenter. By default, all live nodes are used to restore data from specified locations. + If `--dc-mapping` is used, then `` parameter will be ignored. Note that specifying datacenters closest to backup locations might reduce download time of restored data. The supported storage ''s are 'azure', 'gcs', 's3'. @@ -72,3 +73,14 @@ dry-run: | show-tables: | Prints table names together with keyspace, used in combination with --dry-run. + +dc-mapping: | + Specifies mapping between DCs from the backup and DCs in the restored(target) cluster. + + The Syntax is "source_dc1=>target_dc1;source_dc2=>target_dc2" where multiple mappings are separated by semicolons (;) + and source and target DCs are separated by arrow (=>). + + Example: "dc1=>dc3;dc2=>dc4" - data from dc1 should be restored to dc3 and data from dc2 should be restored to dc4. 
+ + Only works with tables restoration (--restore-tables=true). + Note: Only DCs that are provided in mappings will be restored. diff --git a/pkg/service/restore/batch.go b/pkg/service/restore/batch.go index 50646e846..9c17eac4d 100644 --- a/pkg/service/restore/batch.go +++ b/pkg/service/restore/batch.go @@ -57,7 +57,7 @@ type batchDispatcher struct { hostShardCnt map[string]uint } -func newBatchDispatcher(workload Workload, batchSize int, hostShardCnt map[string]uint, locationHosts map[Location][]string) *batchDispatcher { +func newBatchDispatcher(workload Workload, batchSize int, hostShardCnt map[string]uint, locationInfo []LocationInfo) *batchDispatcher { sortWorkload(workload) var shards uint for _, sh := range hostShardCnt { @@ -70,7 +70,7 @@ func newBatchDispatcher(workload Workload, batchSize int, hostShardCnt map[strin mu: sync.Mutex{}, wait: make(chan struct{}), workload: workload, - workloadProgress: newWorkloadProgress(workload, locationHosts), + workloadProgress: newWorkloadProgress(workload, locationInfo), batchSize: batchSize, expectedShardWorkload: workload.TotalSize / int64(shards), hostShardCnt: hostShardCnt, @@ -106,22 +106,22 @@ type remoteSSTableDirProgress struct { RemainingSSTables []RemoteSSTable } -func newWorkloadProgress(workload Workload, locationHosts map[Location][]string) workloadProgress { +func newWorkloadProgress(workload Workload, locationInfo []LocationInfo) workloadProgress { dcBytes := make(map[string]int64) - locationDC := make(map[string][]string) p := make([]remoteSSTableDirProgress, len(workload.RemoteDir)) for i, rdw := range workload.RemoteDir { dcBytes[rdw.DC] += rdw.Size - locationDC[rdw.Location.StringWithoutDC()] = append(locationDC[rdw.Location.StringWithoutDC()], rdw.DC) p[i] = remoteSSTableDirProgress{ RemainingSize: rdw.Size, RemainingSSTables: rdw.SSTables, } } - hostDCAccess := make(map[string][]string) - for loc, hosts := range locationHosts { - for _, h := range hosts { - hostDCAccess[h] = append(hostDCAccess[h], locationDC[loc.StringWithoutDC()]...) + hostDCAccess := map[string][]string{} + for _, l := range locationInfo { + for dc, hosts := range l.DCHosts { + for _, h := range hosts { + hostDCAccess[h] = append(hostDCAccess[h], dc) + } } } return workloadProgress{ @@ -201,8 +201,8 @@ func (bd *batchDispatcher) ValidateAllDispatched() error { for i, rdp := range bd.workloadProgress.remoteDir { if rdp.RemainingSize != 0 || len(rdp.RemainingSSTables) != 0 { rdw := bd.workload.RemoteDir[i] - return errors.Errorf("failed to restore sstables from location %s table %s.%s (%d bytes). See logs for more info", - rdw.Location, rdw.Keyspace, rdw.Table, rdw.Size) + return errors.Errorf("failed to restore sstables from location %s dc %s table %s.%s (%d bytes). 
See logs for more info", + rdw.Location, rdw.DC, rdw.Keyspace, rdw.Table, rdw.Size) } } for dc, bytes := range bd.workloadProgress.dcBytesToBeRestored { @@ -257,7 +257,7 @@ func (bd *batchDispatcher) dispatchBatch(host string) (batch, bool) { if slices.Contains(bd.workloadProgress.hostFailedDC[host], rdw.DC) { continue } - // Sip dir from location without access + // Skip dir from location without access if !slices.Contains(bd.workloadProgress.hostDCAccess[host], rdw.DC) { continue } diff --git a/pkg/service/restore/batch_test.go b/pkg/service/restore/batch_test.go index 9f206716e..a6561c5aa 100644 --- a/pkg/service/restore/batch_test.go +++ b/pkg/service/restore/batch_test.go @@ -5,6 +5,8 @@ package restore import ( "testing" + "github.com/google/go-cmp/cmp" + "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec" ) @@ -104,17 +106,29 @@ func TestBatchDispatcher(t *testing.T) { workload := aggregateWorkload(rawWorkload) - locationHosts := map[backupspec.Location][]string{ - l1: {"h1", "h2"}, - l2: {"h3"}, - } hostToShard := map[string]uint{ "h1": 1, "h2": 2, "h3": 3, } - bd := newBatchDispatcher(workload, 1, hostToShard, locationHosts) + locationInfo := []LocationInfo{ + { + Location: l1, + DCHosts: map[string][]string{ + "dc1": {"h1", "h2"}, + "dc2": {"h1", "h2"}, + }, + }, + { + Location: l2, + DCHosts: map[string][]string{ + "dc3": {"h3"}, + }, + }, + } + + bd := newBatchDispatcher(workload, 1, hostToShard, locationInfo) scenario := []struct { host string @@ -166,3 +180,140 @@ func TestBatchDispatcher(t *testing.T) { t.Fatalf("Expected sstables to be batched: %s", err) } } + +func TestNewWorkloadProgress(t *testing.T) { + testCases := []struct { + name string + + workload Workload + locationInfo []LocationInfo + + expected map[string][]string + }{ + { + name: "one location with one DC", + workload: generateWorkload(t, []string{""}, map[string][]string{"": {"dc1"}}), + locationInfo: []LocationInfo{ + { + DCHosts: map[string][]string{ + "dc1": {"host1", "host2"}, + }, + }, + }, + expected: map[string][]string{ + "host1": {"dc1"}, + "host2": {"dc1"}, + }, + }, + { + name: "one location with two DC's", + workload: generateWorkload(t, []string{""}, map[string][]string{"": {"dc1", "dc2"}}), + locationInfo: []LocationInfo{ + { + DCHosts: map[string][]string{ + "dc1": {"host1"}, + "dc2": {"host2"}, + }, + }, + }, + expected: map[string][]string{ + "host1": {"dc1"}, + "host2": {"dc2"}, + }, + }, + { + name: "one location with two DC's, more nodes", + workload: generateWorkload(t, []string{""}, map[string][]string{"": {"dc1", "dc2"}}), + locationInfo: []LocationInfo{ + { + DCHosts: map[string][]string{ + "dc1": {"host1", "host2"}, + "dc2": {"host3", "host4"}, + }, + }, + }, + expected: map[string][]string{ + "host1": {"dc1"}, + "host2": {"dc1"}, + "host3": {"dc2"}, + "host4": {"dc2"}, + }, + }, + { + name: "two locations with one DC each", + workload: generateWorkload(t, + []string{"location1", "location2"}, + map[string][]string{"location1": {"dc1"}, "location2": {"dc2"}}, + ), + locationInfo: []LocationInfo{ + { + DCHosts: map[string][]string{ + "dc1": {"host1"}, + }, + }, + { + DCHosts: map[string][]string{ + "dc2": {"host2"}, + }, + }, + }, + expected: map[string][]string{ + "host1": {"dc1"}, + "host2": {"dc2"}, + }, + }, + { + name: "two locations with one DC each, but hosts maps to all dcs", + workload: generateWorkload(t, + []string{"location1", "location2"}, + map[string][]string{"location1": {"dc1"}, "location2": {"dc2"}}, + ), + locationInfo: []LocationInfo{ + { + 
DCHosts: map[string][]string{ + "dc1": {"host1", "host2"}, + }, + }, + { + DCHosts: map[string][]string{ + "dc2": {"host1", "host2"}, + }, + }, + }, + expected: map[string][]string{ + "host1": {"dc1", "dc2"}, + "host2": {"dc1", "dc2"}, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + wp := newWorkloadProgress(tc.workload, tc.locationInfo) + if diff := cmp.Diff(wp.hostDCAccess, tc.expected); diff != "" { + t.Fatalf("Actual != Expected: %s", diff) + } + }) + } +} + +func generateWorkload(t *testing.T, locationPaths []string, dcsInLocation map[string][]string) Workload { + t.Helper() + + var remoteDirs []RemoteDirWorkload + for _, path := range locationPaths { + dcs, ok := dcsInLocation[path] + if !ok { + t.Fatalf("each location should have corresponding entry in dcsInLocation map") + } + for _, dc := range dcs { + remoteDirs = append(remoteDirs, RemoteDirWorkload{ + ManifestInfo: &backupspec.ManifestInfo{ + DC: dc, + Location: backupspec.Location{Path: path}, + }, + }) + } + } + return Workload{RemoteDir: remoteDirs} +} diff --git a/pkg/service/restore/index.go b/pkg/service/restore/index.go index 6f82d84c7..194dc02ef 100644 --- a/pkg/service/restore/index.go +++ b/pkg/service/restore/index.go @@ -45,12 +45,12 @@ type SSTable struct { } // IndexWorkload returns sstables to be restored aggregated by location, table and remote sstable dir. -func (w *tablesWorker) IndexWorkload(ctx context.Context, locations []Location) (Workload, error) { +func (w *tablesWorker) IndexWorkload(ctx context.Context, locations []LocationInfo) (Workload, error) { var rawWorkload []RemoteDirWorkload for _, l := range locations { lw, err := w.indexLocationWorkload(ctx, l) if err != nil { - return Workload{}, errors.Wrapf(err, "index workload in %s", l) + return Workload{}, errors.Wrapf(err, "index workload in %s", l.Location) } rawWorkload = append(rawWorkload, lw...) 
} @@ -59,7 +59,7 @@ func (w *tablesWorker) IndexWorkload(ctx context.Context, locations []Location) return workload, nil } -func (w *tablesWorker) indexLocationWorkload(ctx context.Context, location Location) ([]RemoteDirWorkload, error) { +func (w *tablesWorker) indexLocationWorkload(ctx context.Context, location LocationInfo) ([]RemoteDirWorkload, error) { rawWorkload, err := w.createRemoteDirWorkloads(ctx, location) if err != nil { return nil, errors.Wrap(err, "create remote dir workloads") @@ -73,7 +73,7 @@ func (w *tablesWorker) indexLocationWorkload(ctx context.Context, location Locat return rawWorkload, nil } -func (w *tablesWorker) createRemoteDirWorkloads(ctx context.Context, location Location) ([]RemoteDirWorkload, error) { +func (w *tablesWorker) createRemoteDirWorkloads(ctx context.Context, location LocationInfo) ([]RemoteDirWorkload, error) { var rawWorkload []RemoteDirWorkload err := w.forEachManifest(ctx, location, func(m ManifestInfoWithContent) error { return m.ForEachIndexIterWithError(nil, func(fm FilesMeta) error { @@ -86,7 +86,7 @@ func (w *tablesWorker) createRemoteDirWorkloads(ctx context.Context, location Lo return errors.Wrapf(err, "convert files meta to sstables") } sstDir := m.LocationSSTableVersionDir(fm.Keyspace, fm.Table, fm.Version) - remoteSSTables, err := w.adjustSSTablesWithRemote(ctx, w.randomHostFromLocation(location), sstDir, sstables) + remoteSSTables, err := w.adjustSSTablesWithRemote(ctx, location.AnyHost(), sstDir, sstables) if err != nil { return errors.Wrap(err, "fetch sstables sizes") } diff --git a/pkg/service/restore/model.go b/pkg/service/restore/model.go index 0e5ceecc9..f5b227a9e 100644 --- a/pkg/service/restore/model.go +++ b/pkg/service/restore/model.go @@ -3,13 +3,14 @@ package restore import ( + "encoding/json" "reflect" "slices" - "sort" "time" "github.com/gocql/gocql" "github.com/pkg/errors" + "github.com/scylladb/go-set/strset" "github.com/scylladb/gocqlx/v2" "github.com/scylladb/scylla-manager/v3/pkg/scyllaclient" "github.com/scylladb/scylla-manager/v3/pkg/service/repair" @@ -33,9 +34,43 @@ type Target struct { RestoreSchema bool `json:"restore_schema,omitempty"` RestoreTables bool `json:"restore_tables,omitempty"` Continue bool `json:"continue"` + DCMappings DCMappings `json:"dc_mapping"` - // Cache for host with access to remote location - locationHosts map[Location][]string `json:"-"` + locationInfo []LocationInfo +} + +// LocationInfo contains information about Location, such as what DCs it has, +// what hosts can access what dcs, and the list of manifests from this location. +type LocationInfo struct { + // DC contains all data centers that can be found in this location + DC []string + // Contains hosts that should handle DCs from this location + // after DCMappings are applied + DCHosts map[string][]string + Location Location + + // Manifest in this Location. Shouldn't contain manifests from DCs + // that are not in the DCMappings + Manifest []*ManifestInfo +} + +// AnyHost returns random host with access to this Location. +func (l LocationInfo) AnyHost() string { + for _, hosts := range l.DCHosts { + if len(hosts) != 0 { + return hosts[0] + } + } + return "" +} + +// AllHosts returns all hosts with the access to this Location. +func (l LocationInfo) AllHosts() []string { + hosts := strset.New() + for _, h := range l.DCHosts { + hosts.Add(h...) + } + return hosts.List() } const ( @@ -54,9 +89,18 @@ func defaultTarget() Target { } } +// parseTarget parse Target from properties and applies defaults. 
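// Illustrative input (values are hypothetical; the shape follows the Target JSON tags):
//
//	{
//	  "location": ["s3:backup-bucket"],
//	  "snapshot_tag": "sm_20250101120000UTC",
//	  "restore_tables": true,
//	  "dc_mapping": [{"source": "dc1", "target": "dc3"}]
//	}
//
// The properties are unmarshalled on top of defaultTarget() and then validated.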
+func parseTarget(properties json.RawMessage) (Target, error) { + t := defaultTarget() + if err := json.Unmarshal(properties, &t); err != nil { + return Target{}, err + } + return t, t.validateProperties() +} + // validateProperties makes a simple validation of params set by user. // It does not perform validations that require access to the service. -func (t Target) validateProperties(dcMap map[string][]string) error { +func (t Target) validateProperties() error { if len(t.Location) == 0 { return errors.New("missing location") } @@ -73,24 +117,24 @@ func (t Target) validateProperties(dcMap map[string][]string) error { return errors.New("transfers param has to be equal to -1 (set transfers to the value from scylla-manager-agent.yaml config) " + "or 0 (set transfers for fastest download) or greater than zero") } - if err := CheckDCs(t.RateLimit, dcMap); err != nil { - return errors.Wrap(err, "invalid rate limit") - } if t.RestoreSchema == t.RestoreTables { return errors.New("choose EXACTLY ONE restore type ('--restore-schema' or '--restore-tables' flag)") } if t.RestoreSchema && t.Keyspace != nil { return errors.New("restore schema always restores 'system_schema.*' tables only, no need to specify '--keyspace' flag") } + // Check for duplicates in Location + allLocations := strset.New() + for _, l := range t.Location { + p := l.RemotePath("") + if allLocations.Has(p) { + return errors.Errorf("location %s is specified multiple times", l) + } + allLocations.Add(p) + } return nil } -func (t Target) sortLocations() { - sort.SliceStable(t.Location, func(i, j int) bool { - return t.Location[i].String() < t.Location[j].String() - }) -} - // Run tracks restore progress, shares ID with scheduler.Run that initiated it. type Run struct { ClusterID uuid.UUID @@ -287,3 +331,24 @@ type HostInfo struct { Transfers int RateLimit int } + +// DCMappings represents how DCs from the backup cluster are mapped to DCs in the restore cluster. +// For details about how DCs can be mapped refer to --dc-mapping documentation. +type DCMappings []DCMapping + +// DCMapping represent single instance of datacenter mappings. See DCMappings for details. +type DCMapping struct { + Source string `json:"source"` + Target string `json:"target"` +} + +// calculateMappings creates two maps from DCMappings where each contains mapping between +// source and target data centers. 
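// For example, the mappings dc1=>dc3;dc2=>dc4 produce:
//
//	sourceDC2TargetDCMap = {"dc1": "dc3", "dc2": "dc4"}
//	targetDC2SourceDCMap = {"dc3": "dc1", "dc4": "dc2"}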
+func (mappings DCMappings) calculateMappings() (sourceDC2TargetDCMap, targetDC2SourceDCMap map[string]string) { + sourceDC2TargetDCMap, targetDC2SourceDCMap = map[string]string{}, map[string]string{} + for _, mapping := range mappings { + sourceDC2TargetDCMap[mapping.Source] = mapping.Target + targetDC2SourceDCMap[mapping.Target] = mapping.Source + } + return sourceDC2TargetDCMap, targetDC2SourceDCMap +} diff --git a/pkg/service/restore/model_test.go b/pkg/service/restore/model_test.go new file mode 100644 index 000000000..854beb93e --- /dev/null +++ b/pkg/service/restore/model_test.go @@ -0,0 +1,68 @@ +// Copyright (C) 2025 ScyllaDB +package restore + +import ( + "maps" + "testing" +) + +func TestCalculateMappings(t *testing.T) { + testCases := []struct { + name string + + mappings DCMappings + expectedSourceMap map[string]string + expectedTargetMap map[string]string + }{ + { + name: "dc1=>dc2", + mappings: []DCMapping{ + { + Source: "dc1", + Target: "dc2", + }, + }, + expectedSourceMap: map[string]string{ + "dc1": "dc2", + }, + expectedTargetMap: map[string]string{ + "dc2": "dc1", + }, + }, + { + name: "dc1=>dc2;dc3=>dc4", + mappings: []DCMapping{ + { + Source: "dc1", + Target: "dc2", + }, + { + Source: "dc3", + Target: "dc4", + }, + }, + expectedSourceMap: map[string]string{ + "dc1": "dc2", + "dc3": "dc4", + }, + expectedTargetMap: map[string]string{ + "dc2": "dc1", + "dc4": "dc3", + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + sourceDC2TargetDCMap, targetDC2SourceDCMap := tc.mappings.calculateMappings() + + if !maps.Equal(sourceDC2TargetDCMap, tc.expectedSourceMap) { + t.Fatalf("Expected %v, but got %v", tc.expectedSourceMap, sourceDC2TargetDCMap) + } + + if !maps.Equal(targetDC2SourceDCMap, tc.expectedTargetMap) { + t.Fatalf("Expected %v, but got %v", tc.expectedTargetMap, targetDC2SourceDCMap) + } + }) + } +} diff --git a/pkg/service/restore/restore_integration_test.go b/pkg/service/restore/restore_integration_test.go index 6a54dd43d..a3aa95bf5 100644 --- a/pkg/service/restore/restore_integration_test.go +++ b/pkg/service/restore/restore_integration_test.go @@ -1071,3 +1071,89 @@ func TestRestoreTablesProgressIntegration(t *testing.T) { } } } + +func TestRestoreOnlyOneDCFromLocationIntegration(t *testing.T) { + h := newTestHelper(t, ManagedClusterHosts(), ManagedSecondClusterHosts()) + + Print("Keyspace setup") + // Source cluster + ksStmt := "CREATE KEYSPACE %q WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1, 'dc2': 1}" + ksTwoDC := randomizedName("two_dc_") + ExecStmt(t, h.srcCluster.rootSession, fmt.Sprintf(ksStmt, ksTwoDC)) + + // Keyspace thats only available in dc2 + ksStmtOneDC := "CREATE KEYSPACE %q WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1':0, 'dc2': 1}" + ksOneDC := randomizedName("one_dc_") + ExecStmt(t, h.srcCluster.rootSession, fmt.Sprintf(ksStmtOneDC, ksOneDC)) + + // Target cluster + ksStmtDst := "CREATE KEYSPACE %q WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1}" + ExecStmt(t, h.dstCluster.rootSession, fmt.Sprintf(ksStmtDst, ksTwoDC)) + ExecStmt(t, h.dstCluster.rootSession, fmt.Sprintf(ksStmtDst, ksOneDC)) + + Print("Table setup") + tabStmt := "CREATE TABLE %q.%q (id int PRIMARY KEY, data int)" + tab := randomizedName("tab_") + for _, ks := range []string{ksTwoDC, ksOneDC} { + ExecStmt(t, h.srcCluster.rootSession, fmt.Sprintf(tabStmt, ks, tab)) + ExecStmt(t, h.dstCluster.rootSession, fmt.Sprintf(tabStmt, ks, tab)) + } + + Print("Fill setup") + for _, ks := range 
[]string{ksTwoDC, ksOneDC} { + fillTable(t, h.srcCluster.rootSession, 100, ks, tab) + } + + Print("Save filled table into map") + srcMTwoDC := selectTableAsMap[int, int](t, h.srcCluster.rootSession, ksTwoDC, tab, "id", "data") + + Print("Run backup") + loc := []Location{ + testLocation("one-location-1", ""), + } + S3InitBucket(t, loc[0].Path) + ksFilter := []string{ksTwoDC, ksOneDC} + tag := h.runBackup(t, map[string]any{ + "location": loc, + "keyspace": ksFilter, + "batch_size": 100, + }) + + Print("Run restore") + grantRestoreTablesPermissions(t, h.dstCluster.rootSession, ksFilter, h.dstUser) + res := make(chan struct{}) + go func() { + h.runRestore(t, map[string]any{ + "location": loc, + "keyspace": ksFilter, + // Test if batching does not hang with + // limited parallel and location access. + "parallel": 1, + "snapshot_tag": tag, + "restore_tables": true, + // DC Mapping is required + "dc_mapping": []map[string]string{ + {"source": "dc1", "target": "dc1"}, + }, + }) + close(res) + }() + + select { + case <-res: + case <-time.NewTimer(2 * time.Minute).C: + t.Fatal("Restore hanged") + } + + Print("Save restored table into map") + dstMTwoDC := selectTableAsMap[int, int](t, h.dstCluster.rootSession, ksTwoDC, tab, "id", "data") + dstMOneDC := selectTableAsMap[int, int](t, h.dstCluster.rootSession, ksOneDC, tab, "id", "data") + + Print("Validate success") + if !maps.Equal(srcMTwoDC, dstMTwoDC) { + t.Fatalf("tables have different contents\nsrc:\n%v\ndst:\n%v", srcMTwoDC, dstMTwoDC) + } + if len(dstMOneDC) != 0 { + t.Fatalf("dc2 shouldn't be restored") + } +} diff --git a/pkg/service/restore/schema_worker.go b/pkg/service/restore/schema_worker.go index dcc5566d8..c0b156c99 100644 --- a/pkg/service/restore/schema_worker.go +++ b/pkg/service/restore/schema_worker.go @@ -28,6 +28,7 @@ type schemaWorker struct { worker generationCnt atomic.Int64 + locationInfo LocationInfo // Currently restored Location miwc ManifestInfoWithContent // Currently restored manifest // Maps original SSTable name to its existing older version (with respect to currently restored snapshot tag) // that should be used during the restore procedure. It should be initialized per each restored table. 
@@ -86,7 +87,7 @@ func (w *schemaWorker) stageRestoreData(ctx context.Context) error { } } // Download files - for _, l := range w.target.Location { + for _, l := range w.target.locationInfo { if err := w.locationDownloadHandler(ctx, l); err != nil { return err } @@ -193,9 +194,11 @@ func (w *schemaWorker) restoreFromSchemaFile(ctx context.Context) error { return nil } -func (w *schemaWorker) locationDownloadHandler(ctx context.Context, location Location) error { - w.logger.Info(ctx, "Downloading schema from location", "location", location) - defer w.logger.Info(ctx, "Downloading schema from location finished", "location", location) +func (w *schemaWorker) locationDownloadHandler(ctx context.Context, location LocationInfo) error { + w.logger.Info(ctx, "Downloading schema from location", "location", location.Location) + defer w.logger.Info(ctx, "Downloading schema from location finished", "location", location.Location) + + w.locationInfo = location tableDownloadHandler := func(fm FilesMeta) error { if !unitsContainTable(w.run.Units, fm.Keyspace, fm.Table) { @@ -240,7 +243,10 @@ func (w *schemaWorker) workFunc(ctx context.Context, fm FilesMeta) error { "files", fm.Files, ) - hosts := w.target.locationHosts[w.miwc.Location] + hosts, ok := w.locationInfo.DCHosts[w.miwc.DC] + if !ok { + return errors.Errorf("hosts for the DC %s are not found", w.miwc.DC) + } w.versionedFiles, err = ListVersionedFiles(ctx, w.client, w.run.SnapshotTag, hosts[0], srcDir) if err != nil { return errors.Wrap(err, "initialize versioned SSTables") @@ -333,7 +339,7 @@ func (w *schemaWorker) getFileNamesMapping(sstables []string, sstableUUIDFormat return sstable.RenameToIDs(sstables, &w.generationCnt) } -func getDescribedSchema(ctx context.Context, client *scyllaclient.Client, snapshotTag string, locHost map[Location][]string) (schema *query.DescribedSchema, err error) { +func getDescribedSchema(ctx context.Context, client *scyllaclient.Client, snapshotTag string, locationInfo []LocationInfo) (schema *query.DescribedSchema, err error) { baseDir := path.Join("backup", string(SchemaDirKind)) // It's enough to get a single schema file, but it's important to validate // that each location contains exactly one or none of them. 
@@ -342,11 +348,11 @@ func getDescribedSchema(ctx context.Context, client *scyllaclient.Client, snapsh schemaPath *string foundCnt int ) - for l, hosts := range locHost { - host = hosts[0] - schemaPath, err = getRemoteSchemaFilePath(ctx, client, snapshotTag, host, l.RemotePath(baseDir)) + for _, l := range locationInfo { + host = l.AnyHost() + schemaPath, err = getRemoteSchemaFilePath(ctx, client, snapshotTag, host, l.Location.RemotePath(baseDir)) if err != nil { - return nil, errors.Wrapf(err, "get schema file from %s", l.RemotePath(baseDir)) + return nil, errors.Wrapf(err, "get schema file from %s", l.Location.RemotePath(baseDir)) } if schemaPath != nil { foundCnt++ @@ -355,7 +361,7 @@ func getDescribedSchema(ctx context.Context, client *scyllaclient.Client, snapsh if foundCnt == 0 { return nil, nil // nolint: nilnil - } else if foundCnt < len(locHost) { + } else if foundCnt < len(locationInfo) { return nil, errors.New("only a subset of provided locations has schema files") } diff --git a/pkg/service/restore/service.go b/pkg/service/restore/service.go index 83112968f..b5393bfee 100644 --- a/pkg/service/restore/service.go +++ b/pkg/service/restore/service.go @@ -76,24 +76,10 @@ func (s *Service) Restore(ctx context.Context, clusterID, taskID, runID uuid.UUI defer w.clusterSession.Close() w.setRunInfo(taskID, runID) - if err := w.initTarget(ctx, properties); err != nil { - return errors.Wrap(err, "init target") - } - if err := w.decorateWithPrevRun(ctx); err != nil { + if err := w.init(ctx, properties); err != nil { return err } - if w.run.Units == nil { - // Cache must be initialised only once (even with continue=false), as it contains information already lost - // in the cluster (e.g. tombstone_gc mode, views definition, etc). - if err := w.initUnits(ctx); err != nil { - return errors.Wrap(err, "initialize units") - } - if err := w.initViews(ctx); err != nil { - return errors.Wrap(err, "initialize views") - } - } - if w.run.PrevID == uuid.Nil { // Reset metrics on fresh start w.metrics.ResetClusterMetrics(w.run.ClusterID) diff --git a/pkg/service/restore/service_restore_integration_test.go b/pkg/service/restore/service_restore_integration_test.go index 8ad8fb0db..e32d4e2a6 100644 --- a/pkg/service/restore/service_restore_integration_test.go +++ b/pkg/service/restore/service_restore_integration_test.go @@ -771,7 +771,7 @@ func restoreWithAgentRestart(t *testing.T, target Target, keyspace string, loadC // Recreate schema on destination cluster if target.RestoreTables { - WriteDataSecondClusterSchema(t, dstSession, keyspace, 0, 0) + WriteData(t, dstSession, keyspace, 0) } srcH.prepareRestoreBackup(srcSession, keyspace, loadCnt, loadSize) @@ -876,7 +876,7 @@ func restoreWithResume(t *testing.T, target Target, keyspace string, loadCnt, lo // Recreate schema on destination cluster if target.RestoreTables { - WriteDataSecondClusterSchema(t, dstSession, keyspace, 0, 0) + WriteData(t, dstSession, keyspace, 0) CreateMaterializedView(t, dstSession, keyspace, BigTableName, mv) } @@ -1085,7 +1085,7 @@ func restoreWithVersions(t *testing.T, target Target, keyspace string, loadCnt, if target.RestoreTables { Print("Recreate schema on destination cluster") - WriteDataSecondClusterSchema(t, dstSession, keyspace, 0, 0) + WriteData(t, dstSession, keyspace, 0) } else { // This test requires SSTables in Scylla data dir to remain unchanged. 
// This is achieved by NullCompactionStrategy in user table, but since system tables @@ -1331,7 +1331,7 @@ func restoreViewCQLSchema(t *testing.T, target Target, keyspace string, loadCnt, if target.RestoreTables { Print("When: Recreate dst schema from CQL") - WriteDataSecondClusterSchema(t, dstSession, keyspace, 0, 0, BigTableName) + WriteData(t, dstSession, keyspace, 0, BigTableName) createBigTableViews(t, dstSession, keyspace, BigTableName, mvName, siName) } diff --git a/pkg/service/restore/tables_worker.go b/pkg/service/restore/tables_worker.go index ad4576314..1d536c24a 100644 --- a/pkg/service/restore/tables_worker.go +++ b/pkg/service/restore/tables_worker.go @@ -9,7 +9,6 @@ import ( "sync" "github.com/pkg/errors" - "github.com/scylladb/go-set/strset" . "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec" "github.com/scylladb/scylla-manager/v3/pkg/service/repair" "github.com/scylladb/scylla-manager/v3/pkg/util/parallel" @@ -82,11 +81,10 @@ func newTablesWorker(ctx context.Context, w worker, repairSvc *repair.Service, t } } - hostsS := strset.New() - for _, h := range w.target.locationHosts { - hostsS.Add(h...) + var hosts []string + for _, l := range w.target.locationInfo { + hosts = append(hosts, l.AllHosts()...) } - hosts := hostsS.List() hostToShard, err := w.client.HostsShardCount(ctx, hosts) if err != nil { @@ -181,7 +179,7 @@ func (w *tablesWorker) stageRestoreData(ctx context.Context) error { w.logger.Info(ctx, "Started restoring tables") defer w.logger.Info(ctx, "Restoring tables finished") - workload, err := w.IndexWorkload(ctx, w.target.Location) + workload, err := w.IndexWorkload(ctx, w.target.locationInfo) if err != nil { return err } @@ -213,7 +211,7 @@ func (w *tablesWorker) stageRestoreData(ctx context.Context) error { } } - bd := newBatchDispatcher(workload, w.target.BatchSize, w.hostShardCnt, w.target.locationHosts) + bd := newBatchDispatcher(workload, w.target.BatchSize, w.hostShardCnt, w.target.locationInfo) f := func(n int) error { host := w.hosts[n] diff --git a/pkg/service/restore/worker.go b/pkg/service/restore/worker.go index 7a7c23abc..c451df89d 100644 --- a/pkg/service/restore/worker.go +++ b/pkg/service/restore/worker.go @@ -6,7 +6,7 @@ import ( "context" "encoding/json" "fmt" - "math/rand" + "maps" "path" "regexp" "slices" @@ -44,37 +44,146 @@ type worker struct { clusterSession gocqlx.Session } -func (w *worker) randomHostFromLocation(loc Location) string { - hosts, ok := w.target.locationHosts[loc] - if !ok { - panic("no hosts for location: " + loc.String()) - } - return hosts[rand.Intn(len(hosts))] -} - func (w *worker) init(ctx context.Context, properties json.RawMessage) error { - if err := w.initTarget(ctx, properties); err != nil { + target, err := parseTarget(properties) + if err != nil { + return err + } + locationInfo, err := w.getLocationInfo(ctx, target) + if err != nil { + return err + } + if err := w.initTarget(ctx, target, locationInfo); err != nil { return errors.Wrap(err, "init target") } - if err := w.initUnits(ctx); err != nil { + if err := w.decorateWithPrevRun(ctx); err != nil { + return errors.Wrap(err, "get prev run") + } + if w.run.Units != nil { + return nil + } + // Cache must be initialised only once (even with continue=false), as it contains information already lost + // in the cluster (e.g. tombstone_gc mode, views definition, etc). 
+ if err := w.initUnits(ctx, w.target.locationInfo); err != nil { return errors.Wrap(err, "init units") } return errors.Wrap(w.initViews(ctx), "init views") } -func (w *worker) initTarget(ctx context.Context, properties json.RawMessage) error { - t := defaultTarget() - if err := json.Unmarshal(properties, &t); err != nil { - return err +func (w *worker) getLocationInfo(ctx context.Context, target Target) ([]LocationInfo, error) { + var result []LocationInfo + + nodeStatus, err := w.client.Status(ctx) + if err != nil { + return nil, errors.Wrap(err, "get status") + } + + sourceDC2TargetDCMap, targetDC2SourceDCMap := target.DCMappings.calculateMappings() + + for _, l := range target.Location { + nodes, err := w.getNodesWithAccess(ctx, nodeStatus, l, len(target.DCMappings) > 0) + if err != nil { + return nil, errors.Wrap(err, "getNodesWithAccess") + } + + manifests, err := w.getManifestInfo(ctx, nodes[0].Addr, l, target.SnapshotTag) + if err != nil { + return nil, errors.Wrap(err, "getManifestInfo") + } + if len(manifests) == 0 { + return nil, errors.Errorf("no snapshot with tag %s", target.SnapshotTag) + } + + manifests = filterManifests(manifests, sourceDC2TargetDCMap) + locationDCs := collectDCsFromManifests(manifests) + dcHosts := hostsByDC(nodes, targetDC2SourceDCMap, locationDCs) + + result = append(result, LocationInfo{ + DC: locationDCs, + DCHosts: dcHosts, + Manifest: manifests, + Location: l, + }) + } + + return result, nil +} + +func (w *worker) getNodesWithAccess(ctx context.Context, nodeStatus scyllaclient.NodeStatusInfoSlice, loc Location, useLocationDC bool) (scyllaclient.NodeStatusInfoSlice, error) { + if useLocationDC && loc.Datacenter() != "" { + nodeStatus = nodeStatus.Datacenter([]string{loc.Datacenter()}) + } + + nodes, err := w.client.GetNodesWithLocationAccess(ctx, nodeStatus, loc.RemotePath("")) + if err != nil { + if strings.Contains(err.Error(), "NoSuchBucket") { + return nil, errors.Errorf("specified bucket does not exist: %s", loc) + } + return nil, errors.Wrapf(err, "location %s is not accessible", loc) + } + if len(nodes) == 0 { + return nil, fmt.Errorf("no nodes with location %s access", loc) + } + return nodes, nil +} + +// hostsByDC creates map of which hosts are responsible for which DC, also applies DCMappings if available. +func hostsByDC(nodes scyllaclient.NodeStatusInfoSlice, targetDC2SourceDCMap map[string]string, locationDCs []string) map[string][]string { + dc2HostsMap := map[string][]string{} + // when --dc-mapping is not set all nodes with access to the location can handle all DCs from it + if len(targetDC2SourceDCMap) == 0 { + var hosts []string + for _, node := range nodes { + hosts = append(hosts, node.Addr) + } + for _, dc := range locationDCs { + dc2HostsMap[dc] = hosts + } + return dc2HostsMap + } + // when --dc-mapping is set, nodes can handle DCs only accordingly to mappings + for _, n := range nodes { + sourceDC, ok := targetDC2SourceDCMap[n.Datacenter] + if !ok { + continue + } + if !slices.Contains(locationDCs, sourceDC) { + continue + } + dc2HostsMap[sourceDC] = append(dc2HostsMap[sourceDC], n.Addr) + } + return dc2HostsMap +} + +func collectDCsFromManifests(manifests []*ManifestInfo) []string { + dcs := strset.New() + for _, m := range manifests { + dcs.Add(m.DC) + } + return dcs.List() +} + +// keep only manifests that have dc mapping. if --dc-mapping is not set it will return all manifests. 
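// For example, with sourceDC2TargetDCMap = {"dc1": "dc3"}, only manifests whose
// DC is "dc1" are kept; manifests from all other source DCs are dropped.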
+func filterManifests(manifests []*ManifestInfo, sourceDC2TargetDCMap map[string]string) []*ManifestInfo { + if len(sourceDC2TargetDCMap) == 0 { + return manifests + } + var result []*ManifestInfo + for _, m := range manifests { + _, ok := sourceDC2TargetDCMap[m.DC] + if !ok { + continue + } + result = append(result, m) } + return result +} +func (w *worker) initTarget(ctx context.Context, t Target, locationInfo []LocationInfo) error { dcMap, err := w.client.Datacenters(ctx) if err != nil { return errors.Wrap(err, "get data centers") } - if err := t.validateProperties(dcMap); err != nil { - return err - } if t.Keyspace == nil { t.Keyspace = []string{"*"} @@ -91,64 +200,18 @@ func (w *worker) initTarget(ctx context.Context, properties json.RawMessage) err t.Keyspace = append(t.Keyspace, notRestored...) } - status, err := w.client.Status(ctx) - if err != nil { - return errors.Wrap(err, "get status") - } - // All nodes should be up during restore if err := w.client.VerifyNodesAvailability(ctx); err != nil { return errors.Wrap(err, "verify all nodes availability") } - allLocations := strset.New() - locationHosts := make(map[Location][]string) - for _, l := range t.Location { - p := l.RemotePath("") - if allLocations.Has(p) { - return errors.Errorf("location %s is specified multiple times", l) - } - allLocations.Add(p) - - var ( - remotePath = l.RemotePath("") - locationStatus = status - ) - - if l.DC == "" { - w.logger.Info(ctx, "No datacenter specified for location - using all nodes for this location", "location", l) - } else { - locationStatus = status.Datacenter([]string{l.DC}) - } - - nodes, err := w.client.GetNodesWithLocationAccess(ctx, locationStatus, remotePath) - if err != nil { - if strings.Contains(err.Error(), "NoSuchBucket") { - return errors.Errorf("specified bucket does not exist: %s", l) - } - return errors.Wrapf(err, "location %s is not accessible", l) - } - if len(nodes) == 0 { - return fmt.Errorf("no nodes with location %s access", l) - } - - var hosts []string - for _, n := range nodes { - hosts = append(hosts, n.Addr) - } - - w.logger.Info(ctx, "Found hosts with location access", "location", l, "hosts", hosts) - locationHosts[l] = hosts - } - t.locationHosts = locationHosts - t.sortLocations() - + t.locationInfo = locationInfo w.target = t w.run.SnapshotTag = t.SnapshotTag if t.RestoreSchema { w.logger.Info(ctx, "Look for schema file") - w.describedSchema, err = getDescribedSchema(ctx, w.client, t.SnapshotTag, locationHosts) + w.describedSchema, err = getDescribedSchema(ctx, w.client, t.SnapshotTag, t.locationInfo) if err != nil { return errors.Wrap(err, "look for schema file") } @@ -161,12 +224,56 @@ func (w *worker) initTarget(ctx context.Context, properties json.RawMessage) err } else { w.logger.Info(ctx, "Found schema file") } + return nil + } + + if len(t.DCMappings) > 0 { + sourceDC := strset.New() + for _, locInfo := range locationInfo { + sourceDC.Add(locInfo.DC...) + } + targetDC := slices.Collect(maps.Keys(dcMap)) + if err := w.validateDCMappings(t.DCMappings, sourceDC.List(), targetDC); err != nil { + w.logger.Debug(ctx, + "Validate dc mapping", + "source_dc", sourceDC, + "target_dc", targetDC, + "mappings", t.DCMappings, + ) + return err + } } w.logger.Info(ctx, "Initialized target", "target", t) return nil } +// validateDCMappings that every dc from mappings exists in source or target cluster respectevely. +func (w *worker) validateDCMappings(mappings DCMappings, sourceDC, targetDC []string) error { + sourceDCSet := strset.New(sourceDC...) 
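	// Each source DC and each target DC may appear in at most one mapping, so only
	// 1:1 mappings are accepted (neither squeezing several source DCs into one
	// target DC nor expanding one source DC into several target DCs).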
+ targetDCSet := strset.New(targetDC...) + sourceDCMappingSet, targetDCMappingSet := strset.New(), strset.New() + for _, m := range mappings { + if !sourceDCSet.Has(m.Source) { + return errors.Errorf("No such dc in source cluster: %s", m.Source) + } + if !targetDCSet.Has(m.Target) { + return errors.Errorf("No such dc in target cluster: %s", m.Target) + } + + if sourceDCMappingSet.Has(m.Source) { + return errors.Errorf("DC mapping contains duplicates in source DCs: %s", m.Source) + } + sourceDCMappingSet.Add(m.Source) + + if targetDCMappingSet.Has(m.Target) { + return errors.Errorf("DC mapping contains duplicates in target DCs: %s", m.Target) + } + targetDCMappingSet.Add(m.Target) + } + return nil +} + func skipRestorePatterns(ctx context.Context, client *scyllaclient.Client, session gocqlx.Session) ([]string, error) { keyspaces, err := client.KeyspacesByType(ctx) if err != nil { @@ -335,14 +442,14 @@ func IsRestoreAuthAndServiceLevelsFromSStablesSupported(ctx context.Context, cli } // initUnits should be called with already initialized target. -func (w *worker) initUnits(ctx context.Context) error { +func (w *worker) initUnits(ctx context.Context, locationInfo []LocationInfo) error { var ( units []Unit unitMap = make(map[string]Unit) ) var foundManifest bool - for _, l := range w.target.Location { + for _, l := range locationInfo { manifestHandler := func(miwc ManifestInfoWithContent) error { foundManifest = true diff --git a/pkg/service/restore/worker_manifest.go b/pkg/service/restore/worker_manifest.go index 1ec2fb092..48d76a9db 100644 --- a/pkg/service/restore/worker_manifest.go +++ b/pkg/service/restore/worker_manifest.go @@ -4,50 +4,25 @@ package restore import ( "context" - "fmt" "path" "sort" - "github.com/pkg/errors" "github.com/scylladb/scylla-manager/v3/pkg/scyllaclient" . "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec" - "github.com/scylladb/scylla-manager/v3/pkg/util/slice" "go.uber.org/multierr" ) -func (w *worker) forEachManifest(ctx context.Context, location Location, f func(ManifestInfoWithContent) error) error { - closest := w.client.Config().Hosts - hosts, ok := w.target.locationHosts[location] - if !ok { - return fmt.Errorf("no hosts for location %s", location) - } - - var host string - for _, h := range closest { - if slice.ContainsString(hosts, h) { - host = h - break - } - } - if host == "" { - host = hosts[0] - } - - manifests, err := w.getManifestInfo(ctx, host, location) - if err != nil { - return errors.Wrap(err, "list manifests") - } - +func (w *worker) forEachManifest(ctx context.Context, location LocationInfo, f func(ManifestInfoWithContent) error) error { // Load manifest content load := func(c *ManifestContentWithIndex, m *ManifestInfo) error { - r, err := w.client.RcloneOpen(ctx, host, m.Location.RemotePath(m.Path())) + r, err := w.client.RcloneOpen(ctx, location.AnyHost(), m.Location.RemotePath(m.Path())) if err != nil { return err } return multierr.Append(c.Read(r), r.Close()) } - for _, m := range manifests { + for _, m := range location.Manifest { c := new(ManifestContentWithIndex) if err := load(c, m); err != nil { return err @@ -65,7 +40,7 @@ func (w *worker) forEachManifest(ctx context.Context, location Location, f func( } // getManifestInfo returns manifests with receiver's snapshot tag for all nodes in the location. 
-func (w *worker) getManifestInfo(ctx context.Context, host string, location Location) ([]*ManifestInfo, error) { +func (w *worker) getManifestInfo(ctx context.Context, host string, location Location, snapshotTag string) ([]*ManifestInfo, error) { baseDir := path.Join("backup", string(MetaDirKind)) opts := scyllaclient.RcloneListDirOpts{ FilesOnly: true, @@ -79,7 +54,7 @@ func (w *worker) getManifestInfo(ctx context.Context, host string, location Loca return } m.Location = location - if m.SnapshotTag == w.run.SnapshotTag { + if m.SnapshotTag == snapshotTag { manifests = append(manifests, m) } }) diff --git a/pkg/service/restore/worker_test.go b/pkg/service/restore/worker_test.go new file mode 100644 index 000000000..0bb83bddb --- /dev/null +++ b/pkg/service/restore/worker_test.go @@ -0,0 +1,181 @@ +// Copyright (C) 2025 ScyllaDB +package restore + +import ( + "testing" + + gocmp "github.com/google/go-cmp/cmp" + "github.com/scylladb/scylla-manager/v3/pkg/scyllaclient" +) + +func TestValidateDCMappings(t *testing.T) { + testCases := []struct { + name string + sourceDC []string + targetDC []string + dcMappings DCMappings + + expectedErr bool + }{ + { + name: "sourceDC != targetDC, but with full mapping", + sourceDC: []string{"dc1"}, + targetDC: []string{"dc2"}, + dcMappings: []DCMapping{ + {Source: "dc1", Target: "dc2"}, + }, + expectedErr: false, + }, + { + name: "source != target, but will full mapping, two dcs per cluster", + sourceDC: []string{"dc1", "dc2"}, + targetDC: []string{"dc3", "dc4"}, + dcMappings: []DCMapping{ + {Source: "dc1", Target: "dc3"}, + {Source: "dc2", Target: "dc4"}, + }, + expectedErr: false, + }, + { + name: "DC mappings has unknown source dc", + sourceDC: []string{"dc1", "dc2"}, + targetDC: []string{"dc3", "dc4"}, + dcMappings: []DCMapping{ + {Source: "dc1", Target: "dc3"}, + {Source: "dc0", Target: "dc4"}, + }, + expectedErr: true, + }, + { + name: "DC mappings has unknown target dc", + sourceDC: []string{"dc1", "dc2"}, + targetDC: []string{"dc3", "dc4"}, + dcMappings: []DCMapping{ + {Source: "dc1", Target: "dc3"}, + {Source: "dc2", Target: "dc5"}, + }, + expectedErr: true, + }, + { + name: "Squeezing DCs is not supported", + sourceDC: []string{"dc1", "dc2"}, + targetDC: []string{"dc1"}, + dcMappings: []DCMapping{ + {Source: "dc1", Target: "dc1"}, + {Source: "dc2", Target: "dc1"}, + }, + expectedErr: true, + }, + { + name: "Expanding DCs is not supported", + sourceDC: []string{"dc1"}, + targetDC: []string{"dc1", "dc2"}, + dcMappings: []DCMapping{ + {Source: "dc1", Target: "dc1"}, + {Source: "dc1", Target: "dc2"}, + }, + expectedErr: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + w := &worker{} + err := w.validateDCMappings(tc.dcMappings, tc.sourceDC, tc.targetDC) + if tc.expectedErr && err == nil { + t.Fatalf("Expected err, but got nil") + } + if !tc.expectedErr && err != nil { + t.Fatalf("Unexpected err: %v", err) + } + }) + } +} + +func TestHostsByDC(t *testing.T) { + testCases := []struct { + name string + nodes scyllaclient.NodeStatusInfoSlice + targetDC2SourceDCMap map[string]string + locationDCs []string + + expected map[string][]string + }{ + { + name: "When --dc-mapping is provided will all DCs", + nodes: scyllaclient.NodeStatusInfoSlice{ + {Addr: "n1", Datacenter: "target_dc1"}, + {Addr: "n2", Datacenter: "target_dc1"}, + {Addr: "n3", Datacenter: "target_dc2"}, + {Addr: "n4", Datacenter: "target_dc2"}, + }, + targetDC2SourceDCMap: map[string]string{ + "target_dc1": "source_dc1", + "target_dc2": "source_dc2", + }, 
+ locationDCs: []string{"source_dc1", "source_dc2"}, + + expected: map[string][]string{ + "source_dc1": {"n1", "n2"}, + "source_dc2": {"n3", "n4"}, + }, + }, + { + name: "When --dc-mapping is provided will some DCs", + nodes: scyllaclient.NodeStatusInfoSlice{ + {Addr: "n1", Datacenter: "target_dc1"}, + {Addr: "n2", Datacenter: "target_dc1"}, + {Addr: "n3", Datacenter: "target_dc2"}, + {Addr: "n4", Datacenter: "target_dc2"}, + }, + targetDC2SourceDCMap: map[string]string{ + "target_dc1": "source_dc1", + }, + locationDCs: []string{"source_dc1", "source_dc2"}, + + expected: map[string][]string{ + "source_dc1": {"n1", "n2"}, + }, + }, + { + name: "When --dc-mapping is empty and location contains one DC", + nodes: scyllaclient.NodeStatusInfoSlice{ + {Addr: "n1", Datacenter: "target_dc1"}, + {Addr: "n2", Datacenter: "target_dc1"}, + {Addr: "n3", Datacenter: "target_dc2"}, + {Addr: "n4", Datacenter: "target_dc2"}, + }, + targetDC2SourceDCMap: map[string]string{}, + locationDCs: []string{"source_dc1"}, + + expected: map[string][]string{ + "source_dc1": {"n1", "n2", "n3", "n4"}, + }, + }, + { + name: "When --dc-mapping is empty and location contains multiple DCs", + nodes: scyllaclient.NodeStatusInfoSlice{ + {Addr: "n1", Datacenter: "target_dc1"}, + {Addr: "n2", Datacenter: "target_dc1"}, + {Addr: "n3", Datacenter: "target_dc2"}, + {Addr: "n4", Datacenter: "target_dc2"}, + }, + targetDC2SourceDCMap: map[string]string{}, + locationDCs: []string{"source_dc1", "source_dc2"}, + + expected: map[string][]string{ + "source_dc1": {"n1", "n2", "n3", "n4"}, + "source_dc2": {"n1", "n2", "n3", "n4"}, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + actual := hostsByDC(tc.nodes, tc.targetDC2SourceDCMap, tc.locationDCs) + if diff := gocmp.Diff(actual, tc.expected); diff != "" { + t.Fatalf("Unexpected result: %v", diff) + } + }) + } +} diff --git a/testing/nodes_exec b/testing/nodes_exec index 86afaaa22..0945e1692 100755 --- a/testing/nodes_exec +++ b/testing/nodes_exec @@ -21,4 +21,4 @@ for name in $(docker ps -f name=dc1_node -f name=dc2_node --format {{.Names}}); echo "> ${name}" docker exec ${name} bash -c "$*" fi -done \ No newline at end of file +done diff --git a/testing/scylla/config/scylla.yaml b/testing/scylla/config/scylla.yaml index c9666d39f..d07a06d30 100644 --- a/testing/scylla/config/scylla.yaml +++ b/testing/scylla/config/scylla.yaml @@ -645,4 +645,4 @@ api_doc_dir: /usr/lib/scylla/api/api-doc/ alternator_port: 8000 alternator_write_isolation: only_rmw_uses_lwt alternator_enforce_authorization: true -enable_ipv6_dns_lookup: true \ No newline at end of file +enable_ipv6_dns_lookup: true
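To see how the new service-side pieces fit together, here is a minimal, test-style sketch for a restore started with --dc-mapping "dc1=>dc3;dc2=>dc4". It is illustrative only: it assumes package restore and reuses the imports already present in worker_test.go (testing, gocmp, scyllaclient); names and values are made up.

	func TestDCMappingFlowSketch(t *testing.T) {
		mappings := DCMappings{
			{Source: "dc1", Target: "dc3"},
			{Source: "dc2", Target: "dc4"},
		}

		// Both lookup directions are derived once from the flag value.
		source2target, target2source := mappings.calculateMappings()
		_ = source2target // {"dc1": "dc3", "dc2": "dc4"}, used to filter manifests

		// The mappings are validated against the DCs found in the backup manifests
		// (source) and the DCs of the restored cluster (target).
		w := &worker{}
		if err := w.validateDCMappings(mappings, []string{"dc1", "dc2"}, []string{"dc3", "dc4"}); err != nil {
			t.Fatalf("unexpected validation error: %v", err)
		}

		// Nodes of the restored cluster are grouped by the source DC whose data
		// they will download, honouring the mapping.
		nodes := scyllaclient.NodeStatusInfoSlice{
			{Addr: "n1", Datacenter: "dc3"},
			{Addr: "n2", Datacenter: "dc4"},
		}
		got := hostsByDC(nodes, target2source, []string{"dc1", "dc2"})
		want := map[string][]string{
			"dc1": {"n1"},
			"dc2": {"n2"},
		}
		if diff := gocmp.Diff(got, want); diff != "" {
			t.Fatalf("unexpected host grouping: %s", diff)
		}
	}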