Commit eb33790

refactored to fix corner case for backup size calculation for Object disks and Embedded backup, set consistent algorithm for CLI and API `list` command behavior
Slach committed Jun 3, 2024
1 parent c5b2032 commit eb33790
Showing 12 changed files with 181 additions and 163 deletions.
ChangeLog.md (4 changes: 3 additions & 1 deletion)
@@ -1,7 +1,9 @@
# v2.5.12

BUG FIXES
- fixed corner case in `API server` hang when `watch` background command failures, fix [929](https://github.com/Altinity/clickhouse-backup/pull/929) thanks @tadus21
-- remove requirement `compression: none` for `use_embedded_backup_restore: true`
+- removed requirement `compression: none` for `use_embedded_backup_restore: true`
+- refactored to fix corner case for backup size calculation for Object disks and Embedded backup, set consistent algorithm for CLI and API `list` command behavior

# v2.5.11
BUG FIXES
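The create.go diff below threads a new object-disk byte counter through backup creation and stores it in the backup metadata (see the `ObjectDiskSize: backupObjectDiskSize` assignment near the end of that file). For orientation, a sketch of the size-related metadata fields implied by the call sites in this commit — only the field names are visible in the diff; the JSON tags and overall struct shape are assumptions:

```go
package metadata

// Sketch of the size-related backup metadata fields, inferred from the
// createBackupMetadata call sites in this commit; JSON tag names are
// assumptions, not copied from the repository.
type BackupMetadata struct {
	DataSize       uint64 `json:"data_size"`
	ObjectDiskSize uint64 `json:"object_disk_size,omitempty"` // new in this commit
	MetadataSize   uint64 `json:"metadata_size"`
	RBACSize       uint64 `json:"rbac_size,omitempty"`
	ConfigSize     uint64 `json:"config_size,omitempty"`
	CompressedSize uint64 `json:"compressed_size,omitempty"` // set once uploaded
}
```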
pkg/backup/create.go (88 changes: 51 additions & 37 deletions)
@@ -248,7 +248,7 @@ func (b *Backuper) createBackupLocal(ctx context.Context, backupName, diffFromRe
}
}

-var backupDataSize, backupMetadataSize uint64
+var backupDataSize, backupObjectDiskSize, backupMetadataSize uint64
var metaMutex sync.Mutex
createBackupWorkingGroup, createCtx := errgroup.WithContext(ctx)
createBackupWorkingGroup.SetLimit(b.cfg.ClickHouse.MaxConnections)
@@ -263,13 +263,13 @@ func (b *Backuper) createBackupLocal(ctx context.Context, backupName, diffFromRe
idx := tableIdx
createBackupWorkingGroup.Go(func() error {
log := log.WithField("table", fmt.Sprintf("%s.%s", table.Database, table.Name))
-var realSize map[string]int64
+var realSize, objectDiskSize map[string]int64
var disksToPartsMap map[string][]metadata.Part
if doBackupData && table.BackupType == clickhouse.ShardBackupFull {
log.Debug("create data")
shadowBackupUUID := strings.ReplaceAll(uuid.New().String(), "-", "")
var addTableToBackupErr error
-disksToPartsMap, realSize, addTableToBackupErr = b.AddTableToLocalBackup(createCtx, backupName, tablesDiffFromRemote, shadowBackupUUID, disks, &table, partitionsIdMap[metadata.TableTitle{Database: table.Database, Table: table.Name}])
+disksToPartsMap, realSize, objectDiskSize, addTableToBackupErr = b.AddTableToLocalBackup(createCtx, backupName, tablesDiffFromRemote, shadowBackupUUID, disks, &table, partitionsIdMap[metadata.TableTitle{Database: table.Database, Table: table.Name}])
if addTableToBackupErr != nil {
log.Errorf("b.AddTableToLocalBackup error: %v", addTableToBackupErr)
return addTableToBackupErr
@@ -278,6 +278,9 @@ func (b *Backuper) createBackupLocal(ctx context.Context, backupName, diffFromRe
for _, size := range realSize {
atomic.AddUint64(&backupDataSize, uint64(size))
}
+for _, size := range objectDiskSize {
+atomic.AddUint64(&backupObjectDiskSize, uint64(size))
+}
}
// https://github.com/Altinity/clickhouse-backup/issues/529
log.Debug("get in progress mutations list")
@@ -323,7 +326,7 @@ func (b *Backuper) createBackupLocal(ctx context.Context, backupName, diffFromRe
}

backupMetaFile := path.Join(b.DefaultDataPath, "backup", backupName, "metadata.json")
-if err := b.createBackupMetadata(ctx, backupMetaFile, backupName, diffFromRemote, backupVersion, "regular", diskMap, diskTypes, disks, backupDataSize, backupMetadataSize, backupRBACSize, backupConfigSize, tableMetas, allDatabases, allFunctions, log); err != nil {
+if err := b.createBackupMetadata(ctx, backupMetaFile, backupName, diffFromRemote, backupVersion, "regular", diskMap, diskTypes, disks, backupDataSize, backupObjectDiskSize, backupMetadataSize, backupRBACSize, backupConfigSize, tableMetas, allDatabases, allFunctions, log); err != nil {
return fmt.Errorf("createBackupMetadata return error: %v", err)
}
log.WithField("version", backupVersion).WithField("duration", utils.HumanizeDuration(time.Since(startBackup))).Info("done")
@@ -369,7 +372,7 @@ func (b *Backuper) createBackupEmbedded(ctx context.Context, backupName, baseBac
}
}

-tableSizeSQL, backupSQL, err := b.generateEmbeddedBackupSQL(ctx, backupName, schemaOnly, tables, tablesTitle, partitionsNameList, l, baseBackup)
+backupSQL, tablesSizeSQL, err := b.generateEmbeddedBackupSQL(ctx, backupName, schemaOnly, tables, tablesTitle, partitionsNameList, l, baseBackup)
if err != nil {
return err
}
@@ -385,14 +388,17 @@ func (b *Backuper) createBackupEmbedded(ctx context.Context, backupName, baseBac
backupDataSize = append(backupDataSize, clickhouse.BackupDataSize{Size: 0})
} else {
if backupResult[0].CompressedSize == 0 {
-chVersion, err := b.ch.GetVersion(ctx)
-if err != nil {
-return err
-}
-backupSizeSQL := fmt.Sprintf("SELECT sum(bytes_on_disk) AS backup_data_size FROM system.parts WHERE active AND concat(database,'.',table) IN (%s)", tableSizeSQL)
-if chVersion >= 20005000 {
-backupSizeSQL = fmt.Sprintf("SELECT sum(total_bytes) AS backup_data_size FROM system.tables WHERE concat(database,'.',name) IN (%s)", tableSizeSQL)
+backupSizeSQL := "SELECT sum(bytes_on_disk) AS backup_data_size FROM system.parts WHERE active AND ("
+for _, t := range tables {
+if oneTableSizeSQL, exists := tablesSizeSQL[metadata.TableTitle{Database: t.Database, Table: t.Name}]; exists {
+if strings.HasPrefix(oneTableSizeSQL, fmt.Sprintf("'%s::%s::", t.Database, t.Name)) {
+backupSizeSQL += fmt.Sprintf(" concat(database,'::',table,'::',partition) IN (%s) OR ", oneTableSizeSQL)
+} else {
+backupSizeSQL += fmt.Sprintf(" concat(database,'::',table) IN (%s) OR ", tablesSizeSQL[metadata.TableTitle{Database: t.Database, Table: t.Name}])
+}
}
+}
+backupSizeSQL = backupSizeSQL[:len(backupSizeSQL)-4] + ")"
if err := b.ch.SelectContext(ctx, &backupDataSize, backupSizeSQL); err != nil {
return err
}
@@ -456,7 +462,7 @@ func (b *Backuper) createBackupEmbedded(ctx context.Context, backupName, baseBac
}
}
backupMetaFile := path.Join(backupPath, "metadata.json")
-if err := b.createBackupMetadata(ctx, backupMetaFile, backupName, baseBackup, backupVersion, "embedded", diskMap, diskTypes, disks, backupDataSize[0].Size, backupMetadataSize, backupRBACSize, backupConfigSize, tablesTitle, allDatabases, allFunctions, log); err != nil {
+if err := b.createBackupMetadata(ctx, backupMetaFile, backupName, baseBackup, backupVersion, "embedded", diskMap, diskTypes, disks, backupDataSize[0].Size, 0, backupMetadataSize, backupRBACSize, backupConfigSize, tablesTitle, allDatabases, allFunctions, log); err != nil {
return err
}

@@ -469,9 +475,9 @@ func (b *Backuper) createBackupEmbedded(ctx context.Context, backupName, baseBac
return nil
}

-func (b *Backuper) generateEmbeddedBackupSQL(ctx context.Context, backupName string, schemaOnly bool, tables []clickhouse.Table, tablesTitle []metadata.TableTitle, partitionsNameList map[metadata.TableTitle][]string, tablesListLen int, baseBackup string) (string, string, error) {
+func (b *Backuper) generateEmbeddedBackupSQL(ctx context.Context, backupName string, schemaOnly bool, tables []clickhouse.Table, tablesTitle []metadata.TableTitle, partitionsNameList map[metadata.TableTitle][]string, tablesListLen int, baseBackup string) (string, map[metadata.TableTitle]string, error) {
tablesSQL := ""
tableSizeSQL := ""
tableSizeSQL := map[metadata.TableTitle]string{}
i := 0
for _, table := range tables {
if table.Skip {
@@ -484,11 +490,13 @@ func (b *Backuper) generateEmbeddedBackupSQL(ctx context.Context, backupName str
i += 1

tablesSQL += "TABLE `" + table.Database + "`.`" + table.Name + "`"
tableSizeSQL += "'" + table.Database + "." + table.Name + "'"
fullTableNameForTableSizeSQL := table.Database + "::" + table.Name
tableSizeSQLOneTable := ""
if nameList, exists := partitionsNameList[metadata.TableTitle{Database: table.Database, Table: table.Name}]; exists && len(nameList) > 0 {
partitionsSQL := ""
for _, partitionName := range nameList {
if partitionName != "*" {
tableSizeSQLOneTable += "'" + fullTableNameForTableSizeSQL + "::" + partitionName + "',"
if strings.HasPrefix(partitionName, "(") {
partitionsSQL += partitionName + ","
} else {
Expand All @@ -497,15 +505,21 @@ func (b *Backuper) generateEmbeddedBackupSQL(ctx context.Context, backupName str
}
}
tablesSQL += fmt.Sprintf(" PARTITIONS %s", partitionsSQL[:len(partitionsSQL)-1])
if tableSizeSQLOneTable != "" {
tableSizeSQLOneTable = tableSizeSQLOneTable[:len(tableSizeSQLOneTable)-1]
}
}
if tableSizeSQLOneTable == "" {
tableSizeSQLOneTable = "'" + fullTableNameForTableSizeSQL + "'"
}
tableSizeSQL[metadata.TableTitle{Database: table.Database, Table: table.Name}] = tableSizeSQLOneTable
if i < tablesListLen {
tablesSQL += ", "
tableSizeSQL += ", "
}
}
embeddedBackupLocation, err := b.getEmbeddedBackupLocation(ctx, backupName)
if err != nil {
return "", "", err
return "", nil, err
}
backupSQL := fmt.Sprintf("BACKUP %s TO %s", tablesSQL, embeddedBackupLocation)
var backupSettings []string
@@ -519,14 +533,14 @@ func (b *Backuper) generateEmbeddedBackupSQL(ctx context.Context, backupName str
if baseBackup != "" {
baseBackup, err = b.getEmbeddedBackupLocation(ctx, baseBackup)
if err != nil {
return "", "", err
return "", nil, err
}
backupSettings = append(backupSettings, "base_backup="+baseBackup)
}
if len(backupSettings) > 0 {
backupSQL += " SETTINGS " + strings.Join(backupSettings, ", ")
}
-return tableSizeSQL, backupSQL, nil
+return backupSQL, tableSizeSQL, nil
}

func (b *Backuper) getPartsFromRemoteEmbeddedBackup(ctx context.Context, backupName string, table clickhouse.Table, partitionsIdsMap common.EmptyMap, log *apexLog.Entry) (map[string][]metadata.Part, error) {
@@ -714,43 +728,44 @@ func (b *Backuper) createBackupRBACReplicated(ctx context.Context, rbacBackup st
return rbacDataSize, nil
}

-func (b *Backuper) AddTableToLocalBackup(ctx context.Context, backupName string, tablesDiffFromRemote map[metadata.TableTitle]metadata.TableMetadata, shadowBackupUUID string, diskList []clickhouse.Disk, table *clickhouse.Table, partitionsIdsMap common.EmptyMap) (map[string][]metadata.Part, map[string]int64, error) {
+func (b *Backuper) AddTableToLocalBackup(ctx context.Context, backupName string, tablesDiffFromRemote map[metadata.TableTitle]metadata.TableMetadata, shadowBackupUUID string, diskList []clickhouse.Disk, table *clickhouse.Table, partitionsIdsMap common.EmptyMap) (map[string][]metadata.Part, map[string]int64, map[string]int64, error) {
log := b.log.WithFields(apexLog.Fields{
"backup": backupName,
"operation": "create",
"table": fmt.Sprintf("%s.%s", table.Database, table.Name),
})
if backupName == "" {
-return nil, nil, fmt.Errorf("backupName is not defined")
+return nil, nil, nil, fmt.Errorf("backupName is not defined")
}

if !strings.HasSuffix(table.Engine, "MergeTree") && table.Engine != "MaterializedMySQL" && table.Engine != "MaterializedPostgreSQL" {
if table.Engine != "MaterializedView" {
log.WithField("engine", table.Engine).Warnf("supports only schema backup")
}
-return nil, nil, nil
+return nil, nil, nil, nil
}
if b.cfg.ClickHouse.CheckPartsColumns {
if err := b.ch.CheckSystemPartsColumns(ctx, table); err != nil {
-return nil, nil, err
+return nil, nil, nil, err
}
}
// backup data
if err := b.ch.FreezeTable(ctx, table, shadowBackupUUID); err != nil {
-return nil, nil, err
+return nil, nil, nil, err
}
log.Debug("frozen")
version, err := b.ch.GetVersion(ctx)
if err != nil {
-return nil, nil, err
+return nil, nil, nil, err
}
realSize := map[string]int64{}
+objectDiskSize := map[string]int64{}
disksToPartsMap := map[string][]metadata.Part{}

for _, disk := range diskList {
select {
case <-ctx.Done():
-return nil, nil, ctx.Err()
+return nil, nil, nil, ctx.Err()
default:
shadowPath := path.Join(disk.Path, "shadow", shadowBackupUUID)
if _, err := os.Stat(shadowPath); err != nil && os.IsNotExist(err) {
Expand All @@ -760,29 +775,30 @@ func (b *Backuper) AddTableToLocalBackup(ctx context.Context, backupName string,
encodedTablePath := path.Join(common.TablePathEncode(table.Database), common.TablePathEncode(table.Name))
backupShadowPath := path.Join(backupPath, "shadow", encodedTablePath, disk.Name)
if err := filesystemhelper.MkdirAll(backupShadowPath, b.ch, diskList); err != nil && !os.IsExist(err) {
-return nil, nil, err
+return nil, nil, nil, err
}
// If partitionsIdsMap is not empty, only parts in this partition will back up.
parts, size, err := filesystemhelper.MoveShadowToBackup(shadowPath, backupShadowPath, partitionsIdsMap, tablesDiffFromRemote[metadata.TableTitle{Database: table.Database, Table: table.Name}], disk, version)
if err != nil {
-return nil, nil, err
+return nil, nil, nil, err
}
realSize[disk.Name] = size

disksToPartsMap[disk.Name] = parts
log.WithField("disk", disk.Name).Debug("shadow moved")
if len(parts) > 0 && (b.isDiskTypeObject(disk.Type) || b.isDiskTypeEncryptedObject(disk, diskList)) {
start := time.Now()
log.WithField("disk", disk.Name).Info("upload object_disk start")
if size, err = b.uploadObjectDiskParts(ctx, backupName, tablesDiffFromRemote[metadata.TableTitle{Database: table.Database, Table: table.Name}], backupShadowPath, disk); err != nil {
-return disksToPartsMap, realSize, err
+return nil, nil, nil, err
}
-realSize[disk.Name] += size
+objectDiskSize[disk.Name] = size
log.WithField("disk", disk.Name).WithField("duration", utils.HumanizeDuration(time.Since(start))).WithField("size", utils.FormatBytes(uint64(size))).Info("upload object_disk finish")
}
// Clean all the files under the shadowPath, cause UNFREEZE unavailable
if version < 21004000 {
if err := os.RemoveAll(shadowPath); err != nil {
-return disksToPartsMap, realSize, err
+return nil, nil, nil, err
}
}
}
Expand All @@ -792,14 +808,11 @@ func (b *Backuper) AddTableToLocalBackup(ctx context.Context, backupName string,
if err := b.ch.QueryContext(ctx, fmt.Sprintf("ALTER TABLE `%s`.`%s` UNFREEZE WITH NAME '%s'", table.Database, table.Name, shadowBackupUUID)); err != nil {
if (strings.Contains(err.Error(), "code: 60") || strings.Contains(err.Error(), "code: 81") || strings.Contains(err.Error(), "code: 218")) && b.cfg.ClickHouse.IgnoreNotExistsErrorDuringFreeze {
b.ch.Log.Warnf("can't unfreeze table: %v", err)
-} else {
-return disksToPartsMap, realSize, err
}

}
}
log.Debug("done")
-return disksToPartsMap, realSize, nil
+return disksToPartsMap, realSize, objectDiskSize, nil
}

func (b *Backuper) uploadObjectDiskParts(ctx context.Context, backupName string, tableDiffFromRemote metadata.TableMetadata, backupShadowPath string, disk clickhouse.Disk) (int64, error) {
Expand Down Expand Up @@ -880,7 +893,7 @@ func (b *Backuper) uploadObjectDiskParts(ctx context.Context, backupName string,
return size, nil
}

-func (b *Backuper) createBackupMetadata(ctx context.Context, backupMetaFile, backupName, requiredBackup, version, tags string, diskMap, diskTypes map[string]string, disks []clickhouse.Disk, backupDataSize, backupMetadataSize, backupRBACSize, backupConfigSize uint64, tableMetas []metadata.TableTitle, allDatabases []clickhouse.Database, allFunctions []clickhouse.Function, log *apexLog.Entry) error {
+func (b *Backuper) createBackupMetadata(ctx context.Context, backupMetaFile, backupName, requiredBackup, version, tags string, diskMap, diskTypes map[string]string, disks []clickhouse.Disk, backupDataSize, backupObjectDiskSize, backupMetadataSize, backupRBACSize, backupConfigSize uint64, tableMetas []metadata.TableTitle, allDatabases []clickhouse.Database, allFunctions []clickhouse.Function, log *apexLog.Entry) error {
select {
case <-ctx.Done():
return ctx.Err()
@@ -895,6 +908,7 @@ func (b *Backuper) createBackupMetadata(ctx context.Context, backupMetaFile, bac
Tags: tags,
ClickHouseVersion: b.ch.GetVersionDescribe(ctx),
DataSize: backupDataSize,
+ObjectDiskSize: backupObjectDiskSize,
MetadataSize: backupMetadataSize,
RBACSize: backupRBACSize,
ConfigSize: backupConfigSize,
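To illustrate the query shape built in createBackupEmbedded above: each per-table filter from `tablesSizeSQL` is keyed by `database::table`, gaining a `::partition` suffix when the backup was limited to specific partitions. A runnable sketch with hypothetical table and partition names (the assembly mirrors the loop above, including trimming the trailing ` OR `):

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	// Hypothetical tablesSizeSQL values: default.events limited to two
	// partitions, default.users backed up in full.
	tablesSizeSQL := map[string]string{
		"default.events": "'default::events::202401','default::events::202402'",
		"default.users":  "'default::users'",
	}
	backupSizeSQL := "SELECT sum(bytes_on_disk) AS backup_data_size FROM system.parts WHERE active AND ("
	for name, oneTableSizeSQL := range tablesSizeSQL {
		dbTable := strings.Replace(name, ".", "::", 1)
		if strings.HasPrefix(oneTableSizeSQL, "'"+dbTable+"::") {
			// Partition-limited table: match database::table::partition keys.
			backupSizeSQL += fmt.Sprintf(" concat(database,'::',table,'::',partition) IN (%s) OR ", oneTableSizeSQL)
		} else {
			// Whole table: match database::table only.
			backupSizeSQL += fmt.Sprintf(" concat(database,'::',table) IN (%s) OR ", oneTableSizeSQL)
		}
	}
	// Drop the trailing " OR " (4 characters) and close the parenthesis,
	// exactly as the code above does.
	backupSizeSQL = backupSizeSQL[:len(backupSizeSQL)-4] + ")"
	fmt.Println(backupSizeSQL)
}
```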
pkg/backup/delete.go (6 changes: 4 additions & 2 deletions)
@@ -179,11 +179,12 @@ func (b *Backuper) cleanEmbeddedAndObjectDiskLocalIfSameRemoteNotPresent(ctx con
return err
}
if !skip && (hasObjectDisks || (b.isEmbedded && b.cfg.ClickHouse.EmbeddedBackupDisk == "")) {
+startTime := time.Now()
if deletedKeys, deleteErr := b.cleanBackupObjectDisks(ctx, backupName); deleteErr != nil {
log.Warnf("b.cleanBackupObjectDisks return error: %v", deleteErr)
return err
} else {
log.Infof("cleanBackupObjectDisks deleted %d keys", deletedKeys)
log.WithField("duration", utils.HumanizeDuration(time.Since(startTime))).Infof("cleanBackupObjectDisks deleted %d keys", deletedKeys)
}
}
if !skip && (b.isEmbedded && b.cfg.ClickHouse.EmbeddedBackupDisk != "") {
@@ -337,10 +338,11 @@ func (b *Backuper) cleanEmbeddedAndObjectDiskRemoteIfSameLocalNotPresent(ctx con
return nil
}
if b.hasObjectDisksRemote(backup) || (b.isEmbedded && b.cfg.ClickHouse.EmbeddedBackupDisk == "") {
+startTime := time.Now()
if deletedKeys, deleteErr := b.cleanBackupObjectDisks(ctx, backup.BackupName); deleteErr != nil {
log.Warnf("b.cleanBackupObjectDisks return error: %v", deleteErr)
} else {
log.Infof("cleanBackupObjectDisks deleted %d keys", deletedKeys)
log.WithField("duration", utils.HumanizeDuration(time.Since(startTime))).Infof("cleanBackupObjectDisks deleted %d keys", deletedKeys)
}
return nil
}
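Both cleanup paths above now log how long object-disk key deletion takes. The timing itself is plain stdlib; a minimal generic sketch (illustrative names only — the project formats the elapsed time with its own `utils.HumanizeDuration` helper):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	startTime := time.Now()
	// ... cleanBackupObjectDisks would delete remote keys here ...
	deletedKeys := 42 // placeholder result
	fmt.Printf("cleanBackupObjectDisks deleted %d keys, duration=%s\n",
		deletedKeys, time.Since(startTime).Round(time.Millisecond))
}
```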
pkg/backup/download.go (2 changes: 1 addition & 1 deletion)
@@ -270,7 +270,7 @@ func (b *Backuper) Download(backupName string, tablePattern string, partitions [
if remoteBackup.RequiredBackup != "" {
if localBackups, _, err = b.GetLocalBackups(ctx, disks); err == nil {
for _, localBackup := range localBackups {
-if localBackup.BackupName != remoteBackup.BackupName && localBackup.DataSize+localBackup.CompressedSize+localBackup.MetadataSize == 0 {
+if localBackup.BackupName != remoteBackup.BackupName && localBackup.DataSize+localBackup.CompressedSize+localBackup.MetadataSize+localBackup.RBACSize == 0 {
if err = b.RemoveBackupLocal(ctx, localBackup.BackupName, disks); err != nil {
return fmt.Errorf("downloadWithDiff -> RemoveBackupLocal cleaning error: %v", err)
} else {
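The condition above decides whether a stale local backup is an empty leftover that can be removed before downloading a required backup; adding `RBACSize` to the sum means a backup holding only RBAC objects is no longer mistaken for empty and deleted. A hypothetical extraction of the predicate (not a helper that exists in the repository):

```go
package backup

// isEmptyLeftover mirrors the updated check in Download: a local backup is
// removed as a leftover only when it holds no data, archive, metadata or
// RBAC bytes. Hypothetical helper, for illustration only.
func isEmptyLeftover(dataSize, compressedSize, metadataSize, rbacSize uint64) bool {
	return dataSize+compressedSize+metadataSize+rbacSize == 0
}
```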
pkg/backup/list.go (13 changes: 2 additions & 11 deletions)
@@ -55,10 +55,7 @@ func printBackupsRemote(w io.Writer, backupList []storage.Backup, format string)
// fmt.Println("no backups found")
// }
for _, backup := range backupList {
-size := utils.FormatBytes(backup.DataSize + backup.MetadataSize)
-if backup.CompressedSize > 0 {
-size = utils.FormatBytes(backup.CompressedSize + backup.MetadataSize)
-}
+size := utils.FormatBytes(backup.GetFullSize())
description := backup.DataFormat
uploadDate := backup.UploadDate.Format("02/01/2006 15:04:05")
if backup.Tags != "" {
@@ -96,18 +93,12 @@ func printBackupsLocal(ctx context.Context, w io.Writer, backupList []LocalBacku
}
fmt.Println(backupList[len(backupList)-2].BackupName)
case "all", "":
-// if len(backupList) == 0 {
-// fmt.Println("no backups found")
-// }
for _, backup := range backupList {
select {
case <-ctx.Done():
return ctx.Err()
default:
-size := utils.FormatBytes(backup.DataSize + backup.MetadataSize)
-if backup.CompressedSize > 0 {
-size = utils.FormatBytes(backup.CompressedSize + backup.MetadataSize)
-}
+size := utils.FormatBytes(backup.GetFullSize())
description := backup.DataFormat
if backup.Tags != "" {
if description != "" {
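Both printers now delegate to a shared `GetFullSize()` helper instead of the inline math they previously duplicated, which is what makes CLI and API `list` output consistent. Its definition lives in one of the changed files not shown in this view; based on the old inline logic and the new `ObjectDiskSize` field, a plausible sketch (an assumption, not the repository's actual body):

```go
package metadata

// Size fields as sketched earlier on this page; all values are bytes.
type BackupMetadata struct {
	DataSize, ObjectDiskSize, CompressedSize uint64
	MetadataSize, RBACSize, ConfigSize       uint64
}

// GetFullSize is a plausible reconstruction of the shared helper both
// printers call; the real body is in a file outside this view.
func (b *BackupMetadata) GetFullSize() uint64 {
	dataSize := b.DataSize
	if b.CompressedSize > 0 {
		// Once a backup is compressed and uploaded, the archive size
		// replaces the raw data size, as the old inline logic did.
		dataSize = b.CompressedSize
	}
	return dataSize + b.ObjectDiskSize + b.MetadataSize + b.RBACSize + b.ConfigSize
}
```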