diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go index df8a002c0..1c90ab0fe 100644 --- a/flow/e2e/postgres/peer_flow_pg_test.go +++ b/flow/e2e/postgres/peer_flow_pg_test.go @@ -963,6 +963,47 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() { e2e.RequireEnvCanceled(s.t, env) } +func (s PeerFlowE2ETestSuitePG) Test_CustomSync() { + srcTableName := s.attachSchemaSuffix("test_customsync") + dstTableName := s.attachSchemaSuffix("test_customsync_dst") + connectionGen := e2e.FlowConnectionGenerationConfig{ + FlowJobName: s.attachSuffix("test_customsync_flow"), + TableNameMapping: map[string]string{srcTableName: dstTableName}, + Destination: s.Peer().Name, + } + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s.t) + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s ( + id SERIAL PRIMARY KEY, + key TEXT NOT NULL, + value TEXT NOT NULL + ); + `, srcTableName)) + require.NoError(s.t, err) + tc := e2e.NewTemporalClient(s.t) + env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil) + e2e.SetupCDCFlowStatusQuery(s.t, env, flowConnConfig) + e2e.SignalWorkflow(env, model.FlowSignal, model.PauseSignal) + e2e.EnvWaitFor(s.t, env, 1*time.Minute, "paused workflow", func() bool { + return e2e.EnvGetFlowStatus(s.t, env) == protos.FlowStatus_STATUS_PAUSED + }) + e2e.SignalWorkflow(env, model.CDCDynamicPropertiesSignal, &protos.CDCFlowConfigUpdate{ + NumberOfSyncs: 1, + }) + e2e.EnvWaitFor(s.t, env, 1*time.Minute, "resumed workflow", func() bool { + return e2e.EnvGetFlowStatus(s.t, env) == protos.FlowStatus_STATUS_RUNNING + }) + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf( + "INSERT INTO %s(key, value) VALUES ('test_key', 'test_value')", srcTableName)) + e2e.EnvNoError(s.t, env, err) + e2e.EnvWaitFor(s.t, env, 3*time.Minute, "paused workflow", func() bool { + return e2e.EnvGetFlowStatus(s.t, env) == protos.FlowStatus_STATUS_PAUSED + }) + require.NoError(s.t, s.comparePGTables(srcTableName, dstTableName, "id,key,value")) + env.Cancel() + e2e.RequireEnvCanceled(s.t, env) +} + func (s PeerFlowE2ETestSuitePG) Test_TypeSystem_PG() { srcTableName := s.attachSchemaSuffix("test_typesystem_pg") dstTableName := s.attachSchemaSuffix("test_typesystem_pg_dst")