From 77ff40d46ca31d969c831290cb04eeb7b944a956 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Mon, 6 Jan 2025 23:45:33 +0530 Subject: [PATCH 1/2] add final to soft-delete transfer --- flow/connectors/clickhouse/cdc.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flow/connectors/clickhouse/cdc.go b/flow/connectors/clickhouse/cdc.go index 5dc8a14628..ed68760598 100644 --- a/flow/connectors/clickhouse/cdc.go +++ b/flow/connectors/clickhouse/cdc.go @@ -187,7 +187,7 @@ func (c *ClickHouseConnector) RenameTables( allCols := strings.Join(columnNames, ",") c.logger.Info("handling soft-deletes for table before rename", slog.String("NewName", renameRequest.NewName)) if err := c.execWithLogging(ctx, - fmt.Sprintf("INSERT INTO `%s`(%s,%s) SELECT %s,true FROM `%s` WHERE %s = 1", + fmt.Sprintf("INSERT INTO `%s`(%s,%s) SELECT %s,true FROM `%s` FINAL WHERE %s = 1", renameRequest.CurrentName, allCols, signColName, allCols, renameRequest.NewName, signColName), ); err != nil { return nil, fmt.Errorf("unable to handle soft-deletes for table %s: %w", renameRequest.NewName, err) From e894ac83099d6e0e5e3c7192d39351dc3afadd81 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Tue, 7 Jan 2025 00:27:21 +0530 Subject: [PATCH 2/2] remove soft-deletes transfer altogether --- flow/connectors/clickhouse/cdc.go | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/flow/connectors/clickhouse/cdc.go b/flow/connectors/clickhouse/cdc.go index ed68760598..5dc4a53dd3 100644 --- a/flow/connectors/clickhouse/cdc.go +++ b/flow/connectors/clickhouse/cdc.go @@ -6,7 +6,6 @@ import ( "errors" "fmt" "log/slog" - "strings" "github.com/ClickHouse/clickhouse-go/v2" _ "github.com/ClickHouse/clickhouse-go/v2/lib/driver" @@ -178,21 +177,6 @@ func (c *ClickHouseConnector) RenameTables( } if originalTableExists { - tableSchema := tableNameSchemaMapping[renameRequest.CurrentName] - columnNames := make([]string, 0, len(tableSchema.Columns)) - for _, col := range tableSchema.Columns { - columnNames = append(columnNames, col.Name) - } - - allCols := strings.Join(columnNames, ",") - c.logger.Info("handling soft-deletes for table before rename", slog.String("NewName", renameRequest.NewName)) - if err := c.execWithLogging(ctx, - fmt.Sprintf("INSERT INTO `%s`(%s,%s) SELECT %s,true FROM `%s` FINAL WHERE %s = 1", - renameRequest.CurrentName, allCols, signColName, allCols, renameRequest.NewName, signColName), - ); err != nil { - return nil, fmt.Errorf("unable to handle soft-deletes for table %s: %w", renameRequest.NewName, err) - } - // target table exists, so we can attempt to swap. In most cases, we will have Atomic engine, // which supports a special query to exchange two tables, allowing dependent (materialized) views and dictionaries on these tables c.logger.Info("attempting atomic exchange",