Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion python/pyspark/sql/_typing.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ from typing import (
List,
Optional,
Tuple,
TypedDict,
TypeVar,
Union,
)
Expand Down Expand Up @@ -85,4 +86,8 @@ class UserDefinedFunctionLike(Protocol):
def __call__(self, *args: ColumnOrName) -> Column: ...
def asNondeterministic(self) -> UserDefinedFunctionLike: ...

ProfileResults = Dict[Union[int, str], Tuple[Optional[pstats.Stats], Optional[CodeMapDict]]]
class ProfileResult(TypedDict, total=False):
perf: pstats.Stats
memory: CodeMapDict

ProfileResults = Dict[Union[int, str], ProfileResult]
2 changes: 1 addition & 1 deletion python/pyspark/sql/connect/profiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class ConnectProfilerCollector(ProfilerCollector):

def __init__(self) -> None:
super().__init__()
self._value = ProfileResultsParam.zero(None)
self._value = ProfileResultsParam.zero({})

@property
def _profile_results(self) -> "ProfileResults":
Expand Down
91 changes: 43 additions & 48 deletions python/pyspark/sql/profiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,35 +54,32 @@
from pyspark.sql._typing import ProfileResults


class _ProfileResultsParam(AccumulatorParam[Optional["ProfileResults"]]):
class _ProfileResultsParam(AccumulatorParam["ProfileResults"]):
"""
AccumulatorParam for profilers.
"""

@staticmethod
def zero(value: Optional["ProfileResults"]) -> Optional["ProfileResults"]:
return value
def zero(value: "ProfileResults") -> "ProfileResults":
return {}

@staticmethod
def addInPlace(
value1: Optional["ProfileResults"], value2: Optional["ProfileResults"]
) -> Optional["ProfileResults"]:
if value1 is None or len(value1) == 0:
value1 = {}
if value2 is None or len(value2) == 0:
value2 = {}

value = value1.copy()
for key, (perf, mem, *_) in value2.items():
if key in value1:
orig_perf, orig_mem, *_ = value1[key]
def addInPlace(value1: "ProfileResults", value2: "ProfileResults") -> "ProfileResults":
for key, result in value2.items():
if key not in value1:
value1[key] = result
else:
orig_perf, orig_mem = (PStatsParam.zero(None), MemUsageParam.zero(None))
value[key] = (
PStatsParam.addInPlace(orig_perf, perf),
MemUsageParam.addInPlace(orig_mem, mem),
)
return value
perf = PStatsParam.addInPlace(
value1[key].get("perf", None), result.get("perf", None)
)
if perf is not None:
value1[key]["perf"] = perf
memory = MemUsageParam.addInPlace(
value1[key].get("memory", None), result.get("memory", None)
)
if memory is not None:
value1[key]["memory"] = memory
return value1


ProfileResultsParam = _ProfileResultsParam()
Expand All @@ -94,7 +91,7 @@ class WorkerPerfProfiler:
"""

def __init__(
self, accumulator: Accumulator[Optional["ProfileResults"]], result_key: Union[int, str]
self, accumulator: Accumulator["ProfileResults"], result_key: Union[int, str]
) -> None:
self._accumulator = accumulator
self._profiler = cProfile.Profile()
Expand All @@ -111,7 +108,7 @@ def save(self) -> None:
# make it picklable
st.stream = None # type: ignore[attr-defined]
st.strip_dirs()
self._accumulator.add({self._result_key: (st, None)})
self._accumulator.add({self._result_key: {"perf": st}})

def __enter__(self) -> "WorkerPerfProfiler":
self.start()
Expand All @@ -134,7 +131,7 @@ class WorkerMemoryProfiler:

def __init__(
self,
accumulator: Accumulator[Optional["ProfileResults"]],
accumulator: Accumulator["ProfileResults"],
result_key: Union[int, str],
func_or_code: Union[Callable, CodeType],
) -> None:
Expand All @@ -159,7 +156,7 @@ def save(self) -> None:
filename: list(line_iterator)
for filename, line_iterator in self._profiler.code_map.items()
}
self._accumulator.add({self._result_key: (None, codemap_dict)})
self._accumulator.add({self._result_key: {"memory": codemap_dict}})

def __enter__(self) -> "WorkerMemoryProfiler":
self.start()
Expand Down Expand Up @@ -226,9 +223,9 @@ def show(id: Union[int, str]) -> None:
def _perf_profile_results(self) -> Dict[Union[int, str], pstats.Stats]:
with self._lock:
return {
result_id: perf
for result_id, (perf, _, *_) in self._profile_results.items()
if perf is not None
result_id: result["perf"]
for result_id, result in self._profile_results.items()
if result.get("perf", None) is not None
}

def show_memory_profiles(self, id: Optional[Union[int, str]] = None) -> None:
Expand Down Expand Up @@ -272,9 +269,9 @@ def show(id: Union[int, str]) -> None:
def _memory_profile_results(self) -> Dict[Union[int, str], CodeMapDict]:
with self._lock:
return {
result_id: mem
for result_id, (_, mem, *_) in self._profile_results.items()
if mem is not None
result_id: result["memory"]
for result_id, result in self._profile_results.items()
if result.get("memory", None) is not None
}

@property
Expand Down Expand Up @@ -368,15 +365,14 @@ def clear_perf_profiles(self, id: Optional[Union[int, str]] = None) -> None:
with self._lock:
if id is not None:
if id in self._profile_results:
perf, mem, *_ = self._profile_results[id]
self._profile_results[id] = (None, mem, *_)
if mem is None:
self._profile_results.pop(id, None)
self._profile_results[id].pop("perf", None)
if not self._profile_results[id]:
self._profile_results.pop(id)
else:
for id, (perf, mem, *_) in list(self._profile_results.items()):
self._profile_results[id] = (None, mem, *_)
if mem is None:
self._profile_results.pop(id, None)
for id in list(self._profile_results.keys()):
self._profile_results[id].pop("perf", None)
if not self._profile_results[id]:
self._profile_results.pop(id)

def clear_memory_profiles(self, id: Optional[Union[int, str]] = None) -> None:
"""
Expand All @@ -393,15 +389,14 @@ def clear_memory_profiles(self, id: Optional[Union[int, str]] = None) -> None:
with self._lock:
if id is not None:
if id in self._profile_results:
perf, mem, *_ = self._profile_results[id]
self._profile_results[id] = (perf, None, *_)
if perf is None:
self._profile_results.pop(id, None)
self._profile_results[id].pop("memory", None)
if not self._profile_results[id]:
self._profile_results.pop(id)
else:
for id, (perf, mem, *_) in list(self._profile_results.items()):
self._profile_results[id] = (perf, None, *_)
if perf is None:
self._profile_results.pop(id, None)
for id in list(self._profile_results.keys()):
self._profile_results[id].pop("memory", None)
if not self._profile_results[id]:
self._profile_results.pop(id)


