Skip to content

Commit

Permalink
feat(backup): add Transfers to Target
Browse files Browse the repository at this point in the history
This is the first step toward controlling transfers in the context of backup.
  • Loading branch information
Michal-Leszczynski committed Oct 8, 2024
1 parent 4ce2f63 commit eb6faeb
Show file tree
Hide file tree
Showing 18 changed files with 53 additions and 32 deletions.
3 changes: 2 additions & 1 deletion pkg/service/backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func checkAllDCsCovered(locations []Location, dcs []string) error {
return nil
}

func makeHostInfo(nodes []scyllaclient.NodeStatusInfo, locations []Location, rateLimits []DCLimit) ([]hostInfo, error) {
func makeHostInfo(nodes []scyllaclient.NodeStatusInfo, locations []Location, rateLimits []DCLimit, transfers int) ([]hostInfo, error) {
// DC location index
dcl := map[string]Location{}
for _, l := range locations {
Expand Down Expand Up @@ -90,6 +90,7 @@ func makeHostInfo(nodes []scyllaclient.NodeStatusInfo, locations []Location, rat
if !ok {
hi[i].RateLimit = dcr[""] // no rate limit is ok, fallback to 0 - no limit
}
hi[i].Transfers = transfers
}

return hi, errs
Expand Down
3 changes: 3 additions & 0 deletions pkg/service/backup/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ func (s *Service) InitTarget(ctx context.Context, clusterID uuid.UUID, target *T

// Get live nodes
target.liveNodes, err = s.getLiveNodes(ctx, client, target.DC)
if target.Transfers == 0 {
target.Transfers = 2
}
return err
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/service/backup/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ func listManifestsInAllLocations(ctx context.Context, client *scyllaclient.Clien
manifests []*ManifestInfo
)

for _, hi := range hosts {
if _, ok := locations[hi.Location]; ok {
for i := range hosts {
if _, ok := locations[hosts[i].Location]; ok {
continue
}
locations[hi.Location] = struct{}{}
locations[hosts[i].Location] = struct{}{}

lm, err := listManifests(ctx, client, hi.IP, hi.Location, clusterID)
lm, err := listManifests(ctx, client, hosts[i].IP, hosts[i].Location, clusterID)
if err != nil {
return nil, err
}
Expand Down
9 changes: 8 additions & 1 deletion pkg/service/backup/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type Target struct {
RetentionDays int `json:"retention_days"`
RetentionMap RetentionMap `json:"-"` // policy for all tasks, injected in runtime
RateLimit []DCLimit `json:"rate_limit,omitempty"`
Transfers int `json:"transfers"`
SnapshotParallel []DCLimit `json:"snapshot_parallel,omitempty"`
UploadParallel []DCLimit `json:"upload_parallel,omitempty"`
Continue bool `json:"continue,omitempty"`
Expand Down Expand Up @@ -250,6 +251,7 @@ type taskProperties struct {
RetentionDays *int `json:"retention_days"`
RetentionMap RetentionMap `json:"retention_map"`
RateLimit []DCLimit `json:"rate_limit"`
Transfers int `json:"transfers"`
SnapshotParallel []DCLimit `json:"snapshot_parallel"`
UploadParallel []DCLimit `json:"upload_parallel"`
Continue bool `json:"continue"`
Expand All @@ -264,6 +266,9 @@ func (p taskProperties) validate(dcs []string, dcMap map[string][]string) error
if policy := p.extractRetention(); policy.Retention < 0 || policy.RetentionDays < 0 {
return errors.New("negative retention")
}
if p.Transfers < 1 {
return errors.New("transfers param has to be greater than zero")
}

// Validate location DCs
if err := checkDCs(func(i int) (string, string) { return p.Location[i].DC, p.Location[i].String() }, len(p.Location), dcMap); err != nil {
Expand Down Expand Up @@ -310,6 +315,7 @@ func (p taskProperties) toTarget(ctx context.Context, client *scyllaclient.Clien
RetentionDays: policy.RetentionDays,
RetentionMap: p.RetentionMap,
RateLimit: rateLimit,
Transfers: p.Transfers,
SnapshotParallel: filterDCLimits(p.SnapshotParallel, dcs),
UploadParallel: filterDCLimits(p.UploadParallel, dcs),
Continue: p.Continue,
Expand Down Expand Up @@ -417,7 +423,8 @@ func (p taskProperties) extractRetention() RetentionPolicy {

func defaultTaskProperties() taskProperties {
return taskProperties{
Continue: true,
Transfers: 2,
Continue: true,
}
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/service/backup/parallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ func makeHostsLimit(hosts []hostInfo, limits []DCLimit) map[string]hostsLimit {
}

m := make(map[string]hostsLimit, len(dcLimit)+1)
for _, h := range hosts {
dc := h.DC
for i := range hosts {
dc := hosts[i].DC
// If DC has no limit put host under an empty DC
if _, ok := dcLimit[dc]; !ok {
dc = ""
Expand All @@ -42,14 +42,14 @@ func makeHostsLimit(hosts []hostInfo, limits []DCLimit) map[string]hostsLimit {
v, ok := m[dc]
if !ok {
v = hostsLimit{}
v.hosts = []hostInfo{h}
v.hosts = []hostInfo{hosts[i]}
if dc == "" {
v.limit = globalLimit
} else {
v.limit = dcLimit[dc]
}
} else {
v.hosts = append(v.hosts, h)
v.hosts = append(v.hosts, hosts[i])
}
m[dc] = v
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/service/backup/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,8 +484,8 @@ func (s *Service) forEachManifest(ctx context.Context, clusterID uuid.UUID, loca
return errors.Wrap(err, "resolve hosts")
}
locationHost := map[Location]string{}
for _, h := range hosts {
locationHost[h.Location] = h.IP
for i := range hosts {
locationHost[hosts[i].Location] = hosts[i].IP
}

manifests, err := listManifestsInAllLocations(ctx, client, hosts, filter.ClusterID)
Expand Down Expand Up @@ -657,7 +657,7 @@ func (s *Service) Backup(ctx context.Context, clusterID, taskID, runID uuid.UUID
}

// Create hostInfo for run hosts
hi, err := makeHostInfo(liveNodes, target.Location, target.RateLimit)
hi, err := makeHostInfo(liveNodes, target.Location, target.RateLimit, target.Transfers)
if err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/service/backup/testdata/get_target/continue.golden.json
Original file line number Diff line number Diff line change
Expand Up @@ -68,5 +68,6 @@
"retention": 3,
"rate_limit": [
"100"
]
],
"transfers": 2
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,5 +70,6 @@
"rate_limit": [
"100"
],
"continue": true
"continue": true,
"transfers": 2
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,5 +69,6 @@
"rate_limit": [
"0"
],
"continue": true
"continue": true,
"transfers": 2
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,5 +70,6 @@
"1000",
"dc1:100"
],
"continue": true
"continue": true,
"transfers": 2
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,5 +73,6 @@
"10",
"dc1:20"
],
"continue": true
"continue": true,
"transfers": 2
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,5 +73,6 @@
"10",
"dc1:20"
],
"continue": true
"continue": true,
"transfers": 2
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,5 +69,6 @@
"rate_limit": [
"100"
],
"continue": true
"continue": true,
"transfers": 2
}
3 changes: 2 additions & 1 deletion pkg/service/backup/testdata/get_target/filter_dc.golden.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,5 +47,6 @@
"rate_limit": [
"100"
],
"continue": true
"continue": true,
"transfers": 2
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,6 @@
"rate_limit": [
"100"
],
"continue": true
"continue": true,
"transfers": 2
}
8 changes: 4 additions & 4 deletions pkg/service/backup/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (s *Service) Validate(ctx context.Context, clusterID, taskID, runID uuid.UU
}
}

hosts, err := makeHostInfo(target.liveNodes, target.Location, nil)
hosts, err := makeHostInfo(target.liveNodes, target.Location, nil, 0)
if err != nil {
return err
}
Expand Down Expand Up @@ -176,9 +176,9 @@ func (s *Service) Validate(ctx context.Context, clusterID, taskID, runID uuid.UU
p := newPurger(client, h.IP, log.NopLogger)

hostForNodeID := func() string {
for _, h := range hosts {
if h.ID == nodeID {
return h.IP
for i := range hosts {
if hosts[i].ID == nodeID {
return hosts[i].IP
}
}
if host := p.Host(nodeID); host != "" {
Expand Down
1 change: 1 addition & 0 deletions pkg/service/backup/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type hostInfo struct {
ID string
Location Location
RateLimit DCLimit
Transfers int
}

func (h hostInfo) String() string {
Expand Down
12 changes: 6 additions & 6 deletions pkg/service/backup/worker_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import (

func (w *worker) DumpSchema(ctx context.Context, hi []hostInfo, sessionFunc cluster.SessionFunc) error {
var hosts []string
for _, h := range hi {
hosts = append(hosts, h.IP)
for i := range hi {
hosts = append(hosts, hi[i].IP)
}

descSchemaHosts, err := backupAndRestoreFromDescSchemaHosts(ctx, w.Client, hosts)
Expand Down Expand Up @@ -105,12 +105,12 @@ func (w *worker) UploadSchema(ctx context.Context, hosts []hostInfo) (stepError

// Select single host per location
locations := map[string]hostInfo{}
for _, hi := range hosts {
locations[hi.Location.String()] = hi
for i := range hosts {
locations[hosts[i].Location.String()] = hosts[i]
}
hostPerLocation := make([]hostInfo, 0, len(locations))
for _, hi := range locations {
hostPerLocation = append(hostPerLocation, hi)
for l := range locations {
hostPerLocation = append(hostPerLocation, locations[l])
}

f := func(h hostInfo) error {
Expand Down

0 comments on commit eb6faeb

Please sign in to comment.