Skip to content

Commit

Permalink
[HUDI-7441] Move getWritePartitionPaths method to common module to decouple hive dependency (#10744)
Browse files Browse the repository at this point in the history

Co-authored-by: wuzhiping <[email protected]>
  • Loading branch information
stayrascal and wuzhiping authored Feb 27, 2024
1 parent 20e2348 commit 20db8b7
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,19 @@ private static List<String> getPartitionsAdded(HoodieCommitMetadata commitMetada
.collect(Collectors.toList());
}

/**
 * Collects the distinct write partition paths across the given commits metadata.
 *
 * @param metadataList The commits metadata to inspect
 * @return the set of partition paths written to by any of the commits
 */
public static Set<String> getWritePartitionPaths(List<HoodieCommitMetadata> metadataList) {
  return metadataList.stream()
      .flatMap(commitMetadata -> commitMetadata.getWritePartitionPaths().stream())
      .collect(Collectors.toSet());
}

/**
* Convert commit action metadata to bloom filter records.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.sink.partitioner.profile.WriteProfiles;
import org.apache.hudi.source.prune.PartitionPruners;
import org.apache.hudi.table.format.cdc.CdcInputSplit;
Expand Down Expand Up @@ -412,7 +412,7 @@ private FileIndex getFileIndex() {
* @return the set of read partitions
*/
private Set<String> getReadPartitions(List<HoodieCommitMetadata> metadataList) {
Set<String> partitions = HoodieInputFormatUtils.getWritePartitionPaths(metadataList);
Set<String> partitions = HoodieTableMetadataUtil.getWritePartitionPaths(metadataList);
// apply partition push down
if (this.partitionPruner != null) {
Set<String> selectedPartitions = this.partitionPruner.filter(partitions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SplitLocationInfo;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -192,7 +193,7 @@ protected List<FileStatus> listStatusForIncrementalMode(JobConf job,
// build fileGroup from fsView
Path basePath = new Path(tableMetaClient.getBasePath());
// filter affectedPartition by inputPaths
List<String> affectedPartition = HoodieInputFormatUtils.getWritePartitionPaths(metadataList).stream()
List<String> affectedPartition = HoodieTableMetadataUtil.getWritePartitionPaths(metadataList).stream()
.filter(k -> k.isEmpty() ? inputPaths.contains(basePath) : inputPaths.contains(new Path(basePath, k))).collect(Collectors.toList());
if (affectedPartition.isEmpty()) {
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -514,19 +514,6 @@ public static FileStatus[] listAffectedFilesForCommits(Configuration hadoopConf,
return fullPathToFileStatus.values().toArray(new FileStatus[0]);
}

/**
 * Returns the union of incremental write partition paths for the supplied commits metadata.
 *
 * @param metadataList The commits metadata
 * @return the de-duplicated partition path set
 */
public static Set<String> getWritePartitionPaths(List<HoodieCommitMetadata> metadataList) {
  return metadataList.stream()
      .flatMap(metadata -> metadata.getWritePartitionPaths().stream())
      .collect(Collectors.toSet());
}

public static HoodieRealtimeFileSplit createRealtimeFileSplit(HoodieRealtimePath path, long start, long length, String[] hosts) {
try {
return new HoodieRealtimeFileSplit(new FileSplit(path, start, length, hosts), path);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ import org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling.
import org.apache.hudi.common.table.timeline.TimelineUtils.{HollowCommitHandling, concatTimeline, getCommitMetadata, handleHollowCommitIfNeeded}
import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
import org.apache.hudi.common.table.view.HoodieTableFileSystemView
import org.apache.hudi.metadata.HoodieTableMetadataUtil.getWritePartitionPaths
import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.{getWritePartitionPaths, listAffectedFilesForCommits}
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.listAffectedFilesForCommits
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
Expand Down

0 comments on commit 20db8b7

Please sign in to comment.