[GLUTEN-5074][VL] fix: UDF load error in yarn-cluster mode #5075

Merged · 7 commits · Mar 25, 2024

Changes from all commits
UdfJniWrapper.java

@@ -20,5 +20,5 @@ public class UdfJniWrapper {

   public UdfJniWrapper() {}
 
-  public native void nativeLoadUdfLibraries(String udfLibPaths);
+  public native void getFunctionSignatures();
 }
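Editor's note: the native entry point is now parameterless — library loading happens during native backend initialization, driven by the UDF library conf, and this call only reports the signatures of already-loaded UDFs back to the JVM. A minimal Scala sketch of the resulting call order (the conf key is taken from the docs change below; the session wiring is illustrative, not part of this diff):

```scala
import org.apache.spark.sql.SparkSession

// Illustrative only: the library is loaded on the native side when the
// backend initializes, keyed off this conf; the wrapper call then takes
// no arguments and merely reports each UDF's signature to the JVM.
val spark = SparkSession
  .builder()
  .config("spark.gluten.sql.columnar.backend.velox.udfLibraryPaths", "libmyudf.so")
  .getOrCreate()

new io.glutenproject.udf.UdfJniWrapper().getFunctionSignatures()
```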
SparkPlanExecApiImpl.scala

@@ -649,7 +649,7 @@ class SparkPlanExecApiImpl extends SparkPlanExecApi {

   override def genInjectedFunctions()
       : Seq[(FunctionIdentifier, ExpressionInfo, FunctionBuilder)] = {
-    UDFResolver.loadAndGetFunctionDescriptions
+    UDFResolver.getFunctionSignatures
   }
 
   override def rewriteSpillPath(path: String): String = {
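Editor's note: for orientation, the triples returned here feed Spark's standard function-injection hook. A minimal sketch of the consuming side, assuming Spark's SparkSessionExtensions API and the UDFResolver object from this PR (the extension class itself is hypothetical, and the package is assumed from the JNI code below):

```scala
import org.apache.spark.sql.SparkSessionExtensions
// Package assumed from this PR's JNI code (io/glutenproject/expression).
import io.glutenproject.expression.UDFResolver

// Hypothetical wiring: each (FunctionIdentifier, ExpressionInfo,
// FunctionBuilder) triple is handed to Spark's injectFunction hook,
// which registers it in the session's function registry.
class UdfExtensions extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit =
    UDFResolver.getFunctionSignatures.foreach(extensions.injectFunction)
}
```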
UDFResolver.scala

@@ -37,7 +37,8 @@ import org.apache.spark.util.Utils
 import com.google.common.collect.Lists
 
 import java.io.File
-import java.nio.file.{Files, Paths}
+import java.net.URI
+import java.nio.file.{Files, FileVisitOption, Paths}
 
 import scala.collection.JavaConverters.asScalaIteratorConverter
 import scala.collection.mutable
@@ -107,7 +108,7 @@ object UDFResolver extends Logging {

   def getFilesWithExtension(directory: java.nio.file.Path, extension: String): Seq[String] = {
     Files
-      .walk(directory)
+      .walk(directory, FileVisitOption.FOLLOW_LINKS)
       .iterator()
       .asScala
       .filter(p => Files.isRegularFile(p) && p.toString.endsWith(extension))
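Editor's note: FOLLOW_LINKS is the substantive fix here — Yarn materializes distributed-cache entries as symlinks in the container working directory, and without the flag Files.walk does not descend through them. A self-contained sketch of the same traversal (the directory path is hypothetical):

```scala
import java.nio.file.{Files, FileVisitOption, Paths}
import scala.collection.JavaConverters.asScalaIteratorConverter

// Walk a (possibly symlinked) directory and collect all ".so" files,
// mirroring getFilesWithExtension above.
val dir = Paths.get("/tmp/udf_archives") // hypothetical localized archive dir
val libs = Files
  .walk(dir, FileVisitOption.FOLLOW_LINKS) // descends into symlinked dirs
  .iterator()
  .asScala
  .filter(p => Files.isRegularFile(p) && p.toString.endsWith(".so"))
  .map(_.toString)
  .toSeq
```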
@@ -116,19 +117,19 @@ object UDFResolver extends Logging {
   }
 
   def resolveUdfConf(conf: java.util.Map[String, String]): Unit = {
-    if (isDriver) {
-      if (localLibraryPaths != null) {
-        conf.put(BackendSettings.GLUTEN_VELOX_UDF_LIB_PATHS, localLibraryPaths)
-      }
-    } else {
-      val sparkConf = SparkEnv.get.conf
-      Option(conf.get(BackendSettings.GLUTEN_VELOX_UDF_LIB_PATHS)) match {
-        case Some(libs) =>
-          conf.put(
-            BackendSettings.GLUTEN_VELOX_UDF_LIB_PATHS,
-            getAllLibraries(libs, sparkConf, canAccessSparkFiles = true))
-        case None =>
-      }
-    }
+    val sparkConf = SparkEnv.get.conf
+    val udfLibPaths = if (isDriver) {
+      sparkConf
+        .getOption(BackendSettings.GLUTEN_VELOX_DRIVER_UDF_LIB_PATHS)
+        .orElse(sparkConf.getOption(BackendSettings.GLUTEN_VELOX_UDF_LIB_PATHS))
+    } else {
+      sparkConf.getOption(BackendSettings.GLUTEN_VELOX_UDF_LIB_PATHS)
+    }
+
+    udfLibPaths match {
+      case Some(paths) =>
+        conf.put(BackendSettings.GLUTEN_VELOX_UDF_LIB_PATHS, getAllLibraries(paths, sparkConf))
+      case None =>
+    }
   }
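Editor's note: the resolution order above, spelled out with the literal conf keys (the BackendSettings constants are assumed to map to the keys documented in this PR's docs change):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.gluten.sql.columnar.backend.velox.udfLibraryPaths", "libmyudf.so")
  .set(
    "spark.gluten.sql.columnar.backend.velox.driver.udfLibraryPaths",
    "file:///opt/udf/libmyudf.so")

// Driver: prefer the driver-specific key, fall back to the common key.
// Executor: read only the common key.
def resolveUdfLibPaths(conf: SparkConf, isDriver: Boolean): Option[String] =
  if (isDriver) {
    conf
      .getOption("spark.gluten.sql.columnar.backend.velox.driver.udfLibraryPaths")
      .orElse(conf.getOption("spark.gluten.sql.columnar.backend.velox.udfLibraryPaths"))
  } else {
    conf.getOption("spark.gluten.sql.columnar.backend.velox.udfLibraryPaths")
  }

assert(resolveUdfLibPaths(conf, isDriver = true).contains("file:///opt/udf/libmyudf.so"))
assert(resolveUdfLibPaths(conf, isDriver = false).contains("libmyudf.so"))
```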

@@ -150,24 +151,44 @@ object UDFResolver extends Logging {
     dest
   }
 
+  private def isRelativePath(path: String): Boolean = {
+    try {
+      val uri = new URI(path)
+      !uri.isAbsolute && uri.getPath == path
+    } catch {
+      case _: Exception => false
+    }
+  }
+
   // Get the full paths of all libraries.
   // If it's a directory, get all files ends with ".so" recursively.
-  def getAllLibraries(files: String, sparkConf: SparkConf, canAccessSparkFiles: Boolean): String = {
+  def getAllLibraries(files: String, sparkConf: SparkConf): String = {
     val hadoopConf = SparkHadoopUtil.newConfiguration(sparkConf)
+    val master = sparkConf.getOption("spark.master")
+    val isYarnCluster =
+      master.isDefined && master.get.equals("yarn") && !Utils.isClientMode(sparkConf)
+    val isYarnClient =
+      master.isDefined && master.get.equals("yarn") && Utils.isClientMode(sparkConf)
+
     files
       .split(",")
       .map {
         f =>
           val file = new File(f)
           // Relative paths should be uploaded via --files or --archives
           // Use SparkFiles.get to download and unpack
-          if (!file.isAbsolute) {
-            if (!canAccessSparkFiles) {
+          if (isRelativePath(f)) {
+            logInfo(s"resolve relative path: $f")
+            if (isDriver && isYarnClient) {
               throw new IllegalArgumentException(
                 "On yarn-client mode, driver only accepts absolute paths, but got " + f)
             }
-            new File(SparkFiles.get(f))
+            if (isYarnCluster || isYarnClient) {
+              file
+            } else {
+              new File(SparkFiles.get(f))
+            }
           } else {
+            logInfo(s"resolve absolute URI path: $f")
             // Download or copy absolute paths to JniWorkspace.
             val uri = Utils.resolveURI(f)
             val name = file.getName
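Editor's note: the new isRelativePath predicate exercised on typical inputs — plain names, as passed with `--files` or `--archives`, count as relative, while scheme-qualified URIs count as absolute:

```scala
import java.net.URI

// Same predicate as in the diff above, extracted for illustration.
def isRelativePath(path: String): Boolean =
  try {
    val uri = new URI(path)
    !uri.isAbsolute && uri.getPath == path
  } catch { case _: Exception => false }

assert(isRelativePath("libmyudf.so"))            // plain file name
assert(isRelativePath("udf_archives"))           // unpacked archive dir
assert(!isRelativePath("file:///opt/libudf.so")) // has a scheme: absolute URI
assert(!isRelativePath("hdfs:///udf/libudf.so")) // has a scheme: absolute URI
```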
@@ -192,26 +213,16 @@ object UDFResolver extends Logging {
.mkString(",")
}

def loadAndGetFunctionDescriptions: Seq[(FunctionIdentifier, ExpressionInfo, FunctionBuilder)] = {
def getFunctionSignatures: Seq[(FunctionIdentifier, ExpressionInfo, FunctionBuilder)] = {
val sparkContext = SparkContext.getActive.get
val sparkConf = sparkContext.conf
val udfLibPaths = sparkConf
.getOption(BackendSettings.GLUTEN_VELOX_DRIVER_UDF_LIB_PATHS)
.orElse(sparkConf.getOption(BackendSettings.GLUTEN_VELOX_UDF_LIB_PATHS))
val udfLibPaths = sparkConf.getOption(BackendSettings.GLUTEN_VELOX_UDF_LIB_PATHS)

udfLibPaths match {
case None =>
Seq.empty
case Some(paths) =>
val master = sparkConf.getOption("spark.master")
val isYarnClient =
master.isDefined && master.get.equals("yarn") && Utils.isClientMode(sparkConf)
// For Yarn-client mode, driver cannot get uploaded files via SparkFiles.get.
localLibraryPaths = getAllLibraries(paths, sparkConf, canAccessSparkFiles = !isYarnClient)

logInfo(s"Loading UDF libraries from paths: $localLibraryPaths")
new UdfJniWrapper().nativeLoadUdfLibraries(localLibraryPaths)

new UdfJniWrapper().getFunctionSignatures()
UDFMap.map {
case (name, t) =>
(
VeloxUdfSuite.scala

@@ -47,6 +47,9 @@ abstract class VeloxUdfSuite extends GlutenQueryTest with SQLHelper {
"/path/to/gluten/cpp/build/velox/udf/examples/libmyudf.so")
}

protected lazy val udfLibRelativePath: String =
udfLibPath.split(",").map(p => Paths.get(p).getFileName.toString).mkString(",")

override protected def beforeAll(): Unit = {
super.beforeAll()
if (_spark == null) {
@@ -82,7 +85,8 @@ class VeloxUdfSuiteLocal extends VeloxUdfSuite {
   override val master: String = "local[2]"
   override protected def sparkConf: SparkConf = {
     super.sparkConf
-      .set("spark.gluten.sql.columnar.backend.velox.udfLibraryPaths", udfLibPath)
+      .set("spark.files", udfLibPath)
+      .set("spark.gluten.sql.columnar.backend.velox.udfLibraryPaths", udfLibRelativePath)
   }
 }

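Editor's note: a worked example of the new test helper — only base file names survive, which is how a library uploaded through spark.files is addressed on the executors:

```scala
import java.nio.file.Paths

val udfLibPath = "/path/to/gluten/cpp/build/velox/udf/examples/libmyudf.so"
// Keep just the file name of each comma-separated library path.
val udfLibRelativePath =
  udfLibPath.split(",").map(p => Paths.get(p).getFileName.toString).mkString(",")
assert(udfLibRelativePath == "libmyudf.so")
```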
cpp/velox/jni/JniUdf.cc — 5 changes: 1 addition & 4 deletions
@@ -47,17 +47,14 @@ void gluten::finalizeVeloxJniUDF(JNIEnv* env) {
   env->DeleteGlobalRef(udfResolverClass);
 }
 
-void gluten::jniLoadUdf(JNIEnv* env, const std::string& libPaths) {
+void gluten::jniGetFunctionSignatures(JNIEnv* env) {
   auto udfLoader = gluten::UdfLoader::getInstance();
-  udfLoader->loadUdfLibraries(libPaths);
-
   const auto& udfMap = udfLoader->getUdfMap();
   for (const auto& udf : udfMap) {
     auto udfString = udf.second;
     jbyteArray returnType = env->NewByteArray(udf.second.length());
     env->SetByteArrayRegion(returnType, 0, udfString.length(), reinterpret_cast<const jbyte*>(udfString.c_str()));
     jstring name = env->NewStringUTF(udf.first.c_str());
-
     jobject instance = env->GetStaticObjectField(
         udfResolverClass, env->GetStaticFieldID(udfResolverClass, "MODULE$", kUdfResolverClassPath.c_str()));
     env->CallVoidMethod(instance, registerUDFMethod, name, returnType);
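Editor's note: the loop above calls back into the Scala UDFResolver object once per native UDF. The receiving side is not shown in this diff; a sketch of its assumed shape (both the signature and the storage are assumptions, labeled as such):

```scala
import scala.collection.mutable

// Assumed JVM-side callback invoked via JNI as
// UDFResolver.MODULE$.registerUDF(name, returnType).
object UDFResolverSketch {
  // name -> serialized return type, as sent from the C++ loop above
  val UDFMap: mutable.Map[String, Array[Byte]] = mutable.Map.empty

  def registerUDF(name: String, returnType: Array[Byte]): Unit =
    UDFMap.put(name, returnType)
}
```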
cpp/velox/jni/JniUdf.h — 2 changes: 1 addition & 1 deletion
@@ -27,6 +27,6 @@ void initVeloxJniUDF(JNIEnv* env);

 void finalizeVeloxJniUDF(JNIEnv* env);
 
-void jniLoadUdf(JNIEnv* env, const std::string& libPaths);
+void jniGetFunctionSignatures(JNIEnv* env);
 
 } // namespace gluten
cpp/velox/jni/VeloxJniWrapper.cc — 7 changes: 3 additions & 4 deletions
@@ -76,12 +76,11 @@
   JNI_METHOD_END()
 }
 
-JNIEXPORT void JNICALL Java_io_glutenproject_udf_UdfJniWrapper_nativeLoadUdfLibraries( // NOLINT
+JNIEXPORT void JNICALL Java_io_glutenproject_udf_UdfJniWrapper_getFunctionSignatures( // NOLINT
     JNIEnv* env,
-    jclass,
-    jstring libPaths) {
+    jclass) {
   JNI_METHOD_START
-  gluten::jniLoadUdf(env, jStringToCString(env, libPaths));
+  gluten::jniGetFunctionSignatures(env);
   JNI_METHOD_END()
 }

docs/get-started/Velox.md — 17 changes: 12 additions & 5 deletions
@@ -434,23 +434,23 @@

 Note if running on Yarn client mode, the uploaded files are not reachable on driver side. Users should copy those files to somewhere reachable for driver and set `spark.gluten.sql.columnar.backend.velox.driver.udfLibraryPaths`. This configuration is also useful when the `udfLibraryPaths` is different between driver side and executor side.
 
-- Use `--files`
+- Use the `--files` option to upload a library and configure its relative path
 ```shell
 --files /path/to/gluten/cpp/build/velox/udf/examples/libmyudf.so
 --conf spark.gluten.sql.columnar.backend.velox.udfLibraryPaths=libmyudf.so
 # Needed for Yarn client mode
---conf spark.gluten.sql.columnar.backend.velox.driver.udfLibraryPaths=file:///path/to/libmyudf.so
+--conf spark.gluten.sql.columnar.backend.velox.driver.udfLibraryPaths=file:///path/to/gluten/cpp/build/velox/udf/examples/libmyudf.so
 ```
 
-- Use `--archives`
+- Use the `--archives` option to upload an archive and configure its relative path
 ```shell
 --archives /path/to/udf_archives.zip#udf_archives
 --conf spark.gluten.sql.columnar.backend.velox.udfLibraryPaths=udf_archives
 # Needed for Yarn client mode
 --conf spark.gluten.sql.columnar.backend.velox.driver.udfLibraryPaths=file:///path/to/udf_archives.zip
 ```
 
-- Specify URI
+- Only configure URI
 
 You can also specify the local or HDFS URIs to the UDF libraries or archives. Local URIs should exist on driver and every worker nodes.
@@ -462,10 +462,17 @@
 We provided an Velox UDF example file [MyUDF.cpp](../../cpp/velox/udf/examples/MyUDF.cpp). After building gluten cpp, you can find the example library at /path/to/gluten/cpp/build/velox/udf/examples/libmyudf.so
 
 Start spark-shell or spark-sql with below configuration
-```
+```shell
+# Use the `--files` option to upload a library and configure its relative path
 --files /path/to/gluten/cpp/build/velox/udf/examples/libmyudf.so
 --conf spark.gluten.sql.columnar.backend.velox.udfLibraryPaths=libmyudf.so
 ```
+or
+```shell
+# Only configure URI
+--conf spark.gluten.sql.columnar.backend.velox.udfLibraryPaths=file:///path/to/gluten/cpp/build/velox/udf/examples/libmyudf.so
+```
 
 Run query. The functions myudf1 and myudf2 increment the input value by a constant of 5
 ```
 select myudf1(1), myudf2(100L)