
Feature: added parquet sampling #1070

Merged (11 commits, Dec 12, 2023)
134 changes: 107 additions & 27 deletions dataprofiler/data_readers/data_utils.py
@@ -2,6 +2,7 @@
import json
import logging
import os
import random
import re
import urllib
from collections import OrderedDict
@@ -24,6 +25,7 @@
import boto3
import botocore
import dateutil
import numpy as np
import pandas as pd
import pyarrow.parquet as pq
import requests
@@ -439,52 +441,130 @@ def read_csv_df(
    return data


def convert_unicode_col_to_utf8(input_df: pd.DataFrame) -> pd.DataFrame:
    """
    Convert all unicode columns in the input dataframe to utf-8.

    :param input_df: input dataframe
    :type input_df: pd.DataFrame
    :return: corrected dataframe
    :rtype: pd.DataFrame
    """
    # Infer each column's dtype so unicode/mixed columns can be identified
    input_column_types = input_df.apply(
        lambda x: pd.api.types.infer_dtype(x.values, skipna=True)
    )

    mixed_and_unicode_cols = input_column_types[
        input_column_types == "unicode"
    ].index.union(input_column_types[input_column_types == "mixed"].index)

    for iter_column in mixed_and_unicode_cols:
        # Encode string to bytes
        input_df[iter_column] = input_df[iter_column].apply(
            lambda x: x.encode("utf-8").strip() if isinstance(x, str) else x
        )

        # Decode bytes back to string
        input_df[iter_column] = input_df[iter_column].apply(
            lambda x: x.decode("utf-8").strip() if isinstance(x, bytes) else x
        )

    return input_df
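As a quick illustration of what this helper does: a minimal sketch, assuming the function above is in scope (e.g. imported from dataprofiler.data_readers.data_utils) and using made-up data:

import pandas as pd

# Hypothetical frame with a "mixed" column: str, bytes, and int values.
df = pd.DataFrame({"col": ["  padded  ", b"raw bytes", 42]})

cleaned = convert_unicode_col_to_utf8(df)
# Strings are round-tripped through utf-8 and whitespace-stripped,
# bytes are decoded to str, and non-string values (42) pass through.
print(cleaned["col"].tolist())  # ['padded', 'raw bytes', 42]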


def sample_parquet(
    file_path: str,
    sample_nrows: int,
    selected_columns: Optional[List[str]] = None,
    read_in_string: bool = False,
) -> Tuple[pd.DataFrame, pd.Series]:
    """
    Read a parquet file, sample n rows from it, and return a dataframe.

    :param file_path: path to the Parquet file.
    :type file_path: str
    :param sample_nrows: number of rows to sample
    :type sample_nrows: int
    :param selected_columns: columns to read
    :type selected_columns: list
    :param read_in_string: return data as string type
    :type read_in_string: bool
    :return: sampled dataframe and the original column dtypes
    :rtype: Tuple[pd.DataFrame, pd.Series]
    """
    # read parquet file into a table
    if selected_columns:
        parquet_table = pq.read_table(file_path, columns=selected_columns)
    else:
        parquet_table = pq.read_table(file_path)

    # sample rows via a boolean mask over the row indices
    n_rows = parquet_table.num_rows
    if n_rows > sample_nrows:
        sample_index = np.array([False] * n_rows)
        sample_index[random.sample(range(n_rows), sample_nrows)] = True
    else:
        sample_index = np.array([True] * n_rows)
    sample_df = parquet_table.filter(sample_index).to_pandas()

    # Convert all the unicode columns to utf-8
    sample_df = convert_unicode_col_to_utf8(sample_df)

    original_df_dtypes = sample_df.dtypes
    if read_in_string:
        sample_df = sample_df.astype(str)

    return sample_df, original_df_dtypes
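The sampling step is just a uniform random boolean mask over row indices, applied with pyarrow's Table.filter. A self-contained sketch of the same idea on a toy table (illustration only, not the library API):

import random

import numpy as np
import pyarrow as pa

table = pa.table({"x": list(range(10))})  # toy table with 10 rows
sample_nrows = 4

n_rows = table.num_rows
if n_rows > sample_nrows:
    # mark exactly sample_nrows random positions as True
    mask = np.zeros(n_rows, dtype=bool)
    mask[random.sample(range(n_rows), sample_nrows)] = True
else:
    mask = np.ones(n_rows, dtype=bool)

sampled = table.filter(mask).to_pandas()  # 4 randomly chosen rows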

def read_parquet_df(
    file_path: str,
    sample_nrows: Optional[int] = None,
    selected_columns: Optional[List[str]] = None,
    read_in_string: bool = False,
) -> Tuple[pd.DataFrame, pd.Series]:
    """
    Read a parquet file into a dataframe, optionally sampling rows.

    :param file_path: path to the Parquet file.
    :type file_path: str
    :param sample_nrows: number of rows to sample; None reads the full file
    :type sample_nrows: Optional[int]
    :param selected_columns: columns to read
    :type selected_columns: list
    :param read_in_string: return data as string type
    :type read_in_string: bool
    :return: dataframe and the original column dtypes
    :rtype: Tuple[pd.DataFrame, pd.Series]
    """
    if sample_nrows is None:
        parquet_file = pq.ParquetFile(file_path)
        data = pd.DataFrame()
        for i in range(parquet_file.num_row_groups):

            data_row_df = parquet_file.read_row_group(i).to_pandas()

            # Convert all the unicode columns to utf-8
            data_row_df = convert_unicode_col_to_utf8(data_row_df)

            if selected_columns:
                data_row_df = data_row_df[selected_columns]

            data = pd.concat([data, data_row_df])

        original_df_dtypes = data.dtypes
        if read_in_string:
            data = data.astype(str)
        return data, original_df_dtypes
    else:
        data, original_df_dtypes = sample_parquet(
            file_path,
            sample_nrows,
            selected_columns=selected_columns,
            read_in_string=read_in_string,
        )
        return data, original_df_dtypes
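Usage-wise, the new parameter is opt-in. A hedged sketch with a hypothetical file path:

from dataprofiler.data_readers.data_utils import read_parquet_df

# Full read, unchanged behavior (sample_nrows defaults to None).
df, dtypes = read_parquet_df("data/example.parquet")

# Sampled read: at most 100 randomly selected rows.
sample_df, sample_dtypes = read_parquet_df(
    "data/example.parquet", sample_nrows=100
)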


def read_text_as_list_of_strs(
11 changes: 10 additions & 1 deletion dataprofiler/data_readers/parquet_data.py
@@ -61,6 +61,7 @@ def __init__(
self._data_formats["json"] = self._get_data_as_json
self._selected_data_format: str = options.get("data_format", "dataframe")
self._selected_columns: List[str] = options.get("selected_columns", list())
self._sample_nrows: Optional[int] = options.get("sample_nrows", None)

if data is not None:
self._load_data(data)
@@ -83,6 +84,11 @@ def selected_columns(self) -> List[str]:
"""Return selected columns."""
return self._selected_columns

@property
def sample_nrows(self) -> Optional[int]:
"""Return sample_nrows."""
return self._sample_nrows

    @property
    def is_structured(self) -> bool:
        """Determine compatibility with StructuredProfiler."""
@@ -100,7 +106,10 @@ def _load_data_from_str(self, data_as_str: str) -> pd.DataFrame:
    def _load_data_from_file(self, input_file_path: str) -> pd.DataFrame:
        """Return data from file."""
        data, original_df_dtypes = data_utils.read_parquet_df(
            input_file_path,
            selected_columns=self.selected_columns,
            read_in_string=True,
            sample_nrows=self.sample_nrows,
        )
        self._original_df_dtypes = original_df_dtypes
        return data
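At the user-facing level, the option threads through the Data reader. A sketch (the file path is hypothetical):

import dataprofiler as dp

# Read at most 100 randomly sampled rows instead of the full file.
data = dp.Data("data/example.parquet", options={"sample_nrows": 100})
print(len(data.data))  # min(100, total row count)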
56 changes: 54 additions & 2 deletions dataprofiler/tests/data_readers/test_parquet_data.py
@@ -2,6 +2,8 @@
import unittest
from io import BytesIO

import pandas as pd

from dataprofiler.data_readers.data import Data
from dataprofiler.data_readers.parquet_data import ParquetData

@@ -102,6 +104,11 @@ def test_specifying_data_type(self):
input_data_obj = Data(input_file["path"], data_type="parquet")
self.assertEqual(input_data_obj.data_type, "parquet")

input_data_obj_sampled = Data(
input_file["path"], data_type="parquet", options={"sample_nrows": 100}
)
self.assertEqual(input_data_obj_sampled.data_type, "parquet")

    def test_reload_data(self):
        """
        Determine if the parquet file can be reloaded
@@ -112,6 +119,30 @@ def test_reload_data(self):
self.assertEqual(input_data_obj.data_type, "parquet")
self.assertEqual(input_file["path"], input_data_obj.input_file_path)

input_data_obj_sampled_before = Data(
input_file["path"], options={"sample_nrows": 100}
)
self.assertEqual(input_data_obj_sampled_before.data_type, "parquet")
self.assertEqual(
input_file["path"], input_data_obj_sampled_before.input_file_path
)
input_data_obj_sampled_after = input_data_obj_sampled_before
input_data_obj_sampled_after.reload(
input_file["path"], options={"sample_nrows": 100}
)
self.assertEqual(input_data_obj_sampled_after.data_type, "parquet")
self.assertEqual(
input_file["path"], input_data_obj_sampled_after.input_file_path
)
self.assertEqual(
input_data_obj_sampled_before.data_type,
input_data_obj_sampled_after.data_type,
)
self.assertEqual(
input_data_obj_sampled_before.input_file_path,
input_data_obj_sampled_after.input_file_path,
)

    def test_data_formats(self):
        """
        Determine if the parquet file data_formats can be used
@@ -123,13 +154,24 @@
                self.assertEqual(input_data_obj.data_format, data_format)
                data = input_data_obj.data
                if data_format == "dataframe":
                    self.assertIsInstance(data, pd.DataFrame)
                elif data_format in ["records", "json"]:
                    self.assertIsInstance(data, list)
                    self.assertIsInstance(data[0], str)

            input_data_obj_sampled = Data(
                input_file["path"], options={"sample_nrows": 100}
            )
            for data_format in list(input_data_obj_sampled._data_formats.keys()):
                input_data_obj_sampled.data_format = data_format
                self.assertEqual(input_data_obj_sampled.data_format, data_format)
                data_sampled = input_data_obj_sampled.data
                if data_format == "dataframe":
                    self.assertIsInstance(data_sampled, pd.DataFrame)
                elif data_format in ["records", "json"]:
                    self.assertIsInstance(data_sampled, list)
                    self.assertIsInstance(data_sampled[0], str)

Comment on lines 127 to +145

Contributor: this reads a little verbose... wondering if it's needed, since it's adding an option to the test.

Contributor Author: I think this is necessary. The new parquet sampling feature uses a different parquet reading method and format, so I think it's safer to check that the sampled data still meets our format requirements.

Contributor: I don't mind too much if tests are verbose, as long as they're readable. Better to be verbose than to miss edge cases, imo.

    def test_mixed_string_col(self):
        """
        Determine if parquet can handle mixed string column types.
@@ -181,6 +223,16 @@ def test_len_data(self):
self.assertEqual(input_file["count"], len(data), msg=input_file["path"])
self.assertEqual(input_file["count"], data.length, msg=input_file["path"])

data_sampled = Data(input_file["path"], options={"sample_nrows": 100})
self.assertEqual(
min(100, input_file["count"]), len(data_sampled), msg=input_file["path"]
)
self.assertEqual(
min(100, input_file["count"]),
data_sampled.length,
msg=input_file["path"],
)

    def test_file_encoding(self):
        """Tests to ensure file_encoding set to None"""
        for input_file in self.file_or_buf_list: