Skip to content

Commit

Permalink
[MINOR] Refactor the init method of HoodieAppendHandle (apache#10693)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhuyaogai authored Feb 20, 2024
1 parent a3a0103 commit 2c0fc56
Showing 1 changed file with 41 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -173,50 +173,52 @@ public HoodieAppendHandle(HoodieWriteConfig config, String instantTime, HoodieTa
}

private void init(HoodieRecord record) {
if (doInit) {
String prevCommit = instantTime;
String baseFile = "";
List<String> logFiles = new ArrayList<>();
if (config.isCDCEnabled()) {
// the cdc reader needs the base file metadata to have deterministic update sequence.
TableFileSystemView.SliceView rtView = hoodieTable.getSliceView();
Option<FileSlice> fileSlice = rtView.getLatestFileSlice(partitionPath, fileId);
if (fileSlice.isPresent()) {
prevCommit = fileSlice.get().getBaseInstantTime();
baseFile = fileSlice.get().getBaseFile().map(BaseFile::getFileName).orElse("");
logFiles = fileSlice.get().getLogFiles().map(HoodieLogFile::getFileName).collect(Collectors.toList());
}
if (!doInit) {
return;
}

String prevCommit = instantTime;
String baseFile = "";
List<String> logFiles = new ArrayList<>();
if (config.isCDCEnabled()) {
// the cdc reader needs the base file metadata to have deterministic update sequence.
TableFileSystemView.SliceView rtView = hoodieTable.getSliceView();
Option<FileSlice> fileSlice = rtView.getLatestFileSlice(partitionPath, fileId);
if (fileSlice.isPresent()) {
prevCommit = fileSlice.get().getBaseInstantTime();
baseFile = fileSlice.get().getBaseFile().map(BaseFile::getFileName).orElse("");
logFiles = fileSlice.get().getLogFiles().map(HoodieLogFile::getFileName).collect(Collectors.toList());
}
}

// Prepare the first write status
HoodieDeltaWriteStat deltaWriteStat = new HoodieDeltaWriteStat();
writeStatus.setStat(deltaWriteStat);
writeStatus.setFileId(fileId);
writeStatus.setPartitionPath(partitionPath);
averageRecordSize = sizeEstimator.sizeEstimate(record);
// Prepare the first write status
HoodieDeltaWriteStat deltaWriteStat = new HoodieDeltaWriteStat();
writeStatus.setStat(deltaWriteStat);
writeStatus.setFileId(fileId);
writeStatus.setPartitionPath(partitionPath);
averageRecordSize = sizeEstimator.sizeEstimate(record);

deltaWriteStat.setPrevCommit(prevCommit);
deltaWriteStat.setPartitionPath(partitionPath);
deltaWriteStat.setFileId(fileId);
deltaWriteStat.setBaseFile(baseFile);
deltaWriteStat.setLogFiles(logFiles);
deltaWriteStat.setPrevCommit(prevCommit);
deltaWriteStat.setPartitionPath(partitionPath);
deltaWriteStat.setFileId(fileId);
deltaWriteStat.setBaseFile(baseFile);
deltaWriteStat.setLogFiles(logFiles);

try {
// Save hoodie partition meta in the partition path
HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs, instantTime,
new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath),
hoodieTable.getPartitionMetafileFormat());
partitionMetadata.trySave(getPartitionId());

this.writer = createLogWriter(getFileInstant(record));
} catch (Exception e) {
LOG.error("Error in update task at commit " + instantTime, e);
writeStatus.setGlobalError(e);
throw new HoodieUpsertException("Failed to initialize HoodieAppendHandle for FileId: " + fileId + " on commit "
+ instantTime + " on HDFS path " + hoodieTable.getMetaClient().getBasePathV2() + "/" + partitionPath, e);
}
doInit = false;
try {
// Save hoodie partition meta in the partition path
HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs, instantTime,
new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath),
hoodieTable.getPartitionMetafileFormat());
partitionMetadata.trySave(getPartitionId());

this.writer = createLogWriter(getFileInstant(record));
} catch (Exception e) {
LOG.error("Error in update task at commit " + instantTime, e);
writeStatus.setGlobalError(e);
throw new HoodieUpsertException("Failed to initialize HoodieAppendHandle for FileId: " + fileId + " on commit "
+ instantTime + " on HDFS path " + hoodieTable.getMetaClient().getBasePathV2() + "/" + partitionPath, e);
}
doInit = false;
}

/**
Expand Down

0 comments on commit 2c0fc56

Please sign in to comment.