diff --git a/ReadMe.md b/ReadMe.md index 3ca84773..7f673dc9 100644 --- a/ReadMe.md +++ b/ReadMe.md @@ -379,7 +379,7 @@ general: full_interval: 24h # FULL_INTERVAL, use only for `watch` command, full backup will create every 24h watch_backup_name_template: "shard{shard}-{type}-{time:20060102150405}" # WATCH_BACKUP_NAME_TEMPLATE, used only for `watch` command, macros values will apply from `system.macros` for time:XXX, look format in https://go.dev/src/time/format.go - sharded_operation: false # SHARDED_OPERATION, backups to replicas will save the schema but will only save one replica copy of the data with tables sharded among replicas. + sharded_operation_mode: none # SHARDED_OPERATION_MODE, how different replicas will shard backing up data for tables. Options are: none (no sharding), table (table granularity), database (database granularity), first-replica (on the lexicographically sorted first active replica). If left empty, then the "none" option will be set as default. clickhouse: username: default # CLICKHOUSE_USERNAME password: "" # CLICKHOUSE_PASSWORD diff --git a/pkg/backup/backup_shard.go b/pkg/backup/backup_shard.go index 867e1971..6c6f1ca8 100644 --- a/pkg/backup/backup_shard.go +++ b/pkg/backup/backup_shard.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "hash/fnv" - "sort" ) var ( @@ -15,6 +14,14 @@ var ( // errNoActiveReplicas is returned when a table is has no current active replicas errNoActiveReplicas = errors.New("no active replicas") + + shardFuncRegistry = map[string]shardFunc{ + "table": fnvHashModTableShardFunc, + "database": fnvHashModDatabaseShardFunc, + "first-replica": firstReplicaShardFunc, + "none": noneShardFunc, + "": noneShardFunc, + } ) // shardDetermination is an object holding information on whether or not a table is within the @@ -23,7 +30,7 @@ type shardDetermination map[string]bool // inShard returns whether or not a given table is within a backup shard func (d shardDetermination) inShard(database, table string) (bool, error) { - fullName := fmt.Sprintf("%s.%s", database, table) + fullName := fmt.Sprintf("`%s`.`%s`", database, table) presentInShard, ok := d[fullName] if !ok { return false, fmt.Errorf("error determining backup shard state for %q: %w", fullName, @@ -48,7 +55,7 @@ type tableReplicaMetadata struct { // fullName returns the table name in the form of `database.table` func (md *tableReplicaMetadata) fullName() string { - return fmt.Sprintf("%s.%s", md.Database, md.Table) + return fmt.Sprintf("`%s`.`%s`", md.Database, md.Table) } // querier is an interface that can query Clickhouse @@ -56,25 +63,86 @@ type querier interface { SelectContext(context.Context, interface{}, string, ...interface{}) error } -// shardFunc is a function that is used to determine whether or not a given database/table should be -// handled by a given replica +// shardFunc is a function that is determines whether or not a given database/table should have its +// data backed up by the replica calling this function type shardFunc func(md *tableReplicaMetadata) (bool, error) -// fnvHashModShardFunc performs a FNV hash of a table name in the form of `database.table` and then -// performs a mod N operation (where N is the number of active replicas) in order to find an index -// in an alphabetically sorted list of active replicas which corresponds to the replica that will -// handle the backup of the table -func fnvHashModShardFunc(md *tableReplicaMetadata) (bool, error) { +func shardFuncByName(name string) (shardFunc, error) { + chosen, ok := shardFuncRegistry[name] + if !ok { + validOptions := make([]string, len(shardFuncRegistry)) + for k := range shardFuncRegistry { + if k == "" { + continue + } + validOptions = append(validOptions, k) + } + return nil, fmt.Errorf("unknown backup sharding option %q, valid options: %v", name, + validOptions) + } + return chosen, nil +} + +// fnvShardReplicaFromString returns a replica assignment from a slice of active replicas by taking +// an arbitrary string, performing a FNV hash on it (mod NumActiveReplicas), and using the resulting +// number as an index of the sorted slice of active replicas. It is assumed that the active replicas +// slice is provided pre-sorted. +func fnvShardReplicaFromString(str string, activeReplicas []string) (string, error) { + if len(activeReplicas) == 0 { + return "", fmt.Errorf("could not determine in-shard state for %s: %w", str, + errNoActiveReplicas) + } + + h := fnv.New32a() + h.Write([]byte(str)) + i := h.Sum32() % uint32(len(activeReplicas)) + return activeReplicas[i], nil +} + +// fnvHashModTableShardFunc determines whether a replica should handle backing up data based on the +// table name in the form of `database.table`. It is assumed that the active replicas slice is +// provided pre-sorted. +func fnvHashModTableShardFunc(md *tableReplicaMetadata) (bool, error) { + assignedReplica, err := fnvShardReplicaFromString(md.fullName(), md.ActiveReplicas) + if err != nil { + return false, err + } + return assignedReplica == md.ReplicaName, nil +} + +// fnvHashModDatabaseShardFunc determines whether a replica should handle backing up data based on +// database name. It is assumed that the active replicas slice is provided pre-sorted. +func fnvHashModDatabaseShardFunc(md *tableReplicaMetadata) (bool, error) { + assignedReplica, err := fnvShardReplicaFromString(md.Database, md.ActiveReplicas) + if err != nil { + return false, err + } + return assignedReplica == md.ReplicaName, nil +} + +// firstReplicaShardFunc determines whether a replica should handle backing up data based on whether +// or not it is the lexicographically first active replica. It is assumed that the active replicas +// slice is provided pre-sorted. +func firstReplicaShardFunc(md *tableReplicaMetadata) (bool, error) { if len(md.ActiveReplicas) == 0 { return false, fmt.Errorf("could not determine in-shard state for %s: %w", md.fullName(), errNoActiveReplicas) } - sort.Strings(md.ActiveReplicas) + return md.ReplicaName == md.ActiveReplicas[0], nil +} - h := fnv.New32a() - h.Write([]byte(md.fullName())) - i := h.Sum32() % uint32(len(md.ActiveReplicas)) - return md.ActiveReplicas[i] == md.ReplicaName, nil +// noneShardFunc always returns true +func noneShardFunc(md *tableReplicaMetadata) (bool, error) { + return true, nil +} + +// doesShard returns whether a ShardedOperationMode configuration performs sharding or not +func doesShard(mode string) bool { + _, ok := shardFuncRegistry[mode] + if !ok { + return false + } + return mode != "" && mode != "none" } // replicaDeterminer is a concrete struct that will query clickhouse to obtain a shard determination @@ -97,7 +165,7 @@ func newReplicaDeterminer(q querier, sf shardFunc) *replicaDeterminer { func (rd *replicaDeterminer) getReplicaState(ctx context.Context) ([]tableReplicaMetadata, error) { md := []tableReplicaMetadata{} // TODO: Change query to pull replica_is_active after upgrading to clickhouse-go v2 - query := "select t.database, t.name as table, r.replica_name, mapKeys(mapFilter((replica, active) -> (active == 1), r.replica_is_active)) as active_replicas from system.tables t left join system.replicas r on t.database = r.database and t.name = r.table" + query := "SELECT t.database, t.name AS table, r.replica_name, arraySort(mapKeys(mapFilter((replica, active) -> (active == 1), r.replica_is_active))) AS active_replicas FROM system.tables t LEFT JOIN system.replicas r ON t.database = r.database AND t.name = r.table" if err := rd.q.SelectContext(ctx, &md, query); err != nil { return nil, fmt.Errorf("could not determine replication state: %w", err) } diff --git a/pkg/backup/backup_shard_test.go b/pkg/backup/backup_shard_test.go index f75a1a65..fda5a1e2 100644 --- a/pkg/backup/backup_shard_test.go +++ b/pkg/backup/backup_shard_test.go @@ -12,10 +12,12 @@ import ( func TestInShard(t *testing.T) { d := shardDetermination{ - "present_db.present_table": true, - "present_db.absent_table": false, - "absent_db.present_table": false, - "absent_db.absent_table": false, + "`present_db`.`present_table`": true, + "`present_db`.`absent_table`": false, + "`absent_db`.`present_table`": false, + "`absent_db`.`absent_table`": false, + "`a.b`.`c`": true, + "`a`.`b.c`": false, } testcases := []struct { name string @@ -62,16 +64,25 @@ func TestInShard(t *testing.T) { table: "absent_table", expectPresent: false, }, + { + name: "Test ambiguous database/table name combination", + database: "a.b", + table: "c", + expectPresent: true, + }, } for _, tc := range testcases { - t.Log(tc.name) - present, err := d.inShard(tc.database, tc.table) - if !errors.Is(err, tc.expectErr) { - t.Fatalf("expected err %q, got %q", tc.expectErr, err) - } - if present != tc.expectPresent { - t.Fatalf("expected in-shard status %v, got %v", tc.expectPresent, present) - } + t.Run(tc.name, + func(t *testing.T) { + present, err := d.inShard(tc.database, tc.table) + if !errors.Is(err, tc.expectErr) { + t.Fatalf("expected err %q, got %q", tc.expectErr, err) + } + if present != tc.expectPresent { + t.Fatalf("expected in-shard status %v, got %v", tc.expectPresent, present) + } + }, + ) } } @@ -80,24 +91,31 @@ func TestFullName(t *testing.T) { md tableReplicaMetadata expect string }{ + { + md: tableReplicaMetadata{ + Database: "a.b", + Table: "c", + }, + expect: "`a.b`.`c`", + }, { md: tableReplicaMetadata{ Database: "a", - Table: "b", + Table: "b.c", }, - expect: "a.b", + expect: "`a`.`b.c`", }, { md: tableReplicaMetadata{ Database: "db", }, - expect: "db.", + expect: "`db`.``", }, { md: tableReplicaMetadata{ Table: "t", }, - expect: ".t", + expect: "``.`t`", }, } for _, tc := range testcases { @@ -107,11 +125,59 @@ func TestFullName(t *testing.T) { } } -func TestFNVHashModShardFunc(t *testing.T) { +func TestDoesShard(t *testing.T) { testcases := []struct { name string - md *tableReplicaMetadata + shardName string expect bool + }{ + { + name: "Test present and sharding function name string", + shardName: "table", + expect: true, + }, + { + name: "Test present and non-sharding function name string", + shardName: "none", + expect: false, + }, + { + name: "Test empty function name string", + shardName: "", + expect: false, + }, + { + name: "Test absent name string", + shardName: "nonexistent", + expect: false, + }, + } + for _, tc := range testcases { + t.Run(tc.name, + func(t *testing.T) { + got := doesShard(tc.shardName) + if got != tc.expect { + t.Fatalf("expected %v, got %v", tc.expect, got) + } + }, + ) + } +} + +func TestShardFunc(t *testing.T) { + t.Run("Test obtaining nonexistent shard func", + func(t *testing.T) { + _, err := shardFuncByName("non-existent") + if err == nil { + t.Fatalf("expected error when trying to get a nonexistent shard function") + } + }, + ) + + testcases := []struct { + name string + md *tableReplicaMetadata + expect map[string]bool expectErr error }{ { @@ -122,8 +188,25 @@ func TestFNVHashModShardFunc(t *testing.T) { ReplicaName: "replica", ActiveReplicas: []string{}, }, + expect: map[string]bool{ + "table": false, + "database": false, + "first-replica": false, + }, expectErr: errNoActiveReplicas, }, + { + name: "Test no active replicas for no sharding", + md: &tableReplicaMetadata{ + Database: "database", + Table: "table", + ReplicaName: "replica", + ActiveReplicas: []string{}, + }, + expect: map[string]bool{ + "none": true, + }, + }, { name: "Test single active replica", md: &tableReplicaMetadata{ @@ -132,7 +215,12 @@ func TestFNVHashModShardFunc(t *testing.T) { ReplicaName: "replica", ActiveReplicas: []string{"replica"}, }, - expect: true, + expect: map[string]bool{ + "table": true, + "database": true, + "first-replica": true, + "none": true, + }, }, { name: "Test not assigned replica", @@ -142,16 +230,34 @@ func TestFNVHashModShardFunc(t *testing.T) { ReplicaName: "replica", ActiveReplicas: []string{"different"}, }, + expect: map[string]bool{ + "table": false, + "database": false, + "first-replica": false, + "none": true, + }, }, } for _, tc := range testcases { - t.Log(tc.name) - got, err := fnvHashModShardFunc(tc.md) - if !errors.Is(err, tc.expectErr) { - t.Fatalf("expected error %v, got %v", tc.expectErr, err) - } - if got != tc.expect { - t.Fatalf("expected shard membership %v, got %v", tc.expect, got) + for name, expect := range tc.expect { + t.Run(fmt.Sprintf("%s - shard mode: %s", tc.name, name), + func(t *testing.T) { + shardFunc, err := shardFuncByName(name) + if err != nil { + t.Fatalf("unable to get shard function: %v", err) + } + got, err := shardFunc(tc.md) + if !errors.Is(err, tc.expectErr) { + t.Fatalf("expected error %v, got %v", tc.expectErr, err) + } + if tc.expectErr == nil { + return + } + if got != expect { + t.Fatalf("expected shard membership %v, got %v", tc.expect, got) + } + }, + ) } } } @@ -211,18 +317,22 @@ func TestGetReplicaState(t *testing.T) { }, } for _, tc := range testcases { - t.Log(tc.name) - rd := newReplicaDeterminer(tc.q, nil) - got, err := rd.getReplicaState(context.Background()) - if !errors.Is(err, tc.expectErr) { - t.Fatalf("expected error %v, got %v", tc.expectErr, err) - } - if err != nil { - continue - } - if !reflect.DeepEqual(got, tc.expect) { - t.Fatalf("expected data %v, got %v", tc.expect, got) - } + t.Run(tc.name, + func(t *testing.T) { + rd := newReplicaDeterminer(tc.q, nil) + got, err := rd.getReplicaState(context.Background()) + if !errors.Is(err, tc.expectErr) { + t.Fatalf("expected error %v, got %v", tc.expectErr, err) + } + if err != nil { + return + } + if !reflect.DeepEqual(got, tc.expect) { + t.Fatalf("expected data %v, got %v", tc.expect, got) + } + }, + ) + } } @@ -283,24 +393,27 @@ func TestDetermineShards(t *testing.T) { }, }, expect: shardDetermination{ - "a.present": true, - "a.absent": false, - "b.present": true, + "`a`.`present`": true, + "`a`.`absent`": false, + "`b`.`present`": true, }, }, } for _, tc := range testcases { - t.Log(tc.name) - rd := newReplicaDeterminer(tc.q, nameSharder) - got, err := rd.determineShards(context.Background()) - if !errors.Is(err, tc.expectErr) { - t.Fatalf("expected %v, got %v", tc.expectErr, err) - } - if err != nil { - continue - } - if !reflect.DeepEqual(got, tc.expect) { - t.Fatalf("expected data %v, got %v", tc.expect, got) - } + t.Run(tc.name, + func(t *testing.T) { + rd := newReplicaDeterminer(tc.q, nameSharder) + got, err := rd.determineShards(context.Background()) + if !errors.Is(err, tc.expectErr) { + t.Fatalf("expected %v, got %v", tc.expectErr, err) + } + if err != nil { + return + } + if !reflect.DeepEqual(got, tc.expect) { + t.Fatalf("expected data %v, got %v", tc.expect, got) + } + }, + ) } } diff --git a/pkg/backup/backuper.go b/pkg/backup/backuper.go index 60fc1cf9..e2ff11e1 100644 --- a/pkg/backup/backuper.go +++ b/pkg/backup/backuper.go @@ -2,6 +2,7 @@ package backup import ( "context" + "errors" "fmt" "path" @@ -13,6 +14,13 @@ import ( "github.com/rs/zerolog/log" ) +var errShardOperationUnsupported = errors.New("sharded operations are not supported") + +// versioner is an interface for determining the version of Clickhouse +type versioner interface { + CanShardOperation(ctx context.Context) error +} + type BackuperOpt func(*Backuper) type Backuper struct { @@ -38,7 +46,7 @@ func NewBackuper(cfg *config.Config, opts ...BackuperOpt) *Backuper { cfg: cfg, ch: ch, vers: ch, - bs: newReplicaDeterminer(ch, fnvHashModShardFunc), + bs: nil, } for _, opt := range opts { opt(b) @@ -107,12 +115,21 @@ func (b *Backuper) populateBackupShardField(ctx context.Context, tables []clickh tables[i].BackupType = clickhouse.ShardBackupNone } } - if !b.cfg.General.ShardedOperation { + if !doesShard(b.cfg.General.ShardedOperationMode) { return nil } - if err := canShardOperation(ctx, b.vers); err != nil { + if err := b.vers.CanShardOperation(ctx); err != nil { return err } + + if b.bs == nil { + // Parse shard config here to avoid error return in NewBackuper + shardFunc, err := shardFuncByName(b.cfg.General.ShardedOperationMode) + if err != nil { + return fmt.Errorf("could not determine shards for tables: %w", err) + } + b.bs = newReplicaDeterminer(b.ch, shardFunc) + } assignment, err := b.bs.determineShards(ctx) if err != nil { return err diff --git a/pkg/backup/backuper_test.go b/pkg/backup/backuper_test.go index cdeb8de3..c067ef79 100644 --- a/pkg/backup/backuper_test.go +++ b/pkg/backup/backuper_test.go @@ -10,6 +10,18 @@ import ( "github.com/Altinity/clickhouse-backup/pkg/config" ) +type testVersioner struct { + err error +} + +func newTestVersioner(err error) *testVersioner { + return &testVersioner{err: err} +} + +func (v *testVersioner) CanShardOperation(ctx context.Context) error { + return v.err +} + type testBackupSharder struct { data shardDetermination err error @@ -24,9 +36,9 @@ func (bs *testBackupSharder) determineShards(_ context.Context) (shardDeterminat func TestPopulateBackupShardField(t *testing.T) { errVersion := errors.New("versioner error") - errVersioner := newTestVersioner(withVersionErr(errVersion)) - goodVersioner := newTestVersioner(withVersion(minVersShardOp)) - oldVersioner := newTestVersioner(withVersion(-1)) + errVersioner := newTestVersioner(errVersion) + goodVersioner := newTestVersioner(nil) + oldVersioner := newTestVersioner(clickhouse.ErrShardOperationVers) // Create tables to reset field state tableData := func() []clickhouse.Table { @@ -51,8 +63,8 @@ func TestPopulateBackupShardField(t *testing.T) { errSharder := &testBackupSharder{err: errShard} staticSharder := &testBackupSharder{ data: shardDetermination{ - "a.present": true, - "a.absent": false, + "`a`.`present`": true, + "`a`.`absent`": false, }, } emptySharder := &testBackupSharder{ @@ -61,7 +73,7 @@ func TestPopulateBackupShardField(t *testing.T) { testcases := []struct { name string - shardConfig bool + shardOpMode string v versioner bs backupSharder expect []clickhouse.Table @@ -69,21 +81,21 @@ func TestPopulateBackupShardField(t *testing.T) { }{ { name: "Test versioner error", - shardConfig: true, + shardOpMode: "table", v: errVersioner, bs: staticSharder, expectErr: errVersion, }, { name: "Test incompatible version", - shardConfig: true, + shardOpMode: "table", v: oldVersioner, bs: staticSharder, - expectErr: errShardOperationVers, + expectErr: clickhouse.ErrShardOperationVers, }, { name: "Test incompatible version without sharding config", - shardConfig: false, + shardOpMode: "none", v: oldVersioner, bs: staticSharder, expect: []clickhouse.Table{ @@ -107,21 +119,21 @@ func TestPopulateBackupShardField(t *testing.T) { }, { name: "Test sharder error", - shardConfig: true, + shardOpMode: "table", v: goodVersioner, bs: errSharder, expectErr: errShard, }, { name: "Test incomplete replica data", - shardConfig: true, + shardOpMode: "table", v: goodVersioner, bs: emptySharder, expectErr: errUnknownBackupShard, }, { name: "Test normal sharding", - shardConfig: true, + shardOpMode: "table", v: goodVersioner, bs: staticSharder, expect: []clickhouse.Table{ @@ -145,26 +157,29 @@ func TestPopulateBackupShardField(t *testing.T) { }, } for _, tc := range testcases { - t.Log(tc.name) - cfg := &config.Config{ - General: config.GeneralConfig{ - ShardedOperation: tc.shardConfig, + t.Run(tc.name, + func(t *testing.T) { + cfg := &config.Config{ + General: config.GeneralConfig{ + ShardedOperationMode: tc.shardOpMode, + }, + } + b := NewBackuper(cfg, + WithVersioner(tc.v), + WithBackupSharder(tc.bs), + ) + tables := tableData() + err := b.populateBackupShardField(context.Background(), tables) + if !errors.Is(err, tc.expectErr) { + t.Fatalf("expected error %v, got %v", tc.expectErr, err) + } + if err != nil { + return + } + if !reflect.DeepEqual(tables, tc.expect) { + t.Fatalf("expected %+v, got %+v", tc.expect, tables) + } }, - } - b := NewBackuper(cfg, - WithVersioner(tc.v), - WithBackupSharder(tc.bs), ) - tables := tableData() - err := b.populateBackupShardField(context.Background(), tables) - if !errors.Is(err, tc.expectErr) { - t.Fatalf("expected error %v, got %v", tc.expectErr, err) - } - if err != nil { - continue - } - if !reflect.DeepEqual(tables, tc.expect) { - t.Fatalf("expected %+v, got %+v", tc.expect, tables) - } } } diff --git a/pkg/backup/create.go b/pkg/backup/create.go index 537eea36..f7ea0eca 100644 --- a/pkg/backup/create.go +++ b/pkg/backup/create.go @@ -276,7 +276,7 @@ func (b *Backuper) createBackupLocal(ctx context.Context, backupName string, par func (b *Backuper) createBackupEmbedded(ctx context.Context, backupName, tablePattern string, partitions []string, partitionsToBackupMap common.EmptyMap, schemaOnly, rbacOnly, configsOnly bool, tables []clickhouse.Table, allDatabases []clickhouse.Database, allFunctions []clickhouse.Function, disks []clickhouse.Disk, diskMap map[string]string, logger zerolog.Logger, startBackup time.Time, backupVersion string) error { // TODO: Implement sharded backup operations for embedded backups - if b.cfg.General.ShardedOperation { + if doesShard(b.cfg.General.ShardedOperationMode) { return fmt.Errorf("cannot perform embedded backup: %w", errShardOperationUnsupported) } if _, isBackupDiskExists := diskMap[b.cfg.ClickHouse.EmbeddedBackupDisk]; !isBackupDiskExists { diff --git a/pkg/backup/version.go b/pkg/backup/version.go deleted file mode 100644 index cb0a3129..00000000 --- a/pkg/backup/version.go +++ /dev/null @@ -1,33 +0,0 @@ -package backup - -import ( - "context" - "errors" -) - -const ( - minVersShardOp = 21000000 -) - -var ( - errShardOperationVers = errors.New("sharded operations are only supported for " + - "clickhouse-server >= v21.x") - errShardOperationUnsupported = errors.New("sharded operations are not supported") -) - -// versioner is an interface for determining the version of Clickhouse -type versioner interface { - GetVersion(context.Context) (int, error) -} - -// canShardOperation returns whether or not sharded backup creation is supported -func canShardOperation(ctx context.Context, v versioner) error { - version, err := v.GetVersion(ctx) - if err != nil { - return err - } - if version < minVersShardOp { - return errShardOperationVers - } - return nil -} diff --git a/pkg/backup/version_test.go b/pkg/backup/version_test.go deleted file mode 100644 index 2259aba3..00000000 --- a/pkg/backup/version_test.go +++ /dev/null @@ -1,67 +0,0 @@ -package backup - -import ( - "context" - "errors" - "testing" -) - -type testVersionerOpt func(sd *testVersioner) - -type testVersioner struct { - version int - versionErr error -} - -func newTestVersioner(opts ...testVersionerOpt) *testVersioner { - v := &testVersioner{} - for _, opt := range opts { - opt(v) - } - return v -} - -func withVersion(version int) testVersionerOpt { - return func(v *testVersioner) { - v.version = version - } -} - -func withVersionErr(err error) testVersionerOpt { - return func(v *testVersioner) { - v.versionErr = err - } -} - -func (v *testVersioner) GetVersion(_ context.Context) (int, error) { - if v.versionErr != nil { - return -1, v.versionErr - } - return v.version, nil -} - -func TestCanShardOperation(t *testing.T) { - ctx := context.Background() - - t.Log("test error on version retrieval") - v := newTestVersioner(withVersionErr(errors.New("error"))) - if err := canShardOperation(ctx, v); err == nil { - t.Fatal("expected error when getting shard determiner error on version retrieval") - } - - t.Log("test version too low") - v = newTestVersioner(withVersion(-1)) - err := canShardOperation(ctx, v) - if err == nil { - t.Fatal("expected error when version number is too low") - } - if !errors.Is(err, errShardOperationVers) { - t.Fatalf("expected errShardOperationUnsupported, got %v", err) - } - - t.Log("test version should be OK") - v = newTestVersioner(withVersion(minVersShardOp)) - if err = canShardOperation(ctx, v); err != nil { - t.Fatalf("unexpected error: %v", err) - } -} diff --git a/pkg/clickhouse/version.go b/pkg/clickhouse/version.go new file mode 100644 index 00000000..9f98f8d8 --- /dev/null +++ b/pkg/clickhouse/version.go @@ -0,0 +1,34 @@ +package clickhouse + +import ( + "context" + "errors" +) + +const ( + minVersShardOp = 21000000 +) + +var ( + ErrShardOperationVers = errors.New("sharded operations are only supported for " + + "clickhouse-server >= v21.x") +) + +type versionGetter interface { + GetVersion(ctx context.Context) (int, error) +} + +func canShardOperation(ctx context.Context, v versionGetter) error { + version, err := v.GetVersion(ctx) + if err != nil { + return err + } + if version < minVersShardOp { + return ErrShardOperationVers + } + return nil +} + +func (ch *ClickHouse) CanShardOperation(ctx context.Context) error { + return canShardOperation(ctx, ch) +} diff --git a/pkg/clickhouse/version_test.go b/pkg/clickhouse/version_test.go new file mode 100644 index 00000000..a1d68f32 --- /dev/null +++ b/pkg/clickhouse/version_test.go @@ -0,0 +1,76 @@ +package clickhouse + +import ( + "context" + "errors" + "testing" +) + +type testVersionGetterOpt func(sd *testVersionGetter) + +type testVersionGetter struct { + version int + versionErr error +} + +func newTestVersionGetter(opts ...testVersionGetterOpt) *testVersionGetter { + v := &testVersionGetter{} + for _, opt := range opts { + opt(v) + } + return v +} + +func withVersion(version int) testVersionGetterOpt { + return func(v *testVersionGetter) { + v.version = version + } +} + +func withVersionErr(err error) testVersionGetterOpt { + return func(v *testVersionGetter) { + v.versionErr = err + } +} + +func (v *testVersionGetter) GetVersion(_ context.Context) (int, error) { + if v.versionErr != nil { + return -1, v.versionErr + } + return v.version, nil +} + +func TestCanShardOperation(t *testing.T) { + ctx := context.Background() + + t.Run("test error on version retrieval", + func(t *testing.T) { + v := newTestVersionGetter(withVersionErr(errors.New("error"))) + if err := canShardOperation(ctx, v); err == nil { + t.Fatal("expected error when getting shard determiner error on version retrieval") + } + }, + ) + + t.Run("test version too low", + func(t *testing.T) { + v := newTestVersionGetter(withVersion(-1)) + err := canShardOperation(ctx, v) + if err == nil { + t.Fatal("expected error when version number is too low") + } + if !errors.Is(err, ErrShardOperationVers) { + t.Fatalf("expected ErrShardOperationUnsupported, got %v", err) + } + }, + ) + + t.Run("test version should be OK", + func(t *testing.T) { + v := newTestVersionGetter(withVersion(minVersShardOp)) + if err := canShardOperation(ctx, v); err != nil { + t.Fatalf("unexpected error: %v", err) + } + }, + ) +} diff --git a/pkg/config/config.go b/pkg/config/config.go index 5d0360ec..026f8199 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -56,7 +56,7 @@ type GeneralConfig struct { WatchInterval string `yaml:"watch_interval" envconfig:"WATCH_INTERVAL"` FullInterval string `yaml:"full_interval" envconfig:"FULL_INTERVAL"` WatchBackupNameTemplate string `yaml:"watch_backup_name_template" envconfig:"WATCH_BACKUP_NAME_TEMPLATE"` - ShardedOperation bool `yaml:"sharded_operation" envconfig:"SHARDED_OPERATION"` + ShardedOperationMode string `yaml:"sharded_operation_mode" envconfig:"SHARDED_OPERATION_MODE"` RetriesDuration time.Duration WatchDuration time.Duration FullDuration time.Duration diff --git a/test/testflows/clickhouse_backup/tests/snapshots/cli.py.cli.snapshot b/test/testflows/clickhouse_backup/tests/snapshots/cli.py.cli.snapshot index 3680d108..5601fb5e 100644 --- a/test/testflows/clickhouse_backup/tests/snapshots/cli.py.cli.snapshot +++ b/test/testflows/clickhouse_backup/tests/snapshots/cli.py.cli.snapshot @@ -1,4 +1,4 @@ -default_config = r"""'[\'general:\', \' remote_storage: none\', \' disable_progress_bar: true\', \' backups_to_keep_local: 0\', \' backups_to_keep_remote: 0\', \' log_level: info\', \' allow_empty_backups: false\', \' use_resumable_state: true\', \' restore_schema_on_cluster: ""\', \' upload_by_part: true\', \' download_by_part: true\', \' restore_database_mapping: {}\', \' retries_on_failure: 3\', \' retries_pause: 30s\', \' watch_interval: 1h\', \' full_interval: 24h\', \' watch_backup_name_template: shard{shard}-{type}-{time:20060102150405}\', \' sharded_operation: false\', \' retriesduration: 100ms\', \' watchduration: 1h0m0s\', \' fullduration: 24h0m0s\', \'clickhouse:\', \' username: default\', \' password: ""\', \' host: localhost\', \' port: 9000\', \' disk_mapping: {}\', \' skip_tables:\', \' - system.*\', \' - INFORMATION_SCHEMA.*\', \' - information_schema.*\', \' - _temporary_and_external_tables.*\', \' timeout: 5m\', \' freeze_by_part: false\', \' freeze_by_part_where: ""\', \' use_embedded_backup_restore: false\', \' embedded_backup_disk: ""\', \' backup_mutations: true\', \' restore_as_attach: false\', \' check_parts_columns: true\', \' secure: false\', \' skip_verify: false\', \' sync_replicated_tables: false\', \' log_sql_queries: true\', \' config_dir: /etc/clickhouse-server/\', \' restart_command: systemctl restart clickhouse-server\', \' ignore_not_exists_error_during_freeze: true\', \' check_replicas_before_attach: true\', \' tls_key: ""\', \' tls_cert: ""\', \' tls_ca: ""\', \' debug: false\', \'s3:\', \' access_key: ""\', \' secret_key: ""\', \' bucket: ""\', \' endpoint: ""\', \' region: us-east-1\', \' acl: private\', \' assume_role_arn: ""\', \' force_path_style: false\', \' path: ""\', \' disable_ssl: false\', \' compression_level: 1\', \' compression_format: tar\', \' sse: ""\', \' sse_kms_key_id: ""\', \' sse_customer_algorithm: ""\', \' sse_customer_key: ""\', \' sse_customer_key_md5: ""\', \' sse_kms_encryption_context: ""\', \' disable_cert_verification: false\', \' use_custom_storage_class: false\', \' storage_class: STANDARD\', \' custom_storage_class_map: {}\', \' part_size: 0\', \' allow_multipart_download: false\', \' object_labels: {}\', \' debug: false\', \'gcs:\', \' credentials_file: ""\', \' credentials_json: ""\', \' credentials_json_encoded: ""\', \' bucket: ""\', \' path: ""\', \' compression_level: 1\', \' compression_format: tar\', \' debug: false\', \' endpoint: ""\', \' storage_class: STANDARD\', \' object_labels: {}\', \' custom_storage_class_map: {}\', \'cos:\', \' url: ""\', \' timeout: 2m\', \' secret_id: ""\', \' secret_key: ""\', \' path: ""\', \' compression_format: tar\', \' compression_level: 1\', \' debug: false\', \'api:\', \' listen: localhost:7171\', \' enable_metrics: true\', \' enable_pprof: false\', \' username: ""\', \' password: ""\', \' secure: false\', \' certificate_file: ""\', \' private_key_file: ""\', \' create_integration_tables: false\', \' integration_tables_host: ""\', \' allow_parallel: false\', \' complete_resumable_after_restart: true\', \'ftp:\', \' address: ""\', \' timeout: 2m\', \' username: ""\', \' password: ""\', \' tls: false\', \' path: ""\', \' compression_format: tar\', \' compression_level: 1\', \' debug: false\', \'sftp:\', \' address: ""\', \' port: 22\', \' username: ""\', \' password: ""\', \' key: ""\', \' path: ""\', \' compression_format: tar\', \' compression_level: 1\', \' debug: false\', \'azblob:\', \' endpoint_schema: https\', \' endpoint_suffix: core.windows.net\', \' account_name: ""\', \' account_key: ""\', \' sas: ""\', \' use_managed_identity: false\', \' container: ""\', \' path: ""\', \' compression_level: 1\', \' compression_format: tar\', \' sse_key: ""\', \' buffer_size: 0\', \' buffer_count: 3\', \' timeout: 15m\', \'custom:\', \' upload_command: ""\', \' download_command: ""\', \' list_command: ""\', \' delete_command: ""\', \' command_timeout: 4h\', \' commandtimeoutduration: 4h0m0s\']'""" +default_config = r"""'[\'general:\', \' remote_storage: none\', \' disable_progress_bar: true\', \' backups_to_keep_local: 0\', \' backups_to_keep_remote: 0\', \' log_level: info\', \' allow_empty_backups: false\', \' use_resumable_state: true\', \' restore_schema_on_cluster: ""\', \' upload_by_part: true\', \' download_by_part: true\', \' restore_database_mapping: {}\', \' retries_on_failure: 3\', \' retries_pause: 30s\', \' watch_interval: 1h\', \' full_interval: 24h\', \' watch_backup_name_template: shard{shard}-{type}-{time:20060102150405}\', \' sharded_operation_mode: ""\', \' retriesduration: 100ms\', \' watchduration: 1h0m0s\', \' fullduration: 24h0m0s\', \'clickhouse:\', \' username: default\', \' password: ""\', \' host: localhost\', \' port: 9000\', \' disk_mapping: {}\', \' skip_tables:\', \' - system.*\', \' - INFORMATION_SCHEMA.*\', \' - information_schema.*\', \' - _temporary_and_external_tables.*\', \' timeout: 5m\', \' freeze_by_part: false\', \' freeze_by_part_where: ""\', \' use_embedded_backup_restore: false\', \' embedded_backup_disk: ""\', \' backup_mutations: true\', \' restore_as_attach: false\', \' check_parts_columns: true\', \' secure: false\', \' skip_verify: false\', \' sync_replicated_tables: false\', \' log_sql_queries: true\', \' config_dir: /etc/clickhouse-server/\', \' restart_command: systemctl restart clickhouse-server\', \' ignore_not_exists_error_during_freeze: true\', \' check_replicas_before_attach: true\', \' tls_key: ""\', \' tls_cert: ""\', \' tls_ca: ""\', \' debug: false\', \'s3:\', \' access_key: ""\', \' secret_key: ""\', \' bucket: ""\', \' endpoint: ""\', \' region: us-east-1\', \' acl: private\', \' assume_role_arn: ""\', \' force_path_style: false\', \' path: ""\', \' disable_ssl: false\', \' compression_level: 1\', \' compression_format: tar\', \' sse: ""\', \' sse_kms_key_id: ""\', \' sse_customer_algorithm: ""\', \' sse_customer_key: ""\', \' sse_customer_key_md5: ""\', \' sse_kms_encryption_context: ""\', \' disable_cert_verification: false\', \' use_custom_storage_class: false\', \' storage_class: STANDARD\', \' custom_storage_class_map: {}\', \' part_size: 0\', \' allow_multipart_download: false\', \' object_labels: {}\', \' debug: false\', \'gcs:\', \' credentials_file: ""\', \' credentials_json: ""\', \' credentials_json_encoded: ""\', \' bucket: ""\', \' path: ""\', \' compression_level: 1\', \' compression_format: tar\', \' debug: false\', \' endpoint: ""\', \' storage_class: STANDARD\', \' object_labels: {}\', \' custom_storage_class_map: {}\', \'cos:\', \' url: ""\', \' timeout: 2m\', \' secret_id: ""\', \' secret_key: ""\', \' path: ""\', \' compression_format: tar\', \' compression_level: 1\', \' debug: false\', \'api:\', \' listen: localhost:7171\', \' enable_metrics: true\', \' enable_pprof: false\', \' username: ""\', \' password: ""\', \' secure: false\', \' certificate_file: ""\', \' private_key_file: ""\', \' create_integration_tables: false\', \' integration_tables_host: ""\', \' allow_parallel: false\', \' complete_resumable_after_restart: true\', \'ftp:\', \' address: ""\', \' timeout: 2m\', \' username: ""\', \' password: ""\', \' tls: false\', \' path: ""\', \' compression_format: tar\', \' compression_level: 1\', \' debug: false\', \'sftp:\', \' address: ""\', \' port: 22\', \' username: ""\', \' password: ""\', \' key: ""\', \' path: ""\', \' compression_format: tar\', \' compression_level: 1\', \' debug: false\', \'azblob:\', \' endpoint_schema: https\', \' endpoint_suffix: core.windows.net\', \' account_name: ""\', \' account_key: ""\', \' sas: ""\', \' use_managed_identity: false\', \' container: ""\', \' path: ""\', \' compression_level: 1\', \' compression_format: tar\', \' sse_key: ""\', \' buffer_size: 0\', \' buffer_count: 3\', \' timeout: 15m\', \'custom:\', \' upload_command: ""\', \' download_command: ""\', \' list_command: ""\', \' delete_command: ""\', \' command_timeout: 4h\', \' commandtimeoutduration: 4h0m0s\']'""" help_flag = r"""'NAME:\n clickhouse-backup - Tool for easy backup of ClickHouse with cloud supportUSAGE:\n clickhouse-backup [-t, --tables=.] DESCRIPTION:\n Run as \'root\' or \'clickhouse\' userCOMMANDS:\n tables List of tables, exclude skip_tables\n create Create new backup\n create_remote Create and upload new backup\n upload Upload backup to remote storage\n list List of backups\n download Download backup from remote storage\n restore Create schema and restore data from backup\n restore_remote Download and restore\n delete Delete specific backup\n default-config Print default config\n print-config Print current config merged with environment variables\n clean Remove data in \'shadow\' folder from all \'path\' folders available from \'system.disks\'\n clean_remote_broken Remove all broken remote backups\n watch Run infinite loop which create full + incremental backup sequence to allow efficient backup sequences\n server Run API server\n help, h Shows a list of commands or help for one commandGLOBAL OPTIONS:\n --config value, -c value Config \'FILE\' name. (default: "/etc/clickhouse-backup/config.yml") [$CLICKHOUSE_BACKUP_CONFIG]\n --help, -h show help\n --version, -v print the version'"""