diff --git a/flow/e2e/clickhouse/peer_flow_ch_test.go b/flow/e2e/clickhouse/peer_flow_ch_test.go index 2564deab4..a670b5ee3 100644 --- a/flow/e2e/clickhouse/peer_flow_ch_test.go +++ b/flow/e2e/clickhouse/peer_flow_ch_test.go @@ -110,6 +110,7 @@ func (s ClickHouseSuite) Test_Addition_Removal() { return !rows.Next() }) + runID := e2e.EnvGetRunID(s.t, env) e2e.SignalWorkflow(env, model.CDCDynamicPropertiesSignal, &protos.CDCFlowConfigUpdate{ AdditionalTables: []*protos.TableMapping{ { @@ -123,6 +124,8 @@ func (s ClickHouseSuite) Test_Addition_Removal() { flowStatus := getFlowStatus() return flowStatus == protos.FlowStatus_STATUS_RUNNING }) + afterAddRunID := e2e.EnvGetRunID(s.t, env) + require.NotEqual(s.t, runID, afterAddRunID) _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s (key) VALUES ('test'); @@ -163,6 +166,8 @@ func (s ClickHouseSuite) Test_Addition_Removal() { flowStatus := getFlowStatus() return flowStatus == protos.FlowStatus_STATUS_RUNNING }) + afterRemoveRunID := e2e.EnvGetRunID(s.t, env) + require.NotEqual(s.t, runID, afterRemoveRunID) _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s (key) VALUES ('test'); diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index 11e6c7bc8..461077e49 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -727,6 +727,12 @@ func EnvGetWorkflowState(t *testing.T, env WorkflowRun) peerflow.CDCFlowWorkflow return state } +func EnvGetRunID(t *testing.T, env WorkflowRun) string { + execData, err := env.c.DescribeWorkflowExecution(context.Background(), env.GetID(), "") + require.NoError(t, err) + return execData.WorkflowExecutionInfo.Execution.RunId +} + func EnvGetFlowStatus(t *testing.T, env WorkflowRun) protos.FlowStatus { t.Helper() var flowStatus protos.FlowStatus