From b7e957858ae82294935927846ef85cbdafe20663 Mon Sep 17 00:00:00 2001 From: zhaokuo Date: Thu, 21 Mar 2024 14:46:09 +0800 Subject: [PATCH 1/7] fix yarn-cluster udf dispatch error --- .../io/glutenproject/udf/UdfJniWrapper.java | 2 +- .../velox/SparkPlanExecApiImpl.scala | 2 +- .../spark/sql/expression/UDFResolver.scala | 72 ++++++++++--------- .../expression/VeloxUdfSuite.scala | 3 +- cpp/velox/jni/JniUdf.cc | 5 +- cpp/velox/jni/JniUdf.h | 2 +- cpp/velox/jni/VeloxJniWrapper.cc | 7 +- 7 files changed, 49 insertions(+), 44 deletions(-) diff --git a/backends-velox/src/main/java/io/glutenproject/udf/UdfJniWrapper.java b/backends-velox/src/main/java/io/glutenproject/udf/UdfJniWrapper.java index 50c778239e64..83cb9a16facb 100644 --- a/backends-velox/src/main/java/io/glutenproject/udf/UdfJniWrapper.java +++ b/backends-velox/src/main/java/io/glutenproject/udf/UdfJniWrapper.java @@ -20,5 +20,5 @@ public class UdfJniWrapper { public UdfJniWrapper() {} - public native void nativeLoadUdfLibraries(String udfLibPaths); + public native void getFunctionSignatures(); } diff --git a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/SparkPlanExecApiImpl.scala b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/SparkPlanExecApiImpl.scala index e7d2b22255f7..61ea50695db3 100644 --- a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/SparkPlanExecApiImpl.scala +++ b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/SparkPlanExecApiImpl.scala @@ -649,7 +649,7 @@ class SparkPlanExecApiImpl extends SparkPlanExecApi { override def genInjectedFunctions() : Seq[(FunctionIdentifier, ExpressionInfo, FunctionBuilder)] = { - UDFResolver.loadAndGetFunctionDescriptions + UDFResolver.getFunctionSignatures } override def rewriteSpillPath(path: String): String = { diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/expression/UDFResolver.scala b/backends-velox/src/main/scala/org/apache/spark/sql/expression/UDFResolver.scala index 
05898b171394..9c6c7fc680e6 100644 --- a/backends-velox/src/main/scala/org/apache/spark/sql/expression/UDFResolver.scala +++ b/backends-velox/src/main/scala/org/apache/spark/sql/expression/UDFResolver.scala @@ -37,7 +37,8 @@ import org.apache.spark.util.Utils import com.google.common.collect.Lists import java.io.File -import java.nio.file.{Files, Paths} +import java.net.URI +import java.nio.file.{Files, FileVisitOption, Paths} import scala.collection.JavaConverters.asScalaIteratorConverter import scala.collection.mutable @@ -107,7 +108,7 @@ object UDFResolver extends Logging { def getFilesWithExtension(directory: java.nio.file.Path, extension: String): Seq[String] = { Files - .walk(directory) + .walk(directory, FileVisitOption.FOLLOW_LINKS) .iterator() .asScala .filter(p => Files.isRegularFile(p) && p.toString.endsWith(extension)) @@ -116,19 +117,19 @@ object UDFResolver extends Logging { } def resolveUdfConf(conf: java.util.Map[String, String]): Unit = { - if (isDriver) { - if (localLibraryPaths != null) { - conf.put(BackendSettings.GLUTEN_VELOX_UDF_LIB_PATHS, localLibraryPaths) - } + val sparkConf = SparkEnv.get.conf + val udfLibPaths = if (isDriver) { + sparkConf + .getOption(BackendSettings.GLUTEN_VELOX_DRIVER_UDF_LIB_PATHS) + .orElse(sparkConf.getOption(BackendSettings.GLUTEN_VELOX_UDF_LIB_PATHS)) } else { - val sparkConf = SparkEnv.get.conf - Option(conf.get(BackendSettings.GLUTEN_VELOX_UDF_LIB_PATHS)) match { - case Some(libs) => - conf.put( - BackendSettings.GLUTEN_VELOX_UDF_LIB_PATHS, - getAllLibraries(libs, sparkConf, canAccessSparkFiles = true)) - case None => - } + sparkConf.getOption(BackendSettings.GLUTEN_VELOX_UDF_LIB_PATHS) + } + + udfLibPaths match { + case Some(paths) => + conf.put(BackendSettings.GLUTEN_VELOX_UDF_LIB_PATHS, getAllLibraries(paths, sparkConf)) + case None => } } @@ -150,23 +151,40 @@ object UDFResolver extends Logging { dest } + private def isRelativePath(path: String): Boolean = { + try { + val uri = new URI(path) + !uri.isAbsolute 
&& uri.getPath == path + } catch { + case _: Exception => false + } + } + // Get the full paths of all libraries. // If it's a directory, get all files ends with ".so" recursively. - def getAllLibraries(files: String, sparkConf: SparkConf, canAccessSparkFiles: Boolean): String = { + def getAllLibraries(files: String, sparkConf: SparkConf): String = { val hadoopConf = SparkHadoopUtil.newConfiguration(sparkConf) + val master = sparkConf.getOption("spark.master") + val isYarnCluster = + master.isDefined && master.get.equals("yarn") && !Utils.isClientMode(sparkConf) + val isYarnClient = + master.isDefined && master.get.equals("yarn") && Utils.isClientMode(sparkConf) + files .split(",") .map { f => val file = new File(f) // Relative paths should be uploaded via --files or --archives - // Use SparkFiles.get to download and unpack - if (!file.isAbsolute) { - if (!canAccessSparkFiles) { + if (isRelativePath(f)) { + if (isYarnClient) { throw new IllegalArgumentException( "On yarn-client mode, driver only accepts absolute paths, but got " + f) + } else if (isYarnCluster) { + file + } else { + new File(SparkFiles.get(f)) } - new File(SparkFiles.get(f)) } else { // Download or copy absolute paths to JniWorkspace. 
val uri = Utils.resolveURI(f) @@ -192,26 +210,16 @@ object UDFResolver extends Logging { .mkString(",") } - def loadAndGetFunctionDescriptions: Seq[(FunctionIdentifier, ExpressionInfo, FunctionBuilder)] = { + def getFunctionSignatures: Seq[(FunctionIdentifier, ExpressionInfo, FunctionBuilder)] = { val sparkContext = SparkContext.getActive.get val sparkConf = sparkContext.conf - val udfLibPaths = sparkConf - .getOption(BackendSettings.GLUTEN_VELOX_DRIVER_UDF_LIB_PATHS) - .orElse(sparkConf.getOption(BackendSettings.GLUTEN_VELOX_UDF_LIB_PATHS)) + val udfLibPaths = sparkConf.getOption(BackendSettings.GLUTEN_VELOX_UDF_LIB_PATHS) udfLibPaths match { case None => Seq.empty case Some(paths) => - val master = sparkConf.getOption("spark.master") - val isYarnClient = - master.isDefined && master.get.equals("yarn") && Utils.isClientMode(sparkConf) - // For Yarn-client mode, driver cannot get uploaded files via SparkFiles.get. - localLibraryPaths = getAllLibraries(paths, sparkConf, canAccessSparkFiles = !isYarnClient) - - logInfo(s"Loading UDF libraries from paths: $localLibraryPaths") - new UdfJniWrapper().nativeLoadUdfLibraries(localLibraryPaths) - + new UdfJniWrapper().getFunctionSignatures() UDFMap.map { case (name, t) => ( diff --git a/backends-velox/src/test/scala/io/glutenproject/expression/VeloxUdfSuite.scala b/backends-velox/src/test/scala/io/glutenproject/expression/VeloxUdfSuite.scala index 4fd05f687b24..48c9d8d62376 100644 --- a/backends-velox/src/test/scala/io/glutenproject/expression/VeloxUdfSuite.scala +++ b/backends-velox/src/test/scala/io/glutenproject/expression/VeloxUdfSuite.scala @@ -82,7 +82,8 @@ class VeloxUdfSuiteLocal extends VeloxUdfSuite { override val master: String = "local[2]" override protected def sparkConf: SparkConf = { super.sparkConf - .set("spark.gluten.sql.columnar.backend.velox.udfLibraryPaths", udfLibPath) + .set("spark.files", udfLibPath) + .set("spark.gluten.sql.columnar.backend.velox.udfLibraryPaths", "libmyudf.so") } } diff --git 
a/cpp/velox/jni/JniUdf.cc b/cpp/velox/jni/JniUdf.cc index 28551cfe8a57..180788bee426 100644 --- a/cpp/velox/jni/JniUdf.cc +++ b/cpp/velox/jni/JniUdf.cc @@ -47,17 +47,14 @@ void gluten::finalizeVeloxJniUDF(JNIEnv* env) { env->DeleteGlobalRef(udfResolverClass); } -void gluten::jniLoadUdf(JNIEnv* env, const std::string& libPaths) { +void gluten::jniGetFunctionSignatures(JNIEnv* env) { auto udfLoader = gluten::UdfLoader::getInstance(); - udfLoader->loadUdfLibraries(libPaths); - const auto& udfMap = udfLoader->getUdfMap(); for (const auto& udf : udfMap) { auto udfString = udf.second; jbyteArray returnType = env->NewByteArray(udf.second.length()); env->SetByteArrayRegion(returnType, 0, udfString.length(), reinterpret_cast(udfString.c_str())); jstring name = env->NewStringUTF(udf.first.c_str()); - jobject instance = env->GetStaticObjectField( udfResolverClass, env->GetStaticFieldID(udfResolverClass, "MODULE$", kUdfResolverClassPath.c_str())); env->CallVoidMethod(instance, registerUDFMethod, name, returnType); diff --git a/cpp/velox/jni/JniUdf.h b/cpp/velox/jni/JniUdf.h index b666d8eb0185..b91ac08dedc5 100644 --- a/cpp/velox/jni/JniUdf.h +++ b/cpp/velox/jni/JniUdf.h @@ -27,6 +27,6 @@ void initVeloxJniUDF(JNIEnv* env); void finalizeVeloxJniUDF(JNIEnv* env); -void jniLoadUdf(JNIEnv* env, const std::string& libPaths); +void jniGetFunctionSignatures(JNIEnv* env); } // namespace gluten diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc index cb1604629326..e73fe7eb0149 100644 --- a/cpp/velox/jni/VeloxJniWrapper.cc +++ b/cpp/velox/jni/VeloxJniWrapper.cc @@ -76,12 +76,11 @@ JNIEXPORT void JNICALL Java_io_glutenproject_init_NativeBackendInitializer_initi JNI_METHOD_END() } -JNIEXPORT void JNICALL Java_io_glutenproject_udf_UdfJniWrapper_nativeLoadUdfLibraries( // NOLINT +JNIEXPORT void JNICALL Java_io_glutenproject_udf_UdfJniWrapper_getFunctionSignatures( // NOLINT JNIEnv* env, - jclass, - jstring libPaths) { + jclass) { JNI_METHOD_START - 
gluten::jniLoadUdf(env, jStringToCString(env, libPaths)); + gluten::jniGetFunctionSignatures(env); JNI_METHOD_END() } From 8c7b5e8e35dd03aaefcc6e388d6f5ac1d5386b7b Mon Sep 17 00:00:00 2001 From: zhaokuo03 Date: Sat, 23 Mar 2024 22:02:35 +0800 Subject: [PATCH 2/7] fix yarn-client executor dispatch error; add debug log --- .../org/apache/spark/sql/expression/UDFResolver.scala | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/expression/UDFResolver.scala b/backends-velox/src/main/scala/org/apache/spark/sql/expression/UDFResolver.scala index 9c6c7fc680e6..5eaef1ff2065 100644 --- a/backends-velox/src/main/scala/org/apache/spark/sql/expression/UDFResolver.scala +++ b/backends-velox/src/main/scala/org/apache/spark/sql/expression/UDFResolver.scala @@ -131,6 +131,9 @@ object UDFResolver extends Logging { conf.put(BackendSettings.GLUTEN_VELOX_UDF_LIB_PATHS, getAllLibraries(paths, sparkConf)) case None => } + + logInfo(s"after resolve path " + + s"is:${sparkConf.getOption(BackendSettings.GLUTEN_VELOX_UDF_LIB_PATHS)}") } // Try to unpack archive. Throws exception if failed. @@ -177,15 +180,18 @@ object UDFResolver extends Logging { val file = new File(f) // Relative paths should be uploaded via --files or --archives if (isRelativePath(f)) { - if (isYarnClient) { + logInfo(s"resolve relative path: $f") + if (isDriver && isYarnClient) { throw new IllegalArgumentException( "On yarn-client mode, driver only accepts absolute paths, but got " + f) - } else if (isYarnCluster) { + } + if (isYarnCluster || isYarnClient) { file } else { new File(SparkFiles.get(f)) } } else { + logInfo(s"resolve absolute URI path: $f") // Download or copy absolute paths to JniWorkspace. 
val uri = Utils.resolveURI(f) val name = file.getName From cbf253e9202667b9bf491c6a554a108a1bb29b4d Mon Sep 17 00:00:00 2001 From: zhaokuo03 Date: Sat, 23 Mar 2024 22:10:06 +0800 Subject: [PATCH 3/7] Fix format --- .../scala/org/apache/spark/sql/expression/UDFResolver.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/expression/UDFResolver.scala b/backends-velox/src/main/scala/org/apache/spark/sql/expression/UDFResolver.scala index 5eaef1ff2065..49d0cccae00a 100644 --- a/backends-velox/src/main/scala/org/apache/spark/sql/expression/UDFResolver.scala +++ b/backends-velox/src/main/scala/org/apache/spark/sql/expression/UDFResolver.scala @@ -132,8 +132,9 @@ object UDFResolver extends Logging { case None => } - logInfo(s"after resolve path " + - s"is:${sparkConf.getOption(BackendSettings.GLUTEN_VELOX_UDF_LIB_PATHS)}") + logInfo( + s"after resolve path " + + s"is:${sparkConf.getOption(BackendSettings.GLUTEN_VELOX_UDF_LIB_PATHS)}") } // Try to unpack archive. Throws exception if failed. From 6f9be96c4d4c6be99f6fddf1590306cfdaa3f10b Mon Sep 17 00:00:00 2001 From: zhaokuo03 Date: Sat, 23 Mar 2024 23:30:15 +0800 Subject: [PATCH 4/7] followup --- .../scala/org/apache/spark/sql/expression/UDFResolver.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/expression/UDFResolver.scala b/backends-velox/src/main/scala/org/apache/spark/sql/expression/UDFResolver.scala index 49d0cccae00a..42f04295ec6c 100644 --- a/backends-velox/src/main/scala/org/apache/spark/sql/expression/UDFResolver.scala +++ b/backends-velox/src/main/scala/org/apache/spark/sql/expression/UDFResolver.scala @@ -134,7 +134,7 @@ object UDFResolver extends Logging { logInfo( s"after resolve path " + - s"is:${sparkConf.getOption(BackendSettings.GLUTEN_VELOX_UDF_LIB_PATHS)}") + s"is:${conf.get(BackendSettings.GLUTEN_VELOX_UDF_LIB_PATHS)}") } // Try to unpack archive. 
Throws exception if failed. From 5f145acfafdb8df75dad8c0f304dbbe1457fa52a Mon Sep 17 00:00:00 2001 From: zhaokuo03 Date: Sat, 23 Mar 2024 23:56:50 +0800 Subject: [PATCH 5/7] rm debug log --- .../scala/org/apache/spark/sql/expression/UDFResolver.scala | 4 ---- 1 file changed, 4 deletions(-) diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/expression/UDFResolver.scala b/backends-velox/src/main/scala/org/apache/spark/sql/expression/UDFResolver.scala index 42f04295ec6c..c6d8b591734e 100644 --- a/backends-velox/src/main/scala/org/apache/spark/sql/expression/UDFResolver.scala +++ b/backends-velox/src/main/scala/org/apache/spark/sql/expression/UDFResolver.scala @@ -131,10 +131,6 @@ object UDFResolver extends Logging { conf.put(BackendSettings.GLUTEN_VELOX_UDF_LIB_PATHS, getAllLibraries(paths, sparkConf)) case None => } - - logInfo( - s"after resolve path " + - s"is:${conf.get(BackendSettings.GLUTEN_VELOX_UDF_LIB_PATHS)}") } // Try to unpack archive. Throws exception if failed. From e7978cc159cefb526791f14ed9a17ebe7769c7ae Mon Sep 17 00:00:00 2001 From: zhaokuo Date: Mon, 25 Mar 2024 14:16:00 +0800 Subject: [PATCH 6/7] fix --- .../expression/VeloxUdfSuite.scala | 5 ++++- docs/get-started/Velox.md | 17 ++++++++++++----- 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/backends-velox/src/test/scala/io/glutenproject/expression/VeloxUdfSuite.scala b/backends-velox/src/test/scala/io/glutenproject/expression/VeloxUdfSuite.scala index 48c9d8d62376..effc790b6f31 100644 --- a/backends-velox/src/test/scala/io/glutenproject/expression/VeloxUdfSuite.scala +++ b/backends-velox/src/test/scala/io/glutenproject/expression/VeloxUdfSuite.scala @@ -47,6 +47,9 @@ abstract class VeloxUdfSuite extends GlutenQueryTest with SQLHelper { "/path/to/gluten/cpp/build/velox/udf/examples/libmyudf.so") } + protected lazy val udfLibRelativePath: String = + udfLibPath.split(",").map(p => Paths.get(p).getFileName.toString).mkString(",") + override protected def beforeAll(): Unit 
= { super.beforeAll() if (_spark == null) { @@ -83,7 +86,7 @@ class VeloxUdfSuiteLocal extends VeloxUdfSuite { override protected def sparkConf: SparkConf = { super.sparkConf .set("spark.files", udfLibPath) - .set("spark.gluten.sql.columnar.backend.velox.udfLibraryPaths", "libmyudf.so") + .set("spark.gluten.sql.columnar.backend.velox.udfLibraryPaths", udfLibRelativePath) } } diff --git a/docs/get-started/Velox.md b/docs/get-started/Velox.md index 0f7ca1964129..8e4a053e09d5 100644 --- a/docs/get-started/Velox.md +++ b/docs/get-started/Velox.md @@ -434,15 +434,15 @@ Gluten loads the UDF libraries at runtime. You can upload UDF libraries via `--f Note if running on Yarn client mode, the uploaded files are not reachable on driver side. Users should copy those files to somewhere reachable for driver and set `spark.gluten.sql.columnar.backend.velox.driver.udfLibraryPaths`. This configuration is also useful when the `udfLibraryPaths` is different between driver side and executor side. -- Use `--files` +- Use the `--files` option to upload a library and configure its relative path ```shell --files /path/to/gluten/cpp/build/velox/udf/examples/libmyudf.so --conf spark.gluten.sql.columnar.backend.velox.udfLibraryPaths=libmyudf.so # Needed for Yarn client mode ---conf spark.gluten.sql.columnar.backend.velox.driver.udfLibraryPaths=file:///path/to/libmyudf.so +--conf spark.gluten.sql.columnar.backend.velox.driver.udfLibraryPaths=file:///path/to/gluten/cpp/build/velox/udf/examples/libmyudf.so ``` -- Use `--archives` +- Use the `--archives` option to upload a archive and configure its relative path ```shell --archives /path/to/udf_archives.zip#udf_archives --conf spark.gluten.sql.columnar.backend.velox.udfLibraryPaths=udf_archives @@ -450,7 +450,7 @@ Note if running on Yarn client mode, the uploaded files are not reachable on dri --conf spark.gluten.sql.columnar.backend.velox.driver.udfLibraryPaths=file:///path/to/udf_archives.zip ``` -- Specify URI +- Only configure URI You can 
also specify the local or HDFS URIs to the UDF libraries or archives. Local URIs should exist on driver and every worker nodes. ```shell @@ -462,10 +462,17 @@ You can also specify the local or HDFS URIs to the UDF libraries or archives. Lo We provided an Velox UDF example file [MyUDF.cpp](../../cpp/velox/udf/examples/MyUDF.cpp). After building gluten cpp, you can find the example library at /path/to/gluten/cpp/build/velox/udf/examples/libmyudf.so Start spark-shell or spark-sql with below configuration -``` +```shell +# Use the `--files` option to upload a library and configure its relative path --files /path/to/gluten/cpp/build/velox/udf/examples/libmyudf.so --conf spark.gluten.sql.columnar.backend.velox.udfLibraryPaths=libmyudf.so ``` +or +```shell +# Only configure URI +--conf spark.gluten.sql.columnar.backend.velox.udfLibraryPaths=file:///path/to/gluten/cpp/build/velox/udf/examples/libmyudf.so +``` + Run query. The functions `myudf1` and `myudf2` increment the input value by a constant of 5 ``` select myudf1(1), myudf2(100L) From fd4d2e61c6681874c302ef74984a8a2eb2151903 Mon Sep 17 00:00:00 2001 From: zhaokuo Date: Mon, 25 Mar 2024 14:18:20 +0800 Subject: [PATCH 7/7] fix --- docs/get-started/Velox.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/get-started/Velox.md b/docs/get-started/Velox.md index 8e4a053e09d5..019aa96062ca 100644 --- a/docs/get-started/Velox.md +++ b/docs/get-started/Velox.md @@ -442,7 +442,7 @@ Note if running on Yarn client mode, the uploaded files are not reachable on dri --conf spark.gluten.sql.columnar.backend.velox.driver.udfLibraryPaths=file:///path/to/gluten/cpp/build/velox/udf/examples/libmyudf.so ``` -- Use the `--archives` option to upload a archive and configure its relative path +- Use the `--archives` option to upload an archive and configure its relative path ```shell --archives /path/to/udf_archives.zip#udf_archives --conf spark.gluten.sql.columnar.backend.velox.udfLibraryPaths=udf_archives