Skip to content

Commit

Permalink
revive custom sync test (#2419)
Browse files Browse the repository at this point in the history
removed in #2418
  • Loading branch information
serprex authored Jan 9, 2025
1 parent f806e04 commit 2bde31f
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 2 deletions.
41 changes: 41 additions & 0 deletions flow/e2e/postgres/peer_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -963,6 +963,47 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() {
e2e.RequireEnvCanceled(s.t, env)
}

func (s PeerFlowE2ETestSuitePG) Test_CustomSync() {
srcTableName := s.attachSchemaSuffix("test_customsync")
dstTableName := s.attachSchemaSuffix("test_customsync_dst")
connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix("test_customsync_flow"),
TableNameMapping: map[string]string{srcTableName: dstTableName},
Destination: s.Peer().Name,
}
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s.t)
_, err := s.Conn().Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
id SERIAL PRIMARY KEY,
key TEXT NOT NULL,
value TEXT NOT NULL
);
`, srcTableName))
require.NoError(s.t, err)
tc := e2e.NewTemporalClient(s.t)
env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil)
e2e.SetupCDCFlowStatusQuery(s.t, env, flowConnConfig)
e2e.SignalWorkflow(env, model.FlowSignal, model.PauseSignal)
e2e.EnvWaitFor(s.t, env, 1*time.Minute, "paused workflow", func() bool {
return e2e.EnvGetFlowStatus(s.t, env) == protos.FlowStatus_STATUS_PAUSED
})
e2e.SignalWorkflow(env, model.CDCDynamicPropertiesSignal, &protos.CDCFlowConfigUpdate{
NumberOfSyncs: 1,
})
e2e.EnvWaitFor(s.t, env, 1*time.Minute, "resumed workflow", func() bool {
return e2e.EnvGetFlowStatus(s.t, env) == protos.FlowStatus_STATUS_RUNNING
})
_, err = s.Conn().Exec(context.Background(), fmt.Sprintf(
"INSERT INTO %s(key, value) VALUES ('test_key', 'test_value')", srcTableName))
e2e.EnvNoError(s.t, env, err)
e2e.EnvWaitFor(s.t, env, 3*time.Minute, "paused workflow", func() bool {
return e2e.EnvGetFlowStatus(s.t, env) == protos.FlowStatus_STATUS_PAUSED
})
require.NoError(s.t, s.comparePGTables(srcTableName, dstTableName, "id,key,value"))
env.Cancel()
e2e.RequireEnvCanceled(s.t, env)
}

func (s PeerFlowE2ETestSuitePG) Test_TypeSystem_PG() {
srcTableName := s.attachSchemaSuffix("test_typesystem_pg")
dstTableName := s.attachSchemaSuffix("test_typesystem_pg_dst")
Expand Down
11 changes: 9 additions & 2 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,8 +506,15 @@ func CDCFlowWorkflow(
} else {
logger.Error("error in sync flow", slog.Any("error", err))
}
logger.Info("sync flow errored, sleeping for 10 minutes before retrying")
_ = workflow.Sleep(ctx, 10*time.Minute)

// cannot use shared.IsSQLStateError because temporal serialize/deserialize
if strings.Contains(err.Error(), "(SQLSTATE 55006)") {
logger.Info("sync flow errored, sleeping for 1 minute before retrying")
_ = workflow.Sleep(ctx, time.Minute)
} else {
logger.Info("sync flow errored, sleeping for 10 minutes before retrying")
_ = workflow.Sleep(ctx, 10*time.Minute)
}
} else {
logger.Info("sync finished")
}
Expand Down

0 comments on commit 2bde31f

Please sign in to comment.