diff --git a/flow/e2e/clickhouse/peer_flow_ch_test.go b/flow/e2e/clickhouse/peer_flow_ch_test.go index 813d3d35c5..7b0c06b8aa 100644 --- a/flow/e2e/clickhouse/peer_flow_ch_test.go +++ b/flow/e2e/clickhouse/peer_flow_ch_test.go @@ -635,7 +635,6 @@ func (s ClickHouseSuite) testNumericFF(ffValue bool) { _, err = s.Conn().Exec(context.Background(), fmt.Sprintf("INSERT INTO %s(c) VALUES($1)", srcFullName), nines) require.NoError(s.t, err) - e2e.EnvWaitForCount(env, s, "waiting for CDC count", dstTableName, "id,c", 2) rows, err := s.GetRows(dstTableName, "c") @@ -666,6 +665,63 @@ func (s ClickHouseSuite) Test_Unbounded_Numeric_Without_FF() { s.testNumericFF(false) } +const rawBinaryFormatTestcase = "\x00\x010123\x7f\xff" + +// PEERDB_BINARY_FORMAT +func (s ClickHouseSuite) testBinaryFormat(format string, expected string) { + dstTableName := "binary_format_" + format + srcFullName := s.attachSchemaSuffix(dstTableName) + + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s( + id INT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY, + val bytea + ); + `, srcFullName)) + require.NoError(s.t, err) + + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf("INSERT INTO %s(val) VALUES($1)", srcFullName), []byte(rawBinaryFormatTestcase)) + require.NoError(s.t, err) + + connectionGen := e2e.FlowConnectionGenerationConfig{ + FlowJobName: s.attachSuffix("ch_binary_format_" + format), + TableNameMapping: map[string]string{srcFullName: dstTableName}, + Destination: s.Peer().Name, + } + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s.t) + flowConnConfig.DoInitialSnapshot = true + flowConnConfig.Env = map[string]string{"PEERDB_BINARY_FORMAT": format} + tc := e2e.NewTemporalClient(s.t) + env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil) + e2e.SetupCDCFlowStatusQuery(s.t, env, flowConnConfig) + + e2e.EnvWaitForCount(env, s, "waiting for CDC count", dstTableName, "id,val", 1) + + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf("INSERT INTO %s(val) VALUES($1)", srcFullName), []byte(rawBinaryFormatTestcase)) + require.NoError(s.t, err) + e2e.EnvWaitForCount(env, s, "waiting for CDC count", dstTableName, "id,val", 2) + + rows, err := s.GetRows(dstTableName, "val") + require.NoError(s.t, err) + require.Len(s.t, rows.Records, 2, "expected 2 rows") + for _, row := range rows.Records { + require.Len(s.t, row, 1, "expected 1 column") + require.Equal(s.t, expected, row[0].Value()) + } +} + +func (s ClickHouseSuite) Test_Binary_Format_Raw() { + s.testBinaryFormat("raw", rawBinaryFormatTestcase) +} + +func (s ClickHouseSuite) Test_Binary_Format_Hex() { + s.testBinaryFormat("hex", "0001303132337FFF") +} + +func (s ClickHouseSuite) Test_Binary_Format_Base64() { + s.testBinaryFormat("base64", "AAEwMTIzf/8=") +} + func (s ClickHouseSuite) Test_Types_CH() { srcTableName := "test_types" srcFullName := s.attachSchemaSuffix("test_types")