Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PEERDB_CLICKHOUSE_BINARY_FORMAT #2407

Merged
merged 4 commits into from
Jan 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 40 additions & 16 deletions flow/connectors/clickhouse/normalize.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,38 +366,62 @@ func (c *ClickHouseConnector) NormalizeRecords(
case "Date32", "Nullable(Date32)":
projection.WriteString(fmt.Sprintf(
"toDate32(parseDateTime64BestEffortOrNull(JSONExtractString(_peerdb_data, '%s'),6)) AS `%s`,",
colName,
dstColName,
colName, dstColName,
))
if enablePrimaryUpdate {
projectionUpdate.WriteString(fmt.Sprintf(
"toDate32(parseDateTime64BestEffortOrNull(JSONExtractString(_peerdb_match_data, '%s'),6)) AS `%s`,",
colName,
dstColName,
colName, dstColName,
))
}
case "DateTime64(6)", "Nullable(DateTime64(6))":
projection.WriteString(fmt.Sprintf(
"parseDateTime64BestEffortOrNull(JSONExtractString(_peerdb_data, '%s'),6) AS `%s`,",
colName,
dstColName,
colName, dstColName,
))
if enablePrimaryUpdate {
projectionUpdate.WriteString(fmt.Sprintf(
"parseDateTime64BestEffortOrNull(JSONExtractString(_peerdb_match_data, '%s'),6) AS `%s`,",
colName,
dstColName,
colName, dstColName,
))
}
default:
projection.WriteString(fmt.Sprintf("JSONExtract(_peerdb_data, '%s', '%s') AS `%s`,", colName, clickHouseType, dstColName))
if enablePrimaryUpdate {
projectionUpdate.WriteString(fmt.Sprintf(
"JSONExtract(_peerdb_match_data, '%s', '%s') AS `%s`,",
colName,
clickHouseType,
dstColName,
))
projLen := projection.Len()
if colType == qvalue.QValueKindBytes {
format, err := peerdbenv.PeerDBBinaryFormat(ctx, req.Env)
if err != nil {
return model.NormalizeResponse{}, err
}
switch format {
case peerdbenv.BinaryFormatRaw:
projection.WriteString(fmt.Sprintf("base64Decode(JSONExtractString(_peerdb_data, '%s')) AS `%s`,", colName, dstColName))
if enablePrimaryUpdate {
projectionUpdate.WriteString(fmt.Sprintf(
"base64Decode(JSONExtractString(_peerdb_match_data, '%s')) AS `%s`,",
colName, dstColName,
))
}
case peerdbenv.BinaryFormatHex:
projection.WriteString(fmt.Sprintf("hex(base64Decode(JSONExtractString(_peerdb_data, '%s'))) AS `%s`,",
colName, dstColName))
if enablePrimaryUpdate {
projectionUpdate.WriteString(fmt.Sprintf(
"hex(base64Decode(JSONExtractString(_peerdb_match_data, '%s'))) AS `%s`,",
colName, dstColName,
))
}
}
}

// proceed with default logic if logic above didn't add any sql
if projection.Len() == projLen {
projection.WriteString(fmt.Sprintf("JSONExtract(_peerdb_data, '%s', '%s') AS `%s`,", colName, clickHouseType, dstColName))
if enablePrimaryUpdate {
projectionUpdate.WriteString(fmt.Sprintf(
"JSONExtract(_peerdb_match_data, '%s', '%s') AS `%s`,",
colName, clickHouseType, dstColName,
))
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/utils/avro/avro_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (p *peerDBOCFWriter) writeRecordsToOCFWriter(ctx context.Context, env map[s
if err := ctx.Err(); err != nil {
return numRows.Load(), err
} else {
avroMap, err := avroConverter.Convert(qrecord)
avroMap, err := avroConverter.Convert(ctx, env, qrecord)
if err != nil {
logger.Error("Failed to convert QRecord to Avro compatible map", slog.Any("error", err))
return numRows.Load(), fmt.Errorf("failed to convert QRecord to Avro compatible map: %w", err)
Expand Down
61 changes: 60 additions & 1 deletion flow/e2e/clickhouse/peer_flow_ch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,6 @@ func (s ClickHouseSuite) testNumericFF(ffValue bool) {

_, err = s.Conn().Exec(context.Background(), fmt.Sprintf("INSERT INTO %s(c) VALUES($1)", srcFullName), nines)
require.NoError(s.t, err)

e2e.EnvWaitForCount(env, s, "waiting for CDC count", dstTableName, "id,c", 2)

rows, err := s.GetRows(dstTableName, "c")
Expand Down Expand Up @@ -666,6 +665,66 @@ func (s ClickHouseSuite) Test_Unbounded_Numeric_Without_FF() {
s.testNumericFF(false)
}

// binaryFormatTestcase is the bytea payload inserted by testBinaryFormat.
// It consists of the bytes 0x00 0x01, the ASCII characters "0123", and the
// bytes 0x7f 0xff, so it contains both printable and non-printable bytes.
const binaryFormatTestcase = "\x00\x010123\x7f\xff"

// testBinaryFormat exercises the PEERDB_CLICKHOUSE_BINARY_FORMAT setting.
//
// It creates a source table with a bytea column, inserts binaryFormatTestcase
// once before the mirror is started (with DoInitialSnapshot enabled) and once
// more after the first row has arrived at the destination, then asserts that
// both destination rows hold exactly `expected` in the `val` column.
//
// format is passed verbatim as the value of PEERDB_CLICKHOUSE_BINARY_FORMAT in
// the flow's Env and is also used as a suffix for the table and flow job names.
func (s ClickHouseSuite) testBinaryFormat(format string, expected string) {
	ctx := context.Background()
	dstTableName := "binary_format_" + format
	srcFullName := s.attachSchemaSuffix(dstTableName)

	createSQL := fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s(
id INT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
val bytea
);
`, srcFullName)
	_, createErr := s.Conn().Exec(ctx, createSQL)
	require.NoError(s.t, createErr)

	// insertPayload writes one more copy of the test payload to the source table.
	insertSQL := fmt.Sprintf("INSERT INTO %s(val) VALUES($1)", srcFullName)
	insertPayload := func() {
		_, insertErr := s.Conn().Exec(ctx, insertSQL, []byte(binaryFormatTestcase))
		require.NoError(s.t, insertErr)
	}

	// First row exists before the flow is launched.
	insertPayload()

	gen := e2e.FlowConnectionGenerationConfig{
		FlowJobName:      s.attachSuffix("ch_binary_format_" + format),
		TableNameMapping: map[string]string{srcFullName: dstTableName},
		Destination:      s.Peer().Name,
	}
	flowConnConfig := gen.GenerateFlowConnectionConfigs(s.t)
	flowConnConfig.DoInitialSnapshot = true
	flowConnConfig.Env = map[string]string{"PEERDB_CLICKHOUSE_BINARY_FORMAT": format}

	tc := e2e.NewTemporalClient(s.t)
	env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil)
	e2e.SetupCDCFlowStatusQuery(s.t, env, flowConnConfig)

	e2e.EnvWaitForCount(env, s, "waiting for CDC count", dstTableName, "id,val", 1)

	// Second row is written while the flow is already running.
	insertPayload()
	e2e.EnvWaitForCount(env, s, "waiting for CDC count", dstTableName, "id,val", 2)

	rows, err := s.GetRows(dstTableName, "val")
	require.NoError(s.t, err)
	require.Len(s.t, rows.Records, 2, "expected 2 rows")
	for i := range rows.Records {
		record := rows.Records[i]
		require.Len(s.t, record, 1, "expected 1 column")
		require.Equal(s.t, expected, record[0].Value())
	}

	env.Cancel()
	e2e.RequireEnvCanceled(s.t, env)
}

// Test_Binary_Format_Raw runs testBinaryFormat with format "raw" and expects
// the destination value to equal the inserted payload unchanged.
func (s ClickHouseSuite) Test_Binary_Format_Raw() {
s.testBinaryFormat("raw", binaryFormatTestcase)
}

// Test_Binary_Format_Hex runs testBinaryFormat with format "hex" and expects
// the upper-case hexadecimal encoding of binaryFormatTestcase.
func (s ClickHouseSuite) Test_Binary_Format_Hex() {
s.testBinaryFormat("hex", "0001303132337FFF")
}

// Test_Binary_Format_Base64 runs testBinaryFormat with format "base64" and
// expects the padded standard base64 encoding of binaryFormatTestcase.
func (s ClickHouseSuite) Test_Binary_Format_Base64() {
s.testBinaryFormat("base64", "AAEwMTIzf/8=")
}

func (s ClickHouseSuite) Test_Types_CH() {
srcTableName := "test_types"
srcFullName := s.attachSchemaSuffix("test_types")
Expand Down
9 changes: 3 additions & 6 deletions flow/model/conversion_avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,12 @@ func NewQRecordAvroConverter(
}, nil
}

func (qac *QRecordAvroConverter) Convert(qrecord []qvalue.QValue) (map[string]any, error) {
func (qac *QRecordAvroConverter) Convert(ctx context.Context, env map[string]string, qrecord []qvalue.QValue) (map[string]any, error) {
m := make(map[string]any, len(qrecord))
for idx, val := range qrecord {
avroVal, err := qvalue.QValueToAvro(
val,
&qac.Schema.Fields[idx],
qac.TargetDWH,
qac.logger,
qac.UnboundedNumericAsString,
ctx, env, val,
&qac.Schema.Fields[idx], qac.TargetDWH, qac.logger, qac.UnboundedNumericAsString,
)
if err != nil {
return nil, fmt.Errorf("failed to convert QValue to Avro-compatible value: %w", err)
Expand Down
29 changes: 24 additions & 5 deletions flow/model/qvalue/avro_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package qvalue
import (
"context"
"encoding/base64"
"encoding/hex"
"errors"
"fmt"
"log/slog"
"strings"
"time"

"github.com/google/uuid"
Expand Down Expand Up @@ -115,7 +117,11 @@ func GetAvroSchemaFromQValueKind(
case QValueKindBoolean:
return "boolean", nil
case QValueKindBytes:
if targetDWH == protos.DBType_CLICKHOUSE {
format, err := peerdbenv.PeerDBBinaryFormat(ctx, env)
if err != nil {
return nil, err
}
if targetDWH == protos.DBType_CLICKHOUSE && format != peerdbenv.BinaryFormatRaw {
return "string", nil
}
return "bytes", nil
Expand Down Expand Up @@ -245,6 +251,7 @@ type QValueAvroConverter struct {
}

func QValueToAvro(
ctx context.Context, env map[string]string,
value QValue, field *QField, targetDWH protos.DBType, logger log.Logger,
unboundedNumericAsString bool,
) (any, error) {
Expand Down Expand Up @@ -377,7 +384,11 @@ func QValueToAvro(
case QValueNumeric:
return c.processNumeric(v.Val), nil
case QValueBytes:
return c.processBytes(v.Val), nil
format, err := peerdbenv.PeerDBBinaryFormat(ctx, env)
if err != nil {
return nil, err
}
return c.processBytes(v.Val, format), nil
case QValueJSON:
return c.processJSON(v.Val), nil
case QValueHStore:
Expand Down Expand Up @@ -509,9 +520,17 @@ func (c *QValueAvroConverter) processNumeric(num decimal.Decimal) any {
return rat
}

func (c *QValueAvroConverter) processBytes(byteData []byte) interface{} {
if c.TargetDWH == protos.DBType_CLICKHOUSE {
encoded := base64.StdEncoding.EncodeToString(byteData)
func (c *QValueAvroConverter) processBytes(byteData []byte, format peerdbenv.BinaryFormat) interface{} {
if c.TargetDWH == protos.DBType_CLICKHOUSE && format != peerdbenv.BinaryFormatRaw {
var encoded string
switch format {
case peerdbenv.BinaryFormatBase64:
encoded = base64.StdEncoding.EncodeToString(byteData)
case peerdbenv.BinaryFormatHex:
encoded = strings.ToUpper(hex.EncodeToString(byteData))
default:
panic(fmt.Sprintf("unhandled binary format: %d", format))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a panic can cause issues with maintenance mode (relies on flow-worker being active to pause mirrors)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A panic can never happen unless a logic error is introduced in the code, and bad code can always panic anyway.

}
if c.Nullable {
return goavro.Union("string", encoded)
}
Expand Down
6 changes: 2 additions & 4 deletions flow/model/record_items.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,8 @@ func (r RecordItems) toMap(opts ToJSONOptions) (map[string]interface{}, error) {
if len(v.Val) > 15*1024*1024 {
jsonStruct[col] = "{}"
} else if _, ok := opts.UnnestColumns[col]; ok {
var unnestStruct map[string]interface{}
err := json.Unmarshal([]byte(v.Val), &unnestStruct)
if err != nil {
var unnestStruct map[string]any
if err := json.Unmarshal([]byte(v.Val), &unnestStruct); err != nil {
return nil, err
}

Expand Down Expand Up @@ -185,7 +184,6 @@ func (r RecordItems) toMap(opts ToJSONOptions) (map[string]interface{}, error) {
}
}
jsonStruct[col] = nullableFloatArr

default:
jsonStruct[col] = v.Value()
}
Expand Down
53 changes: 51 additions & 2 deletions flow/peerdbenv/dynamicconf.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"log/slog"
"os"
"strconv"
"strings"
"time"

"github.com/aws/smithy-go/ptr"
Expand Down Expand Up @@ -107,6 +108,14 @@ var DynamicSettings = [...]*protos.DynamicSetting{
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_NEW_MIRROR,
TargetForSetting: protos.DynconfTarget_ALL,
},
{
Name: "PEERDB_CLICKHOUSE_BINARY_FORMAT",
Description: "Binary field encoding on clickhouse destination; either raw, hex, or base64",
DefaultValue: "base64",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

change to raw as a TODO

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By the time we change the default to raw, we can probably remove this option.

ValueType: protos.DynconfValueType_STRING,
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_AFTER_RESUME,
TargetForSetting: protos.DynconfTarget_CLICKHOUSE,
},
{
Name: "PEERDB_SNOWFLAKE_MERGE_PARALLELISM",
Description: "Parallel MERGE statements to run for CDC mirrors with Snowflake targets. -1 for no limit",
Expand Down Expand Up @@ -238,6 +247,15 @@ var DynamicIndex = func() map[string]int {
return defaults
}()

// BinaryFormat identifies how binary column values are encoded, as selected
// by the PEERDB_CLICKHOUSE_BINARY_FORMAT setting.
type BinaryFormat int

// The constants are declared with an explicit BinaryFormat type so that they
// keep that type when inferred (`f := BinaryFormatRaw`) or stored in an
// interface, instead of silently degrading to a plain int.
const (
	// BinaryFormatInvalid is the zero value; it is returned alongside an error.
	BinaryFormatInvalid BinaryFormat = iota
	// BinaryFormatRaw corresponds to the setting value "raw".
	BinaryFormatRaw
	// BinaryFormatBase64 corresponds to the setting value "base64".
	BinaryFormatBase64
	// BinaryFormatHex corresponds to the setting value "hex".
	BinaryFormatHex
)

func dynLookup(ctx context.Context, env map[string]string, key string) (string, error) {
if val, ok := env[key]; ok {
return val, nil
Expand All @@ -249,6 +267,11 @@ func dynLookup(ctx context.Context, env map[string]string, key string) (string,
return "", fmt.Errorf("failed to get catalog connection pool: %w", err)
}

var setting *protos.DynamicSetting
if idx, ok := DynamicIndex[key]; ok {
setting = DynamicSettings[idx]
}

var value pgtype.Text
query := "SELECT config_value FROM dynamic_settings WHERE config_name=$1"
if err := conn.QueryRow(ctx, query, key).Scan(&value); err != nil && err != pgx.ErrNoRows {
Expand All @@ -257,12 +280,21 @@ func dynLookup(ctx context.Context, env map[string]string, key string) (string,
}
if !value.Valid {
if val, ok := os.LookupEnv(key); ok {
if env != nil && setting != nil && setting.ApplyMode != protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE {
env[key] = val
}
return val, nil
}
if idx, ok := DynamicIndex[key]; ok {
return DynamicSettings[idx].DefaultValue, nil
if setting != nil {
if env != nil && setting.ApplyMode != protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE {
env[key] = setting.DefaultValue
}
return setting.DefaultValue, nil
}
}
if env != nil && setting != nil && setting.ApplyMode != protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE {
env[key] = value.String
}
return value.String, nil
}

Expand Down Expand Up @@ -398,6 +430,23 @@ func PeerDBNullable(ctx context.Context, env map[string]string) (bool, error) {
return dynamicConfBool(ctx, env, "PEERDB_NULLABLE")
}

func PeerDBBinaryFormat(ctx context.Context, env map[string]string) (BinaryFormat, error) {
format, err := dynLookup(ctx, env, "PEERDB_CLICKHOUSE_BINARY_FORMAT")
if err != nil {
return 0, err
}
switch strings.ToLower(strings.TrimSpace(format)) {
case "raw":
serprex marked this conversation as resolved.
Show resolved Hide resolved
return BinaryFormatRaw, nil
case "hex":
return BinaryFormatHex, nil
case "base64":
return BinaryFormatBase64, nil
default:
return 0, fmt.Errorf("unknown binary format %s", format)
}
}

func PeerDBEnableClickHousePrimaryUpdate(ctx context.Context, env map[string]string) (bool, error) {
return dynamicConfBool(ctx, env, "PEERDB_CLICKHOUSE_ENABLE_PRIMARY_UPDATE")
}
Expand Down
Loading