From 14116b543e96e41e301b4c52c896797ad0a811b2 Mon Sep 17 00:00:00 2001
From: TheR1sing3un
Date: Fri, 20 Dec 2024 18:09:17 +0800
Subject: [PATCH] perf: improve compaction performance by avoiding unnecessary
 disk visits

1. improve compaction performance by avoiding unnecessary disk visits
2. support pushing down predicates to `ExternalSpillableMap`

Signed-off-by: TheR1sing3un
---
 .../org/apache/hudi/io/HoodieMergeHandle.java | 15 +++--
 .../util/collection/BitCaskDiskMap.java       |  9 ++-
 .../hudi/common/util/collection/DiskMap.java  |  2 +-
 .../util/collection/ExternalSpillableMap.java | 15 ++++-
 .../collection/PredicatePushdownIterable.java | 39 +++++++++++++
 .../common/util/collection/RocksDBDAO.java    | 58 ++++++++++++++++---
 .../util/collection/RocksDbDiskMap.java       | 10 ++++
 .../util/collection/TestBitCaskDiskMap.java   | 13 +++++
 .../util/collection/TestRocksDbDiskMap.java   | 12 ++++
 9 files changed, 153 insertions(+), 20 deletions(-)
 create mode 100644 hudi-common/src/main/java/org/apache/hudi/common/util/collection/PredicatePushdownIterable.java

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 31f221fb85fa9..a966ff0666f70 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -425,13 +425,18 @@ protected void writeToFile(HoodieKey key, HoodieRecord<T> record, Schema schema,
 
   protected void writeIncomingRecords() throws IOException {
     // write out any pending records (this can happen when inserts are turned into updates)
-    Iterator<HoodieRecord<T>> newRecordsItr = (keyToNewRecords instanceof ExternalSpillableMap)
-        ? ((ExternalSpillableMap)keyToNewRecords).iterator() : keyToNewRecords.values().iterator();
+    Iterator<HoodieRecord<T>> newRecordsItr;
+    if (keyToNewRecords instanceof ExternalSpillableMap) {
+      newRecordsItr = ((ExternalSpillableMap) keyToNewRecords).iterator(key -> !writtenRecordKeys.contains(key));
+    } else {
+      newRecordsItr = keyToNewRecords.entrySet().stream()
+          .filter(e -> !writtenRecordKeys.contains(e.getKey()))
+          .map(Map.Entry::getValue)
+          .iterator();
+    }
     while (newRecordsItr.hasNext()) {
       HoodieRecord<T> hoodieRecord = newRecordsItr.next();
-      if (!writtenRecordKeys.contains(hoodieRecord.getRecordKey())) {
-        writeInsertRecord(hoodieRecord);
-      }
+      writeInsertRecord(hoodieRecord);
     }
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java
index 2f1595bfe1dd5..0eed98683d036 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java
@@ -50,6 +50,8 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import java.util.zip.Deflater;
 import java.util.zip.DeflaterOutputStream;
@@ -152,11 +154,12 @@ private void flushToDisk() {
   }
 
   /**
-   * Custom iterator to iterate over values written to disk.
+   * Custom iterator to iterate over values written to disk with a pushdown predicate.
    */
   @Override
-  public Iterator<R> iterator() {
-    ClosableIterator<R> iterator = new LazyFileIterable(filePath, valueMetadataMap, isCompressionEnabled).iterator();
+  public Iterator<R> iterator(Predicate<T> filter) {
+    Map<T, ValueMetadata> needMetadata = valueMetadataMap.entrySet().stream().filter(e -> filter.test(e.getKey())).collect(Collectors.toMap(Entry::getKey, Entry::getValue));
+    ClosableIterator<R> iterator = new LazyFileIterable(filePath, needMetadata, isCompressionEnabled).iterator();
     this.iterators.add(iterator);
     return iterator;
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/DiskMap.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/DiskMap.java
index c8d57aec032eb..6518a6fab35ef 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/DiskMap.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/DiskMap.java
@@ -37,7 +37,7 @@
  * @param <T> The generic type of the keys
  * @param <R> The generic type of the values
  */
-public abstract class DiskMap<T extends Serializable, R> implements Map<T, R>, Iterable<R> {
+public abstract class DiskMap<T extends Serializable, R> implements Map<T, R>, PredicatePushdownIterable<T, R> {
 
   private static final Logger LOG = LoggerFactory.getLogger(DiskMap.class);
   private static final String SUBFOLDER_PREFIX = "hudi";
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java
index 44b7a80203575..e3cf06636cb3a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java
@@ -37,6 +37,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Predicate;
 import java.util.stream.Stream;
 
 /**
@@ -55,7 +56,7 @@
  * frequently and incur unnecessary disk writes.
 */
 @NotThreadSafe
-public class ExternalSpillableMap<T extends Serializable, R> implements Map<T, R>, Serializable, Closeable {
+public class ExternalSpillableMap<T extends Serializable, R> implements Map<T, R>, Serializable, Closeable, PredicatePushdownIterable<T, R> {
 
   // Find the actual estimated payload size after inserting N records
   private static final int NUMBER_OF_RECORDS_TO_ESTIMATE_PAYLOAD_SIZE = 100;
@@ -130,9 +131,17 @@ private void initDiskBasedMap() {
   /**
    * A custom iterator to wrap over iterating in-memory + disk spilled data.
    */
-  public Iterator<R> iterator() {
+  @Override
+  public Iterator<R> iterator(Predicate<T> filter) {
+    return diskBasedMap == null ? inMemoryIterator(filter) : new IteratorWrapper<>(inMemoryIterator(filter), diskIterator(filter));
+  }
+
+  private Iterator<R> inMemoryIterator(Predicate<T> filter) {
+    return inMemoryMap.entrySet().stream().filter(entry -> filter.test(entry.getKey())).map(Map.Entry::getValue).iterator();
+  }
 
-    return diskBasedMap == null ? inMemoryMap.values().iterator() : new IteratorWrapper<>(inMemoryMap.values().iterator(), diskBasedMap.iterator());
+  private Iterator<R> diskIterator(Predicate<T> filter) {
+    return diskBasedMap.iterator(filter);
   }
 
   /**
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/PredicatePushdownIterable.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/PredicatePushdownIterable.java
new file mode 100644
index 0000000000000..0b58a97002272
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/PredicatePushdownIterable.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.collection;
+
+import java.util.Iterator;
+import java.util.function.Predicate;
+
+public interface PredicatePushdownIterable<K, V> extends Iterable<V> {
+
+  @Override
+  default Iterator<V> iterator() {
+    return iterator(k -> true);
+  }
+
+  /**
+   * Filter the values based on the given key-filter.
+   *
+   * @param filter The filter to apply
+   * @return The iterator after applying the filter
+   */
+  Iterator<V> iterator(Predicate<K> filter);
+
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDBDAO.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDBDAO.java
index 7503adc0fefd6..301796967f339 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDBDAO.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDBDAO.java
@@ -320,13 +320,7 @@ public <K extends Serializable> void delete(String columnFamilyName, K key) {
    * @param <T> Type of object stored.
    */
   public <T extends Serializable> T get(String columnFamilyName, String key) {
-    ValidationUtils.checkArgument(!closed);
-    try {
-      byte[] val = getRocksDB().get(managedHandlesMap.get(columnFamilyName), getUTF8Bytes(key));
-      return val == null ? null : SerializationUtils.deserialize(val);
-    } catch (RocksDBException e) {
-      throw new HoodieException(e);
-    }
+    return get(columnFamilyName, getKeyBytes(key));
   }
 
   /**
@@ -337,15 +331,63 @@ public <T extends Serializable> T get(String columnFamilyName, String key) {
    * @param <T> Type of object stored.
    */
   public <K extends Serializable, T extends Serializable> T get(String columnFamilyName, K key) {
+    return get(columnFamilyName, getKeyBytes(key));
+  }
+
+  /**
+   * Retrieve a value for a given key in a column family.
+   *
+   * @param columnFamilyName Column Family Name
+   * @param key Key to be retrieved
+   * @param <T> Type of object stored.
+   */
+  public <T extends Serializable> T get(String columnFamilyName, byte[] key) {
     ValidationUtils.checkArgument(!closed);
     try {
-      byte[] val = getRocksDB().get(managedHandlesMap.get(columnFamilyName), SerializationUtils.serialize(key));
+      byte[] val = getRocksDB().get(managedHandlesMap.get(columnFamilyName), key);
       return val == null ? null : SerializationUtils.deserialize(val);
     } catch (Exception e) {
       throw new HoodieException(e);
     }
   }
 
+  private byte[] getKeyBytes(String key) {
+    return getUTF8Bytes(key);
+  }
+
+  private <K extends Serializable> byte[] getKeyBytes(K key) {
+    try {
+      return SerializationUtils.serialize(key);
+    } catch (IOException e) {
+      throw new HoodieException(e);
+    }
+  }
+
+  public <K extends Serializable, T extends Serializable> List<T> multiGet(String columnFamilyName, List<K> keys) {
+    List<byte[]> byteKeys = keys.stream().map(this::getKeyBytes).collect(Collectors.toList());
+    ColumnFamilyHandle handle = managedHandlesMap.get(columnFamilyName);
+    ValidationUtils.checkArgument(handle != null, "Column Family not found :" + columnFamilyName);
+    List<ColumnFamilyHandle> columnFamilyHandles = byteKeys.stream().map(key -> handle).collect(Collectors.toList());
+    return multiGet(columnFamilyHandles, byteKeys);
+  }
+
+  /**
+   * Retrieve values for the given keys in the given column families.
+   *
+   * @param columnFamilyHandles Column Family Handles
+   * @param keys Keys to be retrieved
+   * @param <T> Type of object stored.
+   */
+  public <T extends Serializable> List<T> multiGet(List<ColumnFamilyHandle> columnFamilyHandles, List<byte[]> keys) {
+    ValidationUtils.checkArgument(!closed);
+    try {
+      return getRocksDB().multiGetAsList(columnFamilyHandles, keys)
+          .stream().filter(val -> val != null).map(val -> (T) SerializationUtils.deserialize(val)).collect(Collectors.toList());
+    } catch (RocksDBException e) {
+      throw new HoodieException(e);
+    }
+  }
+
   /**
    * Perform a prefix search and return stream of key-value pairs retrieved.
    *
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDbDiskMap.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDbDiskMap.java
index 6122982a14977..ee8c45f6f14b5 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDbDiskMap.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDbDiskMap.java
@@ -30,9 +30,12 @@
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.Spliterators;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 
@@ -141,6 +144,13 @@ public Iterator<R> iterator() {
     return getRocksDb().iterator(ROCKSDB_COL_FAMILY);
   }
 
+  @Override
+  public Iterator<R> iterator(Predicate<T> filter) {
+    List<T> filteredKeys = keySet.stream().filter(filter).sorted().collect(Collectors.toList());
+    List<R> values = getRocksDb().multiGet(ROCKSDB_COL_FAMILY, filteredKeys);
+    return values.iterator();
+  }
+
   @Override
   public Stream<R> valueStream() {
     return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator(), 0), false);
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/collection/TestBitCaskDiskMap.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/collection/TestBitCaskDiskMap.java
index e6080519f5ee9..68b612d90c1fd 100755
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/collection/TestBitCaskDiskMap.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/collection/TestBitCaskDiskMap.java
@@ -119,6 +119,19 @@ public void testSimpleInsertWithoutHoodieMetadata(boolean isCompressionEnabled)
       assert recordKeys.contains(rec.getRecordKey());
     }
+
+    // test iterator with predicate
+    String firstKey = recordKeys.stream().findFirst().get();
+    recordKeys.remove(firstKey);
+    itr = records.iterator(key -> !key.equals(firstKey));
+    int cntSize = 0;
+    while (itr.hasNext()) {
+      HoodieRecord rec = itr.next();
+      cntSize++;
+      assert recordKeys.contains(rec.getRecordKey());
+    }
+    assertEquals(recordKeys.size(), cntSize);
+
     verifyCleanup(records);
   }
 
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/collection/TestRocksDbDiskMap.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/collection/TestRocksDbDiskMap.java
index 69bd193a1a76d..a013c9e638ca9 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/collection/TestRocksDbDiskMap.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/collection/TestRocksDbDiskMap.java
@@ -122,6 +122,18 @@ public void testSimpleInsertWithoutHoodieMetadata() throws IOException, URISynta
       assert recordKeys.contains(rec.getRecordKey());
     }
     assertEquals(recordKeys.size(), cntSize);
+
+    // test iterator with predicate
+    String firstKey = recordKeys.stream().findFirst().get();
+    recordKeys.remove(firstKey);
+    itr = rocksDBBasedMap.iterator(key -> !key.equals(firstKey));
+    cntSize = 0;
+    while (itr.hasNext()) {
+      HoodieRecord rec = itr.next();
+      cntSize++;
+      assert recordKeys.contains(rec.getRecordKey());
+    }
+    assertEquals(recordKeys.size(), cntSize);
   }
 
   @Test
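
Note: the following standalone sketch is illustrative and is not part of the patch. It shows the key-predicate pushdown contract that the new PredicatePushdownIterable models: the caller supplies a key filter, and values whose keys fail the filter are never materialized, which is what lets the compaction path above drop the per-record writtenRecordKeys check in writeIncomingRecords(). The KeyFilteredIterable and InMemoryStore names (and the <K, V> type-parameter names) are invented for the example; only the iterator(Predicate) shape mirrors the interface added by the patch.

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

public class PredicatePushdownExample {

  // Local stand-in mirroring the shape of the PredicatePushdownIterable added by this patch.
  interface KeyFilteredIterable<K, V> extends Iterable<V> {
    @Override
    default Iterator<V> iterator() {
      return iterator(k -> true);
    }

    // Iterate only over values whose keys pass the filter.
    Iterator<V> iterator(Predicate<K> filter);
  }

  // Toy in-memory store that applies the key predicate before values are touched,
  // analogous to how the spillable/disk maps now skip spilled values whose keys
  // are filtered out (e.g. keys already present in writtenRecordKeys).
  static class InMemoryStore<K, V> implements KeyFilteredIterable<K, V> {
    private final Map<K, V> data = new HashMap<>();

    void put(K key, V value) {
      data.put(key, value);
    }

    @Override
    public Iterator<V> iterator(Predicate<K> filter) {
      return data.entrySet().stream()
          .filter(e -> filter.test(e.getKey()))
          .map(Map.Entry::getValue)
          .iterator();
    }
  }

  public static void main(String[] args) {
    InMemoryStore<String, String> store = new InMemoryStore<>();
    store.put("key1", "value1");
    store.put("key2", "value2");
    store.put("key3", "value3");

    // Keys that have already been written; only the remaining values are visited.
    Set<String> written = Set.of("key2");
    for (Iterator<String> it = store.iterator(k -> !written.contains(k)); it.hasNext(); ) {
      System.out.println(it.next()); // prints value1 and value3; value2 is never loaded
    }
  }
}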