I am trying to create user-defined functions (UDFs) for Spark DataFrames, but the job crashes when I apply them on Windows.
Whether it crashes depends on how the DataFrame was created.
#%%
import pyspark.pandas as ppd
import pyspark.sql as ss
import pyspark.sql.functions as psFunc
import pyspark.sql.types as sst
# Build (or reuse) the local Spark session used by both tests below.
# The builder chain is wrapped in parentheses instead of being continued with
# trailing backslashes: a comment after a line-continuation backslash is a
# SyntaxError, whereas inside parentheses each option can carry its own comment.
mySpark = (
    ss.SparkSession.builder.appName('test')
    .config("spark.sql.catalogImplementation", "hive")
    .config("spark.sql.legacy.createHiveTableByDefault", "false")
    .config("spark.sql.repl.eagerEval.enabled", "true")
    .config("spark.sql.sources.default", "parquet")
    # Crashes without this too.
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .config("spark.sql.ansi.enabled", "false")
    # Ask Spark to report a traceback when a Python worker / UDF crashes.
    .config("spark.python.worker.faulthandler.enabled", "true")
    .config("spark.sql.execution.pyspark.udf.faulthandler.enabled", "true")
    # Additional options, currently disabled:
    # .config("spark.executor.memory", "6g")
    # .config("spark.executor.memoryOverhead", "1g")
    # .config("spark.delta.catalog.update.enabled", "false")
    .enableHiveSupport()
    .getOrCreate()
)
# Sample decimal values, all stored as strings.
# NOTE: "None" is the literal four-character string, not Python None / SQL NULL.
DECIMAL_STR_LIST = [
    "1", "2.2", "3.3", "44.4", "555.55",
    "654321.12", "7.75", "-800000.8", "None", "1000",
]
def noChange(val: str) -> str:
return str(val)
# %%
# Test 1: Spark DataFrame obtained from a pandas-on-Spark DataFrame.
# This does NOT crash.
cols = {"aDecimal82": DECIMAL_STR_LIST}
df2 = ppd.DataFrame(cols)
sparkDf = df2.to_spark()

# Wrap the identity function as a string-returning Python UDF and apply it,
# keeping the original column name.
ncUdfFunc = psFunc.udf(noChange, returnType=sst.StringType())
newCol = ncUdfFunc(psFunc.col("aDecimal82"))
newCol = newCol.alias("aDecimal82")
newDf = sparkDf.select(newCol)

# With spark.sql.repl.eagerEval.enabled set, printing the DataFrame evaluates
# it (the reported error shows showString being called), so this line is
# what actually runs the UDF.
print(newDf)
# %%
# Test 2: Spark DataFrame created directly with createDataFrame.
# Reported to crash.
cols = {"aDecimal82": DECIMAL_STR_LIST}
# Transpose the column dict into a list of one-element rows. Iterate the
# dict's *values*: zip(*cols) would unpack the dict's keys, i.e. the
# characters of "aDecimal82", producing rows ['a'], ['D'], ... instead of the
# data. Both have length 10, which hides the mistake.
# NOTE(review): this fixes the input data; it is not known to explain the
# Python worker crash on its own.
cols2 = list(map(list, zip(*cols.values())))
sparkDf = mySpark.createDataFrame(cols2, ["aDecimal82"])

# Wrap the identity function as a string-returning Python UDF and apply it,
# keeping the original column name.
ncUdfFunc = psFunc.udf(noChange, sst.StringType())
newCol = ncUdfFunc(psFunc.col("aDecimal82")).alias("aDecimal82")
newDf = sparkDf.select([newCol])
print(newDf)
I am trying to create user-defined functions (UDFs) for Spark DataFrames, but the job crashes when I apply them on Windows.
Whether it crashes depends on how the DataFrame was created.
Environment:
- Python: 3.12.10
- pandas: 2.2.3
- Spark: 4.2.0-preview5
- NumPy: 2.1.3
- PyArrow: 19.0.1
The last part of the error message:
Py4JJavaError: An error occurred while calling o207.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 5.0 failed 1 times, most recent failure: Lost task 1.0 in stage 5.0 (TID 36) (machine_name executor driver): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
Further down the trace (many intermediate errors omitted):
Caused by: java.io.IOException: An established connection was aborted by the software in your host machine
at java.base/sun.nio.ch.SocketDispatcher.write0(Native Method)
at java.base/sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:54)
at java.base/sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:132)
at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:76)
at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:53)
at java.base/sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:532)
at org.apache.spark.api.python.BasePythonRunner$ReaderInputStream.writeAdditionalInputToPythonWorker(PythonRunner.scala:975)
at org.apache.spark.api.python.BasePythonRunner$ReaderInputStream.read(PythonRunner.scala:879)
at java.base/java.io.BufferedInputStream.fill(BufferedInputStream.java:244)
at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:263)
at java.base/java.io.DataInputStream.readInt(DataInputStream.java:393)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:1053)
... 45 more