ignore output of column o_orderkey for query18
Signed-off-by: Yinqing Hao <[email protected]>
yinqingh committed Oct 17, 2024
1 parent 31e6efa commit a3cb119
Showing 1 changed file with 32 additions and 15 deletions.
47 changes: 32 additions & 15 deletions nds-h/nds_h_validate.py
@@ -45,12 +45,22 @@
 
 from nds_h_power import gen_sql_from_stream, get_query_subset
 
+SKIP_QUERIES = [
+    'query15_part1', # create view query
+    'query15_part3', # drop view query
+]
+SKIP_NON_DETERMINISTIC_COLUMNS = {
+    'query18': ['o_orderkey'],
+}
+
+
 def compare_results(spark_session: SparkSession,
                     input1: str,
                     input2: str,
                     input1_format: str,
                     input2_format: str,
                     ignore_ordering: bool,
+                    query_name: str,
                     use_iterator=False,
                     max_errors=10,
                     epsilon=0.00001) -> bool:
@@ -65,6 +75,7 @@ def compare_results(spark_session: SparkSession,
         input2_format (str): data source format for input2, e.g. parquet, orc
         ignore_ordering (bool): whether ignoring the order of input data.
             If true, we will order by ourselves.
+        query_name (str): Query name.
         use_iterator (bool, optional): When set to true, use `toLocalIterator` to load one partition
             at a time into driver memory, reducing memory usage at the cost of performance because
             processing will be single-threaded. Defaults to False.
@@ -75,15 +86,18 @@
     Returns:
         bool: True if result matches otherwise False
     """
+    if query_name in SKIP_QUERIES:
+        return True
+
     df1 = spark_session.read.format(input1_format).load(input1)
     df2 = spark_session.read.format(input2_format).load(input2)
     count1 = df1.count()
     count2 = df2.count()
 
     if(count1 == count2):
         #TODO: need partitioned collect for NDS? there's no partitioned output currently
-        result1 = collect_results(df1, ignore_ordering, use_iterator)
-        result2 = collect_results(df2, ignore_ordering, use_iterator)
+        result1 = collect_results(df1, query_name, ignore_ordering, use_iterator)
+        result2 = collect_results(df2, query_name, ignore_ordering, use_iterator)
 
         errors = 0
         i = 0
@@ -110,8 +124,13 @@ def compare_results(spark_session: SparkSession,
     return False
 
 def collect_results(df: DataFrame,
-                    ignore_ordering: bool,
-                    use_iterator: bool):
+                    query_name: str,
+                    ignore_ordering: bool,
+                    use_iterator: bool):
+    # skip non-deterministic columns
+    if query_name in SKIP_NON_DETERMINISTIC_COLUMNS:
+        df = df.drop(*SKIP_NON_DETERMINISTIC_COLUMNS[query_name])
+
     # apply sorting if specified
     non_float_cols = [col(field.name) for \
         field in df.schema.fields \
@@ -182,21 +201,19 @@ def iterate_queries(spark_session: SparkSession,
     # Providing a list instead of hard-coding all queries is to satisfy the arbitrary queries run.
     unmatch_queries = []
     for query_name in query_dict.keys():
-        if query_name in ['query15_part1', 'query15_part3']:
-            # query15_part1 and query15_part3 are skipped since these are create/drop view queries
-            continue
         sub_input1 = input1 + '/' + query_name
         sub_input2 = input2 + '/' + query_name
         print(f"=== Comparing Query: {query_name} ===")
         result_equal = compare_results(spark_session,
-                                       sub_input1,
-                                       sub_input2,
-                                       input1_format,
-                                       input2_format,
-                                       ignore_ordering,
-                                       use_iterator=use_iterator,
-                                       max_errors=max_errors,
-                                       epsilon=epsilon)
+                                       sub_input1,
+                                       sub_input2,
+                                       input1_format,
+                                       input2_format,
+                                       ignore_ordering,
+                                       query_name,
+                                       use_iterator=use_iterator,
+                                       max_errors=max_errors,
+                                       epsilon=epsilon)
         if result_equal == False:
             unmatch_queries.append(query_name)
     if len(unmatch_queries) != 0:
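
The new SKIP_NON_DETERMINISTIC_COLUMNS mapping works by dropping the listed columns from both result sets before rows are compared; for query18, o_orderkey can legitimately differ between two correct runs (presumably because ties in the ordered, limited result can be broken on different keys), so it is excluded from validation. Below is a minimal, self-contained sketch of that drop-before-compare step using toy data and a local SparkSession; the customer rows, column values, and session setup are illustrative only, not taken from the benchmark output.

from pyspark.sql import SparkSession

# Hypothetical illustration of the drop-before-compare step; the data is made up.
SKIP_NON_DETERMINISTIC_COLUMNS = {
    'query18': ['o_orderkey'],
}

spark = SparkSession.builder.master('local[1]').appName('drop-demo').getOrCreate()

cols = ['c_name', 'c_custkey', 'o_orderkey', 'o_totalprice']
cpu = spark.createDataFrame([('Customer#1', 1, 111, 500000.0)], cols)
gpu = spark.createDataFrame([('Customer#1', 1, 222, 500000.0)], cols)  # same row, different o_orderkey

query_name = 'query18'
if query_name in SKIP_NON_DETERMINISTIC_COLUMNS:
    cpu = cpu.drop(*SKIP_NON_DETERMINISTIC_COLUMNS[query_name])
    gpu = gpu.drop(*SKIP_NON_DETERMINISTIC_COLUMNS[query_name])

# With the non-deterministic column removed, the remaining rows match.
print(sorted(cpu.collect()) == sorted(gpu.collect()))  # True
spark.stop()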

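The row-by-row comparison loop itself is collapsed in this diff; compare_results only documents its max_errors and epsilon parameters, which suggests float columns are checked with a small tolerance rather than exact equality. A generic sketch of what such a tolerance check commonly looks like is below — an assumption for illustration, not the repository's exact implementation.

import math

def rows_equal(row1, row2, epsilon=0.00001):
    """Compare two result rows, allowing a small tolerance for float values."""
    if len(row1) != len(row2):
        return False
    for v1, v2 in zip(row1, row2):
        if isinstance(v1, float) and isinstance(v2, float):
            # treat floats within epsilon (relative or absolute) as equal
            if not math.isclose(v1, v2, rel_tol=epsilon, abs_tol=epsilon):
                return False
        elif v1 != v2:
            return False
    return True

# e.g. rows_equal((1, 'a', 3.14159), (1, 'a', 3.141592)) -> True within the default epsilon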