Skip to content

Commit

Permalink
[WIP] Part 1 fix for categorical mem opt issue (#795)
Browse files Browse the repository at this point in the history
* part_1 of fix for mem optimization for categorical dict creation issue

* precommit fix

* Separated the update from the check in stop conditions for categorical columns

* added tests and accounted for different variables affected by the change made to categories attribute

* Modifications to code based on test findings

* Fixes for logic and tests to match requirements from PR

* Fix for rebase carry over issue

* fixes for tests because of changes to variable names in categorical column object

* precommit fixes and improvement of code based on testing
  • Loading branch information
ksneab7 authored May 3, 2023
1 parent f206af2 commit ebb3995
Show file tree
Hide file tree
Showing 3 changed files with 220 additions and 10 deletions.
124 changes: 115 additions & 9 deletions dataprofiler/profilers/categorical_column_profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from collections import defaultdict
from operator import itemgetter
from typing import cast

from pandas import DataFrame, Series

Expand Down Expand Up @@ -43,6 +44,14 @@ def __init__(self, name: str | None, options: CategoricalOptions = None) -> None
self.__calculations: dict = {}
self._filter_properties_w_options(self.__calculations, options)
self._top_k_categories: int | None = None

# Conditions to stop categorical profiling
self.max_sample_size_to_check_stop_condition = None
self.stop_condition_unique_value_ratio = None
self._stop_condition_is_met = False

self._stopped_at_unique_ratio: float | None = None
self._stopped_at_unique_count: int | None = None
if options:
self._top_k_categories = options.top_k_categories

Expand All @@ -63,13 +72,59 @@ def __add__(self, other: CategoricalColumn) -> CategoricalColumn:
)

merged_profile = CategoricalColumn(None)
merged_profile._categories = utils.add_nested_dictionaries(
self._categories, other._categories
)
BaseColumnProfiler._add_helper(merged_profile, self, other)

self._merge_calculations(
merged_profile.__calculations, self.__calculations, other.__calculations
)
# If both profiles have not met stop condition
if not (self._stop_condition_is_met or other._stop_condition_is_met):
merged_profile._categories = utils.add_nested_dictionaries(
self._categories, other._categories
)

# Transfer stop condition variables of 1st profile object to merged profile
# if they are not None else set to 2nd profile
profile1_product = self.sample_size * self.unique_ratio
profile2_product = other.sample_size * other.unique_ratio
if profile1_product > profile2_product:
merged_profile.max_sample_size_to_check_stop_condition = (
self.max_sample_size_to_check_stop_condition
)
merged_profile.stop_condition_unique_value_ratio = (
self.stop_condition_unique_value_ratio
)
else:
merged_profile.stop_condition_unique_value_ratio = (
other.stop_condition_unique_value_ratio
)
merged_profile.max_sample_size_to_check_stop_condition = (
other.max_sample_size_to_check_stop_condition
)

# Check merged profile w/ stop condition
if merged_profile._check_stop_condition_is_met(
merged_profile.sample_size, merged_profile.unique_ratio
):
merged_profile._stopped_at_unique_ratio = merged_profile.unique_ratio
merged_profile._stopped_at_unique_count = merged_profile.unique_count
merged_profile._categories = {}
merged_profile._stop_condition_is_met = True

else:
if self.sample_size > other.sample_size:
merged_profile._stopped_at_unique_ratio = self.unique_ratio
merged_profile._stopped_at_unique_count = self.unique_count
merged_profile.sample_size = self.sample_size
else:
merged_profile._stopped_at_unique_ratio = other.unique_ratio
merged_profile._stopped_at_unique_count = other.unique_count
merged_profile.sample_size = other.sample_size

# If either profile has hit stop condition, remove categories dict
merged_profile._categories = {}
merged_profile._stop_condition_is_met = True

return merged_profile

