[HUDI-8210] Support writing table version 6 log format from 1.x (apache#12206)


* Changes to FileSystemView to handle 0.x-style log naming, plus tests
 - Simple adapters in AbstractFileSystemView to handle filtering of uncommitted logs/files in 0.x
 - Validations for disallowed writer configs (NBCC, log format) when writing table version 6 from 1.x
 - TestHoodieTableFileSystemView tests have all been redone to handle both styles of log naming
 - New unit tests
* Fix the log reader to skip pending log files for legacy tables
* Add tests for filtering pending log blocks
* Rename AbstractHoodieLogRecordReader to Scanner consistently
---------

Co-authored-by: Vinoth Chandar <[email protected]>
Co-authored-by: danny0405 <[email protected]>
vinothchandar and danny0405 authored Nov 17, 2024
1 parent 4b2a633 commit b31c858
Showing 44 changed files with 1,116 additions and 557 deletions.
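
The common thread across the changed files below is a gate on HoodieTableVersion.EIGHT: 1.x tables (version 8 and above) name log files after the current delta-commit instant, while table version 6 keeps the 0.x convention of naming them after the base instant of the latest file slice. A minimal sketch of that selection, assuming the FileSlice, Option and HoodieTableVersion APIs that appear in the diffs below:

import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.util.Option;

// Hypothetical helper (not part of this commit) illustrating the version gate applied below:
// table version 8+ uses the current delta-commit instant in the log file name, while
// version 6 and older reuse the base instant of the latest file slice.
final class LogInstantSelectorSketch {
  static String selectLogInstant(HoodieTableVersion tableVersion,
                                 String currentInstantTime,
                                 Option<FileSlice> latestFileSlice) {
    if (tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
      return currentInstantTime;
    }
    if (latestFileSlice.isPresent()) {
      return latestFileSlice.get().getBaseInstantTime();
    }
    // No base file yet: start a new log file at the current instant.
    return currentInstantTime;
  }
}
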
@@ -112,7 +112,7 @@ public void init() throws IOException, InterruptedException, URISyntaxException
try (HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
.onParentPath(new StoragePath(partitionPath))
.withFileExtension(HoodieLogFile.DELTA_EXTENSION)
.withFileId("test-log-fileid1").withDeltaCommit("100").withStorage(storage)
.withFileId("test-log-fileid1").withInstantTime("100").withStorage(storage)
.withSizeThreshold(1).build()) {

// write data to file
@@ -209,7 +209,7 @@ public void testShowLogFileRecordsWithMerge() throws IOException, InterruptedExc
writer =
HoodieLogFormat.newWriterBuilder().onParentPath(new StoragePath(partitionPath))
.withFileExtension(HoodieLogFile.DELTA_EXTENSION)
.withFileId("test-log-fileid1").withDeltaCommit(INSTANT_TIME).withStorage(
.withFileId("test-log-fileid1").withInstantTime(INSTANT_TIME).withStorage(
storage)
.withSizeThreshold(500).build();

@@ -110,7 +110,7 @@ public TimelineArchiverV1(HoodieWriteConfig config, HoodieTable<T, I, K, O> tabl
private Writer openWriter() {
try {
if (this.writer == null) {
return HoodieLogFormat.newWriterBuilder().onParentPath(archiveFilePath.getParent()).withDeltaCommit("")
return HoodieLogFormat.newWriterBuilder().onParentPath(archiveFilePath.getParent()).withInstantTime("")
.withFileId(archiveFilePath.getName()).withFileExtension(HoodieArchivedLogFile.ARCHIVE_EXTENSION)
.withStorage(metaClient.getStorage()).build();
} else {
@@ -36,6 +36,7 @@
import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.model.MetadataValues;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.log.AppendResult;
import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
@@ -154,46 +155,71 @@ public HoodieAppendHandle(HoodieWriteConfig config, String instantTime, HoodieTa
this.sizeEstimator = new DefaultSizeEstimator();
this.statuses = new ArrayList<>();
this.recordProperties.putAll(config.getProps());
this.shouldWriteRecordPositions = config.shouldWriteRecordPositions();
this.shouldWriteRecordPositions = config.shouldWriteRecordPositions()
// record positions supported only from table version 8
&& config.getWriteVersion().greaterThanOrEquals(HoodieTableVersion.EIGHT);
}

public HoodieAppendHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
String partitionPath, String fileId, TaskContextSupplier sparkTaskContextSupplier) {
this(config, instantTime, hoodieTable, partitionPath, fileId, null, sparkTaskContextSupplier);
}

private void init(HoodieRecord record) {
if (!doInit) {
return;
}

String prevCommit = instantTime;
private void populateWriteStat(HoodieRecord record, HoodieDeltaWriteStat deltaWriteStat) {
HoodieTableVersion tableVersion = hoodieTable.version();
String prevCommit;
String baseFile = "";
List<String> logFiles = new ArrayList<>();
if (hoodieTable.getMetaClient().getTableConfig().isCDCEnabled()) {
// the cdc reader needs the base file metadata to have deterministic update sequence.

if (tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
// table versions 8 and greater.
prevCommit = instantTime;
if (hoodieTable.getMetaClient().getTableConfig().isCDCEnabled()) {
// the cdc reader needs the base file metadata to have deterministic update sequence.
TableFileSystemView.SliceView rtView = hoodieTable.getSliceView();
Option<FileSlice> fileSlice = rtView.getLatestFileSlice(partitionPath, fileId);
if (fileSlice.isPresent()) {
prevCommit = fileSlice.get().getBaseInstantTime();
baseFile = fileSlice.get().getBaseFile().map(BaseFile::getFileName).orElse("");
logFiles = fileSlice.get().getLogFiles().map(HoodieLogFile::getFileName).collect(Collectors.toList());
}
}
} else {
// older table versions.
TableFileSystemView.SliceView rtView = hoodieTable.getSliceView();
Option<FileSlice> fileSlice = rtView.getLatestFileSlice(partitionPath, fileId);
if (fileSlice.isPresent()) {
prevCommit = fileSlice.get().getBaseInstantTime();
baseFile = fileSlice.get().getBaseFile().map(BaseFile::getFileName).orElse("");
logFiles = fileSlice.get().getLogFiles().map(HoodieLogFile::getFileName).collect(Collectors.toList());
} else {
// Set the base commit time as the current instantTime for new inserts into log files
// Handle log file only case. This is necessary for the concurrent clustering and writer case (e.g., consistent hashing bucket index).
// NOTE: flink engine use instantTime to mark operation type, check BaseFlinkCommitActionExecutor::execute
prevCommit = getInstantTimeForLogFile(record);
// This means there is no base data file, start appending to a new log file
LOG.info("New file group from append handle for partition {}", partitionPath);
}
}

deltaWriteStat.setPrevCommit(prevCommit);
deltaWriteStat.setBaseFile(baseFile);
deltaWriteStat.setLogFiles(logFiles);
}

private void init(HoodieRecord record) {
if (!doInit) {
return;
}
// Prepare the first write status
HoodieDeltaWriteStat deltaWriteStat = new HoodieDeltaWriteStat();
writeStatus.setStat(deltaWriteStat);
writeStatus.setFileId(fileId);
writeStatus.setPartitionPath(partitionPath);
averageRecordSize = sizeEstimator.sizeEstimate(record);

deltaWriteStat.setPrevCommit(prevCommit);
deltaWriteStat.setPartitionPath(partitionPath);
deltaWriteStat.setFileId(fileId);
deltaWriteStat.setBaseFile(baseFile);
deltaWriteStat.setLogFiles(logFiles);

populateWriteStat(record, deltaWriteStat);
averageRecordSize = sizeEstimator.sizeEstimate(record);
try {
// Save hoodie partition meta in the partition path
HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(storage, instantTime,
@@ -202,20 +228,22 @@ private void init(HoodieRecord record) {
hoodieTable.getPartitionMetafileFormat());
partitionMetadata.trySave();

this.writer = createLogWriter(getFileInstant(record));
String instantTime = config.getWriteVersion().greaterThanOrEquals(HoodieTableVersion.EIGHT)
? getInstantTimeForLogFile(record) : deltaWriteStat.getPrevCommit();
this.writer = createLogWriter(instantTime);
} catch (Exception e) {
LOG.error("Error in update task at commit " + instantTime, e);
writeStatus.setGlobalError(e);
throw new HoodieUpsertException("Failed to initialize HoodieAppendHandle for FileId: " + fileId + " on commit "
+ instantTime + " on HDFS path " + hoodieTable.getMetaClient().getBasePath() + "/" + partitionPath, e);
+ instantTime + " on storage path " + hoodieTable.getMetaClient().getBasePath() + "/" + partitionPath, e);
}
doInit = false;
}

/**
* Returns the instant time to use in the log file name.
*/
private String getFileInstant(HoodieRecord<?> record) {
private String getInstantTimeForLogFile(HoodieRecord<?> record) {
if (config.isConsistentHashingEnabled()) {
// Handle log file only case. This is necessary for the concurrent clustering and writer case (e.g., consistent hashing bucket index).
// NOTE: flink engine use instantTime to mark operation type, check BaseFlinkCommitActionExecutor::execute
@@ -454,7 +482,7 @@ protected void appendDataAndDeleteBlocks(Map<HeaderMetadataType, String> header,
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, instantTime);
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, writeSchemaWithMetaFields.toString());
List<HoodieLogBlock> blocks = new ArrayList<>(2);
if (recordList.size() > 0) {
if (!recordList.isEmpty()) {
String keyField = config.populateMetaFields()
? HoodieRecord.RECORD_KEY_METADATA_FIELD
: hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
@@ -463,12 +491,12 @@ protected void appendDataAndDeleteBlocks(Map<HeaderMetadataType, String> header,
getUpdatedHeader(header, config), keyField));
}

if (appendDeleteBlocks && recordsToDeleteWithPositions.size() > 0) {
if (appendDeleteBlocks && !recordsToDeleteWithPositions.isEmpty()) {
blocks.add(new HoodieDeleteBlock(recordsToDeleteWithPositions, shouldWriteRecordPositions,
getUpdatedHeader(header, config)));
}

if (blocks.size() > 0) {
if (!blocks.isEmpty()) {
AppendResult appendResult = writer.appendBlocks(blocks);
processAppendResult(appendResult, recordList);
recordList.clear();
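
A condensed sketch of the legacy (pre-version-8) branch that the new populateWriteStat helper above introduces, assuming the same SliceView and HoodieDeltaWriteStat setters used in the diff: the write stat records the latest file slice's base instant, base file and log files, since 0.x log file names carry the base commit.

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieDeltaWriteStat;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.util.Option;

// Hypothetical, condensed sketch of the legacy write-stat population path shown above.
final class LegacyWriteStatSketch {
  static void populate(TableFileSystemView.SliceView sliceView, String partitionPath,
                       String fileId, String currentInstantTime, HoodieDeltaWriteStat stat) {
    String prevCommit = currentInstantTime;
    String baseFile = "";
    List<String> logFiles = new ArrayList<>();
    Option<FileSlice> fileSlice = sliceView.getLatestFileSlice(partitionPath, fileId);
    if (fileSlice.isPresent()) {
      // Reuse the existing file slice's base instant, base file and log files.
      prevCommit = fileSlice.get().getBaseInstantTime();
      baseFile = fileSlice.get().getBaseFile().map(BaseFile::getFileName).orElse("");
      logFiles = fileSlice.get().getLogFiles().map(HoodieLogFile::getFileName).collect(Collectors.toList());
    }
    stat.setPrevCommit(prevCommit);
    stat.setBaseFile(baseFile);
    stat.setLogFiles(logFiles);
  }
}
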
@@ -234,27 +234,28 @@ private static Schema getWriteSchema(HoodieWriteConfig config) {
return new Schema.Parser().parse(config.getWriteSchema());
}

protected HoodieLogFormat.Writer createLogWriter(String deltaCommitTime) {
return createLogWriter(deltaCommitTime, null);
protected HoodieLogFormat.Writer createLogWriter(String instantTime) {
return createLogWriter(instantTime, null);
}

protected HoodieLogFormat.Writer createLogWriter(String deltaCommitTime, String fileSuffix) {
protected HoodieLogFormat.Writer createLogWriter(String instantTime, String fileSuffix) {
try {
return HoodieLogFormat.newWriterBuilder()
.onParentPath(FSUtils.constructAbsolutePath(hoodieTable.getMetaClient().getBasePath(), partitionPath))
.withFileId(fileId)
.withDeltaCommit(deltaCommitTime)
.withInstantTime(instantTime)
.withFileSize(0L)
.withSizeThreshold(config.getLogFileMaxSize())
.withStorage(storage)
.withRolloverLogWriteToken(writeToken)
.withLogWriteToken(writeToken)
.withFileCreationCallback(getLogCreationCallback())
.withTableVersion(config.getWriteVersion())
.withSuffix(fileSuffix)
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
.withFileExtension(HoodieLogFile.DELTA_EXTENSION)
.build();
} catch (IOException e) {
throw new HoodieException("Creating logger writer with fileId: " + fileId + ", "
+ "delta commit time: " + deltaCommitTime + ", "
+ "delta commit time: " + instantTime + ", "
+ "file suffix: " + fileSuffix + " error");
}
}
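
A usage sketch of the builder call as renamed above (withInstantTime replacing withDeltaCommit, withLogWriteToken replacing withRolloverLogWriteToken), assuming a HoodieStorage handle and the builder methods shown in this diff; the table version passed to withTableVersion is what drives whether a 1.x-style or legacy 0.x-style log file name is produced.

import java.io.IOException;

import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;

// Hypothetical wrapper showing how a delta log writer is built with the renamed options.
final class LogWriterSketch {
  static HoodieLogFormat.Writer newDeltaLogWriter(StoragePath partitionPath, HoodieStorage storage,
                                                  String fileId, String instantTime, String writeToken,
                                                  HoodieTableVersion tableVersion) throws IOException {
    return HoodieLogFormat.newWriterBuilder()
        .onParentPath(partitionPath)
        .withFileId(fileId)
        .withInstantTime(instantTime)        // delta-commit instant (v8+) or base instant (v6)
        .withLogWriteToken(writeToken)
        .withTableVersion(tableVersion)      // selects the log file naming scheme
        .withStorage(storage)
        .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
        .build();
  }
}
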
@@ -921,13 +921,13 @@ private void initializeFileGroups(HoodieTableMetaClient dataMetaClient, Metadata
try (HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
.onParentPath(FSUtils.constructAbsolutePath(metadataWriteConfig.getBasePath(), partitionName))
.withFileId(fileGroupFileId)
.withDeltaCommit(instantTime)
.withInstantTime(instantTime)
.withLogVersion(HoodieLogFile.LOGFILE_BASE_VERSION)
.withFileSize(0L)
.withSizeThreshold(metadataWriteConfig.getLogFileMaxSize())
.withStorage(dataMetaClient.getStorage())
.withRolloverLogWriteToken(HoodieLogFormat.DEFAULT_WRITE_TOKEN)
.withLogWriteToken(HoodieLogFormat.DEFAULT_WRITE_TOKEN)
.withTableVersion(metadataWriteConfig.getWriteVersion())
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).build()) {
writer.appendBlock(block);
}
@@ -47,6 +47,7 @@
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -158,6 +159,10 @@ public boolean isMetadataTable() {
return isMetadataTable;
}

public HoodieTableVersion version() {
return metaClient.getTableConfig().getTableVersion();
}

protected abstract HoodieIndex<?, ?> getIndex(HoodieWriteConfig config, HoodieEngineContext context);

private synchronized FileSystemViewManager getViewManager() {
@@ -21,10 +21,12 @@
import org.apache.hudi.avro.model.HoodieRollbackRequest;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.function.SerializableFunction;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
@@ -35,6 +37,7 @@
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;
import org.apache.hudi.util.CommonClientUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -113,6 +116,7 @@ List<Pair<String, HoodieRollbackStat>> maybeDeleteAndCollectStats(HoodieEngineCo
HoodieInstant instantToRollback,
List<SerializableHoodieRollbackRequest> rollbackRequests,
boolean doDelete, int numPartitions) {
final TaskContextSupplier taskContextSupplier = context.getTaskContextSupplier();
return context.flatMap(rollbackRequests, (SerializableFunction<SerializableHoodieRollbackRequest, Stream<Pair<String, HoodieRollbackStat>>>) rollbackRequest -> {
List<String> filesToBeDeleted = rollbackRequest.getFilesToBeDeleted();
if (!filesToBeDeleted.isEmpty()) {
@@ -125,12 +129,17 @@ List<Pair<String, HoodieRollbackStat>> maybeDeleteAndCollectStats(HoodieEngineCo
final StoragePath filePath;
try {
String fileId = rollbackRequest.getFileId();
HoodieTableVersion tableVersion = metaClient.getTableConfig().getTableVersion();

writer = HoodieLogFormat.newWriterBuilder()
.onParentPath(FSUtils.constructAbsolutePath(metaClient.getBasePath(), rollbackRequest.getPartitionPath()))
.withFileId(fileId)
.withDeltaCommit(instantToRollback.requestedTime())
.withLogWriteToken(CommonClientUtils.generateWriteToken(taskContextSupplier))
.withInstantTime(tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)
? instantToRollback.requestedTime() : rollbackRequest.getLatestBaseInstant()
)
.withStorage(metaClient.getStorage())
.withTableVersion(tableVersion)
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();

// generate metadata
@@ -162,20 +171,19 @@ List<Pair<String, HoodieRollbackStat>> maybeDeleteAndCollectStats(HoodieEngineCo
1L
);

return Collections.singletonList(
return Stream.of(
Pair.of(rollbackRequest.getPartitionPath(),
HoodieRollbackStat.newBuilder()
.withPartitionPath(rollbackRequest.getPartitionPath())
.withRollbackBlockAppendResults(filesToNumBlocksRollback)
.build()))
.stream();
.build()));
} else {
return Collections.singletonList(
// no action needed.
return Stream.of(
Pair.of(rollbackRequest.getPartitionPath(),
HoodieRollbackStat.newBuilder()
.withPartitionPath(rollbackRequest.getPartitionPath())
.build()))
.stream();
.build()));
}
}, numPartitions);
}
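
A sketch of the instant-time choice the rollback path above now makes for the command-block log file, assuming the rollback request exposes the file group's latest base instant as in this diff: version 8+ tables name it after the instant being rolled back, while legacy tables keep the 0.x convention of naming it after the file group's base instant.

import org.apache.hudi.common.table.HoodieTableVersion;

// Hypothetical helper mirroring the withInstantTime argument in the rollback change above.
final class RollbackLogInstantSketch {
  static String logInstantFor(HoodieTableVersion tableVersion, String rollbackInstantTime,
                              String latestBaseInstant) {
    return tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)
        ? rollbackInstantTime   // 1.x naming: the instant being rolled back
        : latestBaseInstant;    // 0.x naming: the file group's base instant
  }
}
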