From 1b03e7e0b65bea82acc83c073a116f6ca46c5be9 Mon Sep 17 00:00:00 2001 From: laihui Date: Wed, 10 Jun 2026 14:42:47 +0800 Subject: [PATCH] fix load_to_single_tablet routing for auto partition --- be/src/exec/sink/vrow_distribution.cpp | 2 + .../doris/service/FrontendServiceImpl.java | 62 +++++++++++++++++-- .../AutoPartitionCacheManager.java | 32 ++++++++-- .../AutoPartitionCacheManagerTest.java | 61 ++++++++++++++++++ gensrc/thrift/FrontendService.thrift | 4 ++ 5 files changed, 151 insertions(+), 10 deletions(-) create mode 100644 fe/fe-core/src/test/java/org/apache/doris/transaction/AutoPartitionCacheManagerTest.java diff --git a/be/src/exec/sink/vrow_distribution.cpp b/be/src/exec/sink/vrow_distribution.cpp index 96cf8966fe0ce1..35d96556f31ada 100644 --- a/be/src/exec/sink/vrow_distribution.cpp +++ b/be/src/exec/sink/vrow_distribution.cpp @@ -110,6 +110,7 @@ Status VRowDistribution::automatic_create_partition() { request.__set_partitionValues(_partitions_need_create); request.__set_be_endpoint(be_endpoint); request.__set_write_single_replica(_write_single_replica); + request.__set_load_to_single_tablet(_tablet_finder->is_find_tablet_every_sink()); if (_state && _state->get_query_ctx()) { // Pass query_id to FE so it can determine if this is a multi-instance load by checking Coordinator request.__set_query_id(_state->get_query_ctx()->query_id()); @@ -211,6 +212,7 @@ Status VRowDistribution::_replace_overwriting_partition() { std::string be_endpoint = BackendOptions::get_be_endpoint(); request.__set_be_endpoint(be_endpoint); + request.__set_load_to_single_tablet(_tablet_finder->is_find_tablet_every_sink()); if (_state && _state->get_query_ctx()) { // Pass query_id to FE so it can determine if this is a multi-instance load by checking Coordinator request.__set_query_id(_state->get_query_ctx()->query_id()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index f01de48cf7d625..fb74d7ca29a02a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -34,6 +34,7 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.DatabaseIf; +import org.apache.doris.catalog.DistributionInfo; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.MaterializedIndex; import org.apache.doris.catalog.OlapTable; @@ -361,6 +362,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; @@ -4535,6 +4537,7 @@ public TCreatePartitionResult createPartition(TCreatePartitionRequest request) t List tablets = new ArrayList<>(); List slaveTablets = new ArrayList<>(); List partitions = Lists.newArrayList(); + boolean loadToSingleTablet = request.isSetLoadToSingleTablet() && request.isLoadToSingleTablet(); final boolean hasBeEndpoint = request.isSetBeEndpoint(); // Lazy: resolved on the first CloudTablet that needs it (skipped on cache-hit). String cachedClusterId = null; @@ -4561,17 +4564,36 @@ public TCreatePartitionResult createPartition(TCreatePartitionRequest request) t tPartition.setNumBuckets(index.getTablets().size()); } tPartition.setIsMutable(olapTable.getPartitionInfo().getIsMutable(partition.getId())); + boolean randomDistribution = + partition.getDistributionInfo().getType() == DistributionInfo.DistributionInfoType.RANDOM; + boolean cacheLoadTabletIdx = loadToSingleTablet && randomDistribution; partitions.add(tPartition); // tablet + AtomicLong cachedLoadTabletIdx = new AtomicLong(-1); if (needUseCache && Env.getCurrentGlobalTransactionMgr().getAutoPartitionCacheMgr() .getAutoPartitionInfo(txnId, partition.getId(), partitionTablets, - partitionSlaveTablets)) { + partitionSlaveTablets, cachedLoadTabletIdx)) { + if (cacheLoadTabletIdx) { + tPartition.setLoadTabletIdx(cachedLoadTabletIdx.get()); + } // fast path, if cached tablets.addAll(partitionTablets); slaveTablets.addAll(partitionSlaveTablets); continue; } + if (cacheLoadTabletIdx) { + try { + int tabletIndex = Env.getCurrentEnv().getTabletLoadIndexRecorderMgr() + .getCurrentTabletLoadIndex(dbId, olapTable.getId(), partition); + tPartition.setLoadTabletIdx(tabletIndex); + } catch (UserException ex) { + errorStatus.setErrorMsgs(Lists.newArrayList(ex.getMessage())); + result.setStatus(errorStatus); + LOG.warn("send create partition error status: {}", result); + return result; + } + } int quorum = olapTable.getPartitionInfo().getReplicaAllocation(partition.getId()).getTotalReplicaNum() / 2 + 1; for (MaterializedIndex index : partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)) { @@ -4643,9 +4665,13 @@ public TCreatePartitionResult createPartition(TCreatePartitionRequest request) t } if (needUseCache) { - Env.getCurrentGlobalTransactionMgr().getAutoPartitionCacheMgr() + long loadTabletIdx = cacheLoadTabletIdx ? tPartition.getLoadTabletIdx() : -1; + long cachedTabletIdx = Env.getCurrentGlobalTransactionMgr().getAutoPartitionCacheMgr() .getOrSetAutoPartitionInfo(txnId, partition.getId(), partitionTablets, - partitionSlaveTablets); + partitionSlaveTablets, loadTabletIdx); + if (cacheLoadTabletIdx) { + tPartition.setLoadTabletIdx(cachedTabletIdx); + } } tablets.addAll(partitionTablets); @@ -4863,6 +4889,7 @@ public TReplacePartitionResult replacePartition(TReplacePartitionRequest request List tablets = new ArrayList<>(); List slaveTablets = new ArrayList<>(); PartitionInfo partitionInfo = olapTable.getPartitionInfo(); + boolean loadToSingleTablet = request.isSetLoadToSingleTablet() && request.isLoadToSingleTablet(); final boolean replaceHasBeEndpoint = request.isSetBeEndpoint(); // Lazy: resolved on the first CloudTablet that needs it. String replaceCachedClusterId = null; @@ -4891,17 +4918,36 @@ public TReplacePartitionResult replacePartition(TReplacePartitionRequest request tPartition.setNumBuckets(index.getTablets().size()); } tPartition.setIsMutable(olapTable.getPartitionInfo().getIsMutable(partition.getId())); + boolean randomDistribution = + partition.getDistributionInfo().getType() == DistributionInfo.DistributionInfoType.RANDOM; + boolean cacheLoadTabletIdx = loadToSingleTablet && randomDistribution; partitions.add(tPartition); // tablet + AtomicLong cachedLoadTabletIdx = new AtomicLong(-1); if (needUseCache && txnId != 0 && Env.getCurrentGlobalTransactionMgr().getAutoPartitionCacheMgr() .getAutoPartitionInfo(txnId, partition.getId(), partitionTablets, - partitionSlaveTablets)) { + partitionSlaveTablets, cachedLoadTabletIdx)) { + if (cacheLoadTabletIdx) { + tPartition.setLoadTabletIdx(cachedLoadTabletIdx.get()); + } // fast path, if cached tablets.addAll(partitionTablets); slaveTablets.addAll(partitionSlaveTablets); continue; } + if (cacheLoadTabletIdx) { + try { + int tabletIndex = Env.getCurrentEnv().getTabletLoadIndexRecorderMgr() + .getCurrentTabletLoadIndex(dbId, olapTable.getId(), partition); + tPartition.setLoadTabletIdx(tabletIndex); + } catch (UserException ex) { + errorStatus.setErrorMsgs(Lists.newArrayList(ex.getMessage())); + result.setStatus(errorStatus); + LOG.warn("send replace partition error status: {}", result); + return result; + } + } int quorum = olapTable.getPartitionInfo().getReplicaAllocation(partition.getId()).getTotalReplicaNum() / 2 + 1; for (MaterializedIndex index : partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)) { @@ -4978,9 +5024,13 @@ public TReplacePartitionResult replacePartition(TReplacePartitionRequest request } if (needUseCache) { - Env.getCurrentGlobalTransactionMgr().getAutoPartitionCacheMgr() + long loadTabletIdx = cacheLoadTabletIdx ? tPartition.getLoadTabletIdx() : -1; + long cachedTabletIdx = Env.getCurrentGlobalTransactionMgr().getAutoPartitionCacheMgr() .getOrSetAutoPartitionInfo(txnId, partition.getId(), partitionTablets, - partitionSlaveTablets); + partitionSlaveTablets, loadTabletIdx); + if (cacheLoadTabletIdx) { + tPartition.setLoadTabletIdx(cachedTabletIdx); + } if (LOG.isDebugEnabled()) { LOG.debug("Cache auto partition info, txnId: {}, partitionId: {}, " + "tablets: {}, slaveTablets: {}", txnId, partition.getId(), diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/AutoPartitionCacheManager.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/AutoPartitionCacheManager.java index 96d6033deb24d4..23824ef268f26f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/AutoPartitionCacheManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/AutoPartitionCacheManager.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; /* ** this class AutoPartitionCacheManager is used for solve the follow question : @@ -59,10 +60,17 @@ public class AutoPartitionCacheManager { public static class PartitionTabletCache { public final List tablets; public final List slaveTablets; + public final long loadTabletIdx; public PartitionTabletCache(List tablets, List slaveTablets) { + this(tablets, slaveTablets, -1); + } + + public PartitionTabletCache(List tablets, List slaveTablets, + long loadTabletIdx) { this.tablets = tablets; this.slaveTablets = slaveTablets; + this.loadTabletIdx = loadTabletIdx; } } @@ -73,6 +81,14 @@ public PartitionTabletCache(List tablets, List // return true if cached, else false, this function only read cache public boolean getAutoPartitionInfo(Long txnId, Long partitionId, List partitionTablets, List partitionSlaveTablets) { + return getAutoPartitionInfo(txnId, partitionId, partitionTablets, partitionSlaveTablets, + new AtomicLong(-1)); + } + + // return true if cached, else false, this function only read cache + public boolean getAutoPartitionInfo(Long txnId, Long partitionId, + List partitionTablets, List partitionSlaveTablets, + AtomicLong loadTabletIdx) { ConcurrentHashMap partitionMap = autoPartitionInfo.get(txnId); if (partitionMap == null) { return false; @@ -87,11 +103,18 @@ public boolean getAutoPartitionInfo(Long txnId, Long partitionId, partitionTablets.addAll(cached.tablets); partitionSlaveTablets.clear(); partitionSlaveTablets.addAll(cached.slaveTablets); + loadTabletIdx.set(cached.loadTabletIdx); return true; } public void getOrSetAutoPartitionInfo(Long txnId, Long partitionId, List partitionTablets, List partitionSlaveTablets) { + getOrSetAutoPartitionInfo(txnId, partitionId, partitionTablets, partitionSlaveTablets, -1); + } + + public long getOrSetAutoPartitionInfo(Long txnId, Long partitionId, + List partitionTablets, List partitionSlaveTablets, + long loadTabletIdx) { ConcurrentHashMap partitionMap = autoPartitionInfo.computeIfAbsent(txnId, k -> new ConcurrentHashMap<>()); @@ -100,7 +123,8 @@ public void getOrSetAutoPartitionInfo(Long txnId, Long partitionId, needUpdate.set(true); return new PartitionTabletCache( new ArrayList<>(partitionTablets), - new ArrayList<>(partitionSlaveTablets) + new ArrayList<>(partitionSlaveTablets), + loadTabletIdx ); }); @@ -110,13 +134,13 @@ public void getOrSetAutoPartitionInfo(Long txnId, Long partitionId, partitionTablets.addAll(cached.tablets); partitionSlaveTablets.addAll(cached.slaveTablets); LOG.debug("Get cached auto partition info from cache, txnId: {}, partitionId: {}, " - + "tablets: {}, slaveTablets: {}", txnId, partitionId, - cached.tablets.size(), cached.slaveTablets.size()); + + "tablets: {}, slaveTablets: {}, loadTabletIdx: {}", txnId, partitionId, + cached.tablets.size(), cached.slaveTablets.size(), cached.loadTabletIdx); } + return cached.loadTabletIdx; } public void clearAutoPartitionInfo(Long txnId) { autoPartitionInfo.remove(txnId); } } - diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/AutoPartitionCacheManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/AutoPartitionCacheManagerTest.java new file mode 100644 index 00000000000000..6e56cd5551c638 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/AutoPartitionCacheManagerTest.java @@ -0,0 +1,61 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.transaction; + +import org.apache.doris.thrift.TTabletLocation; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +public class AutoPartitionCacheManagerTest { + @Test + public void testGetOrSetAutoPartitionInfoReturnsCachedLoadTabletIdx() { + AutoPartitionCacheManager cacheManager = new AutoPartitionCacheManager(); + List firstTablets = new ArrayList<>(); + firstTablets.add(new TTabletLocation(10001L, Arrays.asList(1L))); + List firstSlaveTablets = new ArrayList<>(); + + long storedLoadTabletIdx = cacheManager.getOrSetAutoPartitionInfo( + 10L, 20L, firstTablets, firstSlaveTablets, 3); + Assert.assertEquals(3, storedLoadTabletIdx); + + List secondTablets = new ArrayList<>(); + secondTablets.add(new TTabletLocation(20001L, Arrays.asList(2L))); + List secondSlaveTablets = new ArrayList<>(); + + long cachedLoadTabletIdx = cacheManager.getOrSetAutoPartitionInfo( + 10L, 20L, secondTablets, secondSlaveTablets, 5); + Assert.assertEquals(3, cachedLoadTabletIdx); + Assert.assertEquals(1, secondTablets.size()); + Assert.assertEquals(10001L, secondTablets.get(0).getTabletId()); + + List cachedTablets = new ArrayList<>(); + List cachedSlaveTablets = new ArrayList<>(); + AtomicLong readLoadTabletIdx = new AtomicLong(-1); + Assert.assertTrue(cacheManager.getAutoPartitionInfo( + 10L, 20L, cachedTablets, cachedSlaveTablets, readLoadTabletIdx)); + Assert.assertEquals(3, readLoadTabletIdx.get()); + Assert.assertEquals(1, cachedTablets.size()); + Assert.assertEquals(10001L, cachedTablets.get(0).getTabletId()); + } +} diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index cc74ffbe06558a..b7111df7066908 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -1405,6 +1405,8 @@ struct TCreatePartitionRequest { 6: optional bool write_single_replica = false // query_id to identify the coordinator, if coordinator exists, it means this is a multi-instance load 7: optional Types.TUniqueId query_id + // Whether the caller's table sink is using load_to_single_tablet mode. + 8: optional bool load_to_single_tablet = false } struct TCreatePartitionResult { @@ -1425,6 +1427,8 @@ struct TReplacePartitionRequest { 5: optional string be_endpoint 6: optional bool write_single_replica = false 7: optional Types.TUniqueId query_id + // Whether the caller's table sink is using load_to_single_tablet mode. + 8: optional bool load_to_single_tablet = false } struct TReplacePartitionResult {