Skip to content

Commit

Permalink
Cosmetic changes
Browse files Browse the repository at this point in the history
  • Loading branch information
danny0405 committed Dec 23, 2024
1 parent 14116b5 commit 8df408e
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -154,12 +154,22 @@ private void flushToDisk() {
}

/**
* Custom iterator to iterate over values written to disk with pushdown predicate.
* Custom iterator to iterate over values written to disk.
*/
@Override
public Iterator<R> iterator() {
  // Open an iterator over every value described by valueMetadataMap.
  final ClosableIterator<R> diskValues =
      new LazyFileIterable(filePath, valueMetadataMap, isCompressionEnabled).iterator();
  // Register the iterator in the iterators collection before handing it to the caller.
  iterators.add(diskValues);
  return diskValues;
}

/**
* Custom iterator to iterate over values written to disk with a key filter.
*
* @param filter predicate tested against each key of valueMetadataMap; only entries whose key passes are iterated
* @return an iterator over the file restricted to the retained metadata entries; it is also added to this.iterators
*/
@Override
public Iterator<R> iterator(Predicate<T> filter) {
// NOTE(review): the two statement pairs below are identical except for the local name
// (needMetadata vs. filteredValueMetadata) — they look like the before/after sides of a rename
// in this diff view rather than four statements meant to run in sequence.
// Copy the entries of valueMetadataMap whose key passes the filter into a new map.
Map<T, ValueMetadata> needMetadata = valueMetadataMap.entrySet().stream().filter(e -> filter.test(e.getKey())).collect(Collectors.toMap(Entry::getKey, Entry::getValue));
ClosableIterator<R> iterator = new LazyFileIterable(filePath, needMetadata, isCompressionEnabled).iterator();
Map<T, ValueMetadata> filteredValueMetadata = valueMetadataMap.entrySet().stream().filter(e -> filter.test(e.getKey())).collect(Collectors.toMap(Entry::getKey, Entry::getValue));
ClosableIterator<R> iterator = new LazyFileIterable(filePath, filteredValueMetadata, isCompressionEnabled).iterator();
// Track the iterator alongside the others handed out — presumably so it can be closed later; confirm against the close path.
this.iterators.add(iterator);
return iterator;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
* @param <T> The generic type of the keys
* @param <R> The generic type of the values
*/
public abstract class DiskMap<T extends Serializable, R extends Serializable> implements Map<T, R>, PredicatePushdownIterable<T, R> {
public abstract class DiskMap<T extends Serializable, R extends Serializable> implements Map<T, R>, KeyFilteringIterable<T, R> {

private static final Logger LOG = LoggerFactory.getLogger(DiskMap.class);
private static final String SUBFOLDER_PREFIX = "hudi";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
* frequently and incur unnecessary disk writes.
*/
@NotThreadSafe
public class ExternalSpillableMap<T extends Serializable, R extends Serializable> implements Map<T, R>, Serializable, Closeable, PredicatePushdownIterable<T, R> {
public class ExternalSpillableMap<T extends Serializable, R extends Serializable> implements Map<T, R>, Serializable, Closeable, KeyFilteringIterable<T, R> {

// Find the actual estimated payload size after inserting N records
private static final int NUMBER_OF_RECORDS_TO_ESTIMATE_PAYLOAD_SIZE = 100;
Expand Down Expand Up @@ -132,16 +132,23 @@ private void initDiskBasedMap() {
* A custom iterator to wrap over iterating in-memory + disk spilled data.
*/
@Override
// NOTE(review): two method headers share the single closing brace below — they look like the
// removed (key-filtering) and added (no-argument) sides of this diff, not nested definitions.
public Iterator<R> iterator(Predicate<T> filter) {
return diskBasedMap == null ? inMemoryIterator(filter) : new IteratorWrapper<>(inMemoryIterator(filter), diskIterator(filter));
public Iterator<R> iterator() {
// With no disk map, only the in-memory values are iterated; otherwise the in-memory values
// iterator and the disk map iterator are both handed to IteratorWrapper.
return diskBasedMap == null ? inMemoryMap.values().iterator() : new IteratorWrapper<>(inMemoryMap.values().iterator(), diskBasedMap.iterator());
}

private Iterator<R> inMemoryIterator(Predicate<T> filter) {
return inMemoryMap.entrySet().stream().filter(entry -> filter.test(entry.getKey())).map(Map.Entry::getValue).iterator();
/**
 * Iterates the values whose keys satisfy {@code filter}, covering the in-memory map and,
 * when a disk-based map exists, the disk-based map as well.
 *
 * @param filter predicate applied to the keys
 * @return the filtered in-memory iterator alone, or an {@code IteratorWrapper} over the
 *         filtered in-memory and disk iterators
 */
@Override
public Iterator<R> iterator(Predicate<T> filter) {
  if (diskBasedMap == null) {
    // No disk-based map: the in-memory entries are all there is to iterate.
    return inMemoryMapIterator(filter);
  }
  return new IteratorWrapper<>(inMemoryMapIterator(filter), diskBasedMap.iterator(filter));
}

private Iterator<R> diskIterator(Predicate<T> filter) {
return diskBasedMap.iterator(filter);
/**
 * Builds an iterator over the in-memory map values whose keys pass the given filter.
 *
 * @param filter predicate applied to each in-memory key
 * @return a stream-backed iterator over the matching values
 */
private Iterator<R> inMemoryMapIterator(Predicate<T> filter) {
  return inMemoryMap.entrySet()
      .stream()
      .filter(keyValue -> filter.test(keyValue.getKey()))
      .map(keyValue -> keyValue.getValue())
      .iterator();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,19 @@
import java.util.Iterator;
import java.util.function.Predicate;

public interface PredicatePushdownIterable<K, V> extends Iterable<V> {

@Override
default Iterator<V> iterator() {
return iterator(k -> true);
}

/**
* An iterable that allows filtering on the element keys.
*
* <p>Only the key-filtering variant is declared here; the no-argument
* {@link Iterable#iterator()} comes from {@link Iterable}.
*
* @param <K> the type of element keys
* @param <V> the type of elements returned by the iterator
*/
public interface KeyFilteringIterable<K, V> extends Iterable<V> {
/**
* Returns an iterator over elements of type {@code V}, filtered by the given key filter.
*
* @param filter The filter on the key of type {@code K}.
* @return an iterator over the elements after applying the filter.
*/
Iterator<V> iterator(Predicate<K> filter);

}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -351,24 +352,19 @@ public <K extends Serializable, T extends Serializable> T get(String columnFamil
}
}

/**
 * Converts a String key to bytes by delegating to {@code getUTF8Bytes}.
 *
 * @param key the key to convert
 * @return the bytes produced by {@code getUTF8Bytes} for the key
 */
private byte[] getKeyBytes(String key) {
  final byte[] keyBytes = getUTF8Bytes(key);
  return keyBytes;
}

/**
 * Converts a serializable key to bytes with {@code SerializationUtils.serialize}.
 *
 * @param key the key to serialize
 * @param <K> the key type
 * @return the serialized form of the key
 * @throws HoodieException wrapping the {@link IOException} if serialization fails
 */
private <K extends Serializable> byte[] getKeyBytes(K key) {
  final byte[] serializedKey;
  try {
    serializedKey = SerializationUtils.serialize(key);
  } catch (IOException ioe) {
    throw new HoodieException(ioe);
  }
  return serializedKey;
}

public <K extends Serializable, T extends Serializable> List<T> multiGet(String columnFamilyName, List<K> keys) {
/**
* Retrieve values for the given keys in a column family.
*
* @param columnFamilyName Column Family Name
* @param keys Keys to be retrieved
* @param <K> Type of the keys; each key is converted to bytes with getKeyBytes.
* @param <T> Type of object stored.
* @return the list produced by the overload that takes column family handles and byte keys
*/
public <K extends Serializable, T extends Serializable> List<T> multiGetAsList(String columnFamilyName, List<K> keys) {
List<byte[]> byteKeys = keys.stream().map(this::getKeyBytes).collect(Collectors.toList());
ColumnFamilyHandle handle = managedHandlesMap.get(columnFamilyName);
// Reject column families that have no handle in managedHandlesMap.
ValidationUtils.checkArgument(handle != null, "Column Family not found :" + columnFamilyName);
// Build a handle list parallel to byteKeys: the same handle repeated once per key.
List<ColumnFamilyHandle> columnFamilyHandles = byteKeys.stream().map(key -> handle).collect(Collectors.toList());
// NOTE(review): the two returns below differ only in the callee name (multiGet vs. multiGetAsList) —
// they look like the before/after sides of the rename shown in this diff.
return multiGet(columnFamilyHandles, byteKeys);
return multiGetAsList(columnFamilyHandles, byteKeys);
}

/**
Expand All @@ -378,11 +374,11 @@ public <K extends Serializable, T extends Serializable> List<T> multiGet(String
* @param keys Keys to be retrieved
* @param <T> Type of object stored.
*/
// NOTE(review): the two headers below (public multiGet / private multiGetAsList) look like the
// before/after sides of this diff; they share the single method body that follows.
public <T extends Serializable> List<T> multiGet(List<ColumnFamilyHandle> columnFamilyHandles, List<byte[]> keys) {
private <T extends Serializable> List<T> multiGetAsList(List<ColumnFamilyHandle> columnFamilyHandles, List<byte[]> keys) {
// Reject the call when the closed flag is set.
ValidationUtils.checkArgument(!closed);
try {
return getRocksDB().multiGetAsList(columnFamilyHandles, keys)
// Null entries are dropped before deserialization, so the result may be shorter than keys
// and is not index-aligned with it (presumably null marks a key with no stored value — confirm).
.stream().filter(val -> val != null).map(val -> (T) SerializationUtils.deserialize(val)).collect(Collectors.toList());
.stream().filter(Objects::nonNull).map(val -> (T) SerializationUtils.deserialize(val)).collect(Collectors.toList());
} catch (RocksDBException e) {
// Rethrow as HoodieException wrapping the RocksDBException.
throw new HoodieException(e);
}
Expand Down Expand Up @@ -533,6 +529,18 @@ private <T extends Serializable> byte[] serializePayload(T value) throws IOExcep
return payload;
}

/**
 * Converts a String key to bytes by delegating to {@code getUTF8Bytes}.
 *
 * @param key the key to convert
 * @return the bytes produced by {@code getUTF8Bytes} for the key
 */
private byte[] getKeyBytes(String key) {
  final byte[] keyBytes = getUTF8Bytes(key);
  return keyBytes;
}

/**
 * Converts a serializable key to bytes with {@code SerializationUtils.serialize}.
 *
 * @param key the key to serialize
 * @param <K> the key type
 * @return the serialized form of the key
 * @throws HoodieException wrapping the {@link IOException} if serialization fails
 */
private <K extends Serializable> byte[] getKeyBytes(K key) {
  final byte[] serializedKey;
  try {
    serializedKey = SerializationUtils.serialize(key);
  } catch (IOException ioe) {
    throw new HoodieException(ioe);
  }
  return serializedKey;
}

/**
 * Package-visible accessor for the RocksDB base path held by this instance.
 *
 * @return the current value of {@code rocksDBBasePath}
 */
String getRocksDBBasePath() {
  return this.rocksDBBasePath;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,13 @@ public Iterator<R> iterator() {
return getRocksDb().iterator(ROCKSDB_COL_FAMILY);
}

/**
* Custom iterator to iterate over values written to disk with a key filter.
*
* @param filter predicate applied to the keys in keySet
* @return an iterator over the list of values fetched for the matching keys
*/
@Override
public Iterator<R> iterator(Predicate<T> filter) {
// NOTE(review): sorted() uses natural ordering, so it throws ClassCastException when the keys
// are not Comparable; the visible DiskMap declaration only bounds T by Serializable — confirm
// that every key type used here is Comparable.
List<T> filteredKeys = keySet.stream().filter(filter).sorted().collect(Collectors.toList());
// NOTE(review): the two declarations of values below differ only in the callee name
// (multiGet vs. multiGetAsList) — they look like the before/after sides of this diff.
List<R> values = getRocksDb().multiGet(ROCKSDB_COL_FAMILY, filteredKeys);
List<R> values = getRocksDb().multiGetAsList(ROCKSDB_COL_FAMILY, filteredKeys);
// All matching values are fetched into a list first; the returned iterator walks that list.
return values.iterator();
}

Expand Down

0 comments on commit 8df408e

Please sign in to comment.