Skip to content

Commit

Permalink
PEERDB_BINARY_FORMAT
Browse files Browse the repository at this point in the history
In #2181 we consolidated on transmitting bytea columns as base64 strings

ClickHouse strings can hold arbitrary binary data; they need not be valid UTF-8

This adds PEERDB_BINARY_FORMAT with 3 formats: raw, hex, & base64
Default to base64 to avoid breaking existing setups

Also add caching logic for non-immediate dynamic settings. This helps here, where we would otherwise hit the catalog for every bytea field processed
  • Loading branch information
serprex committed Jan 3, 2025
1 parent ff1f11d commit 9737f76
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 34 deletions.
56 changes: 40 additions & 16 deletions flow/connectors/clickhouse/normalize.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,38 +366,62 @@ func (c *ClickHouseConnector) NormalizeRecords(
case "Date32", "Nullable(Date32)":
projection.WriteString(fmt.Sprintf(
"toDate32(parseDateTime64BestEffortOrNull(JSONExtractString(_peerdb_data, '%s'),6)) AS `%s`,",
colName,
dstColName,
colName, dstColName,
))
if enablePrimaryUpdate {
projectionUpdate.WriteString(fmt.Sprintf(
"toDate32(parseDateTime64BestEffortOrNull(JSONExtractString(_peerdb_match_data, '%s'),6)) AS `%s`,",
colName,
dstColName,
colName, dstColName,
))
}
case "DateTime64(6)", "Nullable(DateTime64(6))":
projection.WriteString(fmt.Sprintf(
"parseDateTime64BestEffortOrNull(JSONExtractString(_peerdb_data, '%s'),6) AS `%s`,",
colName,
dstColName,
colName, dstColName,
))
if enablePrimaryUpdate {
projectionUpdate.WriteString(fmt.Sprintf(
"parseDateTime64BestEffortOrNull(JSONExtractString(_peerdb_match_data, '%s'),6) AS `%s`,",
colName,
dstColName,
colName, dstColName,
))
}
default:
projection.WriteString(fmt.Sprintf("JSONExtract(_peerdb_data, '%s', '%s') AS `%s`,", colName, clickHouseType, dstColName))
if enablePrimaryUpdate {
projectionUpdate.WriteString(fmt.Sprintf(
"JSONExtract(_peerdb_match_data, '%s', '%s') AS `%s`,",
colName,
clickHouseType,
dstColName,
))
projLen := projection.Len()
if colType == qvalue.QValueKindBytes {
format, err := peerdbenv.PeerDBBinaryFormat(ctx, req.Env)
if err != nil {
return model.NormalizeResponse{}, err
}
switch format {
case peerdbenv.BinaryFormatRaw:
projection.WriteString(fmt.Sprintf("base64Decode(JSONExtractString(_peerdb_data, '%s')) AS `%s`,", colName, dstColName))
if enablePrimaryUpdate {
projectionUpdate.WriteString(fmt.Sprintf(
"base64Decode(JSONExtractString(_peerdb_match_data, '%s')) AS `%s`,",
colName, dstColName,
))
}
case peerdbenv.BinaryFormatHex:
projection.WriteString(fmt.Sprintf("hex(base64Decode(JSONExtractString(_peerdb_data, '%s'))) AS `%s`,",
colName, dstColName))
if enablePrimaryUpdate {
projectionUpdate.WriteString(fmt.Sprintf(
"hex(base64Decode(JSONExtractString(_peerdb_match_data, '%s'))) AS `%s`,",
colName, dstColName,
))
}
}
}

// proceed with default logic if logic above didn't add any sql
if projection.Len() == projLen {
projection.WriteString(fmt.Sprintf("JSONExtract(_peerdb_data, '%s', '%s') AS `%s`,", colName, clickHouseType, dstColName))
if enablePrimaryUpdate {
projectionUpdate.WriteString(fmt.Sprintf(
"JSONExtract(_peerdb_match_data, '%s', '%s') AS `%s`,",
colName, clickHouseType, dstColName,
))
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/utils/avro/avro_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func (p *peerDBOCFWriter) writeRecordsToOCFWriter(ctx context.Context, env map[s
if err := ctx.Err(); err != nil {
return numRows.Load(), err
} else {
avroMap, err := avroConverter.Convert(qrecord)
avroMap, err := avroConverter.Convert(ctx, env, qrecord)
if err != nil {
logger.Error("Failed to convert QRecord to Avro compatible map", slog.Any("error", err))
return numRows.Load(), fmt.Errorf("failed to convert QRecord to Avro compatible map: %w", err)
Expand Down
9 changes: 3 additions & 6 deletions flow/model/conversion_avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,12 @@ func NewQRecordAvroConverter(
}, nil
}

func (qac *QRecordAvroConverter) Convert(qrecord []qvalue.QValue) (map[string]any, error) {
func (qac *QRecordAvroConverter) Convert(ctx context.Context, env map[string]string, qrecord []qvalue.QValue) (map[string]any, error) {
m := make(map[string]any, len(qrecord))
for idx, val := range qrecord {
avroVal, err := qvalue.QValueToAvro(
val,
&qac.Schema.Fields[idx],
qac.TargetDWH,
qac.logger,
qac.UnboundedNumericAsString,
ctx, env, val,
&qac.Schema.Fields[idx], qac.TargetDWH, qac.logger, qac.UnboundedNumericAsString,
)
if err != nil {
return nil, fmt.Errorf("failed to convert QValue to Avro-compatible value: %w", err)
Expand Down
28 changes: 23 additions & 5 deletions flow/model/qvalue/avro_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package qvalue
import (
"context"
"encoding/base64"
"encoding/hex"
"errors"
"fmt"
"log/slog"
Expand Down Expand Up @@ -115,7 +116,11 @@ func GetAvroSchemaFromQValueKind(
case QValueKindBoolean:
return "boolean", nil
case QValueKindBytes:
if targetDWH == protos.DBType_CLICKHOUSE {
format, err := peerdbenv.PeerDBBinaryFormat(ctx, env)
if err != nil {
return nil, err
}
if targetDWH == protos.DBType_CLICKHOUSE && format != peerdbenv.BinaryFormatRaw {
return "string", nil
}
return "bytes", nil
Expand Down Expand Up @@ -245,6 +250,7 @@ type QValueAvroConverter struct {
}

func QValueToAvro(
ctx context.Context, env map[string]string,
value QValue, field *QField, targetDWH protos.DBType, logger log.Logger,
unboundedNumericAsString bool,
) (any, error) {
Expand Down Expand Up @@ -377,7 +383,11 @@ func QValueToAvro(
case QValueNumeric:
return c.processNumeric(v.Val), nil
case QValueBytes:
return c.processBytes(v.Val), nil
format, err := peerdbenv.PeerDBBinaryFormat(ctx, env)
if err != nil {
return nil, err
}
return c.processBytes(v.Val, format), nil
case QValueJSON:
return c.processJSON(v.Val), nil
case QValueHStore:
Expand Down Expand Up @@ -509,9 +519,17 @@ func (c *QValueAvroConverter) processNumeric(num decimal.Decimal) any {
return rat
}

func (c *QValueAvroConverter) processBytes(byteData []byte) interface{} {
if c.TargetDWH == protos.DBType_CLICKHOUSE {
encoded := base64.StdEncoding.EncodeToString(byteData)
func (c *QValueAvroConverter) processBytes(byteData []byte, format peerdbenv.BinaryFormat) interface{} {
if c.TargetDWH == protos.DBType_CLICKHOUSE && format != peerdbenv.BinaryFormatRaw {
var encoded string
switch format {
case peerdbenv.BinaryFormatBase64:
encoded = base64.StdEncoding.EncodeToString(byteData)
case peerdbenv.BinaryFormatHex:
encoded = hex.EncodeToString(byteData)
default:
panic(fmt.Sprintf("unhandled binary format: %d", format))
}
if c.Nullable {
return goavro.Union("string", encoded)
}
Expand Down
6 changes: 2 additions & 4 deletions flow/model/record_items.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,8 @@ func (r RecordItems) toMap(opts ToJSONOptions) (map[string]interface{}, error) {
if len(v.Val) > 15*1024*1024 {
jsonStruct[col] = "{}"
} else if _, ok := opts.UnnestColumns[col]; ok {
var unnestStruct map[string]interface{}
err := json.Unmarshal([]byte(v.Val), &unnestStruct)
if err != nil {
var unnestStruct map[string]any
if err := json.Unmarshal([]byte(v.Val), &unnestStruct); err != nil {
return nil, err
}

Expand Down Expand Up @@ -185,7 +184,6 @@ func (r RecordItems) toMap(opts ToJSONOptions) (map[string]interface{}, error) {
}
}
jsonStruct[col] = nullableFloatArr

default:
jsonStruct[col] = v.Value()
}
Expand Down
51 changes: 49 additions & 2 deletions flow/peerdbenv/dynamicconf.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,14 @@ var DynamicSettings = [...]*protos.DynamicSetting{
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_NEW_MIRROR,
TargetForSetting: protos.DynconfTarget_ALL,
},
{
Name: "PEERDB_BINARY_FORMAT",
Description: "Binary field encoding; either raw, hex, or base64",
DefaultValue: "base64",
ValueType: protos.DynconfValueType_STRING,
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_AFTER_RESUME,
TargetForSetting: protos.DynconfTarget_CLICKHOUSE,
},
{
Name: "PEERDB_SNOWFLAKE_MERGE_PARALLELISM",
Description: "Parallel MERGE statements to run for CDC mirrors with Snowflake targets. -1 for no limit",
Expand Down Expand Up @@ -238,6 +246,14 @@ var DynamicIndex = func() map[string]int {
return defaults
}()

// BinaryFormat identifies the encoding selected by the PEERDB_BINARY_FORMAT
// dynamic setting for binary field values.
type BinaryFormat int

// The constants are explicitly typed as BinaryFormat (rather than left as
// untyped integers) so they cannot be silently mixed with unrelated ints and
// so they keep their type when stored in an interface value. Their numeric
// values are unchanged.
const (
	// BinaryFormatRaw corresponds to the "raw" setting value. It is also the
	// zero value of BinaryFormat.
	BinaryFormatRaw BinaryFormat = iota
	// BinaryFormatBase64 corresponds to the "base64" setting value.
	BinaryFormatBase64
	// BinaryFormatHex corresponds to the "hex" setting value.
	BinaryFormatHex
)

func dynLookup(ctx context.Context, env map[string]string, key string) (string, error) {
if val, ok := env[key]; ok {
return val, nil
Expand All @@ -249,6 +265,11 @@ func dynLookup(ctx context.Context, env map[string]string, key string) (string,
return "", fmt.Errorf("failed to get catalog connection pool: %w", err)
}

var setting *protos.DynamicSetting
if idx, ok := DynamicIndex[key]; ok {
setting = DynamicSettings[idx]
}

var value pgtype.Text
query := "SELECT config_value FROM dynamic_settings WHERE config_name=$1"
if err := conn.QueryRow(ctx, query, key).Scan(&value); err != nil && err != pgx.ErrNoRows {
Expand All @@ -257,12 +278,21 @@ func dynLookup(ctx context.Context, env map[string]string, key string) (string,
}
if !value.Valid {
if val, ok := os.LookupEnv(key); ok {
if env != nil && setting != nil && setting.ApplyMode != protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE {
env[key] = val
}
return val, nil
}
if idx, ok := DynamicIndex[key]; ok {
return DynamicSettings[idx].DefaultValue, nil
if setting != nil {
if env != nil && setting.ApplyMode != protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE {
env[key] = setting.DefaultValue
}
return setting.DefaultValue, nil
}
}
if env != nil && setting != nil && setting.ApplyMode != protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE {
env[key] = value.String
}
return value.String, nil
}

Expand Down Expand Up @@ -398,6 +428,23 @@ func PeerDBNullable(ctx context.Context, env map[string]string) (bool, error) {
return dynamicConfBool(ctx, env, "PEERDB_NULLABLE")
}

// PeerDBBinaryFormat resolves the PEERDB_BINARY_FORMAT dynamic setting into a
// BinaryFormat.
//
// The setting is read through dynLookup and must be exactly "raw", "hex", or
// "base64". A lookup failure is returned unchanged. Any other value yields an
// error. The returned BinaryFormat is meaningful only when the error is nil.
func PeerDBBinaryFormat(ctx context.Context, env map[string]string) (BinaryFormat, error) {
	format, err := dynLookup(ctx, env, "PEERDB_BINARY_FORMAT")
	if err != nil {
		return 0, err
	}
	switch format {
	case "raw":
		return BinaryFormatRaw, nil
	case "hex":
		return BinaryFormatHex, nil
	case "base64":
		return BinaryFormatBase64, nil
	default:
		// %q instead of %s: an empty or whitespace-padded value would
		// otherwise be invisible in the error message.
		return 0, fmt.Errorf("unknown binary format %q", format)
	}
}

// PeerDBEnableClickHousePrimaryUpdate returns the value of the
// PEERDB_CLICKHOUSE_ENABLE_PRIMARY_UPDATE dynamic setting as resolved by
// dynamicConfBool for the given context and env overrides.
func PeerDBEnableClickHousePrimaryUpdate(ctx context.Context, env map[string]string) (bool, error) {
	const settingName = "PEERDB_CLICKHOUSE_ENABLE_PRIMARY_UPDATE"
	return dynamicConfBool(ctx, env, settingName)
}
Expand Down

0 comments on commit 9737f76

Please sign in to comment.