Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GLUTEN-5074][VL] fix: UDF load error in yarn-cluster mode #5075

Merged
merged 7 commits into from
Mar 25, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@ public class UdfJniWrapper {

public UdfJniWrapper() {}

public native void nativeLoadUdfLibraries(String udfLibPaths);
public native void getFunctionSignatures();
}
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,7 @@ class SparkPlanExecApiImpl extends SparkPlanExecApi {

override def genInjectedFunctions()
: Seq[(FunctionIdentifier, ExpressionInfo, FunctionBuilder)] = {
UDFResolver.loadAndGetFunctionDescriptions
UDFResolver.getFunctionSignatures
}

override def rewriteSpillPath(path: String): String = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ import org.apache.spark.util.Utils
import com.google.common.collect.Lists

import java.io.File
import java.nio.file.{Files, Paths}
import java.net.URI
import java.nio.file.{Files, FileVisitOption, Paths}

import scala.collection.JavaConverters.asScalaIteratorConverter
import scala.collection.mutable
Expand Down Expand Up @@ -107,7 +108,7 @@ object UDFResolver extends Logging {

def getFilesWithExtension(directory: java.nio.file.Path, extension: String): Seq[String] = {
Files
.walk(directory)
.walk(directory, FileVisitOption.FOLLOW_LINKS)
.iterator()
.asScala
.filter(p => Files.isRegularFile(p) && p.toString.endsWith(extension))
Expand All @@ -116,19 +117,19 @@ object UDFResolver extends Logging {
}

def resolveUdfConf(conf: java.util.Map[String, String]): Unit = {
if (isDriver) {
if (localLibraryPaths != null) {
conf.put(BackendSettings.GLUTEN_VELOX_UDF_LIB_PATHS, localLibraryPaths)
}
val sparkConf = SparkEnv.get.conf
val udfLibPaths = if (isDriver) {
sparkConf
.getOption(BackendSettings.GLUTEN_VELOX_DRIVER_UDF_LIB_PATHS)
.orElse(sparkConf.getOption(BackendSettings.GLUTEN_VELOX_UDF_LIB_PATHS))
} else {
val sparkConf = SparkEnv.get.conf
Option(conf.get(BackendSettings.GLUTEN_VELOX_UDF_LIB_PATHS)) match {
case Some(libs) =>
conf.put(
BackendSettings.GLUTEN_VELOX_UDF_LIB_PATHS,
getAllLibraries(libs, sparkConf, canAccessSparkFiles = true))
case None =>
}
sparkConf.getOption(BackendSettings.GLUTEN_VELOX_UDF_LIB_PATHS)
}

udfLibPaths match {
case Some(paths) =>
conf.put(BackendSettings.GLUTEN_VELOX_UDF_LIB_PATHS, getAllLibraries(paths, sparkConf))
case None =>
}
}

Expand All @@ -150,24 +151,44 @@ object UDFResolver extends Logging {
dest
}

private def isRelativePath(path: String): Boolean = {
try {
val uri = new URI(path)
!uri.isAbsolute && uri.getPath == path
} catch {
case _: Exception => false
}
}

// Get the full paths of all libraries.
// If it's a directory, get all files ends with ".so" recursively.
def getAllLibraries(files: String, sparkConf: SparkConf, canAccessSparkFiles: Boolean): String = {
def getAllLibraries(files: String, sparkConf: SparkConf): String = {
val hadoopConf = SparkHadoopUtil.newConfiguration(sparkConf)
val master = sparkConf.getOption("spark.master")
val isYarnCluster =
master.isDefined && master.get.equals("yarn") && !Utils.isClientMode(sparkConf)
val isYarnClient =
master.isDefined && master.get.equals("yarn") && Utils.isClientMode(sparkConf)

files
.split(",")
.map {
f =>
val file = new File(f)
// Relative paths should be uploaded via --files or --archives
// Use SparkFiles.get to download and unpack
if (!file.isAbsolute) {
if (!canAccessSparkFiles) {
if (isRelativePath(f)) {
logInfo(s"resolve relative path: $f")
if (isDriver && isYarnClient) {
throw new IllegalArgumentException(
"On yarn-client mode, driver only accepts absolute paths, but got " + f)
}
new File(SparkFiles.get(f))
if (isYarnCluster || isYarnClient) {
file
} else {
new File(SparkFiles.get(f))
}
} else {
logInfo(s"resolve absolute URI path: $f")
// Download or copy absolute paths to JniWorkspace.
val uri = Utils.resolveURI(f)
val name = file.getName
Expand All @@ -192,26 +213,16 @@ object UDFResolver extends Logging {
.mkString(",")
}

def loadAndGetFunctionDescriptions: Seq[(FunctionIdentifier, ExpressionInfo, FunctionBuilder)] = {
def getFunctionSignatures: Seq[(FunctionIdentifier, ExpressionInfo, FunctionBuilder)] = {
val sparkContext = SparkContext.getActive.get
val sparkConf = sparkContext.conf
val udfLibPaths = sparkConf
.getOption(BackendSettings.GLUTEN_VELOX_DRIVER_UDF_LIB_PATHS)
.orElse(sparkConf.getOption(BackendSettings.GLUTEN_VELOX_UDF_LIB_PATHS))
val udfLibPaths = sparkConf.getOption(BackendSettings.GLUTEN_VELOX_UDF_LIB_PATHS)

udfLibPaths match {
case None =>
Seq.empty
case Some(paths) =>
val master = sparkConf.getOption("spark.master")
val isYarnClient =
master.isDefined && master.get.equals("yarn") && Utils.isClientMode(sparkConf)
// For Yarn-client mode, driver cannot get uploaded files via SparkFiles.get.
localLibraryPaths = getAllLibraries(paths, sparkConf, canAccessSparkFiles = !isYarnClient)

logInfo(s"Loading UDF libraries from paths: $localLibraryPaths")
new UdfJniWrapper().nativeLoadUdfLibraries(localLibraryPaths)

new UdfJniWrapper().getFunctionSignatures()
UDFMap.map {
case (name, t) =>
(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ class VeloxUdfSuiteLocal extends VeloxUdfSuite {
override val master: String = "local[2]"
override protected def sparkConf: SparkConf = {
super.sparkConf
.set("spark.gluten.sql.columnar.backend.velox.udfLibraryPaths", udfLibPath)
.set("spark.files", udfLibPath)
.set("spark.gluten.sql.columnar.backend.velox.udfLibraryPaths", "libmyudf.so")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please do not hard code "libmyudf.so" here. Use a helper function to extract the filename from udfLibPath. Note the udfLibPath can also be a comma-separated string with multiple paths.

}
}

Expand Down
5 changes: 1 addition & 4 deletions cpp/velox/jni/JniUdf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,14 @@ void gluten::finalizeVeloxJniUDF(JNIEnv* env) {
env->DeleteGlobalRef(udfResolverClass);
}

void gluten::jniLoadUdf(JNIEnv* env, const std::string& libPaths) {
void gluten::jniGetFunctionSignatures(JNIEnv* env) {
auto udfLoader = gluten::UdfLoader::getInstance();
udfLoader->loadUdfLibraries(libPaths);

const auto& udfMap = udfLoader->getUdfMap();
for (const auto& udf : udfMap) {
auto udfString = udf.second;
jbyteArray returnType = env->NewByteArray(udf.second.length());
env->SetByteArrayRegion(returnType, 0, udfString.length(), reinterpret_cast<const jbyte*>(udfString.c_str()));
jstring name = env->NewStringUTF(udf.first.c_str());

jobject instance = env->GetStaticObjectField(
udfResolverClass, env->GetStaticFieldID(udfResolverClass, "MODULE$", kUdfResolverClassPath.c_str()));
env->CallVoidMethod(instance, registerUDFMethod, name, returnType);
Expand Down
2 changes: 1 addition & 1 deletion cpp/velox/jni/JniUdf.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,6 @@ void initVeloxJniUDF(JNIEnv* env);

void finalizeVeloxJniUDF(JNIEnv* env);

void jniLoadUdf(JNIEnv* env, const std::string& libPaths);
void jniGetFunctionSignatures(JNIEnv* env);

} // namespace gluten
7 changes: 3 additions & 4 deletions cpp/velox/jni/VeloxJniWrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,11 @@ JNIEXPORT void JNICALL Java_io_glutenproject_init_NativeBackendInitializer_initi
JNI_METHOD_END()
}

JNIEXPORT void JNICALL Java_io_glutenproject_udf_UdfJniWrapper_nativeLoadUdfLibraries( // NOLINT
JNIEXPORT void JNICALL Java_io_glutenproject_udf_UdfJniWrapper_getFunctionSignatures( // NOLINT
JNIEnv* env,
jclass,
jstring libPaths) {
jclass) {
JNI_METHOD_START
gluten::jniLoadUdf(env, jStringToCString(env, libPaths));
gluten::jniGetFunctionSignatures(env);
JNI_METHOD_END()
}

Expand Down
Loading