Skip to content

Commit

Permalink
Increase retry initial backoff to 1 minute (#2418)
Browse files Browse the repository at this point in the history
Retrying in 1 sec very rarely changes things as we typically tend to
wait for database auto-scaling, network issue resolution or other error
types wherre we would anyways need to wait in the order of minuted.

It was also causing alert spamming in the systems basically making them
not usable for a certain kind of errors.
  • Loading branch information
iskakaushik authored Jan 8, 2025
1 parent 28b7384 commit d2263f5
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 51 deletions.
48 changes: 0 additions & 48 deletions flow/e2e/postgres/peer_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -963,54 +963,6 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() {
e2e.RequireEnvCanceled(s.t, env)
}

func (s PeerFlowE2ETestSuitePG) Test_CustomSync() {
srcTableName := s.attachSchemaSuffix("test_customsync")
dstTableName := s.attachSchemaSuffix("test_customsync_dst")

connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix("test_customsync_flow"),
TableNameMapping: map[string]string{srcTableName: dstTableName},
Destination: s.Peer().Name,
}
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s.t)

_, err := s.Conn().Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
id SERIAL PRIMARY KEY,
key TEXT NOT NULL,
value TEXT NOT NULL
);
`, srcTableName))

require.NoError(s.t, err)
tc := e2e.NewTemporalClient(s.t)
env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil)
e2e.SetupCDCFlowStatusQuery(s.t, env, flowConnConfig)

e2e.SignalWorkflow(env, model.FlowSignal, model.PauseSignal)
e2e.EnvWaitFor(s.t, env, 1*time.Minute, "paused workflow", func() bool {
return e2e.EnvGetFlowStatus(s.t, env) == protos.FlowStatus_STATUS_PAUSED
})

e2e.SignalWorkflow(env, model.CDCDynamicPropertiesSignal, &protos.CDCFlowConfigUpdate{
NumberOfSyncs: 1,
})
e2e.EnvWaitFor(s.t, env, 1*time.Minute, "resumed workflow", func() bool {
return e2e.EnvGetFlowStatus(s.t, env) == protos.FlowStatus_STATUS_RUNNING
})

_, err = s.Conn().Exec(context.Background(), fmt.Sprintf(
"INSERT INTO %s(key, value) VALUES ('test_key', 'test_value')", srcTableName))
e2e.EnvNoError(s.t, env, err)
e2e.EnvWaitFor(s.t, env, 3*time.Minute, "paused workflow", func() bool {
return e2e.EnvGetFlowStatus(s.t, env) == protos.FlowStatus_STATUS_PAUSED
})

require.NoError(s.t, s.comparePGTables(srcTableName, dstTableName, "id,key,value"))
env.Cancel()
e2e.RequireEnvCanceled(s.t, env)
}

func (s PeerFlowE2ETestSuitePG) Test_TypeSystem_PG() {
srcTableName := s.attachSchemaSuffix("test_typesystem_pg")
dstTableName := s.attachSchemaSuffix("test_typesystem_pg_dst")
Expand Down
9 changes: 8 additions & 1 deletion flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,9 @@ func processTableRemovals(
logger.Info("altering publication for removed tables")
removeTablesCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 5 * time.Minute,
RetryPolicy: &temporal.RetryPolicy{
InitialInterval: 1 * time.Minute,
},
})
alterPublicationRemovedTablesFuture := workflow.ExecuteActivity(
removeTablesCtx,
Expand Down Expand Up @@ -457,6 +460,9 @@ func CDCFlowWorkflow(
renameTablesCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 12 * time.Hour,
HeartbeatTimeout: time.Minute,
RetryPolicy: &temporal.RetryPolicy{
InitialInterval: 1 * time.Minute,
},
})
renameTablesFuture := workflow.ExecuteActivity(renameTablesCtx, flowable.RenameTables, renameOpts)
if err := renameTablesFuture.Get(renameTablesCtx, nil); err != nil {
Expand Down Expand Up @@ -500,7 +506,8 @@ func CDCFlowWorkflow(
} else {
logger.Error("error in sync flow", slog.Any("error", err))
}
_ = workflow.Sleep(ctx, 30*time.Second)
logger.Info("sync flow errored, sleeping for 10 minutes before retrying")
_ = workflow.Sleep(ctx, 10*time.Minute)
} else {
logger.Info("sync finished")
}
Expand Down
14 changes: 12 additions & 2 deletions flow/workflows/drop_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"go.temporal.io/sdk/converter"
"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/workflow"

"github.com/PeerDB-io/peerdb/flow/generated/protos"
Expand All @@ -15,6 +16,9 @@ import (
func executeCDCDropActivities(ctx workflow.Context, input *protos.DropFlowInput) error {
ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 5 * time.Minute,
RetryPolicy: &temporal.RetryPolicy{
InitialInterval: 1 * time.Minute,
},
})
ctx = workflow.WithDataConverter(ctx, converter.NewCompositeDataConverter(converter.NewJSONPayloadConverter()))

Expand Down Expand Up @@ -80,8 +84,11 @@ func DropFlowWorkflow(ctx workflow.Context, input *protos.DropFlowInput) error {

if input.FlowConnectionConfigs != nil && input.DropFlowStats {
dropStatsCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 1 * time.Minute,
StartToCloseTimeout: 2 * time.Minute,
HeartbeatTimeout: 1 * time.Minute,
RetryPolicy: &temporal.RetryPolicy{
InitialInterval: 1 * time.Minute,
},
})
dropStatsFuture := workflow.ExecuteActivity(dropStatsCtx,
flowable.DeleteMirrorStats, input.FlowJobName)
Expand All @@ -102,7 +109,10 @@ func DropFlowWorkflow(ctx workflow.Context, input *protos.DropFlowInput) error {
}

removeFlowEntriesCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 1 * time.Minute,
StartToCloseTimeout: 2 * time.Minute,
RetryPolicy: &temporal.RetryPolicy{
InitialInterval: 1 * time.Minute,
},
})
removeFromCatalogFuture := workflow.ExecuteActivity(removeFlowEntriesCtx,
flowable.RemoveFlowEntryFromCatalog, input.FlowJobName)
Expand Down
13 changes: 13 additions & 0 deletions flow/workflows/setup_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"go.temporal.io/sdk/log"
"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/workflow"

"github.com/PeerDB-io/peerdb/flow/activities"
Expand Down Expand Up @@ -89,6 +90,9 @@ func (s *SetupFlowExecution) checkConnectionsAndSetupMetadataTables(
if destConnStatus.NeedsSetupMetadataTables {
setupCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 2 * time.Minute,
RetryPolicy: &temporal.RetryPolicy{
InitialInterval: 1 * time.Minute,
},
})
fDst := workflow.ExecuteActivity(setupCtx, flowable.SetupMetadataTables, dstSetupInput)
if err := fDst.Get(setupCtx, nil); err != nil {
Expand All @@ -111,6 +115,9 @@ func (s *SetupFlowExecution) ensurePullability(

ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 4 * time.Hour,
RetryPolicy: &temporal.RetryPolicy{
InitialInterval: 1 * time.Minute,
},
})
srcTableIdNameMapping := make(map[uint32]string)

Expand Down Expand Up @@ -149,6 +156,9 @@ func (s *SetupFlowExecution) createRawTable(
s.Info("creating raw table on destination")
ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 5 * time.Minute,
RetryPolicy: &temporal.RetryPolicy{
InitialInterval: 1 * time.Minute,
},
})

// attempt to create the tables.
Expand Down Expand Up @@ -176,6 +186,9 @@ func (s *SetupFlowExecution) setupNormalizedTables(
ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 1 * time.Hour,
HeartbeatTimeout: time.Minute,
RetryPolicy: &temporal.RetryPolicy{
InitialInterval: 1 * time.Minute,
},
})

sourceTables := slices.Sorted(maps.Keys(s.tableNameMapping))
Expand Down
10 changes: 10 additions & 0 deletions flow/workflows/snapshot_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func (s *SnapshotFlowExecution) setupReplication(
ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 4 * time.Hour,
RetryPolicy: &temporal.RetryPolicy{
InitialInterval: 1 * time.Minute,
MaximumAttempts: 20,
},
})
Expand Down Expand Up @@ -78,6 +79,9 @@ func (s *SnapshotFlowExecution) closeSlotKeepAlive(

ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 15 * time.Minute,
RetryPolicy: &temporal.RetryPolicy{
InitialInterval: 1 * time.Minute,
},
})

if err := workflow.ExecuteActivity(ctx, snapshot.CloseSlotKeepAlive, s.config.FlowJobName).Get(ctx, nil); err != nil {
Expand Down Expand Up @@ -126,6 +130,9 @@ func (s *SnapshotFlowExecution) cloneTable(
schemaCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: time.Minute,
WaitForCancellation: true,
RetryPolicy: &temporal.RetryPolicy{
InitialInterval: 1 * time.Minute,
},
})
return workflow.ExecuteActivity(
schemaCtx,
Expand Down Expand Up @@ -343,6 +350,9 @@ func SnapshotFlowWorkflow(
StartToCloseTimeout: sessionOpts.ExecutionTimeout,
HeartbeatTimeout: 10 * time.Minute,
WaitForCancellation: true,
RetryPolicy: &temporal.RetryPolicy{
InitialInterval: 1 * time.Minute,
},
})

fMaintain := workflow.ExecuteActivity(
Expand Down
4 changes: 4 additions & 0 deletions flow/workflows/xmin_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/google/uuid"
"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/workflow"

"github.com/PeerDB-io/peerdb/flow/generated/protos"
Expand Down Expand Up @@ -73,6 +74,9 @@ func XminFlowWorkflow(
replicateXminPartitionCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 24 * 5 * time.Hour,
HeartbeatTimeout: time.Minute,
RetryPolicy: &temporal.RetryPolicy{
InitialInterval: 1 * time.Minute,
},
})
if err := workflow.ExecuteActivity(
replicateXminPartitionCtx,
Expand Down

0 comments on commit d2263f5

Please sign in to comment.