diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/ConcatenatingIterator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/ConcatenatingIterator.java
index aa6c29b0844a2..a380fa35adf02 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/ConcatenatingIterator.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/ConcatenatingIterator.java
@@ -27,7 +27,7 @@
 /**
  * Provides iterator interface over List of iterators. Consumes all records from first iterator element
- * before moving to next iterator in the list. That is concatenate elements across multiple iterators.
+ * before moving to next iterator in the list. That is, it concatenates elements across multiple iterators.
  *
  * @param <T>
  */
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/LazyConcatenatingIterator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/LazyConcatenatingIterator.java
new file mode 100644
index 0000000000000..048c315f276b9
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/LazyConcatenatingIterator.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.function.Supplier;
+
+/**
+ * Provides iterator interface over List of iterators. Consumes all records from first iterator element
+ * before moving to next iterator in the list. That is, it concatenates elements across multiple iterators.
+ *
+ * <p>Unlike {@link ConcatenatingIterator}, the internal iterators are instantiated lazily.
+ */
+public class LazyConcatenatingIterator<T> implements ClosableIterator<T> {
+
+  private final Queue<Supplier<ClosableIterator<T>>> iteratorSuppliers;
+
+  private ClosableIterator<T> itr;
+
+  private boolean initialized = false;
+
+  private boolean closed = false;
+
+  public LazyConcatenatingIterator(List<Supplier<ClosableIterator<T>>> iteratorSuppliers) {
+    this.iteratorSuppliers = new LinkedList<>(iteratorSuppliers);
+  }
+
+  @Override
+  public void close() {
+    if (!closed) {
+      if (itr != null) {
+        itr.close();
+        itr = null;
+      }
+      iteratorSuppliers.clear();
+      closed = true;
+    }
+  }
+
+  @Override
+  public boolean hasNext() {
+    init();
+    while (itr != null) {
+      if (itr.hasNext()) {
+        return true;
+      }
+      // close the exhausted iterator
+      this.itr.close();
+      if (!iteratorSuppliers.isEmpty()) {
+        // lazily instantiate the next iterator
+        itr = iteratorSuppliers.poll().get();
+      } else {
+        itr = null;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public T next() {
+    ValidationUtils.checkState(hasNext(), "No more elements left");
+    return itr.next();
+  }
+
+  // -------------------------------------------------------------------------
+  //  Utilities
+  // -------------------------------------------------------------------------
+
+  private void init() {
+    if (!initialized) {
+      if (!this.iteratorSuppliers.isEmpty()) {
+        this.itr = iteratorSuppliers.poll().get();
+      }
+      initialized = true;
+    }
+  }
+}
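Reviewer note, not part of the patch: a minimal, self-contained usage sketch of the new class. The LazyIteratorExample class and its open() helper are hypothetical stand-ins for an expensive resource such as a log scanner; the point is the laziness contract documented above: nothing is opened at construction time, the first supplier is invoked on the first hasNext() call, and each later supplier is invoked only after the previous iterator is exhausted and closed.

import org.apache.hudi.client.utils.LazyConcatenatingIterator;
import org.apache.hudi.common.util.collection.ClosableIterator;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

public class LazyIteratorExample {

  // Hypothetical helper standing in for an expensive resource (e.g. a log scanner).
  static ClosableIterator<Integer> open(String name, Iterator<Integer> inner) {
    System.out.println("opening " + name); // runs only when the supplier is invoked
    return new ClosableIterator<Integer>() {
      @Override
      public void close() {
        System.out.println("closing " + name);
      }

      @Override
      public boolean hasNext() {
        return inner.hasNext();
      }

      @Override
      public Integer next() {
        return inner.next();
      }
    };
  }

  public static void main(String[] args) {
    List<Supplier<ClosableIterator<Integer>>> suppliers = Arrays.asList(
        () -> open("a", Arrays.asList(1, 2).iterator()),
        () -> open("b", Arrays.asList(3).iterator()));
    // Nothing is opened yet; "a" opens on the first hasNext() call, and "b"
    // opens only after "a" is exhausted and closed.
    LazyConcatenatingIterator<Integer> it = new LazyConcatenatingIterator<>(suppliers);
    while (it.hasNext()) {
      System.out.println(it.next());
    }
    it.close();
  }
}

Expected output order: opening a, 1, 2, closing a, opening b, 3, closing b.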
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestLazyConcatenatingIterator.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestLazyConcatenatingIterator.java
new file mode 100644
index 0000000000000..fa1a37d027761
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestLazyConcatenatingIterator.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utils;
+
+import org.apache.hudi.client.utils.LazyConcatenatingIterator;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.Supplier;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class TestLazyConcatenatingIterator {
+
+  int initTimes;
+  int closeTimes;
+
+  private class MockClosableIterator implements ClosableIterator<Integer> {
+
+    Iterator<Integer> iterator;
+
+    public MockClosableIterator(Iterator<Integer> iterator) {
+      initTimes++;
+      this.iterator = iterator;
+    }
+
+    @Override
+    public void close() {
+      closeTimes++;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return iterator.hasNext();
+    }
+
+    @Override
+    public Integer next() {
+      return iterator.next();
+    }
+  }
+
+  // Simple test for iterator concatenation
+  @Test
+  public void testConcatBasic() {
+    Supplier<ClosableIterator<Integer>> i1 = () -> new MockClosableIterator(Arrays.asList(5, 3, 2, 1).iterator());
+    Supplier<ClosableIterator<Integer>> i2 = () -> new MockClosableIterator(Collections.emptyIterator()); // empty iterator
+    Supplier<ClosableIterator<Integer>> i3 = () -> new MockClosableIterator(Collections.singletonList(3).iterator());
+
+    LazyConcatenatingIterator<Integer> ci = new LazyConcatenatingIterator<>(Arrays.asList(i1, i2, i3));
+
+    assertEquals(0, initTimes);
+
+    List<Integer> allElements = new ArrayList<>();
+    int count = 0;
+    while (ci.hasNext()) {
+      count++;
+      if (count == 1) {
+        assertEquals(1, initTimes);
+        assertEquals(0, closeTimes);
+      }
+      if (count == 5) {
+        assertEquals(3, initTimes);
+        assertEquals(2, closeTimes);
+      }
+      allElements.add(ci.next());
+    }
+
+    assertEquals(3, initTimes);
+    assertEquals(3, closeTimes);
+
+    assertEquals(5, allElements.size());
+    assertEquals(Arrays.asList(5, 3, 2, 1, 3), allElements);
+  }
+
+  @Test
+  public void testConcatError() {
+    Supplier<ClosableIterator<Integer>> i1 = () -> new MockClosableIterator(Collections.emptyIterator()); // empty iterator
+
+    LazyConcatenatingIterator<Integer> ci = new LazyConcatenatingIterator<>(Collections.singletonList(i1));
+    assertFalse(ci.hasNext());
+    try {
+      ci.next();
+      fail("expected error for empty iterator");
+    } catch (IllegalStateException e) {
+      // expected: no more elements left
+    }
+  }
+}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index 0c28a9736fa32..552041b8ecf1b 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -26,7 +26,7 @@
 import org.apache.hudi.client.SparkTaskContextSupplier;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.client.utils.ConcatenatingIterator;
+import org.apache.hudi.client.utils.LazyConcatenatingIterator;
 import org.apache.hudi.common.config.HoodieMemoryConfig;
 import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.config.SerializableSchema;
@@ -111,6 +111,7 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -336,44 +337,47 @@ private HoodieData<HoodieRecord<T>> readRecordsForGroupWithLogs(JavaSparkContext
     int readParallelism = Math.min(writeConfig.getClusteringGroupReadParallelism(), clusteringOps.size());
 
     return HoodieJavaRDD.of(jsc.parallelize(clusteringOps, readParallelism).mapPartitions(clusteringOpsPartition -> {
-      List<Iterator<HoodieRecord<T>>> recordIterators = new ArrayList<>();
+      List<Supplier<ClosableIterator<HoodieRecord<T>>>> suppliers = new ArrayList<>();
       clusteringOpsPartition.forEachRemaining(clusteringOp -> {
-        long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(new SparkTaskContextSupplier(), config);
-        LOG.info("MaxMemoryPerCompaction run as part of clustering => " + maxMemoryPerCompaction);
-        try {
-          Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
-          HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
-              .withStorage(table.getStorage())
-              .withBasePath(table.getMetaClient().getBasePath())
-              .withLogFilePaths(clusteringOp.getDeltaFilePaths())
-              .withReaderSchema(readerSchema)
-              .withLatestInstantTime(instantTime)
-              .withMaxMemorySizeInBytes(maxMemoryPerCompaction)
-              .withReverseReader(config.getCompactionReverseLogReadEnabled())
-              .withBufferSize(config.getMaxDFSStreamBufferSize())
-              .withSpillableMapBasePath(config.getSpillableMapBasePath())
-              .withPartition(clusteringOp.getPartitionPath())
-              .withOptimizedLogBlocksScan(config.enableOptimizedLogBlocksScan())
-              .withDiskMapType(config.getCommonConfig().getSpillableDiskMapType())
-              .withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
-              .withRecordMerger(config.getRecordMerger())
-              .withTableMetaClient(table.getMetaClient())
-              .build();
-
-          Option<HoodieFileReader> baseFileReader = StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath())
-              ? Option.empty()
-              : Option.of(getBaseOrBootstrapFileReader(storageConf, bootstrapBasePath, partitionFields, clusteringOp));
-          recordIterators.add(new HoodieFileSliceReader(baseFileReader, scanner, readerSchema, tableConfig.getPreCombineField(), config.getRecordMerger(),
-              tableConfig.getProps(),
-              tableConfig.populateMetaFields() ? Option.empty() : Option.of(Pair.of(tableConfig.getRecordKeyFieldProp(),
-                  tableConfig.getPartitionFieldProp()))));
-        } catch (IOException e) {
-          throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath()
-              + " and " + clusteringOp.getDeltaFilePaths(), e);
-        }
-      });
-      return new ConcatenatingIterator<>(recordIterators);
+        Supplier<ClosableIterator<HoodieRecord<T>>> iteratorSupplier = () -> {
+          long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(new SparkTaskContextSupplier(), config);
+          LOG.info("MaxMemoryPerCompaction run as part of clustering => " + maxMemoryPerCompaction);
+          try {
+            Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
+            HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
+                .withStorage(table.getStorage())
+                .withBasePath(table.getMetaClient().getBasePath())
+                .withLogFilePaths(clusteringOp.getDeltaFilePaths())
+                .withReaderSchema(readerSchema)
+                .withLatestInstantTime(instantTime)
+                .withMaxMemorySizeInBytes(maxMemoryPerCompaction)
+                .withReverseReader(config.getCompactionReverseLogReadEnabled())
+                .withBufferSize(config.getMaxDFSStreamBufferSize())
+                .withSpillableMapBasePath(config.getSpillableMapBasePath())
+                .withPartition(clusteringOp.getPartitionPath())
+                .withOptimizedLogBlocksScan(config.enableOptimizedLogBlocksScan())
+                .withDiskMapType(config.getCommonConfig().getSpillableDiskMapType())
+                .withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
+                .withRecordMerger(config.getRecordMerger())
+                .withTableMetaClient(table.getMetaClient())
+                .build();
+
+            Option<HoodieFileReader> baseFileReader = StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath())
+                ? Option.empty()
+                : Option.of(getBaseOrBootstrapFileReader(storageConf, bootstrapBasePath, partitionFields, clusteringOp));
+            return new HoodieFileSliceReader(baseFileReader, scanner, readerSchema, tableConfig.getPreCombineField(), config.getRecordMerger(),
+                tableConfig.getProps(),
+                tableConfig.populateMetaFields() ? Option.empty() : Option.of(Pair.of(tableConfig.getRecordKeyFieldProp(),
+                    tableConfig.getPartitionFieldProp())));
+          } catch (IOException e) {
+            throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath()
+                + " and " + clusteringOp.getDeltaFilePaths(), e);
+          }
+        };
+        suppliers.add(iteratorSupplier);
+      });
+      return new LazyConcatenatingIterator<>(suppliers);
     }));
   }
@@ -395,27 +399,29 @@ private HoodieData<HoodieRecord<T>> readRecordsForGroupBaseFiles(JavaSparkContex
     return HoodieJavaRDD.of(jsc.parallelize(clusteringOps, readParallelism)
         .mapPartitions(clusteringOpsPartition -> {
-          List<Iterator<HoodieRecord<T>>> iteratorsForPartition = new ArrayList<>();
+          List<Supplier<ClosableIterator<HoodieRecord<T>>>> iteratorGettersForPartition = new ArrayList<>();
           clusteringOpsPartition.forEachRemaining(clusteringOp -> {
-            try {
-              Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(writeConfig.getSchema()));
-              HoodieFileReader baseFileReader = getBaseOrBootstrapFileReader(storageConf, bootstrapBasePath, partitionFields, clusteringOp);
-
-              Option<BaseKeyGenerator> keyGeneratorOp = HoodieSparkKeyGeneratorFactory.createBaseKeyGenerator(writeConfig);
-              // NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific
-              //       payload pointing into a shared, mutable (underlying) buffer we get a clean copy of
-              //       it since these records will be shuffled later.
-              CloseableMappingIterator mappingIterator = new CloseableMappingIterator(
-                  (ClosableIterator<HoodieRecord>) baseFileReader.getRecordIterator(readerSchema),
-                  rec -> ((HoodieRecord) rec).copy().wrapIntoHoodieRecordPayloadWithKeyGen(readerSchema, writeConfig.getProps(), keyGeneratorOp));
-              iteratorsForPartition.add(mappingIterator);
-            } catch (IOException e) {
-              throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath()
-                  + " and " + clusteringOp.getDeltaFilePaths(), e);
-            }
+            Supplier<ClosableIterator<HoodieRecord<T>>> recordIteratorGetter = () -> {
+              try {
+                Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(writeConfig.getSchema()));
+                HoodieFileReader baseFileReader = getBaseOrBootstrapFileReader(storageConf, bootstrapBasePath, partitionFields, clusteringOp);
+
+                Option<BaseKeyGenerator> keyGeneratorOp = HoodieSparkKeyGeneratorFactory.createBaseKeyGenerator(writeConfig);
+                // NOTE: Records have to be cloned here to make sure that if a record holds a low-level
+                //       engine-specific payload pointing into a shared, mutable (underlying) buffer,
+                //       we get a clean copy of it, since these records will be shuffled later.
+                return new CloseableMappingIterator(
+                    (ClosableIterator<HoodieRecord>) baseFileReader.getRecordIterator(readerSchema),
+                    rec -> ((HoodieRecord) rec).copy().wrapIntoHoodieRecordPayloadWithKeyGen(readerSchema, writeConfig.getProps(), keyGeneratorOp));
+              } catch (IOException e) {
+                throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath()
+                    + " and " + clusteringOp.getDeltaFilePaths(), e);
+              }
+            };
+            iteratorGettersForPartition.add(recordIteratorGetter);
           });
-          return new ConcatenatingIterator<>(iteratorsForPartition);
+          return new LazyConcatenatingIterator<>(iteratorGettersForPartition);
        }));
   }
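Reviewer note, not part of the patch: both mapPartitions changes above apply the same pattern. Instead of eagerly constructing a HoodieMergedLogRecordScanner or base-file reader for every clustering operation in the partition up front, each construction is wrapped in a Supplier, so LazyConcatenatingIterator keeps at most one file slice open at a time. A reduced sketch of that wrapping, with SupplierWrappingSketch and its openSliceReader() helper as hypothetical stand-ins for the builder code in the diff:

import org.apache.hudi.client.utils.LazyConcatenatingIterator;
import org.apache.hudi.common.util.collection.ClosableIterator;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

public class SupplierWrappingSketch {

  // Hypothetical stand-in for the per-operation reader construction above
  // (scanner builder, base file reader, HoodieFileSliceReader).
  static ClosableIterator<String> openSliceReader(String op) {
    Iterator<String> inner = Collections.singletonList("record-of-" + op).iterator();
    return new ClosableIterator<String>() {
      @Override
      public void close() { /* release file handles, spillable maps, ... */ }

      @Override
      public boolean hasNext() {
        return inner.hasNext();
      }

      @Override
      public String next() {
        return inner.next();
      }
    };
  }

  // Mirrors the new mapPartitions body: wrap each construction in a Supplier
  // instead of opening it eagerly, then concatenate lazily.
  static Iterator<String> readRecordsForPartition(Iterator<String> clusteringOps) {
    List<Supplier<ClosableIterator<String>>> suppliers = new ArrayList<>();
    clusteringOps.forEachRemaining(op -> suppliers.add(() -> openSliceReader(op)));
    return new LazyConcatenatingIterator<>(suppliers);
  }

  public static void main(String[] args) {
    readRecordsForPartition(Arrays.asList("op1", "op2").iterator())
        .forEachRemaining(System.out::println);
  }
}

One consequence of this design worth noting: construction errors now surface on first consumption rather than while the supplier list is being built, which is why the IOException handling moves inside the lambda in both hunks.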