diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/RuntimeUtil.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/RuntimeUtil.scala
index d39323c7a..a5babf35d 100644
--- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/RuntimeUtil.scala
+++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/RuntimeUtil.scala
@@ -17,6 +17,9 @@ package org.apache.spark.sql.rapids.tool.util
 
 import java.io.{PrintWriter, StringWriter}
+import java.lang.management.ManagementFactory
+
+import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable`
 
 import com.nvidia.spark.rapids.tool.ToolTextFileWriter
 import org.apache.hadoop.conf.Configuration
@@ -49,6 +52,12 @@ object RuntimeUtil extends Logging {
     // Add the Spark version used in runtime.
     // Note that it is different from the Spark version used in the build.
     buildProps.setProperty("runtime.spark.version", ToolUtils.sparkRuntimeVersion)
+    // Add the JVM and OS information
+    getJVMOSInfo.foreach {
+      kv => buildProps.setProperty(s"runtime.${kv._1}", kv._2)
+    }
+    // get the JVM memory arguments
+    getJVMHeapArguments.foreach(kv => buildProps.setProperty(s"runtime.${kv._1}", kv._2))
     val reportWriter = new ToolTextFileWriter(outputDir, REPORT_FILE_NAME, REPORT_LABEL, hadoopConf)
     try {
       reportWriter.writeProperties(buildProps, REPORT_LABEL)
@@ -73,6 +82,25 @@ object RuntimeUtil extends Logging {
       "os.version" -> System.getProperty("os.version")
     )
   }
-}
-
+
+  def getJVMHeapArguments: Map[String, String] = {
+    ManagementFactory.getRuntimeMXBean.getInputArguments.filter(
+      p => p.startsWith("-Xmx") || p.startsWith("-Xms") || p.startsWith("-XX:")).map {
+      sizeArg =>
+        if (sizeArg.startsWith("-Xmx")) {
+          ("jvm.arg.heap.max", sizeArg.drop(4))
+        } else if (sizeArg.startsWith("-Xms")) {
+          ("jvm.arg.heap.min", sizeArg.drop(4))
+        } else { // this is a -XX: JVM/GC option
+          // drop the leading "-XX:" (or "-XX:+" for boolean flags)
+          val dropSize = if (sizeArg.startsWith("-XX:+")) 5 else 4
+          val parts = sizeArg.drop(dropSize).split("=")
+          if (parts.length == 2) {
+            (s"jvm.arg.gc.${parts(0)}", parts(1))
+          } else {
+            (s"jvm.arg.gc.${parts(0)}", "")
+          }
+        }
+    }.toMap
+  }
+}
diff --git a/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py b/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py
index 55b91d81c..5f8d5cf1a 100644
--- a/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py
+++ b/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py
@@ -878,11 +878,16 @@ def _re_evaluate_platform_args(self, tool_name: str) -> dict:
         job_args = self.ctxt.get_ctxt('jobArgs')
         result = copy.deepcopy(job_args)
         job_resources = self._get_job_submission_resources(tool_name)
+        jvm_min_heap = job_resources['jvmMinHeapSize']
         jvm_max_heap = job_resources['jvmMaxHeapSize']
-        jvm_heap_key = f'Xmx{jvm_max_heap}g'
+        jvm_max_heap_key = f'Xmx{jvm_max_heap}g'
+        jvm_min_heap_key = f'Xms{jvm_min_heap}g'
         # At this point, we need to set the heap argument for the JVM. Otherwise, the process uses
         # its default values.
-        result['platformArgs']['jvmArgs'].update({jvm_heap_key: ''})
+        result['platformArgs']['jvmArgs'].update({
+            jvm_min_heap_key: '',
+            jvm_max_heap_key: ''
+        })
         return result
 
     @timeit('Building Job Arguments and Executing Job CMD')  # pylint: disable=too-many-function-args
diff --git a/user_tools/src/spark_rapids_tools/utils/util.py b/user_tools/src/spark_rapids_tools/utils/util.py
index ab90c34ea..cc449a4d0 100644
--- a/user_tools/src/spark_rapids_tools/utils/util.py
+++ b/user_tools/src/spark_rapids_tools/utils/util.py
@@ -291,13 +291,18 @@ def adjust_tools_resources(cls,
         else:
             prof_threads = max(1, jvm_threads - num_threads_unit) if concurrent_mode else jvm_threads
 
+        # calculate the min heap size based on the max heap size
+        min_heap = max(1, heap_unit // 2)
+
         return {
             'qualification': {
                 'jvmMaxHeapSize': heap_unit,
+                'jvmMinHeapSize': min_heap,
                 'rapidsThreads': num_threads_unit
             },
             'profiling': {
                 'jvmMaxHeapSize': prof_heap,
+                'jvmMinHeapSize': min_heap,
                 'rapidsThreads': prof_threads
             }
         }
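
Illustrative sketch (not part of the patch; the sample flag values and the object name are assumptions): how the new getJVMHeapArguments logic is expected to translate typical JVM input arguments into the jvm.arg.* keys, which the report writer then prefixes with "runtime.".

object JvmArgMappingExample extends App {
  // Hypothetical arguments a tools JVM might be launched with.
  val sampleArgs = Seq("-Xms3g", "-Xmx6g", "-XX:+UseG1GC", "-XX:MaxGCPauseMillis=200")
  val mapped = sampleArgs.collect {
    case a if a.startsWith("-Xmx") => "jvm.arg.heap.max" -> a.drop(4)
    case a if a.startsWith("-Xms") => "jvm.arg.heap.min" -> a.drop(4)
    case a if a.startsWith("-XX:") =>
      // Strip "-XX:" (or "-XX:+" for boolean flags) and split key=value pairs.
      val parts = a.drop(if (a.startsWith("-XX:+")) 5 else 4).split("=")
      val value = if (parts.length == 2) parts(1) else ""
      s"jvm.arg.gc.${parts(0)}" -> value
  }.toMap
  // Expected report entries, e.g. runtime.jvm.arg.heap.max=6g,
  // runtime.jvm.arg.gc.UseG1GC= and runtime.jvm.arg.gc.MaxGCPauseMillis=200.
  mapped.foreach { case (k, v) => println(s"runtime.$k=$v") }
}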