diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go index 994e612bd..df8a002c0 100644 --- a/flow/e2e/postgres/peer_flow_pg_test.go +++ b/flow/e2e/postgres/peer_flow_pg_test.go @@ -963,54 +963,6 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() { e2e.RequireEnvCanceled(s.t, env) } -func (s PeerFlowE2ETestSuitePG) Test_CustomSync() { - srcTableName := s.attachSchemaSuffix("test_customsync") - dstTableName := s.attachSchemaSuffix("test_customsync_dst") - - connectionGen := e2e.FlowConnectionGenerationConfig{ - FlowJobName: s.attachSuffix("test_customsync_flow"), - TableNameMapping: map[string]string{srcTableName: dstTableName}, - Destination: s.Peer().Name, - } - flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s.t) - - _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` - CREATE TABLE IF NOT EXISTS %s ( - id SERIAL PRIMARY KEY, - key TEXT NOT NULL, - value TEXT NOT NULL - ); - `, srcTableName)) - - require.NoError(s.t, err) - tc := e2e.NewTemporalClient(s.t) - env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil) - e2e.SetupCDCFlowStatusQuery(s.t, env, flowConnConfig) - - e2e.SignalWorkflow(env, model.FlowSignal, model.PauseSignal) - e2e.EnvWaitFor(s.t, env, 1*time.Minute, "paused workflow", func() bool { - return e2e.EnvGetFlowStatus(s.t, env) == protos.FlowStatus_STATUS_PAUSED - }) - - e2e.SignalWorkflow(env, model.CDCDynamicPropertiesSignal, &protos.CDCFlowConfigUpdate{ - NumberOfSyncs: 1, - }) - e2e.EnvWaitFor(s.t, env, 1*time.Minute, "resumed workflow", func() bool { - return e2e.EnvGetFlowStatus(s.t, env) == protos.FlowStatus_STATUS_RUNNING - }) - - _, err = s.Conn().Exec(context.Background(), fmt.Sprintf( - "INSERT INTO %s(key, value) VALUES ('test_key', 'test_value')", srcTableName)) - e2e.EnvNoError(s.t, env, err) - e2e.EnvWaitFor(s.t, env, 3*time.Minute, "paused workflow", func() bool { - return e2e.EnvGetFlowStatus(s.t, env) == protos.FlowStatus_STATUS_PAUSED - }) - - require.NoError(s.t, s.comparePGTables(srcTableName, dstTableName, "id,key,value")) - env.Cancel() - e2e.RequireEnvCanceled(s.t, env) -} - func (s PeerFlowE2ETestSuitePG) Test_TypeSystem_PG() { srcTableName := s.attachSchemaSuffix("test_typesystem_pg") dstTableName := s.attachSchemaSuffix("test_typesystem_pg_dst") diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 1d55fa797..d3bf1a7a3 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -214,6 +214,9 @@ func processTableRemovals( logger.Info("altering publication for removed tables") removeTablesCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 5 * time.Minute, + RetryPolicy: &temporal.RetryPolicy{ + InitialInterval: 1 * time.Minute, + }, }) alterPublicationRemovedTablesFuture := workflow.ExecuteActivity( removeTablesCtx, @@ -457,6 +460,9 @@ func CDCFlowWorkflow( renameTablesCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 12 * time.Hour, HeartbeatTimeout: time.Minute, + RetryPolicy: &temporal.RetryPolicy{ + InitialInterval: 1 * time.Minute, + }, }) renameTablesFuture := workflow.ExecuteActivity(renameTablesCtx, flowable.RenameTables, renameOpts) if err := renameTablesFuture.Get(renameTablesCtx, nil); err != nil { @@ -500,7 +506,8 @@ func CDCFlowWorkflow( } else { logger.Error("error in sync flow", slog.Any("error", err)) } - _ = workflow.Sleep(ctx, 30*time.Second) + logger.Info("sync flow errored, sleeping for 10 minutes before retrying") + _ = workflow.Sleep(ctx, 10*time.Minute) } else { logger.Info("sync finished") } diff --git a/flow/workflows/drop_flow.go b/flow/workflows/drop_flow.go index 829fbe760..d6d058341 100644 --- a/flow/workflows/drop_flow.go +++ b/flow/workflows/drop_flow.go @@ -6,6 +6,7 @@ import ( "time" "go.temporal.io/sdk/converter" + "go.temporal.io/sdk/temporal" "go.temporal.io/sdk/workflow" "github.com/PeerDB-io/peerdb/flow/generated/protos" @@ -15,6 +16,9 @@ import ( func executeCDCDropActivities(ctx workflow.Context, input *protos.DropFlowInput) error { ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 5 * time.Minute, + RetryPolicy: &temporal.RetryPolicy{ + InitialInterval: 1 * time.Minute, + }, }) ctx = workflow.WithDataConverter(ctx, converter.NewCompositeDataConverter(converter.NewJSONPayloadConverter())) @@ -80,8 +84,11 @@ func DropFlowWorkflow(ctx workflow.Context, input *protos.DropFlowInput) error { if input.FlowConnectionConfigs != nil && input.DropFlowStats { dropStatsCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ - StartToCloseTimeout: 1 * time.Minute, + StartToCloseTimeout: 2 * time.Minute, HeartbeatTimeout: 1 * time.Minute, + RetryPolicy: &temporal.RetryPolicy{ + InitialInterval: 1 * time.Minute, + }, }) dropStatsFuture := workflow.ExecuteActivity(dropStatsCtx, flowable.DeleteMirrorStats, input.FlowJobName) @@ -102,7 +109,10 @@ func DropFlowWorkflow(ctx workflow.Context, input *protos.DropFlowInput) error { } removeFlowEntriesCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ - StartToCloseTimeout: 1 * time.Minute, + StartToCloseTimeout: 2 * time.Minute, + RetryPolicy: &temporal.RetryPolicy{ + InitialInterval: 1 * time.Minute, + }, }) removeFromCatalogFuture := workflow.ExecuteActivity(removeFlowEntriesCtx, flowable.RemoveFlowEntryFromCatalog, input.FlowJobName) diff --git a/flow/workflows/setup_flow.go b/flow/workflows/setup_flow.go index 0a8a7d4f3..5c666ac53 100644 --- a/flow/workflows/setup_flow.go +++ b/flow/workflows/setup_flow.go @@ -8,6 +8,7 @@ import ( "time" "go.temporal.io/sdk/log" + "go.temporal.io/sdk/temporal" "go.temporal.io/sdk/workflow" "github.com/PeerDB-io/peerdb/flow/activities" @@ -89,6 +90,9 @@ func (s *SetupFlowExecution) checkConnectionsAndSetupMetadataTables( if destConnStatus.NeedsSetupMetadataTables { setupCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 2 * time.Minute, + RetryPolicy: &temporal.RetryPolicy{ + InitialInterval: 1 * time.Minute, + }, }) fDst := workflow.ExecuteActivity(setupCtx, flowable.SetupMetadataTables, dstSetupInput) if err := fDst.Get(setupCtx, nil); err != nil { @@ -111,6 +115,9 @@ func (s *SetupFlowExecution) ensurePullability( ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 4 * time.Hour, + RetryPolicy: &temporal.RetryPolicy{ + InitialInterval: 1 * time.Minute, + }, }) srcTableIdNameMapping := make(map[uint32]string) @@ -149,6 +156,9 @@ func (s *SetupFlowExecution) createRawTable( s.Info("creating raw table on destination") ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 5 * time.Minute, + RetryPolicy: &temporal.RetryPolicy{ + InitialInterval: 1 * time.Minute, + }, }) // attempt to create the tables. @@ -176,6 +186,9 @@ func (s *SetupFlowExecution) setupNormalizedTables( ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 1 * time.Hour, HeartbeatTimeout: time.Minute, + RetryPolicy: &temporal.RetryPolicy{ + InitialInterval: 1 * time.Minute, + }, }) sourceTables := slices.Sorted(maps.Keys(s.tableNameMapping)) diff --git a/flow/workflows/snapshot_flow.go b/flow/workflows/snapshot_flow.go index b45292155..908a78ba7 100644 --- a/flow/workflows/snapshot_flow.go +++ b/flow/workflows/snapshot_flow.go @@ -42,6 +42,7 @@ func (s *SnapshotFlowExecution) setupReplication( ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 4 * time.Hour, RetryPolicy: &temporal.RetryPolicy{ + InitialInterval: 1 * time.Minute, MaximumAttempts: 20, }, }) @@ -78,6 +79,9 @@ func (s *SnapshotFlowExecution) closeSlotKeepAlive( ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 15 * time.Minute, + RetryPolicy: &temporal.RetryPolicy{ + InitialInterval: 1 * time.Minute, + }, }) if err := workflow.ExecuteActivity(ctx, snapshot.CloseSlotKeepAlive, s.config.FlowJobName).Get(ctx, nil); err != nil { @@ -126,6 +130,9 @@ func (s *SnapshotFlowExecution) cloneTable( schemaCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: time.Minute, WaitForCancellation: true, + RetryPolicy: &temporal.RetryPolicy{ + InitialInterval: 1 * time.Minute, + }, }) return workflow.ExecuteActivity( schemaCtx, @@ -343,6 +350,9 @@ func SnapshotFlowWorkflow( StartToCloseTimeout: sessionOpts.ExecutionTimeout, HeartbeatTimeout: 10 * time.Minute, WaitForCancellation: true, + RetryPolicy: &temporal.RetryPolicy{ + InitialInterval: 1 * time.Minute, + }, }) fMaintain := workflow.ExecuteActivity( diff --git a/flow/workflows/xmin_flow.go b/flow/workflows/xmin_flow.go index 898d67c66..f6f83d85a 100644 --- a/flow/workflows/xmin_flow.go +++ b/flow/workflows/xmin_flow.go @@ -7,6 +7,7 @@ import ( "time" "github.com/google/uuid" + "go.temporal.io/sdk/temporal" "go.temporal.io/sdk/workflow" "github.com/PeerDB-io/peerdb/flow/generated/protos" @@ -73,6 +74,9 @@ func XminFlowWorkflow( replicateXminPartitionCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 24 * 5 * time.Hour, HeartbeatTimeout: time.Minute, + RetryPolicy: &temporal.RetryPolicy{ + InitialInterval: 1 * time.Minute, + }, }) if err := workflow.ExecuteActivity( replicateXminPartitionCtx,