Skip to content

Commit

Permalink
bring back custom sync test
Browse files Browse the repository at this point in the history
removed in #2418
  • Loading branch information
serprex committed Jan 9, 2025
1 parent f806e04 commit 980eb42
Showing 1 changed file with 41 additions and 0 deletions.
41 changes: 41 additions & 0 deletions flow/e2e/postgres/peer_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -963,6 +963,47 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() {
e2e.RequireEnvCanceled(s.t, env)
}

func (s PeerFlowE2ETestSuitePG) Test_CustomSync() {
srcTableName := s.attachSchemaSuffix("test_customsync")
dstTableName := s.attachSchemaSuffix("test_customsync_dst")
connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix("test_customsync_flow"),
TableNameMapping: map[string]string{srcTableName: dstTableName},
Destination: s.Peer().Name,
}
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s.t)
_, err := s.Conn().Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
id SERIAL PRIMARY KEY,
key TEXT NOT NULL,
value TEXT NOT NULL
);
`, srcTableName))
require.NoError(s.t, err)
tc := e2e.NewTemporalClient(s.t)
env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil)
e2e.SetupCDCFlowStatusQuery(s.t, env, flowConnConfig)
e2e.SignalWorkflow(env, model.FlowSignal, model.PauseSignal)
e2e.EnvWaitFor(s.t, env, 1*time.Minute, "paused workflow", func() bool {
return e2e.EnvGetFlowStatus(s.t, env) == protos.FlowStatus_STATUS_PAUSED
})
e2e.SignalWorkflow(env, model.CDCDynamicPropertiesSignal, &protos.CDCFlowConfigUpdate{
NumberOfSyncs: 1,
})
e2e.EnvWaitFor(s.t, env, 1*time.Minute, "resumed workflow", func() bool {
return e2e.EnvGetFlowStatus(s.t, env) == protos.FlowStatus_STATUS_RUNNING
})
_, err = s.Conn().Exec(context.Background(), fmt.Sprintf(
"INSERT INTO %s(key, value) VALUES ('test_key', 'test_value')", srcTableName))
e2e.EnvNoError(s.t, env, err)
e2e.EnvWaitFor(s.t, env, 3*time.Minute, "paused workflow", func() bool {
return e2e.EnvGetFlowStatus(s.t, env) == protos.FlowStatus_STATUS_PAUSED
})
require.NoError(s.t, s.comparePGTables(srcTableName, dstTableName, "id,key,value"))
env.Cancel()
e2e.RequireEnvCanceled(s.t, env)
}

func (s PeerFlowE2ETestSuitePG) Test_TypeSystem_PG() {
srcTableName := s.attachSchemaSuffix("test_typesystem_pg")
dstTableName := s.attachSchemaSuffix("test_typesystem_pg_dst")
Expand Down

0 comments on commit 980eb42

Please sign in to comment.