Feature/335 parquet lf processing (#338)
closes #335

---------

Co-authored-by: Adam <[email protected]>
lchen-2101 and jcadam14 authored Feb 6, 2025
1 parent 8b7f742 commit b20b6e2
Showing 7 changed files with 522 additions and 1,248 deletions.
1,469 changes: 344 additions & 1,125 deletions poetry.lock

Large diffs are not rendered by default.

6 changes: 1 addition & 5 deletions pyproject.toml
@@ -14,21 +14,17 @@ packages = [{ include = "regtech_data_validator", from = "src" }]
python = ">=3.12,<4"
pandas = "^2.2.2"
pandera = "^0.22.1"
requests = "^2.32.3"
tabulate = "^0.9.0"
ujson = "^5.9.0"
fsspec = "^2024.6.1"
polars-lts-cpu = "^1.21.0"
pyarrow = "^19.0.0"
boto3 = "~1.36.3"
#pinning due to snyk high vulnerability find
s3fs = { version = "^2024.9.0", extras = ["aiohttp=^3.11.10"] }

[tool.poetry.group.dev.dependencies]
pytest = "8.3.4"
pytest-cov = "6.0.0"
beautifulsoup4 = "^4.12.3"
lxml = "^5.3.0"
requests = "^2.32.3"

[tool.poetry.group.data.dependencies]
openpyxl = "^3.1.5"
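The dependency set above centers on polars (via polars-lts-cpu) plus pyarrow for parquet support. As a quick sanity check that those pins are sufficient for local parquet work, here is a minimal sketch; the filename and the second column name are placeholders, not taken from the repository:

import polars as pl

# Write a tiny parquet file and lazily scan it back; "sblar.parquet" and
# "field_1" are illustrative names only.
pl.DataFrame({"uid": ["123456789TESTBANK123"], "field_1": ["value"]}).write_parquet("sblar.parquet")
print(pl.scan_parquet("sblar.parquet").collect())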
31 changes: 23 additions & 8 deletions src/regtech_data_validator/cli.py
@@ -1,14 +1,13 @@
from dataclasses import dataclass
from enum import StrEnum
-from pathlib import Path
from regtech_data_validator.data_formatters import df_to_csv, df_to_str, df_to_json, df_to_table, df_to_download
from typing import Annotated, Optional

import polars as pl
import typer
import typer.core

-from regtech_data_validator.validator import validate_batch_csv
+from regtech_data_validator.validator import validate_batch_csv, validate_lazy_frame
from regtech_data_validator.validation_results import ValidationPhase

# Need to do this because the latest version of typer, if the rich package exists
@@ -22,6 +21,11 @@
app = typer.Typer(no_args_is_help=True, pretty_exceptions_enable=False)


+class FileType(StrEnum):
+    CSV = 'csv'
+    PARQUET = 'parquet'


@dataclass
class KeyValueOpt:
    key: str
@@ -57,7 +61,7 @@ def describe() -> None:
@app.command(no_args_is_help=True)
def validate(
    path: Annotated[
-        Path,
+        str,
        typer.Argument(
            exists=True,
            dir_okay=False,
@@ -77,6 +81,7 @@
        ),
    ] = None,
    output: Annotated[Optional[OutputFormat], typer.Option()] = OutputFormat.TABLE,
+    filetype: Annotated[Optional[FileType], typer.Option()] = FileType.CSV,
) -> tuple[bool, pl.DataFrame]:
    """
    Validate CFPB data submission
@@ -87,11 +92,21 @@
    final_phase = ValidationPhase.LOGICAL
    all_findings = []
    final_df = pl.DataFrame()
-    # path = "s3://cfpb-devpub-regtech-sbl-filing-main/upload/2024/1234364890REGTECH006/156.csv"
-    for validation_results in validate_batch_csv(path, context_dict, batch_size=50000, batch_count=1):
-        total_findings += validation_results.error_counts.total_count + validation_results.warning_counts.total_count
-        final_phase = validation_results.phase
-        all_findings.append(validation_results)
+    if filetype == FileType.CSV:
+        for validation_results in validate_batch_csv(path, context_dict, batch_size=50000, batch_count=1):
+            total_findings += (
+                validation_results.error_counts.total_count + validation_results.warning_counts.total_count
+            )
+            final_phase = validation_results.phase
+            all_findings.append(validation_results)
+    elif filetype == FileType.PARQUET:
+        lf = pl.scan_parquet(path, allow_missing_columns=True)
+        for validation_results in validate_lazy_frame(lf, context_dict, batch_size=50000, batch_count=1):
+            total_findings += (
+                validation_results.error_counts.total_count + validation_results.warning_counts.total_count
+            )
+            final_phase = validation_results.phase
+            all_findings.append(validation_results)

    if all_findings:
        final_df = pl.concat([v.findings for v in all_findings], how="diagonal")
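The new --filetype option routes parquet submissions through pl.scan_parquet and validate_lazy_frame rather than the CSV batch reader. A minimal sketch of calling the same API directly, mirroring the branch above; the file path and the context key/value are placeholders:

import polars as pl

from regtech_data_validator.validator import validate_lazy_frame

# Assumption: context is a plain dict; the "lei" key and its value are
# illustrative, as is "sblar.parquet".
context = {"lei": "000TESTFIUIDDONOTUSE"}
lf = pl.scan_parquet("sblar.parquet", allow_missing_columns=True)

total_findings = 0
for results in validate_lazy_frame(lf, context, batch_size=50000, batch_count=1):
    # each ValidationResults carries error/warning counts, findings, and phase
    total_findings += results.error_counts.total_count + results.warning_counts.total_count
print(total_findings)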
5 changes: 3 additions & 2 deletions src/regtech_data_validator/data_formatters.py
@@ -1,6 +1,7 @@
import ujson
import polars as pl

+from typing import Dict, Any
from tabulate import tabulate

from functools import partial
@@ -54,7 +55,7 @@ def format_findings(df: pl.DataFrame, phase, checks):
"record_no",
"uid",
],
columns="field_number",
on="field_number",
values=["field_name", "field_value"],
aggregate_function="first",
)
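The columns= to on= change tracks the polars 1.x rename of DataFrame.pivot's parameter. A toy illustration with made-up data rather than real findings:

import polars as pl

df = pl.DataFrame({
    "uid": ["r1", "r1"],
    "field_number": ["field_1", "field_2"],
    "field_value": ["a", "b"],
})
# polars >= 1.0 takes on= where older releases used columns=
wide = df.pivot(index="uid", on="field_number", values="field_value", aggregate_function="first")
print(wide)  # one row per uid, one column per field_number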
@@ -261,7 +262,7 @@ def process_group_data(group_df, json_results, group_size, checks):
    return group_df


-def process_chunk(df: pl.DataFrame, validation_id: str, check: SBLCheck) -> [dict]:
+def process_chunk(df: pl.DataFrame, validation_id: str, check: SBLCheck) -> Dict[str, Any]:
    # once we have a grouped dataframe, working with the data as a
    # python dict is much faster
    findings_json = ujson.loads(df.write_json())
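process_chunk's df.write_json()/ujson.loads round-trip converts the grouped frame into plain Python dicts, which the comment above notes is faster to walk than per-row DataFrame access. A small illustration, with invented row values:

import ujson
import polars as pl

df = pl.DataFrame({"validation_id": ["E2001"], "uid": ["123"]})
# polars 1.x write_json() emits row-oriented JSON, so this yields a list of dicts
rows = ujson.loads(df.write_json())
print(rows)  # [{'validation_id': 'E2001', 'uid': '123'}]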
1 change: 1 addition & 0 deletions src/regtech_data_validator/validation_results.py
@@ -26,3 +26,4 @@ class ValidationResults(object):
    is_valid: bool
    findings: pl.DataFrame
    phase: ValidationPhase
+    record_count: int = 0
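Because record_count defaults to 0 and sits after the existing fields, current constructors of ValidationResults keep working unchanged. A sketch under that assumption; the counts objects are stubbed with None purely for illustration, and no required fields beyond those visible in this diff and in cli.py are assumed:

import polars as pl
from regtech_data_validator.validation_results import ValidationPhase, ValidationResults

r = ValidationResults(
    error_counts=None,    # placeholder; real callers pass count objects with a total_count
    warning_counts=None,
    is_valid=True,
    findings=pl.DataFrame(),
    phase=ValidationPhase.LOGICAL,
)
assert r.record_count == 0  # default until a caller supplies the batch's size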