diff --git a/flow/connectors/clickhouse/normalize.go b/flow/connectors/clickhouse/normalize.go
index 60a79a6d9f..26e0cf9162 100644
--- a/flow/connectors/clickhouse/normalize.go
+++ b/flow/connectors/clickhouse/normalize.go
@@ -366,38 +366,62 @@ func (c *ClickHouseConnector) NormalizeRecords(
 			case "Date32", "Nullable(Date32)":
 				projection.WriteString(fmt.Sprintf(
 					"toDate32(parseDateTime64BestEffortOrNull(JSONExtractString(_peerdb_data, '%s'),6)) AS `%s`,",
-					colName,
-					dstColName,
+					colName, dstColName,
 				))
 				if enablePrimaryUpdate {
 					projectionUpdate.WriteString(fmt.Sprintf(
 						"toDate32(parseDateTime64BestEffortOrNull(JSONExtractString(_peerdb_match_data, '%s'),6)) AS `%s`,",
-						colName,
-						dstColName,
+						colName, dstColName,
 					))
 				}
 			case "DateTime64(6)", "Nullable(DateTime64(6))":
 				projection.WriteString(fmt.Sprintf(
 					"parseDateTime64BestEffortOrNull(JSONExtractString(_peerdb_data, '%s'),6) AS `%s`,",
-					colName,
-					dstColName,
+					colName, dstColName,
 				))
 				if enablePrimaryUpdate {
 					projectionUpdate.WriteString(fmt.Sprintf(
 						"parseDateTime64BestEffortOrNull(JSONExtractString(_peerdb_match_data, '%s'),6) AS `%s`,",
-						colName,
-						dstColName,
+						colName, dstColName,
 					))
 				}
 			default:
-				projection.WriteString(fmt.Sprintf("JSONExtract(_peerdb_data, '%s', '%s') AS `%s`,", colName, clickHouseType, dstColName))
-				if enablePrimaryUpdate {
-					projectionUpdate.WriteString(fmt.Sprintf(
-						"JSONExtract(_peerdb_match_data, '%s', '%s') AS `%s`,",
-						colName,
-						clickHouseType,
-						dstColName,
-					))
+				projLen := projection.Len()
+				if colType == qvalue.QValueKindBytes {
+					format, err := peerdbenv.PeerDBBinaryFormat(ctx, req.Env)
+					if err != nil {
+						return model.NormalizeResponse{}, err
+					}
+					switch format {
+					case peerdbenv.BinaryFormatRaw:
+						projection.WriteString(fmt.Sprintf("base64Decode(JSONExtractString(_peerdb_data, '%s')) AS `%s`,", colName, dstColName))
+						if enablePrimaryUpdate {
+							projectionUpdate.WriteString(fmt.Sprintf(
+								"base64Decode(JSONExtractString(_peerdb_match_data, '%s')) AS `%s`,",
+								colName, dstColName,
+							))
+						}
+					case peerdbenv.BinaryFormatHex:
+						projection.WriteString(fmt.Sprintf("hex(base64Decode(JSONExtractString(_peerdb_data, '%s'))) AS `%s`,",
+							colName, dstColName))
+						if enablePrimaryUpdate {
+							projectionUpdate.WriteString(fmt.Sprintf(
+								"hex(base64Decode(JSONExtractString(_peerdb_match_data, '%s'))) AS `%s`,",
+								colName, dstColName,
+							))
+						}
+					}
+				}
+
+				// proceed with default logic if logic above didn't add any sql
+				if projection.Len() == projLen {
+					projection.WriteString(fmt.Sprintf("JSONExtract(_peerdb_data, '%s', '%s') AS `%s`,", colName, clickHouseType, dstColName))
+					if enablePrimaryUpdate {
+						projectionUpdate.WriteString(fmt.Sprintf(
+							"JSONExtract(_peerdb_match_data, '%s', '%s') AS `%s`,",
+							colName, clickHouseType, dstColName,
+						))
+					}
 				}
 			}
 		}
