
Transaction Data Processing Pipeline (Mock AWS with Python)

This project implements a data processing pipeline that simulates AWS services using Python and Moto.

It ingests daily CSV transaction files, validates them, applies transformations, and outputs Parquet files into a curated bucket.
The pipeline can be triggered either directly via Python or through an API endpoint (FastAPI).


Features

  • Schema validation: Ensures required columns (customer_id, transaction_amount, transaction_date) exist.
  • Data type validation (lenient): invalid rows (bad dates or amounts) are dropped; valid rows continue through the pipeline.
  • Transformations (sketched after this list):
    • Remove negative transactions
    • Standardize transaction_date to YYYY-MM-DD
    • Add transaction_fee = transaction_amount * 0.02
  • Output: Writes cleaned data into an S3 bucket (mocked with Moto) in Parquet format.
  • Logging: Structured logs, in the style of AWS CloudWatch Logs.
  • API trigger: Run the pipeline manually via FastAPI + Uvicorn.
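
The snippet below is a rough sketch of the validation and transformation steps listed above; the function name and exact pandas calls are illustrative assumptions, not the actual contents of pipeline.py.

    import pandas as pd

    REQUIRED_COLUMNS = {"customer_id", "transaction_amount", "transaction_date"}

    def transform(df: pd.DataFrame) -> pd.DataFrame:
        # Schema validation: all required columns must exist.
        missing = REQUIRED_COLUMNS - set(df.columns)
        if missing:
            raise ValueError(f"Missing required columns: {missing}")

        # Lenient type validation: coerce bad amounts/dates, then drop those rows.
        df = df.copy()
        df["transaction_amount"] = pd.to_numeric(df["transaction_amount"], errors="coerce")
        df["transaction_date"] = pd.to_datetime(df["transaction_date"], errors="coerce")
        df = df.dropna(subset=["transaction_amount", "transaction_date"])

        # Remove negative transactions.
        df = df[df["transaction_amount"] >= 0].copy()

        # Standardize transaction_date to YYYY-MM-DD and add the 2% fee column.
        df["transaction_date"] = df["transaction_date"].dt.strftime("%Y-%m-%d")
        df["transaction_fee"] = df["transaction_amount"] * 0.02
        return df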

Project Structure

data-pipeline/
│
├── pipeline.py        # Core pipeline logic (validation, transform, Parquet output)
├── api.py             # FastAPI server to trigger pipeline via HTTP
├── test_api.py        # Example client to call the API
├── requirements.txt   # Dependencies (minimal set)
└── README.md          # Project documentation

Setup Instructions

  1. Clone project & create a virtual environment

    python -m venv venv
    source venv/bin/activate   # Mac/Linux
    venv\Scripts\activate      # Windows
  2. Install dependencies

    pip install -r requirements.txt
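
The dependency list itself lives in requirements.txt; for this stack it would typically contain something along these lines (the exact packages and any version pins are assumptions, not the actual file):

    pandas
    pyarrow
    boto3
    moto
    fastapi
    uvicorn
    requests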

Run the Pipeline (directly)

python pipeline.py
  • This runs the pipeline with a demo CSV uploaded to a mocked S3 bucket.
  • Logs will show schema checks, rows dropped, transformations, and Parquet output.
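
Conceptually, the direct run looks roughly like the sketch below: Moto stands in for S3, a demo CSV is uploaded to a mocked bucket, and the pipeline processes it. Bucket names, keys, and the pipeline entry point are illustrative assumptions, not the actual pipeline.py code.

    import boto3
    from moto import mock_aws  # moto >= 5; older versions expose mock_s3 instead

    demo_csv = (
        b"customer_id,transaction_amount,transaction_date\n"
        b"1,100,2025-08-27\n"
        b"2,-50,2025-08-28\n"
    )

    @mock_aws
    def run_demo():
        s3 = boto3.client("s3", region_name="us-east-1")
        s3.create_bucket(Bucket="raw-transactions")
        s3.create_bucket(Bucket="curated-transactions")
        s3.put_object(Bucket="raw-transactions",
                      Key="2025-08-30/transactions.csv",
                      Body=demo_csv)
        # The pipeline function would then read the CSV, validate and transform it,
        # and write Parquet into the curated bucket, e.g.:
        # run_pipeline(input_bucket="raw-transactions", output_bucket="curated-transactions")

    if __name__ == "__main__":
        run_demo()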

Run as an API

  1. Start FastAPI server:

    uvicorn api:app --reload
  2. Open Swagger UI at http://127.0.0.1:8000/docs.

    • Use the /run endpoint to upload a CSV and trigger the pipeline.
  3. Or run the provided client script (a minimal equivalent is sketched after the example response):

    python test_api.py

    Example response:

    {
      "input_key": "2025-08-30/transactions.csv",
      "output_key": "2025-08-30/transactions.parquet",
      "rows_output": 1
    }
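
In essence, test_api.py does something like the following; the upload field name ("file") and the local CSV path are assumptions about the actual script, not copied from it:

    import requests

    with open("transactions.csv", "rb") as f:
        resp = requests.post(
            "http://127.0.0.1:8000/run",
            files={"file": ("transactions.csv", f, "text/csv")},
        )
    print(resp.status_code, resp.json())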

AWS Mapping

This mocked pipeline maps directly to AWS services:

  • Moto S3 → Amazon S3
  • Python logging → CloudWatch Logs
  • Pipeline function → AWS Lambda
  • FastAPI → API Gateway + Lambda
  • Parquet in S3 → Queryable via Athena/Redshift

Example Demo CSV

customer_id,transaction_amount,transaction_date
1,100,2025-08-27
2,-50,2025-08-28
3,abc,2025-08-29
4,400,yesterday

Output (Parquet):

  • Row 2 dropped (negative amount).
  • Row 3 dropped (invalid amount).
  • Row 4 dropped (invalid date).
  • Only row 1 survives.
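
Within the same mocked session, the curated object can be inspected roughly as follows (bucket and key names are assumptions); the surviving row carries transaction_fee = 100 * 0.02 = 2.0:

    import io
    import boto3
    import pandas as pd

    s3 = boto3.client("s3", region_name="us-east-1")
    obj = s3.get_object(Bucket="curated-transactions",
                        Key="2025-08-30/transactions.parquet")
    df = pd.read_parquet(io.BytesIO(obj["Body"].read()))
    print(df)
    #   customer_id  transaction_amount transaction_date  transaction_fee
    #             1               100.0       2025-08-27              2.0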

Please refer to "Mapping to AWS.txt" and "short_explanation.txt" for my responses to the other questions.

**Looking forward to hearing back from you!** - Nishok Ilangovan
