Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -992,6 +992,21 @@ class Suite implements GroovyInterceptable {
throw new RuntimeException("dictionary ${dictName} are not ready, status: ${result}")
}

/**
 * Waits for a colocate group in the suite's current database to become stable.
 *
 * Convenience overload: resolves the database name from {@code context.dbName}
 * and delegates to the (dbName, groupName, timeoutSeconds) variant.
 *
 * @param groupName      colocate group name as used in {@code colocate_with}
 * @param timeoutSeconds maximum time to wait, in seconds (defaults to 60)
 */
void waitForColocateGroupStable(String groupName, int timeoutSeconds = 60) {
    String currentDbName = context.dbName
    waitForColocateGroupStable(currentDbName, groupName, timeoutSeconds)
}

/**
 * Waits until the given colocate group is reported as stable by
 * {@code SHOW PROC '/colocation_group'}.
 *
 * Database-scoped groups are listed as {@code <dbName>.<groupName>}. Global
 * colocate groups (names starting with {@code __global__}) are listed under
 * their bare name, so they are matched without the database prefix; prefixing
 * them unconditionally would make this helper time out even when the group is
 * already stable.
 *
 * @param dbName         database that owns the group; ignored for global groups
 * @param groupName      colocate group name as used in {@code colocate_with}
 * @param timeoutSeconds maximum time to wait, in seconds (defaults to 60)
 */
void waitForColocateGroupStable(String dbName, String groupName, int timeoutSeconds = 60) {
    // Only db-scoped groups carry the "<db>." prefix in the PROC output.
    boolean isGlobalGroup = groupName.startsWith("__global__")
    String fullGroupName = isGlobalGroup ? groupName : "${dbName}.${groupName}".toString()

    logger.info("wait colocate group ${fullGroupName} stable")
    // NOTE(review): awaitUntil presumably fails the test when the closure never
    // returns true within the timeout; confirm against its definition.
    awaitUntil(timeoutSeconds) {
        def groups = sql_return_maparray("SHOW PROC '/colocation_group'")
        def group = groups.find { it.GroupName == fullGroupName }
        // IsStable is compared as the string "true", matching the PROC output format.
        return group != null && group.IsStable == "true"
    }
    logger.info("colocate group ${fullGroupName} is stable")
}

/**
 * Creates a FlightRecordAction bound to this suite's context and hands it,
 * together with the supplied closure, to runAction.
 *
 * @param actionSupplier closure passed through to runAction alongside the action
 */