diff --git a/flow/connectors/utils/avro/avro_writer.go b/flow/connectors/utils/avro/avro_writer.go
index 6357c46d94..277e0be44c 100644
--- a/flow/connectors/utils/avro/avro_writer.go
+++ b/flow/connectors/utils/avro/avro_writer.go
@@ -157,7 +157,7 @@ func (p *peerDBOCFWriter) writeRecordsToOCFWriter(ctx context.Context, env map[s
 		if err := ctx.Err(); err != nil {
 			return numRows.Load(), err
 		} else {
-			avroMap, err := avroConverter.Convert(qrecord)
+			avroMap, err := avroConverter.Convert(ctx, env, qrecord)
 			if err != nil {
 				logger.Error("Failed to convert QRecord to Avro compatible map", slog.Any("error", err))
 				return numRows.Load(), fmt.Errorf("failed to convert QRecord to Avro compatible map: %w", err)
diff --git a/flow/e2e/clickhouse/peer_flow_ch_test.go b/flow/e2e/clickhouse/peer_flow_ch_test.go
index 813d3d35c5..beaae5ceaf 100644
--- a/flow/e2e/clickhouse/peer_flow_ch_test.go
+++ b/flow/e2e/clickhouse/peer_flow_ch_test.go
@@ -635,7 +635,6 @@ func (s ClickHouseSuite) testNumericFF(ffValue bool) {
 
 	_, err = s.Conn().Exec(context.Background(), fmt.Sprintf("INSERT INTO %s(c) VALUES($1)", srcFullName), nines)
 	require.NoError(s.t, err)
-
 	e2e.EnvWaitForCount(env, s, "waiting for CDC count", dstTableName, "id,c", 2)
 
 	rows, err := s.GetRows(dstTableName, "c")
@@ -666,6 +665,66 @@ func (s ClickHouseSuite) Test_Unbounded_Numeric_Without_FF() {
 	s.testNumericFF(false)
 }
 
+const binaryFormatTestcase = "\x00\x010123\x7f\xff"
+
+// PEERDB_CLICKHOUSE_BINARY_FORMAT
+func (s ClickHouseSuite) testBinaryFormat(format string, expected string) {
+	dstTableName := "binary_format_" + format
+	srcFullName := s.attachSchemaSuffix(dstTableName)
+
+	_, err := s.Conn().Exec(context.Background(), fmt.Sprintf(`
+	CREATE TABLE IF NOT EXISTS %s(
+		id INT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
+		val bytea
+	);
+	`, srcFullName))
+	require.NoError(s.t, err)
+
+	_, err = s.Conn().Exec(context.Background(), fmt.Sprintf("INSERT INTO %s(val) VALUES($1)", srcFullName), []byte(binaryFormatTestcase))
+	require.NoError(s.t, err)
+
+	connectionGen := e2e.FlowConnectionGenerationConfig{
+		FlowJobName:      s.attachSuffix("ch_binary_format_" + format),
+		TableNameMapping: map[string]string{srcFullName: dstTableName},
+		Destination:      s.Peer().Name,
+	}
+	flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s.t)
+	flowConnConfig.DoInitialSnapshot = true
+	flowConnConfig.Env = map[string]string{"PEERDB_CLICKHOUSE_BINARY_FORMAT": format}
+	tc := e2e.NewTemporalClient(s.t)
+	env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil)
+	e2e.SetupCDCFlowStatusQuery(s.t, env, flowConnConfig)
+
+	e2e.EnvWaitForCount(env, s, "waiting for CDC count", dstTableName, "id,val", 1)
+
+	_, err = s.Conn().Exec(context.Background(), fmt.Sprintf("INSERT INTO %s(val) VALUES($1)", srcFullName), []byte(binaryFormatTestcase))
+	require.NoError(s.t, err)
+	e2e.EnvWaitForCount(env, s, "waiting for CDC count", dstTableName, "id,val", 2)
+
+	rows, err := s.GetRows(dstTableName, "val")
+	require.NoError(s.t, err)
+	require.Len(s.t, rows.Records, 2, "expected 2 rows")
+	for _, row := range rows.Records {
+		require.Len(s.t, row, 1, "expected 1 column")
+		require.Equal(s.t, expected, row[0].Value())
+	}
+
+	env.Cancel()
+	e2e.RequireEnvCanceled(s.t, env)
+}
+
+func (s ClickHouseSuite) Test_Binary_Format_Raw() {
+	s.testBinaryFormat("raw", binaryFormatTestcase)
+}
+
+func (s ClickHouseSuite) Test_Binary_Format_Hex() {
+	s.testBinaryFormat("hex", "0001303132337FFF")
+}
+
+func (s ClickHouseSuite) Test_Binary_Format_Base64() {
+	s.testBinaryFormat("base64", "AAEwMTIzf/8=")
+}
+
 func (s ClickHouseSuite) Test_Types_CH() {
 	srcTableName := "test_types"
 	srcFullName := s.attachSchemaSuffix("test_types")
diff --git a/flow/model/conversion_avro.go b/flow/model/conversion_avro.go
index ec7cfc6e37..b4cab1613a 100644
--- a/flow/model/conversion_avro.go
+++ b/flow/model/conversion_avro.go
@@ -46,15 +46,12 @@ func NewQRecordAvroConverter(
 	}, nil
 }
 
-func (qac *QRecordAvroConverter) Convert(qrecord []qvalue.QValue) (map[string]any, error) {
+func (qac *QRecordAvroConverter) Convert(ctx context.Context, env map[string]string, qrecord []qvalue.QValue) (map[string]any, error) {
 	m := make(map[string]any, len(qrecord))
 	for idx, val := range qrecord {
 		avroVal, err := qvalue.QValueToAvro(
-			val,
-			&qac.Schema.Fields[idx],
-			qac.TargetDWH,
-			qac.logger,
-			qac.UnboundedNumericAsString,
+			ctx, env, val,
+			&qac.Schema.Fields[idx], qac.TargetDWH, qac.logger, qac.UnboundedNumericAsString,
 		)
 		if err != nil {
 			return nil, fmt.Errorf("failed to convert QValue to Avro-compatible value: %w", err)
diff --git a/flow/model/qvalue/avro_converter.go b/flow/model/qvalue/avro_converter.go
index df5aaee040..d860b5be70 100644
--- a/flow/model/qvalue/avro_converter.go
+++ b/flow/model/qvalue/avro_converter.go
@@ -3,9 +3,11 @@ package qvalue
 import (
 	"context"
 	"encoding/base64"
+	"encoding/hex"
 	"errors"
 	"fmt"
 	"log/slog"
+	"strings"
 	"time"
 
 	"github.com/google/uuid"
@@ -115,7 +117,11 @@ func GetAvroSchemaFromQValueKind(
 	case QValueKindBoolean:
 		return "boolean", nil
 	case QValueKindBytes:
-		if targetDWH == protos.DBType_CLICKHOUSE {
+		format, err := peerdbenv.PeerDBBinaryFormat(ctx, env)
+		if err != nil {
+			return nil, err
+		}
+		if targetDWH == protos.DBType_CLICKHOUSE && format != peerdbenv.BinaryFormatRaw {
 			return "string", nil
 		}
 		return "bytes", nil
@@ -245,6 +251,7 @@ type QValueAvroConverter struct {
 }
 
 func QValueToAvro(
+	ctx context.Context, env map[string]string,
 	value QValue, field *QField, targetDWH protos.DBType, logger log.Logger,
 	unboundedNumericAsString bool,
 ) (any, error) {
@@ -377,7 +384,11 @@ func QValueToAvro(
 	case QValueNumeric:
 		return c.processNumeric(v.Val), nil
 	case QValueBytes:
-		return c.processBytes(v.Val), nil
+		format, err := peerdbenv.PeerDBBinaryFormat(ctx, env)
+		if err != nil {
+			return nil, err
+		}
+		return c.processBytes(v.Val, format), nil
 	case QValueJSON:
 		return c.processJSON(v.Val), nil
 	case QValueHStore:
@@ -509,9 +520,17 @@ func (c *QValueAvroConverter) processNumeric(num decimal.Decimal) any {
 	return rat
 }
 
-func (c *QValueAvroConverter) processBytes(byteData []byte) interface{} {
-	if c.TargetDWH == protos.DBType_CLICKHOUSE {
-		encoded := base64.StdEncoding.EncodeToString(byteData)
+func (c *QValueAvroConverter) processBytes(byteData []byte, format peerdbenv.BinaryFormat) interface{} {
+	if c.TargetDWH == protos.DBType_CLICKHOUSE && format != peerdbenv.BinaryFormatRaw {
+		var encoded string
+		switch format {
+		case peerdbenv.BinaryFormatBase64:
+			encoded = base64.StdEncoding.EncodeToString(byteData)
+		case peerdbenv.BinaryFormatHex:
+			encoded = strings.ToUpper(hex.EncodeToString(byteData))
+		default:
+			panic(fmt.Sprintf("unhandled binary format: %d", format))
+		}
 		if c.Nullable {
 			return goavro.Union("string", encoded)
 		}
diff --git a/flow/model/record_items.go b/flow/model/record_items.go
index 13b6dfef5a..33c2ac4941 100644
--- a/flow/model/record_items.go
+++ b/flow/model/record_items.go
@@ -102,9 +102,8 @@ func (r RecordItems) toMap(opts ToJSONOptions) (map[string]interface{}, error) {
 			if len(v.Val) > 15*1024*1024 {
 				jsonStruct[col] = "{}"
 			} else if _, ok := opts.UnnestColumns[col]; ok {
-				var unnestStruct map[string]interface{}
-				err := json.Unmarshal([]byte(v.Val), &unnestStruct)
-				if err != nil {
+				var unnestStruct map[string]any
+				if err := json.Unmarshal([]byte(v.Val), &unnestStruct); err != nil {
 					return nil, err
 				}
 
@@ -185,7 +184,6 @@ func (r RecordItems) toMap(opts ToJSONOptions) (map[string]interface{}, error) {
 				}
 			}
 			jsonStruct[col] = nullableFloatArr
-
 		default:
 			jsonStruct[col] = v.Value()
 		}
diff --git a/flow/peerdbenv/dynamicconf.go b/flow/peerdbenv/dynamicconf.go
index 6501a6bce1..147afd1174 100644
--- a/flow/peerdbenv/dynamicconf.go
+++ b/flow/peerdbenv/dynamicconf.go
@@ -6,6 +6,7 @@ import (
 	"log/slog"
 	"os"
 	"strconv"
+	"strings"
 	"time"
 
 	"github.com/aws/smithy-go/ptr"
@@ -107,6 +108,14 @@ var DynamicSettings = [...]*protos.DynamicSetting{
 		ApplyMode:        protos.DynconfApplyMode_APPLY_MODE_NEW_MIRROR,
 		TargetForSetting: protos.DynconfTarget_ALL,
 	},
+	{
+		Name:             "PEERDB_CLICKHOUSE_BINARY_FORMAT",
+		Description:      "Binary field encoding on clickhouse destination; either raw, hex, or base64",
+		DefaultValue:     "base64",
+		ValueType:        protos.DynconfValueType_STRING,
+		ApplyMode:        protos.DynconfApplyMode_APPLY_MODE_AFTER_RESUME,
+		TargetForSetting: protos.DynconfTarget_CLICKHOUSE,
+	},
 	{
 		Name:             "PEERDB_SNOWFLAKE_MERGE_PARALLELISM",
 		Description:      "Parallel MERGE statements to run for CDC mirrors with Snowflake targets. -1 for no limit",
@@ -238,6 +247,15 @@ var DynamicIndex = func() map[string]int {
 	return defaults
 }()
 
+type BinaryFormat int
+
+const (
+	BinaryFormatInvalid = iota
+	BinaryFormatRaw
+	BinaryFormatBase64
+	BinaryFormatHex
+)
+
 func dynLookup(ctx context.Context, env map[string]string, key string) (string, error) {
 	if val, ok := env[key]; ok {
 		return val, nil
@@ -249,6 +267,11 @@ func dynLookup(ctx context.Context, env map[string]string, key string) (string,
 		return "", fmt.Errorf("failed to get catalog connection pool: %w", err)
 	}
 
+	var setting *protos.DynamicSetting
+	if idx, ok := DynamicIndex[key]; ok {
+		setting = DynamicSettings[idx]
+	}
+
 	var value pgtype.Text
 	query := "SELECT config_value FROM dynamic_settings WHERE config_name=$1"
 	if err := conn.QueryRow(ctx, query, key).Scan(&value); err != nil && err != pgx.ErrNoRows {
@@ -257,12 +280,21 @@ func dynLookup(ctx context.Context, env map[string]string, key string) (string,
 	}
 	if !value.Valid {
 		if val, ok := os.LookupEnv(key); ok {
+			if env != nil && setting != nil && setting.ApplyMode != protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE {
+				env[key] = val
+			}
 			return val, nil
 		}
-		if idx, ok := DynamicIndex[key]; ok {
-			return DynamicSettings[idx].DefaultValue, nil
+		if setting != nil {
+			if env != nil && setting.ApplyMode != protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE {
+				env[key] = setting.DefaultValue
+			}
+			return setting.DefaultValue, nil
 		}
 	}
+	if env != nil && setting != nil && setting.ApplyMode != protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE {
+		env[key] = value.String
+	}
 	return value.String, nil
 }
 
@@ -398,6 +430,23 @@ func PeerDBNullable(ctx context.Context, env map[string]string) (bool, error) {
 	return dynamicConfBool(ctx, env, "PEERDB_NULLABLE")
 }
 
+func PeerDBBinaryFormat(ctx context.Context, env map[string]string) (BinaryFormat, error) {
+	format, err := dynLookup(ctx, env, "PEERDB_CLICKHOUSE_BINARY_FORMAT")
+	if err != nil {
+		return 0, err
+	}
+	switch strings.ToLower(strings.TrimSpace(format)) {
+	case "raw":
+		return BinaryFormatRaw, nil
+	case "hex":
+		return BinaryFormatHex, nil
+	case "base64":
+		return BinaryFormatBase64, nil
+	default:
+		return 0, fmt.Errorf("unknown binary format %s", format)
+	}
+}
+
 func PeerDBEnableClickHousePrimaryUpdate(ctx context.Context, env map[string]string) (bool, error) {
 	return dynamicConfBool(ctx, env, "PEERDB_CLICKHOUSE_ENABLE_PRIMARY_UPDATE")
 }