Skip to content

Commit

Permalink
perf: improve compaction performance by avoiding unnecessary disk access
Browse files Browse the repository at this point in the history
1. improve compaction performance by avoiding unnecessary disk access
2. support pushing a key predicate down to `ExternalSpillableMap`

Signed-off-by: TheR1sing3un <[email protected]>
  • Loading branch information
TheR1sing3un committed Dec 20, 2024
1 parent 4819d67 commit 14116b5
Show file tree
Hide file tree
Showing 9 changed files with 153 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -425,13 +425,18 @@ protected void writeToFile(HoodieKey key, HoodieRecord<T> record, Schema schema,

/**
 * Writes the records of {@code keyToNewRecords} whose keys are not in {@code writtenRecordKeys},
 * passing each one to {@code writeInsertRecord}.
 *
 * @throws IOException declared by this method's signature
 */
protected void writeIncomingRecords() throws IOException {
// write out any pending records (this can happen when inserts are turned into updates)
Iterator<HoodieRecord<T>> newRecordsItr = (keyToNewRecords instanceof ExternalSpillableMap)
? ((ExternalSpillableMap)keyToNewRecords).iterator() : keyToNewRecords.values().iterator();
Iterator<HoodieRecord<T>> newRecordsItr;
// The "not yet written" key check is handed to the iterator itself, so records whose keys were
// already written are excluded while iterating rather than after each value has been produced.
if (keyToNewRecords instanceof ExternalSpillableMap) {
newRecordsItr = ((ExternalSpillableMap) keyToNewRecords).iterator(key -> !writtenRecordKeys.contains(key));
} else {
// Any other map type: apply the same key filter on the entry stream.
newRecordsItr = keyToNewRecords.entrySet().stream()
.filter(e -> !writtenRecordKeys.contains(e.getKey()))
.map(Map.Entry::getValue)
.iterator();
}
while (newRecordsItr.hasNext()) {
HoodieRecord<T> hoodieRecord = newRecordsItr.next();
if (!writtenRecordKeys.contains(hoodieRecord.getRecordKey())) {
writeInsertRecord(hoodieRecord);
}
writeInsertRecord(hoodieRecord);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
Expand Down Expand Up @@ -152,11 +154,12 @@ private void flushToDisk() {
}

/**
* Custom iterator to iterate over values written to disk.
* Custom iterator to iterate over values written to disk with pushdown predicate.
*/
@Override
public Iterator<R> iterator() {
ClosableIterator<R> iterator = new LazyFileIterable(filePath, valueMetadataMap, isCompressionEnabled).iterator();
public Iterator<R> iterator(Predicate<T> filter) {
// Copy only the metadata entries whose key passes the filter into a new map, so the file
// iterator below is built over the matching entries only.
Map<T, ValueMetadata> needMetadata = valueMetadataMap.entrySet().stream().filter(e -> filter.test(e.getKey())).collect(Collectors.toMap(Entry::getKey, Entry::getValue));
ClosableIterator<R> iterator = new LazyFileIterable(filePath, needMetadata, isCompressionEnabled).iterator();
// Register the iterator with this map (NOTE(review): presumably so it can be closed later — confirm).
this.iterators.add(iterator);
return iterator;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
* @param <T> The generic type of the keys
* @param <R> The generic type of the values
*/
public abstract class DiskMap<T extends Serializable, R extends Serializable> implements Map<T, R>, Iterable<R> {
public abstract class DiskMap<T extends Serializable, R extends Serializable> implements Map<T, R>, PredicatePushdownIterable<T, R> {

private static final Logger LOG = LoggerFactory.getLogger(DiskMap.class);
private static final String SUBFOLDER_PREFIX = "hudi";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Stream;

/**
Expand All @@ -55,7 +56,7 @@
* frequently and incur unnecessary disk writes.
*/
@NotThreadSafe
public class ExternalSpillableMap<T extends Serializable, R extends Serializable> implements Map<T, R>, Serializable, Closeable {
public class ExternalSpillableMap<T extends Serializable, R extends Serializable> implements Map<T, R>, Serializable, Closeable, PredicatePushdownIterable<T, R> {

// Find the actual estimated payload size after inserting N records
private static final int NUMBER_OF_RECORDS_TO_ESTIMATE_PAYLOAD_SIZE = 100;
Expand Down Expand Up @@ -130,9 +131,17 @@ private void initDiskBasedMap() {
/**
* A custom iterator to wrap over iterating in-memory + disk spilled data.
*/
public Iterator<R> iterator() {
@Override
public Iterator<R> iterator(Predicate<T> filter) {
  // The in-memory part is always iterated; the disk part only once a disk map exists.
  Iterator<R> inMemoryValues = inMemoryIterator(filter);
  if (diskBasedMap == null) {
    return inMemoryValues;
  }
  return new IteratorWrapper<>(inMemoryValues, diskIterator(filter));
}

/**
 * Iterates over the values of the in-memory map whose keys pass the given filter.
 *
 * @param filter predicate evaluated against each entry's key
 * @return iterator over the values of the accepted entries
 */
private Iterator<R> inMemoryIterator(Predicate<T> filter) {
  return inMemoryMap.entrySet().stream()
      .filter(candidate -> filter.test(candidate.getKey()))
      .map(Map.Entry::getValue)
      .iterator();
}

return diskBasedMap == null ? inMemoryMap.values().iterator() : new IteratorWrapper<>(inMemoryMap.values().iterator(), diskBasedMap.iterator());
/**
 * Hands the key filter to the disk-based map and returns the iterator it produces.
 *
 * @param filter predicate forwarded to {@code diskBasedMap.iterator(filter)}
 * @return the iterator returned by the disk-based map
 */
private Iterator<R> diskIterator(Predicate<T> filter) {
  Iterator<R> spilledValues = diskBasedMap.iterator(filter);
  return spilledValues;
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.common.util.collection;

import java.util.Iterator;
import java.util.function.Predicate;

/**
 * An {@link Iterable} over values whose iteration can be restricted by a predicate on the keys.
 *
 * @param <K> the key type the filter is evaluated against
 * @param <V> the value type produced by the iterators
 */
public interface PredicatePushdownIterable<K, V> extends Iterable<V> {

  /**
   * Returns an iterator over the values selected by the given key filter.
   *
   * @param filter the key filter to apply
   * @return the iterator after applying the filter
   */
  Iterator<V> iterator(Predicate<K> filter);

  /**
   * Returns an unfiltered iterator by delegating to {@link #iterator(Predicate)} with a filter
   * that accepts every key.
   */
  @Override
  default Iterator<V> iterator() {
    final Predicate<K> acceptAll = key -> true;
    return iterator(acceptAll);
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -320,13 +320,7 @@ public <K extends Serializable> void delete(String columnFamilyName, K key) {
* @param <T> Type of object stored.
*/
public <T extends Serializable> T get(String columnFamilyName, String key) {
ValidationUtils.checkArgument(!closed);
try {
byte[] val = getRocksDB().get(managedHandlesMap.get(columnFamilyName), getUTF8Bytes(key));
return val == null ? null : SerializationUtils.deserialize(val);
} catch (RocksDBException e) {
throw new HoodieException(e);
}
// Delegate to the byte[]-keyed lookup; getKeyBytes(String) encodes the key with getUTF8Bytes.
return get(columnFamilyName, getKeyBytes(key));
}

/**
Expand All @@ -337,15 +331,63 @@ public <T extends Serializable> T get(String columnFamilyName, String key) {
* @param <T> Type of object stored.
*/
public <K extends Serializable, T extends Serializable> T get(String columnFamilyName, K key) {
  // Serialize the key first, then reuse the byte[]-keyed lookup.
  byte[] keyBytes = getKeyBytes(key);
  return get(columnFamilyName, keyBytes);
}

/**
* Retrieve a value for a given key in a column family.
*
* @param columnFamilyName Column Family Name
* @param key Key to be retrieved, already in its byte form; it is passed to the RocksDB lookup unchanged
* @param <T> Type of object stored.
* @return The deserialized value, or {@code null} when the lookup returns no value
*/
public <K extends Serializable, T extends Serializable> T get(String columnFamilyName, byte[] key) {
ValidationUtils.checkArgument(!closed);
try {
byte[] val = getRocksDB().get(managedHandlesMap.get(columnFamilyName), SerializationUtils.serialize(key));
byte[] val = getRocksDB().get(managedHandlesMap.get(columnFamilyName), key);
return val == null ? null : SerializationUtils.deserialize(val);
} catch (Exception e) {
// Any failure is rethrown wrapped in a HoodieException.
throw new HoodieException(e);
}
}

/**
 * Converts a String key to bytes using {@code getUTF8Bytes}.
 *
 * @param key the key to convert
 * @return the bytes produced by {@code getUTF8Bytes(key)}
 */
private byte[] getKeyBytes(String key) {
  byte[] utf8Key = getUTF8Bytes(key);
  return utf8Key;
}

/**
 * Converts a key to bytes using {@code SerializationUtils.serialize}.
 *
 * @param key the key to serialize
 * @param <K> type of the key
 * @return the serialized key
 * @throws HoodieException wrapping the {@link IOException} if serialization fails
 */
private <K extends Serializable> byte[] getKeyBytes(K key) {
  byte[] serializedKey;
  try {
    serializedKey = SerializationUtils.serialize(key);
  } catch (IOException ioe) {
    throw new HoodieException(ioe);
  }
  return serializedKey;
}

/**
 * Retrieve the values stored under the given keys in a single column family.
 *
 * @param columnFamilyName Column Family Name; must be a managed column family
 * @param keys Keys to be retrieved; each one is converted with {@code getKeyBytes}
 * @param <K> Type of the keys.
 * @param <T> Type of object stored.
 * @return The values returned by the handle-based {@code multiGet} for these keys
 */
public <K extends Serializable, T extends Serializable> List<T> multiGet(String columnFamilyName, List<K> keys) {
  List<byte[]> serializedKeys = keys.stream()
      .map(this::getKeyBytes)
      .collect(Collectors.toList());

  ColumnFamilyHandle handle = managedHandlesMap.get(columnFamilyName);
  ValidationUtils.checkArgument(handle != null, "Column Family not found :" + columnFamilyName);

  // The handle-based lookup takes one handle per key, so repeat the same handle for every key.
  List<ColumnFamilyHandle> handlePerKey = serializedKeys.stream()
      .map(ignored -> handle)
      .collect(Collectors.toList());
  return multiGet(handlePerKey, serializedKeys);
}

/**
 * Retrieve values for the given keys in the given column families.
 *
 * <p>Null entries returned by the lookup are dropped before deserialization, so the returned
 * list can be shorter than {@code keys}.
 *
 * @param columnFamilyHandles Column Family Handles
 * @param keys Keys to be retrieved, in byte form
 * @param <T> Type of object stored.
 * @return The deserialized non-null values
 * @throws HoodieException wrapping the {@code RocksDBException} if the lookup fails
 */
public <T extends Serializable> List<T> multiGet(List<ColumnFamilyHandle> columnFamilyHandles, List<byte[]> keys) {
  ValidationUtils.checkArgument(!closed);
  final List<byte[]> rawValues;
  try {
    rawValues = getRocksDB().multiGetAsList(columnFamilyHandles, keys);
  } catch (RocksDBException rocksFailure) {
    throw new HoodieException(rocksFailure);
  }
  return rawValues.stream()
      .filter(raw -> raw != null)
      .map(raw -> (T) SerializationUtils.deserialize(raw))
      .collect(Collectors.toList());
}

/**
* Perform a prefix search and return stream of key-value pairs retrieved.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,12 @@
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Spliterators;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

Expand Down Expand Up @@ -141,6 +144,13 @@ public Iterator<R> iterator() {
return getRocksDb().iterator(ROCKSDB_COL_FAMILY);
}

@Override
public Iterator<R> iterator(Predicate<T> filter) {
  // Select the matching keys first, then fetch only their values in one multiGet call.
  // NOTE(review): sorted() uses natural ordering, so this assumes the key type is Comparable;
  // otherwise it may throw ClassCastException — confirm against the key types in use.
  List<T> matchingKeys = keySet.stream()
      .filter(filter)
      .sorted()
      .collect(Collectors.toList());
  List<R> fetchedValues = getRocksDb().multiGet(ROCKSDB_COL_FAMILY, matchingKeys);
  return fetchedValues.iterator();
}

@Override
public Stream<R> valueStream() {
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator(), 0), false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,19 @@ public void testSimpleInsertWithoutHoodieMetadata(boolean isCompressionEnabled)
assert recordKeys.contains(rec.getRecordKey());
}


// test iterator with predicate
String firstKey = recordKeys.stream().findFirst().get();
recordKeys.remove(firstKey);
itr = records.iterator(key -> !key.equals(firstKey));
int cntSize = 0;
while (itr.hasNext()) {
HoodieRecord<? extends HoodieRecordPayload> rec = itr.next();
cntSize++;
assert recordKeys.contains(rec.getRecordKey());
}
assertEquals(recordKeys.size(), cntSize);

verifyCleanup(records);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,18 @@ public void testSimpleInsertWithoutHoodieMetadata() throws IOException, URISynta
assert recordKeys.contains(rec.getRecordKey());
}
assertEquals(recordKeys.size(), cntSize);

// test iterator with predicate
String firstKey = recordKeys.stream().findFirst().get();
recordKeys.remove(firstKey);
itr = rocksDBBasedMap.iterator(key -> !key.equals(firstKey));
cntSize = 0;
while (itr.hasNext()) {
HoodieRecord<? extends HoodieRecordPayload> rec = itr.next();
cntSize++;
assert recordKeys.contains(rec.getRecordKey());
}
assertEquals(recordKeys.size(), cntSize);
}

@Test
Expand Down

0 comments on commit 14116b5

Please sign in to comment.