diff --git a/flow/e2e/clickhouse/peer_flow_ch_test.go b/flow/e2e/clickhouse/peer_flow_ch_test.go index 50cfbc426..beaae5cea 100644 --- a/flow/e2e/clickhouse/peer_flow_ch_test.go +++ b/flow/e2e/clickhouse/peer_flow_ch_test.go @@ -665,9 +665,9 @@ func (s ClickHouseSuite) Test_Unbounded_Numeric_Without_FF() { s.testNumericFF(false) } -const rawBinaryFormatTestcase = "\x00\x010123\x7f\xff" +const binaryFormatTestcase = "\x00\x010123\x7f\xff" -// PEERDB_BINARY_FORMAT +// PEERDB_CLICKHOUSE_BINARY_FORMAT func (s ClickHouseSuite) testBinaryFormat(format string, expected string) { dstTableName := "binary_format_" + format srcFullName := s.attachSchemaSuffix(dstTableName) @@ -680,7 +680,7 @@ func (s ClickHouseSuite) testBinaryFormat(format string, expected string) { `, srcFullName)) require.NoError(s.t, err) - _, err = s.Conn().Exec(context.Background(), fmt.Sprintf("INSERT INTO %s(val) VALUES($1)", srcFullName), []byte(rawBinaryFormatTestcase)) + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf("INSERT INTO %s(val) VALUES($1)", srcFullName), []byte(binaryFormatTestcase)) require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ @@ -690,14 +690,14 @@ func (s ClickHouseSuite) testBinaryFormat(format string, expected string) { } flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s.t) flowConnConfig.DoInitialSnapshot = true - flowConnConfig.Env = map[string]string{"PEERDB_BINARY_FORMAT": format} + flowConnConfig.Env = map[string]string{"PEERDB_CLICKHOUSE_BINARY_FORMAT": format} tc := e2e.NewTemporalClient(s.t) env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil) e2e.SetupCDCFlowStatusQuery(s.t, env, flowConnConfig) e2e.EnvWaitForCount(env, s, "waiting for CDC count", dstTableName, "id,val", 1) - _, err = s.Conn().Exec(context.Background(), fmt.Sprintf("INSERT INTO %s(val) VALUES($1)", srcFullName), []byte(rawBinaryFormatTestcase)) + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf("INSERT INTO %s(val) VALUES($1)", srcFullName), []byte(binaryFormatTestcase)) require.NoError(s.t, err) e2e.EnvWaitForCount(env, s, "waiting for CDC count", dstTableName, "id,val", 2) @@ -714,7 +714,7 @@ func (s ClickHouseSuite) testBinaryFormat(format string, expected string) { } func (s ClickHouseSuite) Test_Binary_Format_Raw() { - s.testBinaryFormat("raw", rawBinaryFormatTestcase) + s.testBinaryFormat("raw", binaryFormatTestcase) } func (s ClickHouseSuite) Test_Binary_Format_Hex() { diff --git a/flow/peerdbenv/dynamicconf.go b/flow/peerdbenv/dynamicconf.go index 9d2036d2a..147afd117 100644 --- a/flow/peerdbenv/dynamicconf.go +++ b/flow/peerdbenv/dynamicconf.go @@ -6,6 +6,7 @@ import ( "log/slog" "os" "strconv" + "strings" "time" "github.com/aws/smithy-go/ptr" @@ -108,8 +109,8 @@ var DynamicSettings = [...]*protos.DynamicSetting{ TargetForSetting: protos.DynconfTarget_ALL, }, { - Name: "PEERDB_BINARY_FORMAT", - Description: "Binary field encoding; either raw, hex, or base64", + Name: "PEERDB_CLICKHOUSE_BINARY_FORMAT", + Description: "Binary field encoding on clickhouse destination; either raw, hex, or base64", DefaultValue: "base64", ValueType: protos.DynconfValueType_STRING, ApplyMode: protos.DynconfApplyMode_APPLY_MODE_AFTER_RESUME, @@ -249,7 +250,8 @@ var DynamicIndex = func() map[string]int { type BinaryFormat int const ( - BinaryFormatRaw = iota + BinaryFormatInvalid = iota + BinaryFormatRaw BinaryFormatBase64 BinaryFormatHex ) @@ -429,11 +431,11 @@ func PeerDBNullable(ctx context.Context, env map[string]string) (bool, error) { } func PeerDBBinaryFormat(ctx context.Context, env map[string]string) (BinaryFormat, error) { - format, err := dynLookup(ctx, env, "PEERDB_BINARY_FORMAT") + format, err := dynLookup(ctx, env, "PEERDB_CLICKHOUSE_BINARY_FORMAT") if err != nil { return 0, err } - switch format { + switch strings.ToLower(strings.TrimSpace(format)) { case "raw": return BinaryFormatRaw, nil case "hex":