Fixes: Crash on Databricks when using zip deployment, and exceptions in the console after a successful run on Windows. Added logging to help with troubleshooting.
grazy27 committed Dec 23, 2024
1 parent 28088f9 commit c7f4ae1
Showing 8 changed files with 128 additions and 13 deletions.
1 change: 1 addition & 0 deletions src/csharp/Microsoft.Spark.UnitTest/TypeConverterTests.cs
@@ -20,6 +20,7 @@ public void TestBaseCase()
Assert.Equal((short)1, TypeConverter.ConvertTo<short>((short)1));
Assert.Equal((ushort)1, TypeConverter.ConvertTo<ushort>((ushort)1));
Assert.Equal(1, TypeConverter.ConvertTo<int>(1));
+ Assert.Equal(1L, TypeConverter.ConvertTo<long>(1));
Assert.Equal(1u, TypeConverter.ConvertTo<uint>(1u));
Assert.Equal(1L, TypeConverter.ConvertTo<long>(1L));
Assert.Equal(1ul, TypeConverter.ConvertTo<ulong>(1ul));
10 changes: 9 additions & 1 deletion src/csharp/Microsoft.Spark/Interop/Ipc/JvmBridge.cs
@@ -8,6 +8,7 @@
using System.Diagnostics;
using System.IO;
using System.Net;
+ using System.Net.Sockets;
using System.Text;
using System.Threading;
using Microsoft.Spark.Network;
@@ -184,7 +185,7 @@ private object CallJavaMethod(
ISocketWrapper socket = null;

try
{
// Limit the number of connections to the JVM backend. Netty is configured
// to use a set number of threads to process incoming connections. Each
// new connection is delegated to these threads in a round robin fashion.
@@ -299,6 +300,13 @@ private object CallJavaMethod(
}
else
{
+ if (e.InnerException is SocketException)
+ {
+     _logger.LogError(
+         "Scala worker abandoned the connection, likely a fatal crash on the Java side.\n" +
+         "Ensure Spark runs with sufficient memory.");
+ }
+
// In rare cases we may hit the Netty connection thread deadlock.
// If max backend threads is 10 and we are currently using 10 active
// connections (0 in the _sockets queue). When we hit this exception,
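For context on the new check: in .NET, a read on a NetworkStream whose peer has died typically surfaces as an IOException wrapping a SocketException, so inspecting InnerException is one way to tell an abandoned connection apart from other I/O failures. A minimal sketch of that shape (the helper class and method name are illustrative, not part of this commit):

using System;
using System.IO;
using System.Net.Sockets;

internal static class PeerDisconnect
{
    // True when an exception has the wrapped-socket-error shape the new
    // JvmBridge check looks for: an inner SocketException (for example
    // ConnectionReset), which usually means the JVM backend process died.
    internal static bool IsLikelyBackendCrash(Exception e) =>
        e is IOException && e.InnerException is SocketException;
}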
14 changes: 8 additions & 6 deletions src/csharp/Microsoft.Spark/Sql/Catalog/Catalog.cs
@@ -248,20 +248,22 @@ public Database GetDatabase(string dbName) =>
new Database((JvmObjectReference)Reference.Invoke("getDatabase", dbName));

/// <summary>
- /// Get the function with the specified name. If you are trying to get an in-built
- /// function then use the unqualified name.
+ /// Get the function with the specified name. This function can be a temporary function
+ /// or a function.
/// </summary>
/// <param name="functionName">Is either a qualified or unqualified name that designates a
- /// function. If no database identifier is provided, it refers to a temporary function or
- /// a function in the current database.</param>
+ /// function. It follows the same resolution rule with SQL: search for built-in/temp
+ /// functions first, then functions in the current database (namespace).</param>
/// <returns>`Function` object which includes the class name, database, description,
/// whether it is temporary and the name of the function.</returns>
public Function GetFunction(string functionName) =>
new Function((JvmObjectReference)Reference.Invoke("getFunction", functionName));

/// <summary>
- /// Get the function with the specified name. If you are trying to get an in-built function
- /// then pass null as the dbName.
+ /// Get the function with the specified name in the specified database under the Hive
+ /// Metastore.
+ /// To get built-in functions, or functions in other catalogs, please use
+ /// `getFunction(functionName)` with a qualified function name instead.
/// </summary>
/// <param name="dbName">Is a name that designates a database. Built-in functions will be
/// in database null rather than default.</param>
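To illustrate the resolution rules the rewritten docs describe, a short usage sketch (it assumes a live SparkSession named `spark`; `mydb.myUdf` is a hypothetical persistent UDF, not something in this commit):

using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Catalog;

class GetFunctionExamples
{
    static void Run(SparkSession spark)
    {
        // Unqualified names resolve built-in or temporary functions first.
        Function abs = spark.Catalog.GetFunction("abs");

        // A persistent function in a specific database needs a qualified name...
        Function udf = spark.Catalog.GetFunction("mydb.myUdf");

        // ...or the two-argument overload with an explicit database name.
        Function sameUdf = spark.Catalog.GetFunction("mydb", "myUdf");
    }
}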
5 changes: 5 additions & 0 deletions src/csharp/Microsoft.Spark/Utils/TypeConverter.cs
@@ -33,6 +33,11 @@ private static object Convert(object obj, Type toType)
{
return ConvertToDictionary(hashtable, toType);
}
+ // Primitive-to-primitive conversions (e.g. a boxed int to long) otherwise fail,
+ // since unboxing requires the exact type; Convert.ChangeType performs the widening.
+ else if (toType.IsPrimitive)
+ {
+     return System.Convert.ChangeType(obj, toType);
+ }

return obj;
}
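The underlying C# behavior the new branch works around, as a standalone sketch: unboxing must target the exact value type, so a boxed int cannot be unboxed directly as long, while Convert.ChangeType performs the numeric conversion.

using System;

class UnboxDemo
{
    static void Main()
    {
        object boxed = 1; // a boxed System.Int32

        // long bad = (long)boxed;   // throws InvalidCastException:
        //                           // unboxing requires the exact type.

        long ok = (long)Convert.ChangeType(boxed, typeof(long));
        Console.WriteLine(ok); // prints 1
    }
}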
@@ -113,8 +113,10 @@ class DotnetBackendHandler(server: DotnetBackend, objectsTracker: JVMObjectTracker
// the .NET side so that .NET side doesn't have to explicitly close the connection via
// "stopBackend." Note that an exception is still thrown if the exit status is non-zero,
// so skipping this kind of exception message does not affect the debugging.
- if (!cause.getMessage.contains(
-     "An existing connection was forcibly closed by the remote host")) {
+ if (
+   !cause.getMessage.contains("An existing connection was forcibly closed by the remote host")
+   && !cause.getMessage.contains("Connection reset")
+ ) {
logError("Exception caught: ", cause)
}

@@ -243,7 +243,7 @@ object DotnetRunner extends Logging {

if (!localFile.exists()) { // May already exist if running multiple workers on one node
logInfo(s"Copying user file $filePath to $driverDir")
- Utils.fetchFile(
+ DotnetUtils.fetchFileWithbackwardCompatibility(
hdfsFilePath,
new File(driverDir),
sparkConf,
@@ -11,7 +11,13 @@ import java.nio.file.attribute.PosixFilePermission
import java.nio.file.attribute.PosixFilePermission._
import java.nio.file.{FileSystems, Files}
import java.util.{Timer, TimerTask}

+ import org.apache.spark.SparkConf
+ import org.apache.spark.SecurityManager
+ import org.apache.hadoop.conf.Configuration
+ import org.apache.spark.util.Utils
+ import java.io.File
+ import java.lang.NoSuchMethodException
+ import java.lang.reflect.InvocationTargetException
import org.apache.commons.compress.archivers.zip.{ZipArchiveEntry, ZipArchiveOutputStream, ZipFile}
import org.apache.commons.io.{FileUtils, IOUtils}
- import org.apache.spark.SparkConf
@@ -39,6 +45,94 @@ object Utils extends Logging {
val supportPosix: Boolean =
FileSystems.getDefault.supportedFileAttributeViews().contains("posix")

/**
* Provides a backward-compatible implementation of the `fetchFile` method
* from Apache Spark's `org.apache.spark.util.Utils` class.
*
* This method handles differences in method signatures between Spark versions,
* specifically the inclusion or absence of a `SecurityManager` parameter. It uses
* reflection to dynamically resolve and invoke the correct version of `fetchFile`.
*
* @param url The source URL of the file to be fetched.
* @param targetDir The directory where the fetched file will be saved.
* @param conf The Spark configuration object used to determine runtime settings.
* @param hadoopConf Hadoop configuration settings for file access.
* @param timestamp A timestamp indicating the cache validity of the fetched file.
* @param useCache Whether to use Spark's caching mechanism to reuse previously downloaded files.
* @param shouldUntar Whether to untar the downloaded file if it is a tarball. Defaults to `true`.
*
* @return A `File` object pointing to the fetched and stored file.
*
* @throws NoSuchMethodException If neither supported `fetchFile` signature is found.
* @throws Throwable If an error occurs during reflection or method invocation.
*
* Note:
* - This method was introduced as a fix for Databricks-specific file copying issues
* and was referenced in PR #1048.
* - Reflection is used to ensure compatibility across Spark environments.
*/
def fetchFileWithbackwardCompatibility(
url: String,
targetDir: File,
conf: SparkConf,
hadoopConf: Configuration,
timestamp: Long,
useCache: Boolean,
shouldUntar: Boolean = true): File = {

val signatureWithSecurityManager = Array(
classOf[String],
classOf[File],
classOf[SparkConf],
classOf[SecurityManager],
classOf[Configuration],
java.lang.Long.TYPE,
java.lang.Boolean.TYPE,
java.lang.Boolean.TYPE
)

val signatureWithoutSecurityManager = Array(
classOf[String],
classOf[File],
classOf[SparkConf],
classOf[Configuration],
classOf[Long],
classOf[Boolean],
classOf[Boolean]
)

val utilsClass = Class.forName("org.apache.spark.util.Utils$")
val utilsObject = utilsClass.getField("MODULE$").get(null)

val (needSecurityManagerArg, method) = {
try {
(true, utilsClass.getMethod("fetchFile", signatureWithSecurityManager: _*))
} catch {
case _: NoSuchMethodException =>
(false, utilsClass.getMethod("fetchFile", signatureWithoutSecurityManager: _*))
}
}

val args: Seq[Any] =
Seq(
url,
targetDir,
conf
) ++ (if (needSecurityManagerArg) Seq(null) else Nil) ++ Seq(
hadoopConf,
timestamp,
useCache,
shouldUntar)

// Unwrap InvocationTargetException to preserve exception in case of errors:
try {
method.invoke(utilsObject, args.map(_.asInstanceOf[Object]): _*).asInstanceOf[File]
} catch {
case e: InvocationTargetException =>
throw e.getCause()
}
}

/**
* Compress all files under given directory into one zip file and drop it to the target directory
*
@@ -225,7 +319,8 @@
throw new IllegalArgumentException(
s"Unsupported spark version used: '$sparkVersion'. " +
s"Normalized spark version used: '$normalizedSparkVersion'. " +
s"Supported versions: '$supportedVersions'.")
s"Supported versions: '$supportedVersions'." +
"Patch version can be ignored, use setting 'spark.dotnet.ignoreSparkPatchVersionCheck'" )
}
}

@@ -49,7 +49,9 @@ class UtilsTest {
assertEquals(
s"Unsupported spark version used: '$sparkVersion'. " +
s"Normalized spark version used: '$normalizedSparkVersion'. " +
s"Supported versions: '${supportedSparkVersions.toSeq.sorted.mkString(", ")}'.",
s"Supported versions: '${supportedSparkVersions.toSeq.sorted.mkString(", ")}'." +
"Patch version can be ignored, use setting 'spark.dotnet.ignoreSparkPatchVersionCheck'",

exception.getMessage)
}

