From 85157c06108943ec3e54cc091e75e0de05a00080 Mon Sep 17 00:00:00 2001 From: Amogh Bharadwaj <65964360+Amogh-Bharadwaj@users.noreply.github.com> Date: Tue, 7 Jan 2025 04:31:22 +0530 Subject: [PATCH] ClickHouse resync: remove soft-delete transfer (#2411) There are tricky situations pertaining to data consistency with transferring soft-deletes to the new table during a resync. For example, there is currently a bug where, in the old table, if a record was deleted and re-inserted (for the same primary key), we would then mark an existing row as deleted in the new resync table when in fact it is not This PR removes soft-delete transferring altogether for ClickHouse connector's resync --- flow/connectors/clickhouse/cdc.go | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/flow/connectors/clickhouse/cdc.go b/flow/connectors/clickhouse/cdc.go index 5dc8a14628..5dc4a53dd3 100644 --- a/flow/connectors/clickhouse/cdc.go +++ b/flow/connectors/clickhouse/cdc.go @@ -6,7 +6,6 @@ import ( "errors" "fmt" "log/slog" - "strings" "github.com/ClickHouse/clickhouse-go/v2" _ "github.com/ClickHouse/clickhouse-go/v2/lib/driver" @@ -178,21 +177,6 @@ func (c *ClickHouseConnector) RenameTables( } if originalTableExists { - tableSchema := tableNameSchemaMapping[renameRequest.CurrentName] - columnNames := make([]string, 0, len(tableSchema.Columns)) - for _, col := range tableSchema.Columns { - columnNames = append(columnNames, col.Name) - } - - allCols := strings.Join(columnNames, ",") - c.logger.Info("handling soft-deletes for table before rename", slog.String("NewName", renameRequest.NewName)) - if err := c.execWithLogging(ctx, - fmt.Sprintf("INSERT INTO `%s`(%s,%s) SELECT %s,true FROM `%s` WHERE %s = 1", - renameRequest.CurrentName, allCols, signColName, allCols, renameRequest.NewName, signColName), - ); err != nil { - return nil, fmt.Errorf("unable to handle soft-deletes for table %s: %w", renameRequest.NewName, err) - } - // target table exists, so we can attempt to swap. In most cases, we will have Atomic engine, // which supports a special query to exchange two tables, allowing dependent (materialized) views and dictionaries on these tables c.logger.Info("attempting atomic exchange",