Skip to content

Commit

Permalink
Refactor and add unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
linliu-code committed Dec 9, 2024
1 parent 4411811 commit b71c47f
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.ValueSource;
Expand All @@ -68,6 +69,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -95,6 +97,8 @@
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class TestHoodieBackedTableMetadata extends TestHoodieMetadataBase {

Expand Down Expand Up @@ -364,6 +368,91 @@ public void testRepeatedCleanActionsWithMetadataTableEnabled(final HoodieTableTy
validateFilesAfterCleaning(deleteFileList, fileSetBeforeCleaning, fileSetAfterSecondCleaning);
}

@Test
void testReverseLookupSecondaryKeysInternalWithOnlyBaseFileRecord() {
  // A single record key whose secondary-index entry exists only in the base file;
  // the log-record reader is mocked to yield no records at all.
  String primaryKey = "recordKey";
  String secondaryKey = "secondaryKey";
  List<String> lookupKeys = Collections.singletonList(primaryKey);

  HoodieMetadataLogRecordReader emptyLogReader = mock(HoodieMetadataLogRecordReader.class);
  when(emptyLogReader.getRecords()).thenReturn(new ArrayList<>());

  Map<String, HoodieRecord<HoodieMetadataPayload>> baseRecords = new HashMap<>();
  baseRecords.put(primaryKey, HoodieMetadataPayload.createSecondaryIndexRecord(
      primaryKey, secondaryKey, "partitionPath", false));

  Map<String, String> result =
      HoodieBackedTableMetadata.reverseLookupSecondaryKeysInternal(lookupKeys, baseRecords, emptyLogReader);

  // The base-file entry alone should drive the reverse lookup.
  assertEquals(1, result.size());
  assertTrue(result.containsKey(primaryKey));
  assertEquals(secondaryKey, result.get(primaryKey));
}

@Test
void testReverseLookupSecondaryKeysInternalWithOnlyLogRecords() {
  // Exercises the merge logic when the base file contributes nothing and all
  // secondary-index state comes from log records, in increasing complexity.
  String primaryKey = "recordKey";
  List<String> lookupKeys = Collections.singletonList(primaryKey);
  Map<String, HoodieRecord<HoodieMetadataPayload>> baseRecords = new HashMap<>();
  List<HoodieRecord<HoodieMetadataPayload>> logRecords = new ArrayList<>();
  HoodieMetadataLogRecordReader logReader = mock(HoodieMetadataLogRecordReader.class);
  when(logReader.getRecords()).thenReturn(logRecords);

  // Case 1: a single non-tombstone log record maps the key to its secondary key.
  String secondaryKey = "secondaryKey";
  logRecords.add(HoodieMetadataPayload.createSecondaryIndexRecord(
      primaryKey, secondaryKey, "partitionPath", false));
  Map<String, String> result =
      HoodieBackedTableMetadata.reverseLookupSecondaryKeysInternal(lookupKeys, baseRecords, logReader);
  assertEquals(1, result.size());
  assertTrue(result.containsKey(primaryKey));
  assertEquals(secondaryKey, result.get(primaryKey));

  // Case 2: the latest log record is a tombstone, so the key must disappear.
  logRecords.add(HoodieMetadataPayload.createSecondaryIndexRecord(
      primaryKey, secondaryKey, "partitionPath", true));
  result = HoodieBackedTableMetadata.reverseLookupSecondaryKeysInternal(lookupKeys, baseRecords, logReader);
  assertEquals(0, result.size());

  // Case 3: a newer non-tombstone record re-maps the key to a new secondary key.
  String newSecondaryKey = "newSecondaryKey";
  logRecords.add(HoodieMetadataPayload.createSecondaryIndexRecord(
      primaryKey, newSecondaryKey, "partitionPath", false));
  result = HoodieBackedTableMetadata.reverseLookupSecondaryKeysInternal(lookupKeys, baseRecords, logReader);
  assertEquals(1, result.size());
  assertTrue(result.containsKey(primaryKey));
  assertEquals(newSecondaryKey, result.get(primaryKey));
}

@Test
void testReverseLookupSecondaryKeysInternal() {
  // Exercises the merge of base-file records with log records, where the log
  // side is newer and must win over the base-file entry for the same key.
  String primaryKey = "recordKey";
  List<String> lookupKeys = Collections.singletonList(primaryKey);
  Map<String, HoodieRecord<HoodieMetadataPayload>> baseRecords = new HashMap<>();
  List<HoodieRecord<HoodieMetadataPayload>> logRecords = new ArrayList<>();
  HoodieMetadataLogRecordReader logReader = mock(HoodieMetadataLogRecordReader.class);
  when(logReader.getRecords()).thenReturn(logRecords);

  // Case 1: the latest log record is a tombstone; the base-file entry must not resurrect the key.
  String secondaryKey = "secondaryKey";
  logRecords.add(HoodieMetadataPayload.createSecondaryIndexRecord(
      primaryKey, secondaryKey, "partitionPath", true));
  baseRecords.put(primaryKey, HoodieMetadataPayload.createSecondaryIndexRecord(
      primaryKey, secondaryKey, "partitionPath", false));
  Map<String, String> result =
      HoodieBackedTableMetadata.reverseLookupSecondaryKeysInternal(lookupKeys, baseRecords, logReader);
  assertEquals(0, result.size());

  // Case 2: a newer non-tombstone log record supersedes both the tombstone and the base file.
  String newSecondaryKey = "newSecondaryKey";
  logRecords.add(HoodieMetadataPayload.createSecondaryIndexRecord(
      primaryKey, newSecondaryKey, "partitionPath", false));
  result = HoodieBackedTableMetadata.reverseLookupSecondaryKeysInternal(lookupKeys, baseRecords, logReader);
  assertEquals(1, result.size());
  assertTrue(result.containsKey(primaryKey));
  assertEquals(newSecondaryKey, result.get(primaryKey));
}

private int getNumCompactions(HoodieTableMetaClient metaClient) {
HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
return timeline
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.SpillableMapUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
Expand Down Expand Up @@ -826,61 +827,25 @@ protected Map<String, String> getSecondaryKeysForRecordKeys(List<String> recordK
}

/**
 * Resolves the mapping of record key -> secondary key for the given record keys
 * within a single file slice, by reading the slice's base file records and
 * merging them with its (newer) log records.
 *
 * <p>NOTE(review): this span contained diff residue mixing the pre- and
 * post-refactor bodies; this is the reconstructed post-refactor version, which
 * delegates the merge to {@link #reverseLookupSecondaryKeysInternal}.
 *
 * @param partitionName metadata partition to read from
 * @param recordKeys    record keys (primary keys) to reverse-look-up
 * @param fileSlice     file slice containing the secondary index records
 * @return map of record key to its secondary key; keys with no live entry are absent
 */
private Map<String, String> reverseLookupSecondaryKeys(String partitionName, List<String> recordKeys, FileSlice fileSlice) {
  Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> readers = getOrCreateReaders(partitionName, fileSlice);
  try {
    HoodieSeekingFileReader<?> baseFileReader = readers.getKey();
    HoodieMetadataLogRecordReader logRecordScanner = readers.getRight();
    if (baseFileReader == null && logRecordScanner == null) {
      // Nothing to read from: no base file and no log files in this slice.
      return Collections.emptyMap();
    }

    // Sorted set enables efficient point lookups against the base file reader.
    Set<String> keySet = new TreeSet<>(recordKeys);
    // Map of (record-key -> secondary-index-record) read from the base file.
    Map<String, HoodieRecord<HoodieMetadataPayload>> baseFileRecords =
        fetchBaseFileAllRecordsByPayloadForSecIndex(baseFileReader, keySet, partitionName);

    return reverseLookupSecondaryKeysInternal(recordKeys, baseFileRecords, logRecordScanner);
  } catch (IOException ioe) {
    throw new HoodieIOException("Error merging records from metadata table for " + recordKeys.size() + " key : ", ioe);
  } finally {
    if (!reuse) {
      // Readers are cached only in reuse mode; otherwise release them eagerly.
      closeReader(readers);
    }
  }
}

@Override
Expand Down Expand Up @@ -921,4 +886,42 @@ private Map<String, HoodieRecord<HoodieMetadataPayload>> fetchBaseFileAllRecords
return SecondaryIndexKeyUtils.getRecordKeyFromSecondaryIndexKey(record.getRecordKey());
}, record -> record));
}

