Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make restore progress available even when cluster is not #4196

Merged
merged 4 commits into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/restapi/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type HealthCheckService interface {
// RepairService service interface for the REST API handlers.
type RepairService interface {
GetRun(ctx context.Context, clusterID, taskID, runID uuid.UUID) (*repair.Run, error)
// GetProgress must work even when the cluster is no longer available.
GetProgress(ctx context.Context, clusterID, taskID, runID uuid.UUID) (repair.Progress, error)
GetTarget(ctx context.Context, clusterID uuid.UUID, properties json.RawMessage) (repair.Target, error)
SetIntensity(ctx context.Context, runID uuid.UUID, intensity float64) error
Expand All @@ -59,15 +60,18 @@ type BackupService interface {
ExtractLocations(ctx context.Context, properties []json.RawMessage) []backupspec.Location
List(ctx context.Context, clusterID uuid.UUID, locations []backupspec.Location, filter backup.ListFilter) ([]backup.ListItem, error)
ListFiles(ctx context.Context, clusterID uuid.UUID, locations []backupspec.Location, filter backup.ListFilter) ([]backupspec.FilesInfo, error)
// GetProgress must work even when the cluster is no longer available.
GetProgress(ctx context.Context, clusterID, taskID, runID uuid.UUID) (backup.Progress, error)
DeleteSnapshot(ctx context.Context, clusterID uuid.UUID, locations []backupspec.Location, snapshotTags []string) error
GetValidationTarget(_ context.Context, clusterID uuid.UUID, properties json.RawMessage) (backup.ValidationTarget, error)
// GetValidationProgress must work even when the cluster is no longer available.
GetValidationProgress(ctx context.Context, clusterID, taskID, runID uuid.UUID) ([]backup.ValidationHostProgress, error)
}

// RestoreService service interface for the REST API handlers.
type RestoreService interface {
GetTargetUnitsViews(ctx context.Context, clusterID uuid.UUID, properties json.RawMessage) (restore.Target, []restore.Unit, []restore.View, error)
// GetProgress must work even when the cluster is no longer available.
GetProgress(ctx context.Context, clusterID, taskID, runID uuid.UUID) (restore.Progress, error)
}

Expand Down
20 changes: 7 additions & 13 deletions pkg/service/restore/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,11 +160,12 @@ const (

// View represents statement used for recreating restored (dropped) views.
type View struct {
Keyspace string `json:"keyspace" db:"keyspace_name"`
View string `json:"view" db:"view_name"`
Type ViewType `json:"type" db:"view_type"`
BaseTable string `json:"base_table"`
CreateStmt string `json:"create_stmt"`
Keyspace string `json:"keyspace" db:"keyspace_name"`
View string `json:"view" db:"view_name"`
Type ViewType `json:"type" db:"view_type"`
BaseTable string `json:"base_table"`
CreateStmt string `json:"create_stmt"`
BuildStatus scyllaclient.ViewBuildStatus `json:"status"`
}

func (t View) MarshalUDT(name string, info gocql.TypeInfo) ([]byte, error) {
Expand Down Expand Up @@ -235,7 +236,7 @@ type Progress struct {
SnapshotTag string `json:"snapshot_tag"`
Keyspaces []KeyspaceProgress `json:"keyspaces,omitempty"`
Hosts []HostProgress `json:"hosts,omitempty"`
Views []ViewProgress `json:"views,omitempty"`
Views []View `json:"views,omitempty"`
Stage Stage `json:"stage"`
}

Expand Down Expand Up @@ -266,13 +267,6 @@ type TableProgress struct {
Error string `json:"error,omitempty"`
}

// ViewProgress defines restore progress for the view.
type ViewProgress struct {
View

Status scyllaclient.ViewBuildStatus `json:"status"`
}

// TableName represents full table name.
type TableName struct {
Keyspace string
Expand Down
27 changes: 3 additions & 24 deletions pkg/service/restore/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,13 @@
package restore

import (
"context"
"slices"
"time"

"github.com/pkg/errors"
"github.com/scylladb/gocqlx/v2"
"github.com/scylladb/gocqlx/v2/qb"
"github.com/scylladb/scylla-manager/v3/pkg/schema/table"
"github.com/scylladb/scylla-manager/v3/pkg/scyllaclient"
"github.com/scylladb/scylla-manager/v3/pkg/util/timeutc"
"github.com/scylladb/scylla-manager/v3/pkg/util/uuid"
)
Expand All @@ -26,7 +25,7 @@ type tableKey struct {
}

// aggregateProgress returns restore progress information classified by keyspace and tables.
func (w *worker) aggregateProgress(ctx context.Context) (Progress, error) {
func (w *worker) aggregateProgress() (Progress, error) {
var (
p = Progress{
SnapshotTag: w.run.SnapshotTag,
Expand Down Expand Up @@ -99,27 +98,7 @@ func (w *worker) aggregateProgress(ctx context.Context) (Progress, error) {

p.extremeToNil()

for _, v := range w.run.Views {
viewTableName := v.View
if v.Type == SecondaryIndex {
viewTableName += "_index"
}

status, err := w.client.ViewBuildStatus(ctx, v.Keyspace, viewTableName)
if err != nil {
w.logger.Error(ctx, "Couldn't get view build status",
"keyspace", v.Keyspace,
"view", v.View,
"error", err,
)
status = scyllaclient.StatusUnknown
}
p.Views = append(p.Views, ViewProgress{
View: v,
Status: status,
})
}
Michal-Leszczynski marked this conversation as resolved.
Show resolved Hide resolved

p.Views = slices.Clone(w.run.Views)
for _, hp := range hostProgress {
p.Hosts = append(p.Hosts, hp)
}
Expand Down
82 changes: 82 additions & 0 deletions pkg/service/restore/restore_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -989,3 +989,85 @@ func TestRestoreTablesMultiLocationIntegration(t *testing.T) {
t.Fatalf("tables have different contents\nsrc:\n%v\ndst:\n%v", srcM, dstM)
}
}

func TestRestoreTablesProgressIntegration(t *testing.T) {
// It verifies that:
// - view status progress is correct
// - progress is available even when cluster is not

if IsIPV6Network() {
t.Skip("nodes don't have ip6tables and related modules to properly simulate unavailable cluster")
}

h := newTestHelper(t, ManagedSecondClusterHosts(), ManagedClusterHosts())

Print("Keyspace setup")
ks := randomizedName("progress_")
ksStmt := "CREATE KEYSPACE %q WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': %d}"
ExecStmt(t, h.srcCluster.rootSession, fmt.Sprintf(ksStmt, ks, 1))
ExecStmt(t, h.dstCluster.rootSession, fmt.Sprintf(ksStmt, ks, 1))

Print("Table setup")
tab := randomizedName("tab_")
tabStmt := "CREATE TABLE %q.%q (id int PRIMARY KEY, data int)"
ExecStmt(t, h.srcCluster.rootSession, fmt.Sprintf(tabStmt, ks, tab))
ExecStmt(t, h.dstCluster.rootSession, fmt.Sprintf(tabStmt, ks, tab))

Print("View setup")
mv := randomizedName("mv_")
CreateMaterializedView(t, h.srcCluster.rootSession, ks, tab, mv)
CreateMaterializedView(t, h.dstCluster.rootSession, ks, tab, mv)

Print("Fill setup")
fillTable(t, h.srcCluster.rootSession, 1, ks, tab)

Print("Run backup")
loc := []Location{testLocation("progress", "")}
S3InitBucket(t, loc[0].Path)
tag := h.runBackup(t, map[string]any{
"location": loc,
})

Print("Run restore")
grantRestoreTablesPermissions(t, h.dstCluster.rootSession, nil, h.dstUser)
h.runRestore(t, map[string]any{
"location": loc,
"snapshot_tag": tag,
"restore_tables": true,
})

Print("Validate success")
validateTableContent[int, int](t, h.srcCluster.rootSession, h.dstCluster.rootSession, ks, tab, "id", "data")
validateTableContent[int, int](t, h.srcCluster.rootSession, h.dstCluster.rootSession, ks, mv, "id", "data")

Print("Validate view progress")
pr, err := h.dstRestoreSvc.GetProgress(context.Background(), h.dstCluster.ClusterID, h.dstCluster.TaskID, h.dstCluster.RunID)
if err != nil {
t.Fatal(errors.Wrap(err, "get progress"))
}
for _, v := range pr.Views {
if v.BuildStatus != scyllaclient.StatusSuccess {
t.Fatalf("Expected status: %s, got: %s", scyllaclient.StatusSuccess, v.BuildStatus)
}
}

BlockREST(t, ManagedClusterHosts()...)
defer func() {
TryUnblockREST(t, ManagedClusterHosts())
if err := EnsureNodesAreUP(t, ManagedClusterHosts(), time.Minute); err != nil {
t.Fatal(err)
}
}()
time.Sleep(100 * time.Millisecond)

Print("Validate view progress when cluster is unavailable")
pr, err = h.dstRestoreSvc.GetProgress(context.Background(), h.dstCluster.ClusterID, h.dstCluster.TaskID, h.dstCluster.RunID)
if err != nil {
t.Fatal(errors.Wrap(err, "get progress"))
}
for _, v := range pr.Views {
if v.BuildStatus != scyllaclient.StatusSuccess {
t.Fatalf("Expected status: %s, got: %s", scyllaclient.StatusSuccess, v.BuildStatus)
}
}
}
18 changes: 4 additions & 14 deletions pkg/service/restore/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,12 +158,8 @@ func (s *Service) GetProgress(ctx context.Context, clusterID, taskID, runID uuid
return Progress{}, errors.Wrap(err, "get run")
}

w, err := s.newProgressWorker(ctx, run)
if err != nil {
return Progress{}, errors.Wrap(err, "create progress worker")
}

pr, err := w.aggregateProgress(ctx)
w := s.newProgressWorker(run)
pr, err := w.aggregateProgress()
if err != nil {
return Progress{}, err
}
Expand Down Expand Up @@ -221,20 +217,14 @@ func (w *worker) setRunInfo(taskID, runID uuid.UUID) {
w.run.ID = runID
}

func (s *Service) newProgressWorker(ctx context.Context, run *Run) (worker, error) {
client, err := s.scyllaClient(ctx, run.ClusterID)
if err != nil {
return worker{}, errors.Wrap(err, "get client")
}

func (s *Service) newProgressWorker(run *Run) worker {
return worker{
run: run,
config: s.config,
logger: s.logger,
metrics: s.metrics,
client: client,
session: s.session,
}, nil
}
}

// GetRun returns run with specified cluster, task and run ID.
Expand Down
4 changes: 2 additions & 2 deletions pkg/service/restore/tables_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,11 @@ func (w *tablesWorker) restore(ctx context.Context) error {
return nil
},
StageRecreateViews: func() error {
for _, v := range w.run.Views {
for i, v := range w.run.Views {
if err := w.CreateView(ctx, v); err != nil {
return errors.Wrapf(err, "recreate %s.%s with statement %s", v.Keyspace, v.View, v.CreateStmt)
}
if err := w.WaitForViewBuilding(ctx, v); err != nil {
if err := w.WaitForViewBuilding(ctx, &w.run.Views[i]); err != nil {
return errors.Wrapf(err, "wait for %s.%s", v.Keyspace, v.View)
}
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/service/restore/worker_views.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (w *worker) CreateView(ctx context.Context, view View) error {
return alterSchemaRetryWrapper(ctx, op, notify)
}

func (w *worker) WaitForViewBuilding(ctx context.Context, view View) error {
func (w *worker) WaitForViewBuilding(ctx context.Context, view *View) error {
labels := metrics.RestoreViewBuildStatusLabels{
ClusterID: w.run.ClusterID.String(),
Keyspace: view.Keyspace,
Expand All @@ -90,6 +90,8 @@ func (w *worker) WaitForViewBuilding(ctx context.Context, view View) error {
return retry.Permanent(err)
}

view.BuildStatus = status
w.insertRun(ctx)
switch status {
case scyllaclient.StatusUnknown:
w.metrics.SetViewBuildStatus(labels, metrics.BuildStatusUnknown)
Expand Down
8 changes: 5 additions & 3 deletions pkg/testutils/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,12 @@ func WaitForNodeUPOrTimeout(h string, timeout time.Duration) error {
}

// BlockREST blocks the Scylla API ports on h machine by dropping TCP packets.
func BlockREST(t *testing.T, h string) {
func BlockREST(t *testing.T, hosts ...string) {
t.Helper()
if err := RunIptablesCommand(t, h, CmdBlockScyllaREST); err != nil {
t.Error(err)
for _, host := range hosts {
if err := RunIptablesCommand(t, host, CmdBlockScyllaREST); err != nil {
t.Error(err)
}
}
}

Expand Down
1 change: 1 addition & 0 deletions schema/v3.5.0.cql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TYPE restore_view ADD build_status text;
Loading