This project implements a data processing pipeline that simulates AWS services using Python and Moto.
It ingests daily CSV transaction files, validates them, applies transformations, and outputs Parquet files into a curated bucket.
The pipeline can be triggered either directly via Python or through an API endpoint (FastAPI).
- Schema validation: Ensures the required columns (`customer_id`, `transaction_amount`, `transaction_date`) exist.
- Data type validation (lenient): Rows with invalid dates or amounts are dropped; valid rows continue through the pipeline.
- Transformations (see the sketch after this list):
  - Remove negative transactions
  - Standardize `transaction_date` to `YYYY-MM-DD`
  - Add `transaction_fee = transaction_amount * 0.02`
- Output: Writes cleaned data into an S3 bucket (mocked with Moto) in Parquet format.
- Logging: Structured logs (like AWS CloudWatch).
- API trigger: Run the pipeline manually via FastAPI + Uvicorn.
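The validation and transformation rules above can be sketched roughly as follows. This is a minimal illustration, not the exact contents of `pipeline.py`: the column names and the 2% fee come from this README, while the function name and the pandas-based approach are assumptions.

```python
import pandas as pd

REQUIRED_COLUMNS = ["customer_id", "transaction_amount", "transaction_date"]

def validate_and_transform(df: pd.DataFrame) -> pd.DataFrame:
    """Apply the README's validation and transformation rules to a raw dataframe."""
    # Schema validation: fail fast if a required column is missing.
    missing = [c for c in REQUIRED_COLUMNS if c not in df.columns]
    if missing:
        raise ValueError(f"Missing required columns: {missing}")

    # Lenient type validation: coerce bad values to NaN/NaT, then drop those rows.
    df["transaction_amount"] = pd.to_numeric(df["transaction_amount"], errors="coerce")
    df["transaction_date"] = pd.to_datetime(df["transaction_date"], errors="coerce")
    df = df.dropna(subset=["transaction_amount", "transaction_date"])

    # Transformations: drop negatives, standardize dates, add the 2% fee.
    df = df[df["transaction_amount"] >= 0].copy()
    df["transaction_date"] = df["transaction_date"].dt.strftime("%Y-%m-%d")
    df["transaction_fee"] = df["transaction_amount"] * 0.02
    return df
```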
    data-pipeline/
    │
    ├── pipeline.py        # Core pipeline logic (validation, transform, Parquet output)
    ├── api.py             # FastAPI server to trigger pipeline via HTTP
    ├── test_api.py        # Example client to call the API
    ├── requirements.txt   # Dependencies (minimal set)
    └── README.md          # Project documentation
- Clone the project and create a virtual environment:

      python -m venv venv
      source venv/bin/activate   # Mac/Linux
      venv\Scripts\activate      # Windows
- Install dependencies:

      pip install -r requirements.txt
- Run the pipeline:

      python pipeline.py

  - This runs the pipeline against a demo CSV uploaded to a mocked S3 bucket (see the Moto sketch below).
  - Logs show the schema checks, dropped rows, transformations, and the Parquet output.
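For context, the demo run relies on Moto's in-memory S3. A minimal sketch of that setup, assuming Moto 5's `mock_aws` decorator; the bucket and key names are placeholders, not necessarily what `pipeline.py` uses:

```python
import boto3
from moto import mock_aws  # on older Moto versions: from moto import mock_s3

@mock_aws
def run_demo():
    # Inside the decorator, boto3 talks to an in-memory fake S3, not real AWS.
    s3 = boto3.client("s3", region_name="us-east-1")
    s3.create_bucket(Bucket="raw-bucket")

    # Upload a demo CSV as the daily input file.
    csv_body = "customer_id,transaction_amount,transaction_date\n1,100,2025-08-27\n"
    s3.put_object(Bucket="raw-bucket", Key="2025-08-27/transactions.csv", Body=csv_body)

    # The pipeline would then read this object, apply the transformations,
    # and write the Parquet result to a curated bucket.

run_demo()
```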
- Start the FastAPI server:

      uvicorn api:app --reload
- Open the Swagger UI at http://127.0.0.1:8000/docs.
- Use the `/run` endpoint to upload a CSV and trigger the pipeline (a sketch of the endpoint follows the example response below).
- Or run the provided client script:

      python test_api.py
Example response:

    {
      "input_key": "2025-08-30/transactions.csv",
      "output_key": "2025-08-30/transactions.parquet",
      "rows_output": 1
    }
This mocked pipeline maps directly to AWS services:
- Moto S3 → Amazon S3
- Python logging → CloudWatch Logs
- Pipeline function → AWS Lambda
- FastAPI → API Gateway + Lambda
- Parquet in S3 → Queryable via Athena/Redshift
Example input (CSV):

    customer_id,transaction_amount,transaction_date
    1,100,2025-08-27
    2,-50,2025-08-28
    3,abc,2025-08-29
    4,400,yesterday
Output (Parquet):
- Row 2 dropped (negative amount).
- Row 3 dropped (invalid amount).
- Row 4 dropped (invalid date).
- Only row 1 survives.
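To make this concrete, the same rules from the validation sketch earlier can be run over the demo file; this is a small, assumed pandas snippet, not part of the project code:

```python
import io
import pandas as pd

csv_text = """customer_id,transaction_amount,transaction_date
1,100,2025-08-27
2,-50,2025-08-28
3,abc,2025-08-29
4,400,yesterday
"""

df = pd.read_csv(io.StringIO(csv_text))
df["transaction_amount"] = pd.to_numeric(df["transaction_amount"], errors="coerce")
df["transaction_date"] = pd.to_datetime(df["transaction_date"], errors="coerce")
df = df.dropna(subset=["transaction_amount", "transaction_date"])  # drops rows 3 and 4
df = df[df["transaction_amount"] >= 0]                             # drops row 2
print(len(df))  # 1 -- only customer_id 1 survives
```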
Please refer to "Mapping to AWS.txt" and "short_explanation.txt" for my responses to the other questions.
**Looking forward to hearing back from you!** - Nishok Ilangovan