From c78cb7fe166b20427eb28a55767c342068564473 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Tue, 7 Jan 2025 19:31:57 -0600 Subject: [PATCH 1/3] Increase retry initial backoff to 1 minute Retrying in 1 sec very rarely changes things as we typically tend to wait for database auto-scaling, network issue resolution or other error types wherre we would anyways need to wait in the order of minuted. It was also causing alert spamming in the systems basically making them not usable for a certain kind of errors. --- flow/workflows/cdc_flow.go | 9 ++++++++- flow/workflows/drop_flow.go | 10 ++++++++++ flow/workflows/setup_flow.go | 13 +++++++++++++ flow/workflows/snapshot_flow.go | 10 ++++++++++ flow/workflows/xmin_flow.go | 4 ++++ 5 files changed, 45 insertions(+), 1 deletion(-) diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 1d55fa797..d3bf1a7a3 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -214,6 +214,9 @@ func processTableRemovals( logger.Info("altering publication for removed tables") removeTablesCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 5 * time.Minute, + RetryPolicy: &temporal.RetryPolicy{ + InitialInterval: 1 * time.Minute, + }, }) alterPublicationRemovedTablesFuture := workflow.ExecuteActivity( removeTablesCtx, @@ -457,6 +460,9 @@ func CDCFlowWorkflow( renameTablesCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 12 * time.Hour, HeartbeatTimeout: time.Minute, + RetryPolicy: &temporal.RetryPolicy{ + InitialInterval: 1 * time.Minute, + }, }) renameTablesFuture := workflow.ExecuteActivity(renameTablesCtx, flowable.RenameTables, renameOpts) if err := renameTablesFuture.Get(renameTablesCtx, nil); err != nil { @@ -500,7 +506,8 @@ func CDCFlowWorkflow( } else { logger.Error("error in sync flow", slog.Any("error", err)) } - _ = workflow.Sleep(ctx, 30*time.Second) + logger.Info("sync flow errored, sleeping for 10 minutes before retrying") + _ = workflow.Sleep(ctx, 10*time.Minute) } else { logger.Info("sync finished") } diff --git a/flow/workflows/drop_flow.go b/flow/workflows/drop_flow.go index 829fbe760..0cb2b173e 100644 --- a/flow/workflows/drop_flow.go +++ b/flow/workflows/drop_flow.go @@ -6,6 +6,7 @@ import ( "time" "go.temporal.io/sdk/converter" + "go.temporal.io/sdk/temporal" "go.temporal.io/sdk/workflow" "github.com/PeerDB-io/peerdb/flow/generated/protos" @@ -15,6 +16,9 @@ import ( func executeCDCDropActivities(ctx workflow.Context, input *protos.DropFlowInput) error { ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 5 * time.Minute, + RetryPolicy: &temporal.RetryPolicy{ + InitialInterval: 1 * time.Minute, + }, }) ctx = workflow.WithDataConverter(ctx, converter.NewCompositeDataConverter(converter.NewJSONPayloadConverter())) @@ -82,6 +86,9 @@ func DropFlowWorkflow(ctx workflow.Context, input *protos.DropFlowInput) error { dropStatsCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 1 * time.Minute, HeartbeatTimeout: 1 * time.Minute, + RetryPolicy: &temporal.RetryPolicy{ + InitialInterval: 1 * time.Minute, + }, }) dropStatsFuture := workflow.ExecuteActivity(dropStatsCtx, flowable.DeleteMirrorStats, input.FlowJobName) @@ -103,6 +110,9 @@ func DropFlowWorkflow(ctx workflow.Context, input *protos.DropFlowInput) error { removeFlowEntriesCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 1 * time.Minute, + RetryPolicy: &temporal.RetryPolicy{ + InitialInterval: 1 * time.Minute, + }, }) removeFromCatalogFuture := workflow.ExecuteActivity(removeFlowEntriesCtx, flowable.RemoveFlowEntryFromCatalog, input.FlowJobName) diff --git a/flow/workflows/setup_flow.go b/flow/workflows/setup_flow.go index 0a8a7d4f3..5c666ac53 100644 --- a/flow/workflows/setup_flow.go +++ b/flow/workflows/setup_flow.go @@ -8,6 +8,7 @@ import ( "time" "go.temporal.io/sdk/log" + "go.temporal.io/sdk/temporal" "go.temporal.io/sdk/workflow" "github.com/PeerDB-io/peerdb/flow/activities" @@ -89,6 +90,9 @@ func (s *SetupFlowExecution) checkConnectionsAndSetupMetadataTables( if destConnStatus.NeedsSetupMetadataTables { setupCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 2 * time.Minute, + RetryPolicy: &temporal.RetryPolicy{ + InitialInterval: 1 * time.Minute, + }, }) fDst := workflow.ExecuteActivity(setupCtx, flowable.SetupMetadataTables, dstSetupInput) if err := fDst.Get(setupCtx, nil); err != nil { @@ -111,6 +115,9 @@ func (s *SetupFlowExecution) ensurePullability( ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 4 * time.Hour, + RetryPolicy: &temporal.RetryPolicy{ + InitialInterval: 1 * time.Minute, + }, }) srcTableIdNameMapping := make(map[uint32]string) @@ -149,6 +156,9 @@ func (s *SetupFlowExecution) createRawTable( s.Info("creating raw table on destination") ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 5 * time.Minute, + RetryPolicy: &temporal.RetryPolicy{ + InitialInterval: 1 * time.Minute, + }, }) // attempt to create the tables. @@ -176,6 +186,9 @@ func (s *SetupFlowExecution) setupNormalizedTables( ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 1 * time.Hour, HeartbeatTimeout: time.Minute, + RetryPolicy: &temporal.RetryPolicy{ + InitialInterval: 1 * time.Minute, + }, }) sourceTables := slices.Sorted(maps.Keys(s.tableNameMapping)) diff --git a/flow/workflows/snapshot_flow.go b/flow/workflows/snapshot_flow.go index b45292155..908a78ba7 100644 --- a/flow/workflows/snapshot_flow.go +++ b/flow/workflows/snapshot_flow.go @@ -42,6 +42,7 @@ func (s *SnapshotFlowExecution) setupReplication( ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 4 * time.Hour, RetryPolicy: &temporal.RetryPolicy{ + InitialInterval: 1 * time.Minute, MaximumAttempts: 20, }, }) @@ -78,6 +79,9 @@ func (s *SnapshotFlowExecution) closeSlotKeepAlive( ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 15 * time.Minute, + RetryPolicy: &temporal.RetryPolicy{ + InitialInterval: 1 * time.Minute, + }, }) if err := workflow.ExecuteActivity(ctx, snapshot.CloseSlotKeepAlive, s.config.FlowJobName).Get(ctx, nil); err != nil { @@ -126,6 +130,9 @@ func (s *SnapshotFlowExecution) cloneTable( schemaCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: time.Minute, WaitForCancellation: true, + RetryPolicy: &temporal.RetryPolicy{ + InitialInterval: 1 * time.Minute, + }, }) return workflow.ExecuteActivity( schemaCtx, @@ -343,6 +350,9 @@ func SnapshotFlowWorkflow( StartToCloseTimeout: sessionOpts.ExecutionTimeout, HeartbeatTimeout: 10 * time.Minute, WaitForCancellation: true, + RetryPolicy: &temporal.RetryPolicy{ + InitialInterval: 1 * time.Minute, + }, }) fMaintain := workflow.ExecuteActivity( diff --git a/flow/workflows/xmin_flow.go b/flow/workflows/xmin_flow.go index 898d67c66..f6f83d85a 100644 --- a/flow/workflows/xmin_flow.go +++ b/flow/workflows/xmin_flow.go @@ -7,6 +7,7 @@ import ( "time" "github.com/google/uuid" + "go.temporal.io/sdk/temporal" "go.temporal.io/sdk/workflow" "github.com/PeerDB-io/peerdb/flow/generated/protos" @@ -73,6 +74,9 @@ func XminFlowWorkflow( replicateXminPartitionCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 24 * 5 * time.Hour, HeartbeatTimeout: time.Minute, + RetryPolicy: &temporal.RetryPolicy{ + InitialInterval: 1 * time.Minute, + }, }) if err := workflow.ExecuteActivity( replicateXminPartitionCtx, From 3ad106dbbd52024a28919ca71ddef8756531d7ad Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Tue, 7 Jan 2025 19:41:59 -0600 Subject: [PATCH 2/3] increase start to close timeout in a couple of places --- flow/workflows/drop_flow.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flow/workflows/drop_flow.go b/flow/workflows/drop_flow.go index 0cb2b173e..d6d058341 100644 --- a/flow/workflows/drop_flow.go +++ b/flow/workflows/drop_flow.go @@ -84,7 +84,7 @@ func DropFlowWorkflow(ctx workflow.Context, input *protos.DropFlowInput) error { if input.FlowConnectionConfigs != nil && input.DropFlowStats { dropStatsCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ - StartToCloseTimeout: 1 * time.Minute, + StartToCloseTimeout: 2 * time.Minute, HeartbeatTimeout: 1 * time.Minute, RetryPolicy: &temporal.RetryPolicy{ InitialInterval: 1 * time.Minute, @@ -109,7 +109,7 @@ func DropFlowWorkflow(ctx workflow.Context, input *protos.DropFlowInput) error { } removeFlowEntriesCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ - StartToCloseTimeout: 1 * time.Minute, + StartToCloseTimeout: 2 * time.Minute, RetryPolicy: &temporal.RetryPolicy{ InitialInterval: 1 * time.Minute, }, From 692da2bc08612a8e285b165a60649f6cfbf00cf2 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Wed, 8 Jan 2025 08:29:19 -0600 Subject: [PATCH 3/3] remove rogue test --- flow/e2e/postgres/peer_flow_pg_test.go | 48 -------------------------- 1 file changed, 48 deletions(-) diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go index 994e612bd..df8a002c0 100644 --- a/flow/e2e/postgres/peer_flow_pg_test.go +++ b/flow/e2e/postgres/peer_flow_pg_test.go @@ -963,54 +963,6 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() { e2e.RequireEnvCanceled(s.t, env) } -func (s PeerFlowE2ETestSuitePG) Test_CustomSync() { - srcTableName := s.attachSchemaSuffix("test_customsync") - dstTableName := s.attachSchemaSuffix("test_customsync_dst") - - connectionGen := e2e.FlowConnectionGenerationConfig{ - FlowJobName: s.attachSuffix("test_customsync_flow"), - TableNameMapping: map[string]string{srcTableName: dstTableName}, - Destination: s.Peer().Name, - } - flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s.t) - - _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` - CREATE TABLE IF NOT EXISTS %s ( - id SERIAL PRIMARY KEY, - key TEXT NOT NULL, - value TEXT NOT NULL - ); - `, srcTableName)) - - require.NoError(s.t, err) - tc := e2e.NewTemporalClient(s.t) - env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil) - e2e.SetupCDCFlowStatusQuery(s.t, env, flowConnConfig) - - e2e.SignalWorkflow(env, model.FlowSignal, model.PauseSignal) - e2e.EnvWaitFor(s.t, env, 1*time.Minute, "paused workflow", func() bool { - return e2e.EnvGetFlowStatus(s.t, env) == protos.FlowStatus_STATUS_PAUSED - }) - - e2e.SignalWorkflow(env, model.CDCDynamicPropertiesSignal, &protos.CDCFlowConfigUpdate{ - NumberOfSyncs: 1, - }) - e2e.EnvWaitFor(s.t, env, 1*time.Minute, "resumed workflow", func() bool { - return e2e.EnvGetFlowStatus(s.t, env) == protos.FlowStatus_STATUS_RUNNING - }) - - _, err = s.Conn().Exec(context.Background(), fmt.Sprintf( - "INSERT INTO %s(key, value) VALUES ('test_key', 'test_value')", srcTableName)) - e2e.EnvNoError(s.t, env, err) - e2e.EnvWaitFor(s.t, env, 3*time.Minute, "paused workflow", func() bool { - return e2e.EnvGetFlowStatus(s.t, env) == protos.FlowStatus_STATUS_PAUSED - }) - - require.NoError(s.t, s.comparePGTables(srcTableName, dstTableName, "id,key,value")) - env.Cancel() - e2e.RequireEnvCanceled(s.t, env) -} - func (s PeerFlowE2ETestSuitePG) Test_TypeSystem_PG() { srcTableName := s.attachSchemaSuffix("test_typesystem_pg") dstTableName := s.attachSchemaSuffix("test_typesystem_pg_dst")