From 42f72f85fc345331de4e35a9021db9d1210b3e80 Mon Sep 17 00:00:00 2001
From: Michal-Leszczynski <74614433+Michal-Leszczynski@users.noreply.github.com>
Date: Fri, 31 Jan 2025 13:39:20 +0100
Subject: [PATCH] feat(restore): rework progress aggregation (#4225)

* refactor(schema): remove unused restore_run_progress skipped column

This column might be interesting for backup, which performs deduplication, but it makes no sense for restore. It was never exposed through the restore progress API, so it is safe to remove.

* feat(schema): add restore_run_progress restored column

This column wasn't needed with the synchronous load&stream Scylla API (restored is always equal to either 0 or downloaded), but it is cleaner to have it, and it can be useful for the native restore Scylla API.

* feat(restore): adjust Rclone and l&s restore progress update

Follow-up to e1127475. Previously we treated RestoreStartedAt/RestoreCompletedAt as the time frame of load&stream only, but it should be interpreted more inclusively, as the time frame of restoring bytes, which for the Rclone API + l&s approach starts with the download.

* refactor(restore): separate progress aggregation from DB query

Previously, the code aggregating restore progress and the code querying the run progresses for aggregation were tightly coupled. This made unit testing difficult and the code harder to follow. This commit changes progress aggregation to consume an iterator over run progresses, which can be easily mocked in unit tests (see the sketch below).

* feat(restore): fill HostProgress RestoredBytes/Duration

Follow-up to e1127475.

* feat(restore): allow for specifying now in progress aggregation

Being able to pass now explicitly is important for testing.

* feat(restore): add unit tests for progress aggregation

Thanks to the previous commit, it is now possible to unit test progress aggregation!
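For illustration only (not part of the patch): a minimal, self-contained Go sketch of the error-carrying iterator pattern that runProgressSeq uses, where the producer cannot return an error from the range loop itself, so it stores it in a struct field that callers check after iteration. The names here (rowSeq, Row, All's failAt parameter) are hypothetical stand-ins, not code from this repository.

    package main

    import (
        "errors"
        "fmt"
        "iter"
    )

    // Row stands in for RunProgress.
    type Row struct{ ID int }

    // rowSeq mirrors runProgressSeq: err is populated when iteration
    // terminates due to a failure and must be checked after the loop.
    type rowSeq struct{ err error }

    // All yields rows one by one; failAt simulates a scan error at that index.
    func (s *rowSeq) All(rows []Row, failAt int) iter.Seq[*Row] {
        return func(yield func(*Row) bool) {
            for i := range rows {
                if i == failAt {
                    s.err = errors.New("simulated scan failure")
                    return
                }
                if !yield(&rows[i]) {
                    // Consumer broke out of the loop early.
                    return
                }
            }
        }
    }

    func main() {
        seq := &rowSeq{}
        for r := range seq.All([]Row{{ID: 1}, {ID: 2}, {ID: 3}}, 2) {
            fmt.Println("got row", r.ID)
        }
        // Validate the error only after the loop, like seq.err in getProgress.
        if seq.err != nil {
            fmt.Println("iteration failed:", seq.err)
        }
    }

Because the aggregation now takes any iter.Seq[*RunProgress], the unit tests below can feed it an in-memory sequence via slices.Values(tc.progress) instead of a live DB session.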
--- pkg/schema/table/table.go | 2 +- pkg/service/restore/index.go | 9 +- pkg/service/restore/model.go | 6 +- pkg/service/restore/progress.go | 285 ++++++++---------- pkg/service/restore/progress_test.go | 185 ++++++++++++ pkg/service/restore/schema_worker.go | 19 +- pkg/service/restore/service.go | 15 +- pkg/service/restore/tablesdir_worker.go | 17 +- .../complete_restore.golden.json | 66 ++++ .../full_tab1_partial_tab2.golden.json | 66 ++++ .../full_tab1_zero_tab2.golden.json | 66 ++++ .../AggregateProgress/no_progress.golden.json | 54 ++++ .../partial_tab1_partial_tab2.golden.json | 66 ++++ .../zero_progress.golden.json | 66 ++++ pkg/service/restore/worker.go | 9 +- schema/v3.5.0.cql | 4 +- 16 files changed, 743 insertions(+), 192 deletions(-) create mode 100644 pkg/service/restore/progress_test.go create mode 100644 pkg/service/restore/testdata/AggregateProgress/complete_restore.golden.json create mode 100644 pkg/service/restore/testdata/AggregateProgress/full_tab1_partial_tab2.golden.json create mode 100644 pkg/service/restore/testdata/AggregateProgress/full_tab1_zero_tab2.golden.json create mode 100644 pkg/service/restore/testdata/AggregateProgress/no_progress.golden.json create mode 100644 pkg/service/restore/testdata/AggregateProgress/partial_tab1_partial_tab2.golden.json create mode 100644 pkg/service/restore/testdata/AggregateProgress/zero_progress.golden.json diff --git a/pkg/schema/table/table.go b/pkg/schema/table/table.go index 6163163e3..3407adb9b 100644 --- a/pkg/schema/table/table.go +++ b/pkg/schema/table/table.go @@ -220,9 +220,9 @@ var ( "manifest_path", "restore_completed_at", "restore_started_at", + "restored", "run_id", "shard_cnt", - "skipped", "sstable_id", "table_name", "task_id", diff --git a/pkg/service/restore/index.go b/pkg/service/restore/index.go index 32f91dee9..6f82d84c7 100644 --- a/pkg/service/restore/index.go +++ b/pkg/service/restore/index.go @@ -122,13 +122,14 @@ func (w *tablesWorker) filterPreviouslyRestoredSStables(ctx context.Context, raw w.logger.Info(ctx, "Filter out previously restored sstables") remoteSSTableDirToRestoredIDs := make(map[string][]string) - err := forEachProgress(w.session, w.run.ClusterID, w.run.TaskID, w.run.ID, func(pr *RunProgress) { + seq := newRunProgressSeq() + for pr := range seq.All(w.run.ClusterID, w.run.TaskID, w.run.ID, w.session) { if validateTimeIsSet(pr.RestoreCompletedAt) { remoteSSTableDirToRestoredIDs[pr.RemoteSSTableDir] = append(remoteSSTableDirToRestoredIDs[pr.RemoteSSTableDir], pr.SSTableID...) } - }) - if err != nil { - return nil, errors.Wrap(err, "iterate over prev run progress") + } + if seq.err != nil { + return nil, errors.Wrap(seq.err, "iterate over prev run progress") } if len(remoteSSTableDirToRestoredIDs) == 0 { return rawWorkload, nil diff --git a/pkg/service/restore/model.go b/pkg/service/restore/model.go index b009af117..0e5ceecc9 100644 --- a/pkg/service/restore/model.go +++ b/pkg/service/restore/model.go @@ -194,13 +194,15 @@ type RunProgress struct { ShardCnt int64 // Host shard count used for bandwidth per shard calculation. AgentJobID int64 + // DownloadStartedAt and DownloadCompletedAt are within the + // RestoreStartedAt and RestoreCompletedAt time frame. 
DownloadStartedAt *time.Time DownloadCompletedAt *time.Time RestoreStartedAt *time.Time RestoreCompletedAt *time.Time Error string Downloaded int64 - Skipped int64 + Restored int64 Failed int64 VersionedProgress int64 } @@ -252,6 +254,8 @@ type KeyspaceProgress struct { type HostProgress struct { Host string `json:"host"` ShardCnt int64 `json:"shard_cnt"` + RestoredBytes int64 `json:"restored_bytes"` + RestoreDuration int64 `json:"restore_duration"` DownloadedBytes int64 `json:"downloaded_bytes"` DownloadDuration int64 `json:"download_duration"` StreamedBytes int64 `json:"streamed_bytes"` diff --git a/pkg/service/restore/progress.go b/pkg/service/restore/progress.go index 8011f8734..92bebc635 100644 --- a/pkg/service/restore/progress.go +++ b/pkg/service/restore/progress.go @@ -3,10 +3,10 @@ package restore import ( + "iter" "slices" "time" - "github.com/pkg/errors" "github.com/scylladb/gocqlx/v2" "github.com/scylladb/gocqlx/v2/qb" "github.com/scylladb/scylla-manager/v3/pkg/schema/table" @@ -14,203 +14,182 @@ import ( "github.com/scylladb/scylla-manager/v3/pkg/util/uuid" ) -var ( - zeroTime time.Time - bigTime = time.Unix(1<<62-1, 0).UTC() -) +func getProgress(run *Run, s gocqlx.Session) (Progress, error) { + seq := newRunProgressSeq() + pr := aggregateProgress(run, seq.All(run.ClusterID, run.TaskID, run.ID, s), timeutc.Now()) + if seq.err != nil { + return Progress{}, seq.err + } + return pr, nil +} -type tableKey struct { - keyspace string - table string +// runProgressSeq serves as a body for iter.Seq[*RunProgress]. +// Since we can't return error on iteration, it stores it +// inside the err field. +// Users should validate err after using the iterator. +type runProgressSeq struct { + err error } -// aggregateProgress returns restore progress information classified by keyspace and tables. 
-func (w *worker) aggregateProgress() (Progress, error) { - var ( - p = Progress{ - SnapshotTag: w.run.SnapshotTag, - Stage: w.run.Stage, - } - tableMap = make(map[tableKey]*TableProgress) - key tableKey - hostProgress = make(map[string]HostProgress) - ) - - // Initialize tables and their size - for _, u := range w.run.Units { - key.keyspace = u.Keyspace - for _, t := range u.Tables { - key.table = t.Table - tableMap[key] = &TableProgress{ - Table: t.Table, - TombstoneGC: t.TombstoneGC, - progress: progress{ - Size: t.Size, - StartedAt: &bigTime, - CompletedAt: &zeroTime, - }, +func newRunProgressSeq() *runProgressSeq { + return &runProgressSeq{} +} + +func (seq *runProgressSeq) All(clusterID, taskID, runID uuid.UUID, s gocqlx.Session) iter.Seq[*RunProgress] { + return func(yield func(rp *RunProgress) bool) { + q := table.RestoreRunProgress.SelectQuery(s) + it := q.BindMap(qb.M{ + "cluster_id": clusterID, + "task_id": taskID, + "run_id": runID, + }).Iter() + defer func() { + seq.err = it.Close() + q.Release() + }() + + pr := new(RunProgress) + for it.StructScan(pr) { + if !yield(pr) { + break } } } +} - // Initialize tables' progress - atp := aggregateRestoreTableProgress(tableMap) - err := forEachProgress(w.session, w.run.ClusterID, w.run.TaskID, w.run.ID, func(runProgress *RunProgress) { - atp(runProgress) - hp := hostProgress[runProgress.Host] - hp.Host = runProgress.Host - hp.ShardCnt = runProgress.ShardCnt - hp.DownloadedBytes += runProgress.Downloaded - hp.DownloadDuration += timeSub(runProgress.DownloadStartedAt, runProgress.DownloadCompletedAt).Milliseconds() - if runProgress.RestoreCompletedAt != nil { - hp.StreamedBytes += runProgress.Downloaded - hp.StreamDuration += timeSub(runProgress.RestoreStartedAt, runProgress.RestoreCompletedAt).Milliseconds() - } - hostProgress[runProgress.Host] = hp - }) - if err != nil { - return p, errors.Wrap(err, "iterate over restore progress") +func aggregateProgress(run *Run, seq iter.Seq[*RunProgress], now time.Time) Progress { + p := Progress{ + SnapshotTag: run.SnapshotTag, + Stage: run.Stage, + } + tableProgress := make(map[TableName]TableProgress) + hostProgress := make(map[string]HostProgress) + for rp := range seq { + progressCB(tableProgress, hostProgress, rp, now) } - // Aggregate progress - for _, u := range w.run.Units { + // Aggregate keyspace progress + for _, u := range run.Units { + p.progress.Size += u.Size kp := KeyspaceProgress{ Keyspace: u.Keyspace, progress: progress{ - StartedAt: &bigTime, - CompletedAt: &zeroTime, + Size: u.Size, }, } for _, t := range u.Tables { - tp := tableMap[tableKey{keyspace: u.Keyspace, table: t.Table}] - tp.progress.extremeToNil() + tp := tableProgress[TableName{Keyspace: u.Keyspace, Table: t.Table}] + tp.Table = t.Table + tp.TombstoneGC = t.TombstoneGC + tp.Size = t.Size + if tp.Restored < tp.Size { + tp.CompletedAt = nil + } - kp.Tables = append(kp.Tables, *tp) - kp.calcParentProgress(tp.progress) + kp.Tables = append(kp.Tables, tp) + kp.updateParentProgress(tp.progress) } - kp.extremeToNil() - + if kp.Restored < kp.Size { + kp.CompletedAt = nil + } p.Keyspaces = append(p.Keyspaces, kp) - p.calcParentProgress(kp.progress) + p.updateParentProgress(kp.progress) + } + if p.Restored < p.Size { + p.CompletedAt = nil } - p.extremeToNil() - - p.Views = slices.Clone(w.run.Views) - for _, hp := range hostProgress { + p.Views = slices.Clone(run.Views) + for h, hp := range hostProgress { + hp.Host = h p.Hosts = append(p.Hosts, hp) } - return p, nil + return p } -// aggregateRestoreTableProgress returns 
function that can be used to aggregate -// restore progress per table. -func aggregateRestoreTableProgress(tableMap map[tableKey]*TableProgress) func(*RunProgress) { - return func(pr *RunProgress) { - var ( - key = tableKey{ - keyspace: pr.Keyspace, - table: pr.Table, - } - tab = tableMap[key] - ) - - totalDownloaded := pr.Downloaded + pr.Skipped + pr.VersionedProgress - if validateTimeIsSet(pr.RestoreCompletedAt) { - tab.Restored += totalDownloaded - } - tab.Downloaded += totalDownloaded - tab.Failed += pr.Failed - - tab.StartedAt = calcParentStartedAt(tab.StartedAt, pr.DownloadStartedAt) - tab.CompletedAt = calcParentCompletedAt(tab.CompletedAt, pr.RestoreCompletedAt) - - if tab.Error == "" { - tab.Error = pr.Error - } else if pr.Error != "" { - tab.Error = tab.Error + "\n" + pr.Error - } - - tableMap[key] = tab +func progressCB(tableProgress map[TableName]TableProgress, hostProgress map[string]HostProgress, pr *RunProgress, now time.Time) { + // Update table progress + tn := TableName{Keyspace: pr.Keyspace, Table: pr.Table} + tp := tableProgress[tn] + + tp.Downloaded += pr.Downloaded + pr.VersionedProgress + tp.Restored += pr.Restored + tp.Failed += pr.Failed + tp.StartedAt = minTime(tp.StartedAt, pr.RestoreStartedAt) + // We need to later validate that the table + // indeed been completely restored. + tp.CompletedAt = maxTime(tp.CompletedAt, pr.RestoreCompletedAt) + if tp.Error == "" { + tp.Error = pr.Error + } else if pr.Error != "" { + tp.Error = tp.Error + "\n" + pr.Error + } + tableProgress[tn] = tp + + // Update host progress + hp := hostProgress[pr.Host] + hp.ShardCnt = pr.ShardCnt + hp.DownloadedBytes += pr.Downloaded + pr.VersionedProgress + // We can update download duration on the fly, + // but it's not possible with sync load&stream API. + hp.DownloadDuration += timeSub(pr.DownloadStartedAt, pr.DownloadCompletedAt, now).Milliseconds() + if validateTimeIsSet(pr.RestoreCompletedAt) { + hp.RestoredBytes += pr.Restored + hp.RestoreDuration += timeSub(pr.RestoreStartedAt, pr.RestoreCompletedAt, now).Milliseconds() + hp.StreamedBytes += pr.Restored + hp.StreamDuration += timeSub(pr.DownloadCompletedAt, pr.RestoreCompletedAt, now).Milliseconds() } + hostProgress[pr.Host] = hp } -// extremeToNil converts from temporary extreme time values to nil. -func (rp *progress) extremeToNil() { - if rp.StartedAt == &bigTime { - rp.StartedAt = nil +// minTime chooses the smaller set time. +func minTime(a, b *time.Time) *time.Time { + if !validateTimeIsSet(a) { + return b } - if rp.CompletedAt == &zeroTime { - rp.CompletedAt = nil + if !validateTimeIsSet(b) { + return a } -} - -// calcParentProgress returns updated progress for the parent that will include -// child progress. -func (rp *progress) calcParentProgress(child progress) { - rp.Size += child.Size - rp.Restored += child.Restored - rp.Downloaded += child.Downloaded - rp.Failed += child.Failed - - rp.StartedAt = calcParentStartedAt(rp.StartedAt, child.StartedAt) - rp.CompletedAt = calcParentCompletedAt(rp.CompletedAt, child.CompletedAt) -} - -func calcParentStartedAt(parent, child *time.Time) *time.Time { - if child != nil { - // Use child start time as parent start time only if it started before - // parent. - if parent == nil || child.Before(*parent) { - return child - } + if a.Before(*b) { + return a } - return parent + return b } -func calcParentCompletedAt(parent, child *time.Time) *time.Time { - if child != nil { - // Use child end time as parent end time only if it ended after parent. 
- if parent != nil && child.After(*parent) { - return child - } - } else { - // Set parent end time to nil if any of its children are ending in nil. - return nil +// maxTime chooses the bigger set time. +func maxTime(a, b *time.Time) *time.Time { + if !validateTimeIsSet(a) { + return b } - return parent -} - -func forEachProgress(s gocqlx.Session, clusterID, taskID, runID uuid.UUID, cb func(*RunProgress)) error { - q := table.RestoreRunProgress.SelectQuery(s) - iter := q.BindMap(qb.M{ - "cluster_id": clusterID, - "task_id": taskID, - "run_id": runID, - }).Iter() - - pr := new(RunProgress) - for iter.StructScan(pr) { - cb(pr) + if !validateTimeIsSet(b) { + return a } + if a.After(*b) { + return a + } + return b +} - err := iter.Close() - q.Release() - return err +// updateParentProgress updates parents progress with child. +func (rp *progress) updateParentProgress(child progress) { + rp.Restored += child.Restored + rp.Downloaded += child.Downloaded + rp.Failed += child.Failed + rp.StartedAt = minTime(rp.StartedAt, child.StartedAt) + rp.CompletedAt = maxTime(rp.CompletedAt, child.CompletedAt) } // Returns duration between end and start. // If start is nil, returns 0. // If end is nil, returns duration between now and start. -func timeSub(start, end *time.Time) time.Duration { +func timeSub(start, end *time.Time, now time.Time) time.Duration { if start != nil { - endV := timeutc.Now() if end != nil { - endV = *end + return end.Sub(*start) } - return endV.Sub(*start) + return now.Sub(*start) } return time.Duration(0) } diff --git a/pkg/service/restore/progress_test.go b/pkg/service/restore/progress_test.go new file mode 100644 index 000000000..0ea14704a --- /dev/null +++ b/pkg/service/restore/progress_test.go @@ -0,0 +1,185 @@ +// Copyright (C) 2025 ScyllaDB + +package restore + +import ( + "reflect" + "slices" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec" + "github.com/scylladb/scylla-manager/v3/pkg/testutils" + "github.com/scylladb/scylla-manager/v3/pkg/util/timeutc" + "github.com/scylladb/scylla-manager/v3/pkg/util/uuid" +) + +func TestAggregateProgress(t *testing.T) { + var ( + taskStart = timeutc.MustParse(time.RFC3339, "2024-02-23T01:12:00Z") + now = taskStart.Add(10 * time.Second) + host = "h" + ks = "ks" + tab1 = "tab1" + tab2 = "tab2" + // Make sure that the sizes can be divided by 2 and 3 + tab1size = int64(2 * 3 * 10) + tab2size = int64(2 * 3 * 20) + ) + + run := Run{ + ClusterID: uuid.NewFromUint64(10, 100), + TaskID: uuid.NewFromUint64(20, 200), + ID: uuid.NewFromUint64(30, 300), + SnapshotTag: backupspec.SnapshotTagAt(taskStart.Add(-time.Second)), + Stage: StageData, + Units: []Unit{ + { + Keyspace: ks, + Size: tab1size + tab2size, + Tables: []Table{ + {Table: tab1, TombstoneGC: modeRepair, Size: tab1size}, + {Table: tab2, TombstoneGC: modeTimeout, Size: tab2size}, + }, + }, + }, + Views: []View{ + { + Keyspace: ks, + View: "mv", + Type: MaterializedView, + BaseTable: tab1, + CreateStmt: "CREATE", + }, + }, + } + + timePtr := func(t time.Time) *time.Time { + return &t + } + + // The template RunProgresses use native Scylla API + fullTab1 := &RunProgress{ + ClusterID: run.ClusterID, + TaskID: run.TaskID, + RunID: run.ID, + RemoteSSTableDir: "path/" + tab1, + Keyspace: ks, + Table: tab1, + SSTableID: []string{"1", "2"}, + Host: host, + ShardCnt: 1, + AgentJobID: 1, + RestoreStartedAt: &taskStart, + RestoreCompletedAt: timePtr(taskStart.Add(2 * time.Second)), + DownloadStartedAt: &taskStart, + 
DownloadCompletedAt: timePtr(taskStart.Add(time.Second)), + Restored: tab1size, + Downloaded: tab1size, + } + fullTab2 := &RunProgress{ + ClusterID: run.ClusterID, + TaskID: run.TaskID, + RunID: run.ID, + RemoteSSTableDir: "path/" + tab2, + Keyspace: ks, + Table: tab2, + SSTableID: []string{"3", "4"}, + Host: host, + ShardCnt: 1, + AgentJobID: 1, + RestoreStartedAt: &taskStart, + RestoreCompletedAt: timePtr(taskStart.Add(10 * time.Second)), + DownloadStartedAt: &taskStart, + DownloadCompletedAt: timePtr(taskStart.Add(5 * time.Second)), + Restored: tab2size, + Downloaded: tab2size, + } + + // Transforms RunProgress from native Scylla API to Rclone API + const unsetCompletedAt = -1 + setRclone := func(rp *RunProgress, d, v int64, + downloadStartFromNowInSec, downloadEndFromNowInSec, restoreEndFromNowInSec int) *RunProgress { + clone := *rp + clone.RestoreStartedAt = timePtr(taskStart.Add(time.Duration(downloadStartFromNowInSec) * time.Second)) + clone.DownloadStartedAt = timePtr(taskStart.Add(time.Duration(downloadStartFromNowInSec) * time.Second)) + if downloadEndFromNowInSec != unsetCompletedAt { + clone.DownloadCompletedAt = timePtr(taskStart.Add(time.Duration(downloadEndFromNowInSec) * time.Second)) + } else { + clone.DownloadCompletedAt = nil + } + if restoreEndFromNowInSec != unsetCompletedAt { + clone.Restored = d + v + clone.RestoreCompletedAt = timePtr(taskStart.Add(time.Duration(restoreEndFromNowInSec) * time.Second)) + } else { + clone.Restored = 0 + clone.RestoreCompletedAt = nil + } + clone.Downloaded = d + clone.VersionedProgress = v + return &clone + } + + testCases := []struct { + name string + progress []*RunProgress + }{ + { + name: "complete restore", + progress: []*RunProgress{ + fullTab1, + fullTab2, + }, + }, + { + name: "no progress", + progress: []*RunProgress{}, + }, + { + name: "zero progress", + progress: []*RunProgress{ + setRclone(fullTab1, 0, 0, 0, unsetCompletedAt, unsetCompletedAt), + setRclone(fullTab2, 0, 0, 0, unsetCompletedAt, unsetCompletedAt), + }, + }, + { + name: "full tab1 zero tab2", + progress: []*RunProgress{ + fullTab1, + setRclone(fullTab2, 0, 0, 0, unsetCompletedAt, unsetCompletedAt), + }, + }, + { + name: "partial tab1 partial tab2", + progress: []*RunProgress{ + setRclone(fullTab1, tab1size/2, tab1size/2, 0, 5, unsetCompletedAt), + setRclone(fullTab2, tab2size/3, 0, 0, 3, 7), + setRclone(fullTab2, tab2size/3, 0, 2, unsetCompletedAt, unsetCompletedAt), + }, + }, + { + name: "full tab1 partial tab2", + progress: []*RunProgress{ + setRclone(fullTab1, tab1size/2, tab1size/2, 5, 7, 10), + setRclone(fullTab2, tab2size/3, 0, 1, 2, 3), + setRclone(fullTab2, tab2size/3, tab2size/3, 1, 2, unsetCompletedAt), + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + pr := aggregateProgress(&run, slices.Values(tc.progress), now) + testutils.SaveGoldenJSONFileIfNeeded(t, pr) + var expected Progress + testutils.LoadGoldenJSONFile(t, &expected) + if diff := cmp.Diff(pr, expected, cmp.Exporter(func(r reflect.Type) bool { + return true + })); diff != "" { + t.Fatal(diff) + } + }) + } +} diff --git a/pkg/service/restore/schema_worker.go b/pkg/service/restore/schema_worker.go index d60417b99..dcc5566d8 100644 --- a/pkg/service/restore/schema_worker.go +++ b/pkg/service/restore/schema_worker.go @@ -92,12 +92,13 @@ func (w *schemaWorker) stageRestoreData(ctx context.Context) error { } } // Set restore start in all run progresses - err = forEachProgress(w.session, w.run.ClusterID, w.run.TaskID, w.run.ID, func(pr 
*RunProgress) { + seq := newRunProgressSeq() + for pr := range seq.All(w.run.ClusterID, w.run.TaskID, w.run.ID, w.session) { pr.setRestoreStartedAt() w.insertRunProgress(ctx, pr) - }) - if err != nil { - w.logger.Error(ctx, "Couldn't set restore start", "error", err) + } + if seq.err != nil { + w.logger.Error(ctx, "Couldn't set restore start", "error", seq.err) } // Load schema SSTables on all nodes @@ -124,12 +125,13 @@ func (w *schemaWorker) stageRestoreData(ctx context.Context) error { return err } // Set restore completed in all run progresses - err = forEachProgress(w.session, w.run.ClusterID, w.run.TaskID, w.run.ID, func(pr *RunProgress) { + for pr := range seq.All(w.run.ClusterID, w.run.TaskID, w.run.ID, w.session) { pr.setRestoreCompletedAt() + pr.Restored = pr.Downloaded + pr.VersionedProgress w.insertRunProgress(ctx, pr) - }) - if err != nil { - w.logger.Error(ctx, "Couldn't set restore end", "error", err) + } + if seq.err != nil { + w.logger.Error(ctx, "Couldn't set restore end", "error", seq.err) } return nil @@ -181,6 +183,7 @@ func (w *schemaWorker) restoreFromSchemaFile(ctx context.Context) error { RestoreStartedAt: &start, RestoreCompletedAt: &end, Downloaded: t.Size, + Restored: t.Size, } w.insertRunProgress(ctx, &pr) } diff --git a/pkg/service/restore/service.go b/pkg/service/restore/service.go index e21fc6df9..83112968f 100644 --- a/pkg/service/restore/service.go +++ b/pkg/service/restore/service.go @@ -158,8 +158,7 @@ func (s *Service) GetProgress(ctx context.Context, clusterID, taskID, runID uuid return Progress{}, errors.Wrap(err, "get run") } - w := s.newProgressWorker(run) - pr, err := w.aggregateProgress() + pr, err := getProgress(run, s.session) if err != nil { return Progress{}, err } @@ -169,7 +168,7 @@ func (s *Service) GetProgress(ctx context.Context, clusterID, taskID, runID uuid return pr, nil } - q := table.RepairRun.SelectQuery(w.session).BindMap(qb.M{ + q := table.RepairRun.SelectQuery(s.session).BindMap(qb.M{ "cluster_id": run.ClusterID, "task_id": run.RepairTaskID, }) @@ -217,16 +216,6 @@ func (w *worker) setRunInfo(taskID, runID uuid.UUID) { w.run.ID = runID } -func (s *Service) newProgressWorker(run *Run) worker { - return worker{ - run: run, - config: s.config, - logger: s.logger, - metrics: s.metrics, - session: s.session, - } -} - // GetRun returns run with specified cluster, task and run ID. // If run ID is not specified, it returns the latest run with specified cluster and task ID. func GetRun(s gocqlx.Session, clusterID, taskID, runID uuid.UUID) (*Run, error) { diff --git a/pkg/service/restore/tablesdir_worker.go b/pkg/service/restore/tablesdir_worker.go index eae7c386c..9dadcfe55 100644 --- a/pkg/service/restore/tablesdir_worker.go +++ b/pkg/service/restore/tablesdir_worker.go @@ -12,6 +12,7 @@ import ( "github.com/scylladb/scylla-manager/v3/pkg/scyllaclient" . "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec" "github.com/scylladb/scylla-manager/v3/pkg/util/parallel" + "github.com/scylladb/scylla-manager/v3/pkg/util/timeutc" ) func (w *tablesWorker) restoreBatch(ctx context.Context, b batch, pr *RunProgress) (err error) { @@ -219,19 +220,22 @@ func (w *tablesWorker) onDownloadUpdate(ctx context.Context, b batch, pr *RunPro // As we update metrics on download update, // we need to remember to update just the delta. 
w.metrics.IncreaseRestoreDownloadedBytes(w.run.ClusterID, b.Location.StringWithoutDC(), pr.Host, job.Uploaded-pr.Downloaded) - prevD := timeSub(pr.DownloadStartedAt, pr.DownloadCompletedAt) + now := timeutc.Now() + prevD := timeSub(pr.DownloadStartedAt, pr.DownloadCompletedAt, now) if t := time.Time(job.StartedAt); !t.IsZero() { pr.DownloadStartedAt = &t + pr.RestoreStartedAt = &t } if t := time.Time(job.CompletedAt); !t.IsZero() { pr.DownloadCompletedAt = &t } - currD := timeSub(pr.DownloadStartedAt, pr.DownloadCompletedAt) + currD := timeSub(pr.DownloadStartedAt, pr.DownloadCompletedAt, now) w.metrics.IncreaseRestoreDownloadDuration(w.run.ClusterID, b.Location.StringWithoutDC(), pr.Host, currD-prevD) pr.Error = job.Error - pr.Downloaded = job.Uploaded - pr.Skipped = job.Skipped + // Skipped should be equal to 0, + // as we don't perform any deduplication. + pr.Downloaded = job.Uploaded + job.Skipped pr.Failed = job.Failed w.insertRunProgress(ctx, pr) @@ -243,15 +247,14 @@ func (w *tablesWorker) onDownloadUpdate(ctx context.Context, b batch, pr *RunPro func (w *tablesWorker) onLasStart(ctx context.Context, b batch, pr *RunProgress) { w.metrics.SetRestoreState(w.run.ClusterID, b.Location, w.run.SnapshotTag, pr.Host, metrics.RestoreStateLoading) w.logger.Info(ctx, "Started restoring batch", "host", pr.Host) - pr.setRestoreStartedAt() - w.insertRunProgress(ctx, pr) } func (w *tablesWorker) onLasEnd(ctx context.Context, b batch, pr *RunProgress) { w.metrics.SetRestoreState(w.run.ClusterID, b.Location, w.target.SnapshotTag, pr.Host, metrics.RestoreStateIdle) pr.setRestoreCompletedAt() + pr.Restored = pr.Downloaded + pr.VersionedProgress w.metrics.IncreaseRestoreStreamedBytes(w.run.ClusterID, pr.Host, b.Size) - w.metrics.IncreaseRestoreStreamDuration(w.run.ClusterID, pr.Host, timeSub(pr.RestoreStartedAt, pr.RestoreCompletedAt)) + w.metrics.IncreaseRestoreStreamDuration(w.run.ClusterID, pr.Host, timeSub(pr.RestoreStartedAt, pr.RestoreCompletedAt, timeutc.Now())) labels := metrics.RestoreBytesLabels{ ClusterID: b.ClusterID.String(), diff --git a/pkg/service/restore/testdata/AggregateProgress/complete_restore.golden.json b/pkg/service/restore/testdata/AggregateProgress/complete_restore.golden.json new file mode 100644 index 000000000..eb1f14a10 --- /dev/null +++ b/pkg/service/restore/testdata/AggregateProgress/complete_restore.golden.json @@ -0,0 +1,66 @@ +{ + "size": 180, + "restored": 180, + "downloaded": 180, + "failed": 0, + "started_at": "2024-02-23T01:12:00Z", + "completed_at": "2024-02-23T01:12:10Z", + "repair_progress": null, + "snapshot_tag": "sm_20240223011159UTC", + "keyspaces": [ + { + "size": 180, + "restored": 180, + "downloaded": 180, + "failed": 0, + "started_at": "2024-02-23T01:12:00Z", + "completed_at": "2024-02-23T01:12:10Z", + "keyspace": "ks", + "tables": [ + { + "size": 60, + "restored": 60, + "downloaded": 60, + "failed": 0, + "started_at": "2024-02-23T01:12:00Z", + "completed_at": "2024-02-23T01:12:02Z", + "table": "tab1", + "tombstone_gc": "repair" + }, + { + "size": 120, + "restored": 120, + "downloaded": 120, + "failed": 0, + "started_at": "2024-02-23T01:12:00Z", + "completed_at": "2024-02-23T01:12:10Z", + "table": "tab2", + "tombstone_gc": "timeout" + } + ] + } + ], + "hosts": [ + { + "host": "h", + "shard_cnt": 1, + "restored_bytes": 180, + "restore_duration": 12000, + "downloaded_bytes": 180, + "download_duration": 6000, + "streamed_bytes": 180, + "stream_duration": 6000 + } + ], + "views": [ + { + "keyspace": "ks", + "view": "mv", + "type": "MaterializedView", + 
"base_table": "tab1", + "create_stmt": "CREATE", + "status": "" + } + ], + "stage": "DATA" +} \ No newline at end of file diff --git a/pkg/service/restore/testdata/AggregateProgress/full_tab1_partial_tab2.golden.json b/pkg/service/restore/testdata/AggregateProgress/full_tab1_partial_tab2.golden.json new file mode 100644 index 000000000..94685a37e --- /dev/null +++ b/pkg/service/restore/testdata/AggregateProgress/full_tab1_partial_tab2.golden.json @@ -0,0 +1,66 @@ +{ + "size": 180, + "restored": 100, + "downloaded": 180, + "failed": 0, + "started_at": "2024-02-23T01:12:01Z", + "completed_at": null, + "repair_progress": null, + "snapshot_tag": "sm_20240223011159UTC", + "keyspaces": [ + { + "size": 180, + "restored": 100, + "downloaded": 180, + "failed": 0, + "started_at": "2024-02-23T01:12:01Z", + "completed_at": null, + "keyspace": "ks", + "tables": [ + { + "size": 60, + "restored": 60, + "downloaded": 60, + "failed": 0, + "started_at": "2024-02-23T01:12:05Z", + "completed_at": "2024-02-23T01:12:10Z", + "table": "tab1", + "tombstone_gc": "repair" + }, + { + "size": 120, + "restored": 40, + "downloaded": 120, + "failed": 0, + "started_at": "2024-02-23T01:12:01Z", + "completed_at": null, + "table": "tab2", + "tombstone_gc": "timeout" + } + ] + } + ], + "hosts": [ + { + "host": "h", + "shard_cnt": 1, + "restored_bytes": 100, + "restore_duration": 7000, + "downloaded_bytes": 180, + "download_duration": 4000, + "streamed_bytes": 100, + "stream_duration": 4000 + } + ], + "views": [ + { + "keyspace": "ks", + "view": "mv", + "type": "MaterializedView", + "base_table": "tab1", + "create_stmt": "CREATE", + "status": "" + } + ], + "stage": "DATA" +} \ No newline at end of file diff --git a/pkg/service/restore/testdata/AggregateProgress/full_tab1_zero_tab2.golden.json b/pkg/service/restore/testdata/AggregateProgress/full_tab1_zero_tab2.golden.json new file mode 100644 index 000000000..4bd3033ce --- /dev/null +++ b/pkg/service/restore/testdata/AggregateProgress/full_tab1_zero_tab2.golden.json @@ -0,0 +1,66 @@ +{ + "size": 180, + "restored": 60, + "downloaded": 60, + "failed": 0, + "started_at": "2024-02-23T01:12:00Z", + "completed_at": null, + "repair_progress": null, + "snapshot_tag": "sm_20240223011159UTC", + "keyspaces": [ + { + "size": 180, + "restored": 60, + "downloaded": 60, + "failed": 0, + "started_at": "2024-02-23T01:12:00Z", + "completed_at": null, + "keyspace": "ks", + "tables": [ + { + "size": 60, + "restored": 60, + "downloaded": 60, + "failed": 0, + "started_at": "2024-02-23T01:12:00Z", + "completed_at": "2024-02-23T01:12:02Z", + "table": "tab1", + "tombstone_gc": "repair" + }, + { + "size": 120, + "restored": 0, + "downloaded": 0, + "failed": 0, + "started_at": "2024-02-23T01:12:00Z", + "completed_at": null, + "table": "tab2", + "tombstone_gc": "timeout" + } + ] + } + ], + "hosts": [ + { + "host": "h", + "shard_cnt": 1, + "restored_bytes": 60, + "restore_duration": 2000, + "downloaded_bytes": 60, + "download_duration": 11000, + "streamed_bytes": 60, + "stream_duration": 1000 + } + ], + "views": [ + { + "keyspace": "ks", + "view": "mv", + "type": "MaterializedView", + "base_table": "tab1", + "create_stmt": "CREATE", + "status": "" + } + ], + "stage": "DATA" +} \ No newline at end of file diff --git a/pkg/service/restore/testdata/AggregateProgress/no_progress.golden.json b/pkg/service/restore/testdata/AggregateProgress/no_progress.golden.json new file mode 100644 index 000000000..2726e6ab3 --- /dev/null +++ b/pkg/service/restore/testdata/AggregateProgress/no_progress.golden.json @@ -0,0 
+1,54 @@ +{ + "size": 180, + "restored": 0, + "downloaded": 0, + "failed": 0, + "started_at": null, + "completed_at": null, + "repair_progress": null, + "snapshot_tag": "sm_20240223011159UTC", + "keyspaces": [ + { + "size": 180, + "restored": 0, + "downloaded": 0, + "failed": 0, + "started_at": null, + "completed_at": null, + "keyspace": "ks", + "tables": [ + { + "size": 60, + "restored": 0, + "downloaded": 0, + "failed": 0, + "started_at": null, + "completed_at": null, + "table": "tab1", + "tombstone_gc": "repair" + }, + { + "size": 120, + "restored": 0, + "downloaded": 0, + "failed": 0, + "started_at": null, + "completed_at": null, + "table": "tab2", + "tombstone_gc": "timeout" + } + ] + } + ], + "views": [ + { + "keyspace": "ks", + "view": "mv", + "type": "MaterializedView", + "base_table": "tab1", + "create_stmt": "CREATE", + "status": "" + } + ], + "stage": "DATA" +} \ No newline at end of file diff --git a/pkg/service/restore/testdata/AggregateProgress/partial_tab1_partial_tab2.golden.json b/pkg/service/restore/testdata/AggregateProgress/partial_tab1_partial_tab2.golden.json new file mode 100644 index 000000000..2d394566e --- /dev/null +++ b/pkg/service/restore/testdata/AggregateProgress/partial_tab1_partial_tab2.golden.json @@ -0,0 +1,66 @@ +{ + "size": 180, + "restored": 40, + "downloaded": 140, + "failed": 0, + "started_at": "2024-02-23T01:12:00Z", + "completed_at": null, + "repair_progress": null, + "snapshot_tag": "sm_20240223011159UTC", + "keyspaces": [ + { + "size": 180, + "restored": 40, + "downloaded": 140, + "failed": 0, + "started_at": "2024-02-23T01:12:00Z", + "completed_at": null, + "keyspace": "ks", + "tables": [ + { + "size": 60, + "restored": 0, + "downloaded": 60, + "failed": 0, + "started_at": "2024-02-23T01:12:00Z", + "completed_at": null, + "table": "tab1", + "tombstone_gc": "repair" + }, + { + "size": 120, + "restored": 40, + "downloaded": 80, + "failed": 0, + "started_at": "2024-02-23T01:12:00Z", + "completed_at": null, + "table": "tab2", + "tombstone_gc": "timeout" + } + ] + } + ], + "hosts": [ + { + "host": "h", + "shard_cnt": 1, + "restored_bytes": 40, + "restore_duration": 7000, + "downloaded_bytes": 140, + "download_duration": 16000, + "streamed_bytes": 40, + "stream_duration": 4000 + } + ], + "views": [ + { + "keyspace": "ks", + "view": "mv", + "type": "MaterializedView", + "base_table": "tab1", + "create_stmt": "CREATE", + "status": "" + } + ], + "stage": "DATA" +} \ No newline at end of file diff --git a/pkg/service/restore/testdata/AggregateProgress/zero_progress.golden.json b/pkg/service/restore/testdata/AggregateProgress/zero_progress.golden.json new file mode 100644 index 000000000..aad8cd387 --- /dev/null +++ b/pkg/service/restore/testdata/AggregateProgress/zero_progress.golden.json @@ -0,0 +1,66 @@ +{ + "size": 180, + "restored": 0, + "downloaded": 0, + "failed": 0, + "started_at": "2024-02-23T01:12:00Z", + "completed_at": null, + "repair_progress": null, + "snapshot_tag": "sm_20240223011159UTC", + "keyspaces": [ + { + "size": 180, + "restored": 0, + "downloaded": 0, + "failed": 0, + "started_at": "2024-02-23T01:12:00Z", + "completed_at": null, + "keyspace": "ks", + "tables": [ + { + "size": 60, + "restored": 0, + "downloaded": 0, + "failed": 0, + "started_at": "2024-02-23T01:12:00Z", + "completed_at": null, + "table": "tab1", + "tombstone_gc": "repair" + }, + { + "size": 120, + "restored": 0, + "downloaded": 0, + "failed": 0, + "started_at": "2024-02-23T01:12:00Z", + "completed_at": null, + "table": "tab2", + "tombstone_gc": "timeout" + } + ] + } 
+ ], + "hosts": [ + { + "host": "h", + "shard_cnt": 1, + "restored_bytes": 0, + "restore_duration": 0, + "downloaded_bytes": 0, + "download_duration": 20000, + "streamed_bytes": 0, + "stream_duration": 0 + } + ], + "views": [ + { + "keyspace": "ks", + "view": "mv", + "type": "MaterializedView", + "base_table": "tab1", + "create_stmt": "CREATE", + "status": "" + } + ], + "stage": "DATA" +} \ No newline at end of file diff --git a/pkg/service/restore/worker.go b/pkg/service/restore/worker.go index c6a42d1b3..7a7c23abc 100644 --- a/pkg/service/restore/worker.go +++ b/pkg/service/restore/worker.go @@ -693,7 +693,8 @@ func (w *worker) clonePrevProgress(ctx context.Context) { q := table.RestoreRunProgress.InsertQuery(w.session) defer q.Release() - err := forEachProgress(w.session, w.run.ClusterID, w.run.TaskID, w.run.PrevID, func(pr *RunProgress) { + seq := newRunProgressSeq() + for pr := range seq.All(w.run.ClusterID, w.run.TaskID, w.run.PrevID, w.session) { // We don't support interrupted run progresses resume, // so only finished run progresses should be copied. if !validateTimeIsSet(pr.RestoreCompletedAt) { @@ -706,9 +707,9 @@ func (w *worker) clonePrevProgress(ctx context.Context) { "error", err, ) } - }) - if err != nil { - w.logger.Error(ctx, "Couldn't clone run progress", "error", err) + } + if seq.err != nil { + w.logger.Error(ctx, "Couldn't clone run progress", "error", seq.err) } } diff --git a/schema/v3.5.0.cql b/schema/v3.5.0.cql index 0981b597c..99654f2c5 100644 --- a/schema/v3.5.0.cql +++ b/schema/v3.5.0.cql @@ -1 +1,3 @@ -ALTER TYPE restore_view ADD build_status text; \ No newline at end of file +ALTER TYPE restore_view ADD build_status text; +ALTER TABLE restore_run_progress DROP skipped; +ALTER TABLE restore_run_progress ADD restored bigint; \ No newline at end of file