diff --git a/pkg/service/restore/restore_integration_test.go b/pkg/service/restore/restore_integration_test.go
index 318377cd58..300dc20a2d 100644
--- a/pkg/service/restore/restore_integration_test.go
+++ b/pkg/service/restore/restore_integration_test.go
@@ -10,6 +10,8 @@ import (
 	"encoding/json"
 	"fmt"
 	"net/http"
+	"runtime"
+	"slices"
 	"strings"
 	"sync/atomic"
 	"testing"
@@ -498,7 +500,7 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
 	Print("Fill setup")
 	fillTable(t, h.srcCluster.rootSession, 100, ks, tab)
 
-	validateState := func(ch clusterHelper, tombstone string, compaction bool, transfers int, rateLimit int) {
+	validateState := func(ch clusterHelper, tombstone string, compaction bool, transfers int, rateLimit int, cpus []int64) {
 		// Validate tombstone_gc mode
 		if got := tombstoneGCMode(t, ch.rootSession, ks, tab); tombstone != got {
 			t.Errorf("expected tombstone_gc=%s, got %s", tombstone, got)
@@ -537,6 +539,24 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
 				t.Errorf("expected rate_limit=%s, got=%s on host %s", rawLimit, got, host)
 			}
 		}
+		// Validate CPU pinning: each host must report exactly the expected set of pinned CPUs
+		for _, host := range ch.Client.Config().Hosts {
+			got, err := ch.Client.GetPinnedCPU(context.Background(), host)
+			if err != nil {
+				t.Fatal(errors.Wrapf(err, "check pinned CPUs on host %s", host))
+			}
+			slices.Sort(cpus)
+			slices.Sort(got)
+			if !slices.Equal(cpus, got) {
+				t.Errorf("expected cpus=%v, got=%v on host %s", cpus, got, host)
+			}
+		}
+	}
+
+	pinnedCPU := []int64{0} // Matches "cpu: 0" set in the scylla-manager-agent.yaml used for testing
+	unpinnedCPU := make([]int64, runtime.NumCPU())
+	for i := range unpinnedCPU {
+		unpinnedCPU[i] = int64(i)
 	}
 
 	shardCnt, err := h.dstCluster.Client.ShardCount(context.Background(), ManagedClusterHost())
@@ -545,7 +565,7 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
 	}
 	transfers0 := 2 * int(shardCnt)
 
-	setTransfersAndRateLimit := func(ch clusterHelper, transfers int, rateLimit int) {
+	setTransfersAndRateLimitAndPinnedCPU := func(ch clusterHelper, transfers int, rateLimit int, pin bool) {
 		for _, host := range ch.Client.Config().Hosts {
 			err := ch.Client.RcloneSetTransfers(context.Background(), host, transfers)
 			if err != nil {
@@ -555,15 +575,26 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
 			if err != nil {
 				t.Fatal(errors.Wrapf(err, "set rate limit on host %s", host))
 			}
+			if pin {
+				err = ch.Client.PinCPU(context.Background(), host)
+				if err != nil {
+					t.Fatal(errors.Wrapf(err, "pin CPUs on host %s", host))
+				}
+			} else {
+				err = ch.Client.UnpinFromCPU(context.Background(), host)
+				if err != nil {
+					t.Fatal(errors.Wrapf(err, "unpin CPUs on host %s", host))
+				}
+			}
 		}
 	}
 
 	Print("Set initial transfers and rate limit")
-	setTransfersAndRateLimit(h.srcCluster, 10, 99)
-	setTransfersAndRateLimit(h.dstCluster, 10, 99)
+	setTransfersAndRateLimitAndPinnedCPU(h.srcCluster, 10, 99, true)
+	setTransfersAndRateLimitAndPinnedCPU(h.dstCluster, 10, 99, true)
 
 	Print("Validate state before backup")
-	validateState(h.srcCluster, "repair", true, 10, 99)
+	validateState(h.srcCluster, "repair", true, 10, 99, pinnedCPU)
 
 	Print("Run backup")
 	loc := []Location{testLocation("preparation", "")}
@@ -577,18 +608,19 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
 	})
 
 	Print("Validate state after backup")
