Add qualification support for Photon jobs in the Python Tool (#1409)
* Add support for different speedup threshold in Photon

Signed-off-by: Partho Sarthi <[email protected]>

* Add speedup strategy per app

Signed-off-by: Partho Sarthi <[email protected]>

* Assign app execution engine and categorize speedup based on engine type

Signed-off-by: Partho Sarthi <[email protected]>

* Add E2E test cases

Signed-off-by: Partho Sarthi <[email protected]>

* Rename App Execution Engine to Execution Engine

Signed-off-by: Partho Sarthi <[email protected]>

* Rename loop variables

Signed-off-by: Partho Sarthi <[email protected]>

* Address review comments and parse spark runtime

Signed-off-by: Partho Sarthi <[email protected]>

* Revert "Add E2E test cases"

This reverts commit 8921a85

Signed-off-by: Partho Sarthi <[email protected]>

* Buffer logging

Signed-off-by: Partho Sarthi <[email protected]>

* Add directory list helper methods in CspFs and Photon comment

Signed-off-by: Partho Sarthi <[email protected]>

* Use create_sub_path() to keep protocol intact

Signed-off-by: Partho Sarthi <[email protected]>

---------

Signed-off-by: Partho Sarthi <[email protected]>
parthosa authored Nov 14, 2024
1 parent 5dd244a commit 43825d8
Showing 6 changed files with 200 additions and 38 deletions.
74 changes: 69 additions & 5 deletions user_tools/src/spark_rapids_pytools/rapids/qualification.py
@@ -30,6 +30,7 @@
from spark_rapids_pytools.common.utilities import Utils, TemplateGenerator
from spark_rapids_pytools.rapids.rapids_tool import RapidsJarTool
from spark_rapids_tools.enums import QualFilterApp, QualEstimationModel
from spark_rapids_tools.storagelib import CspFs
from spark_rapids_tools.tools.additional_heuristics import AdditionalHeuristics
from spark_rapids_tools.tools.cluster_config_recommender import ClusterConfigRecommender
from spark_rapids_tools.tools.qualx.qualx_main import predict
@@ -289,12 +290,12 @@ def __build_global_report_summary(self,
                                      total_apps: pd.DataFrame,
                                      unsupported_ops_df: pd.DataFrame,
                                      output_files_info: JSONPropertiesContainer) -> QualificationSummary:
        # TODO: This method does a lot of critical but unrelated work. Refactor this into smaller steps/methods
        # to improve readability and maintainability.
        if all_apps.empty:
            # No need to run saving estimator or process the data frames.
            return QualificationSummary(total_apps=total_apps, tools_processed_apps=all_apps)

        unsupported_ops_obj = UnsupportedOpsStageDuration(self.ctxt.get_value('local', 'output',
                                                                              'unsupportedOperators'))
        # Generate the statistics report
        try:
            stats_report = SparkQualificationStats(ctxt=self.ctxt)
@@ -303,37 +304,52 @@
            self.logger.error('Failed to generate the statistics report: %s', e)

        # Calculate unsupported operators stage duration before grouping
        unsupported_ops_obj = UnsupportedOpsStageDuration(
            self.ctxt.get_value('local', 'output', 'unsupportedOperators'))
        all_apps = unsupported_ops_obj.prepare_apps_with_unsupported_stages(all_apps, unsupported_ops_df)
        apps_pruned_df = self.__remap_columns_and_prune(all_apps)

        # Apply additional heuristics to skip apps not suitable for GPU acceleration
        heuristics_ob = AdditionalHeuristics(
            props=self.ctxt.get_value('local', 'output', 'additionalHeuristics'),
            tools_output_dir=self.ctxt.get_rapids_output_folder(),
            output_file=output_files_info.get_value('intermediateOutput', 'files', 'heuristics', 'path'))
        apps_pruned_df = heuristics_ob.apply_heuristics(apps_pruned_df)
        speedup_category_ob = SpeedupCategory(self.ctxt.get_value('local', 'output', 'speedupCategories'))

        # Group the applications and recalculate metrics
        apps_grouped_df, group_notes = self.__group_apps_by_name(apps_pruned_df)
        df_final_result = speedup_category_ob.build_category_column(apps_grouped_df)

        # Assign the runtime type (Spark/Photon etc.) and speedup categories (Small/Medium/Large) to each application.
        # Note: Strategy for speedup categorization will be based on the execution engine of the application.
        apps_with_runtime_df = self._assign_spark_runtime_to_apps(apps_grouped_df)
        speedup_category_confs = self.ctxt.get_value('local', 'output', 'speedupCategories')
        speedup_category_ob = SpeedupCategory(speedup_category_confs)
        df_final_result = speedup_category_ob.build_category_column(apps_with_runtime_df)

        # Generate the cluster shape report
        reshaped_notes = self.__generate_cluster_shape_report()
        report_comments = [group_notes] if group_notes else []
        if reshaped_notes:
            report_comments.append(reshaped_notes)

        # Write the final result to the output file
        csv_out = output_files_info.get_value('summary', 'path')
        if not df_final_result.empty:
            self.logger.info('Generating GPU Estimated Speedup: as %s', csv_out)
            df_final_result.to_csv(csv_out, float_format='%.2f')

        # Add columns for cluster configuration recommendations and tuning configurations to the processed_apps.
        recommender = ClusterConfigRecommender(self.ctxt)
        df_final_result = recommender.add_cluster_and_tuning_recommendations(df_final_result)
        # Merge the total_apps with the processed_apps to get the Event Log
        df_final_result = pd.merge(df_final_result, total_apps[['Event Log', 'AppID']],
                                   left_on='App ID', right_on='AppID')

        # Write the app metadata
        app_metadata_info = output_files_info.get_value('appMetadata')
        config_recommendations_info = output_files_info.get_value('configRecommendations')
        self._write_app_metadata(df_final_result, app_metadata_info, config_recommendations_info)

        # Return the summary
        return QualificationSummary(total_apps=total_apps,
                                    tools_processed_apps=df_final_result,
                                    comments=report_comments)
@@ -595,6 +611,54 @@ def _read_qualification_output_file(self, report_name_key: str, file_format_key:
        report_file_path = FSUtil.build_path(self.ctxt.get_rapids_output_folder(), report_file_name)
        return pd.read_csv(report_file_path)

    def _read_qualification_metric_file(self, file_name: str) -> Dict[str, pd.DataFrame]:
        """
        Helper method to read metric files from the qualification tool's output metric folder.
        Returns a dictionary of DataFrames, where each key is an application ID, and each
        DataFrame contains the corresponding application's metrics data.
        Example:
        {
            'appId1': pd.DataFrame(...),
            'appId2': pd.DataFrame(...),
        }
        :param file_name: Name of the metric file to read from each application's folder
        """
        metrics = {}
        root_metric_dir = self.ctxt.get_metrics_output_folder()
        apps_with_missing_metrics = []
        for metric_dir in CspFs.list_all_dirs(root_metric_dir):
            app_id_str = metric_dir.base_name()
            report_file_path = metric_dir.create_sub_path(file_name)
            try:
                metrics[app_id_str] = pd.read_csv(str(report_file_path))
            except Exception:  # pylint: disable=broad-except
                # Some apps may not have the given metrics file, we should ensure
                # that the dictionary contains entries for all apps to avoid KeyErrors
                # and maintain consistency in processing.
                metrics[app_id_str] = pd.DataFrame()
                apps_with_missing_metrics.append(app_id_str)

        # Log apps with missing metrics files
        if apps_with_missing_metrics:
            self.logger.warning('Unable to read metrics file \'%s\' for apps: %s', file_name,
                                ', '.join(apps_with_missing_metrics))
        return metrics

    def _assign_spark_runtime_to_apps(self, tools_processed_apps: pd.DataFrame) -> pd.DataFrame:
        """
        Assigns the Spark Runtime (Spark/Photon) to each application. This will be used to categorize
        applications into speedup categories (Small/Medium/Large).
        """
        app_info_dict = self._read_qualification_metric_file('application_information.csv')
        # Rename columns from each DataFrame in the app_info_dict and merge them with the tools_processed_apps
        merged_dfs = []
        for df in app_info_dict.values():
            merged_dfs.append(
                df[['appId', 'sparkRuntime']].rename(columns={'appId': 'App ID', 'sparkRuntime': 'Spark Runtime'})
            )
        spark_runtime_df = pd.concat(merged_dfs, ignore_index=True)
        return tools_processed_apps.merge(spark_runtime_df, on='App ID', how='left')
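
For illustration, a minimal sketch of the merge performed by _assign_spark_runtime_to_apps, using hypothetical toy frames (the column names come from the code above; the app IDs and values are made up):

    import pandas as pd

    # Hypothetical per-app rows read from application_information.csv files.
    app_info = pd.DataFrame({'appId': ['app-001', 'app-002'],
                             'sparkRuntime': ['SPARK', 'PHOTON']})
    processed = pd.DataFrame({'App ID': ['app-001', 'app-002', 'app-003'],
                              'Estimated GPU Speedup': [2.4, 1.5, 3.1]})

    runtime_df = app_info[['appId', 'sparkRuntime']].rename(
        columns={'appId': 'App ID', 'sparkRuntime': 'Spark Runtime'})
    # A left merge keeps every processed app; 'app-003' has no metrics file,
    # so its 'Spark Runtime' comes back as NaN.
    print(processed.merge(runtime_df, on='App ID', how='left'))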


@dataclass
class QualificationAsLocal(Qualification):
7 changes: 6 additions & 1 deletion user_tools/src/spark_rapids_pytools/rapids/tool_ctxt.py
@@ -25,7 +25,7 @@
from spark_rapids_pytools.common.prop_manager import YAMLPropertiesContainer
from spark_rapids_pytools.common.sys_storage import FSUtil
from spark_rapids_pytools.common.utilities import ToolLogging, Utils
from spark_rapids_tools import CspEnv
from spark_rapids_tools import CspEnv, CspPath
from spark_rapids_tools.utils import Utilities


@@ -215,6 +215,11 @@ def get_rapids_output_folder(self) -> str:
            return root_dir
        return FSUtil.build_path(root_dir, rapids_subfolder)

    def get_metrics_output_folder(self) -> CspPath:
        root_dir = CspPath(self.get_rapids_output_folder())
        metrics_subfolder = self.get_value('toolOutput', 'metricsSubFolder')
        return root_dir.create_sub_path(metrics_subfolder)
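
Per the commit note "Use create_sub_path() to keep protocol intact", building the metrics path through CspPath avoids string-level path normalization that can collapse a URI scheme's double slash. A minimal sketch with a hypothetical bucket path:

    import os.path
    from spark_rapids_tools import CspPath

    root = CspPath('gs://bucket/qual_out/rapids_4_spark_qualification_output')
    metrics = root.create_sub_path('raw_metrics')  # the 'gs://' scheme stays intact

    # Generic string normalization, by contrast, can mangle the scheme:
    print(os.path.normpath('gs://bucket/qual_out'))  # -> 'gs:/bucket/qual_out' on POSIX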

    def get_log4j_properties_file(self) -> str:
        return self.get_value_silent('toolOutput', 'textFormat', 'log4jFileName')

@@ -1,6 +1,7 @@
toolOutput:
  completeOutput: true
  subFolder: rapids_4_spark_qualification_output
  metricsSubFolder: raw_metrics
  textFormat:
    summaryLog:
      fileName: rapids_4_spark_qualification_output.log
@@ -148,6 +149,7 @@ local:
        - 'App Name'
        - 'Event Log'
        - 'Cluster Info'
        - 'Spark Runtime'
        - 'Estimated GPU Speedup Category'
        - 'Full Cluster Config Recommendations*'
        - 'GPU Config Recommendation Breakdown*'
@@ -254,27 +256,51 @@ local:
      speedupColumnName: 'Estimated GPU Speedup'
      categoryColumnName: 'Estimated GPU Speedup Category'
      heuristicsColumnName: 'Skip by Heuristics'
      categories:
        - title: 'Not Applicable'
          lowerBound: -1000000.0
          upperBound: 1.3
        - title: 'Small'
          lowerBound: 1.3
          upperBound: 2.0
        - title: 'Medium'
          lowerBound: 2.0
          upperBound: 3.0
        - title: 'Large'
          lowerBound: 3.0
          upperBound: 1000000.0
      eligibilityConditions:
        - columnName: 'Estimated GPU Speedup'
          lowerBound: 1.3
          upperBound: 1000000.0
        - columnName: 'Unsupported Operators Stage Duration Percent'
          lowerBound: 0.0
          upperBound: 25.0
      sparkRuntimeColumnName: 'Spark Runtime'
      defaultCategory: 'Not Recommended'
      strategies:
        spark: # Spark-specific speedup categories
          categories:
            - title: 'Not Applicable'
              lowerBound: -1000000.0
              upperBound: 1.3
            - title: 'Small'
              lowerBound: 1.3
              upperBound: 2.0
            - title: 'Medium'
              lowerBound: 2.0
              upperBound: 3.0
            - title: 'Large'
              lowerBound: 3.0
              upperBound: 1000000.0
          eligibilityConditions:
            - columnName: 'Estimated GPU Speedup'
              lowerBound: 1.3
              upperBound: 1000000.0
            - columnName: 'Unsupported Operators Stage Duration Percent'
              lowerBound: 0.0
              upperBound: 25.0
        photon: # Photon-specific speedup categories
          categories:
            - title: 'Not Applicable'
              lowerBound: -1000000.0
              upperBound: 1.0 # Lower threshold than Spark: Photon apps already run faster, so smaller estimated GPU speedups are still worthwhile
            - title: 'Small'
              lowerBound: 1.0
              upperBound: 2.0
            - title: 'Medium'
              lowerBound: 2.0
              upperBound: 3.0
            - title: 'Large'
              lowerBound: 3.0
              upperBound: 1000000.0
          eligibilityConditions:
            - columnName: 'Estimated GPU Speedup'
              lowerBound: 1.0
              upperBound: 1000000.0
            - columnName: 'Unsupported Operators Stage Duration Percent'
              lowerBound: 0.0
              upperBound: 25.0
    additionalHeuristics:
      appInfo:
        fileName: 'application_information.csv'
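
To make the strategies block concrete, here is a hedged sketch, not the tool's actual SpeedupCategory implementation, of how a per-engine strategy can be applied: fall back to defaultCategory when any eligibility condition fails, otherwise bucket the estimated speedup. The categorize helper and the row layout are hypothetical; the config keys mirror the YAML above:

    SPEEDUP_COL = 'Estimated GPU Speedup'

    def categorize(row: dict, strategies: dict, default_category: str) -> str:
        # Pick the strategy for the app's execution engine.
        engine = 'photon' if 'photon' in str(row.get('Spark Runtime', '')).lower() else 'spark'
        strategy = strategies[engine]
        # Apps failing any eligibility condition get the default category, e.g. a
        # 1.2x estimate passes the Photon threshold (1.0) but fails Spark's (1.3).
        for cond in strategy['eligibilityConditions']:
            if not cond['lowerBound'] <= row[cond['columnName']] <= cond['upperBound']:
                return default_category
        # Otherwise bucket the speedup into Not Applicable/Small/Medium/Large.
        for cat in strategy['categories']:
            if cat['lowerBound'] <= row[SPEEDUP_COL] < cat['upperBound']:
                return cat['title']
        return default_category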
6 changes: 5 additions & 1 deletion user_tools/src/spark_rapids_tools/exceptions.py
@@ -1,4 +1,4 @@
# Copyright (c) 2023, NVIDIA CORPORATION.
# Copyright (c) 2023-2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -54,6 +54,10 @@ class CspPathAttributeError(CspPathException, ValueError):
    pass


class CspPathTypeMismatchError(CspPathException, ValueError):
    pass


class InvalidPropertiesSchema(CspPathException, ValueError):
    """
    Defines a class to represent errors caused by invalid properties schema
34 changes: 32 additions & 2 deletions user_tools/src/spark_rapids_tools/storagelib/cspfs.py
@@ -16,14 +16,15 @@

import abc
import os
from typing import Generic, Callable, TypeVar, Any, Union
from typing import Generic, Callable, TypeVar, Any, Union, List, Optional

from pyarrow import fs as arrow_fs
from pyarrow.fs import FileType

from .csppath import CspPathImplementation, CspPath, path_impl_registry

from ..exceptions import (
    CspPathNotFoundException
    CspPathNotFoundException, CspPathTypeMismatchError
)

BoundedCspPath = TypeVar('BoundedCspPath', bound=CspPath)
@@ -151,3 +152,32 @@ def copy_resources(cls, src: BoundedCspPath, dest: BoundedCspPath):
                            destination_filesystem=dest.fs_obj.fs,
                            # 64 MB chunk size
                            chunk_size=64 * 1024 * 1024)

    @classmethod
    def list_all_files(cls, path: BoundedCspPath) -> List[BoundedCspPath]:
        return cls._list_items_by_type(path, FileType.File)

    @classmethod
    def list_all_dirs(cls, path: BoundedCspPath) -> List[BoundedCspPath]:
        return cls._list_items_by_type(path, FileType.Directory)

    @classmethod
    def list_all(cls, path: BoundedCspPath) -> List[BoundedCspPath]:
        return cls._list_items_by_type(path, None)

    @staticmethod
    def _list_items_by_type(path: BoundedCspPath, item_type: Optional[FileType]) -> List[BoundedCspPath]:
        """
        Helper function to list files, directories, or all items in the given path.
        """
        if not path.exists():
            raise CspPathNotFoundException(f'Path does not exist: {path}')
        if not path.is_dir():
            raise CspPathTypeMismatchError(f'Path is not a directory: {path}')

        dir_info_list = path.fs_obj.get_file_info(arrow_fs.FileSelector(path.no_scheme, recursive=False))
        return [
            path.create_sub_path(dir_info.base_name)
            for dir_info in dir_info_list
            if item_type is None or dir_info.type == item_type
        ]
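
A short usage sketch of the new listing helpers, mirroring how _read_qualification_metric_file walks per-app metric folders; the local path is hypothetical:

    from spark_rapids_tools import CspPath
    from spark_rapids_tools.storagelib import CspFs

    # One sub-directory per application under the raw_metrics folder.
    metrics_root = CspPath('/tmp/qual_out/rapids_4_spark_qualification_output/raw_metrics')
    for app_dir in CspFs.list_all_dirs(metrics_root):
        csv_path = app_dir.create_sub_path('application_information.csv')
        print(app_dir.base_name(), '->', csv_path)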