[HUDI-8448] Fix log block footer incorrectly using HeaderMetadataType
usberkeley authored Oct 29, 2024
1 parent 2b64650 commit 899b1f6
Showing 14 changed files with 136 additions and 56 deletions.
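The bug class this commit fixes is easy to reproduce outside Hudi: both metadata maps are serialized by writing each key's enum ordinal, and deserialized by indexing values()[ordinal] on some enum. Typing the footer map as Map<HeaderMetadataType, String> therefore only works while the header and footer enums happen to declare matching constants in the same order. A minimal, self-contained Java sketch of the failure mode — enum constants and class names below are illustrative, not Hudi's:

import java.util.function.Function;

public class OrdinalMismatchDemo {
  // Illustrative stand-ins; the real enums are HoodieLogBlock.HeaderMetadataType
  // and HoodieLogBlock.FooterMetadataType, whose constants differ.
  enum HeaderType { INSTANT_TIME, SCHEMA, COMMAND_BLOCK_TYPE }
  enum FooterType { NUM_RECORDS }

  // Mirrors the shape of the fix: the caller supplies the enum to decode against.
  static <T> T decode(int ordinal, Function<Integer, T> typeMapper) {
    return typeMapper.apply(ordinal);
  }

  public static void main(String[] args) {
    int footerOrdinal = FooterType.NUM_RECORDS.ordinal(); // written to the stream as 0
    // Correct: decode footer ordinals against the footer enum.
    System.out.println(decode(footerOrdinal, i -> FooterType.values()[i])); // NUM_RECORDS
    // The old code's mistake: decoding a footer ordinal against the header enum
    // silently yields an unrelated constant (here INSTANT_TIME), or throws
    // ArrayIndexOutOfBoundsException once the footer enum grows past the header one.
    System.out.println(decode(footerOrdinal, i -> HeaderType.values()[i])); // INSTANT_TIME
  }
}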
HoodieLogFileCommand.java
@@ -38,6 +38,7 @@
import org.apache.hudi.common.table.log.block.HoodieCorruptBlock;
import org.apache.hudi.common.table.log.block.HoodieDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock.FooterMetadataType;
import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
import org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType;
import org.apache.hudi.common.util.FileIOUtils;
@@ -91,7 +92,7 @@ public String showLogFileCommits(
storage, new StoragePath(logFilePathPattern)).stream()
.map(status -> status.getPath().toString()).collect(Collectors.toList());
Map<String, List<Tuple3<Tuple2<String, HoodieLogBlockType>, Tuple2<Map<HeaderMetadataType, String>,
-    Map<HeaderMetadataType, String>>, Integer>>> commitCountAndMetadata =
+    Map<FooterMetadataType, String>>, Integer>>> commitCountAndMetadata =
new HashMap<>();
int numCorruptBlocks = 0;
int dummyInstantTimeCount = 0;
@@ -145,7 +146,7 @@ storage, new StoragePath(logFilePathPattern)).stream()
new Tuple2<>(n.getLogBlockHeader(), n.getLogBlockFooter()), recordCount.get()));
} else {
List<Tuple3<Tuple2<String, HoodieLogBlockType>, Tuple2<Map<HeaderMetadataType, String>,
-    Map<HeaderMetadataType, String>>, Integer>> list =
+    Map<FooterMetadataType, String>>, Integer>> list =
new ArrayList<>();
list.add(
new Tuple3<>(new Tuple2<>(fileName, n.getBlockType()),
@@ -158,11 +159,11 @@ storage, new StoragePath(logFilePathPattern)).stream()
List<Comparable[]> rows = new ArrayList<>();
ObjectMapper objectMapper = new ObjectMapper();
for (Map.Entry<String, List<Tuple3<Tuple2<String, HoodieLogBlockType>, Tuple2<Map<HeaderMetadataType, String>,
-    Map<HeaderMetadataType, String>>, Integer>>> entry : commitCountAndMetadata
+    Map<FooterMetadataType, String>>, Integer>>> entry : commitCountAndMetadata
.entrySet()) {
String instantTime = entry.getKey();
for (Tuple3<Tuple2<String, HoodieLogBlockType>, Tuple2<Map<HeaderMetadataType, String>,
-    Map<HeaderMetadataType, String>>, Integer> tuple3 : entry
+    Map<FooterMetadataType, String>>, Integer> tuple3 : entry
.getValue()) {
Comparable[] output = new Comparable[6];
output[0] = tuple3._1()._1();
HoodieLogFileReader.java
@@ -29,6 +29,7 @@
import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
import org.apache.hudi.common.table.log.block.HoodieHFileDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock.FooterMetadataType;
import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
import org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType;
import org.apache.hudi.common.table.log.block.HoodieParquetDataBlock;
@@ -154,7 +155,7 @@ private HoodieLogBlock readBlock() throws IOException {
// 4. Read the header for a log block, if present

Map<HeaderMetadataType, String> header =
-    nextBlockVersion.hasHeader() ? HoodieLogBlock.getLogMetadata(inputStream) : null;
+    nextBlockVersion.hasHeader() ? HoodieLogBlock.getHeaderMetadata(inputStream) : null;

// 5. Read the content length for the content
// Fallback to full-block size if no content-length
@@ -168,8 +169,8 @@
Option<byte[]> content = HoodieLogBlock.tryReadContent(inputStream, contentLength, shouldReadLazily);

// 7. Read footer if any
-Map<HeaderMetadataType, String> footer =
-    nextBlockVersion.hasFooter() ? HoodieLogBlock.getLogMetadata(inputStream) : null;
+Map<FooterMetadataType, String> footer =
+    nextBlockVersion.hasFooter() ? HoodieLogBlock.getFooterMetadata(inputStream) : null;

// 8. Read log block length, if present. This acts as a reverse pointer when traversing a
// log file in reverse
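For orientation, steps 4-8 above imply this per-block section order: optional header metadata, content length, content bytes, optional footer metadata, then a trailing block length that acts as a reverse pointer. Below is a simplified, self-contained model of that order; the field widths and the omitted magic/version/type handling (steps 1-3) are assumptions for illustration, not Hudi's exact on-disk format:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class BlockSectionOrderDemo {
  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(baos);
    out.writeInt(0);                               // 4. header metadata: empty map (zero entries)
    byte[] content = "records".getBytes(StandardCharsets.UTF_8);
    out.writeLong(content.length);                 // 5. content length
    out.write(content);                            // 6. content bytes
    out.writeInt(0);                               // 7. footer metadata: empty map
    out.writeLong(baos.size() + 8L);               // 8. total block length (reverse pointer)

    DataInputStream in = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
    int headerEntries = in.readInt();
    byte[] body = new byte[(int) in.readLong()];
    in.readFully(body);
    int footerEntries = in.readInt();
    long blockLength = in.readLong();
    System.out.printf("header=%d content=%s footer=%d blockLength=%d%n",
        headerEntries, new String(body, StandardCharsets.UTF_8), footerEntries, blockLength);
  }
}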
HoodieAvroDataBlock.java
@@ -82,7 +82,7 @@ public HoodieAvroDataBlock(Supplier<SeekableDataInputStream> inputStreamSupplier
HoodieLogBlockContentLocation logBlockContentLocation,
Option<Schema> readerSchema,
Map<HeaderMetadataType, String> header,
-    Map<HeaderMetadataType, String> footer,
+    Map<FooterMetadataType, String> footer,
String keyField) {
super(content, inputStreamSupplier, readBlockLazily, Option.of(logBlockContentLocation), readerSchema, header, footer, keyField, false);
}
HoodieCommandBlock.java
@@ -46,7 +46,7 @@ public HoodieCommandBlock(Map<HeaderMetadataType, String> header) {

public HoodieCommandBlock(Option<byte[]> content, Supplier<SeekableDataInputStream> inputStreamSupplier, boolean readBlockLazily,
Option<HoodieLogBlockContentLocation> blockContentLocation, Map<HeaderMetadataType, String> header,
-    Map<HeaderMetadataType, String> footer) {
+    Map<FooterMetadataType, String> footer) {
super(header, footer, blockContentLocation, content, inputStreamSupplier, readBlockLazily);
this.type =
HoodieCommandBlockTypeEnum.values()[Integer.parseInt(header.get(HeaderMetadataType.COMMAND_BLOCK_TYPE))];
HoodieCorruptBlock.java
@@ -34,7 +34,7 @@ public class HoodieCorruptBlock extends HoodieLogBlock {

public HoodieCorruptBlock(Option<byte[]> corruptedBytes, Supplier<SeekableDataInputStream> inputStreamSupplier, boolean readBlockLazily,
Option<HoodieLogBlockContentLocation> blockContentLocation, Map<HeaderMetadataType, String> header,
-    Map<HeaderMetadataType, String> footer) {
+    Map<FooterMetadataType, String> footer) {
super(header, footer, blockContentLocation, corruptedBytes, inputStreamSupplier, readBlockLazily);
}

HoodieDataBlock.java
@@ -81,7 +81,7 @@ public abstract class HoodieDataBlock extends HoodieLogBlock {
public HoodieDataBlock(List<HoodieRecord> records,
boolean shouldWriteRecordPositions,
Map<HeaderMetadataType, String> header,
-    Map<HeaderMetadataType, String> footer,
+    Map<FooterMetadataType, String> footer,
String keyFieldName) {
super(header, footer, Option.empty(), Option.empty(), null, false);
if (shouldWriteRecordPositions) {
@@ -116,7 +116,7 @@ protected HoodieDataBlock(Option<byte[]> content,
Option<HoodieLogBlockContentLocation> blockContentLocation,
Option<Schema> readerSchema,
Map<HeaderMetadataType, String> headers,
-    Map<HeaderMetadataType, String> footer,
+    Map<FooterMetadataType, String> footer,
String keyFieldName,
boolean enablePointLookups) {
super(headers, footer, blockContentLocation, content, inputStreamSupplier, readBlockLazily);
HoodieDeleteBlock.java
@@ -100,14 +100,14 @@ public HoodieDeleteBlock(List<Pair<DeleteRecord, Long>> recordsToDelete,

public HoodieDeleteBlock(Option<byte[]> content, Supplier<SeekableDataInputStream> inputStreamSupplier, boolean readBlockLazily,
Option<HoodieLogBlockContentLocation> blockContentLocation, Map<HeaderMetadataType, String> header,
-    Map<HeaderMetadataType, String> footer) {
+    Map<FooterMetadataType, String> footer) {
// Setting `shouldWriteRecordPositions` to false as this constructor is only used by the reader
this(content, inputStreamSupplier, readBlockLazily, blockContentLocation, header, footer, false);
}

HoodieDeleteBlock(Option<byte[]> content, Supplier<SeekableDataInputStream> inputStreamSupplier, boolean readBlockLazily,
Option<HoodieLogBlockContentLocation> blockContentLocation, Map<HeaderMetadataType, String> header,
-    Map<HeaderMetadataType, String> footer, boolean shouldWriteRecordPositions) {
+    Map<FooterMetadataType, String> footer, boolean shouldWriteRecordPositions) {
super(header, footer, blockContentLocation, content, inputStreamSupplier, readBlockLazily);
this.shouldWriteRecordPositions = shouldWriteRecordPositions;
}
HoodieHFileDataBlock.java
@@ -73,7 +73,7 @@ public HoodieHFileDataBlock(Supplier<SeekableDataInputStream> inputStreamSupplie
HoodieLogBlockContentLocation logBlockContentLocation,
Option<Schema> readerSchema,
Map<HeaderMetadataType, String> header,
-    Map<HeaderMetadataType, String> footer,
+    Map<FooterMetadataType, String> footer,
boolean enablePointLookups,
StoragePath pathForReader,
boolean useNativeHFileReader) {
HoodieLogBlock.java
@@ -43,6 +43,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
+import java.util.function.Function;
import java.util.function.Supplier;

import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
@@ -63,7 +64,7 @@ public abstract class HoodieLogBlock {
// Header for each log block
private final Map<HeaderMetadataType, String> logBlockHeader;
// Footer for each log block
-private final Map<HeaderMetadataType, String> logBlockFooter;
+private final Map<FooterMetadataType, String> logBlockFooter;
// Location of a log block on disk
private final Option<HoodieLogBlockContentLocation> blockContentLocation;
// data for a specific block
@@ -74,7 +75,7 @@

public HoodieLogBlock(
@Nonnull Map<HeaderMetadataType, String> logBlockHeader,
-    @Nonnull Map<HeaderMetadataType, String> logBlockFooter,
+    @Nonnull Map<FooterMetadataType, String> logBlockFooter,
@Nonnull Option<HoodieLogBlockContentLocation> blockContentLocation,
@Nonnull Option<byte[]> content,
@Nullable Supplier<SeekableDataInputStream> inputStreamSupplier,
@@ -114,7 +115,7 @@ public Map<HeaderMetadataType, String> getLogBlockHeader() {
return logBlockHeader;
}

-public Map<HeaderMetadataType, String> getLogBlockFooter() {
+public Map<FooterMetadataType, String> getLogBlockFooter() {
return logBlockFooter;
}

@@ -255,42 +256,31 @@ public long getBlockEndPos() {
}

/**
- * Convert log metadata to bytes 1. Write size of metadata 2. Write enum ordinal 3. Write actual bytes
+ * Convert header metadata to bytes 1. Write size of metadata 2. Write enum ordinal 3. Write actual bytes
 */
-public static byte[] getLogMetadataBytes(Map<HeaderMetadataType, String> metadata) throws IOException {
-  ByteArrayOutputStream baos = new ByteArrayOutputStream();
-  DataOutputStream output = new DataOutputStream(baos);
-  output.writeInt(metadata.size());
-  for (Map.Entry<HeaderMetadataType, String> entry : metadata.entrySet()) {
-    output.writeInt(entry.getKey().ordinal());
-    byte[] bytes = getUTF8Bytes(entry.getValue());
-    output.writeInt(bytes.length);
-    output.write(bytes);
-  }
-  return baos.toByteArray();
+public static byte[] getHeaderMetadataBytes(Map<HeaderMetadataType, String> metadata) throws IOException {
+  return getLogMetadataBytes(metadata);
}

/**
- * Convert bytes to LogMetadata, follow the same order as {@link HoodieLogBlock#getLogMetadataBytes}.
+ * Convert bytes to Header Metadata, follow the same order as {@link HoodieLogBlock#getHeaderMetadataBytes}.
 */
-public static Map<HeaderMetadataType, String> getLogMetadata(SeekableDataInputStream dis) throws IOException {
-  Map<HeaderMetadataType, String> metadata = new HashMap<>();
-  // 1. Read the metadata written out
-  int metadataCount = dis.readInt();
-  try {
-    while (metadataCount > 0) {
-      int metadataEntryIndex = dis.readInt();
-      int metadataEntrySize = dis.readInt();
-      byte[] metadataEntry = new byte[metadataEntrySize];
-      dis.readFully(metadataEntry, 0, metadataEntrySize);
-      metadata.put(HeaderMetadataType.values()[metadataEntryIndex], new String(metadataEntry));
-      metadataCount--;
-    }
-    return metadata;
-  } catch (EOFException eof) {
-    throw new IOException("Could not read metadata fields ", eof);
-  }
+public static Map<HeaderMetadataType, String> getHeaderMetadata(SeekableDataInputStream dis) throws IOException {
+  return getLogMetadata(dis, index -> HeaderMetadataType.values()[index]);
+}
+
+/**
+ * Convert footer metadata to bytes 1. Write size of metadata 2. Write enum ordinal 3. Write actual bytes
+ */
+public static byte[] getFooterMetadataBytes(Map<FooterMetadataType, String> metadata) throws IOException {
+  return getLogMetadataBytes(metadata);
+}
+
+/**
+ * Convert bytes to Footer Metadata, follow the same order as {@link HoodieLogBlock#getFooterMetadataBytes}.
+ */
+public static Map<FooterMetadataType, String> getFooterMetadata(SeekableDataInputStream dis) throws IOException {
+  return getLogMetadata(dis, index -> FooterMetadataType.values()[index]);
}

/**
Expand Down Expand Up @@ -343,4 +333,62 @@ protected void inflate() throws HoodieIOException {
protected void deflate() {
content = Option.empty();
}

/**
* Converts a given map of log metadata into a byte array representation.
*
* The conversion process involves the following steps:
* 1. Write the size of the metadata map (number of entries).
* 2. For each entry in the map:
* - Write the ordinal of the enum key (to identify the type of metadata).
* - Write the length of the value string.
* - Write the actual bytes of the value string in UTF-8 encoding.
*
* @param metadata A map containing metadata entries, where the key is an enum type representing
* the metadata type and the value is the corresponding string representation.
* @return A byte array containing the serialized metadata.
* @throws IOException If an I/O error occurs during the writing process, such as failure to write
* to the underlying output stream.
*/
private static <T extends Enum<T>> byte[] getLogMetadataBytes(Map<T, String> metadata) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream output = new DataOutputStream(baos);
output.writeInt(metadata.size());
for (Map.Entry<T, String> entry : metadata.entrySet()) {
output.writeInt(entry.getKey().ordinal());
byte[] bytes = getUTF8Bytes(entry.getValue());
output.writeInt(bytes.length);
output.write(bytes);
}
return baos.toByteArray();
}

/**
* Convert bytes to Log Metadata, following the same order as {@link HoodieLogBlock#getHeaderMetadataBytes}
* and {@link HoodieLogBlock#getFooterMetadataBytes}.
*
* @param dis The SeekableDataInputStream to read the metadata from.
* @param typeMapper A function to map the ordinal index to the corresponding metadata type enum.
* @param <T> The type of the metadata enum (either HeaderMetadataType or FooterMetadataType).
* @return A Map containing the metadata type as the key and the metadata value as the value.
* @throws IOException If an I/O error occurs while reading the metadata.
*/
private static <T> Map<T, String> getLogMetadata(SeekableDataInputStream dis, Function<Integer, T> typeMapper) throws IOException {
Map<T, String> metadata = new HashMap<>();
// 1. Read the metadata written out
int metadataCount = dis.readInt();
try {
while (metadataCount > 0) {
int metadataEntryIndex = dis.readInt();
int metadataEntrySize = dis.readInt();
byte[] metadataEntry = new byte[metadataEntrySize];
dis.readFully(metadataEntry, 0, metadataEntrySize);
metadata.put(typeMapper.apply(metadataEntryIndex), new String(metadataEntry));
metadataCount--;
}
return metadata;
} catch (EOFException eof) {
throw new IOException("Could not read metadata fields ", eof);
}
}
}
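For reference, the wire format these helpers implement is: entry count, then per entry (enum ordinal, value length, UTF-8 value bytes). Below is a self-contained round-trip sketch of that format; the class and enum names are illustrative, not Hudi's, and the real methods above additionally read from a SeekableDataInputStream rather than a plain byte array:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class LogMetadataFormatDemo {
  enum HeaderType { INSTANT_TIME, SCHEMA }

  // Same layout as getLogMetadataBytes above: entry count, then per entry
  // (enum ordinal, value length, UTF-8 value bytes).
  static <T extends Enum<T>> byte[] write(Map<T, String> metadata) throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(baos);
    out.writeInt(metadata.size());
    for (Map.Entry<T, String> e : metadata.entrySet()) {
      out.writeInt(e.getKey().ordinal());
      byte[] bytes = e.getValue().getBytes(StandardCharsets.UTF_8);
      out.writeInt(bytes.length);
      out.write(bytes);
    }
    return baos.toByteArray();
  }

  // Same shape as the generic getLogMetadata above: the typeMapper picks the enum.
  static <T> Map<T, String> read(byte[] data, Function<Integer, T> typeMapper) throws IOException {
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
    Map<T, String> metadata = new HashMap<>();
    for (int remaining = in.readInt(); remaining > 0; remaining--) {
      int ordinal = in.readInt();
      byte[] value = new byte[in.readInt()];
      in.readFully(value);
      metadata.put(typeMapper.apply(ordinal), new String(value, StandardCharsets.UTF_8));
    }
    return metadata;
  }

  public static void main(String[] args) throws IOException {
    Map<HeaderType, String> header = new HashMap<>();
    header.put(HeaderType.INSTANT_TIME, "100");
    byte[] bytes = write(header);
    // 15 bytes total: count=1, ordinal=0, length=3, then '1' '0' '0'.
    System.out.println(bytes.length + " -> " + read(bytes, i -> HeaderType.values()[i]));
  }
}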
HoodieParquetDataBlock.java
@@ -59,7 +59,7 @@ public HoodieParquetDataBlock(Supplier<SeekableDataInputStream> inputStreamSuppl
HoodieLogBlockContentLocation logBlockContentLocation,
Option<Schema> readerSchema,
Map<HeaderMetadataType, String> header,
-    Map<HeaderMetadataType, String> footer,
+    Map<FooterMetadataType, String> footer,
String keyField) {
super(content, inputStreamSupplier, readBlockLazily, Option.of(logBlockContentLocation), readerSchema, header, footer, keyField, false);

TestHoodieLogBlock.java
@@ -21,17 +21,23 @@

import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.io.ByteBufferBackedInputStream;
+import org.apache.hudi.io.ByteArraySeekableDataInputStream;
import org.apache.hudi.io.SeekableDataInputStream;
import org.apache.hudi.storage.HoodieStorage;

import org.apache.avro.SchemaBuilder;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.stubbing.Answer;

import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import java.util.function.Supplier;
import java.util.stream.IntStream;

@@ -74,6 +80,30 @@ public void testErrorHandlingInInflate(int numReadFailTimes) throws IOException
assertArrayEquals(expected, actual.get());
}

+@Test
+public void testHeaderMetadata() throws IOException {
+  Map<HoodieLogBlock.HeaderMetadataType, String> a = new HashMap<>();
+  a.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
+  a.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "1");
+  a.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, "{}");
+  a.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE, "rollback");
+  a.put(HoodieLogBlock.HeaderMetadataType.COMPACTED_BLOCK_TIMES, "1");
+  a.put(HoodieLogBlock.HeaderMetadataType.RECORD_POSITIONS, "");
+  a.put(HoodieLogBlock.HeaderMetadataType.BLOCK_IDENTIFIER, "1");
+  a.put(HoodieLogBlock.HeaderMetadataType.IS_PARTIAL, "true");
+  byte[] bytes = HoodieLogBlock.getHeaderMetadataBytes(a);
+
+  Map<HoodieLogBlock.HeaderMetadataType, String> b = HoodieLogBlock.getHeaderMetadata(new ByteArraySeekableDataInputStream(new ByteBufferBackedInputStream(bytes)));
+  Assertions.assertEquals("100", b.get(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME));
+  Assertions.assertEquals("1", b.get(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME));
+  Assertions.assertEquals("{}", b.get(HoodieLogBlock.HeaderMetadataType.SCHEMA));
+  Assertions.assertEquals("rollback", b.get(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE));
+  Assertions.assertEquals("1", b.get(HoodieLogBlock.HeaderMetadataType.COMPACTED_BLOCK_TIMES));
+  Assertions.assertEquals("", b.get(HoodieLogBlock.HeaderMetadataType.RECORD_POSITIONS));
+  Assertions.assertEquals("1", b.get(HoodieLogBlock.HeaderMetadataType.BLOCK_IDENTIFIER));
+  Assertions.assertEquals("true", b.get(HoodieLogBlock.HeaderMetadataType.IS_PARTIAL));
+}

private SeekableDataInputStream prepareMockedLogInputStream(int contentSize,
int numReadFailTimes) throws IOException {
IOException exception = new IOException("Read content from log file fails");
Expand Down
(3 of 14 changed files not shown)
