fix(data cleaning): 🐛 Fix date32/64[day] not converted to datetime.
All date32[day] and date64[ms] columns are now converted to datetime64[s] so that vaex's date operations fully support them.
ErikBavenstrand committed Jun 30, 2023
1 parent 9078845 commit 98f4b26
Showing 8 changed files with 52 additions and 18 deletions.
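
For context, a minimal sketch of the problem and of the cast this commit applies. The data and column name are illustrative and it assumes pyarrow and vaex are installed; this is not code from the repository:

from datetime import date

import pyarrow as pa
import vaex

# CSV ingestion through Arrow can type a calendar-date column as
# date32[day] (or date64[ms]) instead of a native datetime.
table = pa.table({"Date": pa.array([date(2023, 1, 1), date(2023, 1, 2)], type=pa.date32())})
df = vaex.from_arrow_table(table)

# The fix: cast such columns to datetime64[s], which vaex's date
# operations (e.g. the .dt accessor) fully support.
df["Date"] = df["Date"].astype("datetime64[s]")
print(df["Date"].dt.year.tolist())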
3 changes: 2 additions & 1 deletion .vscode/settings.json
@@ -4,9 +4,10 @@
"config",
"cache",
"pipeline",
"data cleaning",
"data splitting",
"feature selection",
"semantic versioning",
"transformer"
]
}
}
6 changes: 6 additions & 0 deletions mleko/dataset/convert/csv_to_vaex_converter.py
@@ -6,6 +6,7 @@
 from itertools import repeat
 from pathlib import Path

+import pyarrow as pa
 import vaex
 from pyarrow import csv as arrow_csv
 from tqdm.auto import tqdm
@@ -14,6 +15,7 @@
 from mleko.utils.custom_logger import CustomLogger
 from mleko.utils.decorators import auto_repr
 from mleko.utils.file_helpers import clear_directory
+from mleko.utils.vaex_helpers import get_column

 from .base_converter import BaseConverter

@@ -222,6 +224,10 @@ def _convert_csv_file_to_arrow(
             ),
         ).drop(drop_columns)

+        for column_name in df_chunk.get_column_names():
+            if get_column(df_chunk, column_name).dtype in (pa.date32(), pa.date64()):
+                df_chunk[column_name] = get_column(df_chunk, column_name).astype("datetime64[s]")
+
         output_path = output_directory / f"df_chunk_{file_path.stem}.{dataframe_suffix}"
         df_chunk.export(output_path, chunk_size=100_000, parallel=False)
         df_chunk.close()
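
Because the cast runs on every chunk before it is exported, each Arrow file the converter writes already carries datetime64[s] columns; the updated assertions in tests/dataset/convert/test_csv_to_vaex_converter.py below check exactly that.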
2 changes: 1 addition & 1 deletion mleko/dataset/split/expression_splitter.py
@@ -74,7 +74,7 @@ def split(
         Returns:
             A tuple containing the split dataframes.
         """
-        return self._cached_execute(  # type: ignore
+        return self._cached_execute(
             lambda_func=lambda: self._split(dataframe),
             cache_keys=[
                 self._expression,
13 changes: 7 additions & 6 deletions mleko/dataset/split/random_splitter.py
@@ -95,7 +95,7 @@ def split(
         Returns:
             A tuple containing the split dataframes.
         """
-        return self._cached_execute(  # type: ignore
+        return self._cached_execute(
             lambda_func=lambda: self._split(dataframe),
             cache_keys=[
                 self._idx2_size,
@@ -118,9 +118,10 @@ def _split(self, dataframe: vaex.DataFrame) -> tuple[vaex.DataFrame, vaex.DataFr
             A tuple containing the split dataframes.
         """
         index_name = "index"
-        dataframe[index_name] = vaex.vrange(0, dataframe.shape[0])
-        index = get_column(dataframe, index_name)
-        target = get_column(dataframe, self._stratify).to_numpy() if self._stratify else None
+        df = dataframe.copy()
+        df[index_name] = vaex.vrange(0, df.shape[0])
+        index = get_column(df, index_name)
+        target = get_column(df, self._stratify).to_numpy() if self._stratify else None

         if self._shuffle:
             logger.info("Shuffling data before splitting.")
@@ -137,8 +138,8 @@ def _split(self, dataframe: vaex.DataFrame) -> tuple[vaex.DataFr
             stratify=target,
         )

-        df1 = get_filtered_df(dataframe, index.isin(idx1)).extract()
-        df2 = get_filtered_df(dataframe, index.isin(idx2)).extract()
+        df1 = get_filtered_df(df, index.isin(idx1)).extract()
+        df2 = get_filtered_df(df, index.isin(idx2)).extract()
         logger.info(f"Split dataframe into two dataframes with shapes {df1.shape} and {df2.shape}.")
         df1.delete_virtual_column(index_name)
         df2.delete_virtual_column(index_name)
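
Beyond the rename, this hunk changes behavior: the temporary index column is now added to a copy, so split() no longer leaves a stray virtual column on the caller's dataframe. A toy sketch of the pattern (illustrative names, assuming vaex and numpy; not code from the commit):

import numpy as np
import vaex

def tag_rows(dataframe: vaex.DataFrame) -> vaex.DataFrame:
    # Work on a copy so the caller's dataframe stays untouched.
    df = dataframe.copy()
    df["index"] = vaex.vrange(0, df.shape[0])
    return df

source = vaex.from_arrays(x=np.array([1, 2, 3]))
tagged = tag_rows(source)
assert "index" in tagged.column_names
assert "index" not in source.column_names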
8 changes: 4 additions & 4 deletions tests/conftest.py
@@ -41,10 +41,10 @@ def generate_csv_files(directory_path: Path, n_files: int, gzipped: bool = False
         file_path = directory_path / f"{uuid.uuid4()}.csv"
         with open(file_path, "w", newline="") as file:
             writer = csv.writer(file)
-            writer.writerow(["Time", "Count", "Name", "Is Best"])
-            writer.writerow(["2023-01-01 20:00:00", 3, "Linux", False])
-            writer.writerow(["2023-01-01 20:00:00", 5.4, "Windows", False])
-            writer.writerow(["2023-01-01 20:00:00", -1, "-9999", True])
+            writer.writerow(["Time", "Date", "Count", "Name", "Is Best"])
+            writer.writerow(["2023-01-01 20:00:00", "2023-01-01", 3, "Linux", False])
+            writer.writerow(["2023-01-01 20:00:00", "2023-01-01", 5.4, "Windows", False])
+            writer.writerow(["2023-01-01 20:00:00", "2023-01-01", -1, "-9999", True])

         if gzipped:
             with open(file_path, "rb") as f_in, gzip.open(file_path.with_suffix(".gz"), "wb") as f_out:
12 changes: 6 additions & 6 deletions tests/dataset/convert/test_csv_to_vaex_converter.py
@@ -28,9 +28,9 @@ def test_convert(self, temporary_directory: Path):
         dfs = [vaex.open(f) for f in arrow_files]

         for df in dfs:
-            assert str(list(df.dtypes)) == "[datetime64[s], float64, string, bool]"
-            assert df.column_names == ["Time", "Count", "Name", "Is Best"]
-            assert df.shape == (3, 4)
+            assert str(list(df.dtypes)) == "[datetime64[s], datetime64[s], float64, string, bool]"
+            assert df.column_names == ["Time", "Date", "Count", "Name", "Is Best"]
+            assert df.shape == (3, 5)
             assert df.Name.countna() == 1
             df.close()

@@ -48,9 +48,9 @@ def test_cache_hit(self, temporary_directory: Path):
         file_paths = generate_csv_files(temporary_directory, n_files)
         df = csv_to_arrow_converter.convert(file_paths, force_recompute=False)

-        assert str(list(df.dtypes)) == "[datetime64[s], float64, string, bool]"
-        assert df.column_names == ["Time", "Count", "Name", "Is Best"]
-        assert df.shape == (n_files * 3, 4)
+        assert str(list(df.dtypes)) == "[datetime64[s], datetime64[s], float64, string, bool]"
+        assert df.column_names == ["Time", "Date", "Count", "Name", "Is Best"]
+        assert df.shape == (n_files * 3, 5)
         assert df.Name.countna() == n_files
         assert len(glob.glob(str(temporary_directory / "df_chunk_*.arrow"))) == 0
         assert len(glob.glob(str(temporary_directory / "*.arrow"))) == 1
13 changes: 13 additions & 0 deletions tests/dataset/splitters/test_expression_splitter.py
@@ -2,6 +2,7 @@
 from __future__ import annotations

 from pathlib import Path
+from unittest.mock import patch

 import pytest
 import vaex
@@ -68,3 +69,15 @@ def test_split_by_date(self, temporary_directory: Path, example_vaex_dataframe:
         assert df_test.shape == (5, 4)
         assert df_test.column_names == ["a", "b", "target", "date"]
         assert df_test["target"].tolist() == [1, 1, 1, 1, 0]  # type: ignore
+
+    def test_split_cache(self, temporary_directory: Path, example_vaex_dataframe: vaex.DataFrame):
+        """Should test the cache of the expression splitter."""
+        test_expression_data_splitter = ExpressionSplitter(
+            temporary_directory, expression='date < scalar_datetime("2020-06-01 00:00:00")'
+        )
+
+        test_expression_data_splitter.split(example_vaex_dataframe)
+
+        with patch.object(ExpressionSplitter, "_split") as mocked_split:
+            test_expression_data_splitter.split(example_vaex_dataframe)
+            mocked_split.assert_not_called()
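
The caching check works by exhausting the real computation first: the initial split() call populates the cache, then patch.object swaps out _split, so mocked_split.assert_not_called() fails if the second call recomputes instead of reading the cached result. The random-splitter test below applies the same pattern.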
13 changes: 13 additions & 0 deletions tests/dataset/splitters/test_random_splitter.py
@@ -2,6 +2,7 @@
 from __future__ import annotations

 from pathlib import Path
+from unittest.mock import patch

 import pytest
 import vaex
@@ -62,3 +63,15 @@ def test_split(self, temporary_directory: Path, example_vaex_dataframe: vaex.Dat
         assert df_test.shape == (5, 4)
         assert df_test.column_names == ["a", "b", "target", "date"]
         assert df_test["target"].tolist() == [1, 1, 1, 1, 0]  # type: ignore
+
+    def test_split_cache(self, temporary_directory: Path, example_vaex_dataframe: vaex.DataFrame):
+        """Should test the cache of the random splitter."""
+        test_random_splitter = RandomSplitter(
+            temporary_directory, data_split=(0.5, 0.5), shuffle=False, random_state=1337
+        )
+
+        test_random_splitter.split(example_vaex_dataframe)
+
+        with patch.object(RandomSplitter, "_split") as mocked_split:
+            test_random_splitter.split(example_vaex_dataframe)
+            mocked_split.assert_not_called()