-	validateState(h.srcCluster, "repair", true, 3, 88)
+	validateState(h.srcCluster, "repair", true, 3, 88, pinnedCPU)
 
 	runRestore := func(ctx context.Context, finishedRestore chan error) {
 		grantRestoreTablesPermissions(t, h.dstCluster.rootSession, ksFilter, h.dstUser)
 		h.dstCluster.RunID = uuid.NewTime()
 		rawProps, err := json.Marshal(map[string]any{
-			"location":       loc,
-			"keyspace":       ksFilter,
-			"snapshot_tag":   tag,
-			"transfers":      0,
-			"rate_limit":     []string{"0"},
-			"restore_tables": true,
+			"location":        loc,
+			"keyspace":        ksFilter,
+			"snapshot_tag":    tag,
+			"transfers":       0,
+			"rate_limit":      []string{"0"},
+			"unpin_agent_cpu": true,
+			"restore_tables":  true,
 		})
 		if err != nil {
 			finishedRestore <- err
@@ -621,7 +653,7 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
 	makeLASHang(reachedDataStageChan, hangLAS)
 
 	Print("Validate state before restore")
-	validateState(h.dstCluster, "repair", true, 10, 99)
+	validateState(h.dstCluster, "repair", true, 10, 99, pinnedCPU)
 
 	Print("Run restore")
 	finishedRestore := make(chan error)
@@ -636,7 +668,7 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
 	}
 
 	Print("Validate state during restore data")
-	validateState(h.dstCluster, "disabled", false, transfers0, 0)
+	validateState(h.dstCluster, "disabled", false, transfers0, 0, unpinnedCPU)
 
 	Print("Pause restore")
 	restoreCancel()
@@ -651,11 +683,10 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
 	}
 
 	Print("Validate state during pause")
-	validateState(h.dstCluster, "disabled", true, transfers0, 0)
+	validateState(h.dstCluster, "disabled", true, transfers0, 0, pinnedCPU)
 
 	Print("Change transfers and rate limit during pause")
-	setTransfersAndRateLimit(h.srcCluster, 9, 55)
-	setTransfersAndRateLimit(h.dstCluster, 9, 55)
+	setTransfersAndRateLimitAndPinnedCPU(h.dstCluster, 9, 55, false)
 
 	reachedDataStageChan = make(chan struct{})
 	hangLAS = make(chan struct{})
@@ -674,7 +705,7 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
 	}
 
 	Print("Validate state during restore data after pause")
-	validateState(h.dstCluster, "disabled", false, transfers0, 0)
+	validateState(h.dstCluster, "disabled", false, transfers0, 0, unpinnedCPU)
 
 	Print("Release LAS")
 	close(hangLAS)
@@ -686,7 +717,7 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
 	}
 
 	Print("Validate state after restore success")
-	validateState(h.dstCluster, "repair", true, transfers0, 0)
+	validateState(h.dstCluster, "repair", true, transfers0, 0, pinnedCPU)
 
 	Print("Validate table contents")
 	h.validateIdenticalTables(t, []table{{ks: ks, tab: tab}})
diff --git a/testing/scylla-manager-agent/scylla-manager-agent-ipv6.yaml b/testing/scylla-manager-agent/scylla-manager-agent-ipv6.yaml
index d7ee3d4f06..2f4398284d 100644
--- a/testing/scylla-manager-agent/scylla-manager-agent-ipv6.yaml
+++ b/testing/scylla-manager-agent/scylla-manager-agent-ipv6.yaml
@@ -1,5 +1,7 @@
 auth_token: token
 
+cpu: 0
+
 debug: :5112
 
 logger:
diff --git a/testing/scylla-manager-agent/scylla-manager-agent.yaml b/testing/scylla-manager-agent/scylla-manager-agent.yaml
index 9075bbc833..83431b07dd 100644
--- a/testing/scylla-manager-agent/scylla-manager-agent.yaml
+++ b/testing/scylla-manager-agent/scylla-manager-agent.yaml
@@ -1,5 +1,7 @@
 auth_token: token
 
+cpu: 0
+
 debug: :5112
 
 logger: