Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions be/src/exec/sink/vrow_distribution.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ Status VRowDistribution::automatic_create_partition() {
request.__set_partitionValues(_partitions_need_create);
request.__set_be_endpoint(be_endpoint);
request.__set_write_single_replica(_write_single_replica);
request.__set_load_to_single_tablet(_tablet_finder->is_find_tablet_every_sink());
if (_state && _state->get_query_ctx()) {
// Pass query_id to FE so it can determine if this is a multi-instance load by checking Coordinator
request.__set_query_id(_state->get_query_ctx()->query_id());
Expand Down Expand Up @@ -211,6 +212,7 @@ Status VRowDistribution::_replace_overwriting_partition() {

std::string be_endpoint = BackendOptions::get_be_endpoint();
request.__set_be_endpoint(be_endpoint);
request.__set_load_to_single_tablet(_tablet_finder->is_find_tablet_every_sink());
if (_state && _state->get_query_ctx()) {
// Pass query_id to FE so it can determine if this is a multi-instance load by checking Coordinator
request.__set_query_id(_state->get_query_ctx()->query_id());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.DistributionInfo;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.OlapTable;
Expand Down Expand Up @@ -361,6 +362,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -4535,6 +4537,7 @@ public TCreatePartitionResult createPartition(TCreatePartitionRequest request) t
List<TTabletLocation> tablets = new ArrayList<>();
List<TTabletLocation> slaveTablets = new ArrayList<>();
List<TOlapTablePartition> partitions = Lists.newArrayList();
boolean loadToSingleTablet = request.isSetLoadToSingleTablet() && request.isLoadToSingleTablet();
final boolean hasBeEndpoint = request.isSetBeEndpoint();
// Lazy: resolved on the first CloudTablet that needs it (skipped on cache-hit).
String cachedClusterId = null;
Expand All @@ -4561,17 +4564,36 @@ public TCreatePartitionResult createPartition(TCreatePartitionRequest request) t
tPartition.setNumBuckets(index.getTablets().size());
}
tPartition.setIsMutable(olapTable.getPartitionInfo().getIsMutable(partition.getId()));
boolean randomDistribution =
Comment thread
sollhui marked this conversation as resolved.
partition.getDistributionInfo().getType() == DistributionInfo.DistributionInfoType.RANDOM;
boolean cacheLoadTabletIdx = loadToSingleTablet && randomDistribution;
partitions.add(tPartition);
// tablet
AtomicLong cachedLoadTabletIdx = new AtomicLong(-1);
if (needUseCache
&& Env.getCurrentGlobalTransactionMgr().getAutoPartitionCacheMgr()
.getAutoPartitionInfo(txnId, partition.getId(), partitionTablets,
partitionSlaveTablets)) {
partitionSlaveTablets, cachedLoadTabletIdx)) {
if (cacheLoadTabletIdx) {
tPartition.setLoadTabletIdx(cachedLoadTabletIdx.get());
}
// fast path, if cached
tablets.addAll(partitionTablets);
slaveTablets.addAll(partitionSlaveTablets);
continue;
}
if (cacheLoadTabletIdx) {
try {
int tabletIndex = Env.getCurrentEnv().getTabletLoadIndexRecorderMgr()
.getCurrentTabletLoadIndex(dbId, olapTable.getId(), partition);
tPartition.setLoadTabletIdx(tabletIndex);
} catch (UserException ex) {
errorStatus.setErrorMsgs(Lists.newArrayList(ex.getMessage()));
result.setStatus(errorStatus);
LOG.warn("send create partition error status: {}", result);
return result;
}
}
int quorum = olapTable.getPartitionInfo().getReplicaAllocation(partition.getId()).getTotalReplicaNum() / 2
+ 1;
for (MaterializedIndex index : partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)) {
Expand Down Expand Up @@ -4643,9 +4665,13 @@ public TCreatePartitionResult createPartition(TCreatePartitionRequest request) t
}

if (needUseCache) {
Env.getCurrentGlobalTransactionMgr().getAutoPartitionCacheMgr()
long loadTabletIdx = cacheLoadTabletIdx ? tPartition.getLoadTabletIdx() : -1;
long cachedTabletIdx = Env.getCurrentGlobalTransactionMgr().getAutoPartitionCacheMgr()
.getOrSetAutoPartitionInfo(txnId, partition.getId(), partitionTablets,
partitionSlaveTablets);
partitionSlaveTablets, loadTabletIdx);
if (cacheLoadTabletIdx) {
tPartition.setLoadTabletIdx(cachedTabletIdx);
}
}

