Update pom to fail on warnings #701

Merged 1 commit on Dec 26, 2023
6 changes: 5 additions & 1 deletion core/pom.xml
@@ -446,6 +446,8 @@
<maven-compiler-plugin.version>3.11.0</maven-compiler-plugin.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
+ <maven.scaladoc.skip>false</maven.scaladoc.skip>
+ <maven.scalastyle.skip>false</maven.scalastyle.skip>
<project.build.sourceEncoding>${platform-encoding}</project.build.sourceEncoding>
<project.reporting.sourceEncoding>${platform-encoding}</project.reporting.sourceEncoding>
<project.reporting.outputEncoding>${platform-encoding}</project.reporting.outputEncoding>
@@ -702,8 +704,10 @@
<arg>-feature</arg>
<arg>-explaintypes</arg>
<arg>-Yno-adapted-args</arg>
- <arg>-Ywarn-unused:imports</arg>
+ <arg>-Ywarn-unused:imports,locals,patvars,privates</arg>
<arg>-Xlint:missing-interpolator</arg>
+ <arg>-Xfatal-warnings</arg>
+ <arg>-Wconf:cat=lint-adapted-args:e</arg>
</args>
<jvmArgs>
<jvmArg>-Xms1024m</jvmArg>
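Note: with `-Xfatal-warnings`, every scalac diagnostic in this module now fails the build, and the widened `-Ywarn-unused:imports,locals,patvars,privates` scope is what drives most of the cleanups in the Scala files below. A minimal sketch (names are illustrative, not from this repo) of code that previously compiled with warnings and is now rejected:

```scala
import scala.collection.mutable.ArrayBuffer // fatal: unused import

object WarnDemo {
  def describe(n: Int): String = {
    val doubled = n * 2 // fatal: unused local
    n match {
      case bound => s"value: $n" // fatal: pattern var `bound` is never used
    }
  }
}
```

Any one of the flagged constructs is enough to abort compilation once warnings are fatal, which is why the rest of this PR removes unused imports, locals, private fields, and pattern bindings.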
@@ -185,7 +185,7 @@ object EventLogPathProcessor extends Logging {
}.toMap
}
} catch {
- case fe: FileNotFoundException =>
+ case _: FileNotFoundException =>
logWarning(s"$pathString not found, skipping!")
Map.empty[EventLogInfo, Long]
case e: Exception =>
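The `fe` binding above was never read, which the new `-Ywarn-unused:patvars` flag reports; a wildcard type pattern keeps the type test without the dead name. A self-contained sketch of the same idiom (the helper is hypothetical):

```scala
import java.io.FileNotFoundException
import scala.io.Source

// Hypothetical helper: returns "" when the file is missing.
def readOrEmpty(path: String): String =
  try {
    val src = Source.fromFile(path)
    try src.mkString finally src.close()
  } catch {
    // `case fnf: FileNotFoundException =>` would now be a fatal warning
    // because `fnf` is never referenced; `_` performs the same type test.
    case _: FileNotFoundException => ""
  }
```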
@@ -133,8 +133,8 @@ object PlatformFactory extends Logging {
*
* @param platformKey The key representing the desired platform.
* @return An instance of the specified platform.
- * @throws IllegalArgumentException if the specified platform key is not supported.
*/
+ @throws[IllegalArgumentException]
@tailrec
def createInstance(platformKey: String = PlatformNames.DEFAULT): Platform = {
platformKey match {
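Moving the exception from a scaladoc `@throws` tag to the `@throws[...]` annotation keeps the contract attached to the method itself (it is emitted into the bytecode's throws clause) rather than in free-form doc text, and plausibly also quiets the now-enforced scaladoc run. A sketch of the annotated form, with hypothetical platform keys:

```scala
object PlatformDemo {
  /**
   * Resolves a platform by key.
   *
   * @param key the platform identifier
   * @return the normalized platform name
   */
  @throws[IllegalArgumentException] // thrown for keys outside the supported set
  def resolve(key: String): String = key match {
    case "onprem" | "dataproc" | "emr" => key
    case other => throw new IllegalArgumentException(s"Unsupported platform: $other")
  }
}
```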
@@ -57,13 +57,11 @@ object DataWritingCommandExecParser {
val saveIntoDataSrcCMD = "SaveIntoDataSourceCommand"
val insertIntoHadoopCMD = "InsertIntoHadoopFsRelationCommand"

- // List of writeExecs that represent a physical command.
+ // Note: List of writeExecs that represent a physical command.
// hardcode because InsertIntoHadoopFsRelationCommand uses this same exec
// and InsertIntoHadoopFsRelationCommand doesn't have an entry in the
- // supported execs file
- private val physicalWriteCommands = Set(
-   defaultPhysicalCMD
- )
+ // supported execs file Set(defaultPhysicalCMD)


// A set of the logical commands that will be mapped to the physical write command
// which has an entry in the speedupSheet
@@ -76,13 +74,9 @@
saveIntoDataSrcCMD
)

- // Defines a list of the execs that include formatted data.
+ // Note: Defines a list of the execs that include formatted data.
// This will be used to extract the format and then check whether the
- // format is supported or not.
- private val formattedWriteCommands = Set(
-   dataWriteCMD,
-   insertIntoHadoopCMD
- )
+ // format is supported or not. Set(dataWriteCMD, insertIntoHadoopCMD)

// For now, we map the SaveIntoDataSourceCommand to defaultPhysicalCMD because we do not
// have speedup entry for the deltaLake write operation
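Both deleted `Set`s were private fields with no remaining readers, which `-Ywarn-unused:privates` now turns into build failures; folding their contents into the neighboring comments preserves the documentation without the dead code. A sketch of the failure mode (names are illustrative):

```scala
object ParserDemo {
  private val tracked = Set("InsertIntoHadoopFsRelationCommand")

  // Reinstating an unreferenced field like the one below would now be fatal:
  // private val docOnly = Set("DataWritingCommandExec") // fatal: never used

  def isTracked(exec: String): Boolean = tracked.contains(exec)
}
```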
@@ -99,15 +99,15 @@ class Analysis(apps: Seq[ApplicationInfo]) {
}
}
val allJobStageRows = apps.flatMap { app =>
- app.jobIdToInfo.flatMap { case (id, jc) =>
+ app.jobIdToInfo.flatMap { case (_, jc) =>
val stageIdsInJob = jc.stageIds
val stagesInJob = app.stageIdToInfo.filterKeys { case (sid, _) =>
stageIdsInJob.contains(sid)
}
if (stagesInJob.isEmpty) {
None
} else {
- stagesInJob.map { case ((id, said), sc) =>
+ stagesInJob.map { case ((id, _), sc) =>
val tasksInStage = app.taskEnd.filter { tc =>
tc.stageId == id
}
@@ -153,7 +153,7 @@
}
// stages that are missing from a job, perhaps dropped events
val stagesWithoutJobs = apps.flatMap { app =>
- val allStageinJobs = app.jobIdToInfo.flatMap { case (id, jc) =>
+ val allStageinJobs = app.jobIdToInfo.flatMap { case (_, jc) =>
val stageIdsInJob = jc.stageIds
app.stageIdToInfo.filterKeys { case (sid, _) =>
stageIdsInJob.contains(sid)
@@ -224,15 +224,13 @@
} else {
Seq.empty
}

}

// SQL Level TaskMetrics Aggregation(Only when SQL exists)
def sqlMetricsAggregation(): Seq[SQLTaskAggMetricsProfileResult] = {
val allRows = apps.flatMap { app =>
app.sqlIdToInfo.map { case (sqlId, sqlCase) =>
val jcs = app.jobIdToInfo.filter { case (_, jc) =>
- val jcid = jc.sqlID.getOrElse(-1)
jc.sqlID.getOrElse(-1) == sqlId
}
if (jcs.isEmpty) {
@@ -431,10 +429,6 @@
}
}

- val groupedTasks = tasksWithSkew.groupBy { tc =>
-   (tc.stageId, tc.stageAttemptId)
- }
tasksWithSkew.map { tc =>
val avgShuffleDur = avgsStageInfos.get((tc.stageId, tc.stageAttemptId))
avgShuffleDur match {
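The recurring pattern in this file: tuple components that a `case` binds but never reads (`id`, `said`) become `_`, and the dead `groupedTasks` local is deleted outright, which is exactly what `-Ywarn-unused:patvars` and `-Ywarn-unused:locals` require. A compact sketch:

```scala
val jobs = Map(1 -> "running", 2 -> "done")

// `jobs.map { case (id, state) => state.length }` is now fatal: `id` is unused.
val lengths = jobs.map { case (_, state) => state.length }
```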
@@ -147,8 +147,6 @@ class CollectInformation(apps: Seq[ApplicationInfo]) extends Logging {
val rp = app.resourceProfIdToInfo.get(rpId)
val execMem = rp.map(_.executorResources.get(ResourceProfile.MEMORY)
.map(_.amount).getOrElse(0L))
- val execCores = rp.map(_.executorResources.get(ResourceProfile.CORES)
-   .map(_.amount).getOrElse(0L))
val execGpus = rp.map(_.executorResources.get("gpu")
.map(_.amount).getOrElse(0L))
val taskCpus = rp.map(_.taskResources.get(ResourceProfile.CPUS)
@@ -177,7 +175,7 @@ class CollectInformation(apps: Seq[ApplicationInfo]) extends Logging {
// get job related information
def getJobInfo: Seq[JobInfoProfileResult] = {
val allRows = apps.flatMap { app =>
- app.jobIdToInfo.map { case (jobId, j) =>
+ app.jobIdToInfo.map { case (_, j) =>
JobInfoProfileResult(app.index, j.jobID, j.stageIds, j.sqlID, j.startTime, j.endTime)
}
}
@@ -270,7 +268,6 @@ object CollectInformation extends Logging {
app.allSQLMetrics.map { metric =>
val sqlId = metric.sqlID
val jobsForSql = app.jobIdToInfo.filter { case (_, jc) =>
- val jcid = jc.sqlID.getOrElse(-1)
jc.sqlID.getOrElse(-1) == sqlId
}
val stageIdsForSQL = jobsForSql.flatMap(_._2.stageIds).toSeq
@@ -102,7 +102,7 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
// combine them into single tables in the output.
val profileOutputWriter = new ProfileOutputWriter(s"$outputDir/combined",
Profiler.COMBINED_LOG_FILE_NAME_PREFIX, numOutputRows, outputCSV = outputCSV)
- val sums = createAppsAndSummarize(eventLogInfos, false, profileOutputWriter)
+ val sums = createAppsAndSummarize(eventLogInfos, profileOutputWriter)
writeSafelyToOutput(profileOutputWriter, sums, outputCombined)
profileOutputWriter.close()
}
@@ -159,7 +159,6 @@
}

private def createApps(allPaths: Seq[EventLogInfo]): Seq[ApplicationInfo] = {
- var errorCodes = ArrayBuffer[Int]()
val allApps = new ConcurrentLinkedQueue[ApplicationInfo]()

class ProfileThread(path: EventLogInfo, index: Int) extends Runnable {
@@ -201,9 +200,7 @@
}

private def createAppsAndSummarize(allPaths: Seq[EventLogInfo],
- printPlans: Boolean,
profileOutputWriter: ProfileOutputWriter): Seq[ApplicationSummaryInfo] = {
- var errorCodes = ArrayBuffer[Int]()
val allApps = new ConcurrentLinkedQueue[ApplicationSummaryInfo]()

class ProfileThread(path: EventLogInfo, index: Int) extends Runnable {
@@ -253,7 +250,7 @@

private def createAppAndProcess(
allPaths: Seq[EventLogInfo],
- startIndex: Int = 1): Unit = {
+ startIndex: Int): Unit = {
class ProfileProcessThread(path: EventLogInfo, index: Int) extends Runnable {
def run: Unit = {
try {
@@ -309,7 +306,7 @@
logInfo(s"Took ${endTime - startTime}ms to process ${path.eventLog.toString}")
Some(app)
} catch {
- case json: com.fasterxml.jackson.core.JsonParseException =>
+ case _: com.fasterxml.jackson.core.JsonParseException =>
logWarning(s"Error parsing JSON: $path")
None
case il: IllegalArgumentException =>
@@ -37,9 +37,6 @@ class PluginTypeChecker(platform: Platform = PlatformFactory.createInstance(),
speedupFactorFile: Option[String] = None) extends Logging {

private val NS = "NS"
- private val PS = "PS"
- private val PSPART = "PS*"
- private val SPART = "S*"
// configured off
private val CO = "CO"
private val NA = "NA"
@@ -231,7 +228,7 @@ class PluginTypeChecker(platform: Platform = PlatformFactory.createInstance(),
case "float" => Seq("real")
case "decimal" => Seq("dec", "numeric")
case "calendar" => Seq("interval")
- case other => Seq.empty[String]
+ case _ => Seq.empty[String]
}
}

@@ -520,8 +520,8 @@ object QualOutputWriter {

private def constructOutputRow(
strAndSizes: Buffer[(String, Int)],
- delimiter: String = TEXT_DELIMITER,
- prettyPrint: Boolean = false): String = {
+ delimiter: String,
+ prettyPrint: Boolean): String = {
val entireHeader = new StringBuffer
if (prettyPrint) {
entireHeader.append(delimiter)
@@ -866,7 +866,7 @@
private def constructExecInfoBuffer(
info: ExecInfo,
appId: String,
- delimiter: String = TEXT_DELIMITER,
+ delimiter: String,
prettyPrint: Boolean,
headersAndSizes: LinkedHashMap[String, Int],
reformatCSV: Boolean = true): String = {
@@ -1125,7 +1125,7 @@
private def constructDetailedAppInfoCSVRow(
appInfo: FormattedQualificationSummaryInfo,
headersAndSizes: LinkedHashMap[String, Int],
- reportReadSchema: Boolean = false,
+ reportReadSchema: Boolean,
reformatCSV: Boolean = true): ListBuffer[(String, Int)] = {
val reformatCSVFunc : String => String =
if (reformatCSV) str => StringUtils.reformatCSVString(str) else str => stringIfempty(str)
@@ -1198,7 +1198,7 @@
private def constructStatusReportInfo(
statusInfo: StatusSummaryInfo,
headersAndSizes: LinkedHashMap[String, Int],
- delimiter: String = TEXT_DELIMITER,
+ delimiter: String,
prettyPrint: Boolean,
reformatCSV: Boolean = true): Seq[String] = {
val reformatCSVFunc: String => String =
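Dropping the default argument values on these private helpers means every call site must now spell out its delimiter and formatting choice instead of silently picking up `TEXT_DELIMITER`; any caller that relied on the old defaults is flagged by the compiler. A simplified sketch of the resulting shape (signature reduced for illustration):

```scala
// After the change: no defaults, so intent is explicit at each call site.
def constructRow(fields: Seq[String], delimiter: String, prettyPrint: Boolean): String = {
  val body = fields.mkString(delimiter)
  if (prettyPrint) s"$delimiter$body$delimiter" else body
}

// Named arguments keep the boolean call sites readable:
val row = constructRow(Seq("appId", "sqlID"), delimiter = "|", prettyPrint = true)
```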
@@ -155,7 +155,8 @@ class RunningQualificationApp(
val sqlInfo = aggregatePerSQLStats(sqlID)
val csvResult =
constructPerSqlResult(sqlInfo, QualOutputWriter.CSV_DELIMITER, false, escapeCSV = true)
- val textResult = constructPerSqlResult(sqlInfo, QualOutputWriter.TEXT_DELIMITER, true)
+ val textResult = constructPerSqlResult(sqlInfo, QualOutputWriter.TEXT_DELIMITER,
+   prettyPrint = true)
(csvResult, textResult)
}

@@ -176,8 +177,8 @@

private def constructPerSqlResult(
sqlInfo: Option[EstimatedPerSQLSummaryInfo],
- delimiter: String = "|",
- prettyPrint: Boolean = true,
+ delimiter: String,
+ prettyPrint: Boolean,
sqlDescLength: Int = SQL_DESC_LENGTH,
escapeCSV: Boolean = false): String = {
sqlInfo match {
@@ -203,9 +203,9 @@ abstract class AppBase(
val fs = eventLogPath.getFileSystem(hconf)
var totalNumEvents = 0
val readerOpt = eventLog match {
- case dblog: DatabricksEventLog =>
+ case _: DatabricksEventLog =>
Some(new DatabricksRollingEventLogFilesFileReader(fs, eventLogPath))
- case apachelog => EventLogFileReader(fs, eventLogPath)
+ case _ => EventLogFileReader(fs, eventLogPath)
}

if (readerOpt.isDefined) {
@@ -311,7 +311,7 @@ object AppFilterImpl {
val timeInt = try {
timeStr.toInt
} catch {
- case ne: NumberFormatException =>
+ case _: NumberFormatException =>
throw new IllegalArgumentException(s"Invalid time period $appStartStr specified, " +
"time must be greater than 0 and valid periods are min(minute),h(hours)" +
",d(days),w(weeks),m(months).")
@@ -330,7 +330,7 @@ class ApplicationInfo(
}

def aggregateSQLStageInfo: Seq[SQLStageInfoProfileResult] = {
- val jobsWithSQL = jobIdToInfo.filter { case (id, j) =>
+ val jobsWithSQL = jobIdToInfo.filter { case (_, j) =>
j.sqlID.nonEmpty
}
val sqlToStages = jobsWithSQL.flatMap { case (jobId, j) =>
@@ -360,7 +360,7 @@
val res = this.appInfo

val estimatedResult = this.appEndTime match {
- case Some(t) => this.appEndTime
+ case Some(_) => this.appEndTime
case None =>
val jobEndTimes = jobIdToInfo.map { case (_, jc) => jc.endTime }.filter(_.isDefined)
val sqlEndTimes = sqlIdToInfo.map { case (_, sc) => sc.endTime }.filter(_.isDefined)
@@ -16,8 +16,6 @@

package org.apache.spark.sql.rapids.tool.qualification

- import java.util.concurrent.TimeUnit

import scala.collection.mutable.{ArrayBuffer, HashMap}

import com.nvidia.spark.rapids.tool.EventLogInfo
@@ -31,7 +29,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.execution.SparkPlanInfo
import org.apache.spark.sql.execution.ui.SparkPlanGraph
- import org.apache.spark.sql.rapids.tool.{AppBase, GpuEventLogException, IgnoreExecs, SupportedMLFuncsName, ToolUtils}
+ import org.apache.spark.sql.rapids.tool.{AppBase, GpuEventLogException, SupportedMLFuncsName, ToolUtils}

class QualificationAppInfo(
eventLogInfo: Option[EventLogInfo],
@@ -106,7 +104,7 @@ class QualificationAppInfo(
if (startTime > 0) {
val estimatedResult =
this.appEndTime match {
- case Some(t) => this.appEndTime
+ case Some(_) => this.appEndTime
case None =>
if (lastSQLEndTime.isEmpty && lastJobEndTime.isEmpty) {
None
@@ -312,7 +310,7 @@
}
val transitionsTime = numTransitions match {
case 0 => 0L // no transitions
- case gpuCpuTransitions =>
+ case _ =>
// Duration to transfer data from GPU to CPU and vice versa.
// Assuming it's a PCI-E Gen3, but also assuming that some of the result could be
// spilled to disk.
@@ -323,13 +321,11 @@
}
if (totalBytesRead > 0) {
val transitionTime = (totalBytesRead /
- QualificationAppInfo.CPU_GPU_TRANSFER_RATE.toDouble) * gpuCpuTransitions
+ QualificationAppInfo.CPU_GPU_TRANSFER_RATE.toDouble) * numTransitions
(transitionTime * 1000).toLong // convert to milliseconds
} else {
0L
}

- case _ => 0L
}
val finalEachStageUnsupported = if (transitionsTime != 0) {
// Add 50% penalty for unsupported duration if there are transitions. This number
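Two warning fixes in one hunk: the variable pattern `gpuCpuTransitions` bound the whole scrutinee (it was just an alias for `numTransitions`), and the trailing `case _ => 0L` after it was unreachable, which scalac also reports. The estimate itself is unchanged: bytes read divided by the transfer rate, times the number of transitions, converted to milliseconds. A sketch with an assumed rate (the real `CPU_GPU_TRANSFER_RATE` constant is defined on `QualificationAppInfo`'s companion, as the diff shows):

```scala
// Assumed for illustration only: ~1 GB/s effective CPU<->GPU transfer rate.
val assumedTransferRate = 1e9 // bytes per second

def transitionMillis(totalBytesRead: Long, numTransitions: Int): Long =
  if (numTransitions == 0 || totalBytesRead <= 0) 0L
  else {
    val seconds = (totalBytesRead / assumedTransferRate) * numTransitions
    (seconds * 1000).toLong // e.g. 2 GB across 3 transitions -> 6000 ms
  }
```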
@@ -241,7 +241,7 @@ class RunningQualificationEventProcessor(sparkConf: SparkConf) extends SparkList
listener.onOtherEvent(event)
event match {
case e: SparkListenerSQLExecutionStart =>
logDebug("Starting new SQL query")
logDebug(s"Starting new SQL query: ${e.executionId}")
case e: SparkListenerSQLExecutionEnd =>
writeSQLDetails(e.executionId)
case _ =>
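Interpolating `e.executionId` gives the bound `e` a use (otherwise `-Ywarn-unused:patvars` would demand `_`), and the `s` prefix matters under `-Xlint:missing-interpolator`, which flags plain string literals containing `$`-placeholders. A tiny sketch:

```scala
object LintDemo {
  private def log(msg: String): Unit = println(msg)

  def report(executionId: Long): Unit = {
    // log("finished query ${executionId}") // lint: possible missing interpolator
    log(s"finished query $executionId")     // interpolated as intended
  }
}
```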
@@ -34,8 +34,8 @@ object EventUtils extends Logging {
*
* @param data value stored in the (value/update) of the AccumulableInfo
* @return valid parsed long of the content or the duration
- * @throws java.lang.NullPointerException if the argument is `null`
*/
+ @throws[NullPointerException]
def parseAccumFieldToLong(data: Any): Option[Long] = {
val strData = data.toString
try {
@@ -137,7 +137,7 @@ object ToolTestUtils extends Logging {

def processProfileApps(logs: Array[String],
sparkSession: SparkSession): ArrayBuffer[ApplicationInfo] = {
- var apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]()
+ val apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]()
val appArgs = new ProfileArgs(logs)
var index: Int = 1
for (path <- appArgs.eventlog()) {