Stop shipping unobserved data.
cleancore is a zero-dependency Python library that adds automatic audit trails and schema drift detection to your data pipelines.
Think of it as Git for your data rows: every change is tracked and every type mismatch is caught before it breaks your production models.
Data pipelines fail silently. A column quietly changes from int to str. A price field becomes None. Your model trains on garbage — and you never know why.
CleanCore wraps your existing pipeline functions and automatically:
- Records what changed (row-level diff)
- Detects type drift (int → str, float → None)
- Prints a clean dashboard after every run
- Exports a full JSON audit trail
No config. No new infrastructure. Just one decorator.
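To make the "one decorator" idea concrete, here is a minimal sketch of how an audit decorator could record a row-level diff. It is not CleanCore's implementation: `sketch_audit` and the `log` keyword are hypothetical names chosen for illustration, and the positional comparison only makes sense when the step keeps rows in the same order.

```python
import copy
import functools
import time


def sketch_audit(rule_id):
    """Hypothetical stand-in for an audit decorator (not CleanCore's code)."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(data, *, log=None):
            # Snapshot the input first: pipeline steps often mutate rows in place.
            before = copy.deepcopy(data)
            start = time.perf_counter()
            result = func(data)
            elapsed_ms = (time.perf_counter() - start) * 1000

            # Count rows whose content differs, comparing position by position.
            modified = sum(1 for old, new in zip(before, result) if old != new)

            if log is not None:
                log.append({
                    "step": func.__name__,
                    "rule": rule_id,
                    "rows_in": len(before),
                    "rows_out": len(result),
                    "modified": modified,
                    "ms": round(elapsed_ms, 3),
                })
            return result
        return wrapper
    return decorator


@sketch_audit(rule_id="MASK_PII")
def clean_emails(data):
    for row in data:
        row["email"] = "***@***"
    return data


audit_log = []
rows = [
    {"id": 1, "email": "alice@example.com"},
    {"id": 2, "email": "bob@example.com"},
]
masked = clean_emails(rows, log=audit_log)
print(audit_log[0])
```

The deep copy is the important design choice here: without it, an in-place step would overwrite the "before" state and every diff would come back empty.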
| Feature | Description |
|---|---|
| @audit_trail | Decorator: wraps any function, records before/after diff |
| Schema Sentinel | Detects type drift (int → str) and null regressions (float → NoneType) |
| Big Data Engine | Chunk-based processing (10k rows/batch) — no memory crashes |
| JSON Export | Full audit trail saved to file for compliance or debugging |
| CLI Tool | cleancore report, cleancore validate from terminal |
| Zero Dependencies | Pure Python — no pandas, numpy, or anything required |
| Pandas / Polars | Optional support — works automatically if installed |
pip install cleancore

That's it. No extra dependencies needed.
from cleancore import audit_trail, ProvenaLogger

@audit_trail(rule_id="MASK_PII")
def clean_emails(data):
    for row in data:
        row['email'] = "***@***"
    return data

my_data = [
    {"id": 1, "email": "alice@example.com"},
    {"id": 2, "email": "bob@example.com"},
]

with ProvenaLogger("My_Pipeline") as logger:
    result = clean_emails(my_data, provena_logger=logger)

Output, printed automatically when the with block ends:
+--------------------------------------------------------------+
| PIPELINE START | My_Pipeline | MODE=LAZY |
+--------------------------------------------------------------+
...
[SUMMARY] Steps=1 | In=2 Out=2 Delta=0 | Wall=1.2ms
STEP RULE IN OUT MODIFIED ms
------------------------------------------------------------------------
clean_emails MASK_PII 2 2 2 1.1
CleanCore automatically catches when a column's type changes between steps.
from cleancore import audit_trail, ProvenaLogger

@audit_trail(rule_id="TYPE_BUG")
def process(data):
    for row in data:
        row['age'] = str(row['age'])  # Bug: int accidentally cast to str
    return data

data = [{"id": 1, "age": 25}, {"id": 2, "age": 30}]

with ProvenaLogger("Compliance_Pipeline") as logger:
    process(data, provena_logger=logger)

Output:
[SCHEMA] Schema Sentinel
----------------------------------------
COLUMN KIND FROM TO DETECTED IN
------------------------------------------------------------------------
age [WARN] int str process
No more silent type bugs.
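One way to picture the check: take a type snapshot of every column before and after a step, then compare the two. The sketch below does that with plain Python. `column_types` is a hypothetical helper, not part of CleanCore, and it keeps a set of type names per column so that mixed-type columns stay visible.

```python
def column_types(rows):
    """Map each column name to the set of Python type names seen in it."""
    seen = {}
    for row in rows:
        for column, value in row.items():
            seen.setdefault(column, set()).add(type(value).__name__)
    return seen


def process(data):
    for row in data:
        row["age"] = str(row["age"])  # same bug as above: int cast to str
    return data


data = [{"id": 1, "age": 25}, {"id": 2, "age": 30}]

# Take the snapshot before the call, because process() mutates rows in place.
types_before = column_types(data)
types_after = column_types(process(data))

for column in types_before:
    if types_before[column] != types_after.get(column):
        print(column, types_before[column], "->", types_after.get(column))
```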
from cleancore import audit_trail, ProvenaLogger

@audit_trail(rule_id="FILTER_INACTIVE")
def remove_inactive(data):
    return [row for row in data if row['active']]

@audit_trail(rule_id="MASK_PII")
def mask_emails(data):
    for row in data:
        row['email'] = "***@***"
    return data

@audit_trail(rule_id="NORMALIZE_SALARY")
def normalize(data):
    for row in data:
        row['salary'] = round(row['salary'] / 1000, 2)
    return data

employees = [
    {"id": 1, "email": "ali@co.com", "salary": 55000, "active": True},
    {"id": 2, "email": "sara@co.com", "salary": 62000, "active": False},
    {"id": 3, "email": "ahmed@co.com", "salary": 48000, "active": True},
]

with ProvenaLogger("HR_Pipeline") as logger:
    step1 = remove_inactive(employees, provena_logger=logger)
    step2 = mask_emails(step1, provena_logger=logger)
    step3 = normalize(step2, provena_logger=logger)

with ProvenaLogger("Production_Pipeline") as logger:
    result = clean_emails(my_data, provena_logger=logger)
    # Save full audit log to file
    logger.export_json("audit_2024.json")
    # → [EXPORT] audit_2024.json

The JSON file contains every step, every schema diff, timestamps, row counts, and duration.
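The exact layout of the exported file is not spelled out here, so a cautious first step is to inspect it rather than assume field names. A minimal sketch using only the standard library follows. `describe_json` is a hypothetical helper, and the demo writes a tiny stand-in file with placeholder keys so the snippet runs without CleanCore installed.

```python
import json
import os
import tempfile


def describe_json(path):
    """Return a shallow summary of a JSON file's top-level structure."""
    with open(path, encoding="utf-8") as handle:
        payload = json.load(handle)

    if isinstance(payload, dict):
        # One entry per top-level key, mapped to the type of its value.
        return {key: type(value).__name__ for key, value in payload.items()}
    if isinstance(payload, list):
        return {"<list>": len(payload)}
    return {"<scalar>": type(payload).__name__}


# Stand-in file with placeholder keys; a real audit file will look different.
with tempfile.TemporaryDirectory() as tmp:
    demo_path = os.path.join(tmp, "demo.json")
    with open(demo_path, "w", encoding="utf-8") as handle:
        json.dump({"placeholder_list": [1, 2], "placeholder_text": "x"}, handle)
    summary = describe_json(demo_path)

print(summary)
```

Pointing `describe_json` at `audit_2024.json` shows which top-level keys the export actually contains before you write any code that depends on them.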
import pandas as pd
from cleancore import audit_trail, ProvenaLogger

df = pd.DataFrame({
    'name': ['Ali', 'Sara', 'Ahmed'],
    'salary': [50000, 60000, 55000]
})

@audit_trail(rule_id="SALARY_BUMP")
def give_raise(df):
    df['salary'] = df['salary'] + 5000
    return df

with ProvenaLogger("HR_Pipeline") as logger:
    result = give_raise(df, provena_logger=logger)

No extra setup needed: CleanCore detects pandas automatically.
CleanCore processes large datasets in 10,000-row chunks to prevent memory crashes:
from cleancore import audit_trail, ProvenaLogger

@audit_trail(rule_id="LARGE_FILTER", chunk_size=10_000)
def filter_data(data):
    return [row for row in data if row['value'] > 0.5]

# Generator: memory-safe, no full load
large_data = ({"id": i, "value": i / 100000} for i in range(100_000))

with ProvenaLogger("Big_Data_Pipeline") as logger:
    result = filter_data(large_data, provena_logger=logger)
    print(f"Processed: {len(result)} rows")

After installing, you get the cleancore command:
# Pretty-print an audit JSON file
cleancore report audit_2024.json
# Validate — exits with code 1 if critical drift found (use in CI/CD)
cleancore validate audit_2024.json
# Raw JSON dump
cleancore dump audit_2024.json
# Check version
cleancore --version

Use cleancore validate in your GitHub Actions to fail builds when data drift is detected.
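In a CI job, the gate can be a small script that runs the validator and passes its exit code along. This is a sketch under assumptions: `gate` is a hypothetical helper, and it relies only on the behavior stated above (exit code 1 when critical drift is found). The demo substitutes stand-in commands so it runs without CleanCore installed.

```python
import subprocess
import sys


def gate(command):
    """Run a command and return its exit code without raising on failure."""
    completed = subprocess.run(command, check=False)
    return completed.returncode


# In CI this would be: gate(["cleancore", "validate", "audit_2024.json"])
# Stand-in commands that mimic a passing and a failing validation:
passing = gate([sys.executable, "-c", "raise SystemExit(0)"])
failing = gate([sys.executable, "-c", "raise SystemExit(1)"])
print(passing, failing)

# A real CI script would finish with: sys.exit(gate([...]))
```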
@audit_trail is a decorator that records the input/output diff for any function.
| Parameter | Type | Default | Description |
|---|---|---|---|
| rule_id | str | function name | Label for this rule in the audit log |
| chunk_size | int | 10_000 | Rows per batch for large datasets |
The decorated function receives an extra keyword argument provena_logger at call time — pass your ProvenaLogger instance there.
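The chunk_size parameter implies that rows are handled in batches. One common way to batch any iterable without loading it whole is `itertools.islice`. The sketch below shows that technique on the same data as the large-dataset example. `iter_chunks` and `filter_in_chunks` are hypothetical names, and this is not necessarily how CleanCore batches internally.

```python
from itertools import islice


def iter_chunks(rows, chunk_size=10_000):
    """Yield lists of at most chunk_size rows from any iterable."""
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")
    iterator = iter(rows)
    while True:
        chunk = list(islice(iterator, chunk_size))
        if not chunk:
            return
        yield chunk


def filter_in_chunks(rows, chunk_size=10_000):
    """Apply the filter chunk by chunk and collect only the surviving rows."""
    kept = []
    for chunk in iter_chunks(rows, chunk_size):
        kept.extend(row for row in chunk if row["value"] > 0.5)
    return kept


large_data = ({"id": i, "value": i / 100_000} for i in range(100_000))
result = filter_in_chunks(large_data)
print(len(result))
```

Only one chunk of input is held in memory at a time; the output list still grows with the number of rows that pass the filter.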
ProvenaLogger is a context manager that collects all step logs and prints the dashboard.
| Parameter | Type | Default | Description |
|---|---|---|---|
| name | str | required | Pipeline name shown in the report |
| auto_report | bool | True | Print dashboard automatically on exit |
| lazy | bool | True | Label in report header (LAZY / EAGER) |
Methods:
- logger.export_json("path.json"): save full audit trail to file
- logger.get_reporter(): get a ProvenaReporter instance for custom rendering
detect_drift compares two schemas and returns the drifted columns.
from cleancore import detect_drift

before = {"age": "int", "price": "float"}
after = {"age": "str", "price": "NoneType"}
drifts = detect_drift(before, after)
# {"age": {"from": "int", "to": "str", "kind": "type_drift"},
#  "price": {"from": "float", "to": "NoneType", "kind": "nullified"}}

Drift kinds: type_drift, nullified, imputed, added, dropped
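The five kinds are listed without definitions, so the sketch below encodes one plausible reading: a column present on only one side is `added` or `dropped`, a change to `NoneType` is `nullified`, a change from `NoneType` is `imputed`, and any other change is `type_drift`. `sketch_detect_drift` is a hypothetical name, the `None` placeholder for a missing side is an assumption, and the real `detect_drift` may behave differently.

```python
def sketch_detect_drift(before, after):
    """Compare two {column: type_name} dicts; illustrative only."""
    drifts = {}
    for column in sorted(set(before) | set(after)):
        old, new = before.get(column), after.get(column)
        if old == new:
            continue
        if column not in before:
            kind = "added"
        elif column not in after:
            kind = "dropped"
        elif new == "NoneType":
            kind = "nullified"
        elif old == "NoneType":
            kind = "imputed"  # assumed meaning: a null column gained a type
        else:
            kind = "type_drift"
        drifts[column] = {"from": old, "to": new, "kind": kind}
    return drifts


before = {"age": "int", "price": "float", "legacy": "str"}
after = {"age": "str", "price": "NoneType", "region": "str"}
for column, drift in sketch_detect_drift(before, after).items():
    print(column, drift)
```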
infer_schema infers column types from a list of dicts.
from cleancore import infer_schema

rows = [{"id": 1, "name": "Ali", "score": 9.5}]
infer_schema(rows)
# {"id": "int", "name": "str", "score": "float"}

CleanCore is open-source and contributions are welcome!
GitHub: github.com/Sidra-009/cleancore-python-library
To contribute:
- Fork the repo
- Create a branch: git checkout -b feature/my-feature
- Make your changes and add tests in tests/
- Run tests: pytest tests/ -v
- Open a Pull Request
MIT License — see LICENSE for details.
Built by Sidra Saqlain · Published on PyPI