diff --git a/flow/e2e/clickhouse/peer_flow_ch_test.go b/flow/e2e/clickhouse/peer_flow_ch_test.go index 2564deab4..a3d3c2618 100644 --- a/flow/e2e/clickhouse/peer_flow_ch_test.go +++ b/flow/e2e/clickhouse/peer_flow_ch_test.go @@ -89,11 +89,14 @@ func (s ClickHouseSuite) Test_Addition_Removal() { `, srcTableName)) require.NoError(s.t, err) e2e.EnvWaitForEqualTablesWithNames(env, s, "first insert", "test_table_add_remove", dstTableName, "id,key") + runID := env.GetRunID() e2e.SignalWorkflow(env, model.FlowSignal, model.PauseSignal) e2e.EnvWaitFor(s.t, env, 4*time.Minute, "pausing for add table", func() bool { flowStatus := getFlowStatus() return flowStatus == protos.FlowStatus_STATUS_PAUSED }) + pausedRunID := env.GetRunID() + require.NotEqual(s.t, runID, pausedRunID) _, err = s.Conn().Exec(context.Background(), `SELECT pg_terminate_backend(pid) FROM pg_stat_activity @@ -123,6 +126,8 @@ func (s ClickHouseSuite) Test_Addition_Removal() { flowStatus := getFlowStatus() return flowStatus == protos.FlowStatus_STATUS_RUNNING }) + afterAddRunID := env.GetRunID() + require.NotEqual(s.t, pausedRunID, afterAddRunID) _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s (key) VALUES ('test'); @@ -163,6 +168,8 @@ func (s ClickHouseSuite) Test_Addition_Removal() { flowStatus := getFlowStatus() return flowStatus == protos.FlowStatus_STATUS_RUNNING }) + afterRemoveRunID := env.GetRunID() + require.NotEqual(s.t, runID, afterRemoveRunID) _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s (key) VALUES ('test');