/**
 * Merges secondary-index records from the base file with those from the log
 * files to produce a mapping of record key -> secondary key for the requested
 * record keys.
 *
 * <p>Merge semantics: log records are newer than base-file records. A key whose
 * latest log record is a tombstone is dropped entirely. A key with a surviving
 * (non-tombstone) log record takes its secondary key from that log record. Only
 * keys untouched by the logs fall back to the base-file record.
 *
 * <p>Fix over the previous version: the base-file pass was guarded only by
 * {@code deletedRecordsFromLogs}, so a stale base-file value could overwrite a
 * newer surviving log record for a key that was updated (but never tombstoned)
 * in the logs. The base-file value is now applied only when no log record
 * already supplied the key.
 *
 * @param recordKeys       record keys (primary keys) to reverse-look-up
 * @param baseFileRecords  map of record key -> secondary-index record from the base file (may be null)
 * @param logRecordScanner reader over the slice's log records
 * @return map of record key to its latest live secondary key
 */
@VisibleForTesting
public static Map<String, String> reverseLookupSecondaryKeysInternal(List<String> recordKeys,
                                                                     Map<String, HoodieRecord<HoodieMetadataPayload>> baseFileRecords,
                                                                     HoodieMetadataLogRecordReader logRecordScanner) {
  Map<String, String> recordKeyMap = new HashMap<>();
  Set<String> keySet = new TreeSet<>(recordKeys);
  // Keys whose most recent state in the logs (so far) is a tombstone.
  Set<String> deletedRecordsFromLogs = new HashSet<>();
  // Map of recordKey (primaryKey) -> latest non-tombstone log record among the requested keys.
  Map<String, HoodieRecord<HoodieMetadataPayload>> logRecordsMap = new HashMap<>();
  logRecordScanner.getRecords().forEach(record -> {
    String recordKey = SecondaryIndexKeyUtils.getRecordKeyFromSecondaryIndexKey(record.getRecordKey());
    HoodieMetadataPayload payload = record.getData();
    if (!payload.isDeleted()) { // process only valid records.
      if (keySet.contains(recordKey)) {
        logRecordsMap.put(recordKey, record);
      }
    } else {
      // Only when the latest log record is non-tombstone can logRecordsMap retain the key.
      logRecordsMap.remove(recordKey);
      deletedRecordsFromLogs.add(recordKey);
    }
  });

  // Surviving log records win outright: they are newer than the base file.
  logRecordsMap.forEach((key, value) -> {
    recordKeyMap.put(key, SecondaryIndexKeyUtils.getSecondaryKeyFromSecondaryIndexKey(value.getRecordKey()));
  });
  // Base-file records fill in only keys the logs neither tombstoned nor updated.
  if (baseFileRecords != null) {
    baseFileRecords.forEach((key, value) -> {
      if (!deletedRecordsFromLogs.contains(key) && !recordKeyMap.containsKey(key)) {
        recordKeyMap.put(key, SecondaryIndexKeyUtils.getSecondaryKeyFromSecondaryIndexKey(value.getRecordKey()));
      }
    });
  }
  return recordKeyMap;
}
}

0 comments on commit b71c47f

Please sign in to comment.