Qualification tool: Add output stats file for Execs(operators) #1225

Merged · 16 commits · Aug 12, 2024
Changes from 3 commits
@@ -11,6 +11,8 @@ toolOutput:
fileName: rapids_4_spark_qualification_output_cluster_information.csv
stagesInformation:
fileName: rapids_4_spark_qualification_output_stages.csv
execsInformation:
fileName: rapids_4_spark_qualification_output_execs.csv
tunings:
subFolder: tuning
appsStatusReport:
@@ -137,9 +139,9 @@ local:
- 'SQL ID'
- 'Operator'
- 'Count'
- 'Stage Task Exec duration(seconds)'
- 'Impacted Stage duration(seconds)'
- '% of Stage Duration'
- 'Stage Task Exec Duration(s)'
- 'Total SQL Duration(s)'
- '% of Total SQL Duration'
- 'Supported'
intermediateOutput:
name: 'intermediate_output'
@@ -34,6 +34,7 @@ class SparkQualificationStats:
unsupported_operators_df: pd.DataFrame = field(default=None, init=False)
stages_df: pd.DataFrame = field(default=None, init=False)
result_df: pd.DataFrame = field(default=None, init=False)
execs_df: pd.DataFrame = field(default=None, init=False)
output_columns: dict = field(default=None, init=False)
qual_output: str = field(default=None, init=True)
ctxt: ToolContext = field(default=None, init=True)
@@ -59,36 +60,85 @@ def _read_csv_files(self) -> None:
'fileName')
rapids_stages_file = FSUtil.build_path(qual_output_dir, stages_report_file)
self.stages_df = pd.read_csv(rapids_stages_file)

rapids_execs_file = self.ctxt.get_value('toolOutput', 'csv', 'execsInformation',
'fileName')
self.execs_df = pd.read_csv(FSUtil.build_path(qual_output_dir, rapids_execs_file))
self.logger.info('Reading CSV files completed.')

def _convert_durations(self) -> None:
# Convert durations from milliseconds to seconds
self.unsupported_operators_df[['Stage Duration', 'App Duration']] /= 1000
self.stages_df[['Stage Task Duration', 'Unsupported Task Duration']] /= 1000

def _preprocess_dataframes(self) -> None:
self.logger.info('Preprocessing dataframes...')

# Filter out 'WholeStageCodegen' operators as the child operators are already included
# in the other rows
self.execs_df = self.execs_df[
~self.execs_df['Exec Name'].str.startswith('WholeStageCodegen')]

        # Split 'Exec Stages' and explode the list into separate rows so that the stage ID
        # from this dataframe can be matched against the stage ID of the stages dataframe
self.execs_df['Exec Stages'] = self.execs_df['Exec Stages'].str.split(':')
self.execs_df = self.execs_df.explode('Exec Stages').dropna(subset=['Exec Stages'])
self.execs_df['Exec Stages'] = self.execs_df['Exec Stages'].astype(int)

        # Remove duplicate 'Stage ID' rows and rename some columns so that joins on the
        # dataframes can be done easily
self.stages_df = self.stages_df.drop_duplicates(subset=['App ID', 'Stage ID'])
self.stages_df.rename(columns={'Stage Task Duration': 'StageTaskDuration'}, inplace=True)
self.execs_df.rename(columns={'Exec Name': 'Operator'}, inplace=True)
self.unsupported_operators_df.rename(columns={'Unsupported Operator': 'Operator'},
inplace=True)
self.logger.info('Preprocessing dataframes completed.')
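
For context, a minimal standalone sketch (not part of the diff; the rows are invented) of what the two preprocessing steps above do to the execs dataframe:

import pandas as pd

execs_df = pd.DataFrame({
    'App ID': [1, 1, 1],
    'SQL ID': [1, 1, 1],
    'Exec Name': ['WholeStageCodegen (1)', 'Project', 'Sort'],
    'Exec Stages': ['5', '5:6', '5'],
})

# Child operators of WholeStageCodegen already appear as their own rows,
# so the wrapper itself is dropped to avoid double counting.
execs_df = execs_df[~execs_df['Exec Name'].str.startswith('WholeStageCodegen')]

# '5:6' becomes two rows with Exec Stages 5 and 6, each joinable against
# stages_df on the stage id.
execs_df['Exec Stages'] = execs_df['Exec Stages'].str.split(':')
execs_df = execs_df.explode('Exec Stages').dropna(subset=['Exec Stages'])
execs_df['Exec Stages'] = execs_df['Exec Stages'].astype(int)
print(execs_df)
#    App ID  SQL ID Exec Name  Exec Stages
# 1       1       1   Project            5
# 1       1       1   Project            6
# 2       1       1      Sort            5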

def _merge_dataframes(self) -> None:
self.logger.info('Merging dataframes to get stats...')
# Merge unsupported_operators_df with stages_df on App ID and Stage ID
merged_df = pd.merge(self.unsupported_operators_df, self.stages_df,
on=['App ID', 'Stage ID'])

agg_unsupported_df = (merged_df.groupby(['App ID', 'SQL ID', 'Unsupported Operator']).agg(
Count=('Unsupported Operator', 'size'),
Impacted_Stage_Duration=('Stage Duration', 'sum'),
App_Duration=('App Duration', 'first'),
Stage_Task_Duration=('Stage Task Duration', 'sum')
).reset_index())

agg_unsupported_df['% of Stage Duration'] = (
(agg_unsupported_df['Impacted_Stage_Duration'] /
agg_unsupported_df['App_Duration']) * 100).round(3)

agg_unsupported_df['Supported'] = False
final_df = agg_unsupported_df.rename(columns={
'Unsupported Operator': 'Operator',
'Impacted_Stage_Duration': 'Impacted Stage duration(seconds)',
'Stage_Task_Duration': 'Stage Task Exec duration(seconds)'
})
self._preprocess_dataframes()

# Merge execs_df with stages_df
merged_df = self.execs_df.merge(self.stages_df, left_on=['App ID', 'Exec Stages'],
right_on=['App ID', 'Stage ID'], how='left')

# Merge with unsupported_operators_df to find unsupported operations
merged_df = merged_df.merge(self.unsupported_operators_df,
on=['App ID', 'SQL ID', 'Stage ID', 'Operator'],
how='left', indicator=True)
merged_df['Supported'] = merged_df['_merge'] == 'left_only'
merged_df.drop(columns=['_merge', 'Exec Stages'], inplace=True)

        # Calculate total duration by summing unique stages per SQL ID
total_duration_df = merged_df.drop_duplicates(subset=['App ID', 'SQL ID', 'Stage ID']) \
.groupby(['App ID', 'SQL ID'])['StageTaskDuration'] \
.sum().reset_index().rename(columns={'StageTaskDuration': 'TotalSQLDuration'})

merged_df = merged_df.merge(total_duration_df, on=['App ID', 'SQL ID'], how='left')

# Mark unique stage task durations
        merged_df['Unique StageTaskDuration'] = ~merged_df.duplicated(
            ['App ID', 'SQL ID', 'Operator', 'Stage ID', 'Supported'])
Collaborator:
So for each operator we will get all of its stages, and then possibly separate rows if it has both supported and unsupported occurrences, and we just want to make sure those are unique so we aren't double counting the time, correct?

And does this mean that if you have two operators in a stage, one supported and one unsupported, the stage time will be counted twice, correct? Which is fine; I just want to make sure I'm understanding properly.

Collaborator Author:
Thanks! I updated it to handle this case. Until now, it took into account the same operators within a SQL ID, and the assumption was that an operator would be either supported or unsupported within a stage. But there can be a scenario where the same operator is both supported and unsupported within a stage (due to an underlying Expression not being supported).

Sample output: there are 4 entries of Project for SQL ID=1, Stage IDs 5 and 6 in execs.csv, and 3 of them are unsupported, as below:
unsupported_operators_df

    App ID  SQL ID  Stage ID       Operator
0       1       3         7  HashAggregate
1       1       1         6        Project
2       2       1        -1        Project
3       1       1        -1         Filter
4       1       1         5        Project
5       1       1         5        Project

stages_df:
   App ID  Stage ID  StageTaskDuration
0       1         5                 50
2       1         4                 20
3       1         3                100
4       1         6                 60
5       1         7                100

final dataframe output (before renaming columns):

      App ID  SQL ID     Operator      Count  StageTaskDuration  TotalSQLDuration  % of Total SQL Duration  Supported
0       1       1         Filter         2                70                230                30.434783       True
1       1       1        Project         3                110               230                47.826087      False
2       1       1        Project         1                 50               230                21.739130       True
3       1       1           Sort         3                170               230                73.913043       True
4       1       3  HashAggregate         1                100               100               100.000000      False

Collaborator:
So I assume the SQL ID 1 example above has some execs, likely the Projects, that are unsupported in the same stages as another exec that is supported, correct? Because if you add up the four durations, 70 + 110 + 50 + 170, it's more than the 230 total.

Collaborator Author:
Yes, that's correct. In the above example, for SQL ID=1 there are 4 Projects in total:
Stage ID=5: Unsupported=2, Supported=1, StageTaskDuration=50
Stage ID=6: Unsupported=1, StageTaskDuration=60

So we have 2 rows in the output for Project:
StageTaskDuration=110 (50 + 60) for Unsupported
StageTaskDuration=50 for Supported
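
To make the arithmetic concrete, here is a small standalone sketch (not part of the PR; the frame below is hand-built from the Project rows discussed above) showing how the duplicated() mask keeps the per-row counts while letting each stage's duration count only once per supported/unsupported split:

import pandas as pd

# Hand-built rows for SQL ID=1: stage 5 has 2 unsupported + 1 supported
# Project, stage 6 has 1 unsupported Project.
merged_df = pd.DataFrame({
    'App ID': [1, 1, 1, 1],
    'SQL ID': [1, 1, 1, 1],
    'Operator': ['Project'] * 4,
    'Stage ID': [5, 5, 5, 6],
    'Supported': [False, False, True, False],
    'StageTaskDuration': [50, 50, 50, 60],
})

# Keep every row for counting, but let a stage's duration contribute only
# once per (operator, stage, supported) combination.
merged_df['Unique StageTaskDuration'] = ~merged_df.duplicated(
    ['App ID', 'SQL ID', 'Operator', 'Stage ID', 'Supported'])
merged_df['Adjusted StageTaskDuration'] = (merged_df['StageTaskDuration'] *
                                           merged_df['Unique StageTaskDuration'])
out = merged_df.groupby(['App ID', 'SQL ID', 'Operator', 'Supported']).agg(
    StageTaskDuration=('Adjusted StageTaskDuration', 'sum'),
    Count=('Stage ID', 'count')).reset_index()
print(out)
#    App ID  SQL ID Operator  Supported  StageTaskDuration  Count
# 0       1       1  Project      False                110      3
# 1       1       1  Project       True                 50      1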

merged_df['Adjusted StageTaskDuration'] = (merged_df['StageTaskDuration'] *
merged_df['Unique StageTaskDuration'])

# Aggregate data
final_df = merged_df.groupby(['App ID', 'SQL ID', 'Operator', 'Supported']).agg({
'Adjusted StageTaskDuration': 'sum',
'Stage ID': 'count'
}).reset_index().rename(columns={'Stage ID': 'Count',
'Adjusted StageTaskDuration': 'StageTaskDuration'})

# Merge total duration and calculate percentage
final_df = final_df.merge(total_duration_df, on=['App ID', 'SQL ID'], how='left')
final_df['% of Total SQL Duration'] = (
final_df['StageTaskDuration'] / final_df['TotalSQLDuration'] * 100)

# Rename columns
final_df.rename(columns={
'StageTaskDuration': 'Stage Task Exec Duration(s)',
'TotalSQLDuration': 'Total SQL Duration(s)'
}, inplace=True)
self.result_df = final_df[self.output_columns.get('columns')].copy()
self.logger.info('Merging stats dataframes completed.')
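
As a side note, a minimal sketch (invented frames, not part of the diff) of the indicator-merge step above that derives the Supported flag: exec rows with no match in unsupported_operators_df come back as 'left_only':

import pandas as pd

execs = pd.DataFrame({'App ID': [1, 1], 'SQL ID': [1, 1],
                      'Stage ID': [5, 6],
                      'Operator': ['Project', 'Sort']})
unsupported = pd.DataFrame({'App ID': [1], 'SQL ID': [1],
                            'Stage ID': [5], 'Operator': ['Project']})

# indicator=True adds a '_merge' column; rows found only on the left side
# had no unsupported counterpart, so they are marked as supported.
merged = execs.merge(unsupported,
                     on=['App ID', 'SQL ID', 'Stage ID', 'Operator'],
                     how='left', indicator=True)
merged['Supported'] = merged['_merge'] == 'left_only'
print(merged[['Operator', 'Stage ID', 'Supported']])
#   Operator  Stage ID  Supported
# 0  Project         5      False
# 1     Sort         6       True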
