diff --git a/.gitignore b/.gitignore
index 5a59990d7..3e0130df7 100644
--- a/.gitignore
+++ b/.gitignore
@@ -26,6 +26,7 @@ hs_err_pid*
 # Ignore java-version and idea files.
 .java-version
 .idea
+.vscode
 
 # Ignore Gradle project-specific cache directory
 .gradle
diff --git a/pom.xml b/pom.xml
index ff80b9544..e26fa0356 100644
--- a/pom.xml
+++ b/pom.xml
@@ -88,6 +88,7 @@
     3.4
     1.4.2
     2.4.0
+    <paimon.version>1.2.0</paimon.version>
     2.18.2
     2.43.0
     0.16.1
@@ -333,6 +334,13 @@
         <version>${delta.hive.version}</version>
       </dependency>
+
+      <dependency>
+        <groupId>org.apache.paimon</groupId>
+        <artifactId>paimon-bundle</artifactId>
+        <version>${paimon.version}</version>
+      </dependency>
+
       <dependency>
         <groupId>org.apache.spark</groupId>
diff --git a/xtable-api/src/main/java/org/apache/xtable/model/storage/TableFormat.java b/xtable-api/src/main/java/org/apache/xtable/model/storage/TableFormat.java
index 9d89de6aa..9ea7943a7 100644
--- a/xtable-api/src/main/java/org/apache/xtable/model/storage/TableFormat.java
+++ b/xtable-api/src/main/java/org/apache/xtable/model/storage/TableFormat.java
@@ -27,9 +27,10 @@ public class TableFormat {
   public static final String HUDI = "HUDI";
   public static final String ICEBERG = "ICEBERG";
   public static final String DELTA = "DELTA";
+  public static final String PAIMON = "PAIMON";
   public static final String PARQUET = "PARQUET";
 
   public static String[] values() {
-    return new String[] {"HUDI", "ICEBERG", "DELTA"};
+    return new String[] {"HUDI", "ICEBERG", "DELTA", "PAIMON"};
   }
 }
diff --git a/xtable-core/pom.xml b/xtable-core/pom.xml
index 6bd5282c7..b27eafd34 100644
--- a/xtable-core/pom.xml
+++ b/xtable-core/pom.xml
@@ -110,6 +110,18 @@
       <scope>test</scope>
     </dependency>
+
+    <dependency>
+      <groupId>org.apache.paimon</groupId>
+      <artifactId>paimon-bundle</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.paimon</groupId>
+      <artifactId>paimon-spark-${spark.version.prefix}</artifactId>
+      <version>${paimon.version}</version>
+      <scope>test</scope>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.hadoop</groupId>
diff --git a/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonConversionSource.java
new file mode 100644
index 000000000..aa0ac299e
--- /dev/null
+++ b/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonConversionSource.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.paimon;
+
+import static org.apache.xtable.model.storage.DataLayoutStrategy.HIVE_STYLE_PARTITION;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.List;
+
+import lombok.extern.log4j.Log4j2;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.SnapshotManager;
+
+import org.apache.xtable.exception.ReadException;
+import org.apache.xtable.model.*;
+import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.storage.DataLayoutStrategy;
+import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.model.storage.PartitionFileGroup;
+import org.apache.xtable.model.storage.TableFormat;
+import org.apache.xtable.spi.extractor.ConversionSource;
+
+@Log4j2
+public class PaimonConversionSource implements ConversionSource<Snapshot> {
+
+  private final FileStoreTable paimonTable;
+  private final SchemaManager schemaManager;
+  private final SnapshotManager snapshotManager;
+
+  private final PaimonDataFileExtractor dataFileExtractor = PaimonDataFileExtractor.getInstance();
+  private final PaimonSchemaExtractor schemaExtractor = PaimonSchemaExtractor.getInstance();
+  private final PaimonPartitionExtractor partitionSpecExtractor =
+      PaimonPartitionExtractor.getInstance();
+
+  public PaimonConversionSource(FileStoreTable paimonTable) {
+    this.paimonTable = paimonTable;
+    this.schemaManager = paimonTable.schemaManager();
+    this.snapshotManager = paimonTable.snapshotManager();
+  }
+
+  @Override
+  public InternalTable getTable(Snapshot snapshot) {
+    TableSchema paimonSchema = schemaManager.schema(snapshot.schemaId());
+    InternalSchema internalSchema = schemaExtractor.toInternalSchema(paimonSchema);
+
+    List<String> partitionKeys = paimonTable.partitionKeys();
+    List<InternalPartitionField> partitioningFields =
+        partitionSpecExtractor.toInternalPartitionFields(partitionKeys, internalSchema);
+
+    DataLayoutStrategy dataLayoutStrategy =
+        partitioningFields.isEmpty() ? DataLayoutStrategy.FLAT : HIVE_STYLE_PARTITION;
+
+    return InternalTable.builder()
+        .name(paimonTable.name())
+        .tableFormat(TableFormat.PAIMON)
+        .readSchema(internalSchema)
+        .layoutStrategy(dataLayoutStrategy)
+        .basePath(paimonTable.location().toString())
+        .partitioningFields(partitioningFields)
+        .latestCommitTime(Instant.ofEpochMilli(snapshot.timeMillis()))
+        .latestMetadataPath(snapshotManager.snapshotPath(snapshot.id()).toString())
+        .build();
+  }
+
+  @Override
+  public InternalTable getCurrentTable() {
+    SnapshotManager snapshotManager = paimonTable.snapshotManager();
+    Snapshot snapshot = snapshotManager.latestSnapshot();
+    if (snapshot == null) {
+      throw new ReadException("No snapshots found for table " + paimonTable.name());
+    }
+    return getTable(snapshot);
+  }
+
+  @Override
+  public InternalSnapshot getCurrentSnapshot() {
+    SnapshotManager snapshotManager = paimonTable.snapshotManager();
+    Snapshot snapshot = snapshotManager.latestSnapshot();
+    if (snapshot == null) {
+      throw new ReadException("No snapshots found for table " + paimonTable.name());
+    }
+
+    InternalTable internalTable = getTable(snapshot);
+    List<InternalDataFile> internalDataFiles =
+        dataFileExtractor.toInternalDataFiles(paimonTable, snapshot);
+
+    return InternalSnapshot.builder()
+        .table(internalTable)
+        .version(Long.toString(snapshot.timeMillis()))
+        .partitionedDataFiles(PartitionFileGroup.fromFiles(internalDataFiles))
+        // TODO : Implement pending commits extraction, required for incremental sync
+        .sourceIdentifier(getCommitIdentifier(snapshot))
+        .build();
+  }
+
+  @Override
+  public TableChange getTableChangeForCommit(Snapshot snapshot) {
+    throw new UnsupportedOperationException("Incremental Sync is not supported yet.");
+  }
+
+  @Override
+  public CommitsBacklog<Snapshot> getCommitsBacklog(
+      InstantsForIncrementalSync instantsForIncrementalSync) {
+    throw new UnsupportedOperationException("Incremental Sync is not supported yet.");
+  }
+
+  @Override
+  public boolean isIncrementalSyncSafeFrom(Instant instant) {
+    return false; // Incremental sync is not supported yet
+  }
+
+  @Override
+  public String getCommitIdentifier(Snapshot snapshot) {
+    return Long.toString(snapshot.commitIdentifier());
+  }
+
+  @Override
+  public void close() throws IOException {}
+}
diff --git a/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonConversionSourceProvider.java b/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonConversionSourceProvider.java
new file mode 100644
index 000000000..82a28efc8
--- /dev/null
+++ b/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonConversionSourceProvider.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.paimon;
+
+import java.io.IOException;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+
+import org.apache.xtable.conversion.ConversionSourceProvider;
+import org.apache.xtable.conversion.SourceTable;
+import org.apache.xtable.exception.ReadException;
+import org.apache.xtable.spi.extractor.ConversionSource;
+
+public class PaimonConversionSourceProvider extends ConversionSourceProvider<Snapshot> {
+  @Override
+  public ConversionSource<Snapshot> getConversionSourceInstance(SourceTable sourceTableConfig) {
+    try {
+      Options catalogOptions = new Options();
+      CatalogContext context = CatalogContext.create(catalogOptions, hadoopConf);
+
+      Path path = new Path(sourceTableConfig.getDataPath());
+      FileIO fileIO = FileIO.get(path, context);
+      FileStoreTable paimonTable = FileStoreTableFactory.create(fileIO, path);
+
+      return new PaimonConversionSource(paimonTable);
+    } catch (IOException e) {
+      throw new ReadException(e.getMessage());
+    }
+  }
+}
diff --git a/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonDataFileExtractor.java b/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonDataFileExtractor.java
new file mode 100644
index 000000000..578d4423e
--- /dev/null
+++ b/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonDataFileExtractor.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.paimon;
+
+import java.util.*;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.snapshot.SnapshotReader;
+
+import org.apache.xtable.model.stat.ColumnStat;
+import org.apache.xtable.model.storage.InternalDataFile;
+
+public class PaimonDataFileExtractor {
+
+  private final PaimonPartitionExtractor partitionExtractor =
+      PaimonPartitionExtractor.getInstance();
+
+  private static final PaimonDataFileExtractor INSTANCE = new PaimonDataFileExtractor();
+
+  public static PaimonDataFileExtractor getInstance() {
+    return INSTANCE;
+  }
+
+  public List<InternalDataFile> toInternalDataFiles(FileStoreTable table, Snapshot snapshot) {
+    List<InternalDataFile> result = new ArrayList<>();
+    Iterator<ManifestEntry> manifestEntryIterator =
+        newSnapshotReader(table, snapshot).readFileIterator();
+    while (manifestEntryIterator.hasNext()) {
+      result.add(toInternalDataFile(table, manifestEntryIterator.next()));
+    }
+    return result;
+  }
+
+  private InternalDataFile toInternalDataFile(FileStoreTable table, ManifestEntry entry) {
+    return InternalDataFile.builder()
+        .physicalPath(toFullPhysicalPath(table, entry))
+        .fileSizeBytes(entry.file().fileSize())
+        .lastModified(entry.file().creationTimeEpochMillis())
+        .recordCount(entry.file().rowCount())
+        .partitionValues(partitionExtractor.toPartitionValues(table, entry.partition()))
+        .columnStats(toColumnStats(entry.file()))
+        .build();
+  }
+
+  private String toFullPhysicalPath(FileStoreTable table, ManifestEntry entry) {
+    String basePath = table.location().toString();
+    String bucketPath = "bucket-" + entry.bucket();
+    String filePath = entry.file().fileName();
+
+    Optional<String> partitionPath = partitionExtractor.toPartitionPath(table, entry.partition());
+    if (partitionPath.isPresent()) {
+      return String.join("/", basePath, partitionPath.get(), bucketPath, filePath);
+    } else {
+      return String.join("/", basePath, bucketPath, filePath);
+    }
+  }
+
+  private List<ColumnStat> toColumnStats(DataFileMeta file) {
+    // TODO: Implement logic to extract column stats from the file meta
+    return Collections.emptyList();
+  }
+
+  private SnapshotReader newSnapshotReader(FileStoreTable table, Snapshot snapshot) {
+    // If the table has primary keys, we read only the top level files
+    // which means we can only consider fully compacted files.
+    if (!table.schema().primaryKeys().isEmpty()) {
+      return table
+          .newSnapshotReader()
+          .withLevel(table.coreOptions().numLevels() - 1)
+          .withSnapshot(snapshot);
+    } else {
+      return table.newSnapshotReader().withSnapshot(snapshot);
+    }
+  }
+}
diff --git a/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonPartitionExtractor.java b/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonPartitionExtractor.java
new file mode 100644
index 000000000..0c497470e
--- /dev/null
+++ b/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonPartitionExtractor.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.paimon;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.InternalRowPartitionComputer;
+
+import org.apache.xtable.model.schema.InternalField;
+import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.PartitionTransformType;
+import org.apache.xtable.model.stat.PartitionValue;
+import org.apache.xtable.model.stat.Range;
+
+/** Extracts partition spec for Paimon as identity transforms on partition keys. */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class PaimonPartitionExtractor {
+
+  private final PaimonSchemaExtractor paimonSchemaExtractor = PaimonSchemaExtractor.getInstance();
+
+  private static final PaimonPartitionExtractor INSTANCE = new PaimonPartitionExtractor();
+
+  public static PaimonPartitionExtractor getInstance() {
+    return INSTANCE;
+  }
+
+  public List<InternalPartitionField> toInternalPartitionFields(
+      List<String> partitionKeys, InternalSchema schema) {
+    if (partitionKeys == null || partitionKeys.isEmpty()) {
+      return Collections.emptyList();
+    }
+    return partitionKeys.stream()
+        .map(key -> toPartitionField(key, schema))
+        .collect(Collectors.toList());
+  }
+
+  public List<PartitionValue> toPartitionValues(FileStoreTable table, BinaryRow partition) {
+    InternalRowPartitionComputer partitionComputer = newPartitionComputer(table);
+    InternalSchema internalSchema = paimonSchemaExtractor.toInternalSchema(table.schema());
+
+    List<PartitionValue> partitionValues = new ArrayList<>();
+    for (Map.Entry<String, String> entry :
+        partitionComputer.generatePartValues(partition).entrySet()) {
+      PartitionValue partitionValue =
+          PartitionValue.builder()
+              .partitionField(toPartitionField(entry.getKey(), internalSchema))
+              .range(Range.scalar(entry.getValue()))
+              .build();
+      partitionValues.add(partitionValue);
+    }
+    return partitionValues;
+  }
+
+  public Optional<String> toPartitionPath(FileStoreTable table, BinaryRow partition) {
+    InternalRowPartitionComputer partitionComputer = newPartitionComputer(table);
+    return partitionComputer.generatePartValues(partition).entrySet().stream()
+        .map(e -> e.getKey() + "=" + e.getValue())
+        .reduce((a, b) -> a + "/" + b);
+  }
+
+  private InternalPartitionField toPartitionField(String key, InternalSchema schema) {
+    InternalField sourceField =
+        findField(schema, key)
+            .orElseThrow(
+                () -> new IllegalArgumentException("Partition key not found in schema: " + key));
+    return InternalPartitionField.builder()
+        .sourceField(sourceField)
+        .transformType(PartitionTransformType.VALUE)
+        .build();
+  }
+
+  private Optional<InternalField> findField(InternalSchema schema, String path) {
+    return schema.getAllFields().stream().filter(f -> f.getPath().equals(path)).findFirst();
+  }
+
+  private InternalRowPartitionComputer newPartitionComputer(FileStoreTable table) {
+    return new InternalRowPartitionComputer(
+        table.coreOptions().partitionDefaultName(),
+        table.store().partitionType(),
+        table.partitionKeys().toArray(new String[0]),
+        table.coreOptions().legacyPartitionName());
+  }
+}
diff --git a/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonSchemaExtractor.java b/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonSchemaExtractor.java
new file mode 100644
index 000000000..8bb669bec
--- /dev/null
+++ b/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonSchemaExtractor.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.paimon;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.BinaryType;
+import org.apache.paimon.types.BooleanType;
+import org.apache.paimon.types.CharType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DateType;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.DoubleType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.LocalZonedTimestampType;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.SmallIntType;
+import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.types.TinyIntType;
+import org.apache.paimon.types.VarBinaryType;
+import org.apache.paimon.types.VarCharType;
+
+import org.apache.xtable.exception.NotSupportedException;
+import org.apache.xtable.model.schema.InternalField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.InternalType;
+import org.apache.xtable.schema.SchemaUtils;
+
+/** Converts Paimon RowType to XTable InternalSchema. */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class PaimonSchemaExtractor {
+  private static final PaimonSchemaExtractor INSTANCE = new PaimonSchemaExtractor();
+
+  public static PaimonSchemaExtractor getInstance() {
+    return INSTANCE;
+  }
+
+  public InternalSchema toInternalSchema(TableSchema paimonSchema) {
+    RowType rowType = paimonSchema.logicalRowType();
+    List<InternalField> fields = toInternalFields(rowType);
+    return InternalSchema.builder()
+        .name("record")
+        .dataType(InternalType.RECORD)
+        .fields(fields)
+        .recordKeyFields(primaryKeyFields(paimonSchema, fields))
+        .build();
+  }
+
+  private List<InternalField> primaryKeyFields(
+      TableSchema paimonSchema, List<InternalField> internalFields) {
+    List<String> keys = paimonSchema.primaryKeys();
+    return internalFields.stream()
+        .filter(f -> keys.contains(f.getName()))
+        .collect(Collectors.toList());
+  }
+
+  private List<InternalField> toInternalFields(RowType rowType) {
+    List<InternalField> fields = new ArrayList<>(rowType.getFieldCount());
+    for (int i = 0; i < rowType.getFieldCount(); i++) {
+      DataField dataField = rowType.getFields().get(i);
+      InternalField internalField =
+          InternalField.builder()
+              .name(dataField.name())
+              .fieldId(dataField.id())
+              .parentPath(null)
+              .schema(
+                  fromPaimonType(dataField.type(), dataField.name(), dataField.type().isNullable()))
+              .defaultValue(
+                  dataField.type().isNullable() ? InternalField.Constants.NULL_DEFAULT_VALUE : null)
+              .build();
+      fields.add(internalField);
+    }
+    return fields;
+  }
+
+  private InternalSchema fromPaimonType(DataType type, String fieldPath, boolean nullable) {
+    InternalType internalType;
+    List<InternalField> fields = null;
+    Map<InternalSchema.MetadataKey, Object> metadata = null;
+    if (type instanceof CharType || type instanceof VarCharType) {
+      internalType = InternalType.STRING;
+    } else if (type instanceof BooleanType) {
+      internalType = InternalType.BOOLEAN;
+    } else if (type instanceof TinyIntType
+        || type instanceof SmallIntType
+        || type instanceof IntType) {
+      internalType = InternalType.INT;
+    } else if (type instanceof BigIntType) {
+      internalType = InternalType.LONG;
+    } else if (type instanceof FloatType) {
+      internalType = InternalType.FLOAT;
+    } else if (type instanceof DoubleType) {
+      internalType = InternalType.DOUBLE;
+    } else if (type instanceof BinaryType || type instanceof VarBinaryType) {
+      internalType = InternalType.BYTES;
+    } else if (type instanceof DateType) {
+      internalType = InternalType.DATE;
+    } else if (type instanceof TimestampType || type instanceof LocalZonedTimestampType) {
+      internalType = InternalType.TIMESTAMP;
+      metadata =
+          Collections.singletonMap(
+              InternalSchema.MetadataKey.TIMESTAMP_PRECISION, InternalSchema.MetadataValue.MICROS);
+    } else if (type instanceof DecimalType) {
+      DecimalType d = (DecimalType) type;
+      metadata = new HashMap<>(2, 1.0f);
+      metadata.put(InternalSchema.MetadataKey.DECIMAL_PRECISION, d.getPrecision());
+      metadata.put(InternalSchema.MetadataKey.DECIMAL_SCALE, d.getScale());
+      internalType = InternalType.DECIMAL;
+    } else if (type instanceof RowType) {
+      RowType rt = (RowType) type;
+      List<InternalField> nested = new ArrayList<>(rt.getFieldCount());
+      for (DataField df : rt.getFields()) {
+        nested.add(
+            InternalField.builder()
+                .name(df.name())
+                .fieldId(df.id())
+                .parentPath(fieldPath)
+                .schema(
+                    fromPaimonType(
+                        df.type(),
+                        SchemaUtils.getFullyQualifiedPath(fieldPath, df.name()),
+                        df.type().isNullable()))
+                .defaultValue(
+                    df.type().isNullable() ? InternalField.Constants.NULL_DEFAULT_VALUE : null)
+                .build());
+      }
+      fields = nested;
+      internalType = InternalType.RECORD;
+    } else if (type instanceof ArrayType) {
+      ArrayType at = (ArrayType) type;
+      InternalSchema elementSchema =
+          fromPaimonType(
+              at.getElementType(),
+              SchemaUtils.getFullyQualifiedPath(
+                  fieldPath, InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME),
+              at.getElementType().isNullable());
+      InternalField elementField =
+          InternalField.builder()
+              .name(InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME)
+              .parentPath(fieldPath)
+              .schema(elementSchema)
+              .build();
+      fields = Collections.singletonList(elementField);
+      internalType = InternalType.LIST;
+    } else if (type instanceof MapType) {
+      MapType mt = (MapType) type;
+      InternalSchema keySchema =
+          fromPaimonType(
+              mt.getKeyType(),
+              SchemaUtils.getFullyQualifiedPath(
+                  fieldPath, InternalField.Constants.MAP_KEY_FIELD_NAME),
+              false);
+      InternalField keyField =
+          InternalField.builder()
+              .name(InternalField.Constants.MAP_KEY_FIELD_NAME)
+              .parentPath(fieldPath)
+              .schema(keySchema)
+              .build();
+      InternalSchema valueSchema =
+          fromPaimonType(
+              mt.getValueType(),
+              SchemaUtils.getFullyQualifiedPath(
+                  fieldPath, InternalField.Constants.MAP_VALUE_FIELD_NAME),
+              mt.getValueType().isNullable());
+      InternalField valueField =
+          InternalField.builder()
+              .name(InternalField.Constants.MAP_VALUE_FIELD_NAME)
+              .parentPath(fieldPath)
+              .schema(valueSchema)
+              .build();
+      fields = Arrays.asList(keyField, valueField);
+      internalType = InternalType.MAP;
+    } else {
+      throw new NotSupportedException("Unsupported Paimon type: " + type.asSQLString());
+    }
+
+    return InternalSchema.builder()
+        .name(type.asSQLString())
+        .dataType(internalType)
+        .isNullable(nullable)
+        .metadata(metadata)
+        .fields(fields)
+        .build();
+  }
+}
diff --git a/xtable-core/src/test/java/org/apache/xtable/GenericTable.java b/xtable-core/src/test/java/org/apache/xtable/GenericTable.java
index 14395e0db..a5670eacd 100644
--- a/xtable-core/src/test/java/org/apache/xtable/GenericTable.java
+++ b/xtable-core/src/test/java/org/apache/xtable/GenericTable.java
@@ -21,6 +21,7 @@
 import static org.apache.xtable.model.storage.TableFormat.DELTA;
 import static org.apache.xtable.model.storage.TableFormat.HUDI;
 import static org.apache.xtable.model.storage.TableFormat.ICEBERG;
+import static org.apache.xtable.model.storage.TableFormat.PAIMON;
 import static org.apache.xtable.model.storage.TableFormat.PARQUET;
 
 import java.nio.file.Path;
@@ -91,6 +92,9 @@ static GenericTable getInstance(
       case ICEBERG:
         return TestIcebergTable.forStandardSchemaAndPartitioning(
             tableName, isPartitioned ? "level" : null, tempDir, jsc.hadoopConfiguration());
+      case PAIMON:
+        return TestPaimonTable.createTable(
+            tableName, isPartitioned ? "level" : null, tempDir, jsc.hadoopConfiguration(), false);
       default:
         throw new IllegalArgumentException("Unsupported source format: " + sourceFormat);
     }
@@ -113,6 +117,9 @@ static GenericTable getInstanceWithAdditionalColumns(
       case ICEBERG:
         return TestIcebergTable.forSchemaWithAdditionalColumnsAndPartitioning(
             tableName, isPartitioned ? "level" : null, tempDir, jsc.hadoopConfiguration());
+      case PAIMON:
+        return TestPaimonTable.createTable(
+            tableName, isPartitioned ? "level" : null, tempDir, jsc.hadoopConfiguration(), true);
"level" : null, tempDir, jsc.hadoopConfiguration(), true); default: throw new IllegalArgumentException("Unsupported source format: " + sourceFormat); } diff --git a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java index bda54c0f6..1ffaa5369 100644 --- a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java +++ b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java @@ -24,6 +24,7 @@ import static org.apache.xtable.model.storage.TableFormat.DELTA; import static org.apache.xtable.model.storage.TableFormat.HUDI; import static org.apache.xtable.model.storage.TableFormat.ICEBERG; +import static org.apache.xtable.model.storage.TableFormat.PAIMON; import static org.apache.xtable.model.storage.TableFormat.PARQUET; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -66,6 +67,7 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.CleanupMode; import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; @@ -104,9 +106,12 @@ import org.apache.xtable.iceberg.TestIcebergDataHelper; import org.apache.xtable.model.storage.TableFormat; import org.apache.xtable.model.sync.SyncMode; +import org.apache.xtable.paimon.PaimonConversionSourceProvider; public class ITConversionController { - @TempDir public static Path tempDir; + @TempDir(cleanup = CleanupMode.NEVER) // TODO remove CleanupMode.NEVER after debugging + public static Path tempDir; + private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").withZone(ZoneId.of("UTC")); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); @@ -123,6 +128,15 @@ public static void setupOnce() { .sparkContext() .hadoopConfiguration() .set("parquet.avro.write-old-list-structure", "false"); + sparkSession + .sparkContext() + .conf() + .set( + "spark.sql.extensions", + "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions") + .set("spark.sql.catalog.paimon", "org.apache.paimon.spark.SparkCatalog") + .set("spark.sql.catalog.paimon.warehouse", tempDir.toUri().toString()); + jsc = JavaSparkContext.fromSparkContext(sparkSession.sparkContext()); } @@ -142,10 +156,13 @@ private static Stream testCasesWithPartitioningAndSyncModes() { private static Stream generateTestParametersForFormatsSyncModesAndPartitioning() { List arguments = new ArrayList<>(); - for (String sourceTableFormat : Arrays.asList(HUDI, DELTA, ICEBERG)) { + for (String sourceFormat : Arrays.asList(HUDI, DELTA, ICEBERG, PAIMON)) { for (SyncMode syncMode : SyncMode.values()) { + if (sourceFormat.equals(PAIMON) && syncMode == SyncMode.INCREMENTAL) + continue; // Paimon does not support incremental sync yet + for (boolean isPartitioned : new boolean[] {true, false}) { - arguments.add(Arguments.of(sourceTableFormat, syncMode, isPartitioned)); + arguments.add(Arguments.of(sourceFormat, syncMode, isPartitioned)); } } } @@ -170,23 +187,37 @@ private static Stream testCasesWithSyncModes() { } private ConversionSourceProvider getConversionSourceProvider(String sourceTableFormat) { - if (sourceTableFormat.equalsIgnoreCase(HUDI)) { - ConversionSourceProvider hudiConversionSourceProvider = - new HudiConversionSourceProvider(); - 
-      hudiConversionSourceProvider.init(jsc.hadoopConfiguration());
-      return hudiConversionSourceProvider;
-    } else if (sourceTableFormat.equalsIgnoreCase(DELTA)) {
-      ConversionSourceProvider<Long> deltaConversionSourceProvider =
-          new DeltaConversionSourceProvider();
-      deltaConversionSourceProvider.init(jsc.hadoopConfiguration());
-      return deltaConversionSourceProvider;
-    } else if (sourceTableFormat.equalsIgnoreCase(ICEBERG)) {
-      ConversionSourceProvider<Snapshot> icebergConversionSourceProvider =
-          new IcebergConversionSourceProvider();
-      icebergConversionSourceProvider.init(jsc.hadoopConfiguration());
-      return icebergConversionSourceProvider;
-    } else {
-      throw new IllegalArgumentException("Unsupported source format: " + sourceTableFormat);
+    switch (sourceTableFormat.toUpperCase()) {
+      case HUDI:
+        {
+          ConversionSourceProvider<HoodieInstant> hudiConversionSourceProvider =
+              new HudiConversionSourceProvider();
+          hudiConversionSourceProvider.init(jsc.hadoopConfiguration());
+          return hudiConversionSourceProvider;
+        }
+      case DELTA:
+        {
+          ConversionSourceProvider<Long> deltaConversionSourceProvider =
+              new DeltaConversionSourceProvider();
+          deltaConversionSourceProvider.init(jsc.hadoopConfiguration());
+          return deltaConversionSourceProvider;
+        }
+      case ICEBERG:
+        {
+          ConversionSourceProvider<Snapshot> icebergConversionSourceProvider =
+              new IcebergConversionSourceProvider();
+          icebergConversionSourceProvider.init(jsc.hadoopConfiguration());
+          return icebergConversionSourceProvider;
+        }
+      case PAIMON:
+        {
+          ConversionSourceProvider<org.apache.paimon.Snapshot> paimonConversionSourceProvider =
+              new PaimonConversionSourceProvider();
+          paimonConversionSourceProvider.init(jsc.hadoopConfiguration());
+          return paimonConversionSourceProvider;
+        }
+      default:
+        throw new IllegalArgumentException("Unsupported source format: " + sourceTableFormat);
     }
   }
@@ -486,11 +517,9 @@ public void testTimeTravelQueries(String sourceTableFormat) throws Exception {
 
   private static List<String> getOtherFormats(String sourceTableFormat) {
     return Arrays.stream(TableFormat.values())
-        .filter(
-            format ->
-                !format.equals(sourceTableFormat)
-                    && !format.equals(
-                        PARQUET)) // excluded file formats because upsert, insert etc. not supported
+        .filter(fmt -> !fmt.equals(sourceTableFormat))
+        .filter(fmt -> !fmt.equals(PAIMON)) // Paimon target is not supported yet
+        .filter(fmt -> !fmt.equals(PARQUET)) // upserts/inserts are not supported in Parquet
         .collect(Collectors.toList());
   }
@@ -911,34 +940,34 @@ private void checkDatasetEquivalence(
             }));
     String[] selectColumnsArr = sourceTable.getColumnsToSelect().toArray(new String[] {});
-    List<String> dataset1Rows = sourceRows.selectExpr(selectColumnsArr).toJSON().collectAsList();
+    List<String> sourceRowsList = sourceRows.selectExpr(selectColumnsArr).toJSON().collectAsList();
     targetRowsByFormat.forEach(
-        (format, targetRows) -> {
-          List<String> dataset2Rows =
+        (targetFormat, targetRows) -> {
+          List<String> targetRowsList =
               targetRows.selectExpr(selectColumnsArr).toJSON().collectAsList();
           assertEquals(
-              dataset1Rows.size(),
-              dataset2Rows.size(),
+              sourceRowsList.size(),
+              targetRowsList.size(),
               String.format(
                   "Datasets have different row counts when reading from Spark. Source: %s, Target: %s",
-                  sourceFormat, format));
+                  sourceFormat, targetFormat));
           // sanity check the count to ensure test is set up properly
           if (expectedCount != null) {
-            assertEquals(expectedCount, dataset1Rows.size());
+            assertEquals(expectedCount, sourceRowsList.size());
           } else {
             // if count is not known ahead of time, ensure datasets are non-empty
-            assertFalse(dataset1Rows.isEmpty());
+            assertFalse(sourceRowsList.isEmpty());
           }
-          if (containsUUIDFields(dataset1Rows) && containsUUIDFields(dataset2Rows)) {
-            compareDatasetWithUUID(dataset1Rows, dataset2Rows);
+          if (containsUUIDFields(sourceRowsList) && containsUUIDFields(targetRowsList)) {
+            compareDatasetWithUUID(sourceRowsList, targetRowsList);
           } else {
             assertEquals(
-                dataset1Rows,
-                dataset2Rows,
+                sourceRowsList,
+                targetRowsList,
                 String.format(
                     "Datasets are not equivalent when reading from Spark. Source: %s, Target: %s",
-                    sourceFormat, format));
+                    sourceFormat, targetFormat));
           }
         });
   }
diff --git a/xtable-core/src/test/java/org/apache/xtable/TestPaimonTable.java b/xtable-core/src/test/java/org/apache/xtable/TestPaimonTable.java
new file mode 100644
index 000000000..551020074
--- /dev/null
+++ b/xtable-core/src/test/java/org/apache/xtable/TestPaimonTable.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.manifest.BucketEntry;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.source.snapshot.SnapshotReader;
+import org.apache.paimon.types.BooleanType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.DoubleType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.LocalZonedTimestampType;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.VarCharType;
+import org.apache.paimon.utils.ParameterUtils;
+
+public class TestPaimonTable implements GenericTable<GenericRow, String> {
+
+  private final Random random = new Random();
+  private final FileStoreTable paimonTable;
+  private final String partitionField;
+
+  public TestPaimonTable(FileStoreTable paimonTable, String partitionField) {
+    this.paimonTable = paimonTable;
+    this.partitionField = partitionField;
+  }
+
+  public static GenericTable<GenericRow, String> createTable(
+      String tableName,
+      String partitionField,
+      Path tempDir,
+      Configuration hadoopConf,
+      boolean additionalColumns) {
+    String basePath = initBasePath(tempDir, tableName);
+    Catalog catalog = createFilesystemCatalog(basePath, hadoopConf);
+    FileStoreTable paimonTable = createTable(catalog, partitionField, additionalColumns);
+
+    System.out.println(
+        "Initialized Paimon test table at base path: "
+            + basePath
+            + " with partition field: "
+            + partitionField
+            + " and additional columns: "
+            + additionalColumns);
+
+    return new TestPaimonTable(paimonTable, partitionField);
+  }
+
+  public static Catalog createFilesystemCatalog(String basePath, Configuration hadoopConf) {
+    CatalogContext context = CatalogContext.create(new org.apache.paimon.fs.Path(basePath));
+    return CatalogFactory.createCatalog(context);
+  }
+
+  public static FileStoreTable createTable(
+      Catalog catalog, String partitionField, boolean additionalColumns) {
+    try {
+      catalog.createDatabase("test_db", true);
+      Identifier identifier = Identifier.create("test_db", "test_table");
+      Schema schema = buildSchema(partitionField, additionalColumns);
+      catalog.createTable(identifier, schema, true);
+      return (FileStoreTable) catalog.getTable(identifier);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static Schema buildSchema(String partitionField, boolean additionalColumns) {
+    Schema.Builder builder =
+        Schema.newBuilder()
+            .primaryKey("id")
+            .column("id", DataTypes.INT())
+            .column("name", DataTypes.STRING())
+            .column("value", DataTypes.DOUBLE())
+            .column("created_at", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())
+            .column("updated_at", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())
+            .column("is_active", DataTypes.BOOLEAN())
+            .column("description", DataTypes.VARCHAR(255))
+            .option("bucket", "1")
+            .option("bucket-key", "id")
+            .option("full-compaction.delta-commits", "1");
+
+    if (partitionField != null) {
+      builder
+          .primaryKey("id", partitionField)
+          .column(partitionField, DataTypes.STRING())
+          .partitionKeys(partitionField);
+    }
+
+    if (additionalColumns) {
+      builder.column("extra_info", DataTypes.STRING()).column("extra_value", DataTypes.DOUBLE());
+    }
+
+    return builder.build();
+  }
+
+  private GenericRow buildGenericRow(int rowIdx, TableSchema schema, String partitionValue) {
+    List<Object> rowValues = new ArrayList<>(schema.fields().size());
+    for (int i = 0; i < schema.fields().size(); i++) {
+      DataField field = schema.fields().get(i);
+      if (field.name().equals(partitionField)) {
+        rowValues.add(BinaryString.fromString(partitionValue));
+      } else if (field.type() instanceof IntType) {
+        rowValues.add(random.nextInt());
+      } else if (field.type() instanceof DoubleType) {
+        rowValues.add(random.nextDouble());
+      } else if (field.type() instanceof VarCharType) {
+        rowValues.add(BinaryString.fromString(field.name() + "_" + rowIdx + "_" + i));
+      } else if (field.type() instanceof LocalZonedTimestampType) {
+        rowValues.add(Timestamp.fromEpochMillis(System.currentTimeMillis()));
+      } else if (field.type() instanceof BooleanType) {
+        rowValues.add(random.nextBoolean());
+      } else {
+        throw new UnsupportedOperationException("Unsupported field type: " + field.type());
+      }
+    }
+
+    return GenericRow.of(rowValues.toArray());
+  }
+
+  private static String initBasePath(Path tempDir, String tableName) {
+    try {
+      Path basePath = tempDir.resolve(tableName);
+      Files.createDirectories(basePath);
+      return basePath.toUri().toString();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public List<GenericRow> insertRows(int numRows) {
+    String partitionValue = LEVEL_VALUES.get(0);
+    return insertRecordsToPartition(numRows, partitionValue);
+  }
+
+  @Override
+  public List<GenericRow> insertRecordsForSpecialPartition(int numRows) {
+    return insertRecordsToPartition(numRows, SPECIAL_PARTITION_VALUE);
+  }
+
+  private List<GenericRow> insertRecordsToPartition(int numRows, String partitionValue) {
+    BatchWriteBuilder batchWriteBuilder = paimonTable.newBatchWriteBuilder();
+    try (BatchTableWrite writer = batchWriteBuilder.newWrite()) {
+      List<GenericRow> rows = new ArrayList<>(numRows);
+      for (int i = 0; i < numRows; i++) {
+        GenericRow row = buildGenericRow(i, paimonTable.schema(), partitionValue);
+        writer.write(row);
+        rows.add(row);
+      }
+      commitWrites(batchWriteBuilder, writer);
+      compactTable();
+      return rows;
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to insert rows into Paimon table", e);
+    }
+  }
+
+  @Override
+  public void upsertRows(List<GenericRow> rows) {
+    BatchWriteBuilder batchWriteBuilder = paimonTable.newBatchWriteBuilder();
+    try (BatchTableWrite writer = batchWriteBuilder.newWrite()) {
+      for (GenericRow row : rows) {
+        writer.write(row);
+      }
+      commitWrites(batchWriteBuilder, writer);
+      compactTable();
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to upsert rows into Paimon table", e);
+    }
+  }
+
+  @Override
+  public void deleteRows(List<GenericRow> rows) {
+    BatchWriteBuilder batchWriteBuilder = paimonTable.newBatchWriteBuilder();
+    try (BatchTableWrite writer = batchWriteBuilder.newWrite()) {
+      for (GenericRow row : rows) {
+        row.setRowKind(RowKind.DELETE);
+        writer.write(row);
+      }
+      commitWrites(batchWriteBuilder, writer);
+      compactTable();
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to delete rows from Paimon table", e);
+    }
+  }
+
+  private void compactTable() {
+    BatchWriteBuilder batchWriteBuilder = paimonTable.newBatchWriteBuilder();
+    SnapshotReader snapshotReader = paimonTable.newSnapshotReader();
+    try (BatchTableWrite writer = batchWriteBuilder.newWrite()) {
+      for (BucketEntry bucketEntry : snapshotReader.bucketEntries()) {
+        writer.compact(bucketEntry.partition(), bucketEntry.bucket(), true);
+      }
+      commitWrites(batchWriteBuilder, writer);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to compact writes in Paimon table", e);
+    }
+  }
+
+  private static void commitWrites(BatchWriteBuilder batchWriteBuilder, BatchTableWrite writer)
+      throws Exception {
+    BatchTableCommit commit = batchWriteBuilder.newCommit();
+    List<CommitMessage> messages = writer.prepareCommit();
+    try {
+      commit.commit(messages);
+    } catch (Exception e) {
+      commit.abort(messages);
+      throw new RuntimeException("Failed to commit writes to Paimon table", e);
+    } finally {
+      commit.close();
+    }
+  }
+
+  @Override
+  public void deletePartition(String partitionValue) {
+    try (BatchTableCommit commit = paimonTable.newBatchWriteBuilder().newCommit()) {
+      commit.truncatePartitions(
+          ParameterUtils.getPartitions(partitionField + "=" + partitionValue));
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to delete partition from Paimon table", e);
+    }
+  }
+
+  @Override
+  public void deleteSpecialPartition() {
+    deletePartition(SPECIAL_PARTITION_VALUE);
+  }
+
+  @Override
+  public String getBasePath() {
+    return paimonTable.location().toString();
+  }
+
+  @Override
+  public String getMetadataPath() {
+    return paimonTable.snapshotManager().snapshotDirectory().toString();
+  }
+
+  @Override
+  public String getOrderByColumn() {
+    return "id";
+  }
+
+  @Override
+  public void close() {}
+
+  @Override
+  public void reload() {}
+
+  @Override
+  public List<String> getColumnsToSelect() {
+    return paimonTable.schema().fieldNames().stream()
+        .filter(
+            // TODO Hudi thinks that paimon buckets are partition values, not sure how to handle it
+            // filtering out the partition field on the comparison for now
+            field -> !field.equals(partitionField))
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public String getFilterQuery() {
+    return "id % 2 = 0";
+  }
+
+  public FileStoreTable getPaimonTable() {
+    return paimonTable;
+  }
+}
diff --git a/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonConversionSource.java b/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonConversionSource.java
new file mode 100644
index 000000000..fff8e7ce6
--- /dev/null
+++ b/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonConversionSource.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.paimon;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import java.nio.file.Path;
+import java.time.Instant;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.table.FileStoreTable;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import org.apache.xtable.GenericTable;
+import org.apache.xtable.TestPaimonTable;
+import org.apache.xtable.exception.ReadException;
+import org.apache.xtable.model.InstantsForIncrementalSync;
+import org.apache.xtable.model.InternalSnapshot;
+import org.apache.xtable.model.InternalTable;
+import org.apache.xtable.model.storage.DataLayoutStrategy;
+import org.apache.xtable.model.storage.PartitionFileGroup;
+import org.apache.xtable.model.storage.TableFormat;
+
+public class TestPaimonConversionSource {
+
+  @TempDir private Path tempDir;
+
+  private Configuration hadoopConf;
+  private TestPaimonTable testTable;
+  private FileStoreTable paimonTable;
+  private PaimonConversionSource conversionSource;
+
+  @BeforeEach
+  void setUp() {
+    hadoopConf = new Configuration();
+    testTable =
+        ((TestPaimonTable)
+            TestPaimonTable.createTable("test_table", "level", tempDir, hadoopConf, false));
+    paimonTable = testTable.getPaimonTable();
+    conversionSource = new PaimonConversionSource(paimonTable);
+  }
+
+  @Test
+  void testGetTableWithPartitionedTable() {
+    testTable.insertRows(5);
+
+    Snapshot snapshot = paimonTable.snapshotManager().latestSnapshot();
+    assertNotNull(snapshot);
+
+    InternalTable result = conversionSource.getTable(snapshot);
+
+    assertNotNull(result);
+    assertEquals("test_table", result.getName());
+    assertEquals(TableFormat.PAIMON, result.getTableFormat());
+    assertNotNull(result.getReadSchema());
+    assertEquals(DataLayoutStrategy.HIVE_STYLE_PARTITION, result.getLayoutStrategy());
+    assertTrue(result.getBasePath().contains("test_table"));
+    assertEquals(1, result.getPartitioningFields().size());
+    assertEquals("level", result.getPartitioningFields().get(0).getSourceField().getName());
+    assertEquals(Instant.ofEpochMilli(snapshot.timeMillis()), result.getLatestCommitTime());
+    assertNotNull(result.getLatestMetadataPath());
+  }
+
+  @Test
+  void testGetTableWithUnpartitionedTable() {
+    GenericTable unpartitionedTable =
+        TestPaimonTable.createTable("unpartitioned_table", null, tempDir, hadoopConf, false);
+    FileStoreTable unpartitionedPaimonTable =
+        ((TestPaimonTable) unpartitionedTable).getPaimonTable();
+    PaimonConversionSource unpartitionedSource =
+        new PaimonConversionSource(unpartitionedPaimonTable);
+
+    unpartitionedTable.insertRows(3);
+
+    Snapshot snapshot = unpartitionedPaimonTable.snapshotManager().latestSnapshot();
+    assertNotNull(snapshot);
+
+    InternalTable result = unpartitionedSource.getTable(snapshot);
+
+    assertNotNull(result);
+    assertEquals("test_table", result.getName());
+    assertEquals(TableFormat.PAIMON, result.getTableFormat());
+    assertNotNull(result.getReadSchema());
+    assertEquals(DataLayoutStrategy.FLAT, result.getLayoutStrategy());
+    assertTrue(result.getBasePath().contains("unpartitioned_table"));
+    assertEquals(0, result.getPartitioningFields().size());
+    assertEquals(Instant.ofEpochMilli(snapshot.timeMillis()), result.getLatestCommitTime());
+    assertNotNull(result.getLatestMetadataPath());
+  }
+
+  @Test
+  void testGetCurrentTableSuccess() {
+    testTable.insertRows(3);
+
+    InternalTable result = conversionSource.getCurrentTable();
+
+    assertNotNull(result);
+    assertEquals(TableFormat.PAIMON, result.getTableFormat());
+    assertEquals("test_table", result.getName());
+    assertNotNull(result.getReadSchema());
+    assertEquals(DataLayoutStrategy.HIVE_STYLE_PARTITION, result.getLayoutStrategy());
+    assertEquals(1, result.getPartitioningFields().size());
+  }
+
+  @Test
+  void testGetCurrentTableThrowsExceptionWhenNoSnapshot() {
+    GenericTable emptyTable =
+        TestPaimonTable.createTable("empty_table", "level", tempDir, hadoopConf, false);
+    FileStoreTable emptyPaimonTable = ((TestPaimonTable) emptyTable).getPaimonTable();
+    PaimonConversionSource emptySource = new PaimonConversionSource(emptyPaimonTable);
+
+    ReadException exception = assertThrows(ReadException.class, emptySource::getCurrentTable);
+
+    assertTrue(exception.getMessage().contains("No snapshots found for table"));
+  }
+
+  @Test
+  void testGetCurrentSnapshotSuccess() {
+    testTable.insertRows(5);
+
+    InternalSnapshot result = conversionSource.getCurrentSnapshot();
+
+    assertNotNull(result);
+    assertNotNull(result.getTable());
+    assertEquals(TableFormat.PAIMON, result.getTable().getTableFormat());
+    assertNotNull(result.getVersion());
+    assertNotNull(result.getSourceIdentifier());
+    assertNotNull(result.getPartitionedDataFiles());
+
+    List<PartitionFileGroup> partitionFileGroups = result.getPartitionedDataFiles();
+    assertFalse(partitionFileGroups.isEmpty());
+    assertTrue(partitionFileGroups.stream().allMatch(group -> !group.getDataFiles().isEmpty()));
+  }
+
+  @Test
+  void testGetCurrentSnapshotThrowsExceptionWhenNoSnapshot() {
+    GenericTable emptyTable =
+        TestPaimonTable.createTable("empty_table2", "level", tempDir, hadoopConf, false);
+    FileStoreTable emptyPaimonTable = ((TestPaimonTable) emptyTable).getPaimonTable();
+    PaimonConversionSource emptySource = new PaimonConversionSource(emptyPaimonTable);
+
+    ReadException exception = assertThrows(ReadException.class, emptySource::getCurrentSnapshot);
+
+    assertTrue(exception.getMessage().contains("No snapshots found for table"));
+  }
+
+  @Test
+  void testGetTableChangeForCommitThrowsUnsupportedOperationException() {
+    testTable.insertRows(3);
+    Snapshot snapshot = paimonTable.snapshotManager().latestSnapshot();
+
+    UnsupportedOperationException exception =
+        assertThrows(
+            UnsupportedOperationException.class,
+            () -> conversionSource.getTableChangeForCommit(snapshot));
+
+    assertEquals("Incremental Sync is not supported yet.", exception.getMessage());
+  }
+
+  @Test
+  void testGetCommitsBacklogThrowsUnsupportedOperationException() {
+    InstantsForIncrementalSync mockInstants =
+        InstantsForIncrementalSync.builder().lastSyncInstant(Instant.now()).build();
+
+    UnsupportedOperationException exception =
+        assertThrows(
+            UnsupportedOperationException.class,
+            () -> conversionSource.getCommitsBacklog(mockInstants));
+
+    assertEquals("Incremental Sync is not supported yet.", exception.getMessage());
+  }
+
+  @Test
+  void testIsIncrementalSyncSafeFromReturnsFalse() {
+    Instant testInstant = Instant.now();
+
+    boolean result = conversionSource.isIncrementalSyncSafeFrom(testInstant);
+
+    assertFalse(result);
+  }
+
+  @Test
+  void testGetCommitIdentifier() {
+    testTable.insertRows(3);
+    Snapshot snapshot = paimonTable.snapshotManager().latestSnapshot();
+
+    String result = conversionSource.getCommitIdentifier(snapshot);
+
+    assertNotNull(result);
+    assertEquals(String.valueOf(snapshot.commitIdentifier()), result);
+  }
+
+  @Test
+  void testCloseDoesNotThrowException() {
+    assertDoesNotThrow(() -> conversionSource.close());
+  }
+
+  @Test
+  void testConstructorInitializesFieldsCorrectly() {
+    assertNotNull(conversionSource);
+
+    testTable.insertRows(1);
+    assertDoesNotThrow(() -> conversionSource.getCurrentTable());
+  }
+
+  @Test
+  void testMultipleSnapshots() {
+    testTable.insertRows(2);
+    Snapshot firstSnapshot = paimonTable.snapshotManager().latestSnapshot();
+    assertNotNull(firstSnapshot);
+
+    testTable.insertRows(3);
+    Snapshot secondSnapshot = paimonTable.snapshotManager().latestSnapshot();
+    assertNotNull(secondSnapshot);
+
+    assertNotEquals(firstSnapshot.id(), secondSnapshot.id());
+
+    InternalTable firstTable = conversionSource.getTable(firstSnapshot);
+    InternalTable secondTable = conversionSource.getTable(secondSnapshot);
+
+    assertNotNull(firstTable);
+    assertNotNull(secondTable);
+    assertEquals(firstTable.getName(), secondTable.getName());
+    assertEquals(firstTable.getTableFormat(), secondTable.getTableFormat());
+  }
+
+  @Test
+  void testSchemaEvolution() {
+    testTable.insertRows(2);
+
+    GenericTable tableWithExtraColumns =
+        TestPaimonTable.createTable("table_with_extra", "level", tempDir, hadoopConf, true);
+    FileStoreTable extraColumnsPaimonTable =
+        ((TestPaimonTable) tableWithExtraColumns).getPaimonTable();
+    PaimonConversionSource extraColumnsSource =
+        new PaimonConversionSource(extraColumnsPaimonTable);
+
+    tableWithExtraColumns.insertRows(2);
+
+    InternalTable originalTable = conversionSource.getCurrentTable();
+    InternalTable expandedTable = extraColumnsSource.getCurrentTable();
+
+    assertNotNull(originalTable);
+    assertNotNull(expandedTable);
+
+    assertTrue(
+        expandedTable.getReadSchema().getFields().size()
+            >= originalTable.getReadSchema().getFields().size());
+  }
+}
diff --git a/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonDataFileExtractor.java b/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonDataFileExtractor.java
new file mode 100644
index 000000000..dc8945a20
--- /dev/null
+++ b/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonDataFileExtractor.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.paimon;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.file.Path;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.paimon.table.FileStoreTable;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import org.apache.xtable.TestPaimonTable;
+import org.apache.xtable.model.storage.InternalDataFile;
+
+public class TestPaimonDataFileExtractor {
+  private static final PaimonDataFileExtractor extractor = PaimonDataFileExtractor.getInstance();
+
+  @TempDir private Path tempDir;
+  private TestPaimonTable testTable;
+  private FileStoreTable paimonTable;
+
+  @Test
+  void testToInternalDataFilesWithUnpartitionedTable() {
+    createUnpartitionedTable();
+
+    // Insert some data to create files
+    testTable.insertRows(5);
+
+    List<InternalDataFile> result =
+        extractor.toInternalDataFiles(paimonTable, paimonTable.snapshotManager().latestSnapshot());
+
+    assertNotNull(result);
+    assertFalse(result.isEmpty());
+
+    InternalDataFile dataFile = result.get(0);
+    assertNotNull(dataFile.getPhysicalPath());
+    assertTrue(dataFile.getPhysicalPath().contains("bucket-"));
+    assertTrue(dataFile.getFileSizeBytes() > 0);
+    assertEquals(5, dataFile.getRecordCount());
+    assertEquals(0, dataFile.getPartitionValues().size());
+  }
+
+  @Test
+  void testToInternalDataFilesWithPartitionedTable() {
+    createPartitionedTable();
+
+    // Insert some data to create files
+    testTable.insertRows(5);
+
+    List<InternalDataFile> result =
+        extractor.toInternalDataFiles(paimonTable, paimonTable.snapshotManager().latestSnapshot());
+
+    assertNotNull(result);
+    assertFalse(result.isEmpty());
+
+    InternalDataFile dataFile = result.get(0);
+    assertNotNull(dataFile.getPhysicalPath());
+    assertTrue(dataFile.getPhysicalPath().contains("bucket-"));
+    assertTrue(dataFile.getFileSizeBytes() > 0);
+    assertEquals(5, dataFile.getRecordCount());
+    assertNotNull(dataFile.getPartitionValues());
+  }
+
+  @Test
+  void testToInternalDataFilesWithTableWithPrimaryKeys() {
+    createTableWithPrimaryKeys();
+
+    // Insert some data to create files
+    testTable.insertRows(5);
+
+    // Get the latest snapshot
+    List<InternalDataFile> result =
+        extractor.toInternalDataFiles(paimonTable, paimonTable.snapshotManager().latestSnapshot());
+
+    assertNotNull(result);
+    assertFalse(result.isEmpty());
+
+    InternalDataFile dataFile = result.get(0);
+    assertNotNull(dataFile.getPhysicalPath());
+    assertTrue(dataFile.getFileSizeBytes() > 0);
+    assertEquals(5, dataFile.getRecordCount());
+  }
+
+  @Test
+  void testPhysicalPathFormat() {
+    createUnpartitionedTable();
+
+    // Insert data
+    testTable.insertRows(2);
+
+    List<InternalDataFile> result =
+        extractor.toInternalDataFiles(paimonTable, paimonTable.snapshotManager().latestSnapshot());
+
+    assertFalse(result.isEmpty());
+
+    for (InternalDataFile dataFile : result) {
+      String path = dataFile.getPhysicalPath();
+      assertTrue(path.contains("bucket-"));
+      assertTrue(path.endsWith(".orc") || path.endsWith(".parquet"));
+    }
+  }
+
+  @Test
+  void testColumnStatsAreEmpty() {
+    createUnpartitionedTable();
+
+    testTable.insertRows(1);
+
+    List<InternalDataFile> result =
+        extractor.toInternalDataFiles(paimonTable, paimonTable.snapshotManager().latestSnapshot());
+
+    assertFalse(result.isEmpty());
+    for (InternalDataFile dataFile : result) {
+      assertEquals(0, dataFile.getColumnStats().size());
+    }
+  }
+
+  private void createUnpartitionedTable() {
+
+  private void createUnpartitionedTable() {
+    testTable =
+        (TestPaimonTable)
+            TestPaimonTable.createTable("test_table", null, tempDir, new Configuration(), false);
+    paimonTable = testTable.getPaimonTable();
+  }
+
+  private void createPartitionedTable() {
+    testTable =
+        (TestPaimonTable)
+            TestPaimonTable.createTable("test_table", "level", tempDir, new Configuration(), false);
+    paimonTable = testTable.getPaimonTable();
+  }
+
+  private void createTableWithPrimaryKeys() {
+    // Assumption: the trailing boolean of TestPaimonTable.createTable toggles primary keys.
+    // With false, this helper would build the same table as createUnpartitionedTable(), leaving
+    // testToInternalDataFilesWithTableWithPrimaryKeys without a primary key to exercise.
+    testTable =
+        (TestPaimonTable)
+            TestPaimonTable.createTable("test_table", null, tempDir, new Configuration(), true);
+    paimonTable = testTable.getPaimonTable();
+  }
+}
diff --git a/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonPartitionExtractor.java b/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonPartitionExtractor.java
new file mode 100644
index 000000000..b4394f794
--- /dev/null
+++ b/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonPartitionExtractor.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.paimon;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryRowWriter;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.table.FileStoreTable;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import org.apache.xtable.TestPaimonTable;
+import org.apache.xtable.model.schema.InternalField;
+import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.InternalType;
+import org.apache.xtable.model.schema.PartitionTransformType;
+import org.apache.xtable.model.stat.PartitionValue;
+import org.apache.xtable.model.stat.Range;
+
+public class TestPaimonPartitionExtractor {
+  private static final PaimonPartitionExtractor extractor = PaimonPartitionExtractor.getInstance();
+
+  @TempDir private Path tempDir;
+
+  @Test
+  void testToInternalPartitionFieldsWithEmptyKeys() {
+    InternalSchema schema = createMockSchema();
+
+    List<InternalPartitionField> result = extractor.toInternalPartitionFields(null, schema);
+    assertEquals(Collections.emptyList(), result);
+
+    result = extractor.toInternalPartitionFields(Collections.emptyList(), schema);
+    assertEquals(Collections.emptyList(), result);
+  }
+
+  @Test
+  void testToInternalPartitionFieldsWithSingleKey() {
+    InternalSchema schema = createMockSchema();
+    List<String> partitionKeys = Collections.singletonList("level");
+
+    List<InternalPartitionField> result =
+        extractor.toInternalPartitionFields(partitionKeys, schema);
+
+    assertEquals(1, result.size());
+    InternalPartitionField partitionField = result.get(0);
+    assertEquals("level", partitionField.getSourceField().getName());
+    assertEquals(PartitionTransformType.VALUE, partitionField.getTransformType());
+  }
+
+  @Test
+  void testToInternalPartitionFieldsWithMultipleKeys() {
+    InternalSchema schema = createMockSchema();
+    List<String> partitionKeys = Arrays.asList("level", "status");
+
+    List<InternalPartitionField> result =
+        extractor.toInternalPartitionFields(partitionKeys, schema);
+
+    assertEquals(2, result.size());
+    assertEquals("level", result.get(0).getSourceField().getName());
+    assertEquals("status", result.get(1).getSourceField().getName());
+    assertEquals(PartitionTransformType.VALUE, result.get(0).getTransformType());
+    assertEquals(PartitionTransformType.VALUE, result.get(1).getTransformType());
+  }
+
+  @Test
+  void testToInternalPartitionFieldsWithMissingKey() {
+    InternalSchema schema = createMockSchema();
+    List<String> partitionKeys = Collections.singletonList("missing_key");
+
+    IllegalArgumentException exception =
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> extractor.toInternalPartitionFields(partitionKeys, schema));
+
+    assertTrue(exception.getMessage().contains("Partition key not found in schema: missing_key"));
+  }
+
+  @Test
+  void testToPartitionValuesWithPartitionedTable() {
+    TestPaimonTable testTable = createPartitionedTable();
+    FileStoreTable paimonTable = testTable.getPaimonTable();
+
+    testTable.insertRows(1);
+
+    BinaryRow partition = BinaryRow.singleColumn("INFO");
+
+    List<PartitionValue> result =
+        extractor.toPartitionValues(paimonTable, partition);
+
+    assertEquals(1, result.size());
+    PartitionValue partitionValue = result.get(0);
+    assertEquals("level", partitionValue.getPartitionField().getSourceField().getName());
+    assertEquals(Range.scalar("INFO"), partitionValue.getRange());
+  }
+
+  @Test
+  @Disabled("TODO: make it easier to create multi-partitioned table in tests")
+  void testToPartitionPathWithMultiplePartitionValues() {
+    // TODO this table is fixed at single partition, need to create a multi-partitioned table
+    TestPaimonTable testTable = createPartitionedTable();
+    FileStoreTable paimonTable = testTable.getPaimonTable();
+
+    BinaryRow partition = new BinaryRow(2);
+    BinaryRowWriter writer = new BinaryRowWriter(partition);
+    writer.writeString(0, BinaryString.fromString("INFO"));
+    writer.writeString(1, BinaryString.fromString("active"));
+    writer.complete();
+
+    Optional<String> result = extractor.toPartitionPath(paimonTable, partition);
+
+    assertTrue(result.isPresent());
+    // Expected hive-style path for the ("INFO", "active") values written above, assuming the
+    // second partition key is named "status".
+    assertEquals("level=INFO/status=active", result.get());
+  }
+
+  @Test
+  void testToPartitionPathWithEmptyPartitions() {
+    TestPaimonTable testTable = createUnpartitionedTable();
+    FileStoreTable paimonTable = testTable.getPaimonTable();
+
+    BinaryRow emptyPartition = BinaryRow.EMPTY_ROW;
+
+    Optional<String> result = extractor.toPartitionPath(paimonTable, emptyPartition);
+
+    assertFalse(result.isPresent());
+  }
+
+  private InternalSchema createMockSchema() {
+    InternalField levelField =
+        InternalField.builder()
+            .name("level")
+            .schema(
+                InternalSchema.builder()
+                    .name("STRING")
+                    .dataType(InternalType.STRING)
+                    .isNullable(true)
+                    .build())
+            .build();
+
+    InternalField statusField =
+        InternalField.builder()
+            .name("status")
+            .schema(
+                InternalSchema.builder()
+                    .name("STRING")
+                    .dataType(InternalType.STRING)
+                    .isNullable(true)
+                    .build())
+            .build();
+
+    return InternalSchema.builder()
+        .name("test_schema")
+        .dataType(InternalType.RECORD)
+        .fields(Arrays.asList(levelField, statusField))
+        .build();
+  }
+
+  private TestPaimonTable createPartitionedTable() {
+    return (TestPaimonTable)
+        TestPaimonTable.createTable("test_table", "level", tempDir, new Configuration(), false);
+  }
+
+  private TestPaimonTable createUnpartitionedTable() {
+    return (TestPaimonTable)
+        TestPaimonTable.createTable("test_table", null, tempDir, new Configuration(), false);
+  }
+}
diff --git a/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonSchemaExtractor.java b/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonSchemaExtractor.java
new file mode 100644
index 000000000..4952bf752
--- /dev/null
+++ b/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonSchemaExtractor.java
@@ -0,0 +1,496 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.paimon;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.BooleanType;
+import org.apache.paimon.types.CharType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DateType;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.DoubleType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.SmallIntType;
+import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.types.TinyIntType;
+import org.apache.paimon.types.VarCharType;
+import org.junit.jupiter.api.Test;
+
+import org.apache.xtable.model.schema.InternalField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.InternalType;
+
+public class TestPaimonSchemaExtractor {
+  private static final PaimonSchemaExtractor schemaExtractor = PaimonSchemaExtractor.getInstance();
+
+  private void assertField(DataField paimonField, InternalField expectedInternalField) {
+    assertField(paimonField, expectedInternalField, Collections.emptyList());
+  }
+
+  private void assertField(
+      DataField paimonField, InternalField expectedInternalField, List<String> primaryKeys) {
+    TableSchema paimonSchema =
+        new TableSchema(
+            0,
+            Collections.singletonList(paimonField),
+            0,
+            Collections.emptyList(),
+            primaryKeys,
+            new HashMap<>(),
+            "");
+    InternalSchema internalSchema = schemaExtractor.toInternalSchema(paimonSchema);
+    List<InternalField> recordKeyFields =
+        primaryKeys.isEmpty()
+            ? Collections.emptyList()
+            : Collections.singletonList(expectedInternalField);
+    InternalSchema expectedSchema =
+        InternalSchema.builder()
+            .name("record")
+            .dataType(InternalType.RECORD)
+            .fields(Collections.singletonList(expectedInternalField))
+            .recordKeyFields(recordKeyFields)
+            .build();
+    assertEquals(expectedSchema, internalSchema);
+  }
+
+  @Test
+  void testCharField() {
+    DataField paimonField = new DataField(0, "char_field", new CharType(10));
+    InternalField expectedField =
+        InternalField.builder()
+            .name("char_field")
+            .fieldId(0)
+            .schema(
+                InternalSchema.builder()
+                    .name("CHAR(10)")
+                    .dataType(InternalType.STRING)
+                    .isNullable(true)
+                    .build())
+            .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+            .build();
+    assertField(paimonField, expectedField);
+  }
+
+  @Test
+  void testVarcharField() {
+    DataField paimonField = new DataField(1, "varchar_field", new VarCharType(255));
+    InternalField expectedField =
+        InternalField.builder()
+            .name("varchar_field")
+            .fieldId(1)
+            .schema(
+                InternalSchema.builder()
+                    .name("VARCHAR(255)")
+                    .dataType(InternalType.STRING)
+                    .isNullable(true)
+                    .build())
+            .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+            .build();
+    assertField(paimonField, expectedField);
+  }
+
+  @Test
+  void testBooleanField() {
+    DataField paimonField = new DataField(2, "boolean_field", new BooleanType());
+    InternalField expectedField =
+        InternalField.builder()
+            .name("boolean_field")
+            .fieldId(2)
+            .schema(
+                InternalSchema.builder()
+                    .name("BOOLEAN")
+                    .dataType(InternalType.BOOLEAN)
+                    .isNullable(true)
+                    .build())
+            .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+            .build();
+    assertField(paimonField, expectedField);
+  }
+
+  @Test
+  void testTinyIntField() {
+    DataField paimonField = new DataField(3, "tinyint_field", new TinyIntType());
+    InternalField expectedField =
+        InternalField.builder()
+            .name("tinyint_field")
+            .fieldId(3)
+            .schema(
+                InternalSchema.builder()
+                    .name("TINYINT")
+                    .dataType(InternalType.INT)
+                    .isNullable(true)
+                    .build())
+            .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+            .build();
+    assertField(paimonField, expectedField);
+  }
+
+  @Test
+  void testSmallIntField() {
+    DataField paimonField = new DataField(4, "smallint_field", new SmallIntType());
+    InternalField expectedField =
+        InternalField.builder()
+            .name("smallint_field")
+            .fieldId(4)
+            .schema(
+                InternalSchema.builder()
+                    .name("SMALLINT")
+                    .dataType(InternalType.INT)
+                    .isNullable(true)
+                    .build())
+            .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+            .build();
+    assertField(paimonField, expectedField);
+  }
+
+  @Test
+  void testIntField() {
+    DataField paimonField = new DataField(5, "int_field", new IntType());
+    InternalField expectedField =
+        InternalField.builder()
+            .name("int_field")
+            .fieldId(5)
+            .schema(
+                InternalSchema.builder()
+                    .name("INT")
+                    .dataType(InternalType.INT)
+                    .isNullable(true)
+                    .build())
+            .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+            .build();
+    assertField(paimonField, expectedField);
+  }
+
+  @Test
+  void testBigIntField() {
+    DataField paimonField = new DataField(6, "bigint_field", new BigIntType());
+    InternalField expectedField =
+        InternalField.builder()
+            .name("bigint_field")
+            .fieldId(6)
+            .schema(
+                InternalSchema.builder()
+                    .name("BIGINT")
+                    .dataType(InternalType.LONG)
+                    .isNullable(true)
+                    .build())
+            .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+            .build();
+    assertField(paimonField, expectedField);
+  }
+
+  @Test
+  void testFloatField() {
+    DataField paimonField = new DataField(7, "float_field", new FloatType());
+    InternalField expectedField =
+        InternalField.builder()
+            .name("float_field")
+            .fieldId(7)
+            .schema(
+                InternalSchema.builder()
+                    .name("FLOAT")
+                    .dataType(InternalType.FLOAT)
+                    .isNullable(true)
+                    .build())
+            .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+            .build();
+    assertField(paimonField, expectedField);
+  }
+
+  @Test
+  void testDoubleField() {
+    DataField paimonField = new DataField(8, "double_field", new DoubleType());
+    InternalField expectedField =
+        InternalField.builder()
+            .name("double_field")
+            .fieldId(8)
+            .schema(
+                InternalSchema.builder()
+                    .name("DOUBLE")
+                    .dataType(InternalType.DOUBLE)
+                    .isNullable(true)
+                    .build())
+            .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+            .build();
+    assertField(paimonField, expectedField);
+  }
+
+  @Test
+  void testDateField() {
+    DataField paimonField = new DataField(9, "date_field", new DateType());
+    InternalField expectedField =
+        InternalField.builder()
+            .name("date_field")
+            .fieldId(9)
+            .schema(
+                InternalSchema.builder()
+                    .name("DATE")
+                    .dataType(InternalType.DATE)
+                    .isNullable(true)
+                    .build())
+            .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+            .build();
+    assertField(paimonField, expectedField);
+  }
+
+  @Test
+  void testTimestampField() {
+    DataField paimonField = new DataField(10, "timestamp_field", new TimestampType(3));
+    Map<InternalSchema.MetadataKey, Object> timestampMetadata =
+        Collections.singletonMap(
+            InternalSchema.MetadataKey.TIMESTAMP_PRECISION, InternalSchema.MetadataValue.MICROS);
+    InternalField expectedField =
+        InternalField.builder()
+            .name("timestamp_field")
+            .fieldId(10)
+            .schema(
+                InternalSchema.builder()
+                    .name("TIMESTAMP(3)")
+                    .dataType(InternalType.TIMESTAMP)
+                    .isNullable(true)
+                    .metadata(timestampMetadata)
+                    .build())
+            .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+            .build();
+    assertField(paimonField, expectedField);
+  }
+
+  @Test
+  void testDecimalField() {
+    DataField paimonField = new DataField(11, "decimal_field", new DecimalType(10, 2));
+    Map<InternalSchema.MetadataKey, Object> decimalMetadata = new HashMap<>();
+    decimalMetadata.put(InternalSchema.MetadataKey.DECIMAL_PRECISION, 10);
+    decimalMetadata.put(InternalSchema.MetadataKey.DECIMAL_SCALE, 2);
+    InternalField expectedField =
+        InternalField.builder()
+            .name("decimal_field")
+            .fieldId(11)
+            .schema(
+                InternalSchema.builder()
+                    .name("DECIMAL(10, 2)")
+                    .dataType(InternalType.DECIMAL)
+                    .isNullable(true)
+                    .metadata(decimalMetadata)
+                    .build())
+            .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+            .build();
+    assertField(paimonField, expectedField);
+  }
+
+  @Test
+  void testStructField() {
+    DataField paimonField =
+        new DataField(
+            12,
+            "struct_field",
+            RowType.of(
+                new DataType[] {new IntType(), new VarCharType(255)},
+                new String[] {"nested_int", "nested_varchar"}));
+    InternalField nestedIntField =
+        InternalField.builder()
+            .name("nested_int")
+            .fieldId(0)
+            .parentPath("struct_field")
+            .schema(
+                InternalSchema.builder()
+                    .name("INT")
+                    .dataType(InternalType.INT)
+                    .isNullable(true)
+                    .build())
+            .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+            .build();
+    InternalField nestedVarcharField =
+        InternalField.builder()
+            .name("nested_varchar")
+            .fieldId(1)
+            .parentPath("struct_field")
+            .schema(
+                InternalSchema.builder()
+                    .name("VARCHAR(255)")
+                    .dataType(InternalType.STRING)
+                    .isNullable(true)
+                    .build())
+            .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+            .build();
+    InternalField expectedField =
+        InternalField.builder()
+ .name("struct_field") + .fieldId(12) + .schema( + InternalSchema.builder() + .name("ROW<`nested_int` INT, `nested_varchar` VARCHAR(255)>") + .dataType(InternalType.RECORD) + .isNullable(true) + .fields(Arrays.asList(nestedIntField, nestedVarcharField)) + .build()) + .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE) + .build(); + assertField(paimonField, expectedField); + } + + @Test + void testArrayField() { + DataField paimonField = new DataField(13, "array_field", new ArrayType(new IntType())); + InternalField arrayElementField = + InternalField.builder() + .name("_one_field_element") + .parentPath("array_field") + .schema( + InternalSchema.builder() + .name("INT") + .dataType(InternalType.INT) + .isNullable(true) + .build()) + .build(); + InternalField expectedField = + InternalField.builder() + .name("array_field") + .fieldId(13) + .schema( + InternalSchema.builder() + .name("ARRAY") + .dataType(InternalType.LIST) + .isNullable(true) + .fields(Collections.singletonList(arrayElementField)) + .build()) + .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE) + .build(); + assertField(paimonField, expectedField); + } + + @Test + void testMapField() { + DataField paimonField = + new DataField(14, "map_field", new MapType(new VarCharType(255), new IntType())); + InternalField mapKeyField = + InternalField.builder() + .name("_one_field_key") + .parentPath("map_field") + .schema( + InternalSchema.builder() + .name("VARCHAR(255)") + .dataType(InternalType.STRING) + .isNullable(false) + .build()) + .build(); + InternalField mapValueField = + InternalField.builder() + .name("_one_field_value") + .parentPath("map_field") + .schema( + InternalSchema.builder() + .name("INT") + .dataType(InternalType.INT) + .isNullable(true) + .build()) + .build(); + InternalField expectedField = + InternalField.builder() + .name("map_field") + .fieldId(14) + .schema( + InternalSchema.builder() + .name("MAP") + .dataType(InternalType.MAP) + .isNullable(true) + .fields(Arrays.asList(mapKeyField, mapValueField)) + .build()) + .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE) + .build(); + assertField(paimonField, expectedField); + } + + @Test + void testPrimaryKey() { + DataField paimonField = new DataField(0, "pk_field", new IntType()); + InternalField expectedField = + InternalField.builder() + .name("pk_field") + .fieldId(0) + .schema( + InternalSchema.builder() + .name("INT") + .dataType(InternalType.INT) + .isNullable(true) + .build()) + .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE) + .build(); + assertField(paimonField, expectedField, Collections.singletonList("pk_field")); + } + + @Test + void testMultiplePrimaryKeys() { + DataField intField = new DataField(0, "int_pk", new IntType()); + DataField stringField = new DataField(1, "string_pk", new VarCharType(255)); + List paimonFields = Arrays.asList(intField, stringField); + List primaryKeys = Arrays.asList("int_pk", "string_pk"); + TableSchema paimonSchema = + new TableSchema( + 0, paimonFields, 0, Collections.emptyList(), primaryKeys, new HashMap<>(), ""); + InternalSchema internalSchema = schemaExtractor.toInternalSchema(paimonSchema); + + InternalField expectedIntField = + InternalField.builder() + .name("int_pk") + .fieldId(0) + .schema( + InternalSchema.builder() + .name("INT") + .dataType(InternalType.INT) + .isNullable(true) + .build()) + .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE) + .build(); + InternalField expectedStringField = + InternalField.builder() + .name("string_pk") + .fieldId(1) + .schema( + 
+                InternalSchema.builder()
+                    .name("VARCHAR(255)")
+                    .dataType(InternalType.STRING)
+                    .isNullable(true)
+                    .build())
+            .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+            .build();
+    List<InternalField> expectedFields = Arrays.asList(expectedIntField, expectedStringField);
+    InternalSchema expectedSchema =
+        InternalSchema.builder()
+            .name("record")
+            .dataType(InternalType.RECORD)
+            .fields(expectedFields)
+            .recordKeyFields(expectedFields)
+            .build();
+    assertEquals(expectedSchema, internalSchema);
+  }
+}
diff --git a/xtable-service/pom.xml b/xtable-service/pom.xml
index 381aa3d0d..b17707d36 100644
--- a/xtable-service/pom.xml
+++ b/xtable-service/pom.xml
@@ -216,6 +216,17 @@
       <scope>test</scope>
     </dependency>
+
+    <dependency>
+      <groupId>org.apache.paimon</groupId>
+      <artifactId>paimon-bundle</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.paimon</groupId>
+      <artifactId>paimon-spark-${spark.version.prefix}</artifactId>
+      <version>${paimon.version}</version>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.parquet</groupId>
diff --git a/xtable-service/src/test/java/org/apache/xtable/service/ITConversionService.java b/xtable-service/src/test/java/org/apache/xtable/service/ITConversionService.java
index 0e7a7e26b..c53aa52ea 100644
--- a/xtable-service/src/test/java/org/apache/xtable/service/ITConversionService.java
+++ b/xtable-service/src/test/java/org/apache/xtable/service/ITConversionService.java
@@ -22,6 +22,7 @@
 import static org.apache.xtable.model.storage.TableFormat.DELTA;
 import static org.apache.xtable.model.storage.TableFormat.HUDI;
 import static org.apache.xtable.model.storage.TableFormat.ICEBERG;
+import static org.apache.xtable.model.storage.TableFormat.PAIMON;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -97,6 +98,15 @@ public static void setupOnce() {
           .sparkContext()
           .hadoopConfiguration()
           .set("parquet.avro.write-old-list-structure", "false");
+      sparkSession
+          .sparkContext()
+          .conf()
+          .set(
+              "spark.sql.extensions",
+              "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions")
+          .set("spark.sql.catalog.paimon", "org.apache.paimon.spark.SparkCatalog")
+          .set("spark.sql.catalog.paimon.warehouse", tempDir.toUri().toString());
+
       jsc = JavaSparkContext.fromSparkContext(sparkSession.sparkContext());
     } catch (IOException e) {
       throw new RuntimeException(e);
@@ -116,14 +126,18 @@ public void setUp() {
         new DeltaConversionSourceProvider();
     ConversionSourceProvider icebergConversionSourceProvider =
         new IcebergConversionSourceProvider();
+    ConversionSourceProvider paimonConversionSourceProvider =
+        new org.apache.xtable.paimon.PaimonConversionSourceProvider();
 
     hudiConversionSourceProvider.init(jsc.hadoopConfiguration());
     deltaConversionSourceProvider.init(jsc.hadoopConfiguration());
     icebergConversionSourceProvider.init(jsc.hadoopConfiguration());
+    paimonConversionSourceProvider.init(jsc.hadoopConfiguration());
 
     sourceProviders.put(HUDI, hudiConversionSourceProvider);
     sourceProviders.put(DELTA, deltaConversionSourceProvider);
     sourceProviders.put(ICEBERG, icebergConversionSourceProvider);
+    sourceProviders.put(PAIMON, paimonConversionSourceProvider);
 
     this.conversionService =
         new ConversionService(
@@ -232,7 +246,7 @@ public void testVariousOperations(String sourceTableFormat, boolean isPartitione
   private static Stream<Arguments> generateTestParametersFormatsAndPartitioning() {
     List<Arguments> arguments = new ArrayList<>();
-    for (String sourceTableFormat : Arrays.asList(HUDI, DELTA, ICEBERG)) {
+    for (String sourceTableFormat : Arrays.asList(HUDI, DELTA, ICEBERG, PAIMON)) {
       for (boolean isPartitioned : new boolean[] {true, false}) {
         arguments.add(Arguments.of(sourceTableFormat, isPartitioned));
       }
@@ -243,6 +257,7 @@ private static Stream<Arguments> generateTestParametersFormatsAndPartitioning()
   protected static List<String> getOtherFormats(String sourceTableFormat) {
     return Arrays.stream(TableFormat.values())
         .filter(format -> !format.equals(sourceTableFormat))
+        .filter(format -> !format.equals(PAIMON)) // Paimon target not supported yet
         .collect(Collectors.toList());
   }
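For reviewers trying the change locally, here is a minimal smoke-test sketch (not part of the diff) of the source path this PR wires up, using only the `paimon` Spark catalog configured in `setupOnce()` above; the table name, columns, and values are hypothetical.

// Smoke-test sketch, assuming the "paimon" catalog registered in setupOnce().
// Creates and populates a partitioned Paimon table via plain Spark SQL.
sparkSession.sql(
    "CREATE TABLE paimon.`default`.paimon_demo (id INT, level STRING) PARTITIONED BY (level)");
sparkSession.sql("INSERT INTO paimon.`default`.paimon_demo VALUES (1, 'INFO'), (2, 'ERROR')");
// With PAIMON registered in sourceProviders, a conversion can then run with PAIMON as the
// source format; target formats exclude PAIMON until target-side support lands
// (see the getOtherFormats filter above).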