diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index d3bf1a7a3..b16048011 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -10,6 +10,7 @@ import ( "time" "github.com/google/uuid" + "github.com/jackc/pgerrcode" "go.temporal.io/api/enums/v1" "go.temporal.io/sdk/log" "go.temporal.io/sdk/temporal" @@ -506,8 +507,15 @@ func CDCFlowWorkflow( } else { logger.Error("error in sync flow", slog.Any("error", err)) } - logger.Info("sync flow errored, sleeping for 10 minutes before retrying") - _ = workflow.Sleep(ctx, 10*time.Minute) + + // cannot use shared.IsSQLStateError because temporal serialize/deserialize + if strings.Contains(err.Error(), "(SQLSTATE 55006)") { + logger.Info("sync flow errored, sleeping for 1 minute before retrying") + _ = workflow.Sleep(ctx, time.Minute) + } else { + logger.Info("sync flow errored, sleeping for 10 minutes before retrying") + _ = workflow.Sleep(ctx, 10*time.Minute) + } } else { logger.Info("sync finished") }