Skip to content

Commit

Permalink
clickhouse-backup: Add support for sharded backup
Browse files Browse the repository at this point in the history
This change adds a new configuration 'general.sharded_operation'
which shards tables for backup across replicas, allowing for a
uniform backup and restore call to the server without consideration
for table replication state.

Fixes #639

clickhouse-backup/backup_shard: Use Array for active replicas

clickhouse-go v1 does not support clickhouse Map types. Force the
Map(String, UInt8) column replica_is_active to a string array for
now.

clickhouse-backup/backuper: Skip shard assignment for skipped tables

Skip shard assignment for skipped tables. Also add the new
ShardBackupType "ShardBackupNone", which is assigned to skipped
tables

clickhouse-backup/backuper: Use b.GetTables for CreateBackup

Use b.GetTables for CreateBackup instead of b.ch.GetTables and move
b.populateBackupShardField to b.GetTables so as to populate the
field for the server API.
  • Loading branch information
mskwon committed May 2, 2023
1 parent a6e0d2d commit f14d862
Show file tree
Hide file tree
Showing 12 changed files with 803 additions and 22 deletions.
4 changes: 3 additions & 1 deletion ReadMe.md
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,9 @@ general:
# The format for this env variable is "src_db1:target_db1,src_db2:target_db2". For YAML please continue using map syntax
restore_database_mapping: {}
retries_on_failure: 3 # RETRIES_ON_FAILURE, how many times to retry after a failure during upload or download
retries_pause: 30s # RETRIES_PAUSE, duration time to pause after each download or upload failure
retries_pause: 30s # RETRIES_PAUSE, duration time to pause after each download or upload failure

sharded_operation: false # SHARDED_OPERATION, backups to replicas will save the schema but will only save one replica copy of the data with tables sharded among replicas.
clickhouse:
username: default # CLICKHOUSE_USERNAME
password: "" # CLICKHOUSE_PASSWORD
Expand Down
129 changes: 129 additions & 0 deletions pkg/backup/backup_shard.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package backup

import (
"context"
"errors"
"fmt"
"hash/fnv"
"sort"
)

var (
// errUnknownBackupShard is returned when sharding assignment is requested for a table for which
// active replication state is not known.
errUnknownBackupShard = errors.New("unknown backup shard")

// errNoActiveReplicas is returned when a table is has no current active replicas
errNoActiveReplicas = errors.New("no active replicas")
)

// shardDetermination is an object holding information on whether or not a table is within the
// backup shard
type shardDetermination map[string]bool

// inShard returns whether or not a given table is within a backup shard
func (d shardDetermination) inShard(database, table string) (bool, error) {
fullName := fmt.Sprintf("%s.%s", database, table)
presentInShard, ok := d[fullName]
if !ok {
return false, fmt.Errorf("error determining backup shard state for %q: %w", fullName,
errUnknownBackupShard)
}
return presentInShard, nil
}

// backupSharder is an interface which can obtain a shard determination at a given point in time
type backupSharder interface {
determineShards(ctx context.Context) (shardDetermination, error)
}

// tableReplicaMetadata is data derived from `system.replicas`
type tableReplicaMetadata struct {
Database string `db:"database" json:"database"`
Table string `db:"table" json:"table"`
ReplicaName string `db:"replica_name" json:"replica_name"`
// TODO: Change type to use replica_is_active directly after upgrade to clickhouse-go v2
ActiveReplicas []string `db:"active_replicas" json:"replica_is_active"`
}

// fullName returns the table name in the form of `database.table`
func (md *tableReplicaMetadata) fullName() string {
return fmt.Sprintf("%s.%s", md.Database, md.Table)
}

// querier is an interface that can query Clickhouse
type querier interface {
StructSelectContext(ctx context.Context, dest any, query string) error
}

// shardFunc is a function that is used to determine whether or not a given database/table should be
// handled by a given replica
type shardFunc func(md *tableReplicaMetadata) (bool, error)

// fnvHashModShardFunc performs a FNV hash of a table name in the form of `database.table` and then
// performs a mod N operation (where N is the number of active replicas) in order to find an index
// in an alphabetically sorted list of active replicas which corresponds to the replica that will
// handle the backup of the table
func fnvHashModShardFunc(md *tableReplicaMetadata) (bool, error) {
if len(md.ActiveReplicas) == 0 {
return false, fmt.Errorf("could not determine in-shard state for %s: %w", md.fullName(),
errNoActiveReplicas)
}
sort.Strings(md.ActiveReplicas)

h := fnv.New32a()
h.Write([]byte(md.fullName()))
i := h.Sum32() % uint32(len(md.ActiveReplicas))
return md.ActiveReplicas[i] == md.ReplicaName, nil
}

// replicaDeterminer is a concrete struct that will query clickhouse to obtain a shard determination
// by examining replica information
type replicaDeterminer struct {
q querier
sf shardFunc
}

// newReplicaDeterminer returns a new shardDeterminer
func newReplicaDeterminer(q querier, sf shardFunc) *replicaDeterminer {
sd := &replicaDeterminer{
q: q,
sf: sf,
}
return sd
}

// getReplicaState obtains the local replication state through a query to `system.replicas`
func (rd *replicaDeterminer) getReplicaState(ctx context.Context) ([]tableReplicaMetadata, error) {
md := []tableReplicaMetadata{}
// TODO: Change query to pull replica_is_active after upgrading to clickhouse-go v2
query := "select t.database, t.name as table, r.replica_name, mapKeys(mapFilter((replica, active) -> (active == 1), r.replica_is_active)) as active_replicas from system.tables t left join system.replicas r on t.database = r.database and t.name = r.table"
if err := rd.q.StructSelectContext(ctx, &md, query); err != nil {
return nil, fmt.Errorf("could not determine replication state: %w", err)
}

// Handle views and memory tables by putting in stand-in replication metadata
for i, entry := range md {
if entry.ReplicaName == "" && len(entry.ActiveReplicas) == 0 {
md[i].ReplicaName = "no-replicas"
md[i].ActiveReplicas = []string{"no-replicas"}
}
}
return md, nil
}

func (rd *replicaDeterminer) determineShards(ctx context.Context) (shardDetermination, error) {
md, err := rd.getReplicaState(ctx)
if err != nil {
return nil, err
}
sd := shardDetermination{}
for _, entry := range md {
assigned, err := rd.sf(&entry)
if err != nil {
return nil, err
}
sd[entry.fullName()] = assigned
}
return sd, nil
}
Loading

0 comments on commit f14d862

Please sign in to comment.