From 48930ad8e2bcbd0d74e8817951ffaede988ac777 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Thu, 9 Jan 2025 20:52:11 +0000 Subject: [PATCH] cdc flow: continue as new after state updates (#2420) prevents rewinding state to start of workflow from undoing changes to settings (such as table additions/removals) --- flow/e2e/clickhouse/peer_flow_ch_test.go | 5 +++++ flow/e2e/test_utils.go | 7 +++++++ flow/workflows/cdc_flow.go | 2 ++ 3 files changed, 14 insertions(+) diff --git a/flow/e2e/clickhouse/peer_flow_ch_test.go b/flow/e2e/clickhouse/peer_flow_ch_test.go index 2564deab4..a670b5ee3 100644 --- a/flow/e2e/clickhouse/peer_flow_ch_test.go +++ b/flow/e2e/clickhouse/peer_flow_ch_test.go @@ -110,6 +110,7 @@ func (s ClickHouseSuite) Test_Addition_Removal() { return !rows.Next() }) + runID := e2e.EnvGetRunID(s.t, env) e2e.SignalWorkflow(env, model.CDCDynamicPropertiesSignal, &protos.CDCFlowConfigUpdate{ AdditionalTables: []*protos.TableMapping{ { @@ -123,6 +124,8 @@ func (s ClickHouseSuite) Test_Addition_Removal() { flowStatus := getFlowStatus() return flowStatus == protos.FlowStatus_STATUS_RUNNING }) + afterAddRunID := e2e.EnvGetRunID(s.t, env) + require.NotEqual(s.t, runID, afterAddRunID) _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s (key) VALUES ('test'); @@ -163,6 +166,8 @@ func (s ClickHouseSuite) Test_Addition_Removal() { flowStatus := getFlowStatus() return flowStatus == protos.FlowStatus_STATUS_RUNNING }) + afterRemoveRunID := e2e.EnvGetRunID(s.t, env) + require.NotEqual(s.t, runID, afterRemoveRunID) _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s (key) VALUES ('test'); diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index 11e6c7bc8..37705e067 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -727,6 +727,13 @@ func EnvGetWorkflowState(t *testing.T, env WorkflowRun) peerflow.CDCFlowWorkflow return state } +func EnvGetRunID(t *testing.T, env WorkflowRun) string { + t.Helper() + execData, err := env.c.DescribeWorkflowExecution(context.Background(), env.GetID(), "") + require.NoError(t, err) + return execData.WorkflowExecutionInfo.Execution.RunId +} + func EnvGetFlowStatus(t *testing.T, env WorkflowRun) protos.FlowStatus { t.Helper() var flowStatus protos.FlowStatus diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 8a260e244..497e2d4bb 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -369,6 +369,7 @@ func CDCFlowWorkflow( logger.Info(fmt.Sprintf("mirror has been resumed after %s", time.Since(startTime).Round(time.Second))) state.CurrentFlowStatus = protos.FlowStatus_STATUS_RUNNING + return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, cfg, state) } originalRunID := workflow.GetInfo(ctx).OriginalRunID @@ -479,6 +480,7 @@ func CDCFlowWorkflow( } state.CurrentFlowStatus = protos.FlowStatus_STATUS_RUNNING + return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, cfg, state) } var finished bool