Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restore: rework progress aggregation #4225

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
2 changes: 1 addition & 1 deletion pkg/schema/table/table.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 5 additions & 4 deletions pkg/service/restore/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,13 +122,14 @@ func (w *tablesWorker) filterPreviouslyRestoredSStables(ctx context.Context, raw
w.logger.Info(ctx, "Filter out previously restored sstables")

remoteSSTableDirToRestoredIDs := make(map[string][]string)
err := forEachProgress(w.session, w.run.ClusterID, w.run.TaskID, w.run.ID, func(pr *RunProgress) {
seq := newRunProgressSeq()
for pr := range seq.All(w.run.ClusterID, w.run.TaskID, w.run.ID, w.session) {
if validateTimeIsSet(pr.RestoreCompletedAt) {
remoteSSTableDirToRestoredIDs[pr.RemoteSSTableDir] = append(remoteSSTableDirToRestoredIDs[pr.RemoteSSTableDir], pr.SSTableID...)
}
})
if err != nil {
return nil, errors.Wrap(err, "iterate over prev run progress")
}
if seq.err != nil {
return nil, errors.Wrap(seq.err, "iterate over prev run progress")
}
if len(remoteSSTableDirToRestoredIDs) == 0 {
return rawWorkload, nil
Expand Down
6 changes: 5 additions & 1 deletion pkg/service/restore/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,13 +194,15 @@ type RunProgress struct {
ShardCnt int64 // Host shard count used for bandwidth per shard calculation.
AgentJobID int64

// DownloadStartedAt and DownloadCompletedAt are within the
// RestoreStartedAt and RestoreCompletedAt time frame.
DownloadStartedAt *time.Time
DownloadCompletedAt *time.Time
RestoreStartedAt *time.Time
RestoreCompletedAt *time.Time
Error string
Downloaded int64
Skipped int64
Restored int64
Failed int64
VersionedProgress int64
}
Expand Down Expand Up @@ -252,6 +254,8 @@ type KeyspaceProgress struct {
type HostProgress struct {
Host string `json:"host"`
ShardCnt int64 `json:"shard_cnt"`
RestoredBytes int64 `json:"restored_bytes"`
RestoreDuration int64 `json:"restore_duration"`
DownloadedBytes int64 `json:"downloaded_bytes"`
DownloadDuration int64 `json:"download_duration"`
StreamedBytes int64 `json:"streamed_bytes"`
Expand Down
Loading
Loading