Skip to content

Commit

Permalink
Add more edge cases; Address a comment
Browse files Browse the repository at this point in the history
  • Loading branch information
Chong Gao committed Dec 19, 2024
1 parent 253c6f4 commit 8270ea0
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 34 deletions.
144 changes: 111 additions & 33 deletions integration_tests/src/main/python/hashing_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from data_gen import *
from marks import allow_non_gpu, ignore_order

_xxhash_gens = [
_atomic_gens = [
null_gen,
boolean_gen,
byte_gen,
Expand All @@ -34,53 +34,131 @@
double_gen
]

# Struct whose children cover every atomic generator above; used both on its
# own and nested inside the array/map generators below.
_struct_of_xxhash_gens = StructGen([(f"c{i}", g) for i, g in enumerate(_atomic_gens)])

# Full set of generators exercised by the xxhash64 tests; also reused by
# HyperLogLogPlusPlus (approx_count_distinct) tests.
xxhash_gens = (_atomic_gens + [_struct_of_xxhash_gens] + single_level_array_gens
               + nested_array_gens_sample + [
                   all_basic_struct_gen,
                   struct_array_gen,
                   _struct_of_xxhash_gens
               ] + map_gens_sample)

@ignore_order(local=True)
@pytest.mark.parametrize("gen", xxhash_gens, ids=idfn)
def test_xxhash64_single_column(gen):
    """CPU and GPU xxhash64 must agree for a single column of each supported type."""
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: unary_op_df(spark, gen).selectExpr("a", "xxhash64(a)"),
        # Hashing map columns is disallowed by default; enable the legacy behavior
        # so the map generators in xxhash_gens can run.
        {"spark.sql.legacy.allowHashOnMapType": True})


@ignore_order(local=True)
def test_xxhash64_multi_column():
    """CPU and GPU xxhash64 must agree when hashing all columns at once."""
    # Reuse the all-types struct's children as top-level columns (non-nullable
    # so every row contributes all columns to the hash).
    gen = StructGen(_struct_of_xxhash_gens.children, nullable=False)
    col_list = ",".join(gen.data_type.fieldNames())
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: gen_df(spark, gen).selectExpr("c0", f"xxhash64({col_list})"),
        # Needed because some generated columns are maps.
        {"spark.sql.legacy.allowHashOnMapType": True})


def test_xxhash64_8_depth():
    """A struct nested to depth 8 must still run on the GPU (no fallback).

    Seven struct levels plus the int leaf give a total depth of 8, which is
    the maximum the GPU xxhash64 implementation supports.
    """
    gen_8_depth = (
        StructGen([('l1',  # level 1
            StructGen([('l2',
                StructGen([('l3',
                    StructGen([('l4',
                        StructGen([('l5',
                            StructGen([('l6',
                                StructGen([('l7',
                                    int_gen)]))]))]))]))]))]))]))  # level 8
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: unary_op_df(spark, gen_8_depth).selectExpr("a", "xxhash64(a)"))


@allow_non_gpu("ProjectExec")
def test_xxhash64_fallback_exceeds_stack_size_array_of_structure():
    """Array-of-struct nesting past the GPU stack limit must fall back to CPU.

    Each array-of-struct pair contributes 2 to the hash stack depth; four
    pairs plus the int leaf give depth 9 > 8, so the plan falls back to
    ProjectExec on the CPU.
    """
    gen_9_depth = (
        ArrayGen(                             # depth += 1
            StructGen([('c',                  # depth += 1
                ArrayGen(                     # depth += 1
                    StructGen([('c',          # depth += 1
                        ArrayGen(             # depth += 1
                            StructGen([('c',  # depth += 1
                                ArrayGen(     # depth += 1
                                    StructGen([('c',  # depth += 1
                                        int_gen)]),   # depth += 1
                                    max_length=1))]),
                            max_length=1))]),
                    max_length=1))]),
            max_length=1))
    assert_gpu_fallback_collect(
        lambda spark: unary_op_df(spark, gen_9_depth).selectExpr("a", "xxhash64(a)"),
        "ProjectExec")


@allow_non_gpu("ProjectExec")
def test_xxhash64_array_of_other():
    """xxhash64 over arrays with non-struct children stays within the depth limit.

    Arrays whose element is not a struct add nothing to the hash stack depth,
    each map level adds 2, and the primitive leaf adds 1, so this heavily
    nested type still fits and CPU/GPU results must agree.
    """
    # Build the type bottom-up instead of one deep literal.
    deepest_map = MapGen(IntegerGen(nullable=False),
                         int_gen,        # primitive: depth += 1
                         max_length=1)   # map: depth += 2
    middle_map = MapGen(IntegerGen(nullable=False),
                        ArrayGen(deepest_map, max_length=1),  # array(other): depth += 0
                        max_length=1)    # map: depth += 2
    top_map = MapGen(IntegerGen(nullable=False),
                     ArrayGen(middle_map, max_length=1),      # array(other): depth += 0
                     max_length=1)       # map: depth += 2
    # Three outer array-of-array levels, none of which add to the depth.
    gen_9_depth = ArrayGen(
        ArrayGen(
            ArrayGen(top_map, max_length=1),
            max_length=1),
        max_length=1)
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: unary_op_df(spark, gen_9_depth).selectExpr("a", "xxhash64(a)"),
        {"spark.sql.legacy.allowHashOnMapType": True})


@allow_non_gpu("ProjectExec")
def test_xxhash64_fallback_exceeds_stack_size_structure():
    """A struct nested to depth 9 exceeds the GPU limit and must fall back to CPU."""
    # Wrap the int leaf in 8 struct levels, innermost field 'l8' outward to
    # 'l1'; with the leaf that is depth 9, one past the supported maximum.
    gen_9_depth = int_gen
    for level in range(8, 0, -1):
        gen_9_depth = StructGen([(f'l{level}', gen_9_depth)])
    assert_gpu_fallback_collect(
        lambda spark: unary_op_df(spark, gen_9_depth).selectExpr("a", "xxhash64(a)"),
        "ProjectExec")


@allow_non_gpu("ProjectExec")
def test_xxhash64_fallback_exceeds_stack_size_map():
    """Maps nested 4 levels deep exceed the GPU limit and must fall back to CPU."""
    # Each map level contributes 2 to the hash stack depth and the int leaf
    # adds 1: 4 * 2 + 1 = 9, one past the supported maximum of 8.
    gen_9_depth = IntegerGen(nullable=False)
    for _ in range(4):
        gen_9_depth = MapGen(IntegerGen(nullable=False), gen_9_depth, max_length=1)
    assert_gpu_fallback_collect(
        lambda spark: unary_op_df(spark, gen_9_depth).selectExpr("a", "xxhash64(a)"),
        "ProjectExec",
        {"spark.sql.legacy.allowHashOnMapType": True})
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ object XxHash64Utils {
def computeMaxStackSize(inputType: DataType): Int = {
inputType match {
case ArrayType(c: StructType, _) => 1 + computeMaxStackSize(c)
case ArrayType(c: DataType, _) => computeMaxStackSize(c)
case ArrayType(c, _) => computeMaxStackSize(c)
case st: StructType =>
1 + st.map(f => computeMaxStackSize(f.dataType)).max
case mt: MapType =>
Expand Down

0 comments on commit 8270ea0

Please sign in to comment.