- 
                Notifications
    You must be signed in to change notification settings 
- Fork 358
[AMORO-3775] Add support for metric-based refresh event trigger in TableRuntimeRefreshExecutor #3776
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
[AMORO-3775] Add support for metric-based refresh event trigger in TableRuntimeRefreshExecutor #3776
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
|  | @@ -18,6 +18,17 @@ | |
|  | ||
| package org.apache.amoro.server.dashboard; | ||
|  | ||
| import static org.apache.amoro.table.TablePartitionDetailProperties.BASE_FILE_COUNT; | ||
| import static org.apache.amoro.table.TablePartitionDetailProperties.BASE_FILE_COUNT_DEFAULT; | ||
| import static org.apache.amoro.table.TablePartitionDetailProperties.EQ_DELETE_FILE_COUNT; | ||
| import static org.apache.amoro.table.TablePartitionDetailProperties.EQ_DELETE_FILE_COUNT_DEFAULT; | ||
| import static org.apache.amoro.table.TablePartitionDetailProperties.FILE_SIZE_SQUARED_ERROR_SUM; | ||
| import static org.apache.amoro.table.TablePartitionDetailProperties.FILE_SIZE_SQUARED_ERROR_SUM_DEFAULT; | ||
| import static org.apache.amoro.table.TablePartitionDetailProperties.INSERT_FILE_COUNT; | ||
| import static org.apache.amoro.table.TablePartitionDetailProperties.INSERT_FILE_COUNT_DEFAULT; | ||
| import static org.apache.amoro.table.TablePartitionDetailProperties.POS_DELETE_FILE_COUNT; | ||
| import static org.apache.amoro.table.TablePartitionDetailProperties.POS_DELETE_FILE_COUNT_DEFAULT; | ||
|  | ||
| import com.github.pagehelper.Page; | ||
| import com.github.pagehelper.PageHelper; | ||
| import com.github.pagehelper.PageInfo; | ||
|  | @@ -451,19 +462,132 @@ public List<PartitionBaseInfo> getTablePartitions(AmoroTable<?> amoroTable) { | |
| getTableFilesInternal(amoroTable, null, null); | ||
| try { | ||
| for (PartitionFileBaseInfo fileInfo : tableFiles) { | ||
| if (!partitionBaseInfoHashMap.containsKey(fileInfo.getPartition())) { | ||
| PartitionBaseInfo partitionBaseInfo = new PartitionBaseInfo(); | ||
| partitionBaseInfo.setPartition(fileInfo.getPartition()); | ||
| partitionBaseInfo.setSpecId(fileInfo.getSpecId()); | ||
| partitionBaseInfoHashMap.put(fileInfo.getPartition(), partitionBaseInfo); | ||
| } | ||
| PartitionBaseInfo partitionInfo = partitionBaseInfoHashMap.get(fileInfo.getPartition()); | ||
| partitionInfo.setFileCount(partitionInfo.getFileCount() + 1); | ||
| partitionInfo.setFileSize(partitionInfo.getFileSize() + fileInfo.getFileSize()); | ||
| partitionInfo.setLastCommitTime( | ||
| partitionInfo.getLastCommitTime() > fileInfo.getCommitTime() | ||
| ? partitionInfo.getLastCommitTime() | ||
| : fileInfo.getCommitTime()); | ||
| refreshPartitionBasicInfo(fileInfo, partitionBaseInfoHashMap); | ||
| } | ||
| } finally { | ||
| try { | ||
| tableFiles.close(); | ||
| } catch (IOException e) { | ||
| LOG.warn("Failed to close the manifest reader.", e); | ||
| } | ||
| } | ||
| return new ArrayList<>(partitionBaseInfoHashMap.values()); | ||
| } | ||
|  | ||
| /** | ||
| * Create partition base information from a PartitionFileBaseInfo instance. | ||
| * | ||
| * @param fileInfo Partition file base information, used to obtain partition information | ||
| * @return Returns the partition base information corresponding to the partition | ||
| */ | ||
| private PartitionBaseInfo createPartitionBaseInfoFromPartitionFile( | ||
| PartitionFileBaseInfo fileInfo) { | ||
| PartitionBaseInfo partitionBaseInfo = new PartitionBaseInfo(); | ||
| partitionBaseInfo.setPartition(fileInfo.getPartition()); | ||
| partitionBaseInfo.setSpecId(fileInfo.getSpecId()); | ||
| return partitionBaseInfo; | ||
| } | ||
|  | ||
| /** | ||
| * Refresh the basic information of a partition | ||
| * | ||
| * @param fileInfo Partition file base information | ||
| * @param partitionBaseInfoHashMap A hashmap containing the base information of all partitions | ||
| */ | ||
| private void refreshPartitionBasicInfo( | ||
| PartitionFileBaseInfo fileInfo, Map<String, PartitionBaseInfo> partitionBaseInfoHashMap) { | ||
| // Get the partitionBaseInfo instance | ||
| PartitionBaseInfo partitionInfo = | ||
| partitionBaseInfoHashMap.computeIfAbsent( | ||
| fileInfo.getPartition(), key -> createPartitionBaseInfoFromPartitionFile(fileInfo)); | ||
| // Update the number of files | ||
| partitionInfo.setFileCount(partitionInfo.getFileCount() + 1); | ||
| // Update the total file size | ||
| partitionInfo.setFileSize(partitionInfo.getFileSize() + fileInfo.getFileSize()); | ||
| // Update the last commit time | ||
| partitionInfo.setLastCommitTime( | ||
| partitionInfo.getLastCommitTime() > fileInfo.getCommitTime() | ||
| ? partitionInfo.getLastCommitTime() | ||
| : fileInfo.getCommitTime()); | ||
| } | ||
|  | ||
| /** | ||
| * Refresh and update the detailed properties of a partition based on file information. | ||
| * | ||
| * <p>This method primarily updates statistical properties of the partition, such as the sum of | ||
| * squared errors of file sizes, and the counts of base files, insert files, eq-delete files, and | ||
| * pos-delete files.</> | ||
| * | ||
| * @param fileInfo Partition file base information | ||
| * @param partitionBaseInfoHashMap A hashmap containing basic information about all partitions | ||
| * @param minTargetSize The minimum target size used to limit the file size and calculate the sum | ||
| * of squared errors. | ||
| */ | ||
| private void refreshPartitionDetailProperties( | ||
| PartitionFileBaseInfo fileInfo, | ||
| Map<String, PartitionBaseInfo> partitionBaseInfoHashMap, | ||
| long minTargetSize) { | ||
| PartitionBaseInfo partitionInfo = | ||
| partitionBaseInfoHashMap.computeIfAbsent( | ||
| fileInfo.getPartition(), key -> createPartitionBaseInfoFromPartitionFile(fileInfo)); | ||
| // Update the file-size-squared-error-sum | ||
| long actualSize = Math.min(fileInfo.getFileSize(), minTargetSize); | ||
| long diff = minTargetSize - actualSize; | ||
| partitionInfo.setProperty( | ||
| FILE_SIZE_SQUARED_ERROR_SUM, | ||
| (double) | ||
| partitionInfo.getPropertyOrDefault( | ||
| FILE_SIZE_SQUARED_ERROR_SUM, FILE_SIZE_SQUARED_ERROR_SUM_DEFAULT) | ||
| + diff * diff); | ||
|  | ||
| // Update the count of base files, insert files, equality delete files, and position delete | ||
| // files | ||
| switch (DataFileType.fromName(fileInfo.getFileType())) { | ||
| case BASE_FILE: | ||
| partitionInfo.setProperty( | ||
| BASE_FILE_COUNT, | ||
| (long) partitionInfo.getPropertyOrDefault(BASE_FILE_COUNT, BASE_FILE_COUNT_DEFAULT) | ||
| + 1); | ||
| break; | ||
| case INSERT_FILE: | ||
| partitionInfo.setProperty( | ||
| INSERT_FILE_COUNT, | ||
| (long) partitionInfo.getPropertyOrDefault(INSERT_FILE_COUNT, INSERT_FILE_COUNT_DEFAULT) | ||
| + 1); | ||
| break; | ||
| case EQ_DELETE_FILE: | ||
| partitionInfo.setProperty( | ||
| EQ_DELETE_FILE_COUNT, | ||
| (long) | ||
| partitionInfo.getPropertyOrDefault( | ||
| EQ_DELETE_FILE_COUNT, EQ_DELETE_FILE_COUNT_DEFAULT) | ||
| + 1); | ||
| break; | ||
| case POS_DELETE_FILE: | ||
| partitionInfo.setProperty( | ||
| POS_DELETE_FILE_COUNT, | ||
| (long) | ||
| partitionInfo.getPropertyOrDefault( | ||
| POS_DELETE_FILE_COUNT, POS_DELETE_FILE_COUNT_DEFAULT) | ||
| + 1); | ||
| break; | ||
| } | ||
| } | ||
|  | ||
| /** | ||
| * Get a list of partition information for a specific table that contains the square error sum of | ||
| * the partition file size. This method calculates statistics for each partition of a table, | ||
| * including file count, total file size, and the sum of squared errors of file sizes, for use in | ||
| * further operations such as table optimization. | ||
| */ | ||
| public List<PartitionBaseInfo> getTablePartitionsWithDetailProperties( | ||
| MixedTable table, long minTargetSize) { | ||
| Map<String, PartitionBaseInfo> partitionBaseInfoHashMap = new HashMap<>(); | ||
| CloseableIterable<PartitionFileBaseInfo> tableFiles = getTableFilesInternal(table, null, null); | ||
| try { | ||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can simply use a try-with-resources statement. | ||
| for (PartitionFileBaseInfo fileInfo : tableFiles) { | ||
| refreshPartitionBasicInfo(fileInfo, partitionBaseInfoHashMap); | ||
| refreshPartitionDetailProperties(fileInfo, partitionBaseInfoHashMap, minTargetSize); | ||
| } | ||
| } finally { | ||
| try { | ||
|  | @@ -609,6 +733,11 @@ public List<OptimizingTaskInfo> getOptimizingTaskInfos( | |
| private CloseableIterable<PartitionFileBaseInfo> getTableFilesInternal( | ||
| AmoroTable<?> amoroTable, String partition, Integer specId) { | ||
| MixedTable mixedTable = getTable(amoroTable); | ||
| return getTableFilesInternal(mixedTable, partition, specId); | ||
| } | ||
|  | ||
| private CloseableIterable<PartitionFileBaseInfo> getTableFilesInternal( | ||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If I understand correctly, after the event-triggered evaluation, a full table scan is performed to collect partition information, which can be very expensive (especially for large tables with hundreds of thousands of files). Perhaps we can optimize this part when upgrading the Iceberg version and introducing PartitionStatistics. | ||
| MixedTable mixedTable, String partition, Integer specId) { | ||
| if (mixedTable.isKeyedTable()) { | ||
| return CloseableIterable.concat( | ||
| Arrays.asList( | ||
|  | ||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -0,0 +1,206 @@ | ||||||
| /* | ||||||
| * Licensed to the Apache Software Foundation (ASF) under one | ||||||
| * or more contributor license agreements. See the NOTICE file | ||||||
| * distributed with this work for additional information | ||||||
| * regarding copyright ownership. The ASF licenses this file | ||||||
| * to you under the Apache License, Version 2.0 (the | ||||||
| * "License"); you may not use this file except in compliance | ||||||
| * with the License. You may obtain a copy of the License at | ||||||
| * | ||||||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||||||
| * | ||||||
| * Unless required by applicable law or agreed to in writing, software | ||||||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||||||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||
| * See the License for the specific language governing permissions and | ||||||
| * limitations under the License. | ||||||
| */ | ||||||
|  | ||||||
| package org.apache.amoro.server.refresh.event; | ||||||
|  | ||||||
| import static org.apache.amoro.table.TablePartitionDetailProperties.FILE_SIZE_SQUARED_ERROR_SUM; | ||||||
| import static org.apache.amoro.table.TablePartitionDetailProperties.FILE_SIZE_SQUARED_ERROR_SUM_DEFAULT; | ||||||
|  | ||||||
| import org.apache.amoro.TableFormat; | ||||||
| import org.apache.amoro.config.OptimizingConfig; | ||||||
| import org.apache.amoro.server.dashboard.MixedAndIcebergTableDescriptor; | ||||||
| import org.apache.amoro.server.table.DefaultTableRuntime; | ||||||
| import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting; | ||||||
| import org.apache.amoro.table.MixedTable; | ||||||
| import org.apache.amoro.table.descriptor.PartitionBaseInfo; | ||||||
| import org.apache.amoro.utils.MemorySize; | ||||||
| import org.apache.iceberg.SnapshotSummary; | ||||||
| import org.apache.iceberg.util.PropertyUtil; | ||||||
| import org.apache.iceberg.util.ThreadPools; | ||||||
| import org.slf4j.Logger; | ||||||
| import org.slf4j.LoggerFactory; | ||||||
|  | ||||||
| import java.util.List; | ||||||
| import java.util.Map; | ||||||
| import java.util.concurrent.ExecutorService; | ||||||
| import java.util.stream.Collectors; | ||||||
|  | ||||||
| public class MetricBasedRefreshEvent { | ||||||
| private static final Logger logger = LoggerFactory.getLogger(MetricBasedRefreshEvent.class); | ||||||
|  | ||||||
| /** | ||||||
| * Determines if evaluating pending input is necessary. | ||||||
| * | ||||||
| * @param tableRuntime The runtime information of the table. | ||||||
| * @param table The table to be evaluated. | ||||||
| * @return true if evaluation is necessary, otherwise false. | ||||||
| */ | ||||||
| public static boolean isEvaluatingPendingInputNecessary( | ||||||
| DefaultTableRuntime tableRuntime, MixedTable table) { | ||||||
| if (table.format() != TableFormat.ICEBERG && table.format() != TableFormat.MIXED_ICEBERG) { | ||||||
| logger.debug( | ||||||
| "MetricBasedRefreshEvent only support ICEBERG/MIXED_ICEBERG tables. Always return true for other table formats."); | ||||||
| return true; | ||||||
| } | ||||||
|  | ||||||
| // Perform periodic scheduling according to the fallback interval to avoid false positives or | ||||||
| // missed triggers based on metadata metric-driven evaluation | ||||||
| OptimizingConfig config = tableRuntime.getOptimizingConfig(); | ||||||
| if (reachFallbackInterval( | ||||||
| tableRuntime.getLastPlanTime(), config.getEvaluationFallbackInterval())) { | ||||||
| logger.info("Maximum interval for evaluating table {} has reached.", table.id()); | ||||||
| return true; | ||||||
| } | ||||||
|  | ||||||
| ExecutorService executorService = ThreadPools.getWorkerPool(); | ||||||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use a dedicated thread pool to avoid thread congestion. 
        Suggested change
       
 | ||||||
| MixedAndIcebergTableDescriptor formatTableDescriptor = new MixedAndIcebergTableDescriptor(); | ||||||
| formatTableDescriptor.withIoExecutor(executorService); | ||||||
|  | ||||||
| // Step 1: If the condition `delete file=0 && avg file size > target size * ratio` is satisfied, | ||||||
| // then evaluating the pending input is considered unnecessary and will be skipped. | ||||||
| BasicStats basicStats = new BasicStats(); | ||||||
| if (table.isUnkeyedTable()) { | ||||||
| basicStats.accept(table.asUnkeyedTable().currentSnapshot().summary()); | ||||||
| } else { | ||||||
| basicStats.accept(table.asKeyedTable().baseTable().currentSnapshot().summary()); | ||||||
| basicStats.accept(table.asKeyedTable().changeTable().currentSnapshot().summary()); | ||||||
| } | ||||||
|  | ||||||
| double avgFileSize = | ||||||
| basicStats.deleteFileCnt + basicStats.dataFileCnt > 0 | ||||||
| ? (double) basicStats.fileSize / (basicStats.deleteFileCnt + basicStats.dataFileCnt) | ||||||
| : 0; | ||||||
| long minTargetSize = config.getTargetSize(); | ||||||
|  | ||||||
| if (basicStats.deleteFileCnt == 0 | ||||||
| && avgFileSize > config.getTargetSize() * config.getMinTargetSizeRatio()) { | ||||||
| logger.info( | ||||||
| "Table {} contains only appended data and no deleted files (average file size: {}), skip evaluating pending input.", | ||||||
| table.id(), | ||||||
| avgFileSize); | ||||||
| return false; | ||||||
| } | ||||||
|  | ||||||
| double mseTolerance = minTargetSize - config.getAverageFileSizeTolerance().getBytes(); | ||||||
| if (config.getAverageFileSizeTolerance().getBytes() == 0) { | ||||||
| logger.warn( | ||||||
| "The minimum tolerance value for the average partition file size is set to 0, which will prevent any partition from being pended."); | ||||||
| return false; | ||||||
| } | ||||||
|  | ||||||
| // Step 2: Calculate detail properties for each partition of a table including the sum of | ||||||
| // squared errors of file sizes | ||||||
| List<PartitionBaseInfo> partitionBaseInfos = | ||||||
| formatTableDescriptor.getTablePartitionsWithDetailProperties(table, minTargetSize); | ||||||
|  | ||||||
| List<PartitionBaseInfo> filteredPartitionBaseInfos = | ||||||
| filterOutPartitionToBeOptimized(table, partitionBaseInfos, mseTolerance); | ||||||
|  | ||||||
| long filteredPartitionCount = filteredPartitionBaseInfos.size(); | ||||||
| long filteredFileCount = | ||||||
| filteredPartitionBaseInfos.stream().mapToLong(PartitionBaseInfo::getFileCount).sum(); | ||||||
| long totalPartitionCount = partitionBaseInfos.size(); | ||||||
| MseStats stats = | ||||||
| partitionBaseInfos.stream().collect(MseStats::new, MseStats::accept, MseStats::combine); | ||||||
|  | ||||||
| logger.info( | ||||||
| "Filter out {} partitions ({} files in total) from {} ({} files in total) of table {} that have reached the tolerance value(T={}), MSE for total files: [{},{}].", | ||||||
| filteredPartitionCount, | ||||||
| filteredFileCount, | ||||||
| totalPartitionCount, | ||||||
| stats.totalFileCount, | ||||||
| table.id(), | ||||||
| mseTolerance, | ||||||
| stats.mseTotalMin, | ||||||
| stats.mseTotalMax); | ||||||
| return !filteredPartitionBaseInfos.isEmpty(); | ||||||
| } | ||||||
|  | ||||||
| /** Determines if the metric-based evalutation fallback interval has been reached. */ | ||||||
| private static boolean reachFallbackInterval(long lastPlanTime, long fallbackInterval) { | ||||||
| return fallbackInterval >= 0 && System.currentTimeMillis() - lastPlanTime > fallbackInterval; | ||||||
| } | ||||||
|  | ||||||
| /** | ||||||
| * Filters out partitions that need to be optimized based on the MSE tolerance. | ||||||
| * | ||||||
| * @param table The table being evaluated. | ||||||
| * @param partitionBaseInfos Base information of the partitions. | ||||||
| * @param tolerance The MSE tolerance value. | ||||||
| * @return A list of partitions that need optimization. | ||||||
| */ | ||||||
| @VisibleForTesting | ||||||
| public static List<PartitionBaseInfo> filterOutPartitionToBeOptimized( | ||||||
| MixedTable table, List<PartitionBaseInfo> partitionBaseInfos, double tolerance) { | ||||||
| return partitionBaseInfos.stream() | ||||||
| .filter( | ||||||
| partition -> { | ||||||
| double meanSquaredError = | ||||||
| (double) | ||||||
| partition.getPropertyOrDefault( | ||||||
| FILE_SIZE_SQUARED_ERROR_SUM, FILE_SIZE_SQUARED_ERROR_SUM_DEFAULT) | ||||||
| / partition.getFileCount(); | ||||||
| boolean isFiltered = | ||||||
| partition.getFileCount() > 1 && meanSquaredError >= tolerance * tolerance; | ||||||
| logger.debug( | ||||||
| "Table: {}, Partition: {}, isFiltered: {}", table.id(), partition, isFiltered); | ||||||
| return isFiltered; | ||||||
| }) | ||||||
| .collect(Collectors.toList()); | ||||||
| } | ||||||
|  | ||||||
| static class BasicStats { | ||||||
| int deleteFileCnt = 0; | ||||||
| int dataFileCnt = 0; | ||||||
| long fileSize = 0; | ||||||
|  | ||||||
| void accept(Map<String, String> summary) { | ||||||
| deleteFileCnt += | ||||||
| PropertyUtil.propertyAsInt(summary, SnapshotSummary.TOTAL_DELETE_FILES_PROP, 0); | ||||||
| dataFileCnt += PropertyUtil.propertyAsInt(summary, SnapshotSummary.TOTAL_DATA_FILES_PROP, 0); | ||||||
| fileSize += | ||||||
| MemorySize.parse( | ||||||
| PropertyUtil.propertyAsString(summary, SnapshotSummary.TOTAL_FILE_SIZE_PROP, "0")) | ||||||
| .getBytes(); | ||||||
| } | ||||||
| } | ||||||
|  | ||||||
| static class MseStats { | ||||||
| double mseTotalMin = Double.MAX_VALUE; | ||||||
| double mseTotalMax = Double.MIN_VALUE; | ||||||
| long totalFileCount = 0L; | ||||||
|  | ||||||
| void accept(PartitionBaseInfo p) { | ||||||
| long fileCount = p.getFileCount(); | ||||||
| totalFileCount += fileCount; | ||||||
| double mseTotal = | ||||||
| ((double) | ||||||
| p.getPropertyOrDefault( | ||||||
| FILE_SIZE_SQUARED_ERROR_SUM, FILE_SIZE_SQUARED_ERROR_SUM_DEFAULT)) | ||||||
| / fileCount; | ||||||
| mseTotalMin = Math.min(mseTotalMin, mseTotal); | ||||||
| mseTotalMax = Math.max(mseTotalMax, mseTotal); | ||||||
| } | ||||||
|  | ||||||
| void combine(MseStats other) { | ||||||
| mseTotalMin = Math.min(mseTotalMin, other.mseTotalMin); | ||||||
| mseTotalMax = Math.max(mseTotalMax, other.mseTotalMax); | ||||||
| totalFileCount += other.totalFileCount; | ||||||
| } | ||||||
| } | ||||||
| } | ||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can use a try-with-resources statement to close the I/O automatically.