diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go index df8a002c0..1c90ab0fe 100644 --- a/flow/e2e/postgres/peer_flow_pg_test.go +++ b/flow/e2e/postgres/peer_flow_pg_test.go @@ -963,6 +963,47 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() { e2e.RequireEnvCanceled(s.t, env) } +func (s PeerFlowE2ETestSuitePG) Test_CustomSync() { + srcTableName := s.attachSchemaSuffix("test_customsync") + dstTableName := s.attachSchemaSuffix("test_customsync_dst") + connectionGen := e2e.FlowConnectionGenerationConfig{ + FlowJobName: s.attachSuffix("test_customsync_flow"), + TableNameMapping: map[string]string{srcTableName: dstTableName}, + Destination: s.Peer().Name, + } + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s.t) + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s ( + id SERIAL PRIMARY KEY, + key TEXT NOT NULL, + value TEXT NOT NULL + ); + `, srcTableName)) + require.NoError(s.t, err) + tc := e2e.NewTemporalClient(s.t) + env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil) + e2e.SetupCDCFlowStatusQuery(s.t, env, flowConnConfig) + e2e.SignalWorkflow(env, model.FlowSignal, model.PauseSignal) + e2e.EnvWaitFor(s.t, env, 1*time.Minute, "paused workflow", func() bool { + return e2e.EnvGetFlowStatus(s.t, env) == protos.FlowStatus_STATUS_PAUSED + }) + e2e.SignalWorkflow(env, model.CDCDynamicPropertiesSignal, &protos.CDCFlowConfigUpdate{ + NumberOfSyncs: 1, + }) + e2e.EnvWaitFor(s.t, env, 1*time.Minute, "resumed workflow", func() bool { + return e2e.EnvGetFlowStatus(s.t, env) == protos.FlowStatus_STATUS_RUNNING + }) + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf( + "INSERT INTO %s(key, value) VALUES ('test_key', 'test_value')", srcTableName)) + e2e.EnvNoError(s.t, env, err) + e2e.EnvWaitFor(s.t, env, 3*time.Minute, "paused workflow", func() bool { + return e2e.EnvGetFlowStatus(s.t, env) == protos.FlowStatus_STATUS_PAUSED + }) + require.NoError(s.t, s.comparePGTables(srcTableName, dstTableName, "id,key,value")) + env.Cancel() + e2e.RequireEnvCanceled(s.t, env) +} + func (s PeerFlowE2ETestSuitePG) Test_TypeSystem_PG() { srcTableName := s.attachSchemaSuffix("test_typesystem_pg") dstTableName := s.attachSchemaSuffix("test_typesystem_pg_dst") diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index d3bf1a7a3..8a260e244 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -506,8 +506,15 @@ func CDCFlowWorkflow( } else { logger.Error("error in sync flow", slog.Any("error", err)) } - logger.Info("sync flow errored, sleeping for 10 minutes before retrying") - _ = workflow.Sleep(ctx, 10*time.Minute) + + // cannot use shared.IsSQLStateError because temporal serialize/deserialize + if strings.Contains(err.Error(), "(SQLSTATE 55006)") { + logger.Info("sync flow errored, sleeping for 1 minute before retrying") + _ = workflow.Sleep(ctx, time.Minute) + } else { + logger.Info("sync flow errored, sleeping for 10 minutes before retrying") + _ = workflow.Sleep(ctx, 10*time.Minute) + } } else { logger.Info("sync finished") }