Skip to content

Commit

Permalink
optimize alter table
Browse files Browse the repository at this point in the history
  • Loading branch information
dengzhhu653 committed Jan 9, 2024
1 parent 1f2495e commit 8cd082c
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ public void init() throws MetaException {
MetastoreConf.getVar(conf, ConfVars.EVENT_LISTENERS));
listeners.add(new SessionPropertiesListener(conf));
transactionalListeners = new ArrayList() {{
add(new AcidEventListener(conf));
// add(new AcidEventListener(conf));
}};
transactionalListeners.addAll(MetaStoreServerUtils.getMetaStoreListeners(
TransactionalMetaStoreEventListener.class, conf,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -289,7 +290,7 @@ public void alterTable(RawStore msdb, Warehouse wh, String catName, String dbnam


if (renamedTranslatedToExternalTable || !tableInSpecifiedLoc) {
srcFs = wh.getFs(srcPath);
/** srcFs = wh.getFs(srcPath);
// get new location
assert(isReplicated == HMSHandler.isDbReplicationTarget(db));
Expand All @@ -307,6 +308,7 @@ public void alterTable(RawStore msdb, Warehouse wh, String catName, String dbnam
newt.getSd().setLocation(destPath.toString());
}
destFs = wh.getFs(destPath);
// check that destination does not exist otherwise we will be
Expand Down Expand Up @@ -342,7 +344,7 @@ public void alterTable(RawStore msdb, Warehouse wh, String catName, String dbnam
throw new MetaException("Alter table not allowed for table " +
TableName.getQualified(catName, dbname, name) +
"to new table = " + TableName.getQualified(catName, newDbName, newTblName));
}
}*/
}
}

Expand All @@ -352,7 +354,7 @@ public void alterTable(RawStore msdb, Warehouse wh, String catName, String dbnam

// also the location field in partition
parts = msdb.getPartitions(catName, dbname, name, -1);
Multimap<Partition, ColumnStatistics> columnStatsNeedUpdated = ArrayListMultimap.create();
Map<List<FieldSchema>, List<Partition>> partsByCols = new HashMap<>();
for (Partition part : parts) {
String oldPartLoc = part.getSd().getLocation();
if (dataWasMoved && oldPartLoc.contains(oldTblLocPath)) {
Expand All @@ -363,44 +365,55 @@ public void alterTable(RawStore msdb, Warehouse wh, String catName, String dbnam
}
part.setDbName(newDbName);
part.setTableName(newTblName);
List<ColumnStatistics> multiColStats = updateOrGetPartitionColumnStats(msdb, catName, dbname, name,
part.getValues(), part.getSd().getCols(), oldt, part, null, null);
for (ColumnStatistics colStats : multiColStats) {
columnStatsNeedUpdated.put(part, colStats);
partsByCols.computeIfAbsent(part.getSd().getCols(), k -> new ArrayList<>()).add(part);
}
Map<String, Map<String, ColumnStatistics>> engineToColStats = new HashMap<>();
if (rename) {
// If this is the table rename, update the partition column statistics
for (Map.Entry<List<FieldSchema>, List<Partition>> entry : partsByCols.entrySet()) {
List<String> colNames = entry.getKey().stream().map(fs -> fs.getName()).collect(Collectors.toList());
List<String> partNames = new ArrayList<>();
for (Partition part : entry.getValue()) {
partNames.add(Warehouse.makePartName(oldt.getPartitionKeys(), part.getValues()));
}
List<List<ColumnStatistics>> colStats =
msdb.getPartitionColumnStatistics(catName, dbname, name, partNames, colNames);
for (List<ColumnStatistics> cs : colStats) {
if (cs != null && !cs.isEmpty()) {
String engine = cs.get(0).getEngine();
cs.stream().forEach(stats -> {
stats.getStatsDesc().setDbName(newDbName);
stats.getStatsDesc().setTableName(newTblName);
String partName = stats.getStatsDesc().getPartName();
engineToColStats.computeIfAbsent(engine, key -> new HashMap<>()).put(partName, stats);
});
}
}
}
}
// Do not verify stats parameters on a partitioned table.
msdb.alterTable(catName, dbname, name, newt, null);
int partitionBatchSize = MetastoreConf.getIntVar(handler.getConf(),
MetastoreConf.ConfVars.BATCH_RETRIEVE_MAX);
String catalogName = catName;
// alterPartition is only for changing the partition location in the table rename
if (dataWasMoved) {

int partsToProcess = parts.size();
int partitionBatchSize = MetastoreConf.getIntVar(handler.getConf(),
MetastoreConf.ConfVars.BATCH_RETRIEVE_MAX);
int batchStart = 0;
while (partsToProcess > 0) {
int batchEnd = Math.min(batchStart + partitionBatchSize, parts.size());
List<Partition> partBatch = parts.subList(batchStart, batchEnd);
int partBatchSize = partBatch.size();
partsToProcess -= partBatchSize;
batchStart += partBatchSize;
List<List<String>> partValues = new ArrayList<>(partBatchSize);
for (Partition part : partBatch) {
partValues.add(part.getValues());
Batchable.runBatched(partitionBatchSize, parts, new Batchable<Partition, Void>() {
@Override
public List<Void> run(List<Partition> input) throws Exception {
msdb.alterPartitions(catalogName, newDbName, newTblName,
input.stream().map(part -> part.getValues()).collect(Collectors.toList()),
input, newt.getWriteId(), writeIdList);
return null;
}
msdb.alterPartitions(catName, newDbName, newTblName, partValues,
partBatch, newt.getWriteId(), writeIdList);
}
});
}
Deadline.checkTimeout();
Table table = msdb.getTable(catName, newDbName, newTblName);
MTable mTable = msdb.ensureGetMTable(catName, newDbName, newTblName);
for (Entry<Partition, ColumnStatistics> partColStats : columnStatsNeedUpdated.entries()) {
ColumnStatistics newPartColStats = partColStats.getValue();
newPartColStats.getStatsDesc().setDbName(newDbName);
newPartColStats.getStatsDesc().setTableName(newTblName);
msdb.updatePartitionColumnStatistics(table, mTable, newPartColStats,
partColStats.getKey().getValues(), writeIdList, newt.getWriteId());
if (rename) {
for (Entry<String, Map<String, ColumnStatistics>> entry : engineToColStats.entrySet()) {
msdb.updatePartitionColumnStatisticsInBatch(entry.getValue(), oldt,
transactionalListeners, writeIdList, newt.getWriteId());
}
}
} else {
msdb.alterTable(catName, dbname, name, newt, writeIdList);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,9 @@ public static String getUpdatedColumnSql(MPartitionColumnStatistics mStatsObj) {
if (mStatsObj.getNumNulls() != null) {
setStmt.append("\"NUM_NULLS\" = ? ,");
}
setStmt.append("\"ENGINE\" = ? ");
setStmt.append("\"ENGINE\" = ? ,");
setStmt.append("\"DB_NAME\" = ? ,");
setStmt.append("\"TABLE_NAME\" = ? ");
return setStmt.toString();
}

Expand Down Expand Up @@ -339,6 +341,8 @@ public static void initUpdatedColumnStatement(MPartitionColumnStatistics mStatsO
pst.setObject(colIdx++, mStatsObj.getNumNulls());
}
pst.setString(colIdx++, mStatsObj.getEngine());
pst.setString(colIdx++, mStatsObj.getDbName());
pst.setString(colIdx++, mStatsObj.getTableName());
}

public static ColumnStatisticsObj getTableColumnStatisticsObj(
Expand Down

0 comments on commit 8cd082c

Please sign in to comment.