
Commit 954f22c

feat: update spark description methods

1 parent 7bbba9e

8 files changed, +111 -143 lines

src/ydata_profiling/model/spark/describe_boolean_spark.py
Lines changed: 3 additions & 2 deletions

@@ -4,12 +4,13 @@

 from ydata_profiling.config import Settings
 from ydata_profiling.model.summary_algorithms import describe_boolean_1d
+from ydata_profiling.model.var_description.default import VarDescriptionHashable


 @describe_boolean_1d.register
 def describe_boolean_1d_spark(
-    config: Settings, df: DataFrame, summary: dict
-) -> Tuple[Settings, DataFrame, dict]:
+    config: Settings, df: DataFrame, summary: VarDescriptionHashable
+) -> Tuple[Settings, DataFrame, VarDescriptionHashable]:
     """Describe a boolean series.

     Args:

src/ydata_profiling/model/spark/describe_categorical_spark.py
Lines changed: 3 additions & 2 deletions

@@ -4,12 +4,13 @@

 from ydata_profiling.config import Settings
 from ydata_profiling.model.summary_algorithms import describe_categorical_1d
+from ydata_profiling.model.var_description.default import VarDescriptionHashable


 @describe_categorical_1d.register
 def describe_categorical_1d_spark(
-    config: Settings, df: DataFrame, summary: dict
-) -> Tuple[Settings, DataFrame, dict]:
+    config: Settings, df: DataFrame, summary: VarDescriptionHashable
+) -> Tuple[Settings, DataFrame, VarDescriptionHashable]:
     """Describe a categorical series.

     Args:

src/ydata_profiling/model/spark/describe_date_spark.py
Lines changed: 3 additions & 2 deletions

@@ -6,6 +6,7 @@

 from ydata_profiling.config import Settings
 from ydata_profiling.model.summary_algorithms import describe_date_1d
+from ydata_profiling.model.var_description.default import VarDescriptionHashable


 def date_stats_spark(df: DataFrame, summary: dict) -> dict:
@@ -21,8 +22,8 @@ def date_stats_spark(df: DataFrame, summary: dict) -> dict:

 @describe_date_1d.register
 def describe_date_1d_spark(
-    config: Settings, df: DataFrame, summary: dict
-) -> Tuple[Settings, DataFrame, dict]:
+    config: Settings, df: DataFrame, summary: VarDescriptionHashable
+) -> Tuple[Settings, DataFrame, VarDescriptionHashable]:
     """Describe a date series.

     Args:

src/ydata_profiling/model/spark/describe_numeric_spark.py
Lines changed: 12 additions & 11 deletions

@@ -9,9 +9,10 @@
     describe_numeric_1d,
     histogram_compute,
 )
+from ydata_profiling.model.var_description.default import VarDescriptionHashable


-def numeric_stats_spark(df: DataFrame, summary: dict) -> dict:
+def numeric_stats_spark(df: DataFrame, summary: VarDescriptionHashable) -> dict:
     column = df.columns[0]

     expr = [
@@ -29,8 +30,8 @@ def numeric_stats_spark(df: DataFrame, summary: dict) -> dict:

 @describe_numeric_1d.register
 def describe_numeric_1d_spark(
-    config: Settings, df: DataFrame, summary: dict
-) -> Tuple[Settings, DataFrame, dict]:
+    config: Settings, df: DataFrame, summary: VarDescriptionHashable
+) -> Tuple[Settings, DataFrame, VarDescriptionHashable]:
     """Describe a boolean series.

     Args:
@@ -51,7 +52,7 @@ def describe_numeric_1d_spark(
     summary["kurtosis"] = stats["kurtosis"]
     summary["sum"] = stats["sum"]

-    value_counts = summary["value_counts"]
+    value_counts = summary.value_counts

     n_infinite = (
         value_counts.where(F.col(df.columns[0]).isin([np.inf, -np.inf]))
@@ -106,12 +107,12 @@ def describe_numeric_1d_spark(
     ).stat.approxQuantile("abs_dev", [0.5], quantile_threshold)[0]

     # FIXME: move to fmt
-    summary["p_negative"] = summary["n_negative"] / summary["n"]
+    summary["p_negative"] = summary["n_negative"] / summary.n
     summary["range"] = summary["max"] - summary["min"]
     summary["iqr"] = summary["75%"] - summary["25%"]
     summary["cv"] = summary["std"] / summary["mean"] if summary["mean"] else np.NaN
-    summary["p_zeros"] = summary["n_zeros"] / summary["n"]
-    summary["p_infinite"] = summary["n_infinite"] / summary["n"]
+    summary["p_zeros"] = summary["n_zeros"] / summary.n
+    summary["p_infinite"] = summary["n_infinite"] / summary.n

     # TODO - enable this feature
     # because spark doesn't have an indexing system, there isn't really the idea of monotonic increase/decrease
@@ -124,14 +125,14 @@ def describe_numeric_1d_spark(
     # display in pandas display
     # the alternative is to do this in spark natively, but it is not trivial
     infinity_values = [np.inf, -np.inf]
-    infinity_index = summary["value_counts_without_nan"].index.isin(infinity_values)
+    infinity_index = summary.value_counts_without_nan.index.isin(infinity_values)

     summary.update(
         histogram_compute(
             config,
-            summary["value_counts_without_nan"][~infinity_index].index.values,
-            summary["n_distinct"],
-            weights=summary["value_counts_without_nan"][~infinity_index].values,
+            summary.value_counts_without_nan[~infinity_index].index.values,
+            summary.n_distinct,
+            weights=summary.value_counts_without_nan[~infinity_index].values,
         )
     )
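
Note on the hunks above: variable-specific statistics keep dict-style access (summary["kurtosis"], summary["n_zeros"]) while the shared count fields move to attribute access (summary.n, summary.n_distinct, summary.value_counts_without_nan). A rough, self-contained sketch of that access pattern follows; _SummaryLike is a hypothetical stand-in used only for illustration, not the real VarDescriptionHashable:

    from dataclasses import dataclass, field
    from typing import Any, Dict


    @dataclass
    class _SummaryLike:
        # Hypothetical stand-in; the real class lives in
        # ydata_profiling.model.var_description.default and differs in detail.
        n: int
        n_distinct: int
        var_specific: Dict[str, Any] = field(default_factory=dict)

        def __getitem__(self, key: str) -> Any:
            # dict-style reads fall through to the per-variable statistics
            return self.var_specific[key]

        def __setitem__(self, key: str, value: Any) -> None:
            # dict-style writes land in the per-variable statistics
            self.var_specific[key] = value


    summary = _SummaryLike(n=100, n_distinct=10)
    summary["n_zeros"] = 4                                # item access, as in the diff
    summary["p_zeros"] = summary["n_zeros"] / summary.n   # attribute access for the shared count
    print(summary["p_zeros"])                             # 0.04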

src/ydata_profiling/model/spark/describe_supported_spark.py
Lines changed: 7 additions & 13 deletions

@@ -3,13 +3,17 @@
 from pyspark.sql import DataFrame

 from ydata_profiling.config import Settings
+from ydata_profiling.model.spark.var_description.default_spark import (
+    get_default_spark_description,
+)
 from ydata_profiling.model.summary_algorithms import describe_supported
+from ydata_profiling.model.var_description.default import VarDescription


 @describe_supported.register
 def describe_supported_spark(
     config: Settings, series: DataFrame, summary: dict
-) -> Tuple[Settings, DataFrame, dict]:
+) -> Tuple[Settings, DataFrame, VarDescription]:
     """Describe a supported series.
     Args:
         series: The Series to describe.
@@ -18,16 +22,6 @@ def describe_supported_spark(
         A dict containing calculated series description values.
     """

-    # number of non-NaN observations in the Series
-    count = summary["count"]
-    n_distinct = summary["value_counts"].count()
+    series_description = get_default_spark_description(config, series, summary)

-    summary["n_distinct"] = n_distinct
-    summary["p_distinct"] = n_distinct / count if count > 0 else 0
-
-    n_unique = summary["value_counts"].where("count == 1").count()
-    summary["is_unique"] = n_unique == count
-    summary["n_unique"] = n_unique
-    summary["p_unique"] = n_unique / count
-
-    return config, series, summary
+    return config, series, series_description
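
The distinct/unique bookkeeping deleted here is not gone; it moves unchanged into get_default_spark_description (the last file in this commit). A toy, Spark-free sketch of that arithmetic, with made-up numbers purely for illustration:

    count = 8                                  # non-null observations
    value_counts = {"a": 4, "b": 3, "c": 1}    # stand-in for the Spark value_counts frame

    n_distinct = len(value_counts)                               # 3 distinct values
    p_distinct = n_distinct / count if count > 0 else 0          # 0.375
    n_unique = sum(1 for c in value_counts.values() if c == 1)   # values seen exactly once: 1
    is_unique = n_unique == count                                # False
    p_unique = n_unique / count                                  # 0.125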

src/ydata_profiling/model/spark/describe_text_spark.py
Lines changed: 3 additions & 2 deletions

@@ -4,12 +4,13 @@

 from ydata_profiling.config import Settings
 from ydata_profiling.model.summary_algorithms import describe_text_1d
+from ydata_profiling.model.var_description.default import VarDescriptionHashable


 @describe_text_1d.register
 def describe_text_1d_spark(
-    config: Settings, df: DataFrame, summary: dict
-) -> Tuple[Settings, DataFrame, dict]:
+    config: Settings, df: DataFrame, summary: VarDescriptionHashable
+) -> Tuple[Settings, DataFrame, VarDescriptionHashable]:
     """Describe a categorical series.

     Args:

src/ydata_profiling/model/spark/var_description/counts_spark.py
Lines changed: 49 additions & 66 deletions

@@ -4,69 +4,52 @@
 from ydata_profiling.model.var_description.counts import VarCounts


-class VarCountsSpark(VarCounts):
-    value_counts_without_nan: DataFrame
-    """Counts of values in the series without NaN."""
-    value_counts_index_sorted: DataFrame
-    """Sorted counts of values in the series without NaN."""
-    value_counts: DataFrame
-
-    def __init__(self, config: Settings, series: DataFrame):
-        """Counts the values in a series (with and without NaN, distinct).
-
-        Args:
-            config: report Settings object
-            series: Series for which we want to calculate the values.
-            summary: series' summary
-
-        Returns:
-            A dictionary with the count values (with and without NaN, distinct).
-        """
-        length = series.count()
-
-        value_counts = series.groupBy(series.columns).count()
-        value_counts = value_counts.sort("count", ascending=False).persist()
-        value_counts_index_sorted = value_counts.sort(series.columns[0], ascending=True)
-
-        n_missing = value_counts.where(value_counts[series.columns[0]].isNull()).first()
-        if n_missing is None:
-            n_missing = 0
-        else:
-            n_missing = n_missing["count"]
-
-        # FIXME: reduce to top-n and bottom-n
-        value_counts_index_sorted = (
-            value_counts_index_sorted.limit(200)
-            .toPandas()
-            .set_index(series.columns[0], drop=True)
-            .squeeze(axis="columns")
-        )
-
-        # this is necessary as freqtables requires value_counts_without_nan
-        # to be a pandas series. However, if we try to get everything into
-        # pandas we will definitly crash the server
-        value_counts_without_nan = (
-            value_counts.dropna()
-            .limit(200)
-            .toPandas()
-            .set_index(series.columns[0], drop=True)
-            .squeeze(axis="columns")
-        )
-
-        # FIXME: This is not correct, but used to fulfil render expectations
-        # @chanedwin
-        memory_size = 0
-
-        self.value_counts = value_counts
-        super().__init__(
-            hashable=False,
-            value_counts_without_nan=value_counts_without_nan,
-            value_counts_index_sorted=value_counts_index_sorted,
-            ordering=False,
-            n_missing=n_missing,
-            n=length,
-            p_missing=n_missing / length,
-            count=length - n_missing,
-            memory_size=memory_size,
-            value_counts=value_counts.persist(),
-        )
+def get_counts_spark(config: Settings, series: DataFrame) -> VarCounts:
+    """Get a VarCounts object for a spark series."""
+    length = series.count()
+
+    value_counts = series.groupBy(series.columns).count()
+    value_counts = value_counts.sort("count", ascending=False).persist()
+    value_counts_index_sorted = value_counts.sort(series.columns[0], ascending=True)
+
+    n_missing = value_counts.where(value_counts[series.columns[0]].isNull()).first()
+    if n_missing is None:
+        n_missing = 0
+    else:
+        n_missing = n_missing["count"]
+
+    # FIXME: reduce to top-n and bottom-n
+    value_counts_index_sorted = (
+        value_counts_index_sorted.limit(200)
+        .toPandas()
+        .set_index(series.columns[0], drop=True)
+        .squeeze(axis="columns")
+    )
+
+    # this is necessary as freqtables requires value_counts_without_nan
+    # to be a pandas series. However, if we try to get everything into
+    # pandas we will definitly crash the server
+    value_counts_without_nan = (
+        value_counts.dropna()
+        .limit(200)
+        .toPandas()
+        .set_index(series.columns[0], drop=True)
+        .squeeze(axis="columns")
+    )
+
+    # FIXME: This is not correct, but used to fulfil render expectations
+    # @chanedwin
+    memory_size = 0
+
+    return VarCounts(
+        hashable=False,
+        value_counts_without_nan=value_counts_without_nan,
+        value_counts_index_sorted=value_counts_index_sorted,
+        ordering=False,
+        n_missing=n_missing,
+        n=length,
+        p_missing=n_missing / length,
+        count=length - n_missing,
+        memory_size=memory_size,
+        value_counts=value_counts.persist(),
+    )
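
A minimal usage sketch for the new module-level helper. It assumes a local pyspark installation, a single-column DataFrame, and that a default-constructed Settings() is acceptable (the helper does not read the config in this code path); the column name and data are made up:

    from pyspark.sql import SparkSession

    from ydata_profiling.config import Settings
    from ydata_profiling.model.spark.var_description.counts_spark import get_counts_spark

    # local Spark session with a toy single-column DataFrame containing one null
    spark = SparkSession.builder.master("local[1]").appName("counts-demo").getOrCreate()
    df = spark.createDataFrame([("a",), ("a",), ("b",), (None,)], ["letter"])

    counts = get_counts_spark(Settings(), df)
    print(counts.n, counts.n_missing, counts.count)   # 4 1 3
    counts.value_counts.show()                        # Spark DataFrame of value/count pairs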

src/ydata_profiling/model/spark/var_description/default_spark.py
Lines changed: 31 additions & 45 deletions

@@ -1,55 +1,41 @@
 from __future__ import annotations

-from dataclasses import dataclass
-
 from pyspark.sql import DataFrame

 from ydata_profiling.config import Settings
-from ydata_profiling.model.spark.var_description.counts_spark import VarCountsSpark
+from ydata_profiling.model.spark.var_description.counts_spark import get_counts_spark
 from ydata_profiling.model.var_description.default import VarDescriptionHashable


-@dataclass
-class VarDescriptionSparkHashable(VarDescriptionHashable):
-    """Default description for pandas columns."""
-
-    @classmethod
-    def from_var_counts(
-        cls, var_counts: VarCountsSpark, init_dict: dict
-    ) -> VarDescriptionSparkHashable:
-        """Get a default description from a VarCountsPandas object."""
-
-        count = var_counts.count
-        n_distinct = var_counts.value_counts.count()
-
-        p_distinct = n_distinct / count if count > 0 else 0
-
-        n_unique = var_counts.value_counts.where("count == 1").count()
-        is_unique = n_unique == count
-        p_unique = n_unique / count
-
-        return VarDescriptionSparkHashable(
-            n=var_counts.n,
-            count=var_counts.count,
-            n_missing=var_counts.n_missing,
-            p_missing=var_counts.p_missing,
-            hashable=var_counts.hashable,
-            memory_size=var_counts.memory_size,
-            ordering=var_counts.ordering,
-            value_counts_index_sorted=var_counts.value_counts_index_sorted,
-            value_counts_without_nan=var_counts.value_counts_without_nan,
-            var_specific=init_dict,
-            is_unique=is_unique,
-            n_unique=n_unique,
-            n_distinct=n_distinct,
-            p_distinct=p_distinct,
-            p_unique=p_unique,
-            value_counts=var_counts.value_counts,
-        )
-
-
 def get_default_spark_description(
     config: Settings, series: DataFrame, init_dict: dict
-) -> VarDescriptionSparkHashable:
-    _var_counts = VarCountsSpark(config, series)
-    return VarDescriptionSparkHashable.from_var_counts(_var_counts, init_dict)
+) -> VarDescriptionHashable:
+    var_counts = get_counts_spark(config, series)
+
+    count = var_counts.count
+    n_distinct = var_counts.value_counts.count()
+
+    p_distinct = n_distinct / count if count > 0 else 0
+
+    n_unique = var_counts.value_counts.where("count == 1").count()
+    is_unique = n_unique == count
+    p_unique = n_unique / count
+
+    return VarDescriptionHashable(
+        n=var_counts.n,
+        count=var_counts.count,
+        n_missing=var_counts.n_missing,
+        p_missing=var_counts.p_missing,
+        hashable=var_counts.hashable,
+        memory_size=var_counts.memory_size,
+        ordering=var_counts.ordering,
+        value_counts_index_sorted=var_counts.value_counts_index_sorted,
+        value_counts_without_nan=var_counts.value_counts_without_nan,
+        var_specific=init_dict,
+        is_unique=is_unique,
+        n_unique=n_unique,
+        n_distinct=n_distinct,
+        p_distinct=p_distinct,
+        p_unique=p_unique,
+        value_counts=var_counts.value_counts,
+    )
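
A matching sketch for the refactored entry point, under the same assumptions as above (local pyspark, default-constructed Settings(), toy data); init_dict stands in for the per-variable summary that callers pass through as var_specific:

    from pyspark.sql import SparkSession

    from ydata_profiling.config import Settings
    from ydata_profiling.model.spark.var_description.default_spark import (
        get_default_spark_description,
    )

    # local Spark session with a toy numeric column: two 1s, one 2, one 3
    spark = SparkSession.builder.master("local[1]").appName("description-demo").getOrCreate()
    df = spark.createDataFrame([(1,), (1,), (2,), (3,)], ["value"])

    description = get_default_spark_description(Settings(), df, init_dict={})
    print(description.n, description.n_distinct, description.n_unique)  # 4 3 2
    print(description.p_distinct, description.is_unique)               # 0.75 False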