class AccumulatorProfilerCollector(ProfilerCollector):
Expand All @@ -411,7 +406,7 @@ def __init__(self) -> None:
self._accumulator = _accumulatorRegistry[SpecialAccumulatorIds.SQL_UDF_PROFIER]
else:
self._accumulator = Accumulator(
SpecialAccumulatorIds.SQL_UDF_PROFIER, None, ProfileResultsParam
SpecialAccumulatorIds.SQL_UDF_PROFIER, {}, ProfileResultsParam
)

@property
Expand Down
2 changes: 1 addition & 1 deletion python/pyspark/sql/tests/test_udf_profiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -752,7 +752,7 @@ def test_perf_profiler_clear(self):
class UDFProfiler2Tests(UDFProfiler2TestsMixin, ReusedSQLTestCase):
def setUp(self) -> None:
super().setUp()
self.spark._profiler_collector._accumulator._value = None
self.spark._profiler_collector._accumulator._value = {}


if __name__ == "__main__":
Expand Down
2 changes: 1 addition & 1 deletion python/pyspark/sql/worker/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def worker_run(main: Callable, infile: IO, outfile: IO) -> None:

_accumulatorRegistry.clear()
accumulator = _deserialize_accumulator(
SpecialAccumulatorIds.SQL_UDF_PROFIER, None, ProfileResultsParam
SpecialAccumulatorIds.SQL_UDF_PROFIER, {}, ProfileResultsParam
)

if main.__module__ == "__main__":
Expand Down
2 changes: 1 addition & 1 deletion python/pyspark/tests/test_memory_profiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,7 @@ def test_profilers_clear(self):
class MemoryProfiler2Tests(MemoryProfiler2TestsMixin, ReusedSQLTestCase):
def setUp(self) -> None:
super().setUp()
self.spark._profiler_collector._accumulator._value = None
self.spark._profiler_collector._accumulator._value = {}


if __name__ == "__main__":
Expand Down
4 changes: 2 additions & 2 deletions python/pyspark/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1069,7 +1069,7 @@ def wrap_perf_profiler(f, eval_type, result_id):
from pyspark.sql.profiler import ProfileResultsParam, WorkerPerfProfiler

accumulator = _deserialize_accumulator(
SpecialAccumulatorIds.SQL_UDF_PROFIER, None, ProfileResultsParam
SpecialAccumulatorIds.SQL_UDF_PROFIER, {}, ProfileResultsParam
)

if _is_iter_based(eval_type):
Expand Down Expand Up @@ -1103,7 +1103,7 @@ def wrap_memory_profiler(f, eval_type, result_id):
return f

accumulator = _deserialize_accumulator(
SpecialAccumulatorIds.SQL_UDF_PROFIER, None, ProfileResultsParam
SpecialAccumulatorIds.SQL_UDF_PROFIER, {}, ProfileResultsParam
)

if _is_iter_based(eval_type):
Expand Down