feat(restore_test): extend TestRestoreTablesPreparationIntegration with cpu pinning

This way the test also checks cpu pinning before and after backup.
It also checks cpu pinning before restore, in the middle of it,
when it is paused, when it is resumed, and after it finishes.
Michal-Leszczynski committed Oct 17, 2024
1 parent 85ab15d commit 9793daa
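
For orientation, the pinning states the extended test walks through can be summarized as follows. This is an illustrative sketch only: the stage names and expected CPU sets are read off the diff below, and the helper program is not part of the commit. The pinned set matches the cpu: 0 line added to the test agent config in the second changed file.

package main

import (
	"fmt"
	"runtime"
)

func main() {
	// Agents under test are pinned to CPU 0 (cpu: 0 in scylla-manager-agent.yaml).
	pinned := []int64{0}
	// An unpinned agent is expected to report every CPU on the machine.
	unpinned := make([]int64, runtime.NumCPU())
	for i := range unpinned {
		unpinned[i] = int64(i)
	}
	stages := []struct {
		name string
		cpus []int64
	}{
		{"before backup", pinned},
		{"after backup", pinned},
		{"before restore", pinned},
		{"during restore data stage", unpinned}, // unpin_agent_cpu: true
		{"while restore is paused", pinned},
		{"during data stage after resume", unpinned},
		{"after restore success", pinned},
	}
	for _, s := range stages {
		fmt.Printf("%-32s expected cpus=%v\n", s.name, s.cpus)
	}
}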
Showing 2 changed files with 52 additions and 19 deletions.
69 changes: 50 additions & 19 deletions pkg/service/restore/restore_integration_test.go
@@ -10,6 +10,8 @@ import (
 	"encoding/json"
 	"fmt"
 	"net/http"
+	"runtime"
+	"slices"
 	"strings"
 	"sync/atomic"
 	"testing"
@@ -498,7 +500,7 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
 	Print("Fill setup")
 	fillTable(t, h.srcCluster.rootSession, 100, ks, tab)
 
-	validateState := func(ch clusterHelper, tombstone string, compaction bool, transfers int, rateLimit int) {
+	validateState := func(ch clusterHelper, tombstone string, compaction bool, transfers int, rateLimit int, cpus []int64) {
 		// Validate tombstone_gc mode
 		if got := tombstoneGCMode(t, ch.rootSession, ks, tab); tombstone != got {
 			t.Errorf("expected tombstone_gc=%s, got %s", tombstone, got)
@@ -537,6 +539,24 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
 				t.Errorf("expected rate_limit=%s, got=%s on host %s", rawLimit, got, host)
 			}
 		}
+		// Validate cpu pinning
+		slices.Sort(cpus)
+		for _, host := range ch.Client.Config().Hosts {
+			got, err := ch.Client.GetPinnedCPU(context.Background(), host)
+			if err != nil {
+				t.Fatal(errors.Wrapf(err, "check pinned cpus on host %s", host))
+			}
+			slices.Sort(got)
+			if !slices.Equal(cpus, got) {
+				t.Errorf("expected cpus=%v, got=%v on host %s", cpus, got, host)
+			}
+		}
 	}
 
+	pinnedCPU := []int64{0} // Taken from scylla-manager-agent.yaml used for testing
+	unpinnedCPU := make([]int64, runtime.NumCPU())
+	for i := range unpinnedCPU {
+		unpinnedCPU[i] = int64(i)
+	}
+
 	shardCnt, err := h.dstCluster.Client.ShardCount(context.Background(), ManagedClusterHost())
@@ -545,7 +565,7 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
 	}
 	transfers0 := 2 * int(shardCnt)
 
-	setTransfersAndRateLimit := func(ch clusterHelper, transfers int, rateLimit int) {
+	setTransfersAndRateLimitAndPinnedCPU := func(ch clusterHelper, transfers int, rateLimit int, pin bool) {
 		for _, host := range ch.Client.Config().Hosts {
 			err := ch.Client.RcloneSetTransfers(context.Background(), host, transfers)
 			if err != nil {
@@ -555,15 +575,26 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
 			if err != nil {
 				t.Fatal(errors.Wrapf(err, "set rate limit on host %s", host))
 			}
+			if pin {
+				err = ch.Client.PinCPU(context.Background(), host)
+				if err != nil {
+					t.Fatal(errors.Wrapf(err, "pin CPUs on host %s", host))
+				}
+			} else {
+				err = ch.Client.UnpinFromCPU(context.Background(), host)
+				if err != nil {
+					t.Fatal(errors.Wrapf(err, "unpin CPUs on host %s", host))
+				}
+			}
 		}
 	}
 
 	Print("Set initial transfers and rate limit")
-	setTransfersAndRateLimit(h.srcCluster, 10, 99)
-	setTransfersAndRateLimit(h.dstCluster, 10, 99)
+	setTransfersAndRateLimitAndPinnedCPU(h.srcCluster, 10, 99, true)
+	setTransfersAndRateLimitAndPinnedCPU(h.dstCluster, 10, 99, true)
 
 	Print("Validate state before backup")
-	validateState(h.srcCluster, "repair", true, 10, 99)
+	validateState(h.srcCluster, "repair", true, 10, 99, pinnedCPU)
 
 	Print("Run backup")
 	loc := []Location{testLocation("preparation", "")}
@@ -577,18 +608,19 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
 	})
 
 	Print("Validate state after backup")
-	validateState(h.srcCluster, "repair", true, 3, 88)
+	validateState(h.srcCluster, "repair", true, 3, 88, pinnedCPU)
 
 	runRestore := func(ctx context.Context, finishedRestore chan error) {
 		grantRestoreTablesPermissions(t, h.dstCluster.rootSession, ksFilter, h.dstUser)
 		h.dstCluster.RunID = uuid.NewTime()
 		rawProps, err := json.Marshal(map[string]any{
-			"location":       loc,
-			"keyspace":       ksFilter,
-			"snapshot_tag":   tag,
-			"transfers":      0,
-			"rate_limit":     []string{"0"},
-			"restore_tables": true,
+			"location":        loc,
+			"keyspace":        ksFilter,
+			"snapshot_tag":    tag,
+			"transfers":       0,
+			"rate_limit":      []string{"0"},
+			"unpin_agent_cpu": true,
+			"restore_tables":  true,
 		})
 		if err != nil {
 			finishedRestore <- err
@@ -621,7 +653,7 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
 	makeLASHang(reachedDataStageChan, hangLAS)
 
 	Print("Validate state before restore")
-	validateState(h.dstCluster, "repair", true, 10, 99)
+	validateState(h.dstCluster, "repair", true, 10, 99, pinnedCPU)
 
 	Print("Run restore")
 	finishedRestore := make(chan error)
@@ -636,7 +668,7 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
 	}
 
 	Print("Validate state during restore data")
-	validateState(h.dstCluster, "disabled", false, transfers0, 0)
+	validateState(h.dstCluster, "disabled", false, transfers0, 0, unpinnedCPU)
 
 	Print("Pause restore")
 	restoreCancel()
@@ -651,11 +683,10 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
 	}
 
 	Print("Validate state during pause")
-	validateState(h.dstCluster, "disabled", true, transfers0, 0)
+	validateState(h.dstCluster, "disabled", true, transfers0, 0, pinnedCPU)
 
 	Print("Change transfers and rate limit during pause")
-	setTransfersAndRateLimit(h.srcCluster, 9, 55)
-	setTransfersAndRateLimit(h.dstCluster, 9, 55)
+	setTransfersAndRateLimitAndPinnedCPU(h.dstCluster, 9, 55, false)
 
 	reachedDataStageChan = make(chan struct{})
 	hangLAS = make(chan struct{})
@@ -674,7 +705,7 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
 	}
 
 	Print("Validate state during restore data after pause")
-	validateState(h.dstCluster, "disabled", false, transfers0, 0)
+	validateState(h.dstCluster, "disabled", false, transfers0, 0, unpinnedCPU)
 
 	Print("Release LAS")
 	close(hangLAS)
@@ -686,7 +717,7 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
 	}
 
 	Print("Validate state after restore success")
-	validateState(h.dstCluster, "repair", true, transfers0, 0)
+	validateState(h.dstCluster, "repair", true, transfers0, 0, pinnedCPU)
 
 	Print("Validate table contents")
 	h.validateIdenticalTables(t, []table{{ks: ks, tab: tab}})
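
A design note on the cpu check in validateState above: both the expected and the reported CPU slices are sorted before comparison, which makes the check independent of the order in which CPUs are reported. A minimal standalone sketch of that comparison (sameCPUs is a name introduced here, not part of the commit):

package main

import (
	"fmt"
	"slices"
)

// sameCPUs mirrors validateState's comparison: sort both slices, then
// compare element-wise. Note that slices.Sort mutates its arguments,
// which is fine for the test's throwaway expectation slices.
func sameCPUs(want, got []int64) bool {
	slices.Sort(want)
	slices.Sort(got)
	return slices.Equal(want, got)
}

func main() {
	fmt.Println(sameCPUs([]int64{1, 0}, []int64{0, 1})) // true
	fmt.Println(sameCPUs([]int64{0}, []int64{0, 1}))    // false
}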
2 changes: 2 additions & 0 deletions testing/scylla-manager-agent/scylla-manager-agent.yaml
@@ -1,5 +1,7 @@
 auth_token: token
 
+cpu: 0
+
 debug: :5112
 
 logger:
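
The cpu: 0 line above is what the pinnedCPU comment in the test refers to: agents in the test setup are pinned to CPU 0. As a hypothetical illustration of the link between the config and the test expectation, parsing such a value into the slice validateState compares against could look like this (parseCPUList is an invented helper; the agent's real config parsing may differ):

package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseCPUList turns an agent cpu value such as "0" or "0,1" into []int64.
// Invented for illustration; not the agent's actual parser.
func parseCPUList(v string) ([]int64, error) {
	var cpus []int64
	for _, part := range strings.Split(v, ",") {
		n, err := strconv.ParseInt(strings.TrimSpace(part), 10, 64)
		if err != nil {
			return nil, err
		}
		cpus = append(cpus, n)
	}
	return cpus, nil
}

func main() {
	cpus, err := parseCPUList("0")
	if err != nil {
		panic(err)
	}
	fmt.Println(cpus) // prints [0], matching pinnedCPU in the test
}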
