
Commit

[SPARK-49960] Re-Enable DeveloperAPI provided custom encoding - with test
chris-twiner committed Oct 15, 2024
1 parent: 33f691e · commit: b457353
Showing 1 changed file with 53 additions and 3 deletions.
sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala: 56 changes (53 additions, 3 deletions)
@@ -35,9 +35,11 @@ import org.apache.spark.TestUtils.withListener
 import org.apache.spark.internal.config.MAX_RESULT_SIZE
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql.catalyst.{FooClassWithEnum, FooEnum, ScroogeLikeExample}
-import org.apache.spark.sql.catalyst.encoders.{AgnosticEncoders, ExpressionEncoder, OuterScopes}
-import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.BoxedIntEncoder
-import org.apache.spark.sql.catalyst.expressions.{CodegenObjectFactoryMode, GenericRowWithSchema}
+import org.apache.spark.sql.catalyst.DeserializerBuildHelper.createDeserializerForString
+import org.apache.spark.sql.catalyst.SerializerBuildHelper.createSerializerForString
+import org.apache.spark.sql.catalyst.encoders.{AgnosticEncoders, AgnosticExpressionPathEncoder, ExpressionEncoder, OuterScopes}
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{BoxedIntEncoder, ProductEncoder}
+import org.apache.spark.sql.catalyst.expressions.{CodegenObjectFactoryMode, Expression, GenericRowWithSchema}
 import org.apache.spark.sql.catalyst.plans.JoinType
 import org.apache.spark.sql.catalyst.trees.DataFrameQueryContext
 import org.apache.spark.sql.catalyst.util.sideBySide
@@ -2802,6 +2804,54 @@ class DatasetSuite extends QueryTest
}
}
}

test("SPARK-49960: joinWith custom encoder") {
/*
test based on "joinWith class with primitive, toDF"
with "custom" encoder. Removing the use of AgnosticExpressionPathEncoder
within SerializerBuildHelper and DeserializerBuildHelper will trigger MatchErrors
*/
val ds1 = Seq(1, 1, 2).toDS()
val ds2 = SparkSession.active.createDataset[ClassData](Seq(ClassData("a", 1),
ClassData("b", 2)))(CustomPathEncoder.custClassDataEnc)

checkAnswer(
ds1.joinWith(ds2, $"value" === $"b").toDF().select($"_1", $"_2.a", $"_2.b"),
Row(1, "a", 1) :: Row(1, "a", 1) :: Row(2, "b", 2) :: Nil)
}

}

/**
 * SPARK-49960 - Mimic a custom encoder such as those provided by typelevel Frameless.
 */
object CustomPathEncoder {

  val realClassDataEnc: ProductEncoder[ClassData] =
    Encoders.product[ClassData].asInstanceOf[ProductEncoder[ClassData]]

  val custStringEnc: AgnosticExpressionPathEncoder[String] =
    new AgnosticExpressionPathEncoder[String] {

      override def toCatalyst(input: Expression): Expression =
        createSerializerForString(input)

      override def fromCatalyst(inputPath: Expression): Expression =
        createDeserializerForString(inputPath, returnNullable = false)

      override def isPrimitive: Boolean = false

      override def dataType: DataType = StringType

      override def clsTag: ClassTag[String] = implicitly[ClassTag[String]]

      override def isStruct: Boolean = true
    }

  val custClassDataEnc: ProductEncoder[ClassData] = realClassDataEnc.copy(fields =
    Seq(realClassDataEnc.fields.head.copy(enc = custStringEnc),
      realClassDataEnc.fields.last)
  )
}

class DatasetLargeResultCollectingSuite extends QueryTest
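
For context, a minimal usage sketch (not part of the commit): the custom ProductEncoder defined in the diff can also be supplied implicitly when creating a Dataset, which is how a library such as Frameless would typically provide it. ClassData and CustomPathEncoder are taken from the diff above; the object and method names below are illustrative only.

import org.apache.spark.sql.{Dataset, Encoder, SparkSession}

// Illustrative sketch: bind the custom encoder as the implicit Encoder[ClassData]
// so createDataset uses it instead of deriving the default product encoder.
object CustomPathEncoderUsage {
  def classDataDs(spark: SparkSession): Dataset[ClassData] = {
    implicit val classDataEnc: Encoder[ClassData] = CustomPathEncoder.custClassDataEnc
    spark.createDataset(Seq(ClassData("a", 1), ClassData("b", 2)))
  }
}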