[HUDI-8781] Optimize executor memory usage during executing clustering (#12515)

* perf: optimize executor memory usage during executing clustering

1. optimize executor memory usage during executing clustering

Signed-off-by: TheR1sing3un <[email protected]>

* Cosmetic changes

---------

Signed-off-by: TheR1sing3un <[email protected]>
Co-authored-by: danny0405 <[email protected]>
TheR1sing3un and danny0405 authored Dec 19, 2024
1 parent cb447c9 commit 9da3221
Showing 4 changed files with 274 additions and 56 deletions.
@@ -27,7 +27,7 @@

/**
* Provides iterator interface over List of iterators. Consumes all records from first iterator element
* before moving to next iterator in the list. That is concatenate elements across multiple iterators.
* before moving to next iterator in the list. That is concatenating elements across multiple iterators.
*
* @param <T>
*/
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.client.utils;

import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;

import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.function.Supplier;

/**
* Provides iterator interface over List of iterators. Consumes all records from first iterator element
* before moving to next iterator in the list. That is concatenating elements across multiple iterators.
*
* <p>Unlike {@link ConcatenatingIterator}, the internal iterators are instantiated lazily.
*/
public class LazyConcatenatingIterator<T> implements ClosableIterator<T> {

private final Queue<Supplier<ClosableIterator<T>>> iteratorSuppliers;

private ClosableIterator<T> itr;

private boolean initialed = false;

private boolean closed = false;

public LazyConcatenatingIterator(List<Supplier<ClosableIterator<T>>> iteratorSuppliers) {
this.iteratorSuppliers = new LinkedList<>(iteratorSuppliers);
}

@Override
public void close() {
if (!closed) {
if (itr != null) {
itr.close();
itr = null;
}
iteratorSuppliers.clear();
closed = true;
}
}

@Override
public boolean hasNext() {
init();
while (itr != null) {
if (itr.hasNext()) {
return true;
}
// close current iterator
this.itr.close();
if (!iteratorSuppliers.isEmpty()) {
// move to the next
itr = iteratorSuppliers.poll().get();
} else {
itr = null;
}
}
return false;
}

@Override
public T next() {
ValidationUtils.checkState(hasNext(), "No more elements left");
return itr.next();
}

// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------

private void init() {
if (!initialed) {
if (!this.iteratorSuppliers.isEmpty()) {
this.itr = iteratorSuppliers.poll().get();
}
initialed = true;
}
}
}
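
A minimal usage sketch of the new iterator (not part of the patch; it assumes the Hudi client and common modules are on the classpath, and the supplierOf helper is hypothetical). Nothing is opened at construction time; each supplier is invoked only when the previous iterator is exhausted, so at most one underlying iterator is open while iterating.

import org.apache.hudi.client.utils.LazyConcatenatingIterator;
import org.apache.hudi.common.util.collection.ClosableIterator;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

public class LazyConcatExample {

  // Hypothetical helper: defers wrapping a plain List as a ClosableIterator until the supplier is invoked.
  private static <T> Supplier<ClosableIterator<T>> supplierOf(List<T> values) {
    return () -> new ClosableIterator<T>() {
      private final Iterator<T> delegate = values.iterator();

      @Override
      public void close() {
        // a real implementation would release file handles, scanners, spillable maps, etc.
      }

      @Override
      public boolean hasNext() {
        return delegate.hasNext();
      }

      @Override
      public T next() {
        return delegate.next();
      }
    };
  }

  public static void main(String[] args) {
    LazyConcatenatingIterator<Integer> iter = new LazyConcatenatingIterator<>(
        Arrays.asList(supplierOf(Arrays.asList(1, 2)), supplierOf(Arrays.asList(3))));
    while (iter.hasNext()) { // the second supplier is only invoked after 1 and 2 are consumed
      System.out.println(iter.next());
    }
    iter.close();
  }
}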
@@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.utils;

import org.apache.hudi.client.utils.LazyConcatenatingIterator;
import org.apache.hudi.common.util.collection.ClosableIterator;

import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.fail;

public class TestLazyConcatenatingIterator {

int initTimes;
int closeTimes;

private class MockClosableIterator implements ClosableIterator {

Iterator<Integer> iterator;

public MockClosableIterator(Iterator<Integer> iterator) {
initTimes++;
this.iterator = iterator;
}

@Override
public void close() {
closeTimes++;
}

@Override
public boolean hasNext() {
return iterator.hasNext();
}

@Override
public Object next() {
return iterator.next();
}
}

// Simple test for iterator concatenation
@Test
public void testConcatBasic() {
Supplier<ClosableIterator<Integer>> i1 = () -> new MockClosableIterator(Arrays.asList(5, 3, 2, 1).iterator());
Supplier<ClosableIterator<Integer>> i2 = () -> new MockClosableIterator(Collections.emptyIterator()); // empty iterator
Supplier<ClosableIterator<Integer>> i3 = () -> new MockClosableIterator(Collections.singletonList(3).iterator());

LazyConcatenatingIterator<Integer> ci = new LazyConcatenatingIterator<>(Arrays.asList(i1, i2, i3));

assertEquals(0, initTimes);

List<Integer> allElements = new ArrayList<>();
int count = 0;
while (ci.hasNext()) {
count++;
if (count == 1) {
assertEquals(1, initTimes);
assertEquals(0, closeTimes);
}
if (count == 5) {
assertEquals(3, initTimes);
assertEquals(2, closeTimes);
}
allElements.add(ci.next());
}

assertEquals(3, initTimes);
assertEquals(3, closeTimes);

assertEquals(5, allElements.size());
assertEquals(Arrays.asList(5, 3, 2, 1, 3), allElements);
}

@Test
public void testConcatError() {
Supplier<ClosableIterator<Integer>> i1 = () -> new MockClosableIterator(Collections.emptyIterator()); // empty iterator

LazyConcatenatingIterator<Integer> ci = new LazyConcatenatingIterator<>(Collections.singletonList(i1));
assertFalse(ci.hasNext());
try {
ci.next();
fail("expected error for empty iterator");
} catch (IllegalStateException e) {
// expected: next() on an exhausted iterator throws IllegalStateException
}
}

}
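
A hypothetical additional test method for the class above (not part of the patch), sketching the early-close behavior implied by LazyConcatenatingIterator#close(): the currently open iterator is closed and the remaining suppliers are discarded without ever being instantiated.

  @Test
  public void testEarlyClose() {
    Supplier<ClosableIterator<Integer>> i1 = () -> new MockClosableIterator(Arrays.asList(1, 2).iterator());
    Supplier<ClosableIterator<Integer>> i2 = () -> new MockClosableIterator(Arrays.asList(3, 4).iterator());

    LazyConcatenatingIterator<Integer> ci = new LazyConcatenatingIterator<>(Arrays.asList(i1, i2));
    assertEquals(1, (int) ci.next()); // opens only the first iterator
    ci.close();

    assertEquals(1, initTimes);  // the second supplier was never invoked
    assertEquals(1, closeTimes); // the open iterator was closed exactly once
  }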
@@ -26,7 +26,7 @@
import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.utils.ConcatenatingIterator;
import org.apache.hudi.client.utils.LazyConcatenatingIterator;
import org.apache.hudi.common.config.HoodieMemoryConfig;
import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.common.config.SerializableSchema;
@@ -111,6 +111,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@@ -336,44 +337,47 @@ private HoodieData<HoodieRecord<T>> readRecordsForGroupWithLogs(JavaSparkContext
int readParallelism = Math.min(writeConfig.getClusteringGroupReadParallelism(), clusteringOps.size());

return HoodieJavaRDD.of(jsc.parallelize(clusteringOps, readParallelism).mapPartitions(clusteringOpsPartition -> {
List<Iterator<HoodieRecord<T>>> recordIterators = new ArrayList<>();
List<Supplier<ClosableIterator<HoodieRecord<T>>>> suppliers = new ArrayList<>();
clusteringOpsPartition.forEachRemaining(clusteringOp -> {
long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(new SparkTaskContextSupplier(), config);
LOG.info("MaxMemoryPerCompaction run as part of clustering => " + maxMemoryPerCompaction);
try {
Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
.withStorage(table.getStorage())
.withBasePath(table.getMetaClient().getBasePath())
.withLogFilePaths(clusteringOp.getDeltaFilePaths())
.withReaderSchema(readerSchema)
.withLatestInstantTime(instantTime)
.withMaxMemorySizeInBytes(maxMemoryPerCompaction)
.withReverseReader(config.getCompactionReverseLogReadEnabled())
.withBufferSize(config.getMaxDFSStreamBufferSize())
.withSpillableMapBasePath(config.getSpillableMapBasePath())
.withPartition(clusteringOp.getPartitionPath())
.withOptimizedLogBlocksScan(config.enableOptimizedLogBlocksScan())
.withDiskMapType(config.getCommonConfig().getSpillableDiskMapType())
.withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
.withRecordMerger(config.getRecordMerger())
.withTableMetaClient(table.getMetaClient())
.build();

Option<HoodieFileReader> baseFileReader = StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath())
? Option.empty()
: Option.of(getBaseOrBootstrapFileReader(storageConf, bootstrapBasePath, partitionFields, clusteringOp));
recordIterators.add(new HoodieFileSliceReader(baseFileReader, scanner, readerSchema, tableConfig.getPreCombineField(), config.getRecordMerger(),
tableConfig.getProps(),
tableConfig.populateMetaFields() ? Option.empty() : Option.of(Pair.of(tableConfig.getRecordKeyFieldProp(),
tableConfig.getPartitionFieldProp()))));
} catch (IOException e) {
throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath()
+ " and " + clusteringOp.getDeltaFilePaths(), e);
}
});

return new ConcatenatingIterator<>(recordIterators);
Supplier<ClosableIterator<HoodieRecord<T>>> iteratorSupplier = () -> {
long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(new SparkTaskContextSupplier(), config);
LOG.info("MaxMemoryPerCompaction run as part of clustering => " + maxMemoryPerCompaction);
try {
Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
.withStorage(table.getStorage())
.withBasePath(table.getMetaClient().getBasePath())
.withLogFilePaths(clusteringOp.getDeltaFilePaths())
.withReaderSchema(readerSchema)
.withLatestInstantTime(instantTime)
.withMaxMemorySizeInBytes(maxMemoryPerCompaction)
.withReverseReader(config.getCompactionReverseLogReadEnabled())
.withBufferSize(config.getMaxDFSStreamBufferSize())
.withSpillableMapBasePath(config.getSpillableMapBasePath())
.withPartition(clusteringOp.getPartitionPath())
.withOptimizedLogBlocksScan(config.enableOptimizedLogBlocksScan())
.withDiskMapType(config.getCommonConfig().getSpillableDiskMapType())
.withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
.withRecordMerger(config.getRecordMerger())
.withTableMetaClient(table.getMetaClient())
.build();

Option<HoodieFileReader> baseFileReader = StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath())
? Option.empty()
: Option.of(getBaseOrBootstrapFileReader(storageConf, bootstrapBasePath, partitionFields, clusteringOp));
return new HoodieFileSliceReader(baseFileReader, scanner, readerSchema, tableConfig.getPreCombineField(), config.getRecordMerger(),
tableConfig.getProps(),
tableConfig.populateMetaFields() ? Option.empty() : Option.of(Pair.of(tableConfig.getRecordKeyFieldProp(),
tableConfig.getPartitionFieldProp())));
} catch (IOException e) {
throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath()
+ " and " + clusteringOp.getDeltaFilePaths(), e);
}
};
suppliers.add(iteratorSupplier);
});
return new LazyConcatenatingIterator<>(suppliers);
}));
}

@@ -395,27 +399,29 @@ private HoodieData<HoodieRecord<T>> readRecordsForGroupBaseFiles(JavaSparkContext

return HoodieJavaRDD.of(jsc.parallelize(clusteringOps, readParallelism)
.mapPartitions(clusteringOpsPartition -> {
List<Iterator<HoodieRecord<T>>> iteratorsForPartition = new ArrayList<>();
List<Supplier<ClosableIterator<HoodieRecord<T>>>> iteratorGettersForPartition = new ArrayList<>();
clusteringOpsPartition.forEachRemaining(clusteringOp -> {
try {
Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(writeConfig.getSchema()));
HoodieFileReader baseFileReader = getBaseOrBootstrapFileReader(storageConf, bootstrapBasePath, partitionFields, clusteringOp);

Option<BaseKeyGenerator> keyGeneratorOp = HoodieSparkKeyGeneratorFactory.createBaseKeyGenerator(writeConfig);
// NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific
// payload pointing into a shared, mutable (underlying) buffer we get a clean copy of
// it since these records will be shuffled later.
CloseableMappingIterator mappingIterator = new CloseableMappingIterator(
(ClosableIterator<HoodieRecord>) baseFileReader.getRecordIterator(readerSchema),
rec -> ((HoodieRecord) rec).copy().wrapIntoHoodieRecordPayloadWithKeyGen(readerSchema, writeConfig.getProps(), keyGeneratorOp));
iteratorsForPartition.add(mappingIterator);
} catch (IOException e) {
throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath()
+ " and " + clusteringOp.getDeltaFilePaths(), e);
}
Supplier<ClosableIterator<HoodieRecord<T>>> recordIteratorGetter = () -> {
try {
Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(writeConfig.getSchema()));
HoodieFileReader baseFileReader = getBaseOrBootstrapFileReader(storageConf, bootstrapBasePath, partitionFields, clusteringOp);

Option<BaseKeyGenerator> keyGeneratorOp = HoodieSparkKeyGeneratorFactory.createBaseKeyGenerator(writeConfig);
// NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific
// payload pointing into a shared, mutable (underlying) buffer we get a clean copy of
// it since these records will be shuffled later.
return new CloseableMappingIterator(
(ClosableIterator<HoodieRecord>) baseFileReader.getRecordIterator(readerSchema),
rec -> ((HoodieRecord) rec).copy().wrapIntoHoodieRecordPayloadWithKeyGen(readerSchema, writeConfig.getProps(), keyGeneratorOp));
} catch (IOException e) {
throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath()
+ " and " + clusteringOp.getDeltaFilePaths(), e);
}
};
iteratorGettersForPartition.add(recordIteratorGetter);
});

return new ConcatenatingIterator<>(iteratorsForPartition);
return new LazyConcatenatingIterator<>(iteratorGettersForPartition);
}));
}
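
Both read paths now follow the same pattern: instead of materializing every record iterator up front (which opens a HoodieFileSliceReader or CloseableMappingIterator, together with its scanners and file handles, for every clustering operation in the partition at once), each reader is wrapped in a Supplier and only instantiated when LazyConcatenatingIterator reaches it. A schematic sketch of that design choice, with RecordReader and open(...) as hypothetical stand-ins for the real reader construction above:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

final class EagerVsLazyReaders {

  // Hypothetical stand-in for a resource-heavy reader such as a HoodieFileSliceReader
  // backed by a HoodieMergedLogRecordScanner (spillable map, open file handles, ...).
  interface RecordReader extends Iterator<Object>, AutoCloseable {
    void close();
  }

  // Before: one reader per file slice is opened up front, so peak memory in the Spark
  // task grows with the number of file slices in the clustering group partition.
  static List<RecordReader> openAllEagerly(List<String> fileSlices) {
    List<RecordReader> readers = new ArrayList<>();
    for (String fileSlice : fileSlices) {
      readers.add(open(fileSlice));
    }
    return readers;
  }

  // After: only cheap suppliers are collected; LazyConcatenatingIterator invokes each
  // supplier when the previous reader is exhausted, keeping at most one reader open.
  static List<Supplier<RecordReader>> collectSuppliers(List<String> fileSlices) {
    List<Supplier<RecordReader>> suppliers = new ArrayList<>();
    for (String fileSlice : fileSlices) {
      suppliers.add(() -> open(fileSlice));
    }
    return suppliers;
  }

  private static RecordReader open(String fileSlice) {
    throw new UnsupportedOperationException("placeholder: open base file reader / log scanner for " + fileSlice);
  }
}

With the lazy variant, peak memory per task is bounded by a single open reader rather than by the number of file slices in the group, which is the intent of the memory optimization in this commit.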

