Skip to content

Commit

Permalink
after drop table, before create table, will check if replica path alr…
Browse files Browse the repository at this point in the history
…eady exists, and will try to, helpfull for restoring Replicated tables which not contains macros in replication parameters fix #849
  • Loading branch information
Slach committed Nov 1, 2024
1 parent 54f96dc commit 7a2b4a2
Show file tree
Hide file tree
Showing 7 changed files with 174 additions and 23 deletions.
4 changes: 3 additions & 1 deletion ChangeLog.md
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
# v2.6.3
IMPROVEMENTS
- implement new format for *.state files boltdb
- implement new format for *.state2 files boltdb key value (please, check memory RSS usage)
- clean resumable state if backup parameters changed, fix [840](https://github.com/Altinity/clickhouse-backup/issues/840)
- switch to golang 1.23
- add `clickhouse_backup_local_data_size` metric as alias for `TotalBytesOfMergeTreeTablesm` from `system.asychnrous_metrics`, fix [573](https://github.com/Altinity/clickhouse-backup/issues/573)
- API refactoring, query options with snake case, also allow with dash case.
- add `--resume` parameter to `create` and `restore` command to avoid unnecessary copy object disk data fix [828](https://github.com/Altinity/clickhouse-backup/issues/828)


BUG FIXES
- after drop table, before create table, will check if replica path already exists, and will try to, helpfull for restoring Replicated tables which not contains macros in replication parameters fix [849](https://github.com/Altinity/clickhouse-backup/issues/849)
- fix `TestLongListRemote` for properly time measurement
- fix log_pointer handle from system.replicas during restore, fix [967](https://github.com/Altinity/clickhouse-backup/issues/967)
- fix `use_embedded_backup_restore: true` behavior for azblob, fix [1031](https://github.com/Altinity/clickhouse-backup/issues/1031)
Expand Down
2 changes: 2 additions & 0 deletions ReadMe.md
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,8 @@ clickhouse:
restart_command: "exec:systemctl restart clickhouse-server"
ignore_not_exists_error_during_freeze: true # CLICKHOUSE_IGNORE_NOT_EXISTS_ERROR_DURING_FREEZE, helps to avoid backup failures when running frequent CREATE / DROP tables and databases during backup, `clickhouse-backup` will ignore `code: 60` and `code: 81` errors during execution of `ALTER TABLE ... FREEZE`
check_replicas_before_attach: true # CLICKHOUSE_CHECK_REPLICAS_BEFORE_ATTACH, helps avoiding concurrent ATTACH PART execution when restoring ReplicatedMergeTree tables
default_replica_path: "/clickhouse/tables/{cluster}/{shard}/{database}/{table}" # CLICKHOUSE_DEFAULT_REPLICA_PATH, will use during restore Replicated tables without macros in replication_path if replica already exists, to avoid restoring conflicts
default_replica_name: "{replica}" # CLICKHOUSE_DEFAULT_REPLICA_NAME, will use during restore Replicated tables without macros in replica_name if replica already exists, to avoid restoring conflicts
use_embedded_backup_restore: false # CLICKHOUSE_USE_EMBEDDED_BACKUP_RESTORE, use BACKUP / RESTORE SQL statements instead of regular SQL queries to use features of modern ClickHouse server versions
embedded_backup_disk: "" # CLICKHOUSE_EMBEDDED_BACKUP_DISK - disk from system.disks which will use when `use_embedded_backup_restore: true`
backup_mutations: true # CLICKHOUSE_BACKUP_MUTATIONS, allow backup mutations from system.mutations WHERE is_done=0 and apply it during restore
Expand Down
107 changes: 89 additions & 18 deletions pkg/backup/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -895,7 +895,7 @@ func (b *Backuper) RestoreSchema(ctx context.Context, backupName string, backupM
if b.isEmbedded {
restoreErr = b.restoreSchemaEmbedded(ctx, backupName, backupMetadata, disks, tablesForRestore, version)
} else {
restoreErr = b.restoreSchemaRegular(tablesForRestore, version)
restoreErr = b.restoreSchemaRegular(ctx, tablesForRestore, version)
}
if restoreErr != nil {
return restoreErr
Expand Down Expand Up @@ -1078,7 +1078,7 @@ func (b *Backuper) fixEmbeddedMetadataSQLQuery(ctx context.Context, sqlBytes []b
return sqlQuery, sqlMetadataChanged, nil
}

func (b *Backuper) restoreSchemaRegular(tablesForRestore ListOfTables, version int) error {
func (b *Backuper) restoreSchemaRegular(ctx context.Context, tablesForRestore ListOfTables, version int) error {
totalRetries := len(tablesForRestore)
restoreRetries := 0
isDatabaseCreated := common.EmptyMap{}
Expand All @@ -1095,23 +1095,14 @@ func (b *Backuper) restoreSchemaRegular(tablesForRestore ListOfTables, version i
}
}
//materialized and window views should restore via ATTACH
schema.Query = strings.Replace(
schema.Query, "CREATE MATERIALIZED VIEW", "ATTACH MATERIALIZED VIEW", 1,
)
schema.Query = strings.Replace(
schema.Query, "CREATE WINDOW VIEW", "ATTACH WINDOW VIEW", 1,
)
schema.Query = strings.Replace(
schema.Query, "CREATE LIVE VIEW", "ATTACH LIVE VIEW", 1,
)
b.replaceCreateToAttachForView(&schema)
// https://github.com/Altinity/clickhouse-backup/issues/849
log.Info().Msgf("SUKA BEFORE!!! schema.Query=%s", schema.Query)
b.checkReplicaAlreadyExistsAndChangeReplicationPath(ctx, &schema)
log.Info().Msgf("SUKA AFTER!!! schema.Query=%s", schema.Query)

// https://github.com/Altinity/clickhouse-backup/issues/466
if b.cfg.General.RestoreSchemaOnCluster == "" && strings.Contains(schema.Query, "{uuid}") && strings.Contains(schema.Query, "Replicated") {
if !strings.Contains(schema.Query, "UUID") {
log.Warn().Msgf("table query doesn't contains UUID, can't guarantee properly restore for ReplicatedMergeTree")
} else {
schema.Query = UUIDWithMergeTreeRE.ReplaceAllString(schema.Query, "$1$2$3'$4'$5$4$7")
}
}
b.replaceUUIDMacroValue(&schema)
restoreErr = b.ch.CreateTable(clickhouse.Table{
Database: schema.Database,
Name: schema.Table,
Expand Down Expand Up @@ -1140,6 +1131,86 @@ func (b *Backuper) restoreSchemaRegular(tablesForRestore ListOfTables, version i
return nil
}

var replicatedParamsRE = regexp.MustCompile(`(Replicated[a-zA-Z]*MergeTree)\('([^']+)'(\s*,\s*)'([^']+)'\)|(Replicated[a-zA-Z]*MergeTree)\(\)`)
var replicatedUuidRE = regexp.MustCompile(` UUID '([^']+)'`)

func (b *Backuper) checkReplicaAlreadyExistsAndChangeReplicationPath(ctx context.Context, schema *metadata.TableMetadata) {
if matches := replicatedParamsRE.FindAllStringSubmatch(schema.Query, -1); len(matches) > 0 {
var err error
if len(matches[0]) < 1 {
log.Warn().Msgf("can't find Replicated paramaters in %s", schema.Query)
return
}
shortSyntax := true
var engine, replicaPath, replicaName, delimiter string
if len(matches[0]) == 6 && matches[0][5] == "" {
shortSyntax = false
engine = matches[0][1]
replicaPath = matches[0][2]
delimiter = matches[0][3]
replicaName = matches[0][4]
} else {
engine = matches[0][4]
var settingsValues map[string]string
settingsValues, err = b.ch.GetSettingsValues(ctx, []interface{}{"default_replica_path", "default_replica_name"})
if err != nil {
log.Fatal().Msgf("can't get from `system.settings` -> `default_replica_path`, `default_replica_name` error: %v", err)
}
replicaPath = settingsValues["default_replica_path"]
replicaName = settingsValues["default_replica_name"]
}
var resolvedReplicaPath, resolvedReplicaName string
if resolvedReplicaPath, err = b.ch.ApplyMacros(ctx, replicaPath); err != nil {
log.Fatal().Msgf("can't ApplyMacros to %s error: %v", replicaPath, err)
}
if resolvedReplicaName, err = b.ch.ApplyMacros(ctx, replicaName); err != nil {
log.Fatal().Msgf("can't ApplyMacros to %s error: %v", replicaPath, err)
}
if matches = replicatedUuidRE.FindAllStringSubmatch(schema.Query, 1); len(matches) > 0 {
resolvedReplicaPath = strings.Replace(resolvedReplicaPath, "{uuid}", matches[0][1], -1)
}

isReplicaPresent := uint64(0)
fullReplicaPath := path.Join(resolvedReplicaPath, "replicas", resolvedReplicaName)
if err = b.ch.SelectSingleRow(ctx, &isReplicaPresent, "SELECT count() FROM system.zookeeper WHERE path=?", fullReplicaPath); err != nil {
log.Fatal().Msgf("can't check replica %s in system.zookeeper error: %v", fullReplicaPath, err)
}
if isReplicaPresent == 0 {
return
}
newReplicaPath := b.cfg.ClickHouse.DefaultReplicaPath
newReplicaName := b.cfg.ClickHouse.DefaultReplicaName
log.Warn().Msgf("replica %s already exists in system.zookeeper will replace to %s", fullReplicaPath, path.Join(newReplicaPath, "replicas", newReplicaName))
if shortSyntax {
schema.Query = strings.Replace(schema.Query, engine+"()", engine+"('"+newReplicaPath+"','"+newReplicaName+"')", 1)
} else {
schema.Query = strings.Replace(schema.Query, engine+"('"+replicaPath+"'"+delimiter+"'"+replicaName+"')", engine+"('"+newReplicaPath+"', '"+newReplicaName+"')", 1)
}
}
}

func (b *Backuper) replaceUUIDMacroValue(schema *metadata.TableMetadata) {
if b.cfg.General.RestoreSchemaOnCluster == "" && strings.Contains(schema.Query, "{uuid}") && strings.Contains(schema.Query, "Replicated") {
if !strings.Contains(schema.Query, "UUID") {
log.Warn().Msgf("table query doesn't contains UUID, can't guarantee properly restore for ReplicatedMergeTree")
} else {
schema.Query = UUIDWithMergeTreeRE.ReplaceAllString(schema.Query, "$1$2$3'$4'$5$4$7")
}
}
}

func (b *Backuper) replaceCreateToAttachForView(schema *metadata.TableMetadata) {
schema.Query = strings.Replace(
schema.Query, "CREATE MATERIALIZED VIEW", "ATTACH MATERIALIZED VIEW", 1,
)
schema.Query = strings.Replace(
schema.Query, "CREATE WINDOW VIEW", "ATTACH WINDOW VIEW", 1,
)
schema.Query = strings.Replace(
schema.Query, "CREATE LIVE VIEW", "ATTACH LIVE VIEW", 1,
)
}

func (b *Backuper) dropExistsTables(tablesForDrop ListOfTables, ignoreDependencies bool, version int) error {
var dropErr error
dropRetries := 0
Expand Down
19 changes: 19 additions & 0 deletions pkg/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -988,6 +988,7 @@ func (ch *ClickHouse) CreateTable(table Table, query string, dropTable, ignoreDe
return err
}

// CREATE
if err := ch.Query(query); err != nil {
return err
}
Expand Down Expand Up @@ -1295,6 +1296,24 @@ func (ch *ClickHouse) CheckTypesConsistency(table *Table, partColumnsDataTypes [
return nil
}

func (ch *ClickHouse) GetSettingsValues(ctx context.Context, settings []interface{}) (map[string]string, error) {
settingsValues := make([]struct {
Name string `ch:"name"`
Value string `ch:"value"`
}, 0)
queryStr := "SELECT name, value FROM system.settings WHERE name IN (" + strings.Repeat("?, ", len(settings))
queryStr = queryStr[:len(queryStr)-2]
queryStr += ")"
if err := ch.SelectContext(ctx, &settingsValues, queryStr, settings...); err != nil {
return nil, err
}
settingsValuesMap := map[string]string{}
for _, v := range settingsValues {
settingsValuesMap[v.Name] = v.Value
}
return settingsValuesMap, nil
}

func (ch *ClickHouse) CheckSettingsExists(ctx context.Context, settings map[string]bool) (map[string]bool, error) {
isSettingsPresent := make([]struct {
Name string `ch:"name"`
Expand Down
4 changes: 4 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,8 @@ type ClickHouseConfig struct {
RestartCommand string `yaml:"restart_command" envconfig:"CLICKHOUSE_RESTART_COMMAND"`
IgnoreNotExistsErrorDuringFreeze bool `yaml:"ignore_not_exists_error_during_freeze" envconfig:"CLICKHOUSE_IGNORE_NOT_EXISTS_ERROR_DURING_FREEZE"`
CheckReplicasBeforeAttach bool `yaml:"check_replicas_before_attach" envconfig:"CLICKHOUSE_CHECK_REPLICAS_BEFORE_ATTACH"`
DefaultReplicaPath string `yaml:"default_replica_path" envconfig:"CLICKHOUSE_DEFAULT_REPLICA_PATH"`
DefaultReplicaName string `yaml:"default_replica_name" envconfig:"CLICKHOUSE_DEFAULT_REPLICA_NAME"`
TLSKey string `yaml:"tls_key" envconfig:"CLICKHOUSE_TLS_KEY"`
TLSCert string `yaml:"tls_cert" envconfig:"CLICKHOUSE_TLS_CERT"`
TLSCa string `yaml:"tls_ca" envconfig:"CLICKHOUSE_TLS_CA"`
Expand Down Expand Up @@ -570,6 +572,8 @@ func DefaultConfig() *Config {
BackupMutations: true,
RestoreAsAttach: false,
CheckPartsColumns: true,
DefaultReplicaPath: "/clickhouse/tables/{cluster}/{shard}/{database}/{table}",
DefaultReplicaName: "{replica}",
MaxConnections: int(downloadConcurrency),
},
AzureBlob: AzureBlobConfig{
Expand Down
2 changes: 1 addition & 1 deletion test/integration/install_delve.sh
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,6 @@ CGO_ENABLED=0 GO111MODULE=on go install -ldflags "-s -w -extldflags '-static'" g
# /root/go/bin/dlv --listen=:40001 --headless=true --api-version=2 --accept-multiclient exec /bin/clickhouse-backup -- -c /etc/clickhouse-backup/config-s3.yml download --partitions="test_partitions_TestIntegrationS3.t?:(0,'2022-01-02'),(0,'2022-01-03')" full_backup_5643339940028285692
# EMBEDDED_S3_COMPRESSION_FORMAT=zstd CLICKHOUSE_BACKUP_CONFIG=/etc/clickhouse-backup/config-s3-embedded.yml /root/go/bin/dlv --listen=:40001 --headless=true --api-version=2 --accept-multiclient exec /bin/clickhouse-backup -- upload TestIntegrationEmbedded_full_5990789107828261693
# S3_COMPRESSION_FORMAT=zstd CLICKHOUSE_BACKUP_CONFIG=/etc/clickhouse-backup/config-s3.yml /root/go/bin/dlv --listen=:40001 --headless=true --api-version=2 --accept-multiclient exec /bin/clickhouse-backup -- upload --resume TestIntegrationS3_full_8761350380051000966

# /root/go/bin/dlv --listen=:40001 --headless=true --api-version=2 --accept-multiclient exec /bin/clickhouse-backup -- -c /etc/clickhouse-backup/config-s3.yml restore --tables default.test_replica_wrong_path test_wrong_path


59 changes: 56 additions & 3 deletions test/integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"path"
"reflect"
"regexp"
"slices"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -449,7 +448,7 @@ var defaultIncrementData = []TestDataStruct{
}

func NewTestEnvironment(t *testing.T) (*TestEnvironment, *require.Assertions) {
isParallel := os.Getenv("RUN_PARALLEL") != "1" && slices.Index([]string{"TestLongListRemote" /*,"TestIntegrationAzure"*/}, t.Name()) == -1
isParallel := os.Getenv("RUN_PARALLEL") != "1"
if os.Getenv("COMPOSE_FILE") == "" || os.Getenv("CUR_DIR") == "" {
t.Fatal("please setup COMPOSE_FILE and CUR_DIR environment variables")
}
Expand Down Expand Up @@ -507,7 +506,6 @@ func (env *TestEnvironment) Cleanup(t *testing.T, r *require.Assertions) {

var listTimeMsRE = regexp.MustCompile(`list_duration=(\d+.\d+)`)

// TestLongListRemote - no parallel, cause need to restart minio
func TestLongListRemote(t *testing.T) {
env, r := NewTestEnvironment(t)
env.connectWithWait(r, 0*time.Second, 1*time.Second, 1*time.Minute)
Expand Down Expand Up @@ -573,6 +571,61 @@ func TestLongListRemote(t *testing.T) {
env.Cleanup(t, r)
}

func TestChangeReplicationPathIfReplicaExists(t *testing.T) {
env, r := NewTestEnvironment(t)
env.connectWithWait(r, 0*time.Second, 1*time.Second, 1*time.Minute)
version, err := env.ch.GetVersion(context.Background())
r.NoError(err)
createReplicatedTable := func(table, uuid, engine string) string {
createSQL := fmt.Sprintf("CREATE TABLE default.%s %s ON CLUSTER '{cluster}' (id UInt64) ENGINE=ReplicatedMergeTree(%s) ORDER BY id", table, uuid, engine)
env.queryWithNoError(r, createSQL)
env.queryWithNoError(r, fmt.Sprintf("INSERT INTO default.%s SELECT number FROM numbers(10)", table))
return createSQL
}
createUUID := uuid.New()
createSQL := createReplicatedTable("test_replica_wrong_path", "", "'/clickhouse/tables/wrong_path','{replica}'")
createWithUUIDSQL := createReplicatedTable("test_replica_wrong_path_uuid", fmt.Sprintf(" UUID '%s' ", createUUID.String()), "")
r.NoError(env.DockerExec("clickhouse-backup", "clickhouse-backup", "-c", "/etc/clickhouse-backup/config-s3.yml", "create", "--tables", "default.test_replica_wrong_path*", "test_wrong_path"))

r.NoError(env.ch.DropTable(clickhouse.Table{Database: "default", Name: "test_replica_wrong_path"}, createSQL, "", false, version, ""))
r.NoError(env.ch.DropTable(clickhouse.Table{Database: "default", Name: "test_replica_wrong_path_uuid"}, createWithUUIDSQL, "", false, version, ""))

// hack for drop tables without drop data from keeper
_ = createReplicatedTable("test_replica_wrong_path2", "", "'/clickhouse/tables/wrong_path','{replica}'")
_ = createReplicatedTable("test_replica_wrong_path_uuid2", fmt.Sprintf(" UUID '%s' ", createUUID.String()), "")
r.NoError(env.DockerExec("clickhouse", "rm", "-fv", "/var/lib/clickhouse/metadata/default/test_replica_wrong_path2.sql"))
r.NoError(env.DockerExec("clickhouse", "rm", "-fv", "/var/lib/clickhouse/metadata/default/test_replica_wrong_path_uuid2.sql"))
r.NoError(env.DockerExec("clickhouse", "rm", "-rfv", fmt.Sprintf("/var/lib/clickhouse/store/%s/%s", createUUID.String()[:3], createUUID.String())))
env.ch.Close()
r.NoError(utils.ExecCmd(context.Background(), 180*time.Second, "docker", append(env.GetDefaultComposeCommand(), "restart", "clickhouse")...))
env.connectWithWait(r, 0*time.Second, 1*time.Second, 1*time.Minute)

var restoreOut string
restoreOut, err = env.DockerExecOut("clickhouse-backup", "clickhouse-backup", "-c", "/etc/clickhouse-backup/config-s3.yml", "restore", "--tables", "default.test_replica_wrong_path*", "test_wrong_path")
log.Debug().Msg(restoreOut)
r.NoError(err)
r.Contains(restoreOut, "replica /clickhouse/tables/wrong_path/replicas/clickhouse already exists in system.zookeeper will replace to /clickhouse/tables/{cluster}/{shard}/{database}/{table}/replicas/{replica}")
r.Contains(restoreOut, fmt.Sprintf("replica /clickhouse/tables/%s/0/replicas/clickhouse already exists in system.zookeeper will replace to /clickhouse/tables/{cluster}/{shard}/{database}/{table}/replicas/{replica}", createUUID.String()))

checkRestoredTable := func(table string, expectedRows uint64, expectedEngine string) {
rows := uint64(0)
r.NoError(env.ch.SelectSingleRowNoCtx(&rows, fmt.Sprintf("SELECT count() FROM default.%s", table)))
r.Equal(expectedRows, rows)

engineFull := ""
r.NoError(env.ch.SelectSingleRowNoCtx(&engineFull, "SELECT engine_full FROM system.tables WHERE database=? AND table=?", "default", table))
r.Contains(engineFull, expectedEngine)

}
checkRestoredTable("test_replica_wrong_path", 10, "/clickhouse/tables/{cluster}/{shard}/default/test_replica_wrong_path")
checkRestoredTable("test_replica_wrong_path_uuid", 10, "/clickhouse/tables/{cluster}/{shard}/default/test_replica_wrong_path_uuid")

r.NoError(env.ch.DropTable(clickhouse.Table{Database: "default", Name: "test_replica_wrong_path"}, createSQL, "", false, version, ""))
r.NoError(env.ch.DropTable(clickhouse.Table{Database: "default", Name: "test_replica_wrong_path_uuid"}, createWithUUIDSQL, "", false, version, ""))

r.NoError(env.DockerExec("clickhouse-backup", "clickhouse-backup", "-c", "/etc/clickhouse-backup/config-s3.yml", "delete", "local", "test_wrong_path"))
}

func TestIntegrationEmbedded(t *testing.T) {
version := os.Getenv("CLICKHOUSE_VERSION")
if compareVersion(version, "23.3") < 0 {
Expand Down

0 comments on commit 7a2b4a2

Please sign in to comment.