Collect and aggregate Spark events for profit. In a 🐍 way!
pip install pyspark-spy
Register a listener:
import pyspark_spy
listener = pyspark_spy.PersistingSparkListener()
pyspark_spy.register_listener(spark_context, listener)
Execute your Spark job as usual:
spark_context.range(1, 100).count()
And you have all metrics collected!
print(listener.stage_output_metrics_aggregate())
OutputMetrics(bytesWritten=12861, recordsWritten=2426)
Look Ma, no actions!
Tested on Python 3.5 - 3.7 and PySpark 2.3 - 3.0.
- pyspark_spy.interface.SparkListener - Base listener class. It defines an on_spark_event(event_name, java_event) method that you can implement yourself for custom logic when any event is received (see the sketch after this list).
- LoggingSparkListener - just logs received event names into a supplied or automatically created logger.
- StdoutSparkListener - writes event names to stdout.
- PersistingSparkListener - saves Spark events into an internal buffer.
- ContextSparkListener - same as PersistingSparkListener, but also allows you to record only events that occurred within a Python context manager scope. More on that later.
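For example, a custom listener only needs to override on_spark_event. Below is a minimal sketch, assuming the base class can be subclassed this way; the CountingSparkListener name and its counting logic are illustrative, not part of the library.

from pyspark_spy.interface import SparkListener

class CountingSparkListener(SparkListener):
    """Hypothetical listener that counts how many times each event type fires."""

    def __init__(self):
        super().__init__()  # assumption: the base __init__ takes no arguments
        self.counts = {}  # event_name -> number of occurrences

    def on_spark_event(self, event_name, java_event):
        # event_name is e.g. 'jobEnd'; java_event is the raw py4j object
        self.counts[event_name] = self.counts.get(event_name, 0) + 1

Register it with register_listener just like the built-in listeners.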
Spark events collected (as Java objects):
- applicationEnd
- applicationStart
- blockManagerRemoved
- blockUpdated
- environmentUpdate
- executorAdded
- executorMetricsUpdate
- executorRemoved
- jobEnd
- jobStart
- otherEvent
- stageCompleted
- stageSubmitted
- taskEnd
- taskGettingResult
- taskStart
- unpersistRDD
listener.java_events['executorMetricsUpdate'] # -> List of py4j java objects
View all possible Spark events and their fields at https://spark.apache.org/docs/2.3.1/api/java/org/apache/spark/scheduler/SparkListener.html
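Since these are plain py4j proxies, you can call the Java getters on them directly. A minimal sketch, assuming at least one job has finished so 'jobEnd' events were collected; jobId() and time() come from Spark's SparkListenerJobEnd class, not from pyspark-spy itself.

for java_event in listener.java_events['jobEnd']:
    # case-class fields are exposed as no-arg methods through py4j
    print(java_event.jobId(), java_event.time())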
Events converted to Python objects:
- jobEnd
- stageCompleted
listener.python_events['jobEnd'] # -> List of java events converted to typed namedtuples.
listener.jobEnd # same
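These are regular namedtuples, so the usual namedtuple helpers work on them. A small sketch; _asdict() is used instead of guessing field names, since the exact fields of the converted jobEnd tuple are not listed here.

for job_end in listener.jobEnd:
    # _asdict() shows whatever fields the conversion provides for this event
    print(job_end._asdict())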
Only in PersistingSparkListener and ContextSparkListener:

stage_input_metrics_aggregate - sums up all stageCompleted event inputMetrics into one

print(listener.stage_input_metrics_aggregate())
InputMetrics(bytesRead=21574, recordsRead=584)

stage_output_metrics_aggregate - sums up all stageCompleted event outputMetrics into one

print(listener.stage_output_metrics_aggregate())
OutputMetrics(bytesWritten=12861, recordsWritten=2426)
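The aggregates are plain namedtuples, so their fields can feed ordinary Python logic. A small sketch; the zero-records check is just an illustrative use, not part of the library.

output = listener.stage_output_metrics_aggregate()
if output.recordsWritten == 0:
    raise RuntimeError('Spark job wrote no records')
print('wrote {} bytes in {} records'.format(output.bytesWritten, output.recordsWritten))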
To collect events from different actions and build separate aggregations, use ContextSparkListener.
listener = ContextSparkListener()
register_listener(sc, listener)

with listener as events:  # events is basically another listener
    run_spark_job()
events.stage_output_metrics_aggregate()  # events collected only within the context manager

with listener as events_2:
    run_other_spark_job()
events_2.stage_output_metrics_aggregate()  # metrics collected during the second job

listener.stage_output_metrics_aggregate()  # metrics collected across all jobs