Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we rename spark-variant-checkpoint to just variant?

Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{"commitInfo":{"timestamp":1723768497710,"operation":"CREATE OR REPLACE TABLE AS SELECT","operationParameters":{"partitionBy":"[]","clusterBy":"[]","description":null,"isManaged":"false","properties":"{\"delta.checkpointInterval\":\"2\"}"},"isolationLevel":"Serializable","isBlindAppend":false,"operationMetrics":{"numFiles":"2","numOutputRows":"100","numOutputBytes":"14767"},"engineInfo":"Apache-Spark/4.0.0-SNAPSHOT Delta-Lake/3.3.0-SNAPSHOT","txnId":"2cc10429-f586-4c74-805c-8d19fd180c87"}}
{"metaData":{"id":"d7eb0848-b002-4e0b-9d8d-dd335c90946f","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"v\",\"type\":\"variant\",\"nullable\":true,\"metadata\":{}},{\"name\":\"array_of_variants\",\"type\":{\"type\":\"array\",\"elementType\":\"variant\",\"containsNull\":true},\"nullable\":true,\"metadata\":{}},{\"name\":\"struct_of_variants\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"v\",\"type\":\"variant\",\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}},{\"name\":\"map_of_variants\",\"type\":{\"type\":\"map\",\"keyType\":\"string\",\"valueType\":\"variant\",\"valueContainsNull\":true},\"nullable\":true,\"metadata\":{}},{\"name\":\"array_of_struct_of_variants\",\"type\":{\"type\":\"array\",\"elementType\":{\"type\":\"struct\",\"fields\":[{\"name\":\"v\",\"type\":\"variant\",\"nullable\":true,\"metadata\":{}}]},\"containsNull\":true},\"nullable\":true,\"metadata\":{}},{\"name\":\"struct_of_array_of_variants\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"v\",\"type\":{\"type\":\"array\",\"elementType\":\"variant\",\"containsNull\":true},\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.checkpointInterval":"2"},"createdTime":1723768495302}}
{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["variantType-preview"],"writerFeatures":["variantType-preview","appendOnly","invariants"]}}
{"add":{"path":"part-00000-16c852df-ba66-4080-be25-530a05922422-c000.snappy.parquet","partitionValues":{},"size":7443,"modificationTime":1723768496908,"dataChange":true,"stats":"{\"numRecords\":50,\"minValues\":{\"id\":0},\"maxValues\":{\"id\":49},\"nullCount\":{\"id\":0,\"v\":0,\"array_of_variants\":0,\"struct_of_variants\":{\"v\":0},\"map_of_variants\":0,\"array_of_struct_of_variants\":0,\"struct_of_array_of_variants\":{\"v\":0}}}"}}
{"add":{"path":"part-00001-664313d3-14b4-4dbf-8110-77001b877182-c000.snappy.parquet","partitionValues":{},"size":7324,"modificationTime":1723768496908,"dataChange":true,"stats":"{\"numRecords\":50,\"minValues\":{\"id\":50},\"maxValues\":{\"id\":99},\"nullCount\":{\"id\":0,\"v\":0,\"array_of_variants\":0,\"struct_of_variants\":{\"v\":0},\"map_of_variants\":0,\"array_of_struct_of_variants\":0,\"struct_of_array_of_variants\":{\"v\":0}}}"}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"commitInfo":{"timestamp":1723768498557,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":0,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"1","numOutputBytes":"5072"},"engineInfo":"Apache-Spark/4.0.0-SNAPSHOT Delta-Lake/3.3.0-SNAPSHOT","txnId":"78417efa-a13f-45df-add0-f96aa113fd68"}}
{"add":{"path":"part-00000-9a9c570c-ee32-4322-ad2f-8c837a77d398-c000.snappy.parquet","partitionValues":{},"size":5072,"modificationTime":1723768498551,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":0},\"maxValues\":{\"id\":0},\"nullCount\":{\"id\":0,\"v\":0,\"array_of_variants\":0,\"struct_of_variants\":{\"v\":0},\"map_of_variants\":0,\"array_of_struct_of_variants\":0,\"struct_of_array_of_variants\":{\"v\":0}}}"}}
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"commitInfo":{"timestamp":1723768498990,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":1,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"1","numOutputBytes":"5072"},"engineInfo":"Apache-Spark/4.0.0-SNAPSHOT Delta-Lake/3.3.0-SNAPSHOT","txnId":"d90393d5-9cdd-40f1-8861-121f2169808b"}}
{"add":{"path":"part-00000-1e14ba22-3114-46d1-96fb-48b4912507ce-c000.snappy.parquet","partitionValues":{},"size":5072,"modificationTime":1723768498986,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":1},\"maxValues\":{\"id\":1},\"nullCount\":{\"id\":0,\"v\":0,\"array_of_variants\":0,\"struct_of_variants\":{\"v\":0},\"map_of_variants\":0,\"array_of_struct_of_variants\":0,\"struct_of_array_of_variants\":{\"v\":0}}}"}}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"version":2,"size":6,"sizeInBytes":21929,"numOfAddFiles":4,"checkpointSchema":{"type":"struct","fields":[{"name":"txn","type":{"type":"struct","fields":[{"name":"appId","type":"string","nullable":true,"metadata":{}},{"name":"version","type":"long","nullable":true,"metadata":{}},{"name":"lastUpdated","type":"long","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"add","type":{"type":"struct","fields":[{"name":"path","type":"string","nullable":true,"metadata":{}},{"name":"partitionValues","type":{"type":"map","keyType":"string","valueType":"string","valueContainsNull":true},"nullable":true,"metadata":{}},{"name":"size","type":"long","nullable":true,"metadata":{}},{"name":"modificationTime","type":"long","nullable":true,"metadata":{}},{"name":"dataChange","type":"boolean","nullable":true,"metadata":{}},{"name":"tags","type":{"type":"map","keyType":"string","valueType":"string","valueContainsNull":true},"nullable":true,"metadata":{}},{"name":"deletionVector","type":{"type":"struct","fields":[{"name":"storageType","type":"string","nullable":true,"metadata":{}},{"name":"pathOrInlineDv","type":"string","nullable":true,"metadata":{}},{"name":"offset","type":"integer","nullable":true,"metadata":{}},{"name":"sizeInBytes","type":"integer","nullable":true,"metadata":{}},{"name":"cardinality","type":"long","nullable":true,"metadata":{}},{"name":"maxRowIndex","type":"long","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"baseRowId","type":"long","nullable":true,"metadata":{}},{"name":"defaultRowCommitVersion","type":"long","nullable":true,"metadata":{}},{"name":"clusteringProvider","type":"string","nullable":true,"metadata":{}},{"name":"stats","type":"string","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"remove","type":{"type":"struct","fields":[{"name":"path","type":"string","nullable":true,"metadata":{}},{"name":"deletionTimestamp","type":"long","nullable":true,"metadata":{}},{"name":"dataChange","type":"boolean","nullable":true,"metadata":{}},{"name":"extendedFileMetadata","type":"boolean","nullable":true,"metadata":{}},{"name":"partitionValues","type":{"type":"map","keyType":"string","valueType":"string","valueContainsNull":true},"nullable":true,"metadata":{}},{"name":"size","type":"long","nullable":true,"metadata":{}},{"name":"deletionVector","type":{"type":"struct","fields":[{"name":"storageType","type":"string","nullable":true,"metadata":{}},{"name":"pathOrInlineDv","type":"string","nullable":true,"metadata":{}},{"name":"offset","type":"integer","nullable":true,"metadata":{}},{"name":"sizeInBytes","type":"integer","nullable":true,"metadata":{}},{"name":"cardinality","type":"long","nullable":true,"metadata":{}},{"name":"maxRowIndex","type":"long","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"baseRowId","type":"long","nullable":true,"metadata":{}},{"name":"defaultRowCommitVersion","type":"long","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"metaData","type":{"type":"struct","fields":[{"name":"id","type":"string","nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"description","type":"string","nullable":true,"metadata":{}},{"name":"format","type":{"type":"struct","fields":[{"name":"provider","type":"string","nullable":true,"metadata":{}},{"name":"options","type":{"type":"map","keyType":"string","valueType":"string","valueContainsNull":true},"nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"schemaString","type":"string","nullable":true,"metadata":{}},{"name":"partitionColumns","type":{"type":"array","elementType":"string","containsNull":true},"nullable":true,"metadata":{}},{"name":"configuration","type":{"type":"map","keyType":"string","valueType":"string","valueContainsNull":true},"nullable":true,"metadata":{}},{"name":"createdTime","type":"long","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"protocol","type":{"type":"struct","fields":[{"name":"minReaderVersion","type":"integer","nullable":true,"metadata":{}},{"name":"minWriterVersion","type":"integer","nullable":true,"metadata":{}},{"name":"readerFeatures","type":{"type":"array","elementType":"string","containsNull":true},"nullable":true,"metadata":{}},{"name":"writerFeatures","type":{"type":"array","elementType":"string","containsNull":true},"nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"domainMetadata","type":{"type":"struct","fields":[{"name":"domain","type":"string","nullable":true,"metadata":{}},{"name":"configuration","type":"string","nullable":true,"metadata":{}},{"name":"removed","type":"boolean","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}}]},"checksum":"a8d400a03ead8a86dbb412f2a693e26e"}
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
118 changes: 104 additions & 14 deletions kernel-spark/src/main/java/io/delta/kernel/spark/catalog/SparkTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,31 @@
import io.delta.kernel.internal.SnapshotImpl;
import io.delta.kernel.spark.read.SparkScanBuilder;
import io.delta.kernel.spark.utils.SchemaUtils;
import io.delta.kernel.types.ArrayType;
import io.delta.kernel.types.BinaryType;
import io.delta.kernel.types.BooleanType;
import io.delta.kernel.types.ByteType;
import io.delta.kernel.types.DataType;
import io.delta.kernel.types.DateType;
import io.delta.kernel.types.DecimalType;
import io.delta.kernel.types.DoubleType;
import io.delta.kernel.types.FloatType;
import io.delta.kernel.types.IntegerType;
import io.delta.kernel.types.LongType;
import io.delta.kernel.types.MapType;
import io.delta.kernel.types.ShortType;
import io.delta.kernel.types.StringType;
import io.delta.kernel.types.TimestampNTZType;
import io.delta.kernel.types.TimestampType;
import io.delta.kernel.types.VariantType;
import java.util.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.connector.catalog.*;
import org.apache.spark.sql.connector.expressions.Expressions;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.connector.read.ScanBuilder;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.*;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

/** DataSource V2 Table implementation for Delta Lake using the Delta Kernel API. */
Expand All @@ -46,13 +62,82 @@ public class SparkTable implements Table, SupportsRead {
private final SnapshotImpl snapshot;
private final Configuration hadoopConf;

private final StructType schema;
private final org.apache.spark.sql.types.StructType schema;
private final List<String> partColNames;
private final StructType dataSchema;
private final StructType partitionSchema;
private final org.apache.spark.sql.types.StructType dataSchema;
private final org.apache.spark.sql.types.StructType partitionSchema;
private final Column[] columns;
private final Transform[] partitionTransforms;

/**
* Validates all fields in kernel schema use allowed data types.
*
* @param schema the kernel schema to validate
* @throws IllegalArgumentException if any field uses a disallowed data type
*/
private static void validateSchemaTypes(io.delta.kernel.types.StructType schema) {
for (io.delta.kernel.types.StructField field : schema.fields()) {
validateKernelDataType(field.getDataType(), field.getName());
}
}

/**
* Recursively validate kernel data type and nested types.
*
* @param dataType the kernel data type to validate
* @param fieldPath the path to the field (for error messages)
* @throws IllegalArgumentException if the data type is not allowed
*/
private static void validateKernelDataType(DataType dataType, String fieldPath) {
if (isAllowedKernelType(dataType)) {
// Validate nested fields for structs, arrays, maps
if (dataType instanceof io.delta.kernel.types.StructType) {
io.delta.kernel.types.StructType structType = (io.delta.kernel.types.StructType) dataType;
for (io.delta.kernel.types.StructField field : structType.fields()) {
validateKernelDataType(field.getDataType(), fieldPath + "." + field.getName());
}
} else if (dataType instanceof ArrayType) {
ArrayType arrayType = (ArrayType) dataType;
validateKernelDataType(arrayType.getElementType(), fieldPath + ".element");
} else if (dataType instanceof MapType) {
MapType mapType = (MapType) dataType;
validateKernelDataType(mapType.getKeyType(), fieldPath + ".key");
validateKernelDataType(mapType.getValueType(), fieldPath + ".value");
}
} else {
throw new IllegalArgumentException(
String.format(
"Unsupported data type '%s' for field '%s'. ", dataType.toString(), fieldPath));
}
}

/**
* Checks if a kernel data type is allowed.
*
* @param dataType the kernel data type to check
* @return true if the data type is allowed, false otherwise
*/
private static boolean isAllowedKernelType(DataType dataType) {
// allowed kernel data types
return dataType instanceof ArrayType
|| dataType instanceof BinaryType
|| dataType instanceof BooleanType
|| dataType instanceof ByteType
|| dataType instanceof DateType
|| dataType instanceof DecimalType
|| dataType instanceof DoubleType
|| dataType instanceof FloatType
|| dataType instanceof IntegerType
|| dataType instanceof LongType
|| dataType instanceof MapType
|| dataType instanceof ShortType
|| dataType instanceof StringType
|| dataType instanceof io.delta.kernel.types.StructType
|| dataType instanceof TimestampType
|| dataType instanceof TimestampNTZType
|| dataType instanceof VariantType;
}

/**
* Creates a SparkTable backed by a Delta Kernel snapshot and initializes Spark-facing metadata
* (schemas, partitioning, capabilities).
Expand Down Expand Up @@ -81,16 +166,17 @@ public SparkTable(Identifier identifier, String tablePath, Map<String, String> o
io.delta.kernel.TableManager.loadSnapshot(tablePath)
.build(io.delta.kernel.defaults.engine.DefaultEngine.create(hadoopConf));

validateSchemaTypes(snapshot.getSchema());
this.schema = SchemaUtils.convertKernelSchemaToSparkSchema(snapshot.getSchema());
this.partColNames =
Collections.unmodifiableList(new ArrayList<>(snapshot.getPartitionColumnNames()));

final List<StructField> dataFields = new ArrayList<>();
final List<StructField> partitionFields = new ArrayList<>();
final List<org.apache.spark.sql.types.StructField> dataFields = new ArrayList<>();
final List<org.apache.spark.sql.types.StructField> partitionFields = new ArrayList<>();

// Build a map for O(1) field lookups to improve performance
Map<String, StructField> fieldMap = new HashMap<>();
for (StructField field : schema.fields()) {
Map<String, org.apache.spark.sql.types.StructField> fieldMap = new HashMap<>();
for (org.apache.spark.sql.types.StructField field : schema.fields()) {
fieldMap.put(field.name(), field);
}

Expand All @@ -99,21 +185,25 @@ public SparkTable(Identifier identifier, String tablePath, Map<String, String> o
// in snapshotSchema, and we need to preserve the partColNames order for
// proper partitioning behavior
for (String partColName : partColNames) {
StructField field = fieldMap.get(partColName);
org.apache.spark.sql.types.StructField field = fieldMap.get(partColName);
if (field != null) {
partitionFields.add(field);
}
}

// Add remaining fields as data fields (non-partition columns)
// These are fields that exist in the schema but are not partition columns
for (StructField field : schema.fields()) {
for (org.apache.spark.sql.types.StructField field : schema.fields()) {
if (!partColNames.contains(field.name())) {
dataFields.add(field);
}
}
this.dataSchema = new StructType(dataFields.toArray(new StructField[0]));
this.partitionSchema = new StructType(partitionFields.toArray(new StructField[0]));
this.dataSchema =
new org.apache.spark.sql.types.StructType(
dataFields.toArray(new org.apache.spark.sql.types.StructField[0]));
this.partitionSchema =
new org.apache.spark.sql.types.StructType(
partitionFields.toArray(new org.apache.spark.sql.types.StructField[0]));

this.columns = CatalogV2Util.structTypeToV2Columns(schema);
this.partitionTransforms =
Expand All @@ -138,7 +228,7 @@ public String name() {
}

@Override
public StructType schema() {
public org.apache.spark.sql.types.StructType schema() {
return schema;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.delta.kernel.spark.read;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import io.delta.golden.GoldenTableUtils$;
Expand Down Expand Up @@ -575,6 +576,32 @@ public Dataset<Row> apply() {
checkAnswer(dfFunc, expectedSeq);
}

@Test
public void testVariantUnsupportedDataType() {
String tablePath = goldenTablePath("spark-variant-checkpoint");

RuntimeException exception =
assertThrows(
RuntimeException.class,
() -> {
spark.sql("SELECT * FROM `dsv2`.`delta`.`" + tablePath + "`").collect();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error comes from:

	at io.delta.kernel.spark.utils.SchemaUtils.convertKernelDataTypeToSparkDataType(SchemaUtils.java:116)
	at io.delta.kernel.spark.utils.SchemaUtils.convertKernelSchemaToSparkSchema(SchemaUtils.java:60)

It seesm that it is already handled in SchemaUtils. In such a case, we can just remove all the code changes in SparkTable.java

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the future, when getting table schema from Unity catalog, we can check whether the UC schema is supported in Kernel. For example, the Time type is not supported yet.

});

Throwable rootCause = getRootCause(exception);
String errorMessage = rootCause.getMessage();

assertTrue(errorMessage.contains("Unsupported data type"));
assertTrue(errorMessage.contains("variant"));
}

private Throwable getRootCause(Throwable throwable) {
Throwable cause = throwable;
while (cause.getCause() != null) {
cause = cause.getCause();
}
return cause;
}

@Test
public void testAllGoldenTables() {
List<String> tableNames = getAllGoldenTableNames();
Expand Down
Loading