Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3239,6 +3239,7 @@ private boolean createOlapTable(Database db, CreateTableInfo createTableInfo) th
}

Pair<Boolean, Boolean> result;
boolean holdTableLock = false;
db.writeLockOrDdlException();
try {
// db name not changed
Expand All @@ -3247,51 +3248,76 @@ private boolean createOlapTable(Database db, CreateTableInfo createTableInfo) th
}
// register table, write create table edit log
result = db.createTableWithoutLock(olapTable, false, createTableInfo.isIfNotExists());
if (!result.second) {
olapTable.writeLock();
holdTableLock = true;
}
} finally {
db.writeUnlock();
}
if (!result.first) {
ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableShowName);
}
try {
if (!result.first) {
ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableShowName);
}

if (result.second) { // table already exists
if (Env.getCurrentColocateIndex().isColocateTable(tableId)) {
// if this is a colocate table, its table id is already added to colocate group
// so we should remove the tableId here
Env.getCurrentColocateIndex().removeTable(tableId);
if (result.second) { // table already exists
if (Env.getCurrentColocateIndex().isColocateTable(tableId)) {
// if this is a colocate table, its table id is already added to colocate group
// so we should remove the tableId here
Env.getCurrentColocateIndex().removeTable(tableId);
}
for (Long tabletId : tabletIdSet) {
Env.getCurrentInvertedIndex().deleteTablet(tabletId);
}
LOG.info("duplicate create table[{};{}] in db[{};{}], skip next steps",
tableName, tableId, db.getName(), db.getId());
} else {
// if table not exists, then db.createTableWithLock will write an editlog.
hadLogEditCreateTable = true;

// we have added these index to memory, only need to persist here
if (Env.getCurrentColocateIndex().isColocateTable(tableId)) {
GroupId groupId = Env.getCurrentColocateIndex().getGroup(tableId);
Map<Tag, List<List<Long>>> backendsPerBucketSeq = Env.getCurrentColocateIndex()
.getBackendsPerBucketSeq(groupId);
ColocatePersistInfo info = ColocatePersistInfo.createForAddTable(groupId, tableId,
backendsPerBucketSeq);
Env.getCurrentEnv().getEditLog().logColocateAddTable(info);
}
LOG.info("successfully create table[{};{}] in db[{};{}]",
tableName, tableId, db.getName(), db.getId());

if (DebugPointUtil.isEnable("FE.createOlapTable.beforeFirstTimeDynamicPartition")) {
long sleepMs = DebugPointUtil.getDebugParamOrDefault(
"FE.createOlapTable.beforeFirstTimeDynamicPartition", "sleepMs", 0L);
if (sleepMs > 0) {
LOG.info("debug point FE.createOlapTable.beforeFirstTimeDynamicPartition, sleep {}ms",
sleepMs);
try {
Thread.sleep(sleepMs);
} catch (InterruptedException ignore) {
Thread.currentThread().interrupt();
}
}
}

Env.getCurrentEnv().getDynamicPartitionScheduler()
.executeDynamicPartitionFirstTime(db.getId(), olapTable.getId());
// register or remove table from DynamicPartition after table created
DynamicPartitionUtil.registerOrRemoveDynamicPartitionTable(db.getId(), olapTable, false);
Env.getCurrentEnv().getDynamicPartitionScheduler()
.createOrUpdateRuntimeInfo(tableId, DynamicPartitionScheduler.LAST_UPDATE_TIME,
TimeUtils.getCurrentFormatTime());
}
for (Long tabletId : tabletIdSet) {
Env.getCurrentInvertedIndex().deleteTablet(tabletId);

if (DebugPointUtil.isEnable("FE.createOlapTable.exception")) {
LOG.info("debug point FE.createOlapTable.exception, throw e");
throw new DdlException("debug point FE.createOlapTable.exception");
}
} finally {
if (holdTableLock) {
olapTable.writeUnlock();
}
LOG.info("duplicate create table[{};{}] in db[{};{}], skip next steps",
tableName, tableId, db.getName(), db.getId());
} else {
// if table not exists, then db.createTableWithLock will write an editlog.
hadLogEditCreateTable = true;

// we have added these index to memory, only need to persist here
if (Env.getCurrentColocateIndex().isColocateTable(tableId)) {
GroupId groupId = Env.getCurrentColocateIndex().getGroup(tableId);
Map<Tag, List<List<Long>>> backendsPerBucketSeq = Env.getCurrentColocateIndex()
.getBackendsPerBucketSeq(groupId);
ColocatePersistInfo info = ColocatePersistInfo.createForAddTable(groupId, tableId,
backendsPerBucketSeq);
Env.getCurrentEnv().getEditLog().logColocateAddTable(info);
}
LOG.info("successfully create table[{};{}] in db[{};{}]",
tableName, tableId, db.getName(), db.getId());
Env.getCurrentEnv().getDynamicPartitionScheduler()
.executeDynamicPartitionFirstTime(db.getId(), olapTable.getId());
// register or remove table from DynamicPartition after table created
DynamicPartitionUtil.registerOrRemoveDynamicPartitionTable(db.getId(), olapTable, false);
Env.getCurrentEnv().getDynamicPartitionScheduler()
.createOrUpdateRuntimeInfo(tableId, DynamicPartitionScheduler.LAST_UPDATE_TIME,
TimeUtils.getCurrentFormatTime());
}

if (DebugPointUtil.isEnable("FE.createOlapTable.exception")) {
LOG.info("debug point FE.createOlapTable.exception, throw e");
throw new DdlException("debug point FE.createOlapTable.exception");
}
} catch (DdlException e) {
LOG.warn("create table failed {} - {}", tabletIdSet, e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
import org.apache.doris.regression.suite.ClusterOptions
import org.apache.doris.regression.util.NodeType

// Regression test for the cloud-specific race where a concurrent
// CREATE MATERIALIZED VIEW slips in after OP_CREATE_TABLE is journaled
// but before first-time dynamic partition entries are journaled, causing
// the rollup job to reference partitions that never appear in the journal.
// Replay later NPEs at RollupJobV2.addTabletToInvertedIndex.
//
// The fix holds olapTable.writeLock across the whole first-time dynamic
// partition setup so CREATE MV must wait until the table is fully built.
suite("test_create_table_and_create_mv_race", 'p0, docker') {
if (!isCloudMode()) {
return
}
def options = new ClusterOptions()
options.enableDebugPoints()
options.setFeNum(3)
options.feConfigs.add('sys_log_verbose_modules=org')
options.setBeNum(1)
options.cloudMode = true

docker(options) {
sql """set enable_sql_cache=false"""

def tbl = 'test_create_table_and_create_mv_race_tbl'
def mvName = 'test_create_table_and_create_mv_race_mv'

sql "DROP TABLE IF EXISTS ${tbl}"

// Widen the race window: slow down first-time dynamic partition
// so the concurrent CREATE MV has time to arrive.
// With the fix, CREATE MV will block on olapTable.writeLock() and
// only run after the table is fully built.
//
// params serialize to HTTP query strings on the wire, so values
// must be String-convertible (Groovy handles the coercion).
cluster.injectDebugPoints(NodeType.FE, [
'FE.createOlapTable.beforeFirstTimeDynamicPartition': [sleepMs: '10000']
])

try {
def createDoneAt = new java.util.concurrent.atomic.AtomicLong(0L)
def mvDoneAt = new java.util.concurrent.atomic.AtomicLong(0L)

def createFuture = thread('create-table') {
sql """
CREATE TABLE ${tbl} (
user_id BIGINT,
create_dt datetime,
amount BIGINT
)
DUPLICATE KEY(user_id, create_dt)
PARTITION BY RANGE(create_dt) ()
DISTRIBUTED BY HASH(user_id) BUCKETS 1
PROPERTIES (
"replication_num" = "1",
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.start" = "-2",
"dynamic_partition.end" = "2",
"dynamic_partition.prefix" = "p",
"dynamic_partition.create_history_partition" = "true"
)
"""
createDoneAt.set(System.currentTimeMillis())
}

// Give CREATE TABLE time to reach the injected sleep.
sleep(2000)

def mvFuture = thread('create-mv') {
// If CREATE MV fires before the table exists in the namespace, retry a few times.
def attempts = 30
while (attempts-- > 0) {
try {
sql """
CREATE MATERIALIZED VIEW ${mvName} AS
SELECT user_id AS mv_user_id, sum(amount) AS mv_sum_amount
FROM ${tbl}
GROUP BY user_id
"""
break
} catch (Exception e) {
def msg = e.getMessage() ?: ''
if (msg.contains("Unknown table") || msg.contains("does not exist")) {
sleep(200)
continue
}
throw e
}
}
mvDoneAt.set(System.currentTimeMillis())
}

createFuture.get()
mvFuture.get()

// Correctness invariant: CREATE MV must have finished AFTER CREATE TABLE.
// If the lock is missing, CREATE MV would return first (it's cheap) while
// CREATE TABLE is still inside the injected 10s sleep.
def ct = createDoneAt.get()
def mt = mvDoneAt.get()
assert ct > 0 && mt > 0, "both futures should have completed"
assert mt >= ct, "CREATE MV (${mt}) must finish after CREATE TABLE (${ct})," +
" otherwise the lock fix is missing and MV raced ahead of first-time dynamic partition setup"
} finally {
cluster.clearFrontendDebugPoints()
}

cluster.checkFeIsAlive(1, true)

// Verify the table has all dynamic partitions.
// dynamic_partition.start=-2, end=2, create_history_partition=true → 5 partitions expected
def partitions = sql "SHOW PARTITIONS FROM ${tbl}"
assert partitions.size() >= 3,
"dynamic_partition.start=-2/end=2 should have produced at least 3 partitions, got ${partitions.size()}"

// Sanity: the MV exists and an end-to-end insert works.
def now = new Date()
def dateFormat = new java.text.SimpleDateFormat("yyyy-MM-dd")
def today = dateFormat.format(now)
sql "INSERT INTO ${tbl} VALUES (1, '${today} 12:00:00', 100)"
def cnt = sql "SELECT count(*) FROM ${tbl}"
assert cnt[0][0] == 1L, "expected 1 row, got ${cnt[0][0]}"
}
}
Loading