Skip to content

Commit

Permalink
Upgrade to spark 3.2 (#416)
Browse files Browse the repository at this point in the history
* Use spark 3.2.1 and fix hasCorrelation Check fail

* Fix scalastyle fail

* Disable spark.sql.adaptive.enabled

Co-authored-by: tan.vu <[email protected]>
  • Loading branch information
2 people authored and Tammo Rukat committed Feb 15, 2022
1 parent 52519e6 commit 9334176
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 24 deletions.
10 changes: 9 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
<artifact.scala.version>${scala.major.version}</artifact.scala.version>
<scala-maven-plugin.version>4.4.0</scala-maven-plugin.version>

<spark.version>3.2.0</spark.version>
<spark.version>3.2.1</spark.version>
</properties>

<name>deequ</name>
Expand Down Expand Up @@ -74,6 +74,14 @@
<version>${scala.version}</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.scala-lang/scala-reflect -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
<version>${scala.version}</version>
</dependency>


<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.major.version}</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
/**
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License
* is located at
*
* http://aws.amazon.com/apache2.0/
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*
*/

* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License
* is located at
*
* http://aws.amazon.com/apache2.0/
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*
*/
package org.apache.spark.sql.catalyst.expressions.aggregate

import org.apache.spark.sql.catalyst.InternalRow
Expand All @@ -26,12 +25,13 @@ import org.apache.spark.sql.types._

/** Adjusted version of org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile
* (github tag v2.2.0) */
private[sql] case class StatefulApproxQuantile(
child: Expression,
accuracyExpression: Expression,
override val mutableAggBufferOffset: Int,
override val inputAggBufferOffset: Int)
extends TypedImperativeAggregate[PercentileDigest] with ImplicitCastInputTypes with BinaryLike[Expression] {
private[sql] case class StatefulApproxQuantile(child: Expression,
accuracyExpression: Expression,
override val mutableAggBufferOffset: Int,
override val inputAggBufferOffset: Int)
extends TypedImperativeAggregate[PercentileDigest]
with ImplicitCastInputTypes
with BinaryLike[Expression] {

def this(child: Expression, accuracyExpression: Expression) = {
this(child, accuracyExpression, 0, 0)
Expand Down Expand Up @@ -111,11 +111,12 @@ private[sql] case class StatefulApproxQuantile(
}

override def left: Expression = child

override def right: Expression = accuracyExpression
// override def third: Expression = accuracyExpression

protected def withNewChildrenInternal(
newFirst: Expression, newSecond: Expression): StatefulApproxQuantile =
protected def withNewChildrenInternal(newFirst: Expression,
newSecond: Expression): StatefulApproxQuantile =
copy(child = newFirst, accuracyExpression = newSecond)

// protected def withNewChildrenInternal(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,8 @@ private[sql] class StatefulCorrelation(
val state = Seq(super.hashCode(), evaluateExpression)
state.map { _.hashCode() }.foldLeft(0) {(a, b) => 31 * a + b }
}

override protected def withNewChildrenInternal(newLeft: Expression,
newRight: Expression): StatefulCorrelation =
new StatefulCorrelation(newLeft, newRight, nullOnDivideByZero)
}
1 change: 1 addition & 0 deletions src/test/scala/com/amazon/deequ/SparkContextSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ trait SparkContextSpec {
.appName("test")
.config("spark.ui.enabled", "false")
.config("spark.sql.shuffle.partitions", 2.toString)
.config("spark.sql.adaptive.enabled", value = false)
.getOrCreate()
session.sparkContext.setCheckpointDir(System.getProperty("java.io.tmpdir"))
session
Expand Down

0 comments on commit 9334176

Please sign in to comment.