Skip to content

Commit

Permalink
Merge pull request #10847 from NVIDIA/merge-branch-24.04-to-main
Browse files Browse the repository at this point in the history
Merge branch-24.04 into main
  • Loading branch information
NvTimLiu authored May 21, 2024
2 parents c0c142e + 4202049 commit f5090ba
Show file tree
Hide file tree
Showing 9 changed files with 83 additions and 38 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Change log
Generated on 2024-05-09
Generated on 2024-05-20

## Release 24.04

Expand Down Expand Up @@ -29,6 +29,7 @@ Generated on 2024-05-09
### Bugs Fixed
|||
|:---|:---|
|[#10700](https://github.com/NVIDIA/spark-rapids/issues/10700)|[BUG] get_json_object cannot handle ints or boolean values|
|[#10645](https://github.com/NVIDIA/spark-rapids/issues/10645)|[BUG] java.lang.IllegalStateException: Expected to only receive a single batch|
|[#10665](https://github.com/NVIDIA/spark-rapids/issues/10665)|[BUG] Need to update private jar's version to v24.04.1 for spark-rapids v24.04.0 release|
|[#10589](https://github.com/NVIDIA/spark-rapids/issues/10589)|[BUG] ZSTD version mismatch in integration tests|
Expand Down Expand Up @@ -85,6 +86,7 @@ Generated on 2024-05-09
|||
|:---|:---|
|[#10782](https://github.com/NVIDIA/spark-rapids/pull/10782)|Update latest changelog [skip ci]|
|[#10780](https://github.com/NVIDIA/spark-rapids/pull/10780)|[DOC]Update download page for v24.04.1 [skip ci]|
|[#10777](https://github.com/NVIDIA/spark-rapids/pull/10777)|Update rapids JNI dependency: private to 24.04.2|
|[#10683](https://github.com/NVIDIA/spark-rapids/pull/10683)|Update latest changelog [skip ci]|
|[#10681](https://github.com/NVIDIA/spark-rapids/pull/10681)|Update rapids JNI dependency to 24.04.0, private to 24.04.1|
Expand Down
46 changes: 46 additions & 0 deletions docs/archive.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,52 @@ nav_order: 15
---
Below are archived releases for RAPIDS Accelerator for Apache Spark.

## Release v24.04.0
### Hardware Requirements:

The plugin is tested on the following architectures:
@@ -67,14 +67,14 @@ for your hardware's minimum driver version.
### RAPIDS Accelerator's Support Policy for Apache Spark
The RAPIDS Accelerator maintains support for Apache Spark versions available for download from [Apache Spark](https://spark.apache.org/downloads.html)

### Download RAPIDS Accelerator for Apache Spark v24.04.0

| Processor | Scala Version | Download Jar | Download Signature |
|-----------|---------------|--------------|--------------------|
| x86_64 | Scala 2.12 | [RAPIDS Accelerator v24.04.0](https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.12/24.04.0/rapids-4-spark_2.12-24.04.0.jar) | [Signature](https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.12/24.04.0/rapids-4-spark_2.12-24.04.0.jar.asc) |
| x86_64 | Scala 2.13 | [RAPIDS Accelerator v24.04.0](https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.13/24.04.0/rapids-4-spark_2.13-24.04.0.jar) | [Signature](https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.13/24.04.0/rapids-4-spark_2.13-24.04.0.jar.asc) |
| arm64 | Scala 2.12 | [RAPIDS Accelerator v24.04.0](https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.12/24.04.0/rapids-4-spark_2.12-24.04.0-cuda11-arm64.jar) | [Signature](https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.12/24.04.0/rapids-4-spark_2.12-24.04.0-cuda11-arm64.jar.asc) |
| arm64 | Scala 2.13 | [RAPIDS Accelerator v24.04.0](https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.13/24.04.0/rapids-4-spark_2.13-24.04.0-cuda11-arm64.jar) | [Signature](https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.13/24.04.0/rapids-4-spark_2.13-24.04.0-cuda11-arm64.jar.asc) |

This package is built against CUDA 11.8. It is tested on V100, T4, A10, A100, L4 and H100 GPUs with
CUDA 11.8 through CUDA 12.0.

### Verify signature
* Download the [PUB_KEY](https://keys.openpgp.org/[email protected]).
* Import the public key: `gpg --import PUB_KEY`
* Verify the signature for Scala 2.12 jar:
`gpg --verify rapids-4-spark_2.12-24.04.0.jar.asc rapids-4-spark_2.12-24.04.0.jar`
* Verify the signature for Scala 2.13 jar:
`gpg --verify rapids-4-spark_2.13-24.04.0.jar.asc rapids-4-spark_2.13-24.04.0.jar`

The output of signature verify:

gpg: Good signature from "NVIDIA Spark (For the signature of spark-rapids release jars) <[email protected]>"

### Release Notes
* New functionality and performance improvements for this release include:
* Performance improvements for S3 reading.
Refer to perfio.s3.enabled in [advanced_configs](./additional-functionality/advanced_configs.md) for more details.
* Performance improvements when doing a joins on unique keys.
* Enhanced decompression kernels for zstd and snappy.
* Enhanced Parquet reading performance with modular kernels.
* Added compatibility with Spark version 3.5.1.
* Deprecated support for Databricks 10.4 ML LTS.
* For updates on RAPIDS Accelerator Tools, please visit [this link](https://github.com/NVIDIA/spark-rapids-tools/releases).

For a detailed list of changes, please refer to the
[CHANGELOG](https://github.com/NVIDIA/spark-rapids/blob/main/CHANGELOG.md).

## Release v24.02.0
### Hardware Requirements:

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -699,7 +699,7 @@
<cuda.version>cuda11</cuda.version>
<jni.classifier>${cuda.version}</jni.classifier>
<spark-rapids-jni.version>24.04.0</spark-rapids-jni.version>
<spark-rapids-private.version>24.04.2</spark-rapids-private.version>
<spark-rapids-private.version>24.04.3</spark-rapids-private.version>
<scala.binary.version>2.12</scala.binary.version>
<alluxio.client.version>2.8.0</alluxio.client.version>
<scala.recompileMode>incremental</scala.recompileMode>
Expand Down
2 changes: 1 addition & 1 deletion scala2.13/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -699,7 +699,7 @@
<cuda.version>cuda11</cuda.version>
<jni.classifier>${cuda.version}</jni.classifier>
<spark-rapids-jni.version>24.04.0</spark-rapids-jni.version>
<spark-rapids-private.version>24.04.2</spark-rapids-private.version>
<spark-rapids-private.version>24.04.3</spark-rapids-private.version>
<scala.binary.version>2.13</scala.binary.version>
<alluxio.client.version>2.8.0</alluxio.client.version>
<scala.recompileMode>incremental</scala.recompileMode>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2023, NVIDIA CORPORATION.
* Copyright (c) 2020-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -31,8 +31,8 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.rapids.execution.python.shims.GpuArrowPythonRunner
import org.apache.spark.sql.rapids.shims.{ArrowUtilsShim, DataTypeUtilsShim}
import org.apache.spark.sql.rapids.execution.python.shims.GpuGroupedPythonRunnerFactory
import org.apache.spark.sql.rapids.shims.DataTypeUtilsShim
import org.apache.spark.sql.types.{DataType, StructField, StructType}
import org.apache.spark.sql.vectorized.ColumnarBatch

Expand Down Expand Up @@ -109,8 +109,6 @@ case class GpuAggregateInPandasExec(
val (mNumInputRows, mNumInputBatches, mNumOutputRows, mNumOutputBatches) = commonGpuMetrics()

lazy val isPythonOnGpuEnabled = GpuPythonHelper.isPythonOnGpuEnabled(conf)
val sessionLocalTimeZone = conf.sessionLocalTimeZone
val pythonRunnerConf = ArrowUtilsShim.getPythonRunnerConfMap(conf)
val childOutput = child.output
val resultExprs = resultExpressions

Expand Down Expand Up @@ -204,27 +202,22 @@ case class GpuAggregateInPandasExec(
}
}

val runnerFactory = GpuGroupedPythonRunnerFactory(conf, pyFuncs, argOffsets,
aggInputSchema, DataTypeUtilsShim.fromAttributes(pyOutAttributes),
PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF)

// Third, sends to Python to execute the aggregate and returns the result.
if (pyInputIter.hasNext) {
// Launch Python workers only when the data is not empty.
val pyRunner = new GpuArrowPythonRunner(
pyFuncs,
PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF,
argOffsets,
aggInputSchema,
sessionLocalTimeZone,
pythonRunnerConf,
// The whole group data should be written in a single call, so here is unlimited
Int.MaxValue,
DataTypeUtilsShim.fromAttributes(pyOutAttributes))

val pyRunner = runnerFactory.getRunner()
val pyOutputIterator = pyRunner.compute(pyInputIter, context.partitionId(), context)

val combinedAttrs = gpuGroupingExpressions.map(_.toAttribute) ++ pyOutAttributes
val resultRefs = GpuBindReferences.bindGpuReferences(resultExprs, combinedAttrs)
// Gets the combined batch for each group and projects for the output.
new CombiningIterator(batchProducer.getBatchQueue, pyOutputIterator, pyRunner,
mNumOutputRows, mNumOutputBatches).map { combinedBatch =>
new CombiningIterator(batchProducer.getBatchQueue, pyOutputIterator,
pyRunner.asInstanceOf[GpuArrowOutput], mNumOutputRows,
mNumOutputBatches).map { combinedBatch =>
withResource(combinedBatch) { batch =>
GpuProjectExec.project(batch, resultRefs)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2023, NVIDIA CORPORATION.
* Copyright (c) 2020-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,7 +21,7 @@ import com.nvidia.spark.rapids.python.PythonWorkerSemaphore
import com.nvidia.spark.rapids.shims.ShimUnaryExecNode

import org.apache.spark.TaskContext
import org.apache.spark.api.python.ChainedPythonFunctions
import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning}
Expand Down Expand Up @@ -123,7 +123,8 @@ case class GpuFlatMapGroupsInPandasExec(
resolveArgOffsets(child, groupingAttributes)

val runnerFactory = GpuGroupedPythonRunnerFactory(conf, chainedFunc, Array(argOffsets),
DataTypeUtilsShim.fromAttributes(dedupAttrs), pythonOutputSchema)
DataTypeUtilsShim.fromAttributes(dedupAttrs), pythonOutputSchema,
PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF)

// Start processing. Map grouped batches to ArrowPythonRunner results.
child.executeColumnar().mapPartitionsInternal { inputIter =>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -39,7 +39,7 @@
spark-rapids-shim-json-lines ***/
package org.apache.spark.sql.rapids.execution.python.shims

import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
import org.apache.spark.api.python.ChainedPythonFunctions
import org.apache.spark.sql.rapids.shims.ArrowUtilsShim
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.ColumnarBatch
Expand All @@ -49,14 +49,15 @@ case class GpuGroupedPythonRunnerFactory(
chainedFunc: Seq[ChainedPythonFunctions],
argOffsets: Array[Array[Int]],
dedupAttrs: StructType,
pythonOutputSchema: StructType) {
pythonOutputSchema: StructType,
evalType: Int) {
val sessionLocalTimeZone = conf.sessionLocalTimeZone
val pythonRunnerConf = ArrowUtilsShim.getPythonRunnerConfMap(conf)

def getRunner(): GpuBasePythonRunner[ColumnarBatch] = {
new GpuArrowPythonRunner(
chainedFunc,
PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF,
evalType,
argOffsets,
dedupAttrs,
sessionLocalTimeZone,
Expand All @@ -65,4 +66,4 @@ case class GpuGroupedPythonRunnerFactory(
Int.MaxValue,
pythonOutputSchema)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
spark-rapids-shim-json-lines ***/
package org.apache.spark.sql.rapids.execution.python.shims

import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
import org.apache.spark.api.python.ChainedPythonFunctions
import org.apache.spark.sql.rapids.shims.ArrowUtilsShim
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.ColumnarBatch
Expand All @@ -30,7 +30,8 @@ case class GpuGroupedPythonRunnerFactory(
chainedFunc: Seq[ChainedPythonFunctions],
argOffsets: Array[Array[Int]],
dedupAttrs: StructType,
pythonOutputSchema: StructType) {
pythonOutputSchema: StructType,
evalType: Int) {
// Configs from DB runtime
val maxBytes = conf.pandasZeroConfConversionGroupbyApplyMaxBytesPerSlice
val zeroConfEnabled = conf.pandasZeroConfConversionGroupbyApplyEnabled
Expand All @@ -41,7 +42,7 @@ case class GpuGroupedPythonRunnerFactory(
if (zeroConfEnabled && maxBytes > 0L) {
new GpuGroupUDFArrowPythonRunner(
chainedFunc,
PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF,
evalType,
argOffsets,
dedupAttrs,
sessionLocalTimeZone,
Expand All @@ -52,7 +53,7 @@ case class GpuGroupedPythonRunnerFactory(
} else {
new GpuArrowPythonRunner(
chainedFunc,
PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF,
evalType,
argOffsets,
dedupAttrs,
sessionLocalTimeZone,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,25 @@ import org.apache.spark.sql.rapids.shims.ArrowUtilsShim
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnarBatch

//TODO is this needed? we already have a similar version in spark330db
case class GpuGroupedPythonRunnerFactory(
conf: org.apache.spark.sql.internal.SQLConf,
chainedFunc: Seq[ChainedPythonFunctions],
argOffsets: Array[Array[Int]],
dedupAttrs: StructType,
pythonOutputSchema: StructType) {
pythonOutputSchema: StructType,
evalType: Int) {
// Configs from DB runtime
val maxBytes = conf.pandasZeroConfConversionGroupbyApplyMaxBytesPerSlice
val zeroConfEnabled = conf.pandasZeroConfConversionGroupbyApplyEnabled
val isArrowBatchSlicingEnabled = conf.pythonArrowBatchSlicingEnabled
val sessionLocalTimeZone = conf.sessionLocalTimeZone
val pythonRunnerConf = ArrowUtilsShim.getPythonRunnerConfMap(conf)

def getRunner(): GpuBasePythonRunner[ColumnarBatch] = {
if (zeroConfEnabled && maxBytes > 0L) {
if (isArrowBatchSlicingEnabled || (zeroConfEnabled && maxBytes > 0L)) {
new GpuGroupUDFArrowPythonRunner(
chainedFunc,
PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF,
evalType,
argOffsets,
dedupAttrs,
sessionLocalTimeZone,
Expand All @@ -52,7 +53,7 @@ case class GpuGroupedPythonRunnerFactory(
} else {
new GpuArrowPythonRunner(
chainedFunc,
PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF,
evalType,
argOffsets,
dedupAttrs,
sessionLocalTimeZone,
Expand Down

0 comments on commit f5090ba

Please sign in to comment.