Skip to content

Commit

Permalink
e2e
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Jan 3, 2025
1 parent 9737f76 commit d175ab7
Showing 1 changed file with 60 additions and 1 deletion.
61 changes: 60 additions & 1 deletion flow/e2e/clickhouse/peer_flow_ch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,6 @@ func (s ClickHouseSuite) testNumericFF(ffValue bool) {

_, err = s.Conn().Exec(context.Background(), fmt.Sprintf("INSERT INTO %s(c) VALUES($1)", srcFullName), nines)
require.NoError(s.t, err)

e2e.EnvWaitForCount(env, s, "waiting for CDC count", dstTableName, "id,c", 2)

rows, err := s.GetRows(dstTableName, "c")
Expand Down Expand Up @@ -666,6 +665,66 @@ func (s ClickHouseSuite) Test_Unbounded_Numeric_Without_FF() {
s.testNumericFF(false)
}

const rawBinaryFormatTestcase = "\x00\x010123\x7f\xff"

// PEERDB_BINARY_FORMAT
func (s ClickHouseSuite) testBinaryFormat(format string, expected string) {
dstTableName := "binary_format_" + format
srcFullName := s.attachSchemaSuffix(dstTableName)

_, err := s.Conn().Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s(
id INT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
val bytea
);
`, srcFullName))
require.NoError(s.t, err)

_, err = s.Conn().Exec(context.Background(), fmt.Sprintf("INSERT INTO %s(val) VALUES($1)", srcFullName), []byte(rawBinaryFormatTestcase))
require.NoError(s.t, err)

connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix("ch_binary_format_" + format),
TableNameMapping: map[string]string{srcFullName: dstTableName},
Destination: s.Peer().Name,
}
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s.t)
flowConnConfig.DoInitialSnapshot = true
flowConnConfig.Env = map[string]string{"PEERDB_BINARY_FORMAT": format}
tc := e2e.NewTemporalClient(s.t)
env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil)
e2e.SetupCDCFlowStatusQuery(s.t, env, flowConnConfig)

e2e.EnvWaitForCount(env, s, "waiting for CDC count", dstTableName, "id,val", 1)

_, err = s.Conn().Exec(context.Background(), fmt.Sprintf("INSERT INTO %s(val) VALUES($1)", srcFullName), []byte(rawBinaryFormatTestcase))
require.NoError(s.t, err)
e2e.EnvWaitForCount(env, s, "waiting for CDC count", dstTableName, "id,val", 2)

rows, err := s.GetRows(dstTableName, "val")
require.NoError(s.t, err)
require.Len(s.t, rows.Records, 2, "expected 2 rows")
for _, row := range rows.Records {
require.Len(s.t, row, 1, "expected 1 column")
require.Equal(s.t, expected, row[0].Value())
}

env.Cancel()
e2e.RequireEnvCanceled(s.t, env)
}

func (s ClickHouseSuite) Test_Binary_Format_Raw() {
s.testBinaryFormat("raw", rawBinaryFormatTestcase)
}

func (s ClickHouseSuite) Test_Binary_Format_Hex() {
s.testBinaryFormat("hex", "0001303132337FFF")
}

func (s ClickHouseSuite) Test_Binary_Format_Base64() {
s.testBinaryFormat("base64", "AAEwMTIzf/8=")
}

func (s ClickHouseSuite) Test_Types_CH() {
srcTableName := "test_types"
srcFullName := s.attachSchemaSuffix("test_types")
Expand Down

0 comments on commit d175ab7

Please sign in to comment.