Skip to content

Commit

Permalink
backup: Addressing changes for adding sharding support
Browse files Browse the repository at this point in the history
  • Loading branch information
mskwon committed Jun 14, 2023
1 parent 11c101c commit 9cb7ccb
Show file tree
Hide file tree
Showing 12 changed files with 431 additions and 208 deletions.
2 changes: 1 addition & 1 deletion ReadMe.md
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ general:
full_interval: 24h # FULL_INTERVAL, use only for `watch` command, full backup will create every 24h
watch_backup_name_template: "shard{shard}-{type}-{time:20060102150405}" # WATCH_BACKUP_NAME_TEMPLATE, used only for `watch` command, macros values will apply from `system.macros` for time:XXX, look format in https://go.dev/src/time/format.go

sharded_operation: false # SHARDED_OPERATION, backups to replicas will save the schema but will only save one replica copy of the data with tables sharded among replicas.
sharded_operation_mode: none # SHARDED_OPERATION_MODE, how different replicas will shard backing up data for tables. Options are: none (no sharding), table (table granularity), database (database granularity), first-replica (on the lexicographically sorted first active replica). If left empty, then the "none" option will be set as default.
clickhouse:
username: default # CLICKHOUSE_USERNAME
password: "" # CLICKHOUSE_PASSWORD
Expand Down
100 changes: 84 additions & 16 deletions pkg/backup/backup_shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"hash/fnv"
"sort"
)