void flightRecord(Closure actionSupplier) {
    def flightRecordAction = new FlightRecordAction(context)
    runAction(flightRecordAction, actionSupplier)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ suite("test_backup_restore_colocate", "backup_restore,external") {
res = sql "SELECT * FROM ${dbName}.${tableName2}"
assertEquals(res.size(), insert_num)

waitForColocateGroupStable(dbName, groupName)
explain {
sql("${query}")
contains("COLOCATE")
Expand Down Expand Up @@ -200,7 +201,7 @@ suite("test_backup_restore_colocate", "backup_restore,external") {
res = sql "SELECT * FROM ${dbName}.${tableName2}"
assertEquals(res.size(), insert_num)


waitForColocateGroupStable(dbName, groupName)
explain {
sql("${query}")
contains("COLOCATE")
Expand Down Expand Up @@ -376,26 +377,6 @@ suite("test_backup_restore_colocate_with_partition", "backup_restore") {
assertTrue(result.ColocateMismatchNum as int == 0)
}

// Polls SHOW PROC '/colocation_group' once per second, for at most 60 seconds,
// until the group `db_name`.`group_name` is listed with IsStable == "true".
// Why this is needed: a RESTORE into a new db creates a brand-new colocate group,
// which stays unstable until ColocateTableCheckerAndBalancer scans it (default
// every tablet_checker_interval_ms = 20s). Nereids skips colocate join while the
// group is unstable, so an EXPLAIN issued right after RESTORE FINISHED can miss
// COLOCATE. On timeout this only logs a warning; it does not fail the test.
def waitColocateGroupStable = { db_name, group_name ->
    final String fullName = "${db_name}.${group_name}".toString()
    final long giveUpAt = System.currentTimeMillis() + 60_000
    while (true) {
        if (System.currentTimeMillis() >= giveUpAt) {
            log.warn("colocate group ${fullName} did not become stable within 60s")
            return
        }
        def matched = sql_return_maparray("SHOW PROC '/colocation_group'").find { row ->
            row.GroupName == fullName
        }
        if (matched?.IsStable == "true") {
            log.info("colocate group ${fullName} is stable")
            return
        }
        sleep(1000)
    }
}

def syncer = getSyncer()
syncer.createS3Repository(repoName)

Expand Down Expand Up @@ -466,6 +447,7 @@ suite("test_backup_restore_colocate_with_partition", "backup_restore") {
res = sql "SELECT * FROM ${dbName}.${tableName2}"
assertEquals(res.size(), insert_num)

waitForColocateGroupStable(dbName, groupName)
explain {
sql("${query}")
contains("COLOCATE")
Expand Down Expand Up @@ -569,7 +551,7 @@ suite("test_backup_restore_colocate_with_partition", "backup_restore") {
res = sql "SELECT * FROM ${dbName}.${tableName2}"
assertEquals(res.size(), insert_num)


waitForColocateGroupStable(dbName, groupName)
explain {
sql("${query}")
contains("COLOCATE")
Expand Down Expand Up @@ -644,10 +626,7 @@ suite("test_backup_restore_colocate_with_partition", "backup_restore") {

query = "select * from ${newDbName}.${tableName1} as t1, ${newDbName}.${tableName2} as t2 where t1.id=t2.id;"

// RESTORE to a brand-new db creates a new colocate group that is initially
// unstable; wait for ColocateTableCheckerAndBalancer to mark it stable, otherwise
// EXPLAIN below may fall back to BROADCAST/SHUFFLE.
waitColocateGroupStable(newDbName, groupName)
waitForColocateGroupStable(newDbName, groupName)

explain {
sql("${query}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ suite("test_colocate_join_of_column_order") {
sql("select * from test_colocate_join_of_column_order_t1 a join test_colocate_join_of_column_order_t2 b on a.k1=b.k2 and a.v=b.v;")
notContains "COLOCATE"
}
waitForColocateGroupStable("group_column_order")

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This wait is after the four negative `notContains "COLOCATE"` assertions above, so those checks can still pass for the same unstable-group reason this PR is fixing. When the group is unstable, Nereids skips colocate join regardless of the join condition, which means these negative cases do not actually verify the column-order logic. Please move this wait to before the first `explain` after the inserts/planner setup so both the negative and positive assertions run with a stable colocate group.

explain {
sql("select * from test_colocate_join_of_column_order_t1 a join test_colocate_join_of_column_order_t2 b on a.k1=b.k2 and a.k2=b.k1;")
contains "COLOCATE"
Expand Down Expand Up @@ -100,6 +101,7 @@ suite("test_colocate_join_of_column_order") {
sql """insert into test_colocate_join_of_column_order_tb values(1,1);"""
sql """insert into test_colocate_join_of_column_order_tc values(1,1);"""

waitForColocateGroupStable("group_column_order3")
explain {
sql("""select /*+ set_var(disable_join_reorder=true) */ * from test_colocate_join_of_column_order_ta join [shuffle] (select cast((c2 + 1) as bigint) c2 from test_colocate_join_of_column_order_tb) test_colocate_join_of_column_order_tb on test_colocate_join_of_column_order_ta.c1 = test_colocate_join_of_column_order_tb.c2 join [shuffle] test_colocate_join_of_column_order_tc on test_colocate_join_of_column_order_tb.c2 = test_colocate_join_of_column_order_tc.c1;""");
contains "COLOCATE"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ suite("colocate_join_with_rollup", "query_p0") {
(20220107, 102, 202, 200, 100),
(20220108, 101, 202, 200, 100);"""

waitForColocateGroupStable("group1")
explain {
sql("""select sum_col1,sum_col2
from
Expand Down
Loading