diff --git a/pkg/service/restore/restore_integration_test.go b/pkg/service/restore/restore_integration_test.go index d233b668f..cf81acae7 100644 --- a/pkg/service/restore/restore_integration_test.go +++ b/pkg/service/restore/restore_integration_test.go @@ -481,7 +481,7 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) { // Validate setup // Validate restore success - h := newTestHelper(t, ManagedSecondClusterHosts(), ManagedClusterHosts()) + h := newTestHelper(t, ManagedClusterHosts(), ManagedSecondClusterHosts()) Print("Keyspace setup") ksStmt := "CREATE KEYSPACE %q WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': %d}" @@ -498,15 +498,69 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) { Print("Fill setup") fillTable(t, h.srcCluster.rootSession, 100, ks, tab) + validateState := func(ch clusterHelper, tombstone string, compaction bool, transfers int) { + // Validate tombstone_gc mode + if got := tombstoneGCMode(t, ch.rootSession, ks, tab); tombstone != got { + t.Errorf("expected tombstone_gc=%s, got %s", tombstone, got) + } + // Validate compaction + for _, host := range ch.Client.Config().Hosts { + enabled, err := ch.Client.IsAutoCompactionEnabled(context.Background(), host, ks, tab) + if err != nil { + t.Fatal(errors.Wrapf(err, "check compaction on host %s", host)) + } + if compaction != enabled { + t.Errorf("expected compaction enabled=%v, got=%v on host %s", compaction, enabled, host) + } + } + // Validate transfers + for _, host := range ch.Client.Config().Hosts { + got, err := ch.Client.RcloneGetTransfers(context.Background(), host) + if err != nil { + t.Fatal(errors.Wrapf(err, "check transfers on host %s", host)) + } + if transfers != got { + t.Errorf("expected transfers=%d, got=%d on host %s", transfers, got, host) + } + } + } + + shardCnt, err := h.dstCluster.Client.ShardCount(context.Background(), ManagedClusterHost()) + if err != nil { + t.Fatal(err) + } + transfers0 := 2 * int(shardCnt) + + // Set initial transfers + for _, host := range ManagedClusterHosts() { + err := h.dstCluster.Client.RcloneSetTransfers(context.Background(), host, 10) + if err != nil { + t.Fatal(errors.Wrapf(err, "set initial transfers on host %s", host)) + } + } + for _, host := range ManagedSecondClusterHosts() { + err := h.srcCluster.Client.RcloneSetTransfers(context.Background(), host, 10) + if err != nil { + t.Fatal(errors.Wrapf(err, "set initial transfers on host %s", host)) + } + } + + Print("Validate state before backup") + validateState(h.srcCluster, "repair", true, 10) + Print("Run backup") loc := []Location{testLocation("preparation", "")} S3InitBucket(t, loc[0].Path) ksFilter := []string{ks} tag := h.runBackup(t, map[string]any{ - "location": loc, - "keyspace": ksFilter, + "location": loc, + "keyspace": ksFilter, + "transfers": 3, }) + Print("Validate state after backup") + validateState(h.srcCluster, "repair", true, 3) + runRestore := func(ctx context.Context, finishedRestore chan error) { grantRestoreTablesPermissions(t, h.dstCluster.rootSession, ksFilter, h.dstUser) h.dstCluster.RunID = uuid.NewTime() @@ -514,6 +568,7 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) { "location": loc, "keyspace": ksFilter, "snapshot_tag": tag, + "transfers": 0, "restore_tables": true, }) if err != nil { @@ -522,44 +577,31 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) { finishedRestore <- h.dstRestoreSvc.Restore(ctx, h.dstCluster.ClusterID, h.dstCluster.TaskID, h.dstCluster.RunID, rawProps) } - validateState := func(tombstone string, compaction bool) { - // Validate tombstone_gc mode - if got := tombstoneGCMode(t, h.dstCluster.rootSession, ks, tab); tombstone != got { - t.Errorf("expected tombstone_gc=%s, got %s", tombstone, got) - } - // Validate compaction - for _, host := range ManagedClusterHosts() { - enabled, err := h.dstCluster.Client.IsAutoCompactionEnabled(context.Background(), host, ks, tab) - if err != nil { - t.Fatal(errors.Wrapf(err, "check compaction on host %s", host)) - } - if compaction != enabled { - t.Errorf("expected compaction enabled=%v, got=%v on host %s", compaction, enabled, host) - } - } - } - - makeCopyPathsHang := func(reachedDataStage *atomic.Bool, reachedDataStageChan, hangCopyPaths chan struct{}) { + makeLASHang := func(reachedDataStageChan, hangLAS chan struct{}) { + cnt := atomic.Int64{} + cnt.Add(int64(len(h.dstCluster.Client.Config().Hosts))) h.dstCluster.Hrt.SetInterceptor(httpx.RoundTripperFunc(func(req *http.Request) (*http.Response, error) { - if strings.HasPrefix(req.URL.Path, "/agent/rclone/sync/copypaths") { - if reachedDataStage.CompareAndSwap(false, true) { + if strings.HasPrefix(req.URL.Path, "/storage_service/sstables") { + if curr := cnt.Add(-1); curr == 0 { Print("Reached data stage") close(reachedDataStageChan) } - Print("Wait for copy paths to stop hanging") - <-hangCopyPaths + Print("Wait for LAS to stop hanging") + <-hangLAS } return nil, nil })) } var ( - reachedDataStage = &atomic.Bool{} reachedDataStageChan = make(chan struct{}) - hangCopyPathsChan = make(chan struct{}) + hangLAS = make(chan struct{}) ) - Print("Make copy paths hang") - makeCopyPathsHang(reachedDataStage, reachedDataStageChan, hangCopyPathsChan) + Print("Make LAS hang") + makeLASHang(reachedDataStageChan, hangLAS) + + Print("Validate state before restore") + validateState(h.dstCluster, "repair", true, 10) Print("Run restore") finishedRestore := make(chan error) @@ -570,28 +612,27 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) { <-reachedDataStageChan Print("Validate state during restore data") - validateState("disabled", false) + validateState(h.dstCluster, "disabled", false, transfers0) Print("Pause restore") restoreCancel() - Print("Release copy paths") - close(hangCopyPathsChan) + Print("Release LAS") + close(hangLAS) Print("Wait for restore") - err := <-finishedRestore + err = <-finishedRestore if !errors.Is(err, context.Canceled) { t.Fatalf("Expected restore to be paused, got: %s", err) } Print("Validate state during pause") - validateState("disabled", true) + validateState(h.dstCluster, "disabled", true, transfers0) - reachedDataStage = &atomic.Bool{} reachedDataStageChan = make(chan struct{}) - hangCopyPathsChan = make(chan struct{}) - Print("Make copy paths hang after pause") - makeCopyPathsHang(reachedDataStage, reachedDataStageChan, hangCopyPathsChan) + hangLAS = make(chan struct{}) + Print("Make LAS hang after pause") + makeLASHang(reachedDataStageChan, hangLAS) Print("Run restore after pause") finishedRestore = make(chan error) @@ -601,10 +642,10 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) { <-reachedDataStageChan Print("Validate state during restore data after pause") - validateState("disabled", false) + validateState(h.dstCluster, "disabled", false, transfers0) - Print("Release copy paths") - close(hangCopyPathsChan) + Print("Release LAS") + close(hangLAS) Print("Wait for restore") err = <-finishedRestore @@ -613,7 +654,7 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) { } Print("Validate state after restore success") - validateState("repair", true) + validateState(h.dstCluster, "repair", true, transfers0) Print("Validate table contents") h.validateIdenticalTables(t, []table{{ks: ks, tab: tab}})