Skip to content

Commit f00825b

Browse files
committed
[AMORO-3775] Add support for metric-based refresh event in TableRuntimeRefreshExecutor
1 parent 208399f commit f00825b

File tree

11 files changed

+849
-17
lines changed

11 files changed

+849
-17
lines changed

amoro-ams/src/main/java/org/apache/amoro/server/dashboard/MixedAndIcebergTableDescriptor.java

Lines changed: 142 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,17 @@
1818

1919
package org.apache.amoro.server.dashboard;
2020

21+
import static org.apache.amoro.table.TablePartitionDetailProperties.BASE_FILE_COUNT;
22+
import static org.apache.amoro.table.TablePartitionDetailProperties.BASE_FILE_COUNT_DEFAULT;
23+
import static org.apache.amoro.table.TablePartitionDetailProperties.EQ_DELETE_FILE_COUNT;
24+
import static org.apache.amoro.table.TablePartitionDetailProperties.EQ_DELETE_FILE_COUNT_DEFAULT;
25+
import static org.apache.amoro.table.TablePartitionDetailProperties.FILE_SIZE_SQUARED_ERROR_SUM;
26+
import static org.apache.amoro.table.TablePartitionDetailProperties.FILE_SIZE_SQUARED_ERROR_SUM_DEFAULT;
27+
import static org.apache.amoro.table.TablePartitionDetailProperties.INSERT_FILE_COUNT;
28+
import static org.apache.amoro.table.TablePartitionDetailProperties.INSERT_FILE_COUNT_DEFAULT;
29+
import static org.apache.amoro.table.TablePartitionDetailProperties.POS_DELETE_FILE_COUNT;
30+
import static org.apache.amoro.table.TablePartitionDetailProperties.POS_DELETE_FILE_COUNT_DEFAULT;
31+
2132
import com.github.pagehelper.Page;
2233
import com.github.pagehelper.PageHelper;
2334
import com.github.pagehelper.PageInfo;
@@ -451,19 +462,132 @@ public List<PartitionBaseInfo> getTablePartitions(AmoroTable<?> amoroTable) {
451462
getTableFilesInternal(amoroTable, null, null);
452463
try {
453464
for (PartitionFileBaseInfo fileInfo : tableFiles) {
454-
if (!partitionBaseInfoHashMap.containsKey(fileInfo.getPartition())) {
455-
PartitionBaseInfo partitionBaseInfo = new PartitionBaseInfo();
456-
partitionBaseInfo.setPartition(fileInfo.getPartition());
457-
partitionBaseInfo.setSpecId(fileInfo.getSpecId());
458-
partitionBaseInfoHashMap.put(fileInfo.getPartition(), partitionBaseInfo);
459-
}
460-
PartitionBaseInfo partitionInfo = partitionBaseInfoHashMap.get(fileInfo.getPartition());
461-
partitionInfo.setFileCount(partitionInfo.getFileCount() + 1);
462-
partitionInfo.setFileSize(partitionInfo.getFileSize() + fileInfo.getFileSize());
463-
partitionInfo.setLastCommitTime(
464-
partitionInfo.getLastCommitTime() > fileInfo.getCommitTime()
465-
? partitionInfo.getLastCommitTime()
466-
: fileInfo.getCommitTime());
465+
refreshPartitionBasicInfo(fileInfo, partitionBaseInfoHashMap);
466+
}
467+
} finally {
468+
try {
469+
tableFiles.close();
470+
} catch (IOException e) {
471+
LOG.warn("Failed to close the manifest reader.", e);
472+
}
473+
}
474+
return new ArrayList<>(partitionBaseInfoHashMap.values());
475+
}
476+
477+
/**
478+
* Create partition base information from a PartitionFileBaseInfo instance.
479+
*
480+
* @param fileInfo Partition file base information, used to obtain partition information
481+
* @return Returns the partition base information corresponding to the partition
482+
*/
483+
private PartitionBaseInfo createPartitionBaseInfoFromPartitionFile(
484+
PartitionFileBaseInfo fileInfo) {
485+
PartitionBaseInfo partitionBaseInfo = new PartitionBaseInfo();
486+
partitionBaseInfo.setPartition(fileInfo.getPartition());
487+
partitionBaseInfo.setSpecId(fileInfo.getSpecId());
488+
return partitionBaseInfo;
489+
}
490+
491+
/**
492+
* Refresh the basic information of a partition
493+
*
494+
* @param fileInfo Partition file base information
495+
* @param partitionBaseInfoHashMap A hashmap containing the base information of all partitions
496+
*/
497+
private void refreshPartitionBasicInfo(
498+
PartitionFileBaseInfo fileInfo, Map<String, PartitionBaseInfo> partitionBaseInfoHashMap) {
499+
// Get the partitionBaseInfo instance
500+
PartitionBaseInfo partitionInfo =
501+
partitionBaseInfoHashMap.computeIfAbsent(
502+
fileInfo.getPartition(), key -> createPartitionBaseInfoFromPartitionFile(fileInfo));
503+
// Update the number of files
504+
partitionInfo.setFileCount(partitionInfo.getFileCount() + 1);
505+
// Update the total file size
506+
partitionInfo.setFileSize(partitionInfo.getFileSize() + fileInfo.getFileSize());
507+
// Update the last commit time
508+
partitionInfo.setLastCommitTime(
509+
partitionInfo.getLastCommitTime() > fileInfo.getCommitTime()
510+
? partitionInfo.getLastCommitTime()
511+
: fileInfo.getCommitTime());
512+
}
513+
514+
/**
515+
* Refresh and update the detailed properties of a partition based on file information.
516+
*
517+
* <p>This method primarily updates statistical properties of the partition, such as the sum of
518+
* squared errors of file sizes, and the counts of base files, insert files, eq-delete files, and
519+
* pos-delete files.</>
520+
*
521+
* @param fileInfo Partition file base information
522+
* @param partitionBaseInfoHashMap A hashmap containing basic information about all partitions
523+
* @param minTargetSize The minimum target size used to limit the file size and calculate the sum
524+
* of squared errors.
525+
*/
526+
private void refreshPartitionDetailProperties(
527+
PartitionFileBaseInfo fileInfo,
528+
Map<String, PartitionBaseInfo> partitionBaseInfoHashMap,
529+
long minTargetSize) {
530+
PartitionBaseInfo partitionInfo =
531+
partitionBaseInfoHashMap.computeIfAbsent(
532+
fileInfo.getPartition(), key -> createPartitionBaseInfoFromPartitionFile(fileInfo));
533+
// Update the file-size-squared-error-sum
534+
long actualSize = Math.min(fileInfo.getFileSize(), minTargetSize);
535+
long diff = minTargetSize - actualSize;
536+
partitionInfo.setProperty(
537+
FILE_SIZE_SQUARED_ERROR_SUM,
538+
(double)
539+
partitionInfo.getPropertyOrDefault(
540+
FILE_SIZE_SQUARED_ERROR_SUM, FILE_SIZE_SQUARED_ERROR_SUM_DEFAULT)
541+
+ diff * diff);
542+
543+
// Update the count of base files, insert files, equality delete files, and position delete
544+
// files
545+
switch (DataFileType.fromName(fileInfo.getFileType())) {
546+
case BASE_FILE:
547+
partitionInfo.setProperty(
548+
BASE_FILE_COUNT,
549+
(long) partitionInfo.getPropertyOrDefault(BASE_FILE_COUNT, BASE_FILE_COUNT_DEFAULT)
550+
+ 1);
551+
break;
552+
case INSERT_FILE:
553+
partitionInfo.setProperty(
554+
INSERT_FILE_COUNT,
555+
(long) partitionInfo.getPropertyOrDefault(INSERT_FILE_COUNT, INSERT_FILE_COUNT_DEFAULT)
556+
+ 1);
557+
break;
558+
case EQ_DELETE_FILE:
559+
partitionInfo.setProperty(
560+
EQ_DELETE_FILE_COUNT,
561+
(long)
562+
partitionInfo.getPropertyOrDefault(
563+
EQ_DELETE_FILE_COUNT, EQ_DELETE_FILE_COUNT_DEFAULT)
564+
+ 1);
565+
break;
566+
case POS_DELETE_FILE:
567+
partitionInfo.setProperty(
568+
POS_DELETE_FILE_COUNT,
569+
(long)
570+
partitionInfo.getPropertyOrDefault(
571+
POS_DELETE_FILE_COUNT, POS_DELETE_FILE_COUNT_DEFAULT)
572+
+ 1);
573+
break;
574+
}
575+
}
576+
577+
/**
578+
* Get a list of partition information for a specific table that contains the square error sum of
579+
* the partition file size. This method calculates statistics for each partition of a table,
580+
* including file count, total file size, and the sum of squared errors of file sizes, for use in
581+
* further operations such as table optimization.
582+
*/
583+
public List<PartitionBaseInfo> getTablePartitionsWithDetailProperties(
584+
MixedTable table, long minTargetSize) {
585+
Map<String, PartitionBaseInfo> partitionBaseInfoHashMap = new HashMap<>();
586+
CloseableIterable<PartitionFileBaseInfo> tableFiles = getTableFilesInternal(table, null, null);
587+
try {
588+
for (PartitionFileBaseInfo fileInfo : tableFiles) {
589+
refreshPartitionBasicInfo(fileInfo, partitionBaseInfoHashMap);
590+
refreshPartitionDetailProperties(fileInfo, partitionBaseInfoHashMap, minTargetSize);
467591
}
468592
} finally {
469593
try {
@@ -609,6 +733,11 @@ public List<OptimizingTaskInfo> getOptimizingTaskInfos(
609733
private CloseableIterable<PartitionFileBaseInfo> getTableFilesInternal(
610734
AmoroTable<?> amoroTable, String partition, Integer specId) {
611735
MixedTable mixedTable = getTable(amoroTable);
736+
return getTableFilesInternal(mixedTable, partition, specId);
737+
}
738+
739+
private CloseableIterable<PartitionFileBaseInfo> getTableFilesInternal(
740+
MixedTable mixedTable, String partition, Integer specId) {
612741
if (mixedTable.isKeyedTable()) {
613742
return CloseableIterable.concat(
614743
Arrays.asList(
Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.amoro.server.refresh.event;
20+
21+
import static org.apache.amoro.table.TablePartitionDetailProperties.FILE_SIZE_SQUARED_ERROR_SUM;
22+
import static org.apache.amoro.table.TablePartitionDetailProperties.FILE_SIZE_SQUARED_ERROR_SUM_DEFAULT;
23+
24+
import org.apache.amoro.TableFormat;
25+
import org.apache.amoro.config.OptimizingConfig;
26+
import org.apache.amoro.server.dashboard.MixedAndIcebergTableDescriptor;
27+
import org.apache.amoro.server.table.DefaultTableRuntime;
28+
import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
29+
import org.apache.amoro.table.MixedTable;
30+
import org.apache.amoro.table.descriptor.PartitionBaseInfo;
31+
import org.apache.iceberg.util.ThreadPools;
32+
import org.slf4j.Logger;
33+
import org.slf4j.LoggerFactory;
34+
35+
import java.util.List;
36+
import java.util.concurrent.ExecutorService;
37+
import java.util.stream.Collectors;
38+
39+
public class MetricBasedRefreshEvent {
40+
private static final Logger logger = LoggerFactory.getLogger(MetricBasedRefreshEvent.class);
41+
42+
/**
43+
* Determines if evaluating pending input is necessary.
44+
*
45+
* @param tableRuntime The runtime information of the table.
46+
* @param table The table to be evaluated.
47+
* @return true if evaluation is necessary, otherwise false.
48+
*/
49+
public static boolean isEvaluatingPendingInputNecessary(
50+
DefaultTableRuntime tableRuntime, MixedTable table) {
51+
if (table.format() != TableFormat.ICEBERG) {
52+
logger.debug(
53+
"EventBasedRefreshEvent only support ICEBERG tables. Always return true for other table formats.");
54+
return true;
55+
}
56+
57+
// Perform periodic scheduling according to the minor/full interval to avoid false positives or
58+
// missed triggers based on metadata metric-driven evaluation
59+
OptimizingConfig config = tableRuntime.getOptimizingConfig();
60+
if (reachMinorInterval(
61+
tableRuntime.getLastMinorOptimizingTime(), config.getMinorLeastInterval())
62+
|| reachFullInterval(
63+
tableRuntime.getLastFullOptimizingTime(), config.getFullTriggerInterval())) {
64+
logger.info("Maximum interval for minor optimization/full optimization has reached.");
65+
return true;
66+
}
67+
68+
long minTargetSize = config.getTargetSize();
69+
int minPartitionCountToOptimized = 1;
70+
double mseTolerance = minTargetSize - config.getAverageFileSizeTolerance().getBytes();
71+
if (config.getAverageFileSizeTolerance().getBytes() == 0) {
72+
logger.warn(
73+
"The minimum tolerance value for the average partition file size is set to 0, which will prevent any partition from being pended.");
74+
return false;
75+
}
76+
77+
ExecutorService executorService = ThreadPools.getWorkerPool();
78+
MixedAndIcebergTableDescriptor formatTableDescriptor = new MixedAndIcebergTableDescriptor();
79+
formatTableDescriptor.withIoExecutor(executorService);
80+
// Calculate detail properties for each partition of a table including the sum of squared errors
81+
// of file sizes
82+
List<PartitionBaseInfo> partitionBaseInfos =
83+
formatTableDescriptor.getTablePartitionsWithDetailProperties(table, minTargetSize);
84+
85+
List<PartitionBaseInfo> filteredPartitionBaseInfos =
86+
filterOutPartitionToBeOptimized(table, partitionBaseInfos, mseTolerance);
87+
88+
long filteredPartitionCount = filteredPartitionBaseInfos.size();
89+
long filteredFileCount =
90+
filteredPartitionBaseInfos.stream().mapToLong(PartitionBaseInfo::getFileCount).sum();
91+
long totalPartitionCount = partitionBaseInfos.size();
92+
93+
MseStats stats =
94+
partitionBaseInfos.stream().collect(MseStats::new, MseStats::accept, MseStats::combine);
95+
96+
logger.info(
97+
"Filter out {} partitions ({} files in total) from {} ({} files in total) of table {} that have reached the tolerance value(T={}), MSE for total files: [{},{}].",
98+
filteredPartitionCount,
99+
filteredFileCount,
100+
totalPartitionCount,
101+
stats.totalFileCount,
102+
table.id(),
103+
config.getAverageFileSizeTolerance(),
104+
stats.mseTotalMin,
105+
stats.mseTotalMax);
106+
return filteredPartitionBaseInfos.size() >= minPartitionCountToOptimized;
107+
}
108+
109+
/** Determines if the minor optimization interval has been reached. */
110+
private static boolean reachMinorInterval(long lastMinorOptimizingTime, long minorLeastInterval) {
111+
return minorLeastInterval >= 0
112+
&& System.currentTimeMillis() - lastMinorOptimizingTime > minorLeastInterval;
113+
}
114+
115+
/** Determines if the full optimization interval has been reached. */
116+
private static boolean reachFullInterval(long lastFullOptimizingTime, long fullTriggerInterval) {
117+
return fullTriggerInterval >= 0
118+
&& System.currentTimeMillis() - lastFullOptimizingTime > fullTriggerInterval;
119+
}
120+
121+
/**
122+
* Filters out partitions that need to be optimized based on the MSE tolerance.
123+
*
124+
* @param table The table being evaluated.
125+
* @param partitionBaseInfos Base information of the partitions.
126+
* @param tolerance The MSE tolerance value.
127+
* @return A list of partitions that need optimization.
128+
*/
129+
@VisibleForTesting
130+
public static List<PartitionBaseInfo> filterOutPartitionToBeOptimized(
131+
MixedTable table, List<PartitionBaseInfo> partitionBaseInfos, double tolerance) {
132+
return partitionBaseInfos.stream()
133+
.filter(
134+
partition -> {
135+
double meanSquaredError =
136+
(double)
137+
partition.getPropertyOrDefault(
138+
FILE_SIZE_SQUARED_ERROR_SUM, FILE_SIZE_SQUARED_ERROR_SUM_DEFAULT)
139+
/ partition.getFileCount();
140+
boolean isFiltered =
141+
partition.getFileCount() > 1 && meanSquaredError >= tolerance * tolerance;
142+
logger.debug(
143+
"Table: {}, Partition: {}, isFiltered: {}", table.id(), partition, isFiltered);
144+
return isFiltered;
145+
})
146+
.collect(Collectors.toList());
147+
}
148+
149+
static class MseStats {
150+
double mseTotalMin = Double.MAX_VALUE;
151+
double mseTotalMax = Double.MIN_VALUE;
152+
long totalFileCount = 0L;
153+
154+
void accept(PartitionBaseInfo p) {
155+
long fileCount = p.getFileCount();
156+
totalFileCount += fileCount;
157+
double mseTotal =
158+
((double)
159+
p.getPropertyOrDefault(
160+
FILE_SIZE_SQUARED_ERROR_SUM, FILE_SIZE_SQUARED_ERROR_SUM_DEFAULT))
161+
/ fileCount;
162+
mseTotalMin = Math.min(mseTotalMin, mseTotal);
163+
mseTotalMax = Math.max(mseTotalMax, mseTotal);
164+
}
165+
166+
void combine(MseStats other) {
167+
mseTotalMin = Math.min(mseTotalMin, other.mseTotalMin);
168+
mseTotalMax = Math.max(mseTotalMax, other.mseTotalMax);
169+
totalFileCount += other.totalFileCount;
170+
}
171+
}
172+
}

amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.amoro.process.ProcessStatus;
2626
import org.apache.amoro.server.optimizing.OptimizingProcess;
2727
import org.apache.amoro.server.optimizing.OptimizingStatus;
28+
import org.apache.amoro.server.refresh.event.MetricBasedRefreshEvent;
2829
import org.apache.amoro.server.scheduler.PeriodicTableScheduler;
2930
import org.apache.amoro.server.table.DefaultTableRuntime;
3031
import org.apache.amoro.server.table.TableService;
@@ -115,7 +116,11 @@ public void execute(TableRuntime tableRuntime) {
115116
!= defaultTableRuntime.getCurrentChangeSnapshotId()))
116117
|| (mixedTable.isUnkeyedTable()
117118
&& lastOptimizedSnapshotId != defaultTableRuntime.getCurrentSnapshotId())) {
118-
tryEvaluatingPendingInput(defaultTableRuntime, mixedTable);
119+
if (!defaultTableRuntime.getOptimizingConfig().isEventBasedTriggerEnabled()
120+
|| MetricBasedRefreshEvent.isEvaluatingPendingInputNecessary(
121+
defaultTableRuntime, mixedTable)) {
122+
tryEvaluatingPendingInput(defaultTableRuntime, mixedTable);
123+
}
119124
}
120125
} catch (Throwable throwable) {
121126
logger.error("Refreshing table {} failed.", tableRuntime.getTableIdentifier(), throwable);

amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -323,7 +323,12 @@ public static OptimizingConfig parseOptimizingConfig(Map<String, String> propert
323323
PropertyUtil.propertyAsLong(
324324
properties,
325325
TableProperties.SELF_OPTIMIZING_MIN_PLAN_INTERVAL,
326-
TableProperties.SELF_OPTIMIZING_MIN_PLAN_INTERVAL_DEFAULT));
326+
TableProperties.SELF_OPTIMIZING_MIN_PLAN_INTERVAL_DEFAULT))
327+
.setAverageFileSizeTolerance(
328+
CompatiblePropertyUtil.propertyAsMemorySize(
329+
properties,
330+
TableProperties.SELF_OPTIMIZING_AVERAGE_FILE_SIZE_TOLERANCE,
331+
TableProperties.SELF_OPTIMIZING_AVERAGE_FILE_SIZE_TOLERANCE_DEFAULT));
327332
}
328333

329334
/**

0 commit comments

Comments
 (0)