Staging/main/0.10.8 #1081

Merged · 5 commits · Jan 11, 2024
2 changes: 1 addition & 1 deletion .github/workflows/publish-python-package.yml
@@ -18,7 +18,7 @@ jobs:
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v4
uses: actions/setup-python@v5
with:
python-version: '3.10'
- name: Install dependencies
2 changes: 1 addition & 1 deletion .github/workflows/test-python-package.yml
@@ -21,7 +21,7 @@ jobs:
steps:
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
2 changes: 1 addition & 1 deletion dataprofiler/data_readers/avro_data.py
@@ -67,7 +67,7 @@ def _load_data_from_file(self, input_file_path: str) -> List:
# even when the option encoding='utf-8' is added. It may come from
# some special compression codec, e.g., snappy. Then, binary mode
# reading is currently used to get the dict-formatted lines.
df_reader: fastavro.reader = fastavro.reader(input_file)
df_reader = fastavro.reader(input_file)
Review comment (Contributor): was this unnecessary?

Reply (Contributor Author): it was actually throwing a mypy error IIRC @menglinw

lines: List = list()
for line in df_reader:
lines.append(line)
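For context on that exchange: annotating the variable as `df_reader: fastavro.reader` is rejected by mypy when `fastavro.reader` is exposed as a function rather than a class in the type stubs, so the PR drops the hint and lets the type be inferred. A minimal sketch of the resulting pattern, written as a standalone helper rather than the actual `AVROData._load_data_from_file` method:

```python
import fastavro
from typing import Any, Dict, List


def load_avro_records(input_file_path: str) -> List[Dict[str, Any]]:
    """Read an Avro file in binary mode and return its records as dicts."""
    with open(input_file_path, "rb") as input_file:
        # No explicit `df_reader: fastavro.reader` annotation here: letting
        # mypy infer the reader's type avoids the invalid-annotation error.
        df_reader = fastavro.reader(input_file)
        return [record for record in df_reader]
```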
134 changes: 107 additions & 27 deletions dataprofiler/data_readers/data_utils.py
@@ -2,6 +2,7 @@
import json
import logging
import os
import random
import re
import urllib
from collections import OrderedDict
@@ -24,6 +25,7 @@
import boto3
import botocore
import dateutil
import numpy as np
import pandas as pd
import pyarrow.parquet as pq
import requests
@@ -439,52 +441,130 @@ def read_csv_df(
return data


def read_parquet_df(
def convert_unicode_col_to_utf8(input_df: pd.DataFrame) -> pd.DataFrame:
"""
Convert all unicode columns in input dataframe to utf-8.

:param input_df: input dataframe
:type input_df: pd.DataFrame
:return: corrected dataframe
:rtype: pd.DataFrame
"""
# Convert all the unicode columns to utf-8
input_column_types = input_df.apply(
lambda x: pd.api.types.infer_dtype(x.values, skipna=True)
)

mixed_and_unicode_cols = input_column_types[
input_column_types == "unicode"
].index.union(input_column_types[input_column_types == "mixed"].index)

for iter_column in mixed_and_unicode_cols:
# Encode string to bytes
input_df[iter_column] = input_df[iter_column].apply(
lambda x: x.encode("utf-8").strip() if isinstance(x, str) else x
)

# Decode bytes back to string
input_df[iter_column] = input_df[iter_column].apply(
lambda x: x.decode("utf-8").strip() if isinstance(x, bytes) else x
)

return input_df


def sample_parquet(
file_path: str,
sample_nrows: int,
selected_columns: Optional[List[str]] = None,
read_in_string: bool = False,
) -> Tuple[pd.DataFrame, pd.Series]:
"""
Return an iterator that returns one row group each time.
Read parquet file, sample specified number of rows from it and return a data frame.

:param file_path: path to the Parquet file.
:type file_path: str
:param sample_nrows: number of rows being sampled
:type sample_nrows: int
:param selected_columns: columns need to be read
:type selected_columns: list
:param read_in_string: return as string type
:type read_in_string: bool
:return:
:rtype: Iterator(pd.DataFrame)
"""
parquet_file = pq.ParquetFile(file_path)
data = pd.DataFrame()
for i in range(parquet_file.num_row_groups):
# read parquet file into table
if selected_columns:
parquet_table = pq.read_table(file_path, columns=selected_columns)
else:
parquet_table = pq.read_table(file_path)

data_row_df = parquet_file.read_row_group(i).to_pandas()
# sample
n_rows = parquet_table.num_rows
if n_rows > sample_nrows:
sample_index = np.array([False] * n_rows)
sample_index[random.sample(range(n_rows), sample_nrows)] = True
else:
sample_index = np.array([True] * n_rows)
sample_df = parquet_table.filter(sample_index).to_pandas()

# Convert all the unicode columns to utf-8
types = data_row_df.apply(
lambda x: pd.api.types.infer_dtype(x.values, skipna=True)
)
# Convert all the unicode columns to utf-8
sample_df = convert_unicode_col_to_utf8(sample_df)

mixed_and_unicode_cols = types[types == "unicode"].index.union(
types[types == "mixed"].index
)
original_df_dtypes = sample_df.dtypes
if read_in_string:
sample_df = sample_df.astype(str)

for col in mixed_and_unicode_cols:
data_row_df[col] = data_row_df[col].apply(
lambda x: x.encode("utf-8").strip() if isinstance(x, str) else x
)
data_row_df[col] = data_row_df[col].apply(
lambda x: x.decode("utf-8").strip() if isinstance(x, bytes) else x
)
return sample_df, original_df_dtypes

if selected_columns:
data_row_df = data_row_df[selected_columns]

data = pd.concat([data, data_row_df])
def read_parquet_df(
file_path: str,
sample_nrows: Optional[int] = None,
selected_columns: Optional[List[str]] = None,
read_in_string: bool = False,
) -> Tuple[pd.DataFrame, pd.Series]:
"""
Return an iterator that returns one row group each time.

original_df_dtypes = data.dtypes
if read_in_string:
data = data.astype(str)
:param file_path: path to the Parquet file.
:type file_path: str
:param sample_nrows: number of rows being sampled
:type sample_nrows: int
:param selected_columns: columns need to be read
:type selected_columns: list
:param read_in_string: return as string type
:type read_in_string: bool
:return:
:rtype: Iterator(pd.DataFrame)
"""
if sample_nrows is None:
parquet_file = pq.ParquetFile(file_path)
data = pd.DataFrame()
for i in range(parquet_file.num_row_groups):

data_row_df = parquet_file.read_row_group(i).to_pandas()

return data, original_df_dtypes
# Convert all the unicode columns to utf-8
data_row_df = convert_unicode_col_to_utf8(data_row_df)

if selected_columns:
data_row_df = data_row_df[selected_columns]

data = pd.concat([data, data_row_df])

original_df_dtypes = data.dtypes
if read_in_string:
data = data.astype(str)
return data, original_df_dtypes
else:
data, original_df_dtypes = sample_parquet(
file_path,
sample_nrows,
selected_columns=selected_columns,
read_in_string=read_in_string,
)
return data, original_df_dtypes


def read_text_as_list_of_strs(
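The core of the new `sample_parquet` helper above is a without-replacement boolean mask built with `random.sample` and applied through `pyarrow.Table.filter`. A standalone sketch of that step, with an assumed file path and sample size:

```python
import random

import numpy as np
import pyarrow.parquet as pq

# Assumed inputs for illustration.
file_path = "example.parquet"
sample_nrows = 100

parquet_table = pq.read_table(file_path)
n_rows = parquet_table.num_rows

if n_rows > sample_nrows:
    # Boolean mask with exactly sample_nrows True entries, chosen without replacement.
    sample_index = np.zeros(n_rows, dtype=bool)
    sample_index[random.sample(range(n_rows), sample_nrows)] = True
else:
    # Fewer rows than requested: keep every row.
    sample_index = np.ones(n_rows, dtype=bool)

sample_df = parquet_table.filter(sample_index).to_pandas()
print(len(sample_df))  # min(sample_nrows, n_rows)
```

Reading the whole table before sampling keeps the selection uniform across row groups, at the cost of loading the full file; the non-sampled path in `read_parquet_df` still streams one row group at a time.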
11 changes: 10 additions & 1 deletion dataprofiler/data_readers/parquet_data.py
@@ -61,6 +61,7 @@ def __init__(
self._data_formats["json"] = self._get_data_as_json
self._selected_data_format: str = options.get("data_format", "dataframe")
self._selected_columns: List[str] = options.get("selected_columns", list())
self._sample_nrows: Optional[int] = options.get("sample_nrows", None)

if data is not None:
self._load_data(data)
@@ -83,6 +84,11 @@ def selected_columns(self) -> List[str]:
"""Return selected columns."""
return self._selected_columns

@property
def sample_nrows(self) -> Optional[int]:
"""Return sample_nrows."""
return self._sample_nrows

@property
def is_structured(self) -> bool:
"""Determine compatibility with StructuredProfiler."""
@@ -100,7 +106,10 @@ def _load_data_from_str(self, data_as_str: str) -> pd.DataFrame:
def _load_data_from_file(self, input_file_path: str) -> pd.DataFrame:
"""Return data from file."""
data, original_df_dtypes = data_utils.read_parquet_df(
input_file_path, self.selected_columns, read_in_string=True
input_file_path,
selected_columns=self.selected_columns,
read_in_string=True,
sample_nrows=self.sample_nrows,
)
self._original_df_dtypes = original_df_dtypes
return data
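With `sample_nrows` wired through `ParquetData._load_data_from_file`, row sampling can be requested directly when loading a Parquet file. A usage sketch based on the option name introduced in this PR (the file path is a placeholder):

```python
import dataprofiler as dp

# Request at most 100 randomly sampled rows while loading a Parquet file.
data = dp.Data("data/example.parquet", options={"sample_nrows": 100})

print(data.data_type)  # "parquet"
print(len(data))       # min(100, total number of rows)
```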
26 changes: 19 additions & 7 deletions dataprofiler/profilers/numerical_column_stats.py
@@ -13,7 +13,7 @@
import pandas as pd
import scipy.stats

from . import histogram_utils, profiler_utils
from . import float_column_profile, histogram_utils, profiler_utils
from .base_column_profilers import BaseColumnProfiler
from .profiler_options import NumericalOptions

@@ -710,6 +710,9 @@ def _preprocess_for_calculate_psi(
entity_count_per_bin=self_histogram["bin_counts"],
bin_edges=self_histogram["bin_edges"],
suggested_bin_count=num_psi_bins,
is_float_histogram=isinstance(
self, float_column_profile.FloatColumn
),
options={
"min_edge": min_min_edge,
"max_edge": max_max_edge,
@@ -731,6 +734,9 @@
entity_count_per_bin=other_histogram["bin_counts"],
bin_edges=other_histogram["bin_edges"],
suggested_bin_count=num_psi_bins,
is_float_histogram=isinstance(
self, float_column_profile.FloatColumn
),
options={
"min_edge": min_min_edge,
"max_edge": max_max_edge,
@@ -1360,7 +1366,12 @@ def _update_histogram(self, df_series: pd.Series) -> None:
self._stored_histogram["total_loss"] += histogram_loss

def _regenerate_histogram(
self, entity_count_per_bin, bin_edges, suggested_bin_count, options=None
self,
entity_count_per_bin,
bin_edges,
suggested_bin_count,
is_float_histogram,
options=None,
) -> tuple[dict[str, np.ndarray], float]:

# create proper binning
@@ -1372,6 +1383,11 @@ def _regenerate_histogram(
new_bin_edges = np.linspace(
options["min_edge"], options["max_edge"], suggested_bin_count + 1
)

# if it's not a float histogram, then assume it only contains integer values
if not is_float_histogram:
bin_edges = np.round(bin_edges)

return self._assimilate_histogram(
from_hist_entity_count_per_bin=entity_count_per_bin,
from_hist_bin_edges=bin_edges,
@@ -1417,11 +1433,6 @@ def _assimilate_histogram(

bin_edge = from_hist_bin_edges[bin_id : bin_id + 3]

# if we know not float, we can assume values in bins are integers.
is_float_profile = self.__class__.__name__ == "FloatColumn"
if not is_float_profile:
bin_edge = np.round(bin_edge)

# loop until we have a new bin which contains the current bin.
while (
bin_edge[0] >= dest_hist_bin_edges[new_bin_id + 1]
@@ -1513,6 +1524,7 @@ def _histogram_for_profile(
entity_count_per_bin=bin_counts,
bin_edges=bin_edges,
suggested_bin_count=suggested_bin_count,
is_float_histogram=isinstance(self, float_column_profile.FloatColumn),
)

def _get_best_histogram_for_profile(self) -> dict:
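The new `is_float_histogram` flag replaces the earlier check of `self.__class__.__name__` inside `_assimilate_histogram`, and the rounding now happens once over all source bin edges in `_regenerate_histogram` rather than per bin during assimilation. A small sketch of that rounding step on made-up edges:

```python
import numpy as np

# Regenerated histogram edges for a column that holds integer values.
bin_edges = np.array([1.0, 1.6, 2.4, 3.2, 4.0])
is_float_histogram = False  # e.g., an IntColumn profile rather than a FloatColumn

# Non-float histograms are assumed to contain only integers, so the source
# bin edges are snapped to whole numbers before being assimilated into new bins.
if not is_float_histogram:
    bin_edges = np.round(bin_edges)

print(bin_edges)  # [1. 2. 2. 3. 4.]
```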
22 changes: 20 additions & 2 deletions dataprofiler/tests/data_readers/test_parquet_data.py
@@ -2,6 +2,8 @@
import unittest
from io import BytesIO

import pandas as pd

from dataprofiler.data_readers.data import Data
from dataprofiler.data_readers.parquet_data import ParquetData

@@ -123,13 +125,24 @@ def test_data_formats(self):
self.assertEqual(input_data_obj.data_format, data_format)
data = input_data_obj.data
if data_format == "dataframe":
import pandas as pd

self.assertIsInstance(data, pd.DataFrame)
elif data_format in ["records", "json"]:
self.assertIsInstance(data, list)
self.assertIsInstance(data[0], str)

input_data_obj_sampled = Data(
input_file["path"], options={"sample_nrows": 100}
)
for data_format in list(input_data_obj_sampled._data_formats.keys()):
input_data_obj_sampled.data_format = data_format
self.assertEqual(input_data_obj_sampled.data_format, data_format)
data_sampled = input_data_obj_sampled.data
if data_format == "dataframe":
self.assertIsInstance(data_sampled, pd.DataFrame)
elif data_format in ["records", "json"]:
self.assertIsInstance(data_sampled, list)
self.assertIsInstance(data_sampled[0], str)

def test_mixed_string_col(self):
"""
Determine if parquet can handle mixed string column types.
@@ -181,6 +194,11 @@ def test_len_data(self):
self.assertEqual(input_file["count"], len(data), msg=input_file["path"])
self.assertEqual(input_file["count"], data.length, msg=input_file["path"])

data_sampled = Data(input_file["path"], options={"sample_nrows": 100})
self.assertEqual(
min(100, input_file["count"]), len(data_sampled), msg=input_file["path"]
)

def test_file_encoding(self):
"""Tests to ensure file_encoding set to None"""
for input_file in self.file_or_buf_list:
2 changes: 1 addition & 1 deletion dataprofiler/version.py
@@ -2,7 +2,7 @@

MAJOR = 0
MINOR = 10
MICRO = 7
MICRO = 8
POST = None # otherwise None

VERSION = "%d.%d.%d" % (MAJOR, MINOR, MICRO)
2 changes: 1 addition & 1 deletion requirements-reports.txt
@@ -1,2 +1,2 @@
matplotlib>=3.3.3,<3.6.1
matplotlib>=3.3.3,<3.9.0
seaborn>=0.11.1