tablets.addAll(partitionTablets);
Expand Down Expand Up @@ -4863,6 +4889,7 @@ public TReplacePartitionResult replacePartition(TReplacePartitionRequest request
List<TTabletLocation> tablets = new ArrayList<>();
List<TTabletLocation> slaveTablets = new ArrayList<>();
PartitionInfo partitionInfo = olapTable.getPartitionInfo();
boolean loadToSingleTablet = request.isSetLoadToSingleTablet() && request.isLoadToSingleTablet();
final boolean replaceHasBeEndpoint = request.isSetBeEndpoint();
// Lazy: resolved on the first CloudTablet that needs it.
String replaceCachedClusterId = null;
Expand Down Expand Up @@ -4891,17 +4918,36 @@ public TReplacePartitionResult replacePartition(TReplacePartitionRequest request
tPartition.setNumBuckets(index.getTablets().size());
}
tPartition.setIsMutable(olapTable.getPartitionInfo().getIsMutable(partition.getId()));
boolean randomDistribution =
partition.getDistributionInfo().getType() == DistributionInfo.DistributionInfoType.RANDOM;
boolean cacheLoadTabletIdx = loadToSingleTablet && randomDistribution;
partitions.add(tPartition);
// tablet
AtomicLong cachedLoadTabletIdx = new AtomicLong(-1);
if (needUseCache && txnId != 0
&& Env.getCurrentGlobalTransactionMgr().getAutoPartitionCacheMgr()
.getAutoPartitionInfo(txnId, partition.getId(), partitionTablets,
partitionSlaveTablets)) {
partitionSlaveTablets, cachedLoadTabletIdx)) {
if (cacheLoadTabletIdx) {
tPartition.setLoadTabletIdx(cachedLoadTabletIdx.get());
}
// fast path, if cached
tablets.addAll(partitionTablets);
slaveTablets.addAll(partitionSlaveTablets);
continue;
}
if (cacheLoadTabletIdx) {
try {
int tabletIndex = Env.getCurrentEnv().getTabletLoadIndexRecorderMgr()
.getCurrentTabletLoadIndex(dbId, olapTable.getId(), partition);
tPartition.setLoadTabletIdx(tabletIndex);
} catch (UserException ex) {
errorStatus.setErrorMsgs(Lists.newArrayList(ex.getMessage()));
result.setStatus(errorStatus);
LOG.warn("send replace partition error status: {}", result);
return result;
}
}
int quorum = olapTable.getPartitionInfo().getReplicaAllocation(partition.getId()).getTotalReplicaNum() / 2
+ 1;
for (MaterializedIndex index : partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)) {
Expand Down Expand Up @@ -4978,9 +5024,13 @@ public TReplacePartitionResult replacePartition(TReplacePartitionRequest request
}

if (needUseCache) {
Env.getCurrentGlobalTransactionMgr().getAutoPartitionCacheMgr()
long loadTabletIdx = cacheLoadTabletIdx ? tPartition.getLoadTabletIdx() : -1;
long cachedTabletIdx = Env.getCurrentGlobalTransactionMgr().getAutoPartitionCacheMgr()
.getOrSetAutoPartitionInfo(txnId, partition.getId(), partitionTablets,
partitionSlaveTablets);
partitionSlaveTablets, loadTabletIdx);
if (cacheLoadTabletIdx) {
tPartition.setLoadTabletIdx(cachedTabletIdx);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Cache auto partition info, txnId: {}, partitionId: {}, "
+ "tablets: {}, slaveTablets: {}", txnId, partition.getId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

/*
** The class AutoPartitionCacheManager is used to solve the following problem:
Expand Down Expand Up @@ -59,10 +60,17 @@ public class AutoPartitionCacheManager {
/**
 * Value holder for what is cached per partition: the tablet locations, the slave tablet
 * locations, and the load tablet index stored with them.
 *
 * <p>The lists are kept by reference; no copy is made here.
 */
public static class PartitionTabletCache {
    public final List<TTabletLocation> tablets;
    public final List<TTabletLocation> slaveTablets;
    // -1 when the entry was created through the two-argument constructor.
    public final long loadTabletIdx;

    /**
     * Creates an entry carrying an explicit load tablet index.
     *
     * @param tablets tablet locations to keep
     * @param slaveTablets slave tablet locations to keep
     * @param loadTabletIdx load tablet index to keep with the locations
     */
    public PartitionTabletCache(List<TTabletLocation> tablets, List<TTabletLocation> slaveTablets,
            long loadTabletIdx) {
        this.tablets = tablets;
        this.slaveTablets = slaveTablets;
        this.loadTabletIdx = loadTabletIdx;
    }

    /**
     * Creates an entry without a load tablet index; the index field is set to -1.
     *
     * @param tablets tablet locations to keep
     * @param slaveTablets slave tablet locations to keep
     */
    public PartitionTabletCache(List<TTabletLocation> tablets, List<TTabletLocation> slaveTablets) {
        this(tablets, slaveTablets, -1);
    }
}

Expand All @@ -73,6 +81,14 @@ public PartitionTabletCache(List<TTabletLocation> tablets, List<TTabletLocation>
// Returns true if cached, false otherwise; this function only reads the cache.
// This overload is for callers that do not need the load tablet index: it hands the
// five-argument overload a throwaway holder and forwards its result.
public boolean getAutoPartitionInfo(Long txnId, Long partitionId,
        List<TTabletLocation> partitionTablets, List<TTabletLocation> partitionSlaveTablets) {
    AtomicLong discardedLoadTabletIdx = new AtomicLong(-1);
    return getAutoPartitionInfo(txnId, partitionId, partitionTablets, partitionSlaveTablets,
            discardedLoadTabletIdx);
}

// Returns true if cached, false otherwise; this function only reads the cache.
// On a hit, loadTabletIdx is set to the cached load tablet index.
public boolean getAutoPartitionInfo(Long txnId, Long partitionId,
List<TTabletLocation> partitionTablets, List<TTabletLocation> partitionSlaveTablets,
AtomicLong loadTabletIdx) {
ConcurrentHashMap<Long, PartitionTabletCache> partitionMap = autoPartitionInfo.get(txnId);
if (partitionMap == null) {
return false;
Expand All @@ -87,11 +103,18 @@ public boolean getAutoPartitionInfo(Long txnId, Long partitionId,
partitionTablets.addAll(cached.tablets);
partitionSlaveTablets.clear();
partitionSlaveTablets.addAll(cached.slaveTablets);
loadTabletIdx.set(cached.loadTabletIdx);
return true;
}

// Overload for callers that do not track a load tablet index: -1 is passed as the index and
// the value returned by the five-argument overload is discarded.
public void getOrSetAutoPartitionInfo(Long txnId, Long partitionId,
        List<TTabletLocation> partitionTablets, List<TTabletLocation> partitionSlaveTablets) {
    final long noLoadTabletIdx = -1;
    getOrSetAutoPartitionInfo(txnId, partitionId, partitionTablets, partitionSlaveTablets,
            noLoadTabletIdx);
}

public long getOrSetAutoPartitionInfo(Long txnId, Long partitionId,
List<TTabletLocation> partitionTablets, List<TTabletLocation> partitionSlaveTablets,
long loadTabletIdx) {
ConcurrentHashMap<Long, PartitionTabletCache> partitionMap =
autoPartitionInfo.computeIfAbsent(txnId, k -> new ConcurrentHashMap<>());

Expand All @@ -100,7 +123,8 @@ public void getOrSetAutoPartitionInfo(Long txnId, Long partitionId,
needUpdate.set(true);
return new PartitionTabletCache(
new ArrayList<>(partitionTablets),
new ArrayList<>(partitionSlaveTablets)
new ArrayList<>(partitionSlaveTablets),
loadTabletIdx
);
});

Expand All @@ -110,13 +134,13 @@ public void getOrSetAutoPartitionInfo(Long txnId, Long partitionId,
partitionTablets.addAll(cached.tablets);
partitionSlaveTablets.addAll(cached.slaveTablets);
LOG.debug("Get cached auto partition info from cache, txnId: {}, partitionId: {}, "
+ "tablets: {}, slaveTablets: {}", txnId, partitionId,
cached.tablets.size(), cached.slaveTablets.size());
+ "tablets: {}, slaveTablets: {}, loadTabletIdx: {}", txnId, partitionId,
cached.tablets.size(), cached.slaveTablets.size(), cached.loadTabletIdx);
}
return cached.loadTabletIdx;
}

// Removes whatever autoPartitionInfo holds under the given txnId.
public void clearAutoPartitionInfo(Long txnId) {
autoPartitionInfo.remove(txnId);
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.transaction;

import org.apache.doris.thrift.TTabletLocation;

import org.junit.Assert;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Checks how {@link AutoPartitionCacheManager} stores and returns the load tablet index
 * together with the tablet locations of a partition.
 */
public class AutoPartitionCacheManagerTest {
    private static final long TXN_ID = 10L;
    private static final long PARTITION_ID = 20L;

    /** Builds a mutable one-element tablet list for the given tablet and backend id. */
    private static List<TTabletLocation> singleTablet(long tabletId, long backendId) {
        List<TTabletLocation> locations = new ArrayList<>();
        locations.add(new TTabletLocation(tabletId, Arrays.asList(backendId)));
        return locations;
    }

    /**
     * The first getOrSet call for a (txn, partition) pair stores its tablets and index; a second
     * call with different values gets the stored ones back, and a plain read sees them as well.
     */
    @Test
    public void testGetOrSetAutoPartitionInfoReturnsCachedLoadTabletIdx() {
        AutoPartitionCacheManager manager = new AutoPartitionCacheManager();

        // First writer: nothing is cached yet, so its own index comes back.
        List<TTabletLocation> winnerTablets = singleTablet(10001L, 1L);
        List<TTabletLocation> winnerSlaveTablets = new ArrayList<>();
        long indexFromFirstCall = manager.getOrSetAutoPartitionInfo(
                TXN_ID, PARTITION_ID, winnerTablets, winnerSlaveTablets, 3);
        Assert.assertEquals(3, indexFromFirstCall);

        // Second writer: offers other tablets and another index, but receives the cached ones.
        List<TTabletLocation> loserTablets = singleTablet(20001L, 2L);
        List<TTabletLocation> loserSlaveTablets = new ArrayList<>();
        long indexFromSecondCall = manager.getOrSetAutoPartitionInfo(
                TXN_ID, PARTITION_ID, loserTablets, loserSlaveTablets, 5);
        Assert.assertEquals(3, indexFromSecondCall);
        Assert.assertEquals(1, loserTablets.size());
        Assert.assertEquals(10001L, loserTablets.get(0).getTabletId());

        // Read-only lookup: reports a hit and fills the output arguments from the cache.
        List<TTabletLocation> readTablets = new ArrayList<>();
        List<TTabletLocation> readSlaveTablets = new ArrayList<>();
        AtomicLong indexFromRead = new AtomicLong(-1);
        boolean hit = manager.getAutoPartitionInfo(
                TXN_ID, PARTITION_ID, readTablets, readSlaveTablets, indexFromRead);
        Assert.assertTrue(hit);
        Assert.assertEquals(3, indexFromRead.get());
        Assert.assertEquals(1, readTablets.size());
        Assert.assertEquals(10001L, readTablets.get(0).getTabletId());
    }
}
4 changes: 4 additions & 0 deletions gensrc/thrift/FrontendService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -1405,6 +1405,8 @@ struct TCreatePartitionRequest {
6: optional bool write_single_replica = false
// query_id to identify the coordinator, if coordinator exists, it means this is a multi-instance load
7: optional Types.TUniqueId query_id
// Whether the caller's table sink is using load_to_single_tablet mode.
8: optional bool load_to_single_tablet = false
}

struct TCreatePartitionResult {
Expand All @@ -1425,6 +1427,8 @@ struct TReplacePartitionRequest {
5: optional string be_endpoint
6: optional bool write_single_replica = false
7: optional Types.TUniqueId query_id
// Whether the caller's table sink is using load_to_single_tablet mode.
8: optional bool load_to_single_tablet = false
}

struct TReplacePartitionResult {
Expand Down
Loading