Skip to content

Commit

Permalink
ClickHouse resync: remove soft-delete transfer (#2411)
Browse files Browse the repository at this point in the history
There are tricky situations pertaining to data consistency with
transferring soft-deletes to the new table during a resync.
For example, there is currently a bug where, in the old table, if a
record was deleted and re-inserted (for the same primary key), we would
then mark an existing row as deleted in the new resync table when in
fact it is not

This PR removes soft-delete transferring altogether for ClickHouse
connector's resync
  • Loading branch information
Amogh-Bharadwaj authored Jan 6, 2025
1 parent bb88bd1 commit 85157c0
Showing 1 changed file with 0 additions and 16 deletions.
16 changes: 0 additions & 16 deletions flow/connectors/clickhouse/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"errors"
"fmt"
"log/slog"
"strings"

"github.com/ClickHouse/clickhouse-go/v2"
_ "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
Expand Down Expand Up @@ -178,21 +177,6 @@ func (c *ClickHouseConnector) RenameTables(
}

if originalTableExists {
tableSchema := tableNameSchemaMapping[renameRequest.CurrentName]
columnNames := make([]string, 0, len(tableSchema.Columns))
for _, col := range tableSchema.Columns {
columnNames = append(columnNames, col.Name)
}

allCols := strings.Join(columnNames, ",")
c.logger.Info("handling soft-deletes for table before rename", slog.String("NewName", renameRequest.NewName))
if err := c.execWithLogging(ctx,
fmt.Sprintf("INSERT INTO `%s`(%s,%s) SELECT %s,true FROM `%s` WHERE %s = 1",
renameRequest.CurrentName, allCols, signColName, allCols, renameRequest.NewName, signColName),
); err != nil {
return nil, fmt.Errorf("unable to handle soft-deletes for table %s: %w", renameRequest.NewName, err)
}

// target table exists, so we can attempt to swap. In most cases, we will have Atomic engine,
// which supports a special query to exchange two tables, allowing dependent (materialized) views and dictionaries on these tables
c.logger.Info("attempting atomic exchange",
Expand Down

0 comments on commit 85157c0

Please sign in to comment.