diff --git a/pkg/service/backup/backup.go b/pkg/service/backup/backup.go index e4bc54097..074b04575 100644 --- a/pkg/service/backup/backup.go +++ b/pkg/service/backup/backup.go @@ -56,7 +56,7 @@ func checkAllDCsCovered(locations []Location, dcs []string) error { return nil } -func makeHostInfo(nodes []scyllaclient.NodeStatusInfo, locations []Location, rateLimits []DCLimit) ([]hostInfo, error) { +func makeHostInfo(nodes []scyllaclient.NodeStatusInfo, locations []Location, rateLimits []DCLimit, transfers int) ([]hostInfo, error) { // DC location index dcl := map[string]Location{} for _, l := range locations { @@ -90,6 +90,7 @@ func makeHostInfo(nodes []scyllaclient.NodeStatusInfo, locations []Location, rat if !ok { hi[i].RateLimit = dcr[""] // no rate limit is ok, fallback to 0 - no limit } + hi[i].Transfers = transfers } return hi, errs diff --git a/pkg/service/backup/export_test.go b/pkg/service/backup/export_test.go index 699294090..02198f47c 100644 --- a/pkg/service/backup/export_test.go +++ b/pkg/service/backup/export_test.go @@ -35,6 +35,9 @@ func (s *Service) InitTarget(ctx context.Context, clusterID uuid.UUID, target *T // Get live nodes target.liveNodes, err = s.getLiveNodes(ctx, client, target.DC) + if target.Transfers == 0 { + target.Transfers = 2 + } return err } diff --git a/pkg/service/backup/list.go b/pkg/service/backup/list.go index b06885fbb..a059dd168 100644 --- a/pkg/service/backup/list.go +++ b/pkg/service/backup/list.go @@ -22,13 +22,13 @@ func listManifestsInAllLocations(ctx context.Context, client *scyllaclient.Clien manifests []*ManifestInfo ) - for _, hi := range hosts { - if _, ok := locations[hi.Location]; ok { + for i := range hosts { + if _, ok := locations[hosts[i].Location]; ok { continue } - locations[hi.Location] = struct{}{} + locations[hosts[i].Location] = struct{}{} - lm, err := listManifests(ctx, client, hi.IP, hi.Location, clusterID) + lm, err := listManifests(ctx, client, hosts[i].IP, hosts[i].Location, clusterID) if err != nil { return nil, err } diff --git a/pkg/service/backup/model.go b/pkg/service/backup/model.go index a8702123c..22bf048f7 100644 --- a/pkg/service/backup/model.go +++ b/pkg/service/backup/model.go @@ -51,6 +51,7 @@ type Target struct { RetentionDays int `json:"retention_days"` RetentionMap RetentionMap `json:"-"` // policy for all tasks, injected in runtime RateLimit []DCLimit `json:"rate_limit,omitempty"` + Transfers int `json:"transfers"` SnapshotParallel []DCLimit `json:"snapshot_parallel,omitempty"` UploadParallel []DCLimit `json:"upload_parallel,omitempty"` Continue bool `json:"continue,omitempty"` @@ -250,6 +251,7 @@ type taskProperties struct { RetentionDays *int `json:"retention_days"` RetentionMap RetentionMap `json:"retention_map"` RateLimit []DCLimit `json:"rate_limit"` + Transfers int `json:"transfers"` SnapshotParallel []DCLimit `json:"snapshot_parallel"` UploadParallel []DCLimit `json:"upload_parallel"` Continue bool `json:"continue"` @@ -264,6 +266,9 @@ func (p taskProperties) validate(dcs []string, dcMap map[string][]string) error if policy := p.extractRetention(); policy.Retention < 0 || policy.RetentionDays < 0 { return errors.New("negative retention") } + if p.Transfers < 1 { + return errors.New("transfers param has to be greater than zero") + } // Validate location DCs if err := checkDCs(func(i int) (string, string) { return p.Location[i].DC, p.Location[i].String() }, len(p.Location), dcMap); err != nil { @@ -310,6 +315,7 @@ func (p taskProperties) toTarget(ctx context.Context, client *scyllaclient.Clien RetentionDays: policy.RetentionDays, RetentionMap: p.RetentionMap, RateLimit: rateLimit, + Transfers: p.Transfers, SnapshotParallel: filterDCLimits(p.SnapshotParallel, dcs), UploadParallel: filterDCLimits(p.UploadParallel, dcs), Continue: p.Continue, @@ -417,7 +423,8 @@ func (p taskProperties) extractRetention() RetentionPolicy { func defaultTaskProperties() taskProperties { return taskProperties{ - Continue: true, + Transfers: 2, + Continue: true, } } diff --git a/pkg/service/backup/parallel.go b/pkg/service/backup/parallel.go index 39b111444..c0296c3dc 100644 --- a/pkg/service/backup/parallel.go +++ b/pkg/service/backup/parallel.go @@ -32,8 +32,8 @@ func makeHostsLimit(hosts []hostInfo, limits []DCLimit) map[string]hostsLimit { } m := make(map[string]hostsLimit, len(dcLimit)+1) - for _, h := range hosts { - dc := h.DC + for i := range hosts { + dc := hosts[i].DC // If DC has no limit put host under an empty DC if _, ok := dcLimit[dc]; !ok { dc = "" @@ -42,14 +42,14 @@ func makeHostsLimit(hosts []hostInfo, limits []DCLimit) map[string]hostsLimit { v, ok := m[dc] if !ok { v = hostsLimit{} - v.hosts = []hostInfo{h} + v.hosts = []hostInfo{hosts[i]} if dc == "" { v.limit = globalLimit } else { v.limit = dcLimit[dc] } } else { - v.hosts = append(v.hosts, h) + v.hosts = append(v.hosts, hosts[i]) } m[dc] = v } diff --git a/pkg/service/backup/service.go b/pkg/service/backup/service.go index ac3116d42..cbb4d222c 100644 --- a/pkg/service/backup/service.go +++ b/pkg/service/backup/service.go @@ -484,8 +484,8 @@ func (s *Service) forEachManifest(ctx context.Context, clusterID uuid.UUID, loca return errors.Wrap(err, "resolve hosts") } locationHost := map[Location]string{} - for _, h := range hosts { - locationHost[h.Location] = h.IP + for i := range hosts { + locationHost[hosts[i].Location] = hosts[i].IP } manifests, err := listManifestsInAllLocations(ctx, client, hosts, filter.ClusterID) @@ -657,7 +657,7 @@ func (s *Service) Backup(ctx context.Context, clusterID, taskID, runID uuid.UUID } // Create hostInfo for run hosts - hi, err := makeHostInfo(liveNodes, target.Location, target.RateLimit) + hi, err := makeHostInfo(liveNodes, target.Location, target.RateLimit, target.Transfers) if err != nil { return err } diff --git a/pkg/service/backup/testdata/get_target/continue.golden.json b/pkg/service/backup/testdata/get_target/continue.golden.json index cdf7504ac..b221ca9a3 100644 --- a/pkg/service/backup/testdata/get_target/continue.golden.json +++ b/pkg/service/backup/testdata/get_target/continue.golden.json @@ -68,5 +68,6 @@ "retention": 3, "rate_limit": [ "100" - ] + ], + "transfers": 2 } \ No newline at end of file diff --git a/pkg/service/backup/testdata/get_target/dc_locations.golden.json b/pkg/service/backup/testdata/get_target/dc_locations.golden.json index 7f0e8c52d..3ae4be923 100644 --- a/pkg/service/backup/testdata/get_target/dc_locations.golden.json +++ b/pkg/service/backup/testdata/get_target/dc_locations.golden.json @@ -70,5 +70,6 @@ "rate_limit": [ "100" ], - "continue": true + "continue": true, + "transfers": 2 } \ No newline at end of file diff --git a/pkg/service/backup/testdata/get_target/dc_no_rate_limit.golden.json b/pkg/service/backup/testdata/get_target/dc_no_rate_limit.golden.json index ac7605cf7..819daee52 100644 --- a/pkg/service/backup/testdata/get_target/dc_no_rate_limit.golden.json +++ b/pkg/service/backup/testdata/get_target/dc_no_rate_limit.golden.json @@ -69,5 +69,6 @@ "rate_limit": [ "0" ], - "continue": true + "continue": true, + "transfers": 2 } \ No newline at end of file diff --git a/pkg/service/backup/testdata/get_target/dc_rate_limit.golden.json b/pkg/service/backup/testdata/get_target/dc_rate_limit.golden.json index bc526693f..bd798bdc5 100644 --- a/pkg/service/backup/testdata/get_target/dc_rate_limit.golden.json +++ b/pkg/service/backup/testdata/get_target/dc_rate_limit.golden.json @@ -70,5 +70,6 @@ "1000", "dc1:100" ], - "continue": true + "continue": true, + "transfers": 2 } \ No newline at end of file diff --git a/pkg/service/backup/testdata/get_target/dc_snapshot_parallel.golden.json b/pkg/service/backup/testdata/get_target/dc_snapshot_parallel.golden.json index ad8780b91..38cd13dd9 100644 --- a/pkg/service/backup/testdata/get_target/dc_snapshot_parallel.golden.json +++ b/pkg/service/backup/testdata/get_target/dc_snapshot_parallel.golden.json @@ -73,5 +73,6 @@ "10", "dc1:20" ], - "continue": true + "continue": true, + "transfers": 2 } \ No newline at end of file diff --git a/pkg/service/backup/testdata/get_target/dc_upload_parallel.golden.json b/pkg/service/backup/testdata/get_target/dc_upload_parallel.golden.json index d0ed89e62..f5bfb460a 100644 --- a/pkg/service/backup/testdata/get_target/dc_upload_parallel.golden.json +++ b/pkg/service/backup/testdata/get_target/dc_upload_parallel.golden.json @@ -73,5 +73,6 @@ "10", "dc1:20" ], - "continue": true + "continue": true, + "transfers": 2 } \ No newline at end of file diff --git a/pkg/service/backup/testdata/get_target/everything.golden.json b/pkg/service/backup/testdata/get_target/everything.golden.json index 80479521a..941b11b53 100644 --- a/pkg/service/backup/testdata/get_target/everything.golden.json +++ b/pkg/service/backup/testdata/get_target/everything.golden.json @@ -69,5 +69,6 @@ "rate_limit": [ "100" ], - "continue": true + "continue": true, + "transfers": 2 } \ No newline at end of file diff --git a/pkg/service/backup/testdata/get_target/filter_dc.golden.json b/pkg/service/backup/testdata/get_target/filter_dc.golden.json index f9cd40371..658ab6e65 100644 --- a/pkg/service/backup/testdata/get_target/filter_dc.golden.json +++ b/pkg/service/backup/testdata/get_target/filter_dc.golden.json @@ -47,5 +47,6 @@ "rate_limit": [ "100" ], - "continue": true + "continue": true, + "transfers": 2 } \ No newline at end of file diff --git a/pkg/service/backup/testdata/get_target/filter_keyspaces.golden.json b/pkg/service/backup/testdata/get_target/filter_keyspaces.golden.json index d193e9f41..50c0bdc2e 100644 --- a/pkg/service/backup/testdata/get_target/filter_keyspaces.golden.json +++ b/pkg/service/backup/testdata/get_target/filter_keyspaces.golden.json @@ -38,5 +38,6 @@ "rate_limit": [ "100" ], - "continue": true + "continue": true, + "transfers": 2 } \ No newline at end of file diff --git a/pkg/service/backup/validation.go b/pkg/service/backup/validation.go index a54765306..7274ec49c 100644 --- a/pkg/service/backup/validation.go +++ b/pkg/service/backup/validation.go @@ -147,7 +147,7 @@ func (s *Service) Validate(ctx context.Context, clusterID, taskID, runID uuid.UU } } - hosts, err := makeHostInfo(target.liveNodes, target.Location, nil) + hosts, err := makeHostInfo(target.liveNodes, target.Location, nil, 0) if err != nil { return err } @@ -176,9 +176,9 @@ func (s *Service) Validate(ctx context.Context, clusterID, taskID, runID uuid.UU p := newPurger(client, h.IP, log.NopLogger) hostForNodeID := func() string { - for _, h := range hosts { - if h.ID == nodeID { - return h.IP + for i := range hosts { + if hosts[i].ID == nodeID { + return hosts[i].IP } } if host := p.Host(nodeID); host != "" { diff --git a/pkg/service/backup/worker.go b/pkg/service/backup/worker.go index e597864c2..3555f8041 100644 --- a/pkg/service/backup/worker.go +++ b/pkg/service/backup/worker.go @@ -23,6 +23,7 @@ type hostInfo struct { ID string Location Location RateLimit DCLimit + Transfers int } func (h hostInfo) String() string { diff --git a/pkg/service/backup/worker_schema.go b/pkg/service/backup/worker_schema.go index 418da112c..c683bc140 100644 --- a/pkg/service/backup/worker_schema.go +++ b/pkg/service/backup/worker_schema.go @@ -26,8 +26,8 @@ import ( func (w *worker) DumpSchema(ctx context.Context, hi []hostInfo, sessionFunc cluster.SessionFunc) error { var hosts []string - for _, h := range hi { - hosts = append(hosts, h.IP) + for i := range hi { + hosts = append(hosts, hi[i].IP) } descSchemaHosts, err := backupAndRestoreFromDescSchemaHosts(ctx, w.Client, hosts) @@ -105,12 +105,12 @@ func (w *worker) UploadSchema(ctx context.Context, hosts []hostInfo) (stepError // Select single host per location locations := map[string]hostInfo{} - for _, hi := range hosts { - locations[hi.Location.String()] = hi + for i := range hosts { + locations[hosts[i].Location.String()] = hosts[i] } hostPerLocation := make([]hostInfo, 0, len(locations)) - for _, hi := range locations { - hostPerLocation = append(hostPerLocation, hi) + for l := range locations { + hostPerLocation = append(hostPerLocation, locations[l]) } f := func(h hostInfo) error {