diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index 78270d7523bc..ade2ac9c3741 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -3193,6 +3193,12 @@ public boolean clusteringIncrementalEnabled() { return options.get(CLUSTERING_INCREMENTAL); } + public boolean bucketClusterEnabled() { + return !bucketAppendOrdered() + && !deletionVectorsEnabled() + && clusteringIncrementalEnabled(); + } + public Duration clusteringHistoryPartitionIdleTime() { return options.get(CLUSTERING_HISTORY_PARTITION_IDLE_TIME); } diff --git a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java index f3f0859d8c56..51cfe58b42fe 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java @@ -141,7 +141,8 @@ public BaseAppendFileStoreWrite newWrite(String commitUser, @Nullable Integer wr newScan(), options, dvMaintainerFactory, - tableName); + tableName, + schemaManager); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/append/cluster/BucketedAppendClusterManager.java b/paimon-core/src/main/java/org/apache/paimon/append/cluster/BucketedAppendClusterManager.java new file mode 100644 index 000000000000..154d10c9d33f --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/append/cluster/BucketedAppendClusterManager.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.append.cluster; + +import org.apache.paimon.AppendOnlyFileStore; +import org.apache.paimon.annotation.VisibleForTesting; +import org.apache.paimon.compact.CompactFutureManager; +import org.apache.paimon.compact.CompactResult; +import org.apache.paimon.compact.CompactTask; +import org.apache.paimon.compact.CompactUnit; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.mergetree.LevelSortedRun; +import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.utils.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.stream.Collectors; + +/** Cluster manager for {@link AppendOnlyFileStore}. 
*/ +public class BucketedAppendClusterManager extends CompactFutureManager { + + private static final Logger LOG = LoggerFactory.getLogger(BucketedAppendClusterManager.class); + + private final ExecutorService executor; + private final BucketedAppendLevels levels; + private final IncrementalClusterStrategy strategy; + private final CompactRewriter rewriter; + + public BucketedAppendClusterManager( + ExecutorService executor, + List restored, + SchemaManager schemaManager, + List clusterKeys, + int maxSizeAmp, + int sizeRatio, + int numRunCompactionTrigger, + int numLevels, + CompactRewriter rewriter) { + this.executor = executor; + this.levels = new BucketedAppendLevels(restored, numLevels); + this.strategy = + new IncrementalClusterStrategy( + schemaManager, clusterKeys, maxSizeAmp, sizeRatio, numRunCompactionTrigger); + this.rewriter = rewriter; + } + + @Override + public boolean shouldWaitForLatestCompaction() { + return false; + } + + @Override + public boolean shouldWaitForPreparingCheckpoint() { + return false; + } + + @Override + public void addNewFile(DataFileMeta file) { + levels.addLevel0File(file); + } + + @Override + public List allFiles() { + return levels.allFiles(); + } + + @Override + public void triggerCompaction(boolean fullCompaction) { + Optional optionalUnit; + List runs = levels.levelSortedRuns(); + if (fullCompaction) { + Preconditions.checkState( + taskFuture == null, + "A compaction task is still running while the user " + + "forces a new compaction. This is unexpected."); + if (LOG.isDebugEnabled()) { + LOG.debug( + "Trigger forced full compaction. Picking from the following runs\n{}", + runs); + } + optionalUnit = strategy.pick(levels.numberOfLevels(), runs, true); + } else { + if (taskFuture != null) { + return; + } + if (LOG.isDebugEnabled()) { + LOG.debug("Trigger normal compaction. Picking from the following runs\n{}", runs); + } + optionalUnit = + strategy.pick(levels.numberOfLevels(), runs, false) + .filter(unit -> !unit.files().isEmpty()); + } + + optionalUnit.ifPresent( + unit -> { + if (LOG.isDebugEnabled()) { + LOG.debug( + "Submit compaction with files (name, level, size): " + + levels.levelSortedRuns().stream() + .flatMap(lsr -> lsr.run().files().stream()) + .map( + file -> + String.format( + "(%s, %d, %d)", + file.fileName(), + file.level(), + file.fileSize())) + .collect(Collectors.joining(", "))); + } + submitCompaction(unit); + }); + } + + private void submitCompaction(CompactUnit unit) { + + BucketedAppendClusterTask task = + new BucketedAppendClusterTask(unit.files(), unit.outputLevel(), rewriter); + + if (LOG.isDebugEnabled()) { + LOG.debug( + "Pick these files (name, level, size) for {} compaction: {}", + task.getClass().getSimpleName(), + unit.files().stream() + .map( + file -> + String.format( + "(%s, %d, %d)", + file.fileName(), file.level(), file.fileSize())) + .collect(Collectors.joining(", "))); + } + taskFuture = executor.submit(task); + } + + @Override + public Optional getCompactionResult(boolean blocking) + throws ExecutionException, InterruptedException { + Optional result = innerGetCompactionResult(blocking); + result.ifPresent( + r -> { + if (LOG.isDebugEnabled()) { + LOG.debug( + "Update levels in compact manager with these changes:\nBefore:\n{}\nAfter:\n{}", + r.before(), + r.after()); + } + levels.update(r.before(), r.after()); + if (LOG.isDebugEnabled()) { + LOG.debug( + "Levels in compact manager updated. 
Current runs are\n{}", + levels.levelSortedRuns()); + } + }); + return result; + } + + @Override + public void close() throws IOException {} + + @VisibleForTesting + public BucketedAppendLevels levels() { + return levels; + } + + /** A {@link CompactTask} impl for clustering of append bucketed table. */ + public static class BucketedAppendClusterTask extends CompactTask { + + private final List toCluster; + private final int outputLevel; + private final CompactRewriter rewriter; + + public BucketedAppendClusterTask( + List toCluster, int outputLevel, CompactRewriter rewriter) { + super(null); + this.toCluster = toCluster; + this.outputLevel = outputLevel; + this.rewriter = rewriter; + } + + @Override + protected CompactResult doCompact() throws Exception { + List rewrite = rewriter.rewrite(toCluster); + return new CompactResult(toCluster, upgrade(rewrite)); + } + + protected List upgrade(List files) { + return files.stream() + .map(file -> file.upgrade(outputLevel)) + .collect(Collectors.toList()); + } + } + + /** Compact rewriter for append-only table. */ + public interface CompactRewriter { + List rewrite(List compactBefore) throws Exception; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/append/cluster/BucketedAppendLevels.java b/paimon-core/src/main/java/org/apache/paimon/append/cluster/BucketedAppendLevels.java new file mode 100644 index 000000000000..ba62fd9748da --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/append/cluster/BucketedAppendLevels.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.append.cluster; + +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.mergetree.LevelSortedRun; +import org.apache.paimon.mergetree.SortedRun; +import org.apache.paimon.utils.Preconditions; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static java.util.Collections.emptyList; +import static org.apache.paimon.utils.Preconditions.checkArgument; + +/** A class which stores all level files in append bucketed table. 
*/ +public class BucketedAppendLevels { + + private final HashSet level0; + + private final List levels; + + public BucketedAppendLevels(List inputFiles, int numLevels) { + int restoredNumLevels = + Math.max( + numLevels, + inputFiles.stream().mapToInt(DataFileMeta::level).max().orElse(-1) + 1); + checkArgument(restoredNumLevels > 1, "Number of levels must be at least 2."); + this.level0 = new HashSet<>(); + this.levels = new ArrayList<>(); + for (int i = 1; i < restoredNumLevels; i++) { + levels.add(SortedRun.empty()); + } + + Map> levelMap = new HashMap<>(); + for (DataFileMeta file : inputFiles) { + levelMap.computeIfAbsent(file.level(), level -> new ArrayList<>()).add(file); + } + levelMap.forEach((level, files) -> updateLevel(level, emptyList(), files)); + + Preconditions.checkState( + level0.size() + levels.stream().mapToInt(r -> r.files().size()).sum() + == inputFiles.size(), + "Number of files stored in Levels does not equal to the size of inputFiles. This is unexpected."); + } + + public void addLevel0File(DataFileMeta file) { + checkArgument(file.level() == 0); + level0.add(file); + } + + public SortedRun runOfLevel(int level) { + checkArgument(level > 0, "Level0 does not have one single sorted run."); + return levels.get(level - 1); + } + + public int numberOfLevels() { + return levels.size() + 1; + } + + public int maxLevel() { + return levels.size(); + } + + public List allFiles() { + List files = new ArrayList<>(); + List runs = levelSortedRuns(); + for (LevelSortedRun run : runs) { + files.addAll(run.run().files()); + } + return files; + } + + public List levelSortedRuns() { + List runs = new ArrayList<>(); + level0.forEach(file -> runs.add(new LevelSortedRun(0, SortedRun.fromSingle(file)))); + for (int i = 0; i < levels.size(); i++) { + SortedRun run = levels.get(i); + if (run.nonEmpty()) { + runs.add(new LevelSortedRun(i + 1, run)); + } + } + return runs; + } + + public void update(List before, List after) { + Map> groupedBefore = groupByLevel(before); + Map> groupedAfter = groupByLevel(after); + for (int i = 0; i < numberOfLevels(); i++) { + updateLevel( + i, + groupedBefore.getOrDefault(i, emptyList()), + groupedAfter.getOrDefault(i, emptyList())); + } + } + + private void updateLevel(int level, List before, List after) { + if (before.isEmpty() && after.isEmpty()) { + return; + } + + if (level == 0) { + before.forEach(level0::remove); + level0.addAll(after); + } else { + List files = new ArrayList<>(runOfLevel(level).files()); + files.removeAll(before); + files.addAll(after); + levels.set(level - 1, SortedRun.fromSorted(files)); + } + } + + private Map> groupByLevel(List files) { + return files.stream() + .collect(Collectors.groupingBy(DataFileMeta::level, Collectors.toList())); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/append/cluster/HibertSorter.java b/paimon-core/src/main/java/org/apache/paimon/append/cluster/HibertSorter.java new file mode 100644 index 000000000000..fcd59ce0d3a1 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/append/cluster/HibertSorter.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.append.cluster; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.JoinedRow; +import org.apache.paimon.disk.IOManager; +import org.apache.paimon.reader.RecordReaderIterator; +import org.apache.paimon.sort.hilbert.HilbertIndexer; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +/** Hilbert sorter for clustering. */ +public class HibertSorter extends Sorter { + + private static final RowType KEY_TYPE = + new RowType(Collections.singletonList(new DataField(0, "H_INDEX", DataTypes.BYTES()))); + + private final HibertKeyAbstract hilbertKeyAbstract; + + public HibertSorter( + RecordReaderIterator reader, + RowType valueType, + CoreOptions options, + List orderColNames, + IOManager ioManager) { + super(reader, KEY_TYPE, valueType, options, ioManager); + this.hilbertKeyAbstract = new HibertKeyAbstract(valueType, orderColNames); + this.hilbertKeyAbstract.open(); + } + + @Override + public InternalRow assignSortKey(InternalRow row) { + byte[] key = hilbertKeyAbstract.apply(row); + return new JoinedRow(GenericRow.of(key), row); + } + + private static class HibertKeyAbstract implements KeyAbstract { + + private final HilbertIndexer hilbertIndexer; + + public HibertKeyAbstract(RowType rowType, List orderColNames) { + hilbertIndexer = new HilbertIndexer(rowType, orderColNames); + } + + @Override + public void open() { + hilbertIndexer.open(); + } + + @Override + public byte[] apply(InternalRow value) { + byte[] hilbert = hilbertIndexer.index(value); + return Arrays.copyOf(hilbert, hilbert.length); + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/append/cluster/OrderSorter.java b/paimon-core/src/main/java/org/apache/paimon/append/cluster/OrderSorter.java new file mode 100644 index 000000000000..ee2ace3f3afe --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/append/cluster/OrderSorter.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.append.cluster; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.codegen.CodeGenUtils; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.JoinedRow; +import org.apache.paimon.disk.IOManager; +import org.apache.paimon.reader.RecordReaderIterator; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.Projection; + +import java.util.List; + +import static org.apache.paimon.table.PrimaryKeyTableUtils.addKeyNamePrefix; + +/** Order sorter for clustering. */ +public class OrderSorter extends Sorter { + + private final OrderKeyAbstract orderKeyAbstract; + + public OrderSorter( + RecordReaderIterator reader, + RowType valueType, + CoreOptions options, + List orderColNames, + IOManager ioManager) { + super(reader, keyType(valueType, orderColNames), valueType, options, ioManager); + this.orderKeyAbstract = new OrderKeyAbstract(valueType, orderColNames); + this.orderKeyAbstract.open(); + } + + @Override + public InternalRow assignSortKey(InternalRow row) { + InternalRow key = orderKeyAbstract.apply(row); + return new JoinedRow(key, row); + } + + private static RowType keyType(RowType valueType, List orderColNames) { + List fieldNames = valueType.getFieldNames(); + int[] keyProjectionMap = orderColNames.stream().mapToInt(fieldNames::indexOf).toArray(); + return addKeyNamePrefix(Projection.of(keyProjectionMap).project(valueType)); + } + + private static class OrderKeyAbstract implements KeyAbstract { + + private final RowType valueRowType; + private final int[] keyProjectionMap; + + private transient org.apache.paimon.codegen.Projection keyProjection; + + public OrderKeyAbstract(RowType rowType, List orderColNames) { + this.valueRowType = rowType; + List fieldNames = rowType.getFieldNames(); + this.keyProjectionMap = orderColNames.stream().mapToInt(fieldNames::indexOf).toArray(); + } + + @Override + public void open() { + // use key gen to speed up projection + keyProjection = CodeGenUtils.newProjection(valueRowType, keyProjectionMap); + } + + @Override + public InternalRow apply(InternalRow value) { + return keyProjection.apply(value).copy(); + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/append/cluster/Sorter.java b/paimon-core/src/main/java/org/apache/paimon/append/cluster/Sorter.java new file mode 100644 index 000000000000..60dbecdde0d9 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/append/cluster/Sorter.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.append.cluster; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.compression.CompressOptions; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.disk.IOManager; +import org.apache.paimon.options.MemorySize; +import org.apache.paimon.reader.RecordReaderIterator; +import org.apache.paimon.sort.BinaryExternalSortBuffer; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.KeyProjectedRow; +import org.apache.paimon.utils.MutableObjectIterator; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.IntStream; + +/** Sorter for clustering. */ +public abstract class Sorter { + + protected final RecordReaderIterator reader; + protected final RowType keyType; + protected final RowType longRowType; + protected final int[] valueProjectionMap; + private final int arity; + + private final transient IOManager ioManager; + private final transient BinaryExternalSortBuffer buffer; + + public Sorter( + RecordReaderIterator reader, + RowType keyType, + RowType valueType, + CoreOptions options, + IOManager ioManager) { + this.reader = reader; + this.keyType = keyType; + int keyFieldCount = keyType.getFieldCount(); + int valueFieldCount = valueType.getFieldCount(); + this.valueProjectionMap = new int[valueFieldCount]; + for (int i = 0; i < valueFieldCount; i++) { + this.valueProjectionMap[i] = i + keyFieldCount; + } + List keyFields = keyType.getFields(); + List dataFields = valueType.getFields(); + List fields = new ArrayList<>(); + fields.addAll(keyFields); + fields.addAll(dataFields); + this.longRowType = new RowType(fields); + this.arity = longRowType.getFieldCount(); + + long maxMemory = options.writeBufferSize(); + int pageSize = options.pageSize(); + int spillSortMaxNumFiles = options.localSortMaxNumFileHandles(); + CompressOptions spillCompression = options.spillCompressOptions(); + MemorySize maxDiskSize = options.writeBufferSpillDiskSize(); + boolean sequenceOrder = options.sequenceFieldSortOrderIsAscending(); + + this.ioManager = ioManager; + this.buffer = + BinaryExternalSortBuffer.create( + ioManager, + longRowType, + IntStream.range(0, keyType.getFieldCount()).toArray(), + maxMemory, + pageSize, + spillSortMaxNumFiles, + spillCompression, + maxDiskSize, + sequenceOrder); + } + + public abstract InternalRow assignSortKey(InternalRow row); + + public InternalRow removeSortKey(InternalRow rowWithKey) { + KeyProjectedRow keyProjectedRow = new KeyProjectedRow(valueProjectionMap); + return keyProjectedRow.replaceRow(rowWithKey); + } + + public MutableObjectIterator sort() throws IOException { + while (reader.hasNext()) { + InternalRow row = reader.next(); + InternalRow rowWithKey = assignSortKey(row); + buffer.write(rowWithKey); + } + + if (buffer.size() > 0) { + return buffer.sortedIterator(); + } else { + throw new IllegalStateException("numRecords after sorting is 0."); + } + } + + public int arity() { + return arity; + } + + public void close() throws Exception { + if (buffer != null) { + buffer.clear(); + } + if (ioManager != null) { + ioManager.close(); + } + } + + public static Sorter getSorter( + RecordReaderIterator reader, + IOManager ioManager, + RowType rowType, + CoreOptions options) { + CoreOptions.OrderType clusterCurve = + options.clusteringStrategy(options.clusteringColumns().size()); + switch (clusterCurve) { + case HILBERT: + 
return new HibertSorter( + reader, rowType, options, options.clusteringColumns(), ioManager); + case ZORDER: + return new ZorderSorter( + reader, rowType, options, options.clusteringColumns(), ioManager); + case ORDER: + return new OrderSorter( + reader, rowType, options, options.clusteringColumns(), ioManager); + default: + throw new IllegalArgumentException("cannot match cluster type: " + clusterCurve); + } + } + + /** Abstract key from a row data. */ + public interface KeyAbstract extends Serializable { + default void open() {} + + KEY apply(InternalRow value); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/append/cluster/ZorderSorter.java b/paimon-core/src/main/java/org/apache/paimon/append/cluster/ZorderSorter.java new file mode 100644 index 000000000000..57e95d06c6b3 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/append/cluster/ZorderSorter.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.append.cluster; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.JoinedRow; +import org.apache.paimon.disk.IOManager; +import org.apache.paimon.reader.RecordReaderIterator; +import org.apache.paimon.sort.zorder.ZIndexer; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +/** Z-order sorter for clustering. 
*/ +public class ZorderSorter extends Sorter { + + private static final RowType KEY_TYPE = + new RowType(Collections.singletonList(new DataField(0, "Z_INDEX", DataTypes.BYTES()))); + + private final ZorderKeyAbstract zorderKeyAbstract; + + public ZorderSorter( + RecordReaderIterator reader, + RowType valueType, + CoreOptions options, + List orderColNames, + IOManager ioManager) { + super(reader, KEY_TYPE, valueType, options, ioManager); + this.zorderKeyAbstract = new ZorderKeyAbstract(valueType, options, orderColNames); + this.zorderKeyAbstract.open(); + } + + @Override + public InternalRow assignSortKey(InternalRow row) { + byte[] key = zorderKeyAbstract.apply(row); + return new JoinedRow(GenericRow.of(key), row); + } + + private static class ZorderKeyAbstract implements KeyAbstract { + + private final ZIndexer zIndexer; + + public ZorderKeyAbstract(RowType rowType, CoreOptions options, List orderColNames) { + zIndexer = new ZIndexer(rowType, orderColNames, options.varTypeSize()); + } + + @Override + public void open() { + zIndexer.open(); + } + + @Override + public byte[] apply(InternalRow value) { + byte[] zorder = zIndexer.index(value); + return Arrays.copyOf(zorder, zorder.length); + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java index 6dee9473081e..1c63f41b0284 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java @@ -21,6 +21,7 @@ import org.apache.paimon.AppendOnlyFileStore; import org.apache.paimon.CoreOptions; import org.apache.paimon.append.AppendOnlyWriter; +import org.apache.paimon.append.cluster.Sorter; import org.apache.paimon.compact.CompactManager; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; @@ -41,6 +42,7 @@ import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.IOExceptionSupplier; import org.apache.paimon.utils.LongCounter; +import org.apache.paimon.utils.MutableObjectIterator; import org.apache.paimon.utils.RecordWriter; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.StatsCollectorFactories; @@ -205,6 +207,42 @@ public List compactRewrite( return rewriter.result(); } + public List clusterRewrite( + BinaryRow partition, int bucket, List toCluster) throws Exception { + RecordReaderIterator reader = + createFilesIterator(partition, bucket, toCluster, null); + + // sort and rewrite + Exception collectedExceptions = null; + Sorter sorter = Sorter.getSorter(reader, ioManager, rowType, options); + RowDataRollingFileWriter rewriter = + createRollingFileWriter( + partition, bucket, new LongCounter(toCluster.get(0).minSequenceNumber())); + try { + MutableObjectIterator sorted = sorter.sort(); + BinaryRow binaryRow = new BinaryRow(sorter.arity()); + while ((binaryRow = sorted.next(binaryRow)) != null) { + InternalRow rowRemovedKey = sorter.removeSortKey(binaryRow); + rewriter.write(rowRemovedKey); + } + } catch (Exception e) { + collectedExceptions = e; + } finally { + try { + rewriter.close(); + sorter.close(); + } catch (Exception e) { + collectedExceptions = ExceptionUtils.firstOrSuppressed(e, collectedExceptions); + } + } + + if (collectedExceptions != null) { + throw collectedExceptions; + } + + return rewriter.result(); + } + private RowDataRollingFileWriter createRollingFileWriter( BinaryRow 
partition, int bucket, LongCounter seqNumCounter) { return new RowDataRollingFileWriter( diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java index a3f2083e25a2..b93c2b511509 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java @@ -20,6 +20,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.append.BucketedAppendCompactManager; +import org.apache.paimon.append.cluster.BucketedAppendClusterManager; import org.apache.paimon.compact.CompactManager; import org.apache.paimon.compact.NoopCompactManager; import org.apache.paimon.data.BinaryRow; @@ -28,6 +29,7 @@ import org.apache.paimon.deletionvectors.DeletionVector; import org.apache.paimon.fs.FileIO; import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.SnapshotManager; @@ -42,6 +44,7 @@ public class BucketedAppendFileStoreWrite extends BaseAppendFileStoreWrite { private final String commitUser; + private final SchemaManager schemaManager; public BucketedAppendFileStoreWrite( FileIO fileIO, @@ -55,7 +58,8 @@ public BucketedAppendFileStoreWrite( FileStoreScan scan, CoreOptions options, @Nullable BucketedDvMaintainer.Factory dvMaintainerFactory, - String tableName) { + String tableName, + SchemaManager schemaManager) { super( fileIO, read, @@ -72,6 +76,7 @@ public BucketedAppendFileStoreWrite( super.withIgnorePreviousFiles(options.writeOnly()); } this.commitUser = commitUser; + this.schemaManager = schemaManager; } @Override @@ -94,6 +99,17 @@ protected CompactManager getCompactManager( @Nullable BucketedDvMaintainer dvMaintainer) { if (options.writeOnly()) { return new NoopCompactManager(); + } else if (options.bucketClusterEnabled()) { + return new BucketedAppendClusterManager( + compactExecutor, + restoredFiles, + schemaManager, + options.clusteringColumns(), + options.maxSizeAmplificationPercent(), + options.sortedRunSizeRatio(), + options.numSortedRunCompactionTrigger(), + options.numLevels(), + files -> clusterRewrite(partition, bucket, files)); } else { Function dvFactory = dvMaintainer != null diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java index abbc2e4912c3..1378ecc58651 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java @@ -673,14 +673,19 @@ private static void validateRowTracking(TableSchema schema, CoreOptions options) private static void validateIncrementalClustering(TableSchema schema, CoreOptions options) { if (options.clusteringIncrementalEnabled()) { - checkArgument( - options.bucket() == -1, - "Cannot define %s for incremental clustering table, it only support bucket = -1", - CoreOptions.BUCKET.key()); checkArgument( schema.primaryKeys().isEmpty(), "Cannot define %s for incremental clustering table.", PRIMARY_KEY.key()); + if (options.bucket() != -1) { + checkArgument( + !options.bucketAppendOrdered(), + "%s must be false for incremental clustering table.", + CoreOptions.BUCKET_APPEND_ORDERED.key()); + checkArgument( + !options.deletionVectorsEnabled(), + 
"Cannot enable deletion-vectors for incremental clustering table which bucket is not -1."); + } } } diff --git a/paimon-core/src/test/java/org/apache/paimon/append/cluster/BucketedAppendClusterManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/append/cluster/BucketedAppendClusterManagerTest.java new file mode 100644 index 000000000000..0d2d60e69992 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/append/cluster/BucketedAppendClusterManagerTest.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.append.cluster; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.FileSystemCatalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.compact.CompactResult; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.disk.IOManager; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.operation.BaseAppendFileStoreWrite; +import org.apache.paimon.reader.RecordReaderIterator; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.table.AppendOnlyFileStoreTable; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.sink.StreamTableCommit; +import org.apache.paimon.types.DataTypes; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Executors; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link BucketedAppendClusterManager}. 
*/ +public class BucketedAppendClusterManagerTest { + + @TempDir java.nio.file.Path tempDir; + @TempDir java.nio.file.Path ioManagerTempDir; + + FileStoreTable table; + BaseAppendFileStoreWrite write; + StreamTableCommit commit; + + @BeforeEach + public void before() throws Exception { + table = createFileStoreTable(); + write = + (BaseAppendFileStoreWrite) + table.store() + .newWrite("ss") + .withIOManager(IOManager.create(ioManagerTempDir.toString())); + commit = table.newStreamWriteBuilder().newCommit(); + + for (int i = 0; i < 3; i++) { + for (int j = 0; j < 3; j++) { + write.write(BinaryRow.EMPTY_ROW, 0, GenericRow.of(0, i, j)); + commit.commit(i, write.prepareCommit(false, i)); + } + } + } + + @Test + public void testBucketedAppendClusterTask() throws Exception { + List toCluster = + table.newSnapshotReader().read().dataSplits().get(0).dataFiles(); + + BucketedAppendClusterManager.BucketedAppendClusterTask task = + new BucketedAppendClusterManager.BucketedAppendClusterTask( + toCluster, 5, files -> write.clusterRewrite(BinaryRow.EMPTY_ROW, 0, files)); + + CompactResult result = task.doCompact(); + assertThat(result.before().size()).isEqualTo(9); + assertThat(result.after().size()).isEqualTo(1); + List rows = new ArrayList<>(); + try (RecordReaderIterator clusterRows = + new RecordReaderIterator<>( + ((AppendOnlyFileStoreTable) table) + .store() + .newRead() + .createReader(BinaryRow.EMPTY_ROW, 0, result.after(), null))) { + while (clusterRows.hasNext()) { + InternalRow row = clusterRows.next(); + rows.add(String.format("%d,%d", row.getInt(1), row.getInt(2))); + } + } + + assertThat(rows) + .containsExactly("0,0", "0,1", "1,0", "1,1", "0,2", "1,2", "2,0", "2,1", "2,2"); + } + + @Test + public void testTriggerCompaction() throws Exception { + List toCluster = + table.newSnapshotReader().read().dataSplits().get(0).dataFiles(); + CoreOptions options = table.coreOptions(); + BucketedAppendClusterManager manager = + new BucketedAppendClusterManager( + Executors.newSingleThreadExecutor(), + toCluster, + table.schemaManager(), + options.clusteringColumns(), + options.maxSizeAmplificationPercent(), + options.sortedRunSizeRatio(), + options.numSortedRunCompactionTrigger(), + options.numLevels(), + files -> write.clusterRewrite(BinaryRow.EMPTY_ROW, 0, files)); + assertThat(manager.levels().levelSortedRuns().size()).isEqualTo(9); + + manager.triggerCompaction(false); + + CompactResult compactResult = manager.getCompactionResult(true).get(); + assertThat(compactResult.before().size()).isEqualTo(9); + assertThat(compactResult.after().size()).isEqualTo(1); + + assertThat(manager.levels().levelSortedRuns().size()).isEqualTo(1); + assertThat(manager.levels().levelSortedRuns().get(0).level()).isEqualTo(5); + } + + private FileStoreTable createFileStoreTable() throws Exception { + Catalog catalog = new FileSystemCatalog(LocalFileIO.create(), new Path(tempDir.toString())); + Schema schema = + Schema.newBuilder() + .column("f0", DataTypes.INT()) + .column("f1", DataTypes.INT()) + .column("f2", DataTypes.INT()) + .option("bucket", "1") + .option("bucket-key", "f0") + .option("compaction.min.file-num", "10") + .option("clustering.columns", "f1,f2") + .option("clustering.strategy", "zorder") + .build(); + Identifier identifier = Identifier.create("default", "test"); + catalog.createDatabase("default", false); + catalog.createTable(identifier, schema, false); + return (FileStoreTable) catalog.getTable(identifier); + } +} diff --git 
a/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java index 516fa63b55dc..cf81cdf85fd5 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java @@ -57,18 +57,6 @@ public class IncrementalClusterManagerTest { @TempDir java.nio.file.Path tempDir; - @Test - public void testNonUnAwareBucketTable() { - Map options = new HashMap<>(); - options.put(CoreOptions.BUCKET.key(), "1"); - options.put(CoreOptions.BUCKET_KEY.key(), "f0"); - - assertThatThrownBy(() -> createTable(options, Collections.emptyList())) - .isInstanceOf(IllegalArgumentException.class) - .hasMessageContaining( - "Cannot define bucket for incremental clustering table, it only support bucket = -1"); - } - @Test public void testNonClusterIncremental() throws Exception { Map options = new HashMap<>(); diff --git a/paimon-core/src/test/java/org/apache/paimon/append/cluster/SorterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/cluster/SorterTest.java new file mode 100644 index 000000000000..31fbeb5c28ee --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/append/cluster/SorterTest.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.append.cluster; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.disk.IOManager; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.reader.RecordReaderIterator; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.FileStoreTableFactory; +import org.apache.paimon.table.source.ReadBuilder; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.MutableObjectIterator; + +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.paimon.append.cluster.IncrementalClusterManagerTest.writeOnce; +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link Sorter}. 
*/ +public class SorterTest { + @TempDir java.nio.file.Path tableTempDir; + + @TempDir java.nio.file.Path ioTempDir; + + @ParameterizedTest + @ValueSource(strings = {"hilbert", "zorder", "order"}) + public void testSorter(String curve) throws Exception { + innerTest(curve); + } + + private void innerTest(String curve) throws Exception { + FileStoreTable table = createTable(new HashMap<>(), curve); + writeOnce( + table, + GenericRow.of(2, 0, BinaryString.fromString("test")), + GenericRow.of(2, 1, BinaryString.fromString("test")), + GenericRow.of(2, 2, BinaryString.fromString("test")), + GenericRow.of(0, 0, BinaryString.fromString("test")), + GenericRow.of(0, 1, BinaryString.fromString("test")), + GenericRow.of(0, 2, BinaryString.fromString("test")), + GenericRow.of(1, 0, BinaryString.fromString("test")), + GenericRow.of(1, 1, BinaryString.fromString("test")), + GenericRow.of(1, 2, BinaryString.fromString("test"))); + IOManager ioManager = IOManager.create(ioTempDir.toString()); + ReadBuilder readBuilder = table.newReadBuilder(); + Sorter sorter = + Sorter.getSorter( + new RecordReaderIterator<>( + readBuilder.newRead().createReader(readBuilder.newScan().plan())), + ioManager, + table.rowType(), + table.coreOptions()); + List result = new ArrayList<>(); + MutableObjectIterator sorted = sorter.sort(); + BinaryRow binaryRow = new BinaryRow(sorter.arity()); + while ((binaryRow = sorted.next(binaryRow)) != null) { + InternalRow rowRemovedKey = sorter.removeSortKey(binaryRow); + result.add(String.format("%s,%s", rowRemovedKey.getInt(0), rowRemovedKey.getInt(1))); + } + verify(curve, result); + } + + private void verify(String curve, List result) { + switch (curve) { + case "hilbert": + assertThat(result) + .containsExactly( + "0,0", "0,1", "1,1", "1,0", "2,0", "2,1", "2,2", "1,2", "0,2"); + break; + case "zorder": + assertThat(result) + .containsExactly( + "0,0", "0,1", "1,0", "1,1", "0,2", "1,2", "2,0", "2,1", "2,2"); + break; + case "order": + assertThat(result) + .containsExactly( + "0,0", "0,1", "0,2", "1,0", "1,1", "1,2", "2,0", "2,1", "2,2"); + break; + } + } + + protected FileStoreTable createTable(Map customOptions, String clusterCurve) + throws Exception { + Map options = new HashMap<>(); + options.put(CoreOptions.CLUSTERING_COLUMNS.key(), "f0,f1"); + options.put(CoreOptions.CLUSTERING_STRATEGY.key(), clusterCurve); + options.putAll(customOptions); + + Schema schema = + new Schema( + RowType.of(DataTypes.INT(), DataTypes.INT(), DataTypes.STRING()) + .getFields(), + Collections.emptyList(), + Collections.emptyList(), + options, + ""); + + SchemaManager schemaManager = + new SchemaManager(LocalFileIO.create(), new Path(tableTempDir.toString())); + return FileStoreTableFactory.create( + LocalFileIO.create(), + new Path(tableTempDir.toString()), + schemaManager.createTable(schema)); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java index b807af5c86c2..df7fd0227e88 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java @@ -164,7 +164,11 @@ protected void buildForBucketedTableCompact( StreamExecutionEnvironment env, FileStoreTable table, boolean isStreaming) throws Exception { if (fullCompaction == null) { - fullCompaction = !isStreaming; + if 
(table.coreOptions().bucketClusterEnabled()) { + fullCompaction = false; + } else { + fullCompaction = !isStreaming; + } } else { checkArgument( !(fullCompaction && isStreaming), diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java index 6c80070528ac..1538f8c083af 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java @@ -683,6 +683,171 @@ public void testClusterWithDeletionVector() throws Exception { assertThat(splits.get(0).deletionFiles().get().get(0)).isNull(); } + @Test + public void testClusterWithBucket() throws Exception { + Map dynamicOptions = commonOptions(); + dynamicOptions.put(CoreOptions.BUCKET.key(), "2"); + dynamicOptions.put(CoreOptions.BUCKET_KEY.key(), "pt"); + dynamicOptions.put(CoreOptions.BUCKET_APPEND_ORDERED.key(), "false"); + FileStoreTable table = createTable(null, dynamicOptions); + + BinaryString randomStr = BinaryString.fromString(randomString(150)); + List messages = new ArrayList<>(); + + // first write + for (int pt = 0; pt < 2; pt++) { + for (int i = 0; i < 3; i++) { + for (int j = 0; j < 3; j++) { + messages.addAll(write(GenericRow.of(i, j, randomStr, pt))); + } + } + } + commit(messages); + ReadBuilder readBuilder = table.newReadBuilder().withProjection(new int[] {0, 1, 3}); + List result1 = + getResult( + readBuilder.newRead(), + readBuilder.newScan().plan().splits(), + readBuilder.readType()); + List expected1 = new ArrayList<>(); + for (int pt = 0; pt <= 1; pt++) { + expected1.add(String.format("+I[0, 0, %s]", pt)); + expected1.add(String.format("+I[0, 1, %s]", pt)); + expected1.add(String.format("+I[0, 2, %s]", pt)); + expected1.add(String.format("+I[1, 0, %s]", pt)); + expected1.add(String.format("+I[1, 1, %s]", pt)); + expected1.add(String.format("+I[1, 2, %s]", pt)); + expected1.add(String.format("+I[2, 0, %s]", pt)); + expected1.add(String.format("+I[2, 1, %s]", pt)); + expected1.add(String.format("+I[2, 2, %s]", pt)); + } + assertThat(result1).containsExactlyElementsOf(expected1); + + // first cluster + runAction(Collections.emptyList()); + checkSnapshot(table); + List splits = readBuilder.newScan().plan().splits(); + assertThat(splits.size()).isEqualTo(2); + assertThat(((DataSplit) splits.get(0)).dataFiles().size()).isEqualTo(1); + assertThat(((DataSplit) splits.get(0)).dataFiles().get(0).level()).isEqualTo(5); + List result2 = getResult(readBuilder.newRead(), splits, readBuilder.readType()); + List expected2 = new ArrayList<>(); + for (int pt = 1; pt >= 0; pt--) { + expected2.add(String.format("+I[0, 0, %s]", pt)); + expected2.add(String.format("+I[0, 1, %s]", pt)); + expected2.add(String.format("+I[1, 0, %s]", pt)); + expected2.add(String.format("+I[1, 1, %s]", pt)); + expected2.add(String.format("+I[0, 2, %s]", pt)); + expected2.add(String.format("+I[1, 2, %s]", pt)); + expected2.add(String.format("+I[2, 0, %s]", pt)); + expected2.add(String.format("+I[2, 1, %s]", pt)); + expected2.add(String.format("+I[2, 2, %s]", pt)); + } + assertThat(result2).containsExactlyElementsOf(expected2); + + // second write + messages.clear(); + for (int pt = 0; pt <= 1; pt++) { + messages.addAll( + write( + GenericRow.of(0, 3, null, pt), + GenericRow.of(1, 3, null, pt), + 
GenericRow.of(2, 3, null, pt))); + messages.addAll( + write( + GenericRow.of(3, 0, null, pt), + GenericRow.of(3, 1, null, pt), + GenericRow.of(3, 2, null, pt), + GenericRow.of(3, 3, null, pt))); + } + commit(messages); + + List result3 = + getResult( + readBuilder.newRead(), + readBuilder.newScan().plan().splits(), + readBuilder.readType()); + List expected3 = new ArrayList<>(); + for (int pt = 1; pt >= 0; pt--) { + expected3.add(String.format("+I[0, 0, %s]", pt)); + expected3.add(String.format("+I[0, 1, %s]", pt)); + expected3.add(String.format("+I[1, 0, %s]", pt)); + expected3.add(String.format("+I[1, 1, %s]", pt)); + expected3.add(String.format("+I[0, 2, %s]", pt)); + expected3.add(String.format("+I[1, 2, %s]", pt)); + expected3.add(String.format("+I[2, 0, %s]", pt)); + expected3.add(String.format("+I[2, 1, %s]", pt)); + expected3.add(String.format("+I[2, 2, %s]", pt)); + expected3.add(String.format("+I[0, 3, %s]", pt)); + expected3.add(String.format("+I[1, 3, %s]", pt)); + expected3.add(String.format("+I[2, 3, %s]", pt)); + expected3.add(String.format("+I[3, 0, %s]", pt)); + expected3.add(String.format("+I[3, 1, %s]", pt)); + expected3.add(String.format("+I[3, 2, %s]", pt)); + expected3.add(String.format("+I[3, 3, %s]", pt)); + } + assertThat(result3).containsExactlyElementsOf(expected3); + + // second cluster(incremental) + runAction(Collections.emptyList()); + checkSnapshot(table); + splits = readBuilder.newScan().plan().splits(); + List result4 = getResult(readBuilder.newRead(), splits, readBuilder.readType()); + List expected4 = new ArrayList<>(); + for (int pt = 1; pt >= 0; pt--) { + expected4.add(String.format("+I[0, 0, %s]", pt)); + expected4.add(String.format("+I[0, 1, %s]", pt)); + expected4.add(String.format("+I[1, 0, %s]", pt)); + expected4.add(String.format("+I[1, 1, %s]", pt)); + expected4.add(String.format("+I[0, 2, %s]", pt)); + expected4.add(String.format("+I[1, 2, %s]", pt)); + expected4.add(String.format("+I[2, 0, %s]", pt)); + expected4.add(String.format("+I[2, 1, %s]", pt)); + expected4.add(String.format("+I[2, 2, %s]", pt)); + expected4.add(String.format("+I[0, 3, %s]", pt)); + expected4.add(String.format("+I[1, 3, %s]", pt)); + expected4.add(String.format("+I[3, 0, %s]", pt)); + expected4.add(String.format("+I[3, 1, %s]", pt)); + expected4.add(String.format("+I[2, 3, %s]", pt)); + expected4.add(String.format("+I[3, 2, %s]", pt)); + expected4.add(String.format("+I[3, 3, %s]", pt)); + } + assertThat(splits.size()).isEqualTo(2); + assertThat(((DataSplit) splits.get(0)).dataFiles().size()).isEqualTo(2); + assertThat(((DataSplit) splits.get(0)).dataFiles().get(0).level()).isEqualTo(5); + assertThat(((DataSplit) splits.get(0)).dataFiles().get(1).level()).isEqualTo(4); + assertThat(result4).containsExactlyElementsOf(expected4); + + // full cluster + runAction(Lists.newArrayList("--compact_strategy", "full")); + checkSnapshot(table); + splits = readBuilder.newScan().plan().splits(); + List result5 = getResult(readBuilder.newRead(), splits, readBuilder.readType()); + List expected5 = new ArrayList<>(); + for (int pt = 1; pt >= 0; pt--) { + expected5.add(String.format("+I[0, 0, %s]", pt)); + expected5.add(String.format("+I[0, 1, %s]", pt)); + expected5.add(String.format("+I[1, 0, %s]", pt)); + expected5.add(String.format("+I[1, 1, %s]", pt)); + expected5.add(String.format("+I[0, 2, %s]", pt)); + expected5.add(String.format("+I[0, 3, %s]", pt)); + expected5.add(String.format("+I[1, 2, %s]", pt)); + expected5.add(String.format("+I[1, 3, %s]", pt)); + 
expected5.add(String.format("+I[2, 0, %s]", pt)); + expected5.add(String.format("+I[2, 1, %s]", pt)); + expected5.add(String.format("+I[3, 0, %s]", pt)); + expected5.add(String.format("+I[3, 1, %s]", pt)); + expected5.add(String.format("+I[2, 2, %s]", pt)); + expected5.add(String.format("+I[2, 3, %s]", pt)); + expected5.add(String.format("+I[3, 2, %s]", pt)); + expected5.add(String.format("+I[3, 3, %s]", pt)); + } + assertThat(splits.size()).isEqualTo(2); + assertThat(((DataSplit) splits.get(0)).dataFiles().size()).isEqualTo(1); + assertThat(((DataSplit) splits.get(0)).dataFiles().get(0).level()).isEqualTo(5); + assertThat(result5).containsExactlyElementsOf(expected5); + } + protected FileStoreTable createTable(String partitionKeys) throws Exception { return createTable(partitionKeys, commonOptions()); }
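For reference, a minimal sketch (not part of this change) of the option combination under which the new CoreOptions#bucketClusterEnabled() gate returns true for a fixed-bucket append table. It reuses the Schema builder pattern from BucketedAppendClusterManagerTest in this diff; the BUCKET_APPEND_ORDERED, DELETION_VECTORS_ENABLED and CLUSTERING_INCREMENTAL constants are assumed to be the existing CoreOptions keys for those table options.

import org.apache.paimon.CoreOptions;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataTypes;

// Hypothetical illustration only: option set a fixed-bucket append table would need for
// CoreOptions#bucketClusterEnabled() to return true under this change.
class BucketClusterSchemaExample {

    static Schema bucketClusteredSchema() {
        return Schema.newBuilder()
                .column("f0", DataTypes.INT())
                .column("f1", DataTypes.INT())
                .column("f2", DataTypes.INT())
                // a fixed bucket count is now allowed for incremental clustering
                .option(CoreOptions.BUCKET.key(), "2")
                .option(CoreOptions.BUCKET_KEY.key(), "f0")
                // both must be disabled, per validateIncrementalClustering and bucketClusterEnabled()
                .option(CoreOptions.BUCKET_APPEND_ORDERED.key(), "false")
                .option(CoreOptions.DELETION_VECTORS_ENABLED.key(), "false")
                // enable incremental clustering and define the cluster columns and curve
                .option(CoreOptions.CLUSTERING_INCREMENTAL.key(), "true")
                .option(CoreOptions.CLUSTERING_COLUMNS.key(), "f1,f2")
                .option(CoreOptions.CLUSTERING_STRATEGY.key(), "zorder")
                .build();
    }
}

With such options, BucketedAppendFileStoreWrite#getCompactManager takes the new BucketedAppendClusterManager branch instead of BucketedAppendCompactManager, and SchemaValidation#validateIncrementalClustering accepts the non -1 bucket because bucket-append-ordered and deletion vectors are both disabled.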