Skip to content

Commit 64b9b8c

Browse files
refactor(backup): unify stage start/end logging
1 parent cd32066 commit 64b9b8c

File tree

8 files changed

+32
-88
lines changed

8 files changed

+32
-88
lines changed

pkg/service/backup/service.go

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -810,8 +810,10 @@ func (s *Service) Backup(ctx context.Context, clusterID, taskID, runID uuid.UUID
810810
}
811811
defer clusterSession.Close()
812812

813-
w.AwaitSchemaAgreement(ctx, clusterSession)
814-
813+
if err := w.AwaitSchemaAgreement(ctx, clusterSession); err != nil {
814+
w.Logger.Info(ctx, "Couldn't await schema agreement, backup of schema as CQL files will be skipped", "error", err)
815+
return nil
816+
}
815817
if err = w.DumpSchema(ctx, clusterSession); err != nil {
816818
w.Logger.Info(ctx, "Couldn't dump schema, backup of schema as CQL files will be skipped", "error", err)
817819
w.Schema = nil
@@ -848,7 +850,7 @@ func (s *Service) Backup(ctx context.Context, clusterID, taskID, runID uuid.UUID
848850
prevStage := run.Stage
849851

850852
// Execute stages according to the stage order.
851-
execStage := func(stage Stage, f func() error) error {
853+
execStage := func(stage Stage, f func() error) (err error) {
852854
// In purge only mode skip all stages before purge.
853855
if target.PurgeOnly {
854856
if stage.Index() < StagePurge.Index() {
@@ -874,6 +876,18 @@ func (s *Service) Backup(ctx context.Context, clusterID, taskID, runID uuid.UUID
874876
// Always cleanup stats
875877
defer w.cleanup(ctx, hi)
876878

879+
if desc, ok := stageDescription[stage]; ok {
880+
up := strings.ToUpper(desc[:1]) + desc[1:]
881+
w.Logger.Info(ctx, up+"...")
882+
defer func(start time.Time) {
883+
if err != nil {
884+
w.Logger.Error(ctx, up+" failed see exact errors above", "duration", timeutc.Since(start))
885+
} else {
886+
w.Logger.Info(ctx, "Done "+desc, "duration", timeutc.Since(start))
887+
}
888+
}(timeutc.Now())
889+
}
890+
877891
// Run function
878892
return errors.Wrap(f(), strings.ReplaceAll(name, "_", " "))
879893
}

pkg/service/backup/stage.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,18 @@ const (
2222
StageDone Stage = "DONE"
2323
)
2424

25+
var stageDescription = map[Stage]string{
26+
StageInit: "initialising",
27+
StageAwaitSchema: "awaiting schema agreement",
28+
StageSnapshot: "taking snapshot",
29+
StageIndex: "indexing snapshot files",
30+
StageManifest: "uploading manifest files",
31+
StageSchema: "uploading cql schema",
32+
StageUpload: "uploading snapshot files",
33+
StageMoveManifest: "moving manifest files",
34+
StagePurge: "purging stale snapshots",
35+
}
36+
2537
// StageOrder listing of all stages in the order of execution.
2638
func StageOrder() []Stage {
2739
return []Stage{

pkg/service/backup/worker_index.go

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,19 +14,9 @@ import (
1414
"github.com/scylladb/go-set/strset"
1515
"github.com/scylladb/scylla-manager/v3/pkg/scyllaclient"
1616
. "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec"
17-
"github.com/scylladb/scylla-manager/v3/pkg/util/timeutc"
1817
)
1918

2019
func (w *worker) Index(ctx context.Context, hosts []hostInfo, limits []DCLimit) (err error) {
21-
w.Logger.Info(ctx, "Indexing snapshot files...")
22-
defer func(start time.Time) {
23-
if err != nil {
24-
w.Logger.Error(ctx, "Indexing snapshot files failed see exact errors above", "duration", timeutc.Since(start))
25-
} else {
26-
w.Logger.Info(ctx, "Done indexing snapshot files", "duration", timeutc.Since(start))
27-
}
28-
}(timeutc.Now())
29-
3020
f := func(h hostInfo) error {
3121
w.Logger.Info(ctx, "Indexing snapshot files on host", "host", h.IP)
3222

pkg/service/backup/worker_manifest.go

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -6,25 +6,14 @@ import (
66
"bytes"
77
"context"
88
"net/http"
9-
"time"
109

1110
"github.com/pkg/errors"
1211
"github.com/scylladb/scylla-manager/v3/pkg/scyllaclient"
1312
. "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec"
1413
"github.com/scylladb/scylla-manager/v3/pkg/util/parallel"
15-
"github.com/scylladb/scylla-manager/v3/pkg/util/timeutc"
1614
)
1715

1816
func (w *worker) UploadManifest(ctx context.Context, hosts []hostInfo) (stepError error) {
19-
w.Logger.Info(ctx, "Uploading manifest files...")
20-
defer func(start time.Time) {
21-
if stepError != nil {
22-
w.Logger.Error(ctx, "Uploading manifest files failed see exact errors above", "duration", timeutc.Since(start))
23-
} else {
24-
w.Logger.Info(ctx, "Done uploading manifest files", "duration", timeutc.Since(start))
25-
}
26-
}(timeutc.Now())
27-
2817
// Limit parallelism level, on huge clusters creating manifest content in
2918
// memory for all nodes at the same time can lead to memory issues.
3019
const maxParallel = 12
@@ -124,15 +113,6 @@ func (w *worker) uploadHostManifest(ctx context.Context, h hostInfo, m ManifestI
124113
}
125114

126115
func (w *worker) MoveManifest(ctx context.Context, hosts []hostInfo) (err error) {
127-
w.Logger.Info(ctx, "Moving manifest files...")
128-
defer func(start time.Time) {
129-
if err != nil {
130-
w.Logger.Error(ctx, "Moving manifest files failed see exact errors above", "duration", timeutc.Since(start))
131-
} else {
132-
w.Logger.Info(ctx, "Done moving manifest files", "duration", timeutc.Since(start))
133-
}
134-
}(timeutc.Now())
135-
136116
rollbacks := make([]func(context.Context) error, len(hosts))
137117

138118
f := func(i int) (hostErr error) {

pkg/service/backup/worker_purge.go

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,25 +4,14 @@ package backup
44

55
import (
66
"context"
7-
"time"
87

98
"github.com/pkg/errors"
109
"github.com/scylladb/go-log"
1110
. "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec"
1211
"github.com/scylladb/scylla-manager/v3/pkg/util/parallel"
13-
"github.com/scylladb/scylla-manager/v3/pkg/util/timeutc"
1412
)
1513

1614
func (w *worker) Purge(ctx context.Context, hosts []hostInfo, retentionMap RetentionMap) (err error) {
17-
w.Logger.Info(ctx, "Purging stale snapshots...")
18-
defer func(start time.Time) {
19-
if err != nil {
20-
w.Logger.Error(ctx, "Purging stale snapshots failed see exact errors above", "duration", timeutc.Since(start))
21-
} else {
22-
w.Logger.Info(ctx, "Done purging stale snapshots", "duration", timeutc.Since(start))
23-
}
24-
}(timeutc.Now())
25-
2615
// List manifests in all locations
2716
manifests, err := listManifestsInAllLocations(ctx, w.Client, hosts, w.ClusterID)
2817
if err != nil {

pkg/service/backup/worker_schema.go

Lines changed: 3 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -12,21 +12,9 @@ import (
1212
. "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec"
1313
"github.com/scylladb/scylla-manager/v3/pkg/util/parallel"
1414
"github.com/scylladb/scylla-manager/v3/pkg/util/retry"
15-
"github.com/scylladb/scylla-manager/v3/pkg/util/timeutc"
1615
)
1716

18-
func (w *workerTools) AwaitSchemaAgreement(ctx context.Context, clusterSession gocqlx.Session) {
19-
w.Logger.Info(ctx, "Awaiting schema agreement...")
20-
21-
var stepError error
22-
defer func(start time.Time) {
23-
if stepError != nil {
24-
w.Logger.Error(ctx, "Awaiting schema agreement failed see exact errors above", "duration", timeutc.Since(start))
25-
} else {
26-
w.Logger.Info(ctx, "Done awaiting schema agreement", "duration", timeutc.Since(start))
27-
}
28-
}(timeutc.Now())
29-
17+
func (w *workerTools) AwaitSchemaAgreement(ctx context.Context, clusterSession gocqlx.Session) error {
3018
const (
3119
waitMin = 15 * time.Second // nolint: revive
3220
waitMax = 1 * time.Minute
@@ -52,7 +40,7 @@ func (w *workerTools) AwaitSchemaAgreement(ctx context.Context, clusterSession g
5240
localSchemaStmt = "SELECT schema_version FROM system.local WHERE key='local'"
5341
)
5442

55-
stepError = retry.WithNotify(ctx, func() error {
43+
return retry.WithNotify(ctx, func() error {
5644
var v []string
5745
if err := clusterSession.Query(peerSchemasStmt, nil).SelectRelease(&v); err != nil {
5846
return retry.Permanent(err)
@@ -84,19 +72,10 @@ func (w *worker) DumpSchema(ctx context.Context, clusterSession gocqlx.Session)
8472

8573
func (w *worker) UploadSchema(ctx context.Context, hosts []hostInfo) (stepError error) {
8674
if w.Schema == nil {
75+
w.Logger.Info(ctx, "No schema CQL file to upload")
8776
return nil
8877
}
8978

90-
w.Logger.Info(ctx, "Uploading schema...")
91-
92-
defer func(start time.Time) {
93-
if stepError != nil {
94-
w.Logger.Error(ctx, "Uploading schema failed see exact errors above", "duration", timeutc.Since(start))
95-
} else {
96-
w.Logger.Info(ctx, "Done uploading schema", "duration", timeutc.Since(start))
97-
}
98-
}(timeutc.Now())
99-
10079
// Select single host per location
10180
locations := map[string]hostInfo{}
10281
for _, hi := range hosts {

pkg/service/backup/worker_snapshot.go

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,23 +4,12 @@ package backup
44

55
import (
66
"context"
7-
"time"
87

98
"github.com/pkg/errors"
109
. "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec"
11-
"github.com/scylladb/scylla-manager/v3/pkg/util/timeutc"
1210
)
1311

1412
func (w *worker) Snapshot(ctx context.Context, hosts []hostInfo, limits []DCLimit) (err error) {
15-
w.Logger.Info(ctx, "Taking snapshots...")
16-
defer func(start time.Time) {
17-
if err != nil {
18-
w.Logger.Error(ctx, "Taking snapshots failed see exact errors above", "duration", timeutc.Since(start))
19-
} else {
20-
w.Logger.Info(ctx, "Done taking snapshots", "duration", timeutc.Since(start))
21-
}
22-
}(timeutc.Now())
23-
2413
f := func(h hostInfo) error {
2514
w.Logger.Info(ctx, "Taking snapshots on host", "host", h.IP)
2615
err := w.snapshotHost(ctx, h)

pkg/service/backup/worker_upload.go

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,6 @@ import (
1414
)
1515

1616
func (w *worker) Upload(ctx context.Context, hosts []hostInfo, limits []DCLimit) (err error) {
17-
w.Logger.Info(ctx, "Uploading snapshot files...")
18-
defer func(start time.Time) {
19-
if err != nil {
20-
w.Logger.Error(ctx, "Uploading snapshot files failed see exact errors above", "duration", timeutc.Since(start))
21-
} else {
22-
w.Logger.Info(ctx, "Done uploading snapshot files", "duration", timeutc.Since(start))
23-
}
24-
}(timeutc.Now())
25-
2617
f := func(h hostInfo) error {
2718
w.Logger.Info(ctx, "Uploading snapshot files on host", "host", h.IP)
2819
err := w.uploadHost(ctx, h)

0 commit comments

Comments
 (0)