Skip to content

Commit

Permalink
feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Jan 4, 2025
1 parent 4a4d320 commit 3995080
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 11 deletions.
12 changes: 6 additions & 6 deletions flow/e2e/clickhouse/peer_flow_ch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -665,9 +665,9 @@ func (s ClickHouseSuite) Test_Unbounded_Numeric_Without_FF() {
s.testNumericFF(false)
}

const rawBinaryFormatTestcase = "\x00\x010123\x7f\xff"
const binaryFormatTestcase = "\x00\x010123\x7f\xff"

// PEERDB_BINARY_FORMAT
// PEERDB_CLICKHOUSE_BINARY_FORMAT
func (s ClickHouseSuite) testBinaryFormat(format string, expected string) {
dstTableName := "binary_format_" + format
srcFullName := s.attachSchemaSuffix(dstTableName)
Expand All @@ -680,7 +680,7 @@ func (s ClickHouseSuite) testBinaryFormat(format string, expected string) {
`, srcFullName))
require.NoError(s.t, err)

_, err = s.Conn().Exec(context.Background(), fmt.Sprintf("INSERT INTO %s(val) VALUES($1)", srcFullName), []byte(rawBinaryFormatTestcase))
_, err = s.Conn().Exec(context.Background(), fmt.Sprintf("INSERT INTO %s(val) VALUES($1)", srcFullName), []byte(binaryFormatTestcase))
require.NoError(s.t, err)

connectionGen := e2e.FlowConnectionGenerationConfig{
Expand All @@ -690,14 +690,14 @@ func (s ClickHouseSuite) testBinaryFormat(format string, expected string) {
}
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s.t)
flowConnConfig.DoInitialSnapshot = true
flowConnConfig.Env = map[string]string{"PEERDB_BINARY_FORMAT": format}
flowConnConfig.Env = map[string]string{"PEERDB_CLICKHOUSE_BINARY_FORMAT": format}
tc := e2e.NewTemporalClient(s.t)
env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil)
e2e.SetupCDCFlowStatusQuery(s.t, env, flowConnConfig)

e2e.EnvWaitForCount(env, s, "waiting for CDC count", dstTableName, "id,val", 1)

_, err = s.Conn().Exec(context.Background(), fmt.Sprintf("INSERT INTO %s(val) VALUES($1)", srcFullName), []byte(rawBinaryFormatTestcase))
_, err = s.Conn().Exec(context.Background(), fmt.Sprintf("INSERT INTO %s(val) VALUES($1)", srcFullName), []byte(binaryFormatTestcase))
require.NoError(s.t, err)
e2e.EnvWaitForCount(env, s, "waiting for CDC count", dstTableName, "id,val", 2)

Expand All @@ -714,7 +714,7 @@ func (s ClickHouseSuite) testBinaryFormat(format string, expected string) {
}

func (s ClickHouseSuite) Test_Binary_Format_Raw() {
s.testBinaryFormat("raw", rawBinaryFormatTestcase)
s.testBinaryFormat("raw", binaryFormatTestcase)
}

func (s ClickHouseSuite) Test_Binary_Format_Hex() {
Expand Down
12 changes: 7 additions & 5 deletions flow/peerdbenv/dynamicconf.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"log/slog"
"os"
"strconv"
"strings"
"time"

"github.com/aws/smithy-go/ptr"
Expand Down Expand Up @@ -108,8 +109,8 @@ var DynamicSettings = [...]*protos.DynamicSetting{
TargetForSetting: protos.DynconfTarget_ALL,
},
{
Name: "PEERDB_BINARY_FORMAT",
Description: "Binary field encoding; either raw, hex, or base64",
Name: "PEERDB_CLICKHOUSE_BINARY_FORMAT",
Description: "Binary field encoding on clickhouse destination; either raw, hex, or base64",
DefaultValue: "base64",
ValueType: protos.DynconfValueType_STRING,
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_AFTER_RESUME,
Expand Down Expand Up @@ -249,7 +250,8 @@ var DynamicIndex = func() map[string]int {
type BinaryFormat int

const (
BinaryFormatRaw = iota
BinaryFormatInvalid = iota
BinaryFormatRaw
BinaryFormatBase64
BinaryFormatHex
)
Expand Down Expand Up @@ -429,11 +431,11 @@ func PeerDBNullable(ctx context.Context, env map[string]string) (bool, error) {
}

func PeerDBBinaryFormat(ctx context.Context, env map[string]string) (BinaryFormat, error) {
format, err := dynLookup(ctx, env, "PEERDB_BINARY_FORMAT")
format, err := dynLookup(ctx, env, "PEERDB_CLICKHOUSE_BINARY_FORMAT")
if err != nil {
return 0, err
}
switch format {
switch strings.ToLower(format) {
case "raw":
return BinaryFormatRaw, nil
case "hex":
Expand Down

0 comments on commit 3995080

Please sign in to comment.