From d2263f5b4efc5497a805408e3da5b3fc1dffd58b Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Wed, 8 Jan 2025 08:37:11 -0600 Subject: [PATCH] Increase retry initial backoff to 1 minute (#2418) Retrying in 1 sec very rarely changes things as we typically tend to wait for database auto-scaling, network issue resolution or other error types wherre we would anyways need to wait in the order of minuted. It was also causing alert spamming in the systems basically making them not usable for a certain kind of errors. --- flow/e2e/postgres/peer_flow_pg_test.go | 48 -------------------------- flow/workflows/cdc_flow.go | 9 ++++- flow/workflows/drop_flow.go | 14 ++++++-- flow/workflows/setup_flow.go | 13 +++++++ flow/workflows/snapshot_flow.go | 10 ++++++ flow/workflows/xmin_flow.go | 4 +++ 6 files changed, 47 insertions(+), 51 deletions(-) diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go index 994e612bd..df8a002c0 100644 --- a/flow/e2e/postgres/peer_flow_pg_test.go +++ b/flow/e2e/postgres/peer_flow_pg_test.go @@ -963,54 +963,6 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() { e2e.RequireEnvCanceled(s.t, env) } -func (s PeerFlowE2ETestSuitePG) Test_CustomSync() { - srcTableName := s.attachSchemaSuffix("test_customsync") - dstTableName := s.attachSchemaSuffix("test_customsync_dst") - - connectionGen := e2e.FlowConnectionGenerationConfig{ - FlowJobName: s.attachSuffix("test_customsync_flow"), - TableNameMapping: map[string]string{srcTableName: dstTableName}, - Destination: s.Peer().Name, - } - flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s.t) - - _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` - CREATE TABLE IF NOT EXISTS %s ( - id SERIAL PRIMARY KEY, - key TEXT NOT NULL, - value TEXT NOT NULL - ); - `, srcTableName)) - - require.NoError(s.t, err) - tc := e2e.NewTemporalClient(s.t) - env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil) - e2e.SetupCDCFlowStatusQuery(s.t, env, flowConnConfig) - - e2e.SignalWorkflow(env, model.FlowSignal, model.PauseSignal) - e2e.EnvWaitFor(s.t, env, 1*time.Minute, "paused workflow", func() bool { - return e2e.EnvGetFlowStatus(s.t, env) == protos.FlowStatus_STATUS_PAUSED - }) - - e2e.SignalWorkflow(env, model.CDCDynamicPropertiesSignal, &protos.CDCFlowConfigUpdate{ - NumberOfSyncs: 1, - }) - e2e.EnvWaitFor(s.t, env, 1*time.Minute, "resumed workflow", func() bool { - return e2e.EnvGetFlowStatus(s.t, env) == protos.FlowStatus_STATUS_RUNNING - }) - - _, err = s.Conn().Exec(context.Background(), fmt.Sprintf( - "INSERT INTO %s(key, value) VALUES ('test_key', 'test_value')", srcTableName)) - e2e.EnvNoError(s.t, env, err) - e2e.EnvWaitFor(s.t, env, 3*time.Minute, "paused workflow", func() bool { - return e2e.EnvGetFlowStatus(s.t, env) == protos.FlowStatus_STATUS_PAUSED - }) - - require.NoError(s.t, s.comparePGTables(srcTableName, dstTableName, "id,key,value")) - env.Cancel() - e2e.RequireEnvCanceled(s.t, env) -} - func (s PeerFlowE2ETestSuitePG) Test_TypeSystem_PG() { srcTableName := s.attachSchemaSuffix("test_typesystem_pg") dstTableName := s.attachSchemaSuffix("test_typesystem_pg_dst") diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 1d55fa797..d3bf1a7a3 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -214,6 +214,9 @@ func processTableRemovals( logger.Info("altering publication for removed tables") removeTablesCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 5 * time.Minute, + RetryPolicy: &temporal.RetryPolicy{ + InitialInterval: 1 * time.Minute, + }, }) alterPublicationRemovedTablesFuture := workflow.ExecuteActivity( removeTablesCtx, @@ -457,6 +460,9 @@ func CDCFlowWorkflow( renameTablesCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 12 * time.Hour, HeartbeatTimeout: time.Minute, + RetryPolicy: &temporal.RetryPolicy{ + InitialInterval: 1 * time.Minute, + }, }) renameTablesFuture := workflow.ExecuteActivity(renameTablesCtx, flowable.RenameTables, renameOpts) if err := renameTablesFuture.Get(renameTablesCtx, nil); err != nil { @@ -500,7 +506,8 @@ func CDCFlowWorkflow( } else { logger.Error("error in sync flow", slog.Any("error", err)) } - _ = workflow.Sleep(ctx, 30*time.Second) + logger.Info("sync flow errored, sleeping for 10 minutes before retrying") + _ = workflow.Sleep(ctx, 10*time.Minute) } else { logger.Info("sync finished") } diff --git a/flow/workflows/drop_flow.go b/flow/workflows/drop_flow.go index 829fbe760..d6d058341 100644 --- a/flow/workflows/drop_flow.go +++ b/flow/workflows/drop_flow.go @@ -6,6 +6,7 @@ import ( "time" "go.temporal.io/sdk/converter" + "go.temporal.io/sdk/temporal" "go.temporal.io/sdk/workflow" "github.com/PeerDB-io/peerdb/flow/generated/protos" @@ -15,6 +16,9 @@ import ( func executeCDCDropActivities(ctx workflow.Context, input *protos.DropFlowInput) error { ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 5 * time.Minute, + RetryPolicy: &temporal.RetryPolicy{ + InitialInterval: 1 * time.Minute, + }, }) ctx = workflow.WithDataConverter(ctx, converter.NewCompositeDataConverter(converter.NewJSONPayloadConverter())) @@ -80,8 +84,11 @@ func DropFlowWorkflow(ctx workflow.Context, input *protos.DropFlowInput) error { if input.FlowConnectionConfigs != nil && input.DropFlowStats { dropStatsCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ - StartToCloseTimeout: 1 * time.Minute, + StartToCloseTimeout: 2 * time.Minute, HeartbeatTimeout: 1 * time.Minute, + RetryPolicy: &temporal.RetryPolicy{ + InitialInterval: 1 * time.Minute, + }, }) dropStatsFuture := workflow.ExecuteActivity(dropStatsCtx, flowable.DeleteMirrorStats, input.FlowJobName) @@ -102,7 +109,10 @@ func DropFlowWorkflow(ctx workflow.Context, input *protos.DropFlowInput) error { } removeFlowEntriesCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ - StartToCloseTimeout: 1 * time.Minute, + StartToCloseTimeout: 2 * time.Minute, + RetryPolicy: &temporal.RetryPolicy{ + InitialInterval: 1 * time.Minute, + }, }) removeFromCatalogFuture := workflow.ExecuteActivity(removeFlowEntriesCtx, flowable.RemoveFlowEntryFromCatalog, input.FlowJobName) diff --git a/flow/workflows/setup_flow.go b/flow/workflows/setup_flow.go index 0a8a7d4f3..5c666ac53 100644 --- a/flow/workflows/setup_flow.go +++ b/flow/workflows/setup_flow.go @@ -8,6 +8,7 @@ import ( "time" "go.temporal.io/sdk/log" + "go.temporal.io/sdk/temporal" "go.temporal.io/sdk/workflow" "github.com/PeerDB-io/peerdb/flow/activities" @@ -89,6 +90,9 @@ func (s *SetupFlowExecution) checkConnectionsAndSetupMetadataTables( if destConnStatus.NeedsSetupMetadataTables { setupCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 2 * time.Minute, + RetryPolicy: &temporal.RetryPolicy{ + InitialInterval: 1 * time.Minute, + }, }) fDst := workflow.ExecuteActivity(setupCtx, flowable.SetupMetadataTables, dstSetupInput) if err := fDst.Get(setupCtx, nil); err != nil { @@ -111,6 +115,9 @@ func (s *SetupFlowExecution) ensurePullability( ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 4 * time.Hour, + RetryPolicy: &temporal.RetryPolicy{ + InitialInterval: 1 * time.Minute, + }, }) srcTableIdNameMapping := make(map[uint32]string) @@ -149,6 +156,9 @@ func (s *SetupFlowExecution) createRawTable( s.Info("creating raw table on destination") ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 5 * time.Minute, + RetryPolicy: &temporal.RetryPolicy{ + InitialInterval: 1 * time.Minute, + }, }) // attempt to create the tables. @@ -176,6 +186,9 @@ func (s *SetupFlowExecution) setupNormalizedTables( ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 1 * time.Hour, HeartbeatTimeout: time.Minute, + RetryPolicy: &temporal.RetryPolicy{ + InitialInterval: 1 * time.Minute, + }, }) sourceTables := slices.Sorted(maps.Keys(s.tableNameMapping)) diff --git a/flow/workflows/snapshot_flow.go b/flow/workflows/snapshot_flow.go index b45292155..908a78ba7 100644 --- a/flow/workflows/snapshot_flow.go +++ b/flow/workflows/snapshot_flow.go @@ -42,6 +42,7 @@ func (s *SnapshotFlowExecution) setupReplication( ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 4 * time.Hour, RetryPolicy: &temporal.RetryPolicy{ + InitialInterval: 1 * time.Minute, MaximumAttempts: 20, }, }) @@ -78,6 +79,9 @@ func (s *SnapshotFlowExecution) closeSlotKeepAlive( ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 15 * time.Minute, + RetryPolicy: &temporal.RetryPolicy{ + InitialInterval: 1 * time.Minute, + }, }) if err := workflow.ExecuteActivity(ctx, snapshot.CloseSlotKeepAlive, s.config.FlowJobName).Get(ctx, nil); err != nil { @@ -126,6 +130,9 @@ func (s *SnapshotFlowExecution) cloneTable( schemaCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: time.Minute, WaitForCancellation: true, + RetryPolicy: &temporal.RetryPolicy{ + InitialInterval: 1 * time.Minute, + }, }) return workflow.ExecuteActivity( schemaCtx, @@ -343,6 +350,9 @@ func SnapshotFlowWorkflow( StartToCloseTimeout: sessionOpts.ExecutionTimeout, HeartbeatTimeout: 10 * time.Minute, WaitForCancellation: true, + RetryPolicy: &temporal.RetryPolicy{ + InitialInterval: 1 * time.Minute, + }, }) fMaintain := workflow.ExecuteActivity( diff --git a/flow/workflows/xmin_flow.go b/flow/workflows/xmin_flow.go index 898d67c66..f6f83d85a 100644 --- a/flow/workflows/xmin_flow.go +++ b/flow/workflows/xmin_flow.go @@ -7,6 +7,7 @@ import ( "time" "github.com/google/uuid" + "go.temporal.io/sdk/temporal" "go.temporal.io/sdk/workflow" "github.com/PeerDB-io/peerdb/flow/generated/protos" @@ -73,6 +74,9 @@ func XminFlowWorkflow( replicateXminPartitionCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 24 * 5 * time.Hour, HeartbeatTimeout: time.Minute, + RetryPolicy: &temporal.RetryPolicy{ + InitialInterval: 1 * time.Minute, + }, }) if err := workflow.ExecuteActivity( replicateXminPartitionCtx,