CleanCore 🔍

Stop shipping unobserved data.

cleancore is a zero-dependency Python library that adds automatic audit trails and schema drift detection to your data pipelines.

Think of it as Git for your data rows: every change is tracked, and every type mismatch is caught before it breaks your production models.



Why CleanCore?

Data pipelines fail silently. A column quietly changes from int to str. A price field becomes None. Your model trains on garbage — and you never know why.

CleanCore wraps your existing pipeline functions and automatically:

  • Records what changed (row-level diff)
  • Detects type drift (int → str, float → None)
  • Prints a clean dashboard after every run
  • Exports a full JSON audit trail

No config. No new infrastructure. Just one decorator.
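To make the idea concrete, here is a minimal pure-Python sketch of how a decorator can record a row-level diff. It is not CleanCore's implementation: `audit_sketch`, the `log` list, and the record layout are hypothetical names chosen for illustration only.

```python
import copy
import functools


def audit_sketch(rule_id):
    """Hypothetical decorator (not CleanCore's): logs row counts and modified rows."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(rows, log):
            # Snapshot the input first, because func may mutate rows in place.
            before = copy.deepcopy(rows)
            after = func(rows)
            # Position-by-position comparison; only meaningful when the step
            # neither adds nor removes rows.
            modified = sum(1 for old, new in zip(before, after) if old != new)
            log.append({
                "step": func.__name__,
                "rule": rule_id,
                "in": len(before),
                "out": len(after),
                "modified": modified,
            })
            return after
        return wrapper
    return decorator


@audit_sketch(rule_id="MASK_PII")
def clean_emails(rows):
    for row in rows:
        row["email"] = "***@***"
    return rows


log = []
clean_emails(
    [{"id": 1, "email": "alice@example.com"},
     {"id": 2, "email": "bob@example.com"}],
    log,
)
print(log)
# [{'step': 'clean_emails', 'rule': 'MASK_PII', 'in': 2, 'out': 2, 'modified': 2}]
```

The deep copy matters: without it, an in-place step would change the "before" rows too, and the diff would always report zero modifications.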


Features

| Feature | Description |
| --- | --- |
| @audit_trail | Decorator that wraps any function and records a before/after diff |
| Schema Sentinel | Detects type drift (int → str) and null regressions (float → NoneType) |
| Big Data Engine | Chunk-based processing (10k rows per batch) to keep memory use bounded |
| JSON Export | Full audit trail saved to a file for compliance or debugging |
| CLI Tool | cleancore report and cleancore validate from the terminal |
| Zero Dependencies | Pure Python; pandas, numpy, and other packages are not required |
| Pandas / Polars | Optional support that works automatically if they are installed |

Installation

pip install cleancore

That's it. No extra dependencies needed.


Quick Start

Basic usage

from cleancore import audit_trail, ProvenaLogger

@audit_trail(rule_id="MASK_PII")
def clean_emails(data):
    for row in data:
        row['email'] = "***@***"
    return data

my_data = [
    {"id": 1, "email": "alice@example.com"},
    {"id": 2, "email": "bob@example.com"},
]

with ProvenaLogger("My_Pipeline") as logger:
    result = clean_emails(my_data, provena_logger=logger)

Output — printed automatically when the with block ends:

+--------------------------------------------------------------+
| PIPELINE START | My_Pipeline | MODE=LAZY                    |
+--------------------------------------------------------------+
...
  [SUMMARY]  Steps=1  |  In=2  Out=2  Delta=0  |  Wall=1.2ms

  STEP                 RULE             IN      OUT    MODIFIED       ms
  ------------------------------------------------------------------------
  clean_emails         MASK_PII          2        2           2      1.1

Schema Sentinel — Type Drift Detection

CleanCore automatically catches when a column's type changes between steps.

from cleancore import audit_trail, ProvenaLogger

@audit_trail(rule_id="TYPE_BUG")
def process(data):
    for row in data:
        row['age'] = str(row['age'])   # Bug: int accidentally cast to str
    return data

data = [{"id": 1, "age": 25}, {"id": 2, "age": 30}]

with ProvenaLogger("Compliance_Pipeline") as logger:
    process(data, provena_logger=logger)

Output:

  [SCHEMA]  Schema Sentinel
  ----------------------------------------
  COLUMN               KIND         FROM         TO           DETECTED IN
  ------------------------------------------------------------------------
  age                  [WARN]       int          str          process

No more silent type bugs.
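As a rough illustration of the underlying idea (not CleanCore's internals), type drift can be found by collecting the Python type names seen in each column before and after a step, then comparing them. The helper names `column_types` and `type_changes` below are hypothetical.

```python
def column_types(rows):
    """Map each column to the set of Python type names observed in it."""
    seen = {}
    for row in rows:
        for column, value in row.items():
            seen.setdefault(column, set()).add(type(value).__name__)
    return seen


def type_changes(before_rows, after_rows):
    """Return columns present in both snapshots whose observed types differ."""
    before = column_types(before_rows)
    after = column_types(after_rows)
    return {
        column: (sorted(before[column]), sorted(after[column]))
        for column in before.keys() & after.keys()
        if before[column] != after[column]
    }


before_rows = [{"id": 1, "age": 25}, {"id": 2, "age": 30}]
after_rows = [{"id": 1, "age": "25"}, {"id": 2, "age": "30"}]

changes = type_changes(before_rows, after_rows)
print(changes)  # {'age': (['int'], ['str'])}
```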


Multiple Steps in a Pipeline

from cleancore import audit_trail, ProvenaLogger

@audit_trail(rule_id="FILTER_INACTIVE")
def remove_inactive(data):
    return [row for row in data if row['active']]

@audit_trail(rule_id="MASK_PII")
def mask_emails(data):
    for row in data:
        row['email'] = "***@***"
    return data

@audit_trail(rule_id="NORMALIZE_SALARY")
def normalize(data):
    for row in data:
        row['salary'] = round(row['salary'] / 1000, 2)
    return data

employees = [
    {"id": 1, "email": "ali@co.com",   "salary": 55000, "active": True},
    {"id": 2, "email": "sara@co.com",  "salary": 62000, "active": False},
    {"id": 3, "email": "ahmed@co.com", "salary": 48000, "active": True},
]

with ProvenaLogger("HR_Pipeline") as logger:
    step1 = remove_inactive(employees,   provena_logger=logger)
    step2 = mask_emails(step1,           provena_logger=logger)
    step3 = normalize(step2,             provena_logger=logger)

Save Audit Trail to JSON

from cleancore import ProvenaLogger

# Reuses clean_emails and my_data from the Quick Start example
with ProvenaLogger("Production_Pipeline") as logger:
    result = clean_emails(my_data, provena_logger=logger)

# Save full audit log to file
logger.export_json("audit_2024.json")
# → [EXPORT] audit_2024.json

The JSON file contains every step, every schema diff, timestamps, row counts, and duration.


Works with Pandas

import pandas as pd
from cleancore import audit_trail, ProvenaLogger

df = pd.DataFrame({
    'name':   ['Ali', 'Sara', 'Ahmed'],
    'salary': [50000, 60000, 55000]
})

@audit_trail(rule_id="SALARY_BUMP")
def give_raise(df):
    df['salary'] = df['salary'] + 5000
    return df

with ProvenaLogger("HR_Pipeline") as logger:
    result = give_raise(df, provena_logger=logger)

No extra setup needed — CleanCore detects pandas automatically.
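One common way for a library to support an optional dependency is to check whether the package can be imported and only then use it. The sketch below shows that general pattern with the standard library; it is an assumption about the technique, not a description of how CleanCore detects pandas, and `is_installed` and `describe_input` are hypothetical helpers.

```python
import importlib.util


def is_installed(module_name):
    """Return True if a top-level module can be found without importing it."""
    return importlib.util.find_spec(module_name) is not None


def describe_input(data):
    """Label the input as a DataFrame only when pandas is actually available."""
    if is_installed("pandas"):
        import pandas as pd  # imported lazily, so pandas stays optional
        if isinstance(data, pd.DataFrame):
            return "pandas.DataFrame"
    return type(data).__name__


print(describe_input([{"id": 1}]))  # list
```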


Big Data — 100k+ Rows

CleanCore processes large datasets in 10,000-row chunks to prevent memory crashes:

from cleancore import audit_trail, ProvenaLogger

@audit_trail(rule_id="LARGE_FILTER", chunk_size=10_000)
def filter_data(data):
    return [row for row in data if row['value'] > 0.5]

# Generator — memory-safe, no full load
large_data = ({"id": i, "value": i / 100000} for i in range(100_000))

with ProvenaLogger("Big_Data_Pipeline") as logger:
    result = filter_data(large_data, provena_logger=logger)

print(f"Processed: {len(result)} rows")
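For intuition, chunk-based processing of a generator can be sketched with `itertools.islice`: only one batch of rows is materialized at a time. This is a generic illustration, not CleanCore's engine, and `iter_chunks` is a hypothetical helper.

```python
from itertools import islice


def iter_chunks(rows, chunk_size=10_000):
    """Yield lists of at most chunk_size rows from any iterable."""
    iterator = iter(rows)
    while True:
        chunk = list(islice(iterator, chunk_size))
        if not chunk:
            return
        yield chunk


# Same data shape as the example above: a generator, never fully loaded.
large_data = ({"id": i, "value": i / 100000} for i in range(100_000))

kept = []
chunk_count = 0
for chunk in iter_chunks(large_data):
    chunk_count += 1
    kept.extend(row for row in chunk if row["value"] > 0.5)

print(chunk_count, len(kept))  # 10 49999
```

With this data, rows with id 50001 through 99999 pass the `value > 0.5` filter, so 49,999 rows are kept.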

CLI Tool

After installing, you get the cleancore command:

# Pretty-print an audit JSON file
cleancore report audit_2024.json

# Validate — exits with code 1 if critical drift found (use in CI/CD)
cleancore validate audit_2024.json

# Raw JSON dump
cleancore dump audit_2024.json

# Check version
cleancore --version

Use cleancore validate in your GitHub Actions workflows to fail builds when critical drift is detected.
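If your CI step is a Python script rather than a shell command, the exit code can be checked with `subprocess`. This is a small sketch; `gate_passed` is a hypothetical helper, and the demo uses the Python interpreter itself as a stand-in command so it runs without CleanCore installed.

```python
import subprocess
import sys


def gate_passed(command):
    """Run a command and report whether it exited with code 0."""
    completed = subprocess.run(command)
    return completed.returncode == 0


# Stand-in commands that exit with a known code.
failing = [sys.executable, "-c", "import sys; sys.exit(1)"]
passing = [sys.executable, "-c", "import sys; sys.exit(0)"]

print(gate_passed(failing))  # False
print(gate_passed(passing))  # True

# In a real pipeline the command would be the validator documented above:
#   gate_passed(["cleancore", "validate", "audit_2024.json"])
```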


API Reference

@audit_trail(rule_id, chunk_size)

Decorator that records input/output diff for any function.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| rule_id | str | function name | Label for this rule in the audit log |
| chunk_size | int | 10_000 | Rows per batch for large datasets |

The decorated function accepts an extra keyword argument, provena_logger, at call time; pass your ProvenaLogger instance there. As the examples above show, your own function does not need to declare this parameter.


ProvenaLogger(name, auto_report, lazy)

Context manager that collects all step logs and prints the dashboard.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| name | str | required | Pipeline name shown in the report |
| auto_report | bool | True | Print dashboard automatically on exit |
| lazy | bool | True | Label in report header (LAZY / EAGER) |

Methods:

  • logger.export_json("path.json") — save full audit trail to file
  • logger.get_reporter() — get a ProvenaReporter instance for custom rendering
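The collect-then-report behaviour follows the standard context manager protocol. The sketch below shows that pattern in plain Python; `StepCollector` and its `record` method are hypothetical and do not reflect ProvenaLogger's actual internals or output format.

```python
import time


class StepCollector:
    """Hypothetical stand-in for a pipeline logger; not CleanCore's class."""

    def __init__(self, name, auto_report=True):
        self.name = name
        self.auto_report = auto_report
        self.steps = []
        self.wall_ms = None

    def __enter__(self):
        self._started = time.perf_counter()
        return self

    def record(self, step, rows_in, rows_out):
        self.steps.append({"step": step, "in": rows_in, "out": rows_out})

    def __exit__(self, exc_type, exc_value, traceback):
        self.wall_ms = (time.perf_counter() - self._started) * 1000
        if self.auto_report:
            print(f"[SUMMARY] {self.name}  Steps={len(self.steps)}  "
                  f"Wall={self.wall_ms:.1f}ms")
        return False  # never swallow exceptions raised inside the block


with StepCollector("My_Pipeline") as collector:
    collector.record("clean_emails", rows_in=2, rows_out=2)
```

Because the report is printed in `__exit__`, it appears once the `with` block ends, even if a step raises.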

detect_drift(schema_before, schema_after)

Compare two schemas and return drifted columns.

from cleancore import detect_drift

before = {"age": "int", "price": "float"}
after  = {"age": "str", "price": "NoneType"}

drifts = detect_drift(before, after)
# {"age": {"from": "int", "to": "str", "kind": "type_drift"},
#  "price": {"from": "float", "to": "NoneType", "kind": "nullified"}}

Drift kinds: type_drift, nullified, imputed, added, dropped
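One plausible reading of the five kinds is sketched below. Only type_drift and nullified are demonstrated in the example above; the meanings given here to imputed (NoneType to a concrete type), added (column only in the new schema), and dropped (column only in the old schema) are assumptions, and `classify_drift` is a hypothetical function, not the library's detect_drift.

```python
def classify_drift(before, after):
    """Compare two {column: type_name} schemas and label each difference."""
    drifts = {}
    for column in sorted(before.keys() | after.keys()):
        old, new = before.get(column), after.get(column)
        if old == new:
            continue
        if old is None:
            kind = "added"        # assumption: column missing from the old schema
        elif new is None:
            kind = "dropped"      # assumption: column missing from the new schema
        elif new == "NoneType":
            kind = "nullified"
        elif old == "NoneType":
            kind = "imputed"      # assumption: null column gained a concrete type
        else:
            kind = "type_drift"
        drifts[column] = {"from": old, "to": new, "kind": kind}
    return drifts


before = {"age": "int", "price": "float"}
after = {"age": "str", "price": "NoneType"}
print(classify_drift(before, after))
# {'age': {'from': 'int', 'to': 'str', 'kind': 'type_drift'},
#  'price': {'from': 'float', 'to': 'NoneType', 'kind': 'nullified'}}
```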


infer_schema(rows)

Infer column types from a list of dicts.

from cleancore import infer_schema

rows = [{"id": 1, "name": "Ali", "score": 9.5}]
infer_schema(rows)
# {"id": "int", "name": "str", "score": "float"}

Contributing

CleanCore is open-source and contributions are welcome!

GitHub: github.com/Sidra-009/cleancore-python-library

To contribute:

  1. Fork the repo
  2. Create a branch: git checkout -b feature/my-feature
  3. Make your changes and add tests in tests/
  4. Run tests: pytest tests/ -v
  5. Open a Pull Request

License

MIT License — see LICENSE for details.


Built by Sidra Saqlain · Published on PyPI