def diff(self, other_profile: CategoricalColumn, options: dict = None) -> dict:
Expand All @@ -92,7 +147,7 @@ def diff(self, other_profile: CategoricalColumn, options: dict = None) -> dict:
(
"unique_count",
utils.find_diff_of_numbers(
len(self.categories), len(other_profile.categories)
self.unique_count, other_profile.unique_count
),
),
(
Expand Down Expand Up @@ -162,7 +217,7 @@ def profile(self) -> dict:
categorical=self.is_match,
statistics=dict(
[
("unique_count", len(self.categories)),
("unique_count", self.unique_count),
("unique_ratio", self.unique_ratio),
]
),
Expand Down Expand Up @@ -192,10 +247,20 @@ def categorical_counts(self) -> dict[str, int]:
@property
def unique_ratio(self) -> float:
    """Return the ratio of unique categories to sample_size.

    If profiling was halted by the stop condition, the ratio that was
    recorded at that moment is returned instead of a freshly computed
    value (the categories dict has been dropped to save memory).
    """
    if self._stop_condition_is_met:
        # Value was frozen by _update_stop_condition when the condition fired.
        return cast(float, self._stopped_at_unique_ratio)

    if self.sample_size:
        return len(self.categories) / self.sample_size
    # No samples observed yet; return a float to match the annotation.
    return 0.0

@property
def unique_count(self) -> int:
    """Return the number of unique categories observed.

    If profiling was halted by the stop condition, the count recorded
    at that moment is returned (the categories dict itself was cleared).
    """
    if self._stop_condition_is_met:
        # Value was frozen by _update_stop_condition when the condition fired.
        return cast(int, self._stopped_at_unique_count)

    return len(self.categories)

@property
def is_match(self) -> bool:
Expand All @@ -211,6 +276,43 @@ def is_match(self) -> bool:
is_match = True
return is_match

def _check_stop_condition_is_met(self, sample_size: int, unqiue_ratio: float):
"""Return boolean given stop conditions.
:param sample_size: Number of samples to check the stop condition
:type sample_size: int
:param unqiue_ratio: Ratio of unique values to full sample size to
check stop condition
:type unqiue_ratio: float
:return: boolean for stop conditions
"""
if (
self.max_sample_size_to_check_stop_condition is not None
and self.stop_condition_unique_value_ratio is not None
and sample_size >= self.max_sample_size_to_check_stop_condition
and unqiue_ratio >= self.stop_condition_unique_value_ratio
):
return True
return False

def _update_stop_condition(self, data: DataFrame):
"""Return value stop_condition_is_met given stop conditions.
:param data: Dataframe currently being processed by categorical profiler
:type data: DataFrame
:return: boolean for stop conditions
"""
merged_unique_count = len(self._categories)
merged_sample_size = self.sample_size + len(data)
merged_unique_ratio = merged_unique_count / merged_sample_size

self._stop_condition_is_met = self._check_stop_condition_is_met(
merged_sample_size, merged_unique_ratio
)
if self._stop_condition_is_met:
self._stopped_at_unique_ratio = merged_unique_ratio
self._stopped_at_unique_count = merged_unique_count

@BaseColumnProfiler._timeit(name="categories")
def _update_categories(
self,
Expand All @@ -237,6 +339,9 @@ def _update_categories(
self._categories = utils.add_nested_dictionaries(
self._categories, category_count
)
self._update_stop_condition(df_series)
if self._stop_condition_is_met:
self._categories = {}

def _update_helper(self, df_series_clean: Series, profile: dict) -> None:
"""
Expand All @@ -259,7 +364,8 @@ def update(self, df_series: Series) -> CategoricalColumn:
:return: updated CategoricalColumn
:rtype: CategoricalColumn
"""
if len(df_series) == 0:
# If condition for limiting profile calculations
if len(df_series) == 0 or self._stop_condition_is_met:
return self

profile = dict(sample_size=len(df_series))
Expand Down
103 changes: 103 additions & 0 deletions dataprofiler/tests/profilers/test_categorical_column_profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,42 @@ def test_correct_categorical_model_string(self):
}
self.assertCountEqual(categories, profile.categories)

def test_stop_condition_is_met_initially(self):
    # 40 samples: ten each of "a", "b", "c", "d".
    series = pd.Series([char for char in "abcd" for _ in range(10)])
    col_profile = CategoricalColumn("test dataset")
    # Zero thresholds guarantee the stop condition fires on the very
    # first update.
    col_profile.max_sample_size_to_check_stop_condition = 0
    col_profile.stop_condition_unique_value_ratio = 0
    col_profile.update(series)

    # Categories are dropped, but the stats at stop time stay frozen.
    self.assertTrue(col_profile._stop_condition_is_met)
    self.assertEqual(col_profile.categories, [])
    self.assertEqual(col_profile.unique_ratio, 0.1)
    self.assertEqual(col_profile.unique_count, 4)

def test_stop_condition_is_met_after_initial_profile(self):
    # 40 samples: ten each of "a", "b", "c", "d".
    series = pd.Series([char for char in "abcd" for _ in range(10)])
    col_profile = CategoricalColumn("test dataset")
    # Sample-size threshold one past the data size, so the first update
    # cannot trigger the stop condition.
    col_profile.max_sample_size_to_check_stop_condition = len(series) + 1
    col_profile.stop_condition_unique_value_ratio = 0
    col_profile.update(series)

    self.assertFalse(col_profile._stop_condition_is_met)

    # Append a fifth category and update again; cumulative sample size
    # now exceeds the threshold and the condition fires.
    series.loc[len(series.index)] = "Testing past ratio"
    col_profile.update(series)

    self.assertTrue(col_profile._stop_condition_is_met)
    self.assertEqual([], col_profile.categories)
    self.assertEqual(5, col_profile.unique_count)
    self.assertEqual((5 / 81), col_profile.unique_ratio)

    # Further updates are no-ops once the condition has been met.
    col_profile.update(series)
    self.assertTrue(col_profile._stop_condition_is_met)
    self.assertEqual([], col_profile.categories)
    self.assertEqual(5, col_profile.unique_count)
    self.assertEqual((5 / 81), col_profile.unique_ratio)
    self.assertEqual(81, col_profile.sample_size)

def test_timeit_profile(self):
dataset = self.aws_dataset["host"].dropna()
profile = CategoricalColumn(dataset.name)
Expand Down Expand Up @@ -578,6 +614,63 @@ def test_categorical_merge(self):
}
self.assertCountEqual(report_count, expected_dict)

# Setting up of profile with stop condition not yet met
profile_w_stop_cond_1 = CategoricalColumn("merge_stop_condition_test")
profile_w_stop_cond_1.max_sample_size_to_check_stop_condition = 12
profile_w_stop_cond_1.stop_condition_unique_value_ratio = 0
profile_w_stop_cond_1.update(df1)

self.assertFalse(profile_w_stop_cond_1._stop_condition_is_met)

# Setting up of profile without stop condition met
profile_w_stop_cond_2 = CategoricalColumn("merge_stop_condition_test")
profile_w_stop_cond_2.max_sample_size_to_check_stop_condition = 12
profile_w_stop_cond_2.stop_condition_unique_value_ratio = 0
profile_w_stop_cond_2.update(df2)

self.assertFalse(profile_w_stop_cond_1._stop_condition_is_met)

# Merge profiles w/o condition met
merged_stop_cond_profile_1 = profile_w_stop_cond_1 + profile_w_stop_cond_2

# Test whether merge caused stop condition to be hit
self.assertTrue(merged_stop_cond_profile_1._stop_condition_is_met)
self.assertEqual([], merged_stop_cond_profile_1.categories)
self.assertEqual(16, merged_stop_cond_profile_1.unique_count)
self.assertEqual((16 / 22), merged_stop_cond_profile_1.unique_ratio)
self.assertEqual(22, merged_stop_cond_profile_1.sample_size)

# Merge profile w/ and w/o condition met
merged_stop_cond_profile_2 = merged_stop_cond_profile_1 + profile_w_stop_cond_2

# Test whether merged profile stays persistently with condition met
self.assertTrue(merged_stop_cond_profile_2._stop_condition_is_met)
self.assertEqual([], merged_stop_cond_profile_2.categories)
self.assertEqual(16, merged_stop_cond_profile_2.unique_count)
self.assertEqual(
merged_stop_cond_profile_1.unique_ratio,
merged_stop_cond_profile_2.unique_ratio,
)
self.assertEqual(22, merged_stop_cond_profile_2.sample_size)

# Merge profile w/ and w/o condition met (ensure operator communitivity)
merged_stop_cond_profile_3 = profile_w_stop_cond_2 + merged_stop_cond_profile_1
self.assertTrue(merged_stop_cond_profile_3._stop_condition_is_met)
self.assertEqual([], merged_stop_cond_profile_3.categories)
self.assertEqual(16, merged_stop_cond_profile_3.unique_count)
self.assertEqual(
merged_stop_cond_profile_1.unique_ratio,
merged_stop_cond_profile_2.unique_ratio,
)
self.assertEqual(22, merged_stop_cond_profile_2.sample_size)

# Ensure successful merge without stop condition met
profile_w_stop_cond_1.stop_condition_unique_value_ratio = 0.99
merge_stop_conditions_not_met = profile_w_stop_cond_1 + profile_w_stop_cond_1
self.assertFalse(merge_stop_conditions_not_met._stop_condition_is_met)
self.assertIsNone(merge_stop_conditions_not_met._stopped_at_unique_count)
self.assertIsNone(merge_stop_conditions_not_met._stopped_at_unique_ratio)

def test_gini_impurity(self):
# Normal test
df_categorical = pd.Series(["y", "y", "y", "y", "n", "n", "n"])
Expand Down Expand Up @@ -722,6 +815,11 @@ def test_json_encode(self):
"_categories": defaultdict(int),
"_CategoricalColumn__calculations": dict(),
"_top_k_categories": None,
"max_sample_size_to_check_stop_condition": None,
"stop_condition_unique_value_ratio": None,
"_stop_condition_is_met": False,
"_stopped_at_unique_ratio": None,
"_stopped_at_unique_count": None,
},
}
)
Expand Down Expand Up @@ -764,6 +862,11 @@ def test_json_encode_after_update(self):
"_categories": {"c": 5, "b": 4, "a": 3},
"_CategoricalColumn__calculations": {},
"_top_k_categories": None,
"max_sample_size_to_check_stop_condition": None,
"stop_condition_unique_value_ratio": None,
"_stop_condition_is_met": False,
"_stopped_at_unique_ratio": None,
"_stopped_at_unique_count": None,
},
}
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from dataset_generation import NumpyEncoder, generate_dataset_by_class, nan_injection

from dataprofiler import StructuredProfiler
from dataprofiler.data_readers.csv_data import CSVData

# suppress TF warnings
tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR)
Expand Down Expand Up @@ -279,7 +280,7 @@ def dp_space_time_analysis(
)
print(f"Dataset of size {max(SAMPLE_SIZES)} created.")
else:
_full_dataset = dp.Data(DATASET_PATH)
_full_dataset = CSVData(DATASET_PATH)

dp_space_time_analysis(
_rng,
Expand Down

0 comments on commit ebb3995

Please sign in to comment.