From b60a232d0f288301b622aa5085f2b0cb79db16f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Leszczy=C5=84ski?= <2000michal@wp.pl> Date: Tue, 15 Oct 2024 10:55:08 +0200 Subject: [PATCH] feat(restore_test): extend TestRestoreTablesPreparationIntegration with rate limit This way this test also checks rate limit before and after backup. It also checks transfers before, in the middle, when paused, when resumed, and after restore. This commit also extends the test to change transfers and rate limit values when restore is paused, so that it validates that they are correctly re-set during restore data stage. --- .../restore/restore_integration_test.go | 82 +++++++++++++------ 1 file changed, 57 insertions(+), 25 deletions(-) diff --git a/pkg/service/restore/restore_integration_test.go b/pkg/service/restore/restore_integration_test.go index cf81acae7..318377cd5 100644 --- a/pkg/service/restore/restore_integration_test.go +++ b/pkg/service/restore/restore_integration_test.go @@ -498,7 +498,7 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) { Print("Fill setup") fillTable(t, h.srcCluster.rootSession, 100, ks, tab) - validateState := func(ch clusterHelper, tombstone string, compaction bool, transfers int) { + validateState := func(ch clusterHelper, tombstone string, compaction bool, transfers int, rateLimit int) { // Validate tombstone_gc mode if got := tombstoneGCMode(t, ch.rootSession, ks, tab); tombstone != got { t.Errorf("expected tombstone_gc=%s, got %s", tombstone, got) @@ -523,6 +523,20 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) { t.Errorf("expected transfers=%d, got=%d on host %s", transfers, got, host) } } + // Validate rate limit + for _, host := range ch.Client.Config().Hosts { + got, err := ch.Client.RcloneGetBandwidthLimit(context.Background(), host) + if err != nil { + t.Fatal(errors.Wrapf(err, "check transfers on host %s", host)) + } + rawLimit := fmt.Sprintf("%dM", rateLimit) + if rateLimit == 0 { + rawLimit = "off" + } + if rawLimit != got { + t.Errorf("expected rate_limit=%s, got=%s on host %s", rawLimit, got, host) + } + } } shardCnt, err := h.dstCluster.Client.ShardCount(context.Background(), ManagedClusterHost()) @@ -531,35 +545,39 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) { } transfers0 := 2 * int(shardCnt) - // Set initial transfers - for _, host := range ManagedClusterHosts() { - err := h.dstCluster.Client.RcloneSetTransfers(context.Background(), host, 10) - if err != nil { - t.Fatal(errors.Wrapf(err, "set initial transfers on host %s", host)) - } - } - for _, host := range ManagedSecondClusterHosts() { - err := h.srcCluster.Client.RcloneSetTransfers(context.Background(), host, 10) - if err != nil { - t.Fatal(errors.Wrapf(err, "set initial transfers on host %s", host)) + setTransfersAndRateLimit := func(ch clusterHelper, transfers int, rateLimit int) { + for _, host := range ch.Client.Config().Hosts { + err := ch.Client.RcloneSetTransfers(context.Background(), host, transfers) + if err != nil { + t.Fatal(errors.Wrapf(err, "set transfers on host %s", host)) + } + err = ch.Client.RcloneSetBandwidthLimit(context.Background(), host, rateLimit) + if err != nil { + t.Fatal(errors.Wrapf(err, "set rate limit on host %s", host)) + } } } + Print("Set initial transfers and rate limit") + setTransfersAndRateLimit(h.srcCluster, 10, 99) + setTransfersAndRateLimit(h.dstCluster, 10, 99) + Print("Validate state before backup") - validateState(h.srcCluster, "repair", true, 10) + validateState(h.srcCluster, "repair", true, 10, 99) Print("Run backup") loc := []Location{testLocation("preparation", "")} S3InitBucket(t, loc[0].Path) ksFilter := []string{ks} tag := h.runBackup(t, map[string]any{ - "location": loc, - "keyspace": ksFilter, - "transfers": 3, + "location": loc, + "keyspace": ksFilter, + "transfers": 3, + "rate_limit": []string{"88"}, }) Print("Validate state after backup") - validateState(h.srcCluster, "repair", true, 3) + validateState(h.srcCluster, "repair", true, 3, 88) runRestore := func(ctx context.Context, finishedRestore chan error) { grantRestoreTablesPermissions(t, h.dstCluster.rootSession, ksFilter, h.dstUser) @@ -569,10 +587,12 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) { "keyspace": ksFilter, "snapshot_tag": tag, "transfers": 0, + "rate_limit": []string{"0"}, "restore_tables": true, }) if err != nil { - t.Error(err) + finishedRestore <- err + return } finishedRestore <- h.dstRestoreSvc.Restore(ctx, h.dstCluster.ClusterID, h.dstCluster.TaskID, h.dstCluster.RunID, rawProps) } @@ -601,7 +621,7 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) { makeLASHang(reachedDataStageChan, hangLAS) Print("Validate state before restore") - validateState(h.dstCluster, "repair", true, 10) + validateState(h.dstCluster, "repair", true, 10, 99) Print("Run restore") finishedRestore := make(chan error) @@ -609,10 +629,14 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) { go runRestore(restoreCtx, finishedRestore) Print("Wait for data stage") - <-reachedDataStageChan + select { + case <-reachedDataStageChan: + case err := <-finishedRestore: + t.Fatalf("Restore finished before reaching data stage with: %s", err) + } Print("Validate state during restore data") - validateState(h.dstCluster, "disabled", false, transfers0) + validateState(h.dstCluster, "disabled", false, transfers0, 0) Print("Pause restore") restoreCancel() @@ -627,7 +651,11 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) { } Print("Validate state during pause") - validateState(h.dstCluster, "disabled", true, transfers0) + validateState(h.dstCluster, "disabled", true, transfers0, 0) + + Print("Change transfers and rate limit during pause") + setTransfersAndRateLimit(h.srcCluster, 9, 55) + setTransfersAndRateLimit(h.dstCluster, 9, 55) reachedDataStageChan = make(chan struct{}) hangLAS = make(chan struct{}) @@ -639,10 +667,14 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) { go runRestore(context.Background(), finishedRestore) Print("Wait for data stage") - <-reachedDataStageChan + select { + case <-reachedDataStageChan: + case err := <-finishedRestore: + t.Fatalf("Restore finished before reaching data stage with: %s", err) + } Print("Validate state during restore data after pause") - validateState(h.dstCluster, "disabled", false, transfers0) + validateState(h.dstCluster, "disabled", false, transfers0, 0) Print("Release LAS") close(hangLAS) @@ -654,7 +686,7 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) { } Print("Validate state after restore success") - validateState(h.dstCluster, "repair", true, transfers0) + validateState(h.dstCluster, "repair", true, transfers0, 0) Print("Validate table contents") h.validateIdenticalTables(t, []table{{ks: ks, tab: tab}})