Feature polars test (#125)
* Added more tests for polars and upgraded to v0.19

* Added more tests to polars

* Day tests for polars
canimus authored Oct 1, 2023
1 parent e6cd8cc commit e245850
Showing 34 changed files with 1,061 additions and 50 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -23,7 +23,7 @@ Provider | API | Versions
![bigquery](logos/bigquery.png?raw=true "BigQuery Client API")| `bigquery` | `3.4.1`
![pandas](logos/pandas.svg?raw=true "Pandas DataFrame API")| `pandas`| `2.0.1`, `1.5.x`, `1.4.x`
![duckdb](logos/duckdb.png?raw=true "DuckDB API")|`duckdb` | `0.7.1`, `0.8.0`
-![polars](logos/polars.svg?raw=true "Polars API")|`polars`|`0.18.2`
+![polars](logos/polars.svg?raw=true "Polars API")|`polars`| `0.19.6`

<sub>Logos are trademarks of their own brands.</sub>
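For orientation, a minimal sketch of running a check against a polars frame; the `Check`/`CheckLevel` names follow cuallee's public API, and the exact surface is as exercised by the tests further down:

```python
import polars as pl
from cuallee import Check, CheckLevel

df = pl.DataFrame({"id": [1, 2, 3, 4, 5]})
check = Check(CheckLevel.WARNING, "polars_demo")
check.is_complete("id").has_max("id", 5)
# validate() returns a results frame with a PASS/FAIL "status" column
print(check.validate(df).select(pl.col("status")))
```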

6 changes: 3 additions & 3 deletions cuallee/duckdb_validation.py
@@ -165,21 +165,21 @@ def has_workflow(self, rule: Rule) -> str:
"""
(select sum(A.CUALLEE_RESULT) from (
select
-                lead($event) over (partition by $name order by $ord) as CUALLEE_EVENT,
+                lead($event) over (partition by $name order by $ordinal) as CUALLEE_EVENT,
LIST_VALUE($event, CUALLEE_EVENT) as CUALLEE_EDGE,
LIST_VALUE$basis as CUALLEE_GRAPH,
CAST(array_has(CUALLEE_GRAPH, CUALLEE_EDGE) AS INTEGER) as CUALLEE_RESULT
from '$table'
) as A)
""".strip()
)
-        name, event, ord = rule.column
+        name, event, ordinal = rule.column
basis = str(tuple(map(list, rule.value))).replace("None", "NULL")
return template.substitute(
{
"name": name,
"event": event,
"ord": ord,
"ordinal": ordinal,
"basis": basis,
"table": self.table_name,
}
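The rename also stops shadowing Python's built-in `ord`. As a quick illustration of the placeholder substitution, a sketch with hypothetical column names:

```python
from string import Template

# Hypothetical workflow rule: partition by "user", event column "step", ordered by "ts"
template = Template("lead($event) over (partition by $name order by $ordinal)")
print(template.substitute({"name": "user", "event": "step", "ordinal": "ts"}))
# -> lead(step) over (partition by user order by ts)
```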
80 changes: 40 additions & 40 deletions cuallee/polars_validation.py
@@ -18,6 +18,10 @@ def _result(series: pl.Series) -> int:
"""It retrieves the sum result of the polar predicate"""
return compose(operator.itemgetter(0))(series)

+    @staticmethod
+    def _value(dataframe: pl.DataFrame):
+        return compose(first, first, list, operator.methodcaller("values"), operator.methodcaller("to_dict", as_series=False))(dataframe)
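Unrolled, the `compose` chain runs right to left; a sketch of the equivalent steps for a hypothetical one-cell frame:

```python
import polars as pl

cell = pl.DataFrame({"ts": [42]})        # e.g. the output of df.select(pl.col("ts").min())
as_dict = cell.to_dict(as_series=False)  # {"ts": [42]}
columns = list(as_dict.values())         # [[42]]
scalar = columns[0][0]                   # 42 -- first column, then first row
```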

def is_complete(self, rule: Rule, dataframe: pl.DataFrame) -> Union[bool, int]:
"""Validate not null"""
return Compute._result(
@@ -142,7 +146,7 @@ def is_between(self, rule: Rule, dataframe: pl.DataFrame) -> Union[bool, int]:
return Compute._result(
dataframe.select(
pl.col(rule.column)
-            .is_between(low, high, include_bounds=True)
+            .is_between(low, high, closed="both")
.cast(pl.Int8)
).sum()
)
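polars 0.19 replaced the `include_bounds` flag with a `closed` argument (`"both"`, `"left"`, `"right"`, `"none"`); a sketch of the new call:

```python
import polars as pl

df = pl.DataFrame({"id": [0, 1, 5, 10, 11]})
# closed="both" keeps the endpoints, matching the old include_bounds=True
print(df.select(pl.col("id").is_between(1, 10, closed="both").cast(pl.Int8)).sum())  # 3
```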
@@ -165,7 +169,7 @@ def has_percentile(self, rule: Rule, dataframe: pl.DataFrame) -> Union[bool, int]:

def has_max_by(self, rule: Rule, dataframe: pl.DataFrame) -> Union[bool, int]:
"""Adjacent column maximum value verifiation on threshold"""
-        base, target = rule.column
+        target, base = rule.column
return Compute._result(
dataframe.filter(pl.col(base) == pl.col(base).max())
.select(pl.col(target) == rule.value)
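The swap makes the unpacking match the call order in the tests below: the first argument is the target column to verify, the second is the column whose maximum selects the row. A sketch:

```python
import polars as pl

df = pl.DataFrame({"id": [10, 20], "id2": [300, 500]})
# has_max_by("id", "id2", 20): where id2 hits its max (500), id must equal 20
row = df.filter(pl.col("id2") == pl.col("id2").max())
print(row.select(pl.col("id") == 20).to_series()[0])  # True
```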
@@ -185,16 +189,22 @@ def has_correlation(self, rule: Rule, dataframe: pl.DataFrame) -> Union[bool, int]:
col_a, col_b = rule.column
return Compute._result(
dataframe.select(pl.col(col_a), pl.col(col_b))
-            .pearson_corr()
-            .fill_nan(0)
-            .fill_null(0)
-            .select(pl.col(col_b))
-            .head(1)
+            .corr()
+            .select(pl.col(col_b) == rule.value)
+            .select(pl.all(col_b))
+            .to_series()
)
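`pearson_corr` was renamed to `corr` in polars 0.19; it returns the full correlation matrix as a DataFrame, and the chain above reads the off-diagonal cell. A sketch:

```python
import polars as pl

df = pl.DataFrame({"id": [10.0, 20.0, 30.0], "id2": [100.0, 200.0, 300.0]})
matrix = df.corr()  # 2x2 Pearson matrix; row 0, column "id2" holds corr(id, id2)
print(matrix.select(pl.col("id2")).to_series()[0])  # 1.0
```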

def satisfies(self, rule: Rule, dataframe: pl.DataFrame) -> Union[bool, int]:
-        return dataframe.eval(rule.value).astype(int).sum()
+        ctx = pl.SQLContext(cuallee=dataframe)
+        return ctx.execute(
+            '''
+            SELECT ({}) as total
+            FROM cuallee
+            '''.format(rule.value),
+            eager=True,
+        ).cast(pl.Int8).sum()
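`satisfies` now evaluates the rule's predicate as SQL instead of the pandas-only `DataFrame.eval`. `pl.SQLContext` registers the frame under a name; a sketch of the same pattern with a hypothetical predicate:

```python
import polars as pl

df = pl.DataFrame({"id": [1, 2, 3], "id2": [10, 20, 2]})
ctx = pl.SQLContext(frame=df)
# The predicate evaluates row-wise to booleans; Int8 cast + sum counts passing rows
out = ctx.execute("SELECT (id < id2) AS total FROM frame", eager=True)
print(out.cast(pl.Int8).sum())  # 2
```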


def has_entropy(self, rule: Rule, dataframe: pl.DataFrame) -> Union[bool, int]:
def entropy(labels):
@@ -213,77 +223,67 @@ def entropy(labels):

return -np.sum(probs * np.log(probs)) / np.log(n_classes)

-        return entropy(dataframe.loc[:, rule.column].values) == float(rule.value)
+        return entropy(dataframe.select(pl.col(rule.column)).to_series()) == float(rule.value)
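The helper computes entropy normalized by `log(n_classes)`, so a perfectly balanced binary column scores 1.0, as `test_has_entropy.py` below asserts; a worked sketch:

```python
import numpy as np

labels = np.array([1, 1, 1, 0, 0, 0])
_, counts = np.unique(labels, return_counts=True)
probs = counts / counts.sum()  # [0.5, 0.5]
print(-np.sum(probs * np.log(probs)) / np.log(len(probs)))  # 1.0
```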

def is_on_weekday(self, rule: Rule, dataframe: pl.DataFrame) -> Union[bool, int]:
return (
-            dataframe.loc[:, rule.column].dt.dayofweek.between(0, 4).astype(int).sum()
+            dataframe.select(pl.col(rule.column).dt.weekday().is_between(1, 5).cast(pl.Int8)).sum()
)

def is_on_weekend(self, rule: Rule, dataframe: pl.DataFrame) -> Union[bool, int]:
return (
-            dataframe.loc[:, rule.column].dt.dayofweek.between(5, 6).astype(int).sum()
+            dataframe.select(pl.col(rule.column).dt.weekday().is_between(6, 7).cast(pl.Int8)).sum()
)

def is_on_monday(self, rule: Rule, dataframe: pl.DataFrame) -> Union[bool, int]:
-        return dataframe.loc[:, rule.column].dt.dayofweek.eq(0).astype(int).sum()
+        return dataframe.select(pl.col(rule.column).dt.weekday().eq(1).cast(pl.Int8)).sum()

def is_on_tuesday(self, rule: Rule, dataframe: pl.DataFrame) -> Union[bool, int]:
-        return dataframe.loc[:, rule.column].dt.dayofweek.eq(1).astype(int).sum()
+        return dataframe.select(pl.col(rule.column).dt.weekday().eq(2).cast(pl.Int8)).sum()

def is_on_wednesday(self, rule: Rule, dataframe: pl.DataFrame) -> Union[bool, int]:
-        return dataframe.loc[:, rule.column].dt.dayofweek.eq(2).astype(int).sum()
+        return dataframe.select(pl.col(rule.column).dt.weekday().eq(3).cast(pl.Int8)).sum()

def is_on_thursday(self, rule: Rule, dataframe: pl.DataFrame) -> Union[bool, int]:
-        return dataframe.loc[:, rule.column].dt.dayofweek.eq(3).astype(int).sum()
+        return dataframe.select(pl.col(rule.column).dt.weekday().eq(4).cast(pl.Int8)).sum()

def is_on_friday(self, rule: Rule, dataframe: pl.DataFrame) -> Union[bool, int]:
-        return dataframe.loc[:, rule.column].dt.dayofweek.eq(4).astype(int).sum()
+        return dataframe.select(pl.col(rule.column).dt.weekday().eq(5).cast(pl.Int8)).sum()

def is_on_saturday(self, rule: Rule, dataframe: pl.DataFrame) -> Union[bool, int]:
-        return dataframe.loc[:, rule.column].dt.dayofweek.eq(5).astype(int).sum()
+        return dataframe.select(pl.col(rule.column).dt.weekday().eq(6).cast(pl.Int8)).sum()

def is_on_sunday(self, rule: Rule, dataframe: pl.DataFrame) -> Union[bool, int]:
-        return dataframe.loc[:, rule.column].dt.dayofweek.eq(6).astype(int).sum()
+        return dataframe.select(pl.col(rule.column).dt.weekday().eq(7).cast(pl.Int8)).sum()
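All of these carry an off-by-one shift: pandas' `dayofweek` runs 0-6 with Monday=0, while polars' `dt.weekday()` is ISO 1-7 with Monday=1 and Sunday=7. A quick check:

```python
import polars as pl
from datetime import date

df = pl.DataFrame({"ts": [date(2023, 10, 2)]})  # 2023-10-02 is a Monday
print(df.select(pl.col("ts").dt.weekday()).to_series()[0])  # 1 (ISO: Mon=1 .. Sun=7)
```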

def is_on_schedule(self, rule: Rule, dataframe: pl.DataFrame) -> Union[bool, int]:
return (
-            dataframe.loc[:, rule.column].dt.hour.between(*rule.value).astype(int).sum()
+            dataframe.select(pl.col(rule.column).dt.hour().is_between(*rule.value).cast(pl.Int8)).sum()
)

def is_daily(self, rule: Rule, dataframe: pl.DataFrame) -> complex:
if rule.value is None:
-            day_mask = [0, 1, 2, 3, 4]
+            day_mask = [1, 2, 3, 4, 5]
else:
day_mask = rule.value

-        lower, upper = (
-            dataframe.loc[:, rule.column].agg([np.min, np.max]).dt.strftime("%Y-%m-%d")
-        )
-        sequence = (
-            pd.date_range(start=lower, end=upper, freq="D").rename("ts").to_frame()
-        )
-        sequence = (
-            sequence[sequence.ts.dt.dayofweek.isin(day_mask)]
-            .reset_index(drop=True)
-            .ts.unique()
-            .astype(np.datetime64)
-        )
-
-        delivery = (
-            dataframe[dataframe[rule.column].dt.dayofweek.isin(day_mask)][rule.column]
-            .dt.date.astype(np.datetime64)
-            .values
-        )
-
+        lower = self._value(dataframe.select(pl.col(rule.column)).min())
+        upper = self._value(dataframe.select(pl.col(rule.column)).max())
+        sequence = pl.DataFrame({"ts": pl.date_range(start=lower, end=upper, interval="1d", eager=True)})
+        sequence = sequence.filter(pl.col("ts").dt.weekday().is_in(day_mask)).to_series().to_list()
+        delivery = dataframe.filter(pl.col(rule.column).dt.weekday().is_in(day_mask)).to_series().to_list()

# Real part: rows observed; imaginary part: expected days missing from delivery
return complex(len(dataframe), len(set(sequence).difference(delivery)))
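The complex return packs two counts into one value: the real part is the number of rows observed, the imaginary part the expected days never delivered. A sketch of the encoding:

```python
import polars as pl
from datetime import date

expected = set(
    pl.date_range(date(2023, 10, 2), date(2023, 10, 6), interval="1d", eager=True).to_list()
)  # Mon..Fri
delivered = {date(2023, 10, 2), date(2023, 10, 4), date(2023, 10, 6)}
print(complex(len(delivered), len(expected - delivered)))  # (3+2j): 3 seen, 2 missing
```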

def is_inside_interquartile_range(
self, rule: Rule, dataframe: pl.DataFrame
) -> Union[bool, complex]:
-        lower, upper = dataframe[rule.column].quantile(rule.value).values
-        return dataframe[rule.column].between(lower, upper).astype(int).sum()
+        min_q, max_q = rule.value
+        lower = self._value(dataframe.select(pl.col(rule.column).quantile(min_q, interpolation="linear")))
+        upper = self._value(dataframe.select(pl.col(rule.column).quantile(max_q, interpolation="linear")))
+        return dataframe.select(pl.col(rule.column).is_between(lower, upper)).cast(pl.Int8).sum()
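The same pattern outside the class, with `DataFrame.item()` standing in for the `_value` helper (a sketch):

```python
import polars as pl

df = pl.DataFrame({"id": list(range(10))})
lower = df.select(pl.col("id").quantile(0.25, interpolation="linear")).item()  # 2.25
upper = df.select(pl.col("id").quantile(0.75, interpolation="linear")).item()  # 6.75
print(df.select(pl.col("id").is_between(lower, upper)).cast(pl.Int8).sum())   # 4 rows inside
```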

def has_workflow(self, rule: Rule, dataframe: pl.DataFrame) -> Union[bool, int]:
"""Compliance with adjacency matrix"""
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -45,7 +45,7 @@ duckdb = [
"duckdb==0.8.1"
]
polars = [
"polars>=0.15.7"
"polars>=0.19.6"
]
iso = [
"lxml >= 4.9.1"
30 changes: 25 additions & 5 deletions test/unit/polars_dataframe/test_has_correlation.py
@@ -1,11 +1,31 @@
-import pytest

+from cuallee import Check
+import polars as pl
+import pytest
+import numpy as np


-def test_positive(check):
+def test_positive(check: Check):
check.has_correlation("id", "id2", 1.0)
df = pl.DataFrame({"id": [10, 20], "id2": [100, 200]})
result = check.validate(df).select(pl.col("status")) == "PASS"
assert all(result.to_series().to_list())


def test_negative(check: Check):
check.has_correlation("id", "id2", 1.0)
-    # result = check.validate(df).select(pl.col('status')) == "PASS"
-    # assert all(result.to_series().to_list())
-    assert True
+    df = pl.DataFrame({"id": [10, 20, 30, None], "id2": [100, 200, 300, 400]})
+    result = check.validate(df).select(pl.col("status")) == "FAIL"
+    assert all(result.to_series().to_list())


+def test_values(check: Check):
+    check.has_correlation("id", "id2", 1.0)
+    df = pl.DataFrame({"id": [1, 2, 3], "id2": [1.0, 2.0, 3.0]})
+    result = check.validate(df).select(pl.col("status")) == "PASS"
+    assert all(result.to_series().to_list())
+
+
+def test_coverage(check: Check):
+    with pytest.raises(TypeError, match="positional arguments"):
+        check.has_correlation("id", "id2", 1.0, 0.75)
32 changes: 32 additions & 0 deletions test/unit/polars_dataframe/test_has_entropy.py
@@ -0,0 +1,32 @@
import polars as pl
from cuallee import Check
import pytest


def test_positive(check: Check):
    check.has_entropy("id", 1.0)
    df = pl.DataFrame({"id": [1, 1, 1, 0, 0, 0]})
    result = check.validate(df).select(pl.col("status")) == "PASS"
    assert all(result.to_series().to_list())


def test_negative(check: Check):
    check.has_entropy("id", 1.0)
    df = pl.DataFrame({"id": [10, 10, 10, 10, 50]})
    result = check.validate(df).select(pl.col("status")) == "FAIL"
    assert all(result.to_series().to_list())


@pytest.mark.parametrize(
    "values", [[1], [1, 1, 1, 1, 1]], ids=("observation", "classes")
)
def test_parameters(check: Check, values):
    check.has_entropy("id", 0.0)
    df = pl.DataFrame({"id": values})
    result = check.validate(df).select(pl.col("status")) == "PASS"
    assert all(result.to_series().to_list())


def test_coverage(check: Check):
    with pytest.raises(TypeError):
        check.has_entropy("id", 1.0, pct=0.5)
16 changes: 16 additions & 0 deletions test/unit/polars_dataframe/test_has_max.py
@@ -0,0 +1,16 @@
from cuallee import Check
import polars as pl


def test_positive(check: Check):
    df = pl.DataFrame({"id": [1, 2, 3, 4, 5]})
    check.has_max("id", 5)
    result = check.validate(df).select(pl.col("status")) == "PASS"
    assert all(result.to_series().to_list())


def test_negative(check: Check):
    df = pl.DataFrame({"id": [1, 2, 3, 4, 5]})
    check.has_max("id", 10)
    result = check.validate(df).select(pl.col("status")) == "FAIL"
    assert all(result.to_series().to_list())
34 changes: 34 additions & 0 deletions test/unit/polars_dataframe/test_has_max_by.py
@@ -0,0 +1,34 @@
import polars as pl
from cuallee import Check
import pytest


def test_positive(check: Check):
    check.has_max_by("id", "id2", 20)
    df = pl.DataFrame({"id": [10, 20], "id2": [300, 500]})
    result = check.validate(df).select(pl.col("status")) == "PASS"
    assert all(result.to_series().to_list())


def test_negative(check: Check):
    check.has_max_by("id", "id2", 10)
    df = pl.DataFrame({"id": [10, 20], "id2": [300, 500]})
    result = check.validate(df).select(pl.col("status")) == "FAIL"
    assert all(result.to_series().to_list())


@pytest.mark.parametrize(
    "answer, columns",
    [(20, [10, 20]), ("herminio", ["antoine", "herminio"])],
    ids=("numeric", "string"),
)
def test_values(check: Check, answer, columns):
    check.has_max_by("id", "id2", answer)
    df = pl.DataFrame({"id": columns, "id2": [300, 500]})
    result = check.validate(df).select(pl.col("status")) == "PASS"
    assert all(result.to_series().to_list())


def test_coverage(check: Check):
    with pytest.raises(TypeError):
        check.has_max_by("id", "id2", 20, 100)
31 changes: 31 additions & 0 deletions test/unit/polars_dataframe/test_has_mean.py
@@ -0,0 +1,31 @@
import polars as pl
import numpy as np
from cuallee import Check
import pytest


def test_positive(check: Check):
    check.has_mean("id", 4.5)
    df = pl.DataFrame({"id": np.arange(10)})
    result = check.validate(df).select(pl.col("status")) == "PASS"
    assert all(result.to_series().to_list())


def test_negative(check: Check):
    check.has_mean("id", 5)
    df = pl.DataFrame({"id": np.arange(10)})
    result = check.validate(df).select(pl.col("status")) == "FAIL"
    assert all(result.to_series().to_list())


@pytest.mark.parametrize("extra_value", [4, 4.0], ids=("int", "float"))
def test_values(check: Check, extra_value):
    check.has_mean("id", extra_value)
    df = pl.DataFrame({"id": [0, 1, 2, 3, 14] + [extra_value]})
    result = check.validate(df).select(pl.col("status")) == "PASS"
    assert all(result.to_series().to_list())


def test_coverage(check: Check):
    with pytest.raises(TypeError):
        check.has_mean("id", 5, 0.1)
34 changes: 34 additions & 0 deletions test/unit/polars_dataframe/test_has_min_by.py
@@ -0,0 +1,34 @@
import polars as pl
from cuallee import Check
import pytest


def test_positive(check: Check):
    check.has_min_by("id", "id2", 300)
    df = pl.DataFrame({"id": [10, 20], "id2": [300, 500]})
    result = check.validate(df).select(pl.col("status")) == "PASS"
    assert all(result.to_series().to_list())


def test_negative(check: Check):
    check.has_min_by("id", "id2", 50)
    df = pl.DataFrame({"id": [10, 20], "id2": [300, 500]})
    result = check.validate(df).select(pl.col("status")) == "FAIL"
    assert all(result.to_series().to_list())


@pytest.mark.parametrize(
    "answer, columns",
    [(10, [10, 20]), ("antoine", ["antoine", "herminio"])],
    ids=("numeric", "string"),
)
def test_values(check: Check, answer, columns):
    check.has_min_by("id2", "id", answer)
    df = pl.DataFrame({"id": columns, "id2": [300, 500]})
    result = check.validate(df).select(pl.col("status")) == "PASS"
    assert all(result.to_series().to_list())


def test_coverage(check: Check):
    with pytest.raises(TypeError):
        check.has_min_by("id2", "id", 20, 100)