var (
Expand All @@ -15,6 +14,14 @@ var (

// errNoActiveReplicas is returned when a table is has no current active replicas
errNoActiveReplicas = errors.New("no active replicas")

shardFuncRegistry = map[string]shardFunc{
"table": fnvHashModTableShardFunc,
"database": fnvHashModDatabaseShardFunc,
"first-replica": firstReplicaShardFunc,
"none": noneShardFunc,
"": noneShardFunc,
}
)

// shardDetermination is an object holding information on whether or not a table is within the
Expand All @@ -23,7 +30,7 @@ type shardDetermination map[string]bool

// inShard returns whether or not a given table is within a backup shard
func (d shardDetermination) inShard(database, table string) (bool, error) {
fullName := fmt.Sprintf("%s.%s", database, table)
fullName := fmt.Sprintf("`%s`.`%s`", database, table)
presentInShard, ok := d[fullName]
if !ok {
return false, fmt.Errorf("error determining backup shard state for %q: %w", fullName,
Expand All @@ -48,33 +55,94 @@ type tableReplicaMetadata struct {

// fullName returns the table name in the form of `database.table`
func (md *tableReplicaMetadata) fullName() string {
return fmt.Sprintf("%s.%s", md.Database, md.Table)
return fmt.Sprintf("`%s`.`%s`", md.Database, md.Table)
}

// querier is an interface that can query Clickhouse
type querier interface {
SelectContext(context.Context, interface{}, string, ...interface{}) error
}

// shardFunc is a function that is used to determine whether or not a given database/table should be
// handled by a given replica
// shardFunc is a function that is determines whether or not a given database/table should have its
// data backed up by the replica calling this function
type shardFunc func(md *tableReplicaMetadata) (bool, error)

// fnvHashModShardFunc performs a FNV hash of a table name in the form of `database.table` and then
// performs a mod N operation (where N is the number of active replicas) in order to find an index
// in an alphabetically sorted list of active replicas which corresponds to the replica that will
// handle the backup of the table
func fnvHashModShardFunc(md *tableReplicaMetadata) (bool, error) {
func shardFuncByName(name string) (shardFunc, error) {
chosen, ok := shardFuncRegistry[name]
if !ok {
validOptions := make([]string, len(shardFuncRegistry))
for k := range shardFuncRegistry {
if k == "" {
continue
}
validOptions = append(validOptions, k)
}
return nil, fmt.Errorf("unknown backup sharding option %q, valid options: %v", name,
validOptions)
}
return chosen, nil
}

// fnvShardReplicaFromString returns a replica assignment from a slice of active replicas by taking
// an arbitrary string, performing a FNV hash on it (mod NumActiveReplicas), and using the resulting
// number as an index of the sorted slice of active replicas. It is assumed that the active replicas
// slice is provided pre-sorted.
func fnvShardReplicaFromString(str string, activeReplicas []string) (string, error) {
if len(activeReplicas) == 0 {
return "", fmt.Errorf("could not determine in-shard state for %s: %w", str,
errNoActiveReplicas)
}

h := fnv.New32a()
h.Write([]byte(str))
i := h.Sum32() % uint32(len(activeReplicas))
return activeReplicas[i], nil
}

// fnvHashModTableShardFunc determines whether a replica should handle backing up data based on the
// table name in the form of `database.table`. It is assumed that the active replicas slice is
// provided pre-sorted.
func fnvHashModTableShardFunc(md *tableReplicaMetadata) (bool, error) {
assignedReplica, err := fnvShardReplicaFromString(md.fullName(), md.ActiveReplicas)
if err != nil {
return false, err
}
return assignedReplica == md.ReplicaName, nil
}

// fnvHashModDatabaseShardFunc determines whether a replica should handle backing up data based on
// database name. It is assumed that the active replicas slice is provided pre-sorted.
func fnvHashModDatabaseShardFunc(md *tableReplicaMetadata) (bool, error) {
assignedReplica, err := fnvShardReplicaFromString(md.Database, md.ActiveReplicas)
if err != nil {
return false, err
}
return assignedReplica == md.ReplicaName, nil
}

// firstReplicaShardFunc determines whether a replica should handle backing up data based on whether
// or not it is the lexicographically first active replica. It is assumed that the active replicas
// slice is provided pre-sorted.
func firstReplicaShardFunc(md *tableReplicaMetadata) (bool, error) {
if len(md.ActiveReplicas) == 0 {
return false, fmt.Errorf("could not determine in-shard state for %s: %w", md.fullName(),
errNoActiveReplicas)
}
sort.Strings(md.ActiveReplicas)
return md.ReplicaName == md.ActiveReplicas[0], nil
}

h := fnv.New32a()
h.Write([]byte(md.fullName()))
i := h.Sum32() % uint32(len(md.ActiveReplicas))
return md.ActiveReplicas[i] == md.ReplicaName, nil
// noneShardFunc always returns true
func noneShardFunc(md *tableReplicaMetadata) (bool, error) {
return true, nil
}

// doesShard returns whether a ShardedOperationMode configuration performs sharding or not
func doesShard(mode string) bool {
_, ok := shardFuncRegistry[mode]
if !ok {
return false
}
return mode != "" && mode != "none"
}

// replicaDeterminer is a concrete struct that will query clickhouse to obtain a shard determination
Expand All @@ -97,7 +165,7 @@ func newReplicaDeterminer(q querier, sf shardFunc) *replicaDeterminer {
func (rd *replicaDeterminer) getReplicaState(ctx context.Context) ([]tableReplicaMetadata, error) {
md := []tableReplicaMetadata{}
// TODO: Change query to pull replica_is_active after upgrading to clickhouse-go v2
query := "select t.database, t.name as table, r.replica_name, mapKeys(mapFilter((replica, active) -> (active == 1), r.replica_is_active)) as active_replicas from system.tables t left join system.replicas r on t.database = r.database and t.name = r.table"
query := "SELECT t.database, t.name AS table, r.replica_name, arraySort(mapKeys(mapFilter((replica, active) -> (active == 1), r.replica_is_active))) AS active_replicas FROM system.tables t LEFT JOIN system.replicas r ON t.database = r.database AND t.name = r.table"
if err := rd.q.SelectContext(ctx, &md, query); err != nil {
return nil, fmt.Errorf("could not determine replication state: %w", err)
}
Expand Down
Loading

0 comments on commit 9cb7ccb

Please sign in to comment.