Skip to content

Commit 651977d

Browse files
authored
Merge pull request #1671 from bluewave-labs/develop
Merging develop into master: June 18th 2025
2 parents ad31c2e + 60d3543 commit 651977d

File tree

38 files changed

+871
-345
lines changed

38 files changed

+871
-345
lines changed

.env.dev

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ FRONTEND_PORT=8082
66
FRONTEND_URL=http://localhost:8082
77
ALLOWED_ORIGINS=["http://localhost:5173", "http://localhost:8082"]
88
BIAS_AND_FAIRNESS_PORT=8000
9+
NODE_ENV=development
910

1011
# Database
1112
DB_PORT=5432

.env.prod

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@ FRONTEND_PORT=8080
66
FRONTEND_URL=http://localhost:8080 # localhost needs to be replaced by ip
77
ALLOWED_ORIGINS=["http://localhost:5173", "http://localhost:8080"] # localhost needs to be replaced by ip
88
BIAS_AND_FAIRNESS_PORT=8000
9+
NODE_ENV=production
10+
# NOTE: Please do not change this unless the database is hosted on a different server and you want to use SSL for the connection
11+
DB_SSL=false
912

1013
# Database
1114
DB_PORT=5432

BiasAndFairnessServers/src/app.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from fastapi import FastAPI
66
from fastapi.middleware.cors import CORSMiddleware
77
from routers.bias_and_fairness import router as bias_and_fairness
8+
from database.redis import close_redis
89
from alembic.config import Config
910
from alembic import command
1011

@@ -19,7 +20,10 @@ def run_migrations():
1920
except Exception as e:
2021
logger.info(f"Error running migrations: {e}")
2122

22-
app = FastAPI(on_startup=[run_migrations])
23+
async def shutdown_redis():
24+
await close_redis()
25+
26+
app = FastAPI(on_startup=[run_migrations], on_shutdown=[shutdown_redis])
2327

2428
# enable CORS
2529
origins = [os.environ.get("BACKEND_URL") or "http://localhost:3000"]

BiasAndFairnessServers/src/controllers/bias_and_fairness.py

Lines changed: 37 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
1+
import io
2+
import asyncio
13
import json
2-
from fastapi.responses import JSONResponse
4+
from fastapi.responses import JSONResponse, Response
35
from fastapi import HTTPException
46
from crud.bias_and_fairness import upload_model, upload_data, insert_metrics, get_metrics_by_id, get_all_metrics_query, delete_metrics_by_id
57
from utils.run_bias_and_fairness_check import analyze_fairness
8+
from utils.handle_files_uploads import process_files
69
from database.db import get_db
7-
from fastapi import UploadFile
10+
from fastapi import UploadFile, BackgroundTasks
11+
from database.redis import get_next_job_id, get_job_status, delete_job_status
812

913
async def get_all_metrics():
1014
"""
@@ -60,67 +64,39 @@ async def get_metrics(id: int):
6064
detail=f"Failed to retrieve metrics, {str(e)}"
6165
)
6266

63-
async def handle_upload(model: UploadFile, data: UploadFile, target_column: str, sensitive_column: str):
67+
async def get_upload_status(job_id: int):
68+
value = await get_job_status(job_id)
69+
if value is None:
70+
return Response(status_code=204)
71+
await delete_job_status(job_id)
72+
return JSONResponse(
73+
status_code=200,
74+
content=value,
75+
media_type="application/json"
76+
)
77+
78+
async def handle_upload(background_tasks: BackgroundTasks, model: UploadFile, data: UploadFile, target_column: str, sensitive_column: str):
6479
"""
6580
Handle file upload from the client.
6681
"""
67-
try:
68-
async with get_db() as db:
69-
transaction = await db.begin()
70-
71-
model_content = await model.read()
72-
model_filename = model.filename
73-
data_content = await data.read()
74-
data_filename = data.filename
75-
76-
if not model_content or not data_content:
77-
raise ValueError("model or data file is empty")
78-
if not model_filename or not data_filename:
79-
raise ValueError("model or data file name is empty")
80-
81-
upload_model_record = await upload_model(content=model_content, name=model_filename, db=db)
82-
83-
if not upload_model_record:
84-
raise Exception("failed to upload model file")
85-
86-
upload_data_record = await upload_data(
87-
content=data_content,
88-
name=data_filename,
89-
target_column=target_column,
90-
sensitive_column=sensitive_column,
91-
model_id=upload_model_record.id,
92-
db=db
93-
)
94-
95-
if not upload_data_record:
96-
raise Exception("failed to upload data file")
97-
result = analyze_fairness(
98-
model_content=model_content,
99-
data_content=data_content,
100-
target_column=target_column,
101-
sensitive_column=sensitive_column
102-
)
103-
104-
metrics = await insert_metrics(json.dumps(result), upload_data_record.id, db)
105-
if not metrics:
106-
raise Exception("failed to insert metrics")
107-
108-
await transaction.commit()
109-
110-
return JSONResponse(
111-
status_code=200,
112-
content={
113-
"model_id": upload_model_record.id,
114-
"data_id": upload_data_record.id,
115-
"metrics_id": metrics.id,
116-
"metrics": result
117-
}
118-
)
119-
except Exception as e:
120-
raise HTTPException(
121-
status_code=500,
122-
detail=f"Failed to handle upload, {str(e)}"
123-
)
82+
job_id = await get_next_job_id()
83+
response = JSONResponse(status_code=202, content={
84+
"message": "Processing started",
85+
"job_id": job_id,
86+
"model_filename": model.filename.replace(".gz", "") if model.filename else "",
87+
"data_filename": data.filename.replace(".gz", "") if data.filename else ""
88+
}, media_type="application/json")
89+
model_ = {
90+
"filename": model.filename,
91+
"content": await model.read()
92+
}
93+
data_ = {
94+
"filename": data.filename,
95+
"content": await data.read()
96+
}
97+
# create a job ID or use a unique identifier for the task
98+
background_tasks.add_task(process_files, job_id, model_, data_, target_column, sensitive_column)
99+
return response
124100

125101
async def delete_metrics(id: int):
126102
"""
@@ -135,10 +111,7 @@ async def delete_metrics(id: int):
135111
detail=f"Metrics with ID {id} not found"
136112
)
137113
await db.commit()
138-
return JSONResponse(
139-
status_code=204,
140-
content=None
141-
)
114+
return Response(status_code=204)
142115
except HTTPException as he:
143116
raise he
144117
except Exception as e:
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
import os
2+
import aioredis
3+
import json
4+
5+
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")
6+
7+
redis = None
8+
9+
async def get_redis():
10+
global redis
11+
if not redis:
12+
redis = await aioredis.from_url(REDIS_URL, decode_responses=True)
13+
return redis
14+
15+
async def close_redis():
16+
global redis
17+
if redis:
18+
await redis.close()
19+
redis = None
20+
21+
async def get_next_job_id():
22+
r = await get_redis()
23+
# Use Redis INCR to generate unique job IDs
24+
job_id = await r.incr("job_id_counter")
25+
return job_id
26+
27+
async def set_job_status(job_id: int, status: dict):
28+
r = await get_redis()
29+
# Store job status JSON serialized
30+
await r.set(f"job_status:{job_id}", json.dumps(status), ex=3600)
31+
32+
async def get_job_status(job_id: int):
33+
r = await get_redis()
34+
data = await r.get(f"job_status:{job_id}")
35+
if data:
36+
return json.loads(data)
37+
return None
38+
39+
async def delete_job_status(job_id: int):
40+
r = await get_redis()
41+
# Delete job status from Redis
42+
await r.delete(f"job_status:{job_id}")
43+
return True

BiasAndFairnessServers/src/requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
1+
aioredis==2.0.1
12
alembic==1.16.1
23
annotated-types==0.7.0
34
anyio==4.9.0
45
async-timeout==5.0.1
56
asyncpg==0.30.0
67
cffi==1.17.1
78
click==8.1.8
8-
cryptography==45.0.3
99
exceptiongroup==1.3.0
1010
fairlearn==0.12.0
1111
fastapi==0.115.12

BiasAndFairnessServers/src/routers/bias_and_fairness.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,28 @@
1-
from fastapi import APIRouter, Form, UploadFile
2-
from controllers.bias_and_fairness import handle_upload as handle_upload_controller, get_metrics as get_metrics_controller, get_all_metrics as get_all_metrics_controller, delete_metrics as delete_metrics_controller
1+
from fastapi import APIRouter, Form, UploadFile, BackgroundTasks
2+
from controllers.bias_and_fairness import handle_upload as handle_upload_controller, get_metrics as get_metrics_controller, get_all_metrics as get_all_metrics_controller, delete_metrics as delete_metrics_controller, get_upload_status as get_upload_status_controller
33

44
router = APIRouter()
55

66
@router.post("/upload")
77
async def upload_model(
8+
background_tasks: BackgroundTasks,
89
model: UploadFile = Form(...),
910
data: UploadFile = Form(...),
1011
target_column: str = Form(...),
1112
sensitive_column: str = Form(...),
1213
):
1314
return await handle_upload_controller(
15+
background_tasks=background_tasks,
1416
model=model,
1517
data=data,
1618
target_column=target_column,
1719
sensitive_column=sensitive_column
1820
)
1921

22+
@router.get("/upload/status/{job_id}")
23+
async def get_upload_status(job_id: int):
24+
return await get_upload_status_controller(job_id)
25+
2026
@router.get("/metrics/all")
2127
async def get_all_metrics():
2228
return await get_all_metrics_controller()
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
import json
2+
import gzip
3+
import typing
4+
from fastapi import FastAPI, UploadFile
5+
from crud.bias_and_fairness import upload_model, upload_data, insert_metrics
6+
from utils.run_bias_and_fairness_check import analyze_fairness
7+
from database.db import get_db
8+
from database.redis import set_job_status
9+
10+
@typing.no_type_check
11+
async def process_files(
12+
job_id: int,
13+
model: dict[str, typing.Union[bytes, str]],
14+
data: dict[str, typing.Union[bytes, str]],
15+
target_column: str,
16+
sensitive_column: str,
17+
):
18+
try:
19+
async with get_db() as db:
20+
transaction = await db.begin()
21+
22+
if not model["filename"] or not data["filename"]:
23+
raise ValueError("model or data file name is empty")
24+
model_filename = model["filename"].replace(".gz", "")
25+
data_filename = data["filename"].replace(".gz", "")
26+
27+
if not model["content"] or not data["content"]:
28+
raise ValueError("model or data file is empty")
29+
30+
model_bytes = model["content"]
31+
data_bytes = data["content"]
32+
33+
model_content = (
34+
gzip.decompress(model_bytes) if model["filename"].endswith(".gz") else model_bytes
35+
)
36+
data_content = (
37+
gzip.decompress(data_bytes) if data["filename"].endswith(".gz") else data_bytes
38+
)
39+
40+
upload_model_record = await upload_model(content=model_content, name=model_filename, db=db)
41+
42+
if not upload_model_record:
43+
raise Exception("failed to upload model file")
44+
45+
upload_data_record = await upload_data(
46+
content=data_content,
47+
name=data_filename,
48+
target_column=target_column,
49+
sensitive_column=sensitive_column,
50+
model_id=upload_model_record.id,
51+
db=db
52+
)
53+
54+
if not upload_data_record:
55+
raise Exception("failed to upload data file")
56+
result = analyze_fairness(
57+
model_content=model_content,
58+
data_content=data_content,
59+
target_column=target_column,
60+
sensitive_column=sensitive_column
61+
)
62+
63+
metrics = await insert_metrics(json.dumps(result), upload_data_record.id, db)
64+
if not metrics:
65+
raise Exception("failed to insert metrics")
66+
67+
await transaction.commit()
68+
status = {
69+
"status": "Completed",
70+
"model_filename": model_filename,
71+
"data_filename": data_filename,
72+
"metrics_id": metrics.id,
73+
"metrics": result
74+
}
75+
except Exception as e:
76+
status = {
77+
"status": "Failed",
78+
"error": str(e),
79+
"model_filename": locals().get("model_filename", "***"),
80+
"data_filename": locals().get("data_filename", "***")
81+
}
82+
83+
await set_job_status(job_id, status)
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
import functools
2+
import gzip
3+
import asyncio
4+
from concurrent.futures import ThreadPoolExecutor
5+
6+
executor = ThreadPoolExecutor()
7+
8+
async def decompress_async(data: bytes) -> bytes:
9+
loop = asyncio.get_event_loop()
10+
return await loop.run_in_executor(executor, gzip.decompress, data)

Clients/env.vars.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
11
export const ENV_VARs = {
2-
URL: `${window.location.protocol}//${window.location.hostname}:3000` || 'http://localhost:3000/',
2+
URL:
3+
import.meta.env.VITE_APP_API_BASE_URL ?? // keep empty string if set
4+
(typeof window !== 'undefined' // only run in browser
5+
? `${window.location.protocol}//${window.location.hostname}${window.location.protocol === 'https:' ? '' : ':3000'}` // use current URL if not set
6+
: 'http://localhost:3000/'), // final Node/SSR fallback
37
IS_DEMO_APP: import.meta.env.VITE_IS_DEMO_APP === 'true',
48
};

0 commit comments

Comments
 (0)