From 8df408e4a71dd533ac9140b4015fc66b05b26183 Mon Sep 17 00:00:00 2001 From: danny0405 Date: Mon, 23 Dec 2024 12:37:10 +0800 Subject: [PATCH] Cosmetic changes --- .../util/collection/BitCaskDiskMap.java | 16 ++++++-- .../hudi/common/util/collection/DiskMap.java | 2 +- .../util/collection/ExternalSpillableMap.java | 21 ++++++---- ...terable.java => KeyFilteringIterable.java} | 22 +++++----- .../common/util/collection/RocksDBDAO.java | 40 +++++++++++-------- .../util/collection/RocksDbDiskMap.java | 5 ++- 6 files changed, 67 insertions(+), 39 deletions(-) rename hudi-common/src/main/java/org/apache/hudi/common/util/collection/{PredicatePushdownIterable.java => KeyFilteringIterable.java} (72%) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java index 0eed98683d036..01794e6cf3ba1 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java @@ -154,12 +154,22 @@ private void flushToDisk() { } /** - * Custom iterator to iterate over values written to disk with pushdown predicate. + * Custom iterator to iterate over values written to disk. + */ + @Override + public Iterator iterator() { + ClosableIterator iterator = new LazyFileIterable(filePath, valueMetadataMap, isCompressionEnabled).iterator(); + this.iterators.add(iterator); + return iterator; + } + + /** + * Custom iterator to iterate over values written to disk with a key filter. */ @Override public Iterator iterator(Predicate filter) { - Map needMetadata = valueMetadataMap.entrySet().stream().filter(e -> filter.test(e.getKey())).collect(Collectors.toMap(Entry::getKey, Entry::getValue)); - ClosableIterator iterator = new LazyFileIterable(filePath, needMetadata, isCompressionEnabled).iterator(); + Map filteredValueMetadata = valueMetadataMap.entrySet().stream().filter(e -> filter.test(e.getKey())).collect(Collectors.toMap(Entry::getKey, Entry::getValue)); + ClosableIterator iterator = new LazyFileIterable(filePath, filteredValueMetadata, isCompressionEnabled).iterator(); this.iterators.add(iterator); return iterator; } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/DiskMap.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/DiskMap.java index 6518a6fab35ef..917d749733f82 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/DiskMap.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/DiskMap.java @@ -37,7 +37,7 @@ * @param The generic type of the keys * @param The generic type of the values */ -public abstract class DiskMap implements Map, PredicatePushdownIterable { +public abstract class DiskMap implements Map, KeyFilteringIterable { private static final Logger LOG = LoggerFactory.getLogger(DiskMap.class); private static final String SUBFOLDER_PREFIX = "hudi"; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java index e3cf06636cb3a..202cfc0855472 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java @@ -56,7 +56,7 @@ * frequently and incur unnecessary disk writes. */ @NotThreadSafe -public class ExternalSpillableMap implements Map, Serializable, Closeable, PredicatePushdownIterable { +public class ExternalSpillableMap implements Map, Serializable, Closeable, KeyFilteringIterable { // Find the actual estimated payload size after inserting N records private static final int NUMBER_OF_RECORDS_TO_ESTIMATE_PAYLOAD_SIZE = 100; @@ -132,16 +132,23 @@ private void initDiskBasedMap() { * A custom iterator to wrap over iterating in-memory + disk spilled data. */ @Override - public Iterator iterator(Predicate filter) { - return diskBasedMap == null ? inMemoryIterator(filter) : new IteratorWrapper<>(inMemoryIterator(filter), diskIterator(filter)); + public Iterator iterator() { + return diskBasedMap == null ? inMemoryMap.values().iterator() : new IteratorWrapper<>(inMemoryMap.values().iterator(), diskBasedMap.iterator()); } - private Iterator inMemoryIterator(Predicate filter) { - return inMemoryMap.entrySet().stream().filter(entry -> filter.test(entry.getKey())).map(Map.Entry::getValue).iterator(); + /** + * A custom iterator to wrap over iterating in-memory + disk spilled data. + */ + @Override + public Iterator iterator(Predicate filter) { + return diskBasedMap == null ? inMemoryMapIterator(filter) : new IteratorWrapper<>(inMemoryMapIterator(filter), diskBasedMap.iterator(filter)); } - private Iterator diskIterator(Predicate filter) { - return diskBasedMap.iterator(filter); + /** + * In-memory map iterator with a key filter. + */ + private Iterator inMemoryMapIterator(Predicate filter) { + return inMemoryMap.entrySet().stream().filter(entry -> filter.test(entry.getKey())).map(Map.Entry::getValue).iterator(); } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/PredicatePushdownIterable.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/KeyFilteringIterable.java similarity index 72% rename from hudi-common/src/main/java/org/apache/hudi/common/util/collection/PredicatePushdownIterable.java rename to hudi-common/src/main/java/org/apache/hudi/common/util/collection/KeyFilteringIterable.java index 0b58a97002272..a7929b2f16379 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/PredicatePushdownIterable.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/KeyFilteringIterable.java @@ -21,19 +21,19 @@ import java.util.Iterator; import java.util.function.Predicate; -public interface PredicatePushdownIterable extends Iterable { - - @Override - default Iterator iterator() { - return iterator(k -> true); - } - +/** + * An iterable that allows filtering on the element keys. + * + * @param the type of element keys + * @param the type of elements returned by the iterator + */ +public interface KeyFilteringIterable extends Iterable { /** - * Filter the values based on the given key-filter. + * Returns an iterator over elements of type {@code V}. + * + * @param filter The filter on the key of type {@code K}. * - * @param filter The filter to apply - * @return The iterator after applying the filter + * @return an Iterator. */ Iterator iterator(Predicate filter); - } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDBDAO.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDBDAO.java index 301796967f339..af06c26593951 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDBDAO.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDBDAO.java @@ -50,6 +50,7 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.Objects; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; @@ -351,24 +352,19 @@ public T get(String columnFamil } } - private byte[] getKeyBytes(String key) { - return getUTF8Bytes(key); - } - - private byte[] getKeyBytes(K key) { - try { - return SerializationUtils.serialize(key); - } catch (IOException e) { - throw new HoodieException(e); - } - } - - public List multiGet(String columnFamilyName, List keys) { + /** + * Retrieve values for the given keys in a column family. + * + * @param columnFamilyName Column Family Name + * @param keys Keys to be retrieved + * @param Type of object stored. + */ + public List multiGetAsList(String columnFamilyName, List keys) { List byteKeys = keys.stream().map(this::getKeyBytes).collect(Collectors.toList()); ColumnFamilyHandle handle = managedHandlesMap.get(columnFamilyName); ValidationUtils.checkArgument(handle != null, "Column Family not found :" + columnFamilyName); List columnFamilyHandles = byteKeys.stream().map(key -> handle).collect(Collectors.toList()); - return multiGet(columnFamilyHandles, byteKeys); + return multiGetAsList(columnFamilyHandles, byteKeys); } /** @@ -378,11 +374,11 @@ public List multiGet(String * @param keys Keys to be retrieved * @param Type of object stored. */ - public List multiGet(List columnFamilyHandles, List keys) { + private List multiGetAsList(List columnFamilyHandles, List keys) { ValidationUtils.checkArgument(!closed); try { return getRocksDB().multiGetAsList(columnFamilyHandles, keys) - .stream().filter(val -> val != null).map(val -> (T) SerializationUtils.deserialize(val)).collect(Collectors.toList()); + .stream().filter(Objects::nonNull).map(val -> (T) SerializationUtils.deserialize(val)).collect(Collectors.toList()); } catch (RocksDBException e) { throw new HoodieException(e); } @@ -533,6 +529,18 @@ private byte[] serializePayload(T value) throws IOExcep return payload; } + private byte[] getKeyBytes(String key) { + return getUTF8Bytes(key); + } + + private byte[] getKeyBytes(K key) { + try { + return SerializationUtils.serialize(key); + } catch (IOException e) { + throw new HoodieException(e); + } + } + String getRocksDBBasePath() { return rocksDBBasePath; } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDbDiskMap.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDbDiskMap.java index ee8c45f6f14b5..af95ea993dcab 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDbDiskMap.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDbDiskMap.java @@ -144,10 +144,13 @@ public Iterator iterator() { return getRocksDb().iterator(ROCKSDB_COL_FAMILY); } + /** + * Custom iterator to iterate over values written to disk with a key filter. + */ @Override public Iterator iterator(Predicate filter) { List filteredKeys = keySet.stream().filter(filter).sorted().collect(Collectors.toList()); - List values = getRocksDb().multiGet(ROCKSDB_COL_FAMILY, filteredKeys); + List values = getRocksDb().multiGetAsList(ROCKSDB_COL_FAMILY, filteredKeys); return values.iterator(); }