From 7a2b4a2a7a424ce156f7b83d7dc24290bda5b3b8 Mon Sep 17 00:00:00 2001
From: Slach
Date: Fri, 1 Nov 2024 19:16:14 +0400
Subject: [PATCH] during restore, after DROP TABLE and before CREATE TABLE,
 check whether the replica path already exists in ZooKeeper and, if it does,
 recreate the table with default_replica_path / default_replica_name;
 helpful for restoring Replicated tables whose replication parameters don't
 contain macros

fix https://github.com/Altinity/clickhouse-backup/issues/849
---
 ChangeLog.md                         |   4 +-
 ReadMe.md                            |   2 +
 pkg/backup/restore.go                | 105 +++++++++++++++++++++-----
 pkg/clickhouse/clickhouse.go         |  19 +++++
 pkg/config/config.go                 |   4 +
 test/integration/install_delve.sh    |   2 +-
 test/integration/integration_test.go |  59 ++++++++++++++-
 7 files changed, 172 insertions(+), 23 deletions(-)

diff --git a/ChangeLog.md b/ChangeLog.md
index a52e0089..45fdc132 100644
--- a/ChangeLog.md
+++ b/ChangeLog.md
@@ -1,13 +1,15 @@
 # v2.6.3
 IMPROVEMENTS
-- implement new format for *.state files boltdb
+- implement new format for `*.state2` files: boltdb key-value (please check memory RSS usage)
 - clean resumable state if backup parameters changed, fix [840](https://github.com/Altinity/clickhouse-backup/issues/840)
 - switch to golang 1.23
 - add `clickhouse_backup_local_data_size` metric as alias for `TotalBytesOfMergeTreeTablesm` from `system.asychnrous_metrics`, fix [573](https://github.com/Altinity/clickhouse-backup/issues/573)
 - API refactoring, query options with snake case, also allow with dash case.
 - add `--resume` parameter to `create` and `restore` command to avoid unnecessary copy object disk data fix [828](https://github.com/Altinity/clickhouse-backup/issues/828)
+
 BUG FIXES
+- during restore, after drop table and before create table, check whether the replica path already exists in ZooKeeper and, if it does, recreate the table with `default_replica_path` and `default_replica_name`; helpful for restoring Replicated tables whose replication parameters don't contain macros, fix [849](https://github.com/Altinity/clickhouse-backup/issues/849)
 - fix `TestLongListRemote` for properly time measurement
 - fix log_pointer handle from system.replicas during restore, fix [967](https://github.com/Altinity/clickhouse-backup/issues/967)
 - fix `use_embedded_backup_restore: true` behavior for azblob, fix [1031](https://github.com/Altinity/clickhouse-backup/issues/1031)
 
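What the fix does, sketched with the example paths used by the integration test below (illustrative, not tool output): when the backup schema pins a hard-coded replication path and that replica is still registered in ZooKeeper, restore rewrites the engine parameters before running CREATE TABLE:

    -- schema as stored in the backup
    ENGINE=ReplicatedMergeTree('/clickhouse/tables/wrong_path','{replica}')
    -- schema actually executed when /clickhouse/tables/wrong_path/replicas/<replica> already exists,
    -- taken from default_replica_path / default_replica_name
    ENGINE=ReplicatedMergeTree('/clickhouse/tables/{cluster}/{shard}/{database}/{table}','{replica}')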
diff --git a/ReadMe.md b/ReadMe.md
index 95d84b96..c5b31407 100644
--- a/ReadMe.md
+++ b/ReadMe.md
@@ -188,6 +188,8 @@ clickhouse:
   restart_command: "exec:systemctl restart clickhouse-server"
   ignore_not_exists_error_during_freeze: true # CLICKHOUSE_IGNORE_NOT_EXISTS_ERROR_DURING_FREEZE, helps to avoid backup failures when running frequent CREATE / DROP tables and databases during backup, `clickhouse-backup` will ignore `code: 60` and `code: 81` errors during execution of `ALTER TABLE ... FREEZE`
   check_replicas_before_attach: true # CLICKHOUSE_CHECK_REPLICAS_BEFORE_ATTACH, helps avoiding concurrent ATTACH PART execution when restoring ReplicatedMergeTree tables
+  default_replica_path: "/clickhouse/tables/{cluster}/{shard}/{database}/{table}" # CLICKHOUSE_DEFAULT_REPLICA_PATH, used during restore of Replicated tables whose replication path contains no macros, when the replica already exists in ZooKeeper, to avoid restore conflicts
+  default_replica_name: "{replica}" # CLICKHOUSE_DEFAULT_REPLICA_NAME, used during restore of Replicated tables whose replica name contains no macros, when the replica already exists in ZooKeeper, to avoid restore conflicts
   use_embedded_backup_restore: false # CLICKHOUSE_USE_EMBEDDED_BACKUP_RESTORE, use BACKUP / RESTORE SQL statements instead of regular SQL queries to use features of modern ClickHouse server versions
   embedded_backup_disk: "" # CLICKHOUSE_EMBEDDED_BACKUP_DISK - disk from system.disks which will use when `use_embedded_backup_restore: true`
   backup_mutations: true # CLICKHOUSE_BACKUP_MUTATIONS, allow backup mutations from system.mutations WHERE is_done=0 and apply it during restore
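Both settings can also be supplied via their environment variables (the variable names come from this patch); a hypothetical invocation, where the backup name and table pattern are placeholders:

    CLICKHOUSE_DEFAULT_REPLICA_PATH="/clickhouse/tables/{cluster}/{shard}/{database}/{table}" \
    CLICKHOUSE_DEFAULT_REPLICA_NAME="{replica}" \
    clickhouse-backup restore --tables 'default.my_table' my_backup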
diff --git a/pkg/backup/restore.go b/pkg/backup/restore.go
index 5e02e359..71a394c4 100644
--- a/pkg/backup/restore.go
+++ b/pkg/backup/restore.go
@@ -895,7 +895,7 @@ func (b *Backuper) RestoreSchema(ctx context.Context, backupName string, backupM
 	if b.isEmbedded {
 		restoreErr = b.restoreSchemaEmbedded(ctx, backupName, backupMetadata, disks, tablesForRestore, version)
 	} else {
-		restoreErr = b.restoreSchemaRegular(tablesForRestore, version)
+		restoreErr = b.restoreSchemaRegular(ctx, tablesForRestore, version)
 	}
 	if restoreErr != nil {
 		return restoreErr
@@ -1078,7 +1078,7 @@ func (b *Backuper) fixEmbeddedMetadataSQLQuery(ctx context.Context, sqlBytes []b
 	return sqlQuery, sqlMetadataChanged, nil
 }
 
-func (b *Backuper) restoreSchemaRegular(tablesForRestore ListOfTables, version int) error {
+func (b *Backuper) restoreSchemaRegular(ctx context.Context, tablesForRestore ListOfTables, version int) error {
 	totalRetries := len(tablesForRestore)
 	restoreRetries := 0
 	isDatabaseCreated := common.EmptyMap{}
@@ -1095,23 +1095,12 @@
 			}
 		}
 		//materialized and window views should restore via ATTACH
-		schema.Query = strings.Replace(
-			schema.Query, "CREATE MATERIALIZED VIEW", "ATTACH MATERIALIZED VIEW", 1,
-		)
-		schema.Query = strings.Replace(
-			schema.Query, "CREATE WINDOW VIEW", "ATTACH WINDOW VIEW", 1,
-		)
-		schema.Query = strings.Replace(
-			schema.Query, "CREATE LIVE VIEW", "ATTACH LIVE VIEW", 1,
-		)
+		b.replaceCreateToAttachForView(&schema)
+		// https://github.com/Altinity/clickhouse-backup/issues/849
+		b.checkReplicaAlreadyExistsAndChangeReplicationPath(ctx, &schema)
+
 		// https://github.com/Altinity/clickhouse-backup/issues/466
-		if b.cfg.General.RestoreSchemaOnCluster == "" && strings.Contains(schema.Query, "{uuid}") && strings.Contains(schema.Query, "Replicated") {
-			if !strings.Contains(schema.Query, "UUID") {
-				log.Warn().Msgf("table query doesn't contains UUID, can't guarantee properly restore for ReplicatedMergeTree")
-			} else {
-				schema.Query = UUIDWithMergeTreeRE.ReplaceAllString(schema.Query, "$1$2$3'$4'$5$4$7")
-			}
-		}
+		b.replaceUUIDMacroValue(&schema)
 		restoreErr = b.ch.CreateTable(clickhouse.Table{
 			Database: schema.Database,
 			Name:     schema.Table,
@@ -1140,6 +1129,86 @@
 	return nil
 }
 
+var replicatedParamsRE = regexp.MustCompile(`(Replicated[a-zA-Z]*MergeTree)\('([^']+)'(\s*,\s*)'([^']+)'\)|(Replicated[a-zA-Z]*MergeTree)\(\)`)
+var replicatedUuidRE = regexp.MustCompile(` UUID '([^']+)'`)
+
+func (b *Backuper) checkReplicaAlreadyExistsAndChangeReplicationPath(ctx context.Context, schema *metadata.TableMetadata) {
+	if matches := replicatedParamsRE.FindAllStringSubmatch(schema.Query, -1); len(matches) > 0 {
+		var err error
+		if len(matches[0]) < 1 {
+			log.Warn().Msgf("can't find Replicated parameters in %s", schema.Query)
+			return
+		}
+		shortSyntax := true
+		var engine, replicaPath, replicaName, delimiter string
+		if len(matches[0]) == 6 && matches[0][5] == "" {
+			shortSyntax = false
+			engine = matches[0][1]
+			replicaPath = matches[0][2]
+			delimiter = matches[0][3]
+			replicaName = matches[0][4]
+		} else {
+			engine = matches[0][5]
+			var settingsValues map[string]string
+			settingsValues, err = b.ch.GetSettingsValues(ctx, []interface{}{"default_replica_path", "default_replica_name"})
+			if err != nil {
+				log.Fatal().Msgf("can't get from `system.settings` -> `default_replica_path`, `default_replica_name` error: %v", err)
+			}
+			replicaPath = settingsValues["default_replica_path"]
+			replicaName = settingsValues["default_replica_name"]
+		}
+		var resolvedReplicaPath, resolvedReplicaName string
+		if resolvedReplicaPath, err = b.ch.ApplyMacros(ctx, replicaPath); err != nil {
+			log.Fatal().Msgf("can't ApplyMacros to %s error: %v", replicaPath, err)
+		}
+		if resolvedReplicaName, err = b.ch.ApplyMacros(ctx, replicaName); err != nil {
+			log.Fatal().Msgf("can't ApplyMacros to %s error: %v", replicaName, err)
+		}
+		if matches = replicatedUuidRE.FindAllStringSubmatch(schema.Query, 1); len(matches) > 0 {
+			resolvedReplicaPath = strings.Replace(resolvedReplicaPath, "{uuid}", matches[0][1], -1)
+		}
+
+		isReplicaPresent := uint64(0)
+		fullReplicaPath := path.Join(resolvedReplicaPath, "replicas", resolvedReplicaName)
+		if err = b.ch.SelectSingleRow(ctx, &isReplicaPresent, "SELECT count() FROM system.zookeeper WHERE path=?", fullReplicaPath); err != nil {
+			log.Fatal().Msgf("can't check replica %s in system.zookeeper error: %v", fullReplicaPath, err)
+		}
+		if isReplicaPresent == 0 {
+			return
+		}
+		newReplicaPath := b.cfg.ClickHouse.DefaultReplicaPath
+		newReplicaName := b.cfg.ClickHouse.DefaultReplicaName
+		log.Warn().Msgf("replica %s already exists in system.zookeeper, will replace it with %s", fullReplicaPath, path.Join(newReplicaPath, "replicas", newReplicaName))
+		if shortSyntax {
+			schema.Query = strings.Replace(schema.Query, engine+"()", engine+"('"+newReplicaPath+"','"+newReplicaName+"')", 1)
+		} else {
+			schema.Query = strings.Replace(schema.Query, engine+"('"+replicaPath+"'"+delimiter+"'"+replicaName+"')", engine+"('"+newReplicaPath+"','"+newReplicaName+"')", 1)
+		}
+	}
+}
+
+func (b *Backuper) replaceUUIDMacroValue(schema *metadata.TableMetadata) {
+	if b.cfg.General.RestoreSchemaOnCluster == "" && strings.Contains(schema.Query, "{uuid}") && strings.Contains(schema.Query, "Replicated") {
+		if !strings.Contains(schema.Query, "UUID") {
+			log.Warn().Msgf("table query doesn't contain UUID, can't guarantee proper restore for ReplicatedMergeTree")
+		} else {
+			schema.Query = UUIDWithMergeTreeRE.ReplaceAllString(schema.Query, "$1$2$3'$4'$5$4$7")
+		}
+	}
+}
+
+func (b *Backuper) replaceCreateToAttachForView(schema *metadata.TableMetadata) {
+	schema.Query = strings.Replace(
+		schema.Query, "CREATE MATERIALIZED VIEW", "ATTACH MATERIALIZED VIEW", 1,
+	)
+	schema.Query = strings.Replace(
+		schema.Query, "CREATE WINDOW VIEW", "ATTACH WINDOW VIEW", 1,
+	)
+	schema.Query = strings.Replace(
+		schema.Query, "CREATE LIVE VIEW", "ATTACH LIVE VIEW", 1,
+	)
+}
+
 func (b *Backuper) dropExistsTables(tablesForDrop ListOfTables, ignoreDependencies bool, version int) error {
 	var dropErr error
 	dropRetries := 0
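For reference, a minimal standalone Go sketch (not part of the patch) showing how the two alternatives of `replicatedParamsRE` populate the capture groups — the long syntax fills groups 1-4 and leaves group 5 empty, while the short syntax fills only group 5, which is why the two branches in `checkReplicaAlreadyExistsAndChangeReplicationPath` read different indexes:

    package main

    import (
    	"fmt"
    	"regexp"
    )

    var replicatedParamsRE = regexp.MustCompile(`(Replicated[a-zA-Z]*MergeTree)\('([^']+)'(\s*,\s*)'([^']+)'\)|(Replicated[a-zA-Z]*MergeTree)\(\)`)

    func main() {
    	for _, query := range []string{
    		"CREATE TABLE t (id UInt64) ENGINE=ReplicatedMergeTree('/clickhouse/tables/wrong_path','{replica}') ORDER BY id",
    		"CREATE TABLE t (id UInt64) ENGINE=ReplicatedMergeTree() ORDER BY id",
    	} {
    		m := replicatedParamsRE.FindAllStringSubmatch(query, -1)
    		// long syntax:  m[0][1]=engine, m[0][2]=path, m[0][3]=delimiter, m[0][4]=name, m[0][5]=""
    		// short syntax: m[0][1..4]="",  m[0][5]=engine
    		fmt.Printf("long=%q path=%q name=%q short=%q\n", m[0][1], m[0][2], m[0][4], m[0][5])
    	}
    }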
'"+newReplicaName+"')", 1) + } + } +} + +func (b *Backuper) replaceUUIDMacroValue(schema *metadata.TableMetadata) { + if b.cfg.General.RestoreSchemaOnCluster == "" && strings.Contains(schema.Query, "{uuid}") && strings.Contains(schema.Query, "Replicated") { + if !strings.Contains(schema.Query, "UUID") { + log.Warn().Msgf("table query doesn't contains UUID, can't guarantee properly restore for ReplicatedMergeTree") + } else { + schema.Query = UUIDWithMergeTreeRE.ReplaceAllString(schema.Query, "$1$2$3'$4'$5$4$7") + } + } +} + +func (b *Backuper) replaceCreateToAttachForView(schema *metadata.TableMetadata) { + schema.Query = strings.Replace( + schema.Query, "CREATE MATERIALIZED VIEW", "ATTACH MATERIALIZED VIEW", 1, + ) + schema.Query = strings.Replace( + schema.Query, "CREATE WINDOW VIEW", "ATTACH WINDOW VIEW", 1, + ) + schema.Query = strings.Replace( + schema.Query, "CREATE LIVE VIEW", "ATTACH LIVE VIEW", 1, + ) +} + func (b *Backuper) dropExistsTables(tablesForDrop ListOfTables, ignoreDependencies bool, version int) error { var dropErr error dropRetries := 0 diff --git a/pkg/clickhouse/clickhouse.go b/pkg/clickhouse/clickhouse.go index b4cc43bb..be56c4f1 100644 --- a/pkg/clickhouse/clickhouse.go +++ b/pkg/clickhouse/clickhouse.go @@ -988,6 +988,7 @@ func (ch *ClickHouse) CreateTable(table Table, query string, dropTable, ignoreDe return err } + // CREATE if err := ch.Query(query); err != nil { return err } @@ -1295,6 +1296,24 @@ func (ch *ClickHouse) CheckTypesConsistency(table *Table, partColumnsDataTypes [ return nil } +func (ch *ClickHouse) GetSettingsValues(ctx context.Context, settings []interface{}) (map[string]string, error) { + settingsValues := make([]struct { + Name string `ch:"name"` + Value string `ch:"value"` + }, 0) + queryStr := "SELECT name, value FROM system.settings WHERE name IN (" + strings.Repeat("?, ", len(settings)) + queryStr = queryStr[:len(queryStr)-2] + queryStr += ")" + if err := ch.SelectContext(ctx, &settingsValues, queryStr, settings...); err != nil { + return nil, err + } + settingsValuesMap := map[string]string{} + for _, v := range settingsValues { + settingsValuesMap[v.Name] = v.Value + } + return settingsValuesMap, nil +} + func (ch *ClickHouse) CheckSettingsExists(ctx context.Context, settings map[string]bool) (map[string]bool, error) { isSettingsPresent := make([]struct { Name string `ch:"name"` diff --git a/pkg/config/config.go b/pkg/config/config.go index 1e1abc62..a3f6ae4f 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -231,6 +231,8 @@ type ClickHouseConfig struct { RestartCommand string `yaml:"restart_command" envconfig:"CLICKHOUSE_RESTART_COMMAND"` IgnoreNotExistsErrorDuringFreeze bool `yaml:"ignore_not_exists_error_during_freeze" envconfig:"CLICKHOUSE_IGNORE_NOT_EXISTS_ERROR_DURING_FREEZE"` CheckReplicasBeforeAttach bool `yaml:"check_replicas_before_attach" envconfig:"CLICKHOUSE_CHECK_REPLICAS_BEFORE_ATTACH"` + DefaultReplicaPath string `yaml:"default_replica_path" envconfig:"CLICKHOUSE_DEFAULT_REPLICA_PATH"` + DefaultReplicaName string `yaml:"default_replica_name" envconfig:"CLICKHOUSE_DEFAULT_REPLICA_NAME"` TLSKey string `yaml:"tls_key" envconfig:"CLICKHOUSE_TLS_KEY"` TLSCert string `yaml:"tls_cert" envconfig:"CLICKHOUSE_TLS_CERT"` TLSCa string `yaml:"tls_ca" envconfig:"CLICKHOUSE_TLS_CA"` @@ -570,6 +572,8 @@ func DefaultConfig() *Config { BackupMutations: true, RestoreAsAttach: false, CheckPartsColumns: true, + DefaultReplicaPath: "/clickhouse/tables/{cluster}/{shard}/{database}/{table}", + DefaultReplicaName: "{replica}", 
diff --git a/test/integration/install_delve.sh b/test/integration/install_delve.sh
index 90382e44..c36e09e6 100755
--- a/test/integration/install_delve.sh
+++ b/test/integration/install_delve.sh
@@ -37,6 +37,6 @@ CGO_ENABLED=0 GO111MODULE=on go install -ldflags "-s -w -extldflags '-static'" g
 # /root/go/bin/dlv --listen=:40001 --headless=true --api-version=2 --accept-multiclient exec /bin/clickhouse-backup -- -c /etc/clickhouse-backup/config-s3.yml download --partitions="test_partitions_TestIntegrationS3.t?:(0,'2022-01-02'),(0,'2022-01-03')" full_backup_5643339940028285692
 # EMBEDDED_S3_COMPRESSION_FORMAT=zstd CLICKHOUSE_BACKUP_CONFIG=/etc/clickhouse-backup/config-s3-embedded.yml /root/go/bin/dlv --listen=:40001 --headless=true --api-version=2 --accept-multiclient exec /bin/clickhouse-backup -- upload TestIntegrationEmbedded_full_5990789107828261693
 # S3_COMPRESSION_FORMAT=zstd CLICKHOUSE_BACKUP_CONFIG=/etc/clickhouse-backup/config-s3.yml /root/go/bin/dlv --listen=:40001 --headless=true --api-version=2 --accept-multiclient exec /bin/clickhouse-backup -- upload --resume TestIntegrationS3_full_8761350380051000966
-
+# /root/go/bin/dlv --listen=:40001 --headless=true --api-version=2 --accept-multiclient exec /bin/clickhouse-backup -- -c /etc/clickhouse-backup/config-s3.yml restore --tables default.test_replica_wrong_path test_wrong_path
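When reproducing this scenario by hand, it can help to confirm that the stale replica registration survived the metadata deletion and server restart; an illustrative query (the path matches the hard-coded one used by the test below):

    SELECT name FROM system.zookeeper WHERE path = '/clickhouse/tables/wrong_path/replicas';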
diff --git a/test/integration/integration_test.go b/test/integration/integration_test.go
index bf3a7054..9de8f487 100644
--- a/test/integration/integration_test.go
+++ b/test/integration/integration_test.go
@@ -14,7 +14,6 @@ import (
 	"path"
 	"reflect"
 	"regexp"
-	"slices"
 	"strconv"
 	"strings"
 	"sync"
@@ -449,7 +448,7 @@ var defaultIncrementData = []TestDataStruct{
 }
 
 func NewTestEnvironment(t *testing.T) (*TestEnvironment, *require.Assertions) {
-	isParallel := os.Getenv("RUN_PARALLEL") != "1" && slices.Index([]string{"TestLongListRemote" /*,"TestIntegrationAzure"*/}, t.Name()) == -1
+	isParallel := os.Getenv("RUN_PARALLEL") != "1"
 	if os.Getenv("COMPOSE_FILE") == "" || os.Getenv("CUR_DIR") == "" {
 		t.Fatal("please setup COMPOSE_FILE and CUR_DIR environment variables")
 	}
@@ -507,7 +506,6 @@ func (env *TestEnvironment) Cleanup(t *testing.T, r *require.Assertions) {
 
 var listTimeMsRE = regexp.MustCompile(`list_duration=(\d+.\d+)`)
 
-// TestLongListRemote - no parallel, cause need to restart minio
 func TestLongListRemote(t *testing.T) {
 	env, r := NewTestEnvironment(t)
 	env.connectWithWait(r, 0*time.Second, 1*time.Second, 1*time.Minute)
@@ -573,6 +571,61 @@
 	env.Cleanup(t, r)
 }
 
+func TestChangeReplicationPathIfReplicaExists(t *testing.T) {
+	env, r := NewTestEnvironment(t)
+	env.connectWithWait(r, 0*time.Second, 1*time.Second, 1*time.Minute)
+	version, err := env.ch.GetVersion(context.Background())
+	r.NoError(err)
+	createReplicatedTable := func(table, uuidClause, engineParams string) string {
+		createSQL := fmt.Sprintf("CREATE TABLE default.%s %s ON CLUSTER '{cluster}' (id UInt64) ENGINE=ReplicatedMergeTree(%s) ORDER BY id", table, uuidClause, engineParams)
+		env.queryWithNoError(r, createSQL)
+		env.queryWithNoError(r, fmt.Sprintf("INSERT INTO default.%s SELECT number FROM numbers(10)", table))
+		return createSQL
+	}
+	createUUID := uuid.New()
+	createSQL := createReplicatedTable("test_replica_wrong_path", "", "'/clickhouse/tables/wrong_path','{replica}'")
+	createWithUUIDSQL := createReplicatedTable("test_replica_wrong_path_uuid", fmt.Sprintf(" UUID '%s' ", createUUID.String()), "")
+	r.NoError(env.DockerExec("clickhouse-backup", "clickhouse-backup", "-c", "/etc/clickhouse-backup/config-s3.yml", "create", "--tables", "default.test_replica_wrong_path*", "test_wrong_path"))
+
+	r.NoError(env.ch.DropTable(clickhouse.Table{Database: "default", Name: "test_replica_wrong_path"}, createSQL, "", false, version, ""))
+	r.NoError(env.ch.DropTable(clickhouse.Table{Database: "default", Name: "test_replica_wrong_path_uuid"}, createWithUUIDSQL, "", false, version, ""))
+
+	// hack to drop the tables locally without dropping their data from keeper:
+	// recreate them, delete their .sql metadata and store data, then restart clickhouse
+	_ = createReplicatedTable("test_replica_wrong_path2", "", "'/clickhouse/tables/wrong_path','{replica}'")
+	_ = createReplicatedTable("test_replica_wrong_path_uuid2", fmt.Sprintf(" UUID '%s' ", createUUID.String()), "")
+	r.NoError(env.DockerExec("clickhouse", "rm", "-fv", "/var/lib/clickhouse/metadata/default/test_replica_wrong_path2.sql"))
+	r.NoError(env.DockerExec("clickhouse", "rm", "-fv", "/var/lib/clickhouse/metadata/default/test_replica_wrong_path_uuid2.sql"))
+	r.NoError(env.DockerExec("clickhouse", "rm", "-rfv", fmt.Sprintf("/var/lib/clickhouse/store/%s/%s", createUUID.String()[:3], createUUID.String())))
+	env.ch.Close()
+	r.NoError(utils.ExecCmd(context.Background(), 180*time.Second, "docker", append(env.GetDefaultComposeCommand(), "restart", "clickhouse")...))
+	env.connectWithWait(r, 0*time.Second, 1*time.Second, 1*time.Minute)
+
+	var restoreOut string
+	restoreOut, err = env.DockerExecOut("clickhouse-backup", "clickhouse-backup", "-c", "/etc/clickhouse-backup/config-s3.yml", "restore", "--tables", "default.test_replica_wrong_path*", "test_wrong_path")
+	log.Debug().Msg(restoreOut)
+	r.NoError(err)
+	r.Contains(restoreOut, "replica /clickhouse/tables/wrong_path/replicas/clickhouse already exists in system.zookeeper, will replace it with /clickhouse/tables/{cluster}/{shard}/{database}/{table}/replicas/{replica}")
+	r.Contains(restoreOut, fmt.Sprintf("replica /clickhouse/tables/%s/0/replicas/clickhouse already exists in system.zookeeper, will replace it with /clickhouse/tables/{cluster}/{shard}/{database}/{table}/replicas/{replica}", createUUID.String()))
+
+	checkRestoredTable := func(table string, expectedRows uint64, expectedEngine string) {
+		rows := uint64(0)
+		r.NoError(env.ch.SelectSingleRowNoCtx(&rows, fmt.Sprintf("SELECT count() FROM default.%s", table)))
+		r.Equal(expectedRows, rows)
+
+		engineFull := ""
+		r.NoError(env.ch.SelectSingleRowNoCtx(&engineFull, "SELECT engine_full FROM system.tables WHERE database=? AND table=?", "default", table))
+		r.Contains(engineFull, expectedEngine)
+	}
+	checkRestoredTable("test_replica_wrong_path", 10, "/clickhouse/tables/{cluster}/{shard}/default/test_replica_wrong_path")
+	checkRestoredTable("test_replica_wrong_path_uuid", 10, "/clickhouse/tables/{cluster}/{shard}/default/test_replica_wrong_path_uuid")
+
+	r.NoError(env.ch.DropTable(clickhouse.Table{Database: "default", Name: "test_replica_wrong_path"}, createSQL, "", false, version, ""))
+	r.NoError(env.ch.DropTable(clickhouse.Table{Database: "default", Name: "test_replica_wrong_path_uuid"}, createWithUUIDSQL, "", false, version, ""))
+
+	r.NoError(env.DockerExec("clickhouse-backup", "clickhouse-backup", "-c", "/etc/clickhouse-backup/config-s3.yml", "delete", "local", "test_wrong_path"))
+}
+
 func TestIntegrationEmbedded(t *testing.T) {
 	version := os.Getenv("CLICKHOUSE_VERSION")
 	if compareVersion(version, "23.3") < 0 {