
Conversation


@Kimahriman Kimahriman commented Nov 3, 2025

What changes were proposed in this pull request?

Updates the v2 Spark-session based Python UDF profiler to support profiling iterator based UDFs.

from collections.abc import Iterator
from pstats import SortKey
import pyarrow as pa

df = spark.range(100000)

def map_func(iter: Iterator[pa.RecordBatch]) -> Iterator[pa.RecordBatch]:
    for batch in iter:
        yield pa.RecordBatch.from_arrays([pa.compute.add(batch.column("id"), 10)], ["id"])

spark.conf.set('spark.sql.pyspark.udf.profiler', 'perf')
df.mapInArrow(map_func, df.schema).collect()

spark.conf.set('spark.sql.pyspark.udf.profiler', 'memory')
df.mapInArrow(map_func, df.schema).collect()

for stats in spark.profile.profiler_collector._perf_profile_results.values():
    stats.sort_stats(SortKey.CUMULATIVE).print_stats(20)

spark.profile.show(type="memory")
1395288 function calls (1359888 primitive calls) in 2.850 seconds

   Ordered by: cumulative time
   List reduced from 1546 to 20 due to restriction <20>

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
      416    0.008    0.000    5.901    0.014 __init__.py:1(<module>)
   424/24    0.000    0.000    2.850    0.119 {built-in method builtins.next}
       24    0.001    0.000    2.850    0.119 test.py:11(map_func)
       16    0.002    0.000    2.646    0.165 compute.py:244(wrapper)
  2752/24    0.016    0.000    2.642    0.110 <frozen importlib._bootstrap>:1349(_find_and_load)
  2752/24    0.013    0.000    2.641    0.110 <frozen importlib._bootstrap>:1304(_find_and_load_unlocked)
       64    0.002    0.000    2.618    0.041 api.py:1(<module>)
  2704/24    0.009    0.000    2.612    0.109 <frozen importlib._bootstrap>:911(_load_unlocked)
  2264/24    0.005    0.000    2.611    0.109 <frozen importlib._bootstrap_external>:993(exec_module)
  6336/48    0.004    0.000    2.591    0.054 <frozen importlib._bootstrap>:480(_call_with_frames_removed)
  2400/24    0.023    0.000    2.591    0.108 {built-in method builtins.exec}
       24    0.002    0.000    1.927    0.080 generic.py:1(<module>)
  520/320    0.002    0.000    1.429    0.004 {built-in method builtins.__import__}
4312/2896    0.006    0.000    1.190    0.000 <frozen importlib._bootstrap>:1390(_handle_fromlist)
        8    0.001    0.000    1.014    0.127 frame.py:1(<module>)
4392/4312    0.069    0.000    0.953    0.000 {built-in method builtins.__build_class__}
     2264    0.030    0.000    0.562    0.000 <frozen importlib._bootstrap_external>:1066(get_code)
       16    0.001    0.000    0.551    0.034 indexing.py:1(<module>)
       24    0.001    0.000    0.451    0.019 datetimes.py:1(<module>)
       16    0.001    0.000    0.401    0.025 datetimelike.py:1(<module>)


============================================================
Profile of UDF<id=3>
============================================================
Filename: /data/projects/spark/python/test.py

Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
    11   1212.1 MiB   1212.1 MiB           8   def map_func(iter: Iterator[pa.RecordBatch]) -> Iterator[pa.RecordBatch]:
    12   1212.1 MiB     -0.2 MiB          24       for batch in iter:
    13   1212.1 MiB     -0.2 MiB          32           yield pa.RecordBatch.from_arrays([pa.compute.add(batch.column("id"), 10)], ["id"])
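For context, the core difficulty with iterator UDFs can be shown with plain `cProfile`, independent of Spark: calling a generator function returns immediately without running its body, so a profiler that only brackets the call records none of the per-batch work; the iterator must also be driven while the profiler is active. A minimal sketch (all names here are illustrative, not PySpark internals):

```python
import cProfile

def udf(batches):
    # Stand-in for an iterator UDF: the real work happens lazily, per batch.
    for b in batches:
        yield [x + 10 for x in b]

def profiled_names(profiler):
    # getstats() entries carry the profiled code objects
    # (builtin callables for C functions, which lack co_name).
    return {e.code.co_name for e in profiler.getstats() if hasattr(e.code, "co_name")}

# Naive: only the call is bracketed. Creating the generator executes none of
# its body, so "udf" never appears in the stats.
naive = cProfile.Profile()
naive.enable()
it = udf([[1, 2], [3, 4]])
naive.disable()

# Correct: the iterator is also consumed while the profiler is active, so
# each resumption of udf's frame is recorded.
correct = cProfile.Profile()
correct.enable()
out = list(udf([[1, 2], [3, 4]]))
correct.disable()
```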

Why are the changes needed?

To add valuable profiling support to all types of UDFs.

Does this PR introduce any user-facing change?

Yes, iterator based Python UDFs can now be profiled with the SQL config based profiler.

How was this patch tested?

Updated the UTs that previously asserted iterator UDFs were unsupported so they now verify that profiling them works.

Was this patch authored or co-authored using generative AI tooling?

No

@Kimahriman
Contributor Author

@zhengruifeng

@Kimahriman
Contributor Author

The original profilers via spark.python.profile and spark.python.profile.memory are a little trickier because of the Profiler instances wrapping all the logic. I can try to tackle those separately.

return profiling_func


def wrap_iter_perf_profiler(f, result_id):
Contributor

Do we need a new wrap function here? We only need a different profiling_func right? Should we do wrap_perf_profiler(f, eval_type, result_id) and do all the logic inside this function?

Contributor Author

Yeah I thought about doing that, I can update
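A rough sketch of what the suggested single wrapper could look like, dispatching on the eval type inside one `wrap_perf_profiler`. Everything here is a hypothetical stand-in (string eval-type names, a plain `results` dict instead of PySpark's accumulator plumbing, `cProfile` instead of the worker's profiler), just to show the dispatch shape:

```python
import cProfile
import pstats
from typing import Any, Callable, Iterator

# Illustrative stand-in for PySpark's eval-type constants.
ITER_EVAL_TYPES = {"SQL_MAP_ARROW_ITER_UDF", "SQL_MAP_PANDAS_ITER_UDF"}

def wrap_perf_profiler(f: Callable, eval_type: str, results: dict, result_id: int) -> Callable:
    profiler = cProfile.Profile()

    if eval_type in ITER_EVAL_TYPES:
        # Iterator UDFs: each next() must run under the profiler; consumer
        # code between yields stays unprofiled. Note stats are only recorded
        # once the iterator is exhausted.
        def profiling_func(*args: Any, **kwargs: Any) -> Iterator[Any]:
            iterator = f(*args, **kwargs)
            while True:
                profiler.enable()
                try:
                    item = next(iterator)
                except StopIteration:
                    profiler.disable()
                    results[result_id] = pstats.Stats(profiler)
                    return
                profiler.disable()
                yield item
    else:
        # Plain UDFs: bracketing the call is enough.
        def profiling_func(*args: Any, **kwargs: Any) -> Any:
            profiler.enable()
            try:
                return f(*args, **kwargs)
            finally:
                profiler.disable()
                results[result_id] = pstats.Stats(profiler)

    return profiling_func
```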

return f, args_offsets


def _supports_profiler(eval_type: int) -> bool:
Contributor

cc @ueshin to check whether iterator-based UDFs can be supported in memory profiler now

Member

There is some code to support memory profiler: https://github.com/apache/spark/pull/52853/files#diff-e91c3ea5c69ba1ce65bbaae56a7164a927bb013aac053d5167936ce9c9522051R1228?
Also the description contains the example output.

@ueshin
Member

ueshin commented Nov 4, 2025

cc @allisonwang-db too

@zhengruifeng zhengruifeng changed the title [SPARK-54153] Support profiling iterator based Python UDFs [SPARK-54153][PYTHON] Support profiling iterator based Python UDFs Nov 4, 2025
Member

@ueshin ueshin left a comment

This is awesome! Let me try something in my local, but basically LGTM.

jobArtifactUUID,
sessionUUID)
sessionUUID,
None)
Member

@ueshin ueshin Nov 4, 2025

Can we also enable it here? IIUC, this will run on executors, so it would help to debug python datasources. cc @allisonwang-db

Contributor Author

Yep added it and a simple test for it

@pandas_udf("long")
def add1(x):
    return x + 1
def add1(iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
Contributor

The original code has 2 UDFs because one of them is not supported. It tests how an unsupported UDF works with a supported UDF. Now having two exact same UDFs does not make sense anymore. It's a bit confusing actually.

I would suggest to make this simple enough. Just test how a single iterator UDF works with profiler and confirm that we can read profiling data.

Contributor Author

yeah I wasn't sure how much to modify things, I can update

def profiling_func(*args, **kwargs):
    profiler = UDFLineProfilerV2()

    wrapped = profiler(f)
Contributor

I would highly suggest making the two profiling_func use the same path. We should either use add_function then with, or call profiler. I'm assuming the wrapped way is not easily applicable to iterators. I think it will eliminate a lot of confusion.

Contributor Author

Yeah I agree, updated to use the same context manager approach. And yeah the explicit add_function is necessary for the iterator approach to work
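To illustrate the shape of that context-manager approach with an explicit `add_function`: `memory_profiler`'s `LineProfiler` exposes `add_function` plus an enable/disable context manager, and registering the generator up front is what lets each `next()` be bracketed separately. The `MiniProfiler` below is a stand-in (counting activations rather than sampling memory) so the sketch runs without the `memory_profiler` dependency; `wrap_iter_udf` is a hypothetical name, not the PR's actual function:

```python
from typing import Any, Callable, Iterator

class MiniProfiler:
    """Stand-in exposing the same add_function + context-manager surface as
    memory_profiler's LineProfiler; it just counts activations."""

    def __init__(self) -> None:
        self.functions: list = []
        self.activations = 0

    def add_function(self, func: Callable) -> None:
        self.functions.append(func)

    def __enter__(self) -> "MiniProfiler":
        self.activations += 1
        return self

    def __exit__(self, *exc: Any) -> None:
        pass

def wrap_iter_udf(f: Callable, profiler: MiniProfiler) -> Callable:
    # Explicit registration makes the iterator case work: the generator is
    # known to the profiler even though each next() is bracketed separately.
    profiler.add_function(f)

    def profiling_func(*args: Any, **kwargs: Any) -> Iterator[Any]:
        iterator = f(*args, **kwargs)
        while True:
            with profiler:
                try:
                    item = next(iterator)
                except StopIteration:
                    return
            yield item

    return profiling_func
```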

if _supports_profiler(eval_type) and has_memory_profiler:
    profiling_func = wrap_memory_profiler(chained_func, result_id)
else:
    if not has_memory_profiler:
Contributor

We can probably move this check to wrap_memory_profiler() and let it handle the logic. I always think we should handle these logic in one location.

Contributor Author

Makes sense, done
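A sketch of what centralizing that gating inside `wrap_memory_profiler` might look like: the wrapper returns either an instrumented function or the original one, so callers never repeat the availability check. The `_supports_profiler` body and eval-type handling here are illustrative only, not the PR's actual logic:

```python
import warnings
from typing import Any, Callable

try:
    import memory_profiler  # noqa: F401
    HAS_MEMORY_PROFILER = True
except ImportError:
    HAS_MEMORY_PROFILER = False

def _supports_profiler(eval_type: int) -> bool:
    # Illustrative: after this PR most eval types qualify; some may still
    # be excluded in the real check.
    return eval_type >= 0

def wrap_memory_profiler(f: Callable, eval_type: int, result_id: int) -> Callable:
    """Single gatekeeper: all 'can we profile this?' logic lives here."""
    if not HAS_MEMORY_PROFILER:
        warnings.warn("memory_profiler is not installed; memory profiling is disabled")
        return f
    if not _supports_profiler(eval_type):
        return f

    def profiling_func(*args: Any, **kwargs: Any) -> Any:
        # Real code would run f under a line profiler and record the
        # stats under result_id; elided here.
        return f(*args, **kwargs)

    return profiling_func
```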

Member

@HyukjinKwon HyukjinKwon left a comment

Awesome
