From b5c3de5f1397bfb9422590ba037f431c2eaad9ac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erik=20B=C3=A5venstrand?= Date: Mon, 26 Jun 2023 23:12:34 +0200 Subject: [PATCH 1/2] =?UTF-8?q?feat(cache):=20=E2=9C=A8=20Add=20cache=5Fgr?= =?UTF-8?q?oup=20that=20can=20segment=20an=20instance=20cache=20into=20dif?= =?UTF-8?q?ferent=20isolated=20parts.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Is useful for limiting the number of cache entries while also allowing unique operations which perform different actions to be cache at the same time. --- examples/Experiment.ipynb | 260 +++--------------- mleko/cache/cache_mixin.py | 44 ++- mleko/cache/lru_cache_mixin.py | 66 +++-- mleko/dataset/convert/base_converter.py | 5 +- .../dataset/convert/csv_to_vaex_converter.py | 6 +- .../feature_select/base_feature_selector.py | 5 +- .../composite_feature_selector.py | 6 +- .../invariance_feature_selector.py | 6 +- .../missing_rate_feature_selector.py | 6 +- .../pearson_correlation_feature_selector.py | 6 +- .../variance_feature_selector.py | 6 +- mleko/dataset/split/base_splitter.py | 5 +- mleko/dataset/split/expression_splitter.py | 6 +- mleko/dataset/split/random_splitter.py | 6 +- mleko/dataset/transform/base_transformer.py | 5 +- .../transform/composite_transformer.py | 6 +- .../frequency_encoder_transformer.py | 6 +- .../transform/label_encoder_transformer.py | 6 +- .../transform/max_abs_scaler_transformer.py | 6 +- .../transform/min_max_scaler_transformer.py | 6 +- mleko/pipeline/pipeline_step.py | 15 +- mleko/pipeline/steps/convert_step.py | 10 +- mleko/pipeline/steps/feature_select_step.py | 10 +- mleko/pipeline/steps/ingest_step.py | 4 +- mleko/pipeline/steps/split_step.py | 12 +- mleko/pipeline/steps/transform_step.py | 10 +- tests/cache/test_cache_mixin.py | 34 ++- tests/cache/test_lru_cache_mixin.py | 11 +- tests/pipeline/steps/test_convert_step.py | 6 +- .../steps/test_feature_select_step.py | 4 +- tests/pipeline/steps/test_split_step.py | 6 +- tests/pipeline/steps/test_transform_step.py | 6 +- tests/pipeline/test_pipeline.py | 16 +- tests/pipeline/test_pipeline_step.py | 18 +- 34 files changed, 293 insertions(+), 337 deletions(-) diff --git a/examples/Experiment.ipynb b/examples/Experiment.ipynb index 76f29027..b9cf9cb0 100644 --- a/examples/Experiment.ipynb +++ b/examples/Experiment.ipynb @@ -27,11 +27,11 @@ "source": [ "from mleko.dataset.convert import CSVToVaexConverter\n", "from mleko.dataset.feature_select import (\n", - " MissingRateFeatureSelector,\n", - " VarianceFeatureSelector,\n", " CompositeFeatureSelector,\n", - " PearsonCorrelationFeatureSelector,\n", " InvarianceFeatureSelector,\n", + " MissingRateFeatureSelector,\n", + " PearsonCorrelationFeatureSelector,\n", + " VarianceFeatureSelector,\n", ")\n", "from mleko.dataset.ingest import KaggleIngester\n", "from mleko.dataset.split import ExpressionSplitter, RandomSplitter\n", @@ -54,14 +54,14 @@ "metadata": {}, "outputs": [], "source": [ - "OWNER_SLUG = 'mlg-ulb'\n", - "DATASET_SLUG = 'creditcardfraud'\n", - "DATASET_NAME = f'{OWNER_SLUG}/{DATASET_SLUG}'\n", + "OWNER_SLUG = \"mlg-ulb\"\n", + "DATASET_SLUG = \"creditcardfraud\"\n", + "DATASET_NAME = f\"{OWNER_SLUG}/{DATASET_SLUG}\"\n", "\n", "TARGET_FEATURE = \"Class\"\n", "TIME_FEATURE = \"Time\"\n", "META_FEATURES = [TIME_FEATURE, TARGET_FEATURE]\n", - "RANDOM_STATE = 1337" + "RANDOM_STATE = 1337\n" ] }, { @@ -74,16 +74,16 @@ }, { "cell_type": "code", - "execution_count": 4, + "execution_count": 8, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ - "[2023-06-16 20:14:01] [\u001b[1;32mINFO\u001b[0m] Attempting to fetch Kaggle API credentials from environment variables 'KAGGLE_USERNAME' and 'KAGGLE_KEY'. \u001b[1m(kaggle_ingester.py:74)\u001b[0m\n", - "[2023-06-16 20:14:01] [\u001b[1;33mWARNING\u001b[0m] Kaggle API credentials not found in environment variables, attempting to fetch from fallback path at ~/.kaggle/kaggle.json. \u001b[1m(kaggle_ingester.py:82)\u001b[0m\n", - "[2023-06-16 20:14:01] [\u001b[1;32mINFO\u001b[0m] Kaggle credentials successfully fetched. \u001b[1m(kaggle_ingester.py:91)\u001b[0m\n" + "[2023-06-26 23:05:25] [\u001b[1;32mINFO\u001b[0m] Attempting to fetch Kaggle API credentials from environment variables 'KAGGLE_USERNAME' and 'KAGGLE_KEY'. \u001b[1m(kaggle_ingester.py:74)\u001b[0m\n", + "[2023-06-26 23:05:25] [\u001b[1;33mWARNING\u001b[0m] Kaggle API credentials not found in environment variables, attempting to fetch from fallback path at ~/.kaggle/kaggle.json. \u001b[1m(kaggle_ingester.py:82)\u001b[0m\n", + "[2023-06-26 23:05:25] [\u001b[1;32mINFO\u001b[0m] Kaggle credentials successfully fetched. \u001b[1m(kaggle_ingester.py:91)\u001b[0m\n" ] } ], @@ -131,13 +131,14 @@ " steps=[\n", " IngestStep(kaggle_data_source, outputs=[\"raw_csv\"]),\n", " ConvertStep(csv_to_arrow_converter, inputs=[\"raw_csv\"], outputs=[\"df_clean\"]),\n", - " SplitStep(random_data_splitter, inputs=[\"df_clean\"], outputs=[\"df_train_validate\", \"df_test\"]),\n", + " SplitStep(random_data_splitter, inputs=[\"df_clean\"], outputs=[\"df_train_validate\", \"df_test\"], cache_group=\"train_test_split\"),\n", " FeatureSelectStep(\n", " composite_feature_selector,\n", " inputs=[\"df_train_validate\"],\n", " outputs=[\"df_train_validate_features_selected\"],\n", + " cache_group=\"feature_selection_train_validate\",\n", " ),\n", - " SplitStep(expression_data_splitter, inputs=[\"df_train_validate_features_selected\"], outputs=[\"df_train\", \"df_validate\"]),\n", + " SplitStep(random_data_splitter, inputs=[\"df_train_validate_features_selected\"], outputs=[\"df_train\", \"df_validate\"], cache_group=\"train_test_split\"),\n", " ]\n", ")\n" ] @@ -152,224 +153,37 @@ }, { "cell_type": "code", - "execution_count": 6, + "execution_count": 9, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ - "[2023-06-16 20:14:18] [\u001b[1;32mINFO\u001b[0m] No data container provided. Creating an empty one. \u001b[1m(pipeline.py:77)\u001b[0m\n", - "[2023-06-16 20:14:18] [\u001b[1;32mINFO\u001b[0m] Executing step 1/5: IngestStep. \u001b[1m(pipeline.py:81)\u001b[0m\n", - "[2023-06-16 20:14:19] [\u001b[1;32mINFO\u001b[0m] \u001b[33mForce Cache Refresh\u001b[0m: Downloading mlg-ulb/creditcardfraud/* to data/mlg-ulb/creditcardfraud/raw from Kaggle. \u001b[1m(kaggle_ingester.py:287)\u001b[0m\n" - ] - }, - { - "data": { - "application/vnd.jupyter.widget-view+json": { - "model_id": "2dd3d7165c1b48f8b8788a0de22833d5", - "version_major": 2, - "version_minor": 0 - }, - "text/plain": [ - "Downloading files from Kaggle: 0%| | 0/1 [00:00= 0.7: set(). \u001b[1m(missing_rate_feature_selector.py:108)\u001b[0m\n", - "[2023-06-16 20:14:31] [\u001b[1;32mINFO\u001b[0m] Finished composite feature selection step 1/4. \u001b[1m(composite_feature_selector.py:113)\u001b[0m\n", - "[2023-06-16 20:14:31] [\u001b[1;32mINFO\u001b[0m] Executing composite feature selection step 2/4: VarianceFeatureSelector. \u001b[1m(composite_feature_selector.py:108)\u001b[0m\n", - "[2023-06-16 20:14:31] [\u001b[1;32mINFO\u001b[0m] Selecting features from the following set (29): ['Amount', 'V1', 'V10', 'V11', 'V12', 'V13', 'V14', 'V15', 'V16', 'V17', 'V18', 'V19', 'V2', 'V20', 'V21', 'V22', 'V23', 'V24', 'V25', 'V26', 'V27', 'V28', 'V3', 'V4', 'V5', 'V6', 'V7', 'V8', 'V9']. \u001b[1m(variance_feature_selector.py:104)\u001b[0m\n" - ] - }, - { - "data": { - "application/vnd.jupyter.widget-view+json": { - "model_id": "5ad51159094145ef94a21e7987ef13a7", - "version_major": 2, - "version_minor": 0 - }, - "text/plain": [ - "Calculating variance for features: 0%| | 0/29 [00:00= 0.7: set(). \u001b[1m(pearson_correlation_feature_selector.py:147)\u001b[0m\n", - "[2023-06-16 20:14:32] [\u001b[1;32mINFO\u001b[0m] Finished composite feature selection step 3/4. \u001b[1m(composite_feature_selector.py:113)\u001b[0m\n", - "[2023-06-16 20:14:32] [\u001b[1;32mINFO\u001b[0m] Executing composite feature selection step 4/4: InvarianceFeatureSelector. \u001b[1m(composite_feature_selector.py:108)\u001b[0m\n", - "[2023-06-16 20:14:32] [\u001b[1;32mINFO\u001b[0m] Selecting features from the following set (0): []. \u001b[1m(invariance_feature_selector.py:98)\u001b[0m\n" - ] - }, - { - "data": { - "application/vnd.jupyter.widget-view+json": { - "model_id": "2f23c61b036849978c348803f8ca2909", - "version_major": 2, - "version_minor": 0 - }, - "text/plain": [ - "Calculating invariance of features: 0it [00:00, ?it/s]" - ] - }, - "metadata": {}, - "output_type": "display_data" - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "[2023-06-16 20:14:32] [\u001b[1;32mINFO\u001b[0m] Dropping (0) invariant features: set(). \u001b[1m(invariance_feature_selector.py:106)\u001b[0m\n", - "[2023-06-16 20:14:32] [\u001b[1;32mINFO\u001b[0m] Finished composite feature selection step 4/4. \u001b[1m(composite_feature_selector.py:113)\u001b[0m\n" - ] - }, - { - "data": { - "application/vnd.jupyter.widget-view+json": { - "model_id": "679efb0ea68f4a478f6c0e88ab8edfb7", - "version_major": 2, - "version_minor": 0 - }, - "text/plain": [ - "Writing DataFrame to .arrow file: 0%| | 0/100 [00:00 100'. \u001b[1m(expression_splitter.py:92)\u001b[0m\n", - "[2023-06-16 20:14:32] [\u001b[1;32mINFO\u001b[0m] Split dataframe into two dataframes with shapes (227718, 31) and (127, 31). \u001b[1m(expression_splitter.py:95)\u001b[0m\n" + "[2023-06-26 23:05:28] [\u001b[1;32mINFO\u001b[0m] No data container provided. Creating an empty one. \u001b[1m(pipeline.py:77)\u001b[0m\n", + "[2023-06-26 23:05:28] [\u001b[1;32mINFO\u001b[0m] Executing step 1/5: IngestStep. \u001b[1m(pipeline.py:81)\u001b[0m\n", + "[2023-06-26 23:05:29] [\u001b[1;32mINFO\u001b[0m] \u001b[32mCache Hit\u001b[0m: Local dataset is up to date with Kaggle, skipping download. \u001b[1m(kaggle_ingester.py:279)\u001b[0m\n", + "[2023-06-26 23:05:29] [\u001b[1;32mINFO\u001b[0m] Finished step 1/5 execution. \u001b[1m(pipeline.py:83)\u001b[0m\n", + "[2023-06-26 23:05:29] [\u001b[1;32mINFO\u001b[0m] Executing step 2/5: ConvertStep. \u001b[1m(pipeline.py:81)\u001b[0m\n", + "[2023-06-26 23:05:29] [\u001b[1;32mINFO\u001b[0m] \u001b[32mCache Hit\u001b[0m (LRUCache) CSVToVaexConverter.convert: Using cached output. \u001b[1m(cache_mixin.py:134)\u001b[0m\n", + "[2023-06-26 23:05:29] [\u001b[1;32mINFO\u001b[0m] Finished step 2/5 execution. \u001b[1m(pipeline.py:83)\u001b[0m\n", + "[2023-06-26 23:05:29] [\u001b[1;32mINFO\u001b[0m] Executing step 3/5: SplitStep. \u001b[1m(pipeline.py:81)\u001b[0m\n", + "[2023-06-26 23:05:29] [\u001b[1;32mINFO\u001b[0m] \u001b[32mCache Hit\u001b[0m (LRUCache) RandomSplitter.split: Using cached output. \u001b[1m(cache_mixin.py:134)\u001b[0m\n", + "[2023-06-26 23:05:29] [\u001b[1;32mINFO\u001b[0m] Finished step 3/5 execution. \u001b[1m(pipeline.py:83)\u001b[0m\n", + "[2023-06-26 23:05:29] [\u001b[1;32mINFO\u001b[0m] Executing step 4/5: FeatureSelectStep. \u001b[1m(pipeline.py:81)\u001b[0m\n", + "[2023-06-26 23:05:29] [\u001b[1;32mINFO\u001b[0m] \u001b[32mCache Hit\u001b[0m (LRUCache) CompositeFeatureSelector.select_features: Using cached output. \u001b[1m(cache_mixin.py:134)\u001b[0m\n", + "[2023-06-26 23:05:29] [\u001b[1;32mINFO\u001b[0m] Finished step 4/5 execution. \u001b[1m(pipeline.py:83)\u001b[0m\n", + "[2023-06-26 23:05:29] [\u001b[1;32mINFO\u001b[0m] Executing step 5/5: SplitStep. \u001b[1m(pipeline.py:81)\u001b[0m\n", + "[2023-06-26 23:05:29] [\u001b[1;32mINFO\u001b[0m] \u001b[31mCache Miss\u001b[0m (LRUCache) RandomSplitter.split: Executing method. \u001b[1m(cache_mixin.py:139)\u001b[0m\n", + "[2023-06-26 23:05:29] [\u001b[1;32mINFO\u001b[0m] Shuffling data before splitting. \u001b[1m(random_splitter.py:126)\u001b[0m\n", + "[2023-06-26 23:05:29] [\u001b[1;32mINFO\u001b[0m] Splitting data with stratification on column 'Class'. \u001b[1m(random_splitter.py:130)\u001b[0m\n", + "[2023-06-26 23:05:30] [\u001b[1;32mINFO\u001b[0m] Split dataframe into two dataframes with shapes (182276, 32) and (45569, 32). \u001b[1m(random_splitter.py:142)\u001b[0m\n" ] }, { "data": { "application/vnd.jupyter.widget-view+json": { - "model_id": "ca3ef9fa89cb467b84f8608497c35cfd", + "model_id": "e31294af1e634862a6c5b2b3612fbeb5", "version_major": 2, "version_minor": 0 }, @@ -383,7 +197,7 @@ { "data": { "application/vnd.jupyter.widget-view+json": { - "model_id": "c6e1c300c9fe4be88136e04f9e288aee", + "model_id": "7778a10018f44efba0bf650864f44c08", "version_major": 2, "version_minor": 0 }, @@ -398,12 +212,12 @@ "name": "stdout", "output_type": "stream", "text": [ - "[2023-06-16 20:14:32] [\u001b[1;32mINFO\u001b[0m] Finished step 5/5 execution. \u001b[1m(pipeline.py:83)\u001b[0m\n" + "[2023-06-26 23:05:30] [\u001b[1;32mINFO\u001b[0m] Finished step 5/5 execution. \u001b[1m(pipeline.py:83)\u001b[0m\n" ] } ], "source": [ - "data_container = pipeline.run(force_recompute=True).data" + "data_container = pipeline.run().data" ] }, { diff --git a/mleko/cache/cache_mixin.py b/mleko/cache/cache_mixin.py index d337222c..cbd173af 100644 --- a/mleko/cache/cache_mixin.py +++ b/mleko/cache/cache_mixin.py @@ -99,14 +99,24 @@ def _cached_execute( self, lambda_func: Callable[[], Any], cache_keys: list[Hashable | tuple[Any, BaseFingerprinter]], + cache_group: str | None = None, force_recompute: bool = False, ) -> Any: """Executes the given function, caching the results based on the provided cache keys and fingerprints. + Warning: + The cache group is used to group related cache keys together to prevent collisions between cache keys + originating from the same method. For example, if a method is called during the training and testing + phases of a machine learning pipeline, the cache keys for the training and testing phases should be + using different cache groups to prevent collisions between the cache keys for the two phases. Otherwise, + the later cache keys might overwrite the earlier cache entries. + Args: lambda_func: A lambda function to execute. cache_keys: A list of cache keys that can be a mix of hashable values and tuples containing a value and a BaseFingerprinter instance for generating fingerprints. + cache_group: A string representing the cache group, used to group related cache keys together when methods + are called independently. force_recompute: A boolean indicating whether to force recompute the result and update the cache, even if a cached result is available. @@ -116,7 +126,7 @@ def _cached_execute( """ frame_qualname = get_frame_qualname(inspect.stack()[1]) class_method_name = ".".join(frame_qualname.split(".")[-2:]) - cache_key = self._compute_cache_key(cache_keys, frame_qualname) + cache_key = self._compute_cache_key(cache_keys, class_method_name, cache_group) if not force_recompute: output = self._load_from_cache(cache_key) @@ -139,14 +149,22 @@ def _cached_execute( return self._load_from_cache(cache_key) def _compute_cache_key( - self, cache_keys: list[Hashable | tuple[Any, BaseFingerprinter]], frame_qualname: str + self, + cache_keys: list[Hashable | tuple[Any, BaseFingerprinter]], + class_method_name: str, + cache_group: str | None = None, ) -> str: """Computes the cache key based on the provided cache keys and the calling function's fully qualified name. Args: cache_keys: A list of cache keys that can be a mix of hashable values and tuples containing a value and a BaseFingerprinter instance for generating fingerprints. - frame_qualname: The fully qualified name of the cached function stack frame. + class_method_name: A string of format "class.method" for class methods or "module.function" for + functions, representing the fully qualified name of the calling function or method. + cache_group: A string representing the cache group. + + Raises: + ValueError: If the computed cache key is too long. Returns: A string representing the computed cache key, which is the MD5 hash of the fully qualified name of the @@ -161,12 +179,22 @@ def _compute_cache_key( else: values_to_hash.append(key) - data = pickle.dumps((frame_qualname, values_to_hash)) - - class_method_name = ".".join(frame_qualname.split(".")[-2:]) - cache_key = f"{class_method_name}.{hashlib.md5(data).hexdigest()}" + data = pickle.dumps(values_to_hash) + cache_key_prefix = class_method_name + if cache_group is not None: + cache_key_prefix = f"{cache_key_prefix}.{cache_group}" + + cache_key = f"{cache_key_prefix}.{hashlib.md5(data).hexdigest()}" + if len(cache_key) + 1 + len(self._cache_file_suffix) > 255: + raise ValueError( + f"The computed cache key is too long ({len(cache_key) + len(self._cache_file_suffix)} chars)." + "The maximum length of a cache key is 255 chars, and given the current class, the maximum " + "length of the provided cache_group is " + f"{255 - len(cache_key_prefix) - 32 - 1 - len(self._cache_file_suffix)} chars." + "Please reduce the length of the cache_group." + ) - return cache_key + return f"{cache_key_prefix}.{hashlib.md5(data).hexdigest()}" def _read_cache_file(self, cache_file_path: Path) -> Any: """Reads the cache file from the specified path and returns the deserialized data. diff --git a/mleko/cache/lru_cache_mixin.py b/mleko/cache/lru_cache_mixin.py index 2cedbddf..12d013bd 100644 --- a/mleko/cache/lru_cache_mixin.py +++ b/mleko/cache/lru_cache_mixin.py @@ -10,7 +10,7 @@ import inspect import re -from collections import OrderedDict +from collections import OrderedDict, defaultdict from pathlib import Path from typing import Any @@ -69,7 +69,7 @@ def __init__(self, cache_directory: str | Path, cache_file_suffix: str, cache_si """ super().__init__(cache_directory, cache_file_suffix) self._cache_size = cache_size - self._cache: OrderedDict[str, bool] = OrderedDict() + self._cache: dict[str, OrderedDict[str, bool]] = defaultdict(OrderedDict) self._load_cache_from_disk() def _load_cache_from_disk(self) -> None: @@ -79,23 +79,30 @@ def _load_cache_from_disk(self) -> None: """ frame_qualname = get_frame_qualname(inspect.stack()[2]) class_name = frame_qualname.split(".")[-2] - file_name_pattern = rf"{class_name}\.[a-zA-Z_][a-zA-Z0-9_]*\.[a-fA-F\d]{{32}}" + file_name_pattern = rf"{class_name}\.([a-zA-Z_][a-zA-Z0-9_]*)(\.[a-zA-Z_][a-zA-Z0-9_]*)?\.[a-fA-F\d]{{32}}" + cache_files = [ f for f in self._cache_directory.glob(f"*.{self._cache_file_suffix}") if re.search(file_name_pattern, str(f.stem)) ] ordered_cache_files = sorted(cache_files, key=lambda x: x.stat().st_mtime) + for cache_file in ordered_cache_files: cache_key_match = re.search(file_name_pattern, cache_file.stem) - cache_key = cache_key_match.group(0) # type: ignore - if cache_key not in self._cache: - if len(self._cache) >= self._cache_size: - oldest_key = next(iter(self._cache)) - del self._cache[oldest_key] - for file in self._cache_directory.glob(f"{oldest_key}*.{self._cache_file_suffix}"): - file.unlink() - self._cache[cache_key] = True + if cache_key_match: + method_name, cache_group = cache_key_match.groups() + group_identifier = method_name + cache_group if cache_group else method_name + cache_key = cache_key_match.group(0) + + if cache_key not in self._cache[group_identifier]: + if len(self._cache[group_identifier]) >= self._cache_size: + oldest_key = next(iter(self._cache[group_identifier])) + del self._cache[group_identifier][oldest_key] + for file in self._cache_directory.glob(f"{oldest_key}*.{self._cache_file_suffix}"): + file.unlink() + + self._cache[group_identifier][cache_key] = True def _load_from_cache(self, cache_key: str) -> Any | None: """Loads data from the cache based on the provided cache key and updates the LRU cache. @@ -106,9 +113,11 @@ def _load_from_cache(self, cache_key: str) -> Any | None: Returns: The cached data if it exists, or None if there is no data for the given cache key. """ - if cache_key in self._cache: - self._cache.move_to_end(cache_key) - return super()._load_from_cache(cache_key) + for group_identifier in self._cache.keys(): + if cache_key in self._cache[group_identifier]: + self._cache[group_identifier].move_to_end(cache_key) + return super()._load_from_cache(cache_key) + return None def _save_to_cache(self, cache_key: str, output: Any) -> None: """Saves the given data to the cache using the provided cache key, updating the LRU cache accordingly. @@ -119,13 +128,22 @@ def _save_to_cache(self, cache_key: str, output: Any) -> None: cache_key: A string representing the cache key. output: The data to be saved to the cache. """ - if cache_key not in self._cache: - if len(self._cache) >= self._cache_size: - oldest_key = next(iter(self._cache)) - del self._cache[oldest_key] - for file in self._cache_directory.glob(f"{oldest_key}*.{self._cache_file_suffix}"): - file.unlink() - self._cache[cache_key] = True - else: - self._cache.move_to_end(cache_key) - super()._save_to_cache(cache_key, output) + cache_key_match = re.match( + r"[a-zA-Z_][a-zA-Z0-9_]*\.([a-zA-Z_][a-zA-Z0-9_]*)(\.[a-zA-Z_][a-zA-Z0-9_]*)?\.[a-fA-F\d]{32}", cache_key + ) + if cache_key_match: + method_name, cache_group = cache_key_match.groups() + group_identifier = method_name + cache_group if cache_group else method_name + + if cache_key not in self._cache[group_identifier]: + if len(self._cache[group_identifier]) >= self._cache_size: + oldest_key = next(iter(self._cache[group_identifier])) + del self._cache[group_identifier][oldest_key] + for file in self._cache_directory.glob(f"{oldest_key}*.{self._cache_file_suffix}"): + file.unlink() + + self._cache[group_identifier][cache_key] = True + else: + self._cache[group_identifier].move_to_end(cache_key) + + super()._save_to_cache(cache_key, output) diff --git a/mleko/dataset/convert/base_converter.py b/mleko/dataset/convert/base_converter.py index 25b8e61b..ea661a28 100644 --- a/mleko/dataset/convert/base_converter.py +++ b/mleko/dataset/convert/base_converter.py @@ -26,11 +26,14 @@ def __init__(self, cache_directory: str | Path, cache_size: int): LRUCacheMixin.__init__(self, cache_directory, self._cache_file_suffix, cache_size) @abstractmethod - def convert(self, file_paths: list[Path] | list[str], force_recompute: bool = False) -> vaex.DataFrame: + def convert( + self, file_paths: list[Path] | list[str], cache_group: str | None = None, force_recompute: bool = False + ) -> vaex.DataFrame: """Abstract method to convert the input file paths to the desired output format. Args: file_paths: A list of input file paths to be converted. + cache_group: The cache group to use. force_recompute: If set to True, forces recomputation and ignores the cache. Returns: diff --git a/mleko/dataset/convert/csv_to_vaex_converter.py b/mleko/dataset/convert/csv_to_vaex_converter.py index f47819bd..bc7d9401 100644 --- a/mleko/dataset/convert/csv_to_vaex_converter.py +++ b/mleko/dataset/convert/csv_to_vaex_converter.py @@ -117,7 +117,9 @@ def __init__( self._num_workers = num_workers self._random_state = random_state - def convert(self, file_paths: list[Path] | list[str], force_recompute: bool = False) -> vaex.DataFrame: + def convert( + self, file_paths: list[Path] | list[str], cache_group: str | None = None, force_recompute: bool = False + ) -> vaex.DataFrame: """Converts a list of CSV files to Arrow format and returns a `vaex` dataframe joined from the converted data. The method takes care of caching, and results will be reused accordingly unless `force_recompute` @@ -131,6 +133,7 @@ def convert(self, file_paths: list[Path] | list[str], force_recompute: bool = Fa Args: file_paths: A list of file paths to be converted. + cache_group: The cache group to use. force_recompute: If set to True, forces recomputation and ignores the cache. Returns: @@ -149,6 +152,7 @@ def convert(self, file_paths: list[Path] | list[str], force_recompute: bool = Fa self._downcast_float, (file_paths, CSVFingerprinter(n_rows=100_000 // len(file_paths))), ], + cache_group=cache_group, force_recompute=force_recompute, ) diff --git a/mleko/dataset/feature_select/base_feature_selector.py b/mleko/dataset/feature_select/base_feature_selector.py index 6199fad7..cd450c22 100644 --- a/mleko/dataset/feature_select/base_feature_selector.py +++ b/mleko/dataset/feature_select/base_feature_selector.py @@ -65,11 +65,14 @@ def __init__( self._ignore_features: tuple[str, ...] = tuple(ignore_features) if ignore_features is not None else tuple() @abstractmethod - def select_features(self, dataframe: vaex.DataFrame, force_recompute: bool = False) -> vaex.DataFrame: + def select_features( + self, dataframe: vaex.DataFrame, cache_group: str | None = None, force_recompute: bool = False + ) -> vaex.DataFrame: """Selects features from the given DataFrame. Args: dataframe: DataFrame from which to select features. + cache_group: The cache group to use. force_recompute: Whether to force the feature selector to recompute its output, even if it already exists. Raises: diff --git a/mleko/dataset/feature_select/composite_feature_selector.py b/mleko/dataset/feature_select/composite_feature_selector.py index 917ff3fd..314b869d 100644 --- a/mleko/dataset/feature_select/composite_feature_selector.py +++ b/mleko/dataset/feature_select/composite_feature_selector.py @@ -78,11 +78,14 @@ def __init__( super().__init__(cache_directory, None, None, cache_size) self._feature_selectors = tuple(feature_selectors) - def select_features(self, dataframe: vaex.DataFrame, force_recompute: bool = False) -> vaex.DataFrame: + def select_features( + self, dataframe: vaex.DataFrame, cache_group: str | None = None, force_recompute: bool = False + ) -> vaex.DataFrame: """Selects the features from the DataFrame. Args: dataframe: DataFrame from which the features will be selected. + cache_group: The cache group to use for caching. force_recompute: If True, the features will be recomputed even if they are cached. Returns: @@ -91,6 +94,7 @@ def select_features(self, dataframe: vaex.DataFrame, force_recompute: bool = Fal return self._cached_execute( lambda_func=lambda: self._select_features(dataframe), cache_keys=[self._fingerprint(), (dataframe, VaexFingerprinter())], + cache_group=cache_group, force_recompute=force_recompute, ) diff --git a/mleko/dataset/feature_select/invariance_feature_selector.py b/mleko/dataset/feature_select/invariance_feature_selector.py index d0f733b7..bba613c7 100644 --- a/mleko/dataset/feature_select/invariance_feature_selector.py +++ b/mleko/dataset/feature_select/invariance_feature_selector.py @@ -66,11 +66,14 @@ def __init__( """ super().__init__(cache_directory, features, ignore_features, cache_size) - def select_features(self, dataframe: vaex.DataFrame, force_recompute: bool = False) -> vaex.DataFrame: + def select_features( + self, dataframe: vaex.DataFrame, cache_group: str | None = None, force_recompute: bool = False + ) -> vaex.DataFrame: """Selects features based on invariance. Args: dataframe: The DataFrame to select features from. + cache_group: The cache group to use for caching. force_recompute: Whether to force recompute the selected features. Returns: @@ -82,6 +85,7 @@ def select_features(self, dataframe: vaex.DataFrame, force_recompute: bool = Fal self._fingerprint(), (dataframe, VaexFingerprinter()), ], + cache_group=cache_group, force_recompute=force_recompute, ) diff --git a/mleko/dataset/feature_select/missing_rate_feature_selector.py b/mleko/dataset/feature_select/missing_rate_feature_selector.py index 40836ec9..25853237 100644 --- a/mleko/dataset/feature_select/missing_rate_feature_selector.py +++ b/mleko/dataset/feature_select/missing_rate_feature_selector.py @@ -69,13 +69,16 @@ def __init__( super().__init__(cache_directory, features, ignore_features, cache_size) self._missing_rate_threshold = missing_rate_threshold - def select_features(self, dataframe: vaex.DataFrame, force_recompute: bool = False) -> vaex.DataFrame: + def select_features( + self, dataframe: vaex.DataFrame, cache_group: str | None = None, force_recompute: bool = False + ) -> vaex.DataFrame: """Selects features based on the missing rate. Will cache the result of the feature selection. Args: dataframe: The DataFrame to select features from. + cache_group: The cache group to use. force_recompute: Whether to force recompute the feature selection. Returns: @@ -84,6 +87,7 @@ def select_features(self, dataframe: vaex.DataFrame, force_recompute: bool = Fal return self._cached_execute( lambda_func=lambda: self._select_features(dataframe), cache_keys=[self._fingerprint(), (dataframe, VaexFingerprinter())], + cache_group=cache_group, force_recompute=force_recompute, ) diff --git a/mleko/dataset/feature_select/pearson_correlation_feature_selector.py b/mleko/dataset/feature_select/pearson_correlation_feature_selector.py index 74e0391e..1d1c2658 100644 --- a/mleko/dataset/feature_select/pearson_correlation_feature_selector.py +++ b/mleko/dataset/feature_select/pearson_correlation_feature_selector.py @@ -72,11 +72,14 @@ def __init__( super().__init__(cache_directory, features, ignore_features, cache_size) self._correlation_threshold = correlation_threshold - def select_features(self, dataframe: vaex.DataFrame, force_recompute: bool = False) -> vaex.DataFrame: + def select_features( + self, dataframe: vaex.DataFrame, cache_group: str | None = None, force_recompute: bool = False + ) -> vaex.DataFrame: """Selects features based on the Pearson correlation. Args: dataframe: The DataFrame to select features from. + cache_group: The cache group to use. force_recompute: Whether to force recompute the selected features. Returns: @@ -85,6 +88,7 @@ def select_features(self, dataframe: vaex.DataFrame, force_recompute: bool = Fal return self._cached_execute( lambda_func=lambda: self._select_features(dataframe), cache_keys=[self._fingerprint(), (dataframe, VaexFingerprinter())], + cache_group=cache_group, force_recompute=force_recompute, ) diff --git a/mleko/dataset/feature_select/variance_feature_selector.py b/mleko/dataset/feature_select/variance_feature_selector.py index 7b60e381..f14e0a51 100644 --- a/mleko/dataset/feature_select/variance_feature_selector.py +++ b/mleko/dataset/feature_select/variance_feature_selector.py @@ -72,11 +72,14 @@ def __init__( super().__init__(cache_directory, features, ignore_features, cache_size) self._variance_threshold = variance_threshold - def select_features(self, dataframe: vaex.DataFrame, force_recompute: bool = False) -> vaex.DataFrame: + def select_features( + self, dataframe: vaex.DataFrame, cache_group: str | None = None, force_recompute: bool = False + ) -> vaex.DataFrame: """Selects features based on the variance. Args: dataframe: The DataFrame to select features from. + cache_group: The cache group to use. force_recompute: Whether to force recompute the selected features. Returns: @@ -88,6 +91,7 @@ def select_features(self, dataframe: vaex.DataFrame, force_recompute: bool = Fal self._fingerprint(), (dataframe, VaexFingerprinter()), ], + cache_group=cache_group, force_recompute=force_recompute, ) diff --git a/mleko/dataset/split/base_splitter.py b/mleko/dataset/split/base_splitter.py index bcf024c8..468cd58a 100644 --- a/mleko/dataset/split/base_splitter.py +++ b/mleko/dataset/split/base_splitter.py @@ -26,11 +26,14 @@ def __init__(self, cache_directory: str | Path, cache_size: int): LRUCacheMixin.__init__(self, cache_directory, self._cache_file_suffix, cache_size) @abstractmethod - def split(self, dataframe: vaex.DataFrame, force_recompute: bool = False) -> tuple[vaex.DataFrame, vaex.DataFrame]: + def split( + self, dataframe: vaex.DataFrame, cache_group: str | None = None, force_recompute: bool = False + ) -> tuple[vaex.DataFrame, vaex.DataFrame]: """Abstract method to split the given dataframe into two parts. Args: dataframe: The dataframe to be split. + cache_group: The cache group to use. force_recompute: Forces recomputation if True, otherwise reads from the cache if available. Returns: diff --git a/mleko/dataset/split/expression_splitter.py b/mleko/dataset/split/expression_splitter.py index eb2ff961..d4464f20 100644 --- a/mleko/dataset/split/expression_splitter.py +++ b/mleko/dataset/split/expression_splitter.py @@ -61,11 +61,14 @@ def __init__( super().__init__(cache_directory, cache_size) self._expression = expression - def split(self, dataframe: vaex.DataFrame, force_recompute: bool = False) -> tuple[vaex.DataFrame, vaex.DataFrame]: + def split( + self, dataframe: vaex.DataFrame, cache_group: str | None = None, force_recompute: bool = False + ) -> tuple[vaex.DataFrame, vaex.DataFrame]: """Split the given dataframe into two parts. Args: dataframe: The dataframe to be split. + cache_group: The cache group to use. force_recompute: Forces recomputation if True, otherwise reads from the cache if available. Returns: @@ -77,6 +80,7 @@ def split(self, dataframe: vaex.DataFrame, force_recompute: bool = False) -> tup self._expression, (dataframe, VaexFingerprinter()), ], + cache_group=cache_group, force_recompute=force_recompute, ) diff --git a/mleko/dataset/split/random_splitter.py b/mleko/dataset/split/random_splitter.py index b69682be..36235432 100644 --- a/mleko/dataset/split/random_splitter.py +++ b/mleko/dataset/split/random_splitter.py @@ -78,7 +78,9 @@ def __init__( self._stratify = stratify self._random_state = random_state - def split(self, dataframe: vaex.DataFrame, force_recompute: bool = False) -> tuple[vaex.DataFrame, vaex.DataFrame]: + def split( + self, dataframe: vaex.DataFrame, cache_group: str | None = None, force_recompute: bool = False + ) -> tuple[vaex.DataFrame, vaex.DataFrame]: """Split the given dataframe into two parts. Splits the dataframe into train and test sets according to the proportions, shuffle, @@ -87,6 +89,7 @@ def split(self, dataframe: vaex.DataFrame, force_recompute: bool = False) -> tup Args: dataframe: The dataframe to be split. + cache_group: The cache group to use. force_recompute: Whether to force recompute the split, even if the cache is available. Returns: @@ -101,6 +104,7 @@ def split(self, dataframe: vaex.DataFrame, force_recompute: bool = False) -> tup self._random_state, (dataframe, VaexFingerprinter()), ], + cache_group=cache_group, force_recompute=force_recompute, ) diff --git a/mleko/dataset/transform/base_transformer.py b/mleko/dataset/transform/base_transformer.py index 904174cd..99279a6e 100644 --- a/mleko/dataset/transform/base_transformer.py +++ b/mleko/dataset/transform/base_transformer.py @@ -40,11 +40,14 @@ def __init__( self._features: tuple[str, ...] = tuple(features) @abstractmethod - def transform(self, dataframe: vaex.DataFrame, force_recompute: bool = False) -> vaex.DataFrame: + def transform( + self, dataframe: vaex.DataFrame, cache_group: str | None = None, force_recompute: bool = False + ) -> vaex.DataFrame: """Transfigures the specified features in the DataFrame. Args: dataframe: DataFrame to be transformed. + cache_group: The cache group to use. force_recompute: Whether to force the transformation to be recomputed even if the result is cached. Raises: diff --git a/mleko/dataset/transform/composite_transformer.py b/mleko/dataset/transform/composite_transformer.py index d4e923c7..4872c8dc 100644 --- a/mleko/dataset/transform/composite_transformer.py +++ b/mleko/dataset/transform/composite_transformer.py @@ -74,11 +74,14 @@ def __init__( super().__init__(cache_directory, [], cache_size) self._transformers = tuple(transformers) - def transform(self, dataframe: vaex.DataFrame, force_recompute: bool = False) -> vaex.DataFrame: + def transform( + self, dataframe: vaex.DataFrame, cache_group: str | None = None, force_recompute: bool = False + ) -> vaex.DataFrame: """Transforms the DataFrame using the transformers in the order they are specified. Args: dataframe: The DataFrame to transform. + cache_group: The cache group to use. force_recompute: Whether to force the recomputation of the transformation. Returns: @@ -87,6 +90,7 @@ def transform(self, dataframe: vaex.DataFrame, force_recompute: bool = False) -> return self._cached_execute( lambda_func=lambda: self._transform(dataframe), cache_keys=[self._fingerprint(), (dataframe, VaexFingerprinter())], + cache_group=cache_group, force_recompute=force_recompute, ) diff --git a/mleko/dataset/transform/frequency_encoder_transformer.py b/mleko/dataset/transform/frequency_encoder_transformer.py index 7d6b974f..7f60f370 100644 --- a/mleko/dataset/transform/frequency_encoder_transformer.py +++ b/mleko/dataset/transform/frequency_encoder_transformer.py @@ -67,13 +67,16 @@ def __init__( super().__init__(cache_directory, features, cache_size) self._unseen_strategy = unseen_strategy - def transform(self, dataframe: vaex.DataFrame, force_recompute: bool = False) -> vaex.DataFrame: + def transform( + self, dataframe: vaex.DataFrame, cache_group: str | None = None, force_recompute: bool = False + ) -> vaex.DataFrame: """Transforms the features in the DataFrame using frequency encoding. Will cache the resulting DataFrame in the cache directory. Args: dataframe: The DataFrame to transform. + cache_group: The cache group to use. force_recompute: Whether to force recomputing the transformation. Returns: @@ -82,6 +85,7 @@ def transform(self, dataframe: vaex.DataFrame, force_recompute: bool = False) -> return self._cached_execute( lambda_func=lambda: self._transform(dataframe), cache_keys=[self._fingerprint(), (dataframe, VaexFingerprinter())], + cache_group=cache_group, force_recompute=force_recompute, ) diff --git a/mleko/dataset/transform/label_encoder_transformer.py b/mleko/dataset/transform/label_encoder_transformer.py index 037acb04..e26c55a7 100644 --- a/mleko/dataset/transform/label_encoder_transformer.py +++ b/mleko/dataset/transform/label_encoder_transformer.py @@ -65,13 +65,16 @@ def __init__( super().__init__(cache_directory, features, cache_size) self._allow_unseen = allow_unseen - def transform(self, dataframe: vaex.DataFrame, force_recompute: bool = False) -> vaex.DataFrame: + def transform( + self, dataframe: vaex.DataFrame, cache_group: str | None = None, force_recompute: bool = False + ) -> vaex.DataFrame: """Transforms the features of the given DataFrame using label encoding. Will cache the resulting DataFrame in the cache directory. Args: dataframe: The DataFrame to transform. + cache_group: The cache group to use. force_recompute: Whether to force recomputation of the transformation. Returns: @@ -80,6 +83,7 @@ def transform(self, dataframe: vaex.DataFrame, force_recompute: bool = False) -> return self._cached_execute( lambda_func=lambda: self._transform(dataframe), cache_keys=[self._fingerprint(), (dataframe, VaexFingerprinter())], + cache_group=cache_group, force_recompute=force_recompute, ) diff --git a/mleko/dataset/transform/max_abs_scaler_transformer.py b/mleko/dataset/transform/max_abs_scaler_transformer.py index c6ff967a..90e7700e 100644 --- a/mleko/dataset/transform/max_abs_scaler_transformer.py +++ b/mleko/dataset/transform/max_abs_scaler_transformer.py @@ -60,13 +60,16 @@ def __init__( """ super().__init__(cache_directory, features, cache_size) - def transform(self, dataframe: vaex.DataFrame, force_recompute: bool = False) -> vaex.DataFrame: + def transform( + self, dataframe: vaex.DataFrame, cache_group: str | None = None, force_recompute: bool = False + ) -> vaex.DataFrame: """Transforms the features in the DataFrame using maximum absolute scaling. Will cache the resulting DataFrame in the cache directory. Args: dataframe: The DataFrame to transform. + cache_group: The cache group to use. force_recompute: Whether to force recomputing the transformation. Returns: @@ -75,6 +78,7 @@ def transform(self, dataframe: vaex.DataFrame, force_recompute: bool = False) -> return self._cached_execute( lambda_func=lambda: self._transform(dataframe), cache_keys=[self._fingerprint(), (dataframe, VaexFingerprinter())], + cache_group=cache_group, force_recompute=force_recompute, ) diff --git a/mleko/dataset/transform/min_max_scaler_transformer.py b/mleko/dataset/transform/min_max_scaler_transformer.py index 04bd5c47..472a3aff 100644 --- a/mleko/dataset/transform/min_max_scaler_transformer.py +++ b/mleko/dataset/transform/min_max_scaler_transformer.py @@ -66,13 +66,16 @@ def __init__( self._min_value = min_value self._max_value = max_value - def transform(self, dataframe: vaex.DataFrame, force_recompute: bool = False) -> vaex.DataFrame: + def transform( + self, dataframe: vaex.DataFrame, cache_group: str | None = None, force_recompute: bool = False + ) -> vaex.DataFrame: """Transforms the features in the DataFrame using min max scaling. Will cache the resulting DataFrame in the cache directory. Args: dataframe: The DataFrame to transform. + cache_group: The cache group to use. force_recompute: Whether to force recomputing the transformation. Returns: @@ -81,6 +84,7 @@ def transform(self, dataframe: vaex.DataFrame, force_recompute: bool = False) -> return self._cached_execute( lambda_func=lambda: self._transform(dataframe), cache_keys=[self._fingerprint(), (dataframe, VaexFingerprinter())], + cache_group=cache_group, force_recompute=force_recompute, ) diff --git a/mleko/pipeline/pipeline_step.py b/mleko/pipeline/pipeline_step.py index 72e76e0f..a23ac321 100644 --- a/mleko/pipeline/pipeline_step.py +++ b/mleko/pipeline/pipeline_step.py @@ -31,17 +31,20 @@ class PipelineStep(ABC): def __init__( self, - inputs: list[str] | tuple[str, ...] | tuple[()] = (), - outputs: list[str] | tuple[str, ...] | tuple[()] = (), + inputs: list[str] | tuple[str, ...] | tuple[()], + outputs: list[str] | tuple[str, ...] | tuple[()], + cache_group: str | None, ) -> None: """Initialize a new PipelineStep with the provided input and output keys. Args: inputs: List or tuple of input keys expected by this step. outputs: List or tuple of output keys produced by this step. + cache_group: The cache group to use. """ - self.inputs = tuple(inputs) - self.outputs = tuple(outputs) + self._inputs = tuple(inputs) + self._outputs = tuple(outputs) + self._cache_group = cache_group self._validate_inputs() self._validate_outputs() @@ -72,7 +75,7 @@ def _validate_inputs(self) -> None: Raises: ValueError: If the PipelineStep has an invalid number of inputs. """ - if len(self.inputs) != self._num_inputs: + if len(self._inputs) != self._num_inputs: raise ValueError(f"{self.__class__.__name__} must have exactly {self._num_inputs} input(s).") def _validate_outputs(self) -> None: @@ -81,5 +84,5 @@ def _validate_outputs(self) -> None: Raises: ValueError: If the PipelineStep has an invalid number of outputs. """ - if len(self.outputs) != self._num_outputs: + if len(self._outputs) != self._num_outputs: raise ValueError(f"{self.__class__.__name__} must have exactly {self._num_outputs} output(s).") diff --git a/mleko/pipeline/steps/convert_step.py b/mleko/pipeline/steps/convert_step.py index ecafe61d..b2be985a 100644 --- a/mleko/pipeline/steps/convert_step.py +++ b/mleko/pipeline/steps/convert_step.py @@ -28,6 +28,7 @@ def __init__( converter: BaseConverter, inputs: list[str] | tuple[str, ...] | tuple[()] = (), outputs: list[str] | tuple[str, ...] | tuple[()] = (), + cache_group: str | None = None, ) -> None: """Initialize the ConvertStep with the specified data converter. @@ -35,8 +36,9 @@ def __init__( converter: The DataConverter responsible for handling data format conversion. inputs: List or tuple of input keys expected by this step. outputs: List or tuple of output keys produced by this step. + cache_group: The cache group to use. """ - super().__init__(inputs, outputs) + super().__init__(inputs, outputs, cache_group) self._converter = converter def execute(self, data_container: DataContainer, force_recompute: bool) -> DataContainer: @@ -52,10 +54,10 @@ def execute(self, data_container: DataContainer, force_recompute: bool) -> DataC Returns: A DataContainer containing the converted data as a vaex dataframe. """ - file_paths = data_container.data[self.inputs[0]] + file_paths = data_container.data[self._inputs[0]] if not isinstance(file_paths, list) or not all(isinstance(e, Path) for e in file_paths): raise ValueError - df = self._converter.convert(file_paths, force_recompute) - data_container.data[self.outputs[0]] = df + df = self._converter.convert(file_paths, self._cache_group, force_recompute) + data_container.data[self._outputs[0]] = df return data_container diff --git a/mleko/pipeline/steps/feature_select_step.py b/mleko/pipeline/steps/feature_select_step.py index 5c911f45..2d056543 100644 --- a/mleko/pipeline/steps/feature_select_step.py +++ b/mleko/pipeline/steps/feature_select_step.py @@ -24,6 +24,7 @@ def __init__( feature_selector: BaseFeatureSelector, inputs: list[str] | tuple[str, ...] | tuple[()] = (), outputs: list[str] | tuple[str, ...] | tuple[()] = (), + cache_group: str | None = None, ) -> None: """Initialize the FeatureSelectStep with the specified feature selector. @@ -31,8 +32,9 @@ def __init__( feature_selector: The FeatureSelector responsible for handling feature selection. inputs: List or tuple of input keys expected by this step. outputs: List or tuple of output keys produced by this step. + cache_group: The cache group to use. """ - super().__init__(inputs, outputs) + super().__init__(inputs, outputs, cache_group) self._feature_selector = feature_selector def execute(self, data_container: DataContainer, force_recompute: bool) -> DataContainer: @@ -48,10 +50,10 @@ def execute(self, data_container: DataContainer, force_recompute: bool) -> DataC Returns: A DataContainer containing the selected features as a vaex DataFrame. """ - dataframe = data_container.data[self.inputs[0]] + dataframe = data_container.data[self._inputs[0]] if not isinstance(dataframe, DataFrame): raise ValueError - df = self._feature_selector.select_features(dataframe, force_recompute) - data_container.data[self.outputs[0]] = df + df = self._feature_selector.select_features(dataframe, self._cache_group, force_recompute) + data_container.data[self._outputs[0]] = df return data_container diff --git a/mleko/pipeline/steps/ingest_step.py b/mleko/pipeline/steps/ingest_step.py index 278f4e20..1514e7a5 100644 --- a/mleko/pipeline/steps/ingest_step.py +++ b/mleko/pipeline/steps/ingest_step.py @@ -35,7 +35,7 @@ def __init__( inputs: List or tuple of input keys expected by this step. outputs: List or tuple of output keys produced by this step. """ - super().__init__(inputs, outputs) + super().__init__(inputs, outputs, None) self._ingester = ingester def execute(self, data_container: DataContainer, force_recompute: bool) -> DataContainer: @@ -49,5 +49,5 @@ def execute(self, data_container: DataContainer, force_recompute: bool) -> DataC DataContainer: A DataContainer containing a list of fetched files. """ files = self._ingester.fetch_data(force_recompute) - data_container.data[self.outputs[0]] = files + data_container.data[self._outputs[0]] = files return data_container diff --git a/mleko/pipeline/steps/split_step.py b/mleko/pipeline/steps/split_step.py index 06ea0f3b..60dc061e 100644 --- a/mleko/pipeline/steps/split_step.py +++ b/mleko/pipeline/steps/split_step.py @@ -28,6 +28,7 @@ def __init__( splitter: BaseSplitter, inputs: list[str] | tuple[str, ...] | tuple[()] = (), outputs: list[str] | tuple[str, ...] | tuple[()] = (), + cache_group: str | None = None, ) -> None: """Initialize the SplitStep with the specified data splitter. @@ -35,8 +36,9 @@ def __init__( splitter: The DataSplitter responsible for handling data splitting. inputs: List or tuple of input keys expected by this step. outputs: List or tuple of output keys produced by this step. + cache_group: The cache group to use. """ - super().__init__(inputs, outputs) + super().__init__(inputs, outputs, cache_group) self._splitter = splitter def execute(self, data_container: DataContainer, force_recompute: bool) -> DataContainer: @@ -52,11 +54,11 @@ def execute(self, data_container: DataContainer, force_recompute: bool) -> DataC Returns: A DataContainer containing the split data as two vaex DataFrames. """ - dataframe = data_container.data[self.inputs[0]] + dataframe = data_container.data[self._inputs[0]] if not isinstance(dataframe, DataFrame): raise ValueError - df1, df2 = self._splitter.split(dataframe, force_recompute) - data_container.data[self.outputs[0]] = df1 - data_container.data[self.outputs[1]] = df2 + df1, df2 = self._splitter.split(dataframe, self._cache_group, force_recompute) + data_container.data[self._outputs[0]] = df1 + data_container.data[self._outputs[1]] = df2 return data_container diff --git a/mleko/pipeline/steps/transform_step.py b/mleko/pipeline/steps/transform_step.py index 09c371dd..56708f99 100644 --- a/mleko/pipeline/steps/transform_step.py +++ b/mleko/pipeline/steps/transform_step.py @@ -24,6 +24,7 @@ def __init__( transformer: BaseTransformer, inputs: list[str] | tuple[str, ...] | tuple[()] = (), outputs: list[str] | tuple[str, ...] | tuple[()] = (), + cache_group: str | None = None, ) -> None: """Initialize the TransformStep with the specified transformer. @@ -31,8 +32,9 @@ def __init__( transformer: The Transformer responsible for handling feature transformation. inputs: List or tuple of input keys expected by this step. outputs: List or tuple of output keys produced by this step. + cache_group: The cache group to use. """ - super().__init__(inputs, outputs) + super().__init__(inputs, outputs, cache_group) self._transformer = transformer def execute(self, data_container: DataContainer, force_recompute: bool) -> DataContainer: @@ -48,10 +50,10 @@ def execute(self, data_container: DataContainer, force_recompute: bool) -> DataC Returns: A DataContainer containing the selected features as a vaex DataFrame. """ - dataframe = data_container.data[self.inputs[0]] + dataframe = data_container.data[self._inputs[0]] if not isinstance(dataframe, DataFrame): raise ValueError - df = self._transformer.transform(dataframe, force_recompute) - data_container.data[self.outputs[0]] = df + df = self._transformer.transform(dataframe, self._cache_group, force_recompute) + data_container.data[self._outputs[0]] = df return data_container diff --git a/tests/cache/test_cache_mixin.py b/tests/cache/test_cache_mixin.py index 2274754d..d2752e45 100644 --- a/tests/cache/test_cache_mixin.py +++ b/tests/cache/test_cache_mixin.py @@ -8,6 +8,8 @@ from typing import Hashable from unittest.mock import patch +import pytest + from mleko.cache.cache_mixin import CacheMixin, get_frame_qualname @@ -44,17 +46,17 @@ def __init__(self, cache_directory, cache_file_suffix): """Initialize cache.""" super().__init__(cache_directory, cache_file_suffix) - def my_method_1(self, a, b, force_recompute=False): + def my_method_1(self, a, b, cache_group=None, force_recompute=False): """Cached execute.""" - return self._cached_execute(lambda: a + b, [a, b], force_recompute) + return self._cached_execute(lambda: a + b, [a, b], cache_group, force_recompute) - def my_method_2(self, a, b, force_recompute=False): + def my_method_2(self, a, b, cache_group=None, force_recompute=False): """Cached execute.""" - return self._cached_execute(lambda: a * b, [a, b], force_recompute) + return self._cached_execute(lambda: a * b, [a, b], cache_group, force_recompute) - def my_method_3(self, list_vals, force_recompute=False): + def my_method_3(self, list_vals, cache_group=None, force_recompute=False): """Cached execute.""" - return self._cached_execute(lambda: list_vals, [list_vals], force_recompute) + return self._cached_execute(lambda: list_vals, [list_vals], cache_group, force_recompute) def test_cached_execute(self, temporary_directory: Path): """Should save to cache as expected.""" @@ -67,14 +69,26 @@ def test_cache_key_computation(self, temporary_directory: Path): """Should compute MD5 based cache keys correctly.""" my_test_instance = self.MyTestClass(temporary_directory, "cache") - dummy_frame_qualname = "module.Class.method" + dummy_class_method_name = "Class.method" + cache_group = "cache_group" cache_keys: list[Hashable] = [1, 2] - data = pickle.dumps((dummy_frame_qualname, cache_keys)) - expected_key = "Class.method." + hashlib.md5(data).hexdigest() + data = pickle.dumps(cache_keys) + expected_key = f"{dummy_class_method_name}.{cache_group}.{hashlib.md5(data).hexdigest()}" - key = my_test_instance._compute_cache_key(cache_keys, dummy_frame_qualname) + key = my_test_instance._compute_cache_key(cache_keys, dummy_class_method_name, cache_group) assert key == expected_key + def test_cache_key_overflow(self, temporary_directory: Path): + """Should raise ValueError if cache key is too long.""" + my_test_instance = self.MyTestClass(temporary_directory, "cache") + + dummy_class_method_name = "Class.method" + cache_group = "".join(["a" for _ in range(255)]) + cache_keys: list[Hashable] = [1, 2] + + with pytest.raises(ValueError): + my_test_instance._compute_cache_key(cache_keys, dummy_class_method_name, cache_group) + def test_different_functions_same_arguments(self, temporary_directory: Path): """Should correctly cache different functions with same arguments.""" my_test_instance = self.MyTestClass(temporary_directory, "cache") diff --git a/tests/cache/test_lru_cache_mixin.py b/tests/cache/test_lru_cache_mixin.py index f74bef16..07cde19d 100644 --- a/tests/cache/test_lru_cache_mixin.py +++ b/tests/cache/test_lru_cache_mixin.py @@ -19,9 +19,9 @@ def __init__(self, cache_directory, cache_file_suffix, max_entries): """Initialize cache.""" super().__init__(cache_directory, cache_file_suffix, max_entries) - def my_method(self, a, force_recompute=False): + def my_method(self, a, cache_group=None, force_recompute=False): """Cached execute.""" - return self._cached_execute(lambda: a, [a], force_recompute) + return self._cached_execute(lambda: a, [a], cache_group, force_recompute) class MyTestClass2(LRUCacheMixin): """Cached test class.""" @@ -30,9 +30,9 @@ def __init__(self, cache_directory, cache_file_suffix, max_entries): """Initialize cache.""" super().__init__(cache_directory, cache_file_suffix, max_entries) - def my_method(self, a, force_recompute=False): + def my_method(self, a, cache_group=None, force_recompute=False): """Cached execute.""" - return self._cached_execute(lambda: a, [a], force_recompute) + return self._cached_execute(lambda: a, [a], cache_group, force_recompute) def test_eviction(self, temporary_directory: Path): """Should evict the least recently used cache entries correctly.""" @@ -78,7 +78,8 @@ def test_clean_cache_on_load(self, temporary_directory: Path): cache_file_keys = list(temporary_directory.glob(f"{cache_file_prefix_name}*.{cache_suffix}")) cache_file_endings = [int(cache_key.stem[-1]) for cache_key in cache_file_keys] - assert len(lru_cached_class._cache) == n_cache_entries + assert len(lru_cached_class._cache) == 1 + assert len(lru_cached_class._cache["test"].keys()) == n_cache_entries assert all([cache_key_ending > n_cache_entries for cache_key_ending in cache_file_endings]) def test_two_classes_same_cache(self, temporary_directory: Path): diff --git a/tests/pipeline/steps/test_convert_step.py b/tests/pipeline/steps/test_convert_step.py index 73c47145..9721d9f8 100644 --- a/tests/pipeline/steps/test_convert_step.py +++ b/tests/pipeline/steps/test_convert_step.py @@ -31,13 +31,15 @@ def test_execute(self): df = vaex.from_dict({"col1": [1, 2, 3], "col2": [4, 5, 6]}) converter.convert = MagicMock(return_value=df) - convert_step = ConvertStep(converter=converter, inputs=["raw_data"], outputs=["converted_data"]) + convert_step = ConvertStep( + converter=converter, inputs=["raw_data"], outputs=["converted_data"], cache_group=None + ) result = convert_step.execute(data_container, force_recompute=False) assert isinstance(result, DataContainer) assert result.data["converted_data"] == df - converter.convert.assert_called_once_with(file_paths, False) + converter.convert.assert_called_once_with(file_paths, None, False) def test_wrong_data_type(self): """Should throw ValueError if not recieving list[Path].""" diff --git a/tests/pipeline/steps/test_feature_select_step.py b/tests/pipeline/steps/test_feature_select_step.py index f59eb4ce..0db2e71a 100644 --- a/tests/pipeline/steps/test_feature_select_step.py +++ b/tests/pipeline/steps/test_feature_select_step.py @@ -32,14 +32,14 @@ def test_execute(self): feature_selector.select_features = MagicMock(return_value=df) feature_select_step = FeatureSelectStep( - feature_selector=feature_selector, inputs=["df_clean"], outputs=["df_clean_selected"] + feature_selector=feature_selector, inputs=["df_clean"], outputs=["df_clean_selected"], cache_group=None ) result = feature_select_step.execute(data_container, force_recompute=False) assert isinstance(result, DataContainer) assert result.data["df_clean_selected"] == df - feature_selector.select_features.assert_called_once_with(data_container.data["df_clean"], False) + feature_selector.select_features.assert_called_once_with(data_container.data["df_clean"], None, False) def test_wrong_data_type(self): """Should throw ValueError if not recieving a vaex dataframe.""" diff --git a/tests/pipeline/steps/test_split_step.py b/tests/pipeline/steps/test_split_step.py index 66fb9159..fce5e859 100644 --- a/tests/pipeline/steps/test_split_step.py +++ b/tests/pipeline/steps/test_split_step.py @@ -29,14 +29,16 @@ def test_execute(self): df_train, df_test = vaex.from_dict({"col1": [1, 2], "col2": [4, 5]}), vaex.from_dict({"col1": [3], "col2": [6]}) splitter.split = MagicMock(return_value=(df_train, df_test)) - split_step = SplitStep(splitter=splitter, inputs=["df_clean"], outputs=["df_train", "df_test"]) + split_step = SplitStep( + splitter=splitter, inputs=["df_clean"], outputs=["df_train", "df_test"], cache_group=None + ) result = split_step.execute(data_container, force_recompute=False) assert isinstance(result, DataContainer) assert result.data["df_train"] == df_train assert result.data["df_test"] == df_test - splitter.split.assert_called_once_with(data_container.data["df_clean"], False) + splitter.split.assert_called_once_with(data_container.data["df_clean"], None, False) def test_wrong_data_type(self): """Should throw ValueError if not recieving a vaex dataframe.""" diff --git a/tests/pipeline/steps/test_transform_step.py b/tests/pipeline/steps/test_transform_step.py index 484529be..5bcf43fd 100644 --- a/tests/pipeline/steps/test_transform_step.py +++ b/tests/pipeline/steps/test_transform_step.py @@ -29,13 +29,15 @@ def test_execute(self): df = vaex.from_dict({"col2": [4, 5, 6]}) transformer.transform = MagicMock(return_value=df) - feature_select_step = TransformStep(transformer=transformer, inputs=["df_clean"], outputs=["df_clean_selected"]) + feature_select_step = TransformStep( + transformer=transformer, inputs=["df_clean"], outputs=["df_clean_selected"], cache_group=None + ) result = feature_select_step.execute(data_container, force_recompute=False) assert isinstance(result, DataContainer) assert result.data["df_clean_selected"] == df - transformer.transform.assert_called_once_with(data_container.data["df_clean"], False) + transformer.transform.assert_called_once_with(data_container.data["df_clean"], None, False) def test_wrong_data_type(self): """Should throw ValueError if not recieving a vaex dataframe.""" diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py index ad5d85fa..4369a60c 100644 --- a/tests/pipeline/test_pipeline.py +++ b/tests/pipeline/test_pipeline.py @@ -42,8 +42,8 @@ def test_init(self): def test_init_with_steps(self): """Should successfully initialize the pipeline with one or more PipelineStep instances.""" - step1 = self.InputStep(outputs=["raw_data"]) - step2 = self.AppendStep(inputs=["raw_data"], outputs=["appended_data"]) + step1 = self.InputStep(inputs=[], outputs=["raw_data"], cache_group=None) + step2 = self.AppendStep(inputs=["raw_data"], outputs=["appended_data"], cache_group=None) pipeline = Pipeline(steps=[step1, step2]) assert len(pipeline._steps) == 2 @@ -52,8 +52,8 @@ def test_init_with_steps(self): def test_repr(self): """Should represent Pipeline using representation of steps.""" - step1 = self.InputStep(outputs=["raw_data"]) - step2 = self.AppendStep(inputs=["raw_data"], outputs=["appended_data"]) + step1 = self.InputStep(inputs=[], outputs=["raw_data"], cache_group=None) + step2 = self.AppendStep(inputs=["raw_data"], outputs=["appended_data"], cache_group=None) pipeline = Pipeline(steps=[step1, step2]) expected = f"Pipeline:\n 1. {step1!r}\n 2. {step2!r}" @@ -61,8 +61,8 @@ def test_repr(self): def test_add_step(self): """Should successfully add new PipelineStep.""" - step1 = self.InputStep(outputs=["raw_data"]) - step2 = self.AppendStep(inputs=["raw_data"], outputs=["appended_data"]) + step1 = self.InputStep(inputs=[], outputs=["raw_data"], cache_group=None) + step2 = self.AppendStep(inputs=["raw_data"], outputs=["appended_data"], cache_group=None) pipeline = Pipeline(steps=[step1]) pipeline.add_step(step2) @@ -71,8 +71,8 @@ def test_add_step(self): def test_run(self): """Should run multiple PipelineSteps.""" - input_step = self.InputStep(outputs=["raw_data"]) - increment_step = self.AppendStep(inputs=["raw_data"], outputs=["appended_data"]) + input_step = self.InputStep(inputs=[], outputs=["raw_data"], cache_group=None) + increment_step = self.AppendStep(inputs=["raw_data"], outputs=["appended_data"], cache_group=None) pipeline = Pipeline(steps=[input_step, increment_step]) result = pipeline.run() diff --git a/tests/pipeline/test_pipeline_step.py b/tests/pipeline/test_pipeline_step.py index a36fb426..71846625 100644 --- a/tests/pipeline/test_pipeline_step.py +++ b/tests/pipeline/test_pipeline_step.py @@ -20,39 +20,39 @@ class DummyPipelineStep(PipelineStep): def execute(self, data_container: DataContainer): """Execute the dummy step.""" - file_paths = data_container.data[self.inputs[0]] + file_paths = data_container.data[self._inputs[0]] if not isinstance(file_paths, list) or not all(isinstance(e, Path) for e in file_paths): raise ValueError - data_container.data[self.outputs[0]] = "0" # type: ignore - data_container.data[self.outputs[1]] = "1" # type: ignore - data_container.data[self.outputs[2]] = "2" # type: ignore + data_container.data[self._outputs[0]] = "0" # type: ignore + data_container.data[self._outputs[1]] = "1" # type: ignore + data_container.data[self._outputs[2]] = "2" # type: ignore return data_container def test_init(self): """Should init a concrete PipelineStep subclass.""" - concrete_step = self.DummyPipelineStep(inputs=["input"], outputs=["first", "second", "third"]) + concrete_step = self.DummyPipelineStep(inputs=["input"], outputs=["first", "second", "third"], cache_group=None) assert isinstance(concrete_step, PipelineStep) def test_execute(self): """Should successfully implementation and execution of the `execute` method in a PipelineStep subclass.""" dummy_data = [Path()] input_data_container = DataContainer(data={"input": dummy_data}) - concrete_step = self.DummyPipelineStep(inputs=["input"], outputs=["first", "second", "third"]) + concrete_step = self.DummyPipelineStep(inputs=["input"], outputs=["first", "second", "third"], cache_group=None) output_data_container = concrete_step.execute(input_data_container) assert output_data_container.data == input_data_container.data def test_execute_with_invalid_input(self): """Should raise a ValueError if the input data is invalid.""" input_data_container = DataContainer(data={"input": "invalid"}) # type: ignore - concrete_step = self.DummyPipelineStep(inputs=["input"], outputs=["first", "second", "third"]) + concrete_step = self.DummyPipelineStep(inputs=["input"], outputs=["first", "second", "third"], cache_group=None) with pytest.raises(ValueError): concrete_step.execute(input_data_container) def test_wrong_number_inputs_outputs(self): """Should throw ValueError inputs or outputs number is incorrect.""" with pytest.raises(ValueError): - self.DummyPipelineStep(inputs=["input1", "input2"], outputs=["first", "second", "third"]) + self.DummyPipelineStep(inputs=["input1", "input2"], outputs=["first", "second", "third"], cache_group=None) with pytest.raises(ValueError): - self.DummyPipelineStep(inputs=["input"], outputs=["first", "second"]) + self.DummyPipelineStep(inputs=["input"], outputs=["first", "second"], cache_group=None) From ea63943a5a938443037a07e665b6eafa3dffa282 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erik=20B=C3=A5venstrand?= Date: Mon, 26 Jun 2023 23:17:54 +0200 Subject: [PATCH 2/2] revert: Fix typeguard in test case. --- tests/cache/format/test_vaex_arrow_cache_format_mixin.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/cache/format/test_vaex_arrow_cache_format_mixin.py b/tests/cache/format/test_vaex_arrow_cache_format_mixin.py index fffb2acb..31dc8386 100644 --- a/tests/cache/format/test_vaex_arrow_cache_format_mixin.py +++ b/tests/cache/format/test_vaex_arrow_cache_format_mixin.py @@ -21,7 +21,7 @@ def __init__(self, cache_directory, max_entries): def my_method(self, a, force_recompute=False): """Cached execute.""" - return self._cached_execute(lambda: a, [a.fingerprint()], force_recompute) + return self._cached_execute(lambda: a, [a.fingerprint()], None, force_recompute) def test_vaex_dataframe_arrow_mixin(self, temporary_directory: Path): """Should save to cache as expected."""