Qualification tool: Add penalty for row conversions #471

Merged Oct 16, 2023 (25 commits). The diff below shows changes from 9 commits.

Commits
5008ad8
Qualification tool: Add penalty for row conversions
nartal1 Aug 1, 2023
a32d81c
Merge branch 'dev' of github.com:NVIDIA/spark-rapids-tools into apple…
nartal1 Aug 1, 2023
af962b8
optimize code
nartal1 Aug 2, 2023
1ae0eaa
addressed review comments
nartal1 Aug 8, 2023
7da1acf
Merge branch 'dev' of github.com:NVIDIA/spark-rapids-tools into cpu_g…
nartal1 Aug 14, 2023
e2dbb12
addressed review comments and added test
nartal1 Aug 18, 2023
29f0e8c
fix unit test
nartal1 Aug 22, 2023
a52803a
Merge branch 'dev' of github.com:NVIDIA/spark-rapids-tools into cpu_g…
nartal1 Aug 22, 2023
7b6803f
addressed review comments
nartal1 Aug 23, 2023
2607489
addressed review comments and updated test results
nartal1 Aug 28, 2023
f8a867f
Merge branch 'dev' of github.com:NVIDIA/spark-rapids-tools into cpu_g…
nartal1 Sep 19, 2023
0c5b342
Merge branch 'dev' of github.com:NVIDIA/spark-rapids-tools into cpu_g…
nartal1 Sep 22, 2023
89fccf4
Merge branch 'dev' of github.com:NVIDIA/spark-rapids-tools into cpu_g…
nartal1 Oct 8, 2023
2ba211a
Address review comments
nartal1 Oct 9, 2023
5e06370
Merge branch 'dev' of github.com:NVIDIA/spark-rapids-tools into cpu_g…
nartal1 Oct 9, 2023
8dfa245
address review comments
nartal1 Oct 10, 2023
7e1ebe0
update tests
nartal1 Oct 11, 2023
132605f
Revert "update tests"
nartal1 Oct 11, 2023
f608365
add penalty to durations
nartal1 Oct 12, 2023
f8c8d40
change transitiontime calculation
nartal1 Oct 13, 2023
db6eaf0
Merge branch 'dev' of github.com:NVIDIA/spark-rapids-tools into cpu_g…
nartal1 Oct 13, 2023
36f8d56
update variable name
nartal1 Oct 13, 2023
4a8ab84
addressed review comments
nartal1 Oct 13, 2023
b17df32
Merge branch 'dev' of github.com:NVIDIA/spark-rapids-tools into cpu_g…
nartal1 Oct 14, 2023
052b22f
change penaly percentage
nartal1 Oct 16, 2023
Files changed
@@ -391,6 +391,7 @@ object QualOutputWriter {
val ESTIMATED_GPU_SPEEDUP = "Estimated GPU Speedup"
val ESTIMATED_GPU_TIMESAVED = "Estimated GPU Time Saved"
val STAGE_ESTIMATED_STR = "Stage Estimated"
val NUM_TRANSITIONS = "Number of CPU-GPU Transitions"
val UNSUPPORTED_EXECS = "Unsupported Execs"
val UNSUPPORTED_EXPRS = "Unsupported Expressions"
val CLUSTER_TAGS = "Cluster Tags"
@@ -856,7 +857,8 @@ object QualOutputWriter {
AVERAGE_SPEEDUP_STR -> AVERAGE_SPEEDUP_STR.size,
STAGE_DUR_STR -> STAGE_DUR_STR.size,
UNSUPPORTED_TASK_DURATION_STR -> UNSUPPORTED_TASK_DURATION_STR.size,
STAGE_ESTIMATED_STR -> STAGE_ESTIMATED_STR.size
STAGE_ESTIMATED_STR -> STAGE_ESTIMATED_STR.size,
NUM_TRANSITIONS -> NUM_TRANSITIONS.size
)
detailedHeadersAndFields
}
@@ -878,7 +880,8 @@ object QualOutputWriter {
headersAndSizes(AVERAGE_SPEEDUP_STR),
info.stageTaskTime.toString -> headersAndSizes(STAGE_DUR_STR),
info.unsupportedTaskDur.toString -> headersAndSizes(UNSUPPORTED_TASK_DURATION_STR),
info.estimated.toString -> headersAndSizes(STAGE_ESTIMATED_STR))
info.estimated.toString -> headersAndSizes(STAGE_ESTIMATED_STR),
info.numTransitions.toString -> headersAndSizes(NUM_TRANSITIONS))
constructOutputRow(data, delimiter, prettyPrint)
}
}
@@ -34,7 +34,8 @@ class Qualification(outputDir: String, numRows: Int, hadoopConf: Configuration,
timeout: Option[Long], nThreads: Int, order: String,
pluginTypeChecker: PluginTypeChecker, reportReadSchema: Boolean,
printStdout: Boolean, uiEnabled: Boolean, enablePB: Boolean,
reportSqlLevel: Boolean, maxSQLDescLength: Int, mlOpsEnabled:Boolean) extends Logging {
reportSqlLevel: Boolean, maxSQLDescLength: Int, mlOpsEnabled:Boolean,
ignoreTransitions: Boolean) extends Logging {

private val allApps = new ConcurrentLinkedQueue[QualificationSummaryInfo]()

@@ -166,7 +167,7 @@ class Qualification(outputDir: String, numRows: Int, hadoopConf: Configuration,
try {
val startTime = System.currentTimeMillis()
val appResult = QualificationAppInfo.createApp(path, hadoopConf, pluginTypeChecker,
reportSqlLevel, mlOpsEnabled)
reportSqlLevel, mlOpsEnabled, ignoreTransitions)
val qualAppResult = appResult match {
case Left(errorMessage: String) =>
// Case when an error occurred during QualificationAppInfo creation
@@ -112,6 +112,11 @@ Usage: java -cp rapids-4-spark-tools_2.12-<version>.jar:$SPARK_HOME/jars/*
opt[Boolean](required = false,
descr = "Whether to parse ML functions in the eventlogs. Default is false.",
default = Some(false))
val ignoreTransitions: ScallopOption[Boolean] =
opt[Boolean](required = false,
descr = "Whether to ignore durations for ColumnarToRow and RowToColumnar transitions " +
"in the eventlogs while calculating the speedup. Default is false.",
default = Some(false))
val sparkProperty: ScallopOption[List[String]] =
opt[List[String]](required = false,
descr = "Filter applications based on certain Spark properties that were set during " +
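For reference, a hypothetical invocation of the tool with the new flag might look like the following; the exact flag spelling is an assumption based on Scallop's usual mapping of a camelCase option val like `ignoreTransitions` to a dash-separated name:

```
java -cp rapids-4-spark-tools_2.12-<version>.jar:$SPARK_HOME/jars/* \
  com.nvidia.spark.rapids.tool.qualification.QualificationMain \
  --ignore-transitions /path/to/eventlog
```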
@@ -60,6 +60,7 @@ object QualificationMain extends Logging {
val reportSqlLevel = appArgs.perSql.getOrElse(false)
val platform = appArgs.platform.getOrElse("onprem")
val mlOpsEnabled = appArgs.mlFunctions.getOrElse(false)
val ignoreTransitions = appArgs.ignoreTransitions.getOrElse(false)

val hadoopConf = RapidsToolsConfUtil.newHadoopConf

@@ -93,7 +94,7 @@ object QualificationMain extends Logging {

val qual = new Qualification(outputDirectory, numOutputRows, hadoopConf, timeout,
nThreads, order, pluginTypeChecker, reportReadSchema, printStdout, uiEnabled,
enablePB, reportSqlLevel, maxSQLDescLength, mlOpsEnabled)
enablePB, reportSqlLevel, maxSQLDescLength, mlOpsEnabled, ignoreTransitions)
val res = qual.qualifyApps(filteredLogs)
(0, res)
}
@@ -16,6 +16,8 @@

package org.apache.spark.sql.rapids.tool.qualification

import java.util.concurrent.TimeUnit

import scala.collection.mutable.{ArrayBuffer, HashMap}

import com.nvidia.spark.rapids.tool.EventLogInfo
@@ -37,7 +39,8 @@ class QualificationAppInfo(
pluginTypeChecker: PluginTypeChecker,
reportSqlLevel: Boolean,
perSqlOnly: Boolean = false,
mlOpsEnabled: Boolean = false)
mlOpsEnabled: Boolean = false,
ignoreTransitions: Boolean = false)
extends AppBase(eventLogInfo, hadoopConf) with Logging {

var appId: String = ""
@@ -51,6 +54,8 @@ class QualificationAppInfo(
HashMap.empty[Long, StageTaskQualificationSummary]
val stageIdToTaskEndSum: HashMap[Long, StageTaskQualificationSummary] =
HashMap.empty[Long, StageTaskQualificationSummary]
val stageIdToGpuCpuTransitions: HashMap[Int, Int] = HashMap.empty[Int, Int]
var execsNoStageTransitions: Int = 0

val stageIdToSqlID: HashMap[Int, Long] = HashMap.empty[Int, Long]
val sqlIDtoFailures: HashMap[Long, ArrayBuffer[String]] = HashMap.empty[Long, ArrayBuffer[String]]
@@ -161,11 +166,11 @@ class QualificationAppInfo(
}

private def calculateSQLSupportedTaskDuration(all: Seq[StageQualSummaryInfo]): Long = {
all.map(s => s.stageTaskTime - s.unsupportedTaskDur).sum
all.map(s => s.stageTaskTime - s.unsupportedTaskDur).sum - calculateNoExecsStageDurations(all)

Reviewer: I'm a bit unclear how this works with the job overhead we add later and/or the mapping we already try to do for execs without stages. Is this counting it twice?

nartal1 (author): Removed this.

}

private def calculateSQLUnsupportedTaskDuration(all: Seq[StageQualSummaryInfo]): Long = {
all.map(_.unsupportedTaskDur).sum
all.map(_.unsupportedTaskDur).sum + calculateNoExecsStageDurations(all)
}

private def calculateSpeedupFactor(all: Seq[StageQualSummaryInfo]): Double = {
@@ -174,6 +179,23 @@ class QualificationAppInfo(
res
}

private def calculateNoExecsStageDurations(all: Seq[StageQualSummaryInfo]): Long = {

Reviewer: Nit: rename to calculateExecsNoStageDurations. Or, since this is really durations due to transitions, the name should include something like that.

nartal1 (author): Removed this method. Filed a follow-on issue to add penalties for execs not associated with any stages: #514

Reviewer: I'm not sure I follow this estimation. We are trying to give some penalty for execs that have transitions but don't map to a stage (i.e. we don't have a duration), correct? I'm wondering if we are already calculating this with either the stages with no execs or the job overhead time.

// If there are Execs not associated with any stage, then some of the Execs may not be
// supported on GPU. We need to estimate the duration of these Execs and add it to the
// unsupportedTaskDur. We estimate the duration by taking the average unsupportedTaskDur
// across all the stages and multiplying it by the number of Execs that are not associated
// with any stage. We multiply with a penalty factor of 0.05.
// TODO: Need to come up with better heuristics for the penalty factor.
val unsupportedTaskSize = all.map(_.unsupportedTaskDur).size
if (execsNoStageTransitions != 0 && unsupportedTaskSize != 0) {
(execsNoStageTransitions *
(all.map(_.unsupportedTaskDur).sum / unsupportedTaskSize) * 0.05).toLong
} else {
0L
}
}

private def getAllReadFileFormats: Seq[String] = {
dataSourceInfo.map { ds =>
s"${ds.format.toLowerCase()}[${ds.schema}]"
@@ -241,13 +263,79 @@ class QualificationAppInfo(
stages.map { stageId =>
val stageTaskTime = stageIdToTaskEndSum.get(stageId)
.map(_.totalTaskDuration).getOrElse(0L)
val numTransitions = ignoreTransitions match {
case false => stageIdToGpuCpuTransitions.getOrElse(stageId, 0)
case true => 0
}
// val numTransitions = stageIdToGpuCpuTransitions.getOrElse(stageId, 0)

Reviewer: Remove the commented-out line.

nartal1 (author): Done.

val transitionsTime = numTransitions match {
case 0 => 0L // no transitions
case gpuCpuTransitions if gpuCpuTransitions > 0 =>
// Duration to transfer data from GPU to CPU and vice versa.
// Assuming it's a PCI-E Gen3, but also assuming that some of the result could be
// spilled to disk.
// Duration in Spark metrics is in milliseconds and CPU-GPU transfer rate is in bytes/sec.
// So we need to convert the transitions time to milliseconds.
val totalBytesRead =
stageIdToTaskEndSum.get(stageId).map(_.totalbytesRead).getOrElse(0L)
if (totalBytesRead > 0) {
TimeUnit.SECONDS.toMillis(
totalBytesRead / QualificationAppInfo.CPU_GPU_TRANSFER_RATE) * gpuCpuTransitions
} else {
0L
}
case _ => 0L
}
// Update totalTaskDuration of stageIdToTaskEndSum to include the transitions time
val stageIdToTasksMetrics = stageIdToTaskEndSum.get(stageId).orElse(None)
if (stageIdToTasksMetrics.isDefined) {
stageIdToTasksMetrics.get.totalTaskDuration += transitionsTime

Reviewer: I'm confused by this. Why are we changing the task durations here? This has traditionally been the real task durations, and then we add/remove things later. Is this because supported + unsupported is now longer due to the transition time? It seems odd to change it in this data structure that holds the real values from the file.

nartal1 (author): This is done because we add transitionsTime to the unsupported task duration, i.e. unsupportedTaskDuration = eachStageUnsupported + transitionsTime. So totalTaskDuration should also include transitionsTime, otherwise we could end up with unsupportedTaskDuration > totalTaskDuration (which would be incorrect).

nartal1 (author, Aug 28, 2023): Updated this code. Now we consider transitionsTime in the unsupported durations only. stageTaskTime is the totalTaskDuration from the eventlog. transitionTime is returned in StageQualSummaryInfo so it can be used when calculating calculateNonSQLTaskDataframeDuration.

}
StageQualSummaryInfo(stageId, allSpeedupFactorAvg, stageTaskTime,
eachStageUnsupported, estimated)
eachStageUnsupported + transitionsTime, estimated, numTransitions)
}.toSet
}

def summarizeStageLevel(execInfos: Seq[ExecInfo], sqlID: Long): Set[StageQualSummaryInfo] = {
val (allStagesToExecs, execsNoStage) = getStageToExec(execInfos)

// Get the total number of transitions between CPU and GPU for each stage and
// store it in a Map.
allStagesToExecs.foreach { case (stageId, execs) =>
// Flatten all the Execs within a stage.
// Example: Exchange;WholeStageCodegen (14);Exchange;WholeStageCodegen (13);Exchange
// will be flattened to Exchange;Sort;Exchange;Sort;SortMergeJoin;SortMergeJoin;Exchange;
val allExecs = execs.map(x => if (x.exec.startsWith("WholeStage")) {
x.children.getOrElse(Seq.empty)
} else {
Seq(x)
}).flatten.reverse

// If it's a shuffle stage, then we need to keep the first and last Exchange and remove
// all the intermediate Exchanges as input size is captured in Exchange node.
val dedupedExecs = if (allExecs.size > 2) {
allExecs.head +:
allExecs.tail.init.filter(x => x.exec != "Exchange") :+ allExecs.last
} else {
allExecs
}
// Create a list of adjacent pairs by zipping dedupedExecs with itself shifted by one
// element (i.e. with the first element dropped).
// Example: If dedupedExecs = (ScanExec, FilterExec, SortExec, ProjectExec), then it will
// create a list of tuples as follows:
// (ScanExec, FilterExec), (FilterExec, SortExec), (SortExec, ProjectExec)
val transitions = dedupedExecs.zip(dedupedExecs.drop(1)).count {
// If the current execution (currExec) is supported, and the next execution (nextExec)
// is not supported, or if the current execution is not supported and the next execution
// is supported, then we consider this as a transition.
case (currExec, nextExec) => (currExec.isSupported && !nextExec.isSupported) ||
(!currExec.isSupported && nextExec.isSupported)
}
stageIdToGpuCpuTransitions(stageId) = transitions
}
if (execsNoStage.nonEmpty) {
execsNoStageTransitions += execsNoStage.filterNot(exec => exec.isSupported).size
}
if (allStagesToExecs.isEmpty) {
// use job level
// also get the job ids associated with the SQLId
@@ -670,7 +758,8 @@ class StageTaskQualificationSummary(
val stageAttemptId: Int,
var executorRunTime: Long,
var executorCPUTime: Long,
var totalTaskDuration: Long)
var totalTaskDuration: Long,
var totalbytesRead: Long)

case class QualApplicationInfo(
appName: String,
@@ -736,7 +825,8 @@ case class StageQualSummaryInfo(
averageSpeedup: Double,
stageTaskTime: Long,
unsupportedTaskDur: Long,
estimated: Boolean = false)
estimated: Boolean = false,
numTransitions: Int)

object QualificationAppInfo extends Logging {
// define recommendation constants
@@ -746,6 +836,13 @@ object QualificationAppInfo extends Logging {
val NOT_APPLICABLE = "Not Applicable"
val LOWER_BOUND_RECOMMENDED = 1.3
val LOWER_BOUND_STRONGLY_RECOMMENDED = 2.5
// Below is the total time taken whenever there are ColumnarToRow or RowToColumnar
// transitions. This includes the time taken to convert the data from one format to
// another and the time taken to transfer the data from CPU to GPU and vice versa.
// The current transfer rate is 10 MB/s, based on testing on a few eventlogs.
// TODO: Need to test this on more eventlogs, including NDS queries,
// and come up with a better transfer rate.
val CPU_GPU_TRANSFER_RATE = 10000000L

private def handleException(e: Exception, path: EventLogInfo): String = {
val message: String = e match {
@@ -838,10 +935,11 @@ object QualificationAppInfo extends Logging {
hadoopConf: Configuration,
pluginTypeChecker: PluginTypeChecker,
reportSqlLevel: Boolean,
mlOpsEnabled: Boolean): Either[String, QualificationAppInfo] = {
mlOpsEnabled: Boolean,
ignoreTransitions: Boolean): Either[String, QualificationAppInfo] = {
try {
val app = new QualificationAppInfo(Some(path), Some(hadoopConf), pluginTypeChecker,
reportSqlLevel, false, mlOpsEnabled)
reportSqlLevel, false, mlOpsEnabled, ignoreTransitions)
logInfo(s"${path.eventLog.toString} has App: ${app.appId}")
Right(app)
} catch {
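The transition-counting logic in summarizeStageLevel can be illustrated with a minimal, self-contained Scala sketch; the Exec case class and exec names below are illustrative stand-ins, not the tool's actual types:

```scala
// Count CPU<->GPU transitions as flips in GPU support between adjacent execs,
// mirroring the zip/count logic in summarizeStageLevel above.
case class Exec(name: String, isSupported: Boolean)

def countTransitions(execs: Seq[Exec]): Int =
  execs.zip(execs.drop(1)).count { case (curr, next) =>
    curr.isSupported != next.isSupported // same truth table as the XOR-style check in the diff
  }

val stageExecs = Seq(
  Exec("Scan parquet", isSupported = true),
  Exec("HypotheticalUDFExec", isSupported = false), // assumed to fall back to the CPU
  Exec("Project", isSupported = true)
)
assert(countTransitions(stageExecs) == 2) // one GPU->CPU plus one CPU->GPU transition
```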
@@ -72,16 +72,26 @@ class QualificationEventProcessor(app: QualificationAppInfo, perSqlOnly: Boolean
super.doSparkListenerTaskEnd(app, event)
// keep all stage task times to see for nonsql duration
val taskSum = app.stageIdToTaskEndSum.getOrElseUpdate(event.stageId, {
new StageTaskQualificationSummary(event.stageId, event.stageAttemptId, 0, 0, 0)
new StageTaskQualificationSummary(event.stageId, event.stageAttemptId, 0, 0, 0, 0)
})
taskSum.executorRunTime += event.taskMetrics.executorRunTime
taskSum.executorCPUTime += NANOSECONDS.toMillis(event.taskMetrics.executorCpuTime)
taskSum.totalTaskDuration += event.taskInfo.duration
// Add the total bytes read from the task if it's available. This is from inputMetrics if
// it is reading from datasource, or shuffleReadMetrics if it is reading from shuffle.
val inputMetrics = event.taskMetrics.inputMetrics
if (inputMetrics != null) {
taskSum.totalbytesRead += inputMetrics.bytesRead
}
val shuffleReadMetrics = event.taskMetrics.shuffleReadMetrics
if (shuffleReadMetrics != null) {
taskSum.totalbytesRead += shuffleReadMetrics.totalBytesRead
}

// Adds in everything (including failures)
app.stageIdToSqlID.get(event.stageId).foreach { sqlID =>
val taskSum = app.sqlIDToTaskEndSum.getOrElseUpdate(sqlID, {
new StageTaskQualificationSummary(event.stageId, event.stageAttemptId, 0, 0, 0)
new StageTaskQualificationSummary(event.stageId, event.stageAttemptId, 0, 0, 0, 0)
})
taskSum.executorRunTime += event.taskMetrics.executorRunTime
taskSum.executorCPUTime += NANOSECONDS.toMillis(event.taskMetrics.executorCpuTime)
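Putting the pieces together: the per-stage penalty added in this commit is essentially the time to move the stage's accumulated bytes read (input plus shuffle read, as tracked above) across the assumed 10 MB/s link, once per transition. A simplified, self-contained sketch of that arithmetic, using the same constant and units as the diff:

```scala
import java.util.concurrent.TimeUnit

// Assumed CPU<->GPU transfer rate from this PR, in bytes/sec (roughly 10 MB/s).
val CPU_GPU_TRANSFER_RATE = 10000000L

// Bytes read divided by the transfer rate gives seconds; convert to milliseconds
// and scale by the number of CPU<->GPU transitions in the stage.
def transitionsTimeMs(totalBytesRead: Long, numTransitions: Int): Long =
  if (totalBytesRead > 0 && numTransitions > 0) {
    TimeUnit.SECONDS.toMillis(totalBytesRead / CPU_GPU_TRANSFER_RATE) * numTransitions
  } else {
    0L
  }

// Example: 100 MB read and 2 transitions => 10 s per transfer => 20,000 ms penalty.
assert(transitionsTimeMs(100000000L, 2) == 20000L)
```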
@@ -1,2 +1,2 @@
App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly)
"Spark shell","app-20211019113801-0001","Not Recommended",1.0,569385.42,2581.57,3627,19894,571967,3503,28.41,"","JDBC[*]","","","","",1812,544575,677,19217,3.8,false,"Scan JDBCRelation(TBLS) [numPartitions=1];Execute CreateViewCommand;CollectLimit","",30
"Spark shell","app-20211019113801-0001","Not Recommended",1.0,569387.57,2579.42,3627,19894,571967,3500,28.41,"","JDBC[*]","","","","",1812,544575,693,19201,3.8,false,"Scan JDBCRelation(TBLS) [numPartitions=1];Execute CreateViewCommand;CollectLimit","",30
@@ -1,5 +1,5 @@
App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly)
"Rapids Spark Profiling Tool Unit Tests","local-1622043423018","Recommended",1.92,8472.65,7846.34,12434,132257,16319,10589,37.7,"","","JSON","","","",7143,4717,19616,112641,3.86,false,"SerializeFromObject;Execute InsertIntoHadoopFsRelationCommand json;DeserializeToObject;Filter;MapElements;Scan","",1
"Spark shell","local-1651187225439","Not Recommended",1.0,355483.43,153.56,760,180,355637,350,87.88,"","JSON[string:bigint:int]","","","","",498,343411,97,83,1.78,false,"SerializeFromObject;CollectLimit;DeserializeToObject;Scan json;Filter;MapElements","",1
"Spark shell","local-1651188809790","Not Recommended",1.0,166199.97,15.02,911,283,166215,45,81.18,"","JSON[string:bigint:int]","","","","UDF",715,133608,269,14,1.5,false,"CollectLimit;Scan json;Project","UDF",1
"Rapids Spark Profiling Tool Unit Tests","local-1623281204390","Not Recommended",1.0,6240.0,0.0,2032,4666,6240,0,46.27,"","JSON[string:bigint:int]","JSON","","","UDF",1209,5793,4664,2,1.0,false,"Execute InsertIntoHadoopFsRelationCommand json;LocalTableScan;Project;Scan json;Execute CreateViewCommand","UDF",1
"Rapids Spark Profiling Tool Unit Tests","local-1622043423018","Recommended",1.92,8477.87,7841.12,12434,132257,16319,10582,37.7,"","","JSON","","","",7143,4717,19691,112566,3.86,false,"SerializeFromObject;Execute InsertIntoHadoopFsRelationCommand json;DeserializeToObject;Filter;MapElements;Scan","",1
"Spark shell","local-1651187225439","Not Recommended",1.0,355490.83,146.16,760,180,355637,333,87.88,"","JSON[string:bigint:int]","","","","",498,343411,101,79,1.78,false,"SerializeFromObject;CollectLimit;DeserializeToObject;Scan json;Filter;MapElements","",1
"Spark shell","local-1651188809790","Not Recommended",1.0,166213.92,1.07,911,283,166215,3,81.18,"","JSON[string:bigint:int]","","","","UDF",715,133608,282,1,1.5,false,"CollectLimit;Scan json;Project","UDF",1
"Rapids Spark Profiling Tool Unit Tests","local-1623281204390","Not Recommended",1.0,6240.0,0.0,2032,4666,6240,-151,46.27,"","JSON[string:bigint:int]","JSON","","","UDF",1209,5793,5013,-347,1.0,false,"Execute InsertIntoHadoopFsRelationCommand json;LocalTableScan;Project;Scan json;Execute CreateViewCommand","UDF",1