Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 64 additions & 55 deletions backend/routers/discoveries.py
Original file line number Diff line number Diff line change
@@ -1,55 +1,64 @@
"""GET/POST/DELETE /api/discoveries."""

from fastapi import APIRouter, HTTPException
import agents.OrchestratorAgent # Import module, not variable

from utils.db import get_discovery, list_discoveries, save_discovery


router = APIRouter()


@router.get("/discoveries")
async def list_all():
return await list_discoveries()


@router.get("/discoveries/{discovery_id}")
async def get_one(discovery_id: str):
d = await get_discovery(discovery_id)
if not d:
raise HTTPException(status_code=404, detail="Discovery not found")
return d


@router.post("/discoveries/{session_id}/save")
async def save_session(session_id: str):
# Try Redis first (survives restarts), then fall back to in-memory
state = await get_session_redis(session_id)
if not state:
state = agents.OrchestratorAgent._sessions.get(session_id)

if not state:
raise HTTPException(status_code=404, detail="Session not found")

did = await save_discovery(state)
if not did:
from utils.db import get_engine
engine = get_engine()
detail = "DB not configured" if not engine else "DB insert failed"
raise HTTPException(status_code=500, detail=detail)
return {"discovery_id": did}


@router.delete("/discoveries/{discovery_id}")
async def delete_one(discovery_id: str):
from sqlalchemy import text

from utils.db import get_engine

engine = get_engine()
if not engine:
raise HTTPException(status_code=503, detail="DB not configured")
async with engine.begin() as conn:
await conn.execute(text("DELETE FROM discoveries WHERE id = :id"), {"id": discovery_id})
return {"deleted": discovery_id}
"""GET/POST/DELETE /api/discoveries."""

from fastapi import APIRouter, HTTPException

from utils.db import get_discovery, list_discoveries, save_discovery

router = APIRouter()


@router.get("/discoveries")
async def list_all():
return await list_discoveries()


@router.get("/discoveries/{discovery_id}")
async def get_one(discovery_id: str):
d = await get_discovery(discovery_id)
if not d:
raise HTTPException(status_code=404, detail="Discovery not found")
return d


@router.post("/discoveries/{session_id}/save")
async def save_session(session_id: str):
import agents.OrchestratorAgent # Import module, not variable

state = agents.OrchestratorAgent._sessions.get(session_id)

# Not in live memory — try recovering from Neon
if not state:
try:
from utils.db import get_session_by_session_id
state = await get_session_by_session_id(session_id)
except Exception:
state = None

if not state:
raise HTTPException(status_code=404, detail="Session not found")

did = await save_discovery(state)
if not did:
from utils.db import _build_dsn
detail = "DB not configured" if not _build_dsn() else "DB insert failed"
raise HTTPException(status_code=500, detail=detail)
return {"discovery_id": did}


@router.delete("/discoveries/{discovery_id}")
async def delete_one(discovery_id: str):
from utils.db import _get_conn, _build_dsn

if not _build_dsn():
raise HTTPException(status_code=503, detail="DB not configured")

conn = await _get_conn()
if not conn:
raise HTTPException(status_code=503, detail="DB connection failed")
try:
await conn.execute(
"DELETE FROM discoveries WHERE id = $1", discovery_id
)
finally:
await conn.close()
return {"deleted": discovery_id}
115 changes: 67 additions & 48 deletions backend/routers/molecules.py
Original file line number Diff line number Diff line change
@@ -1,48 +1,67 @@
"""GET /api/molecules/{session_id}."""

from fastapi import APIRouter, HTTPException

router = APIRouter()


@router.get("/molecules/{session_id}")
async def get_molecules(session_id: str):
from agents.OrchestratorAgent import _sessions

state = _sessions.get(session_id)
if not state:
raise HTTPException(status_code=404, detail="Session not found")
return {
"session_id": session_id,
"query": state.get("query"),
"mutation_context": state.get("mutation_context"),
"literature": state.get("literature", []),
"structures": state.get("structures", []),
"pdb_content": state.get("pdb_content", ""),
"binding_pocket": state.get("binding_pocket"),
"pocket_detection_method": state.get("pocket_detection_method"),
"pocket_delta": state.get("pocket_delta"),
"generated_molecules": state.get("generated_molecules", []),
"docking_results": state.get("docking_results", []),
"selectivity_results": state.get("selectivity_results", []),
"admet_profiles": state.get("admet_profiles", []),
"toxicophore_highlights": state.get("toxicophore_highlights", []),
"optimized_leads": state.get("optimized_leads", []),
"evolution_tree": state.get("evolution_tree"),
"similar_compounds": state.get("similar_compounds", []),
"synergy_predictions": state.get("synergy_predictions", []),
"clinical_trials": state.get("clinical_trials", []),
"knowledge_graph": state.get("knowledge_graph"),
"reasoning_trace": state.get("reasoning_trace"),
"summary": state.get("summary"),
"resistance_forecast": state.get("resistance_forecast"),
"resistant_drugs": state.get("resistant_drugs", []),
"recommended_drugs": state.get("recommended_drugs", []),
"final_report": state.get("final_report"),
"status": state.get("status"),
"cancelled": state.get("cancelled", False),
"agent_statuses": state.get("agent_statuses", {}),
"execution_time_ms": state.get("execution_time_ms", 0),
"langsmith_run_id": state.get("langsmith_run_id"),
"llm_provider_used": state.get("llm_provider_used", "unknown"),
}
"""GET /api/molecules/{session_id}."""

from fastapi import APIRouter, HTTPException

router = APIRouter()


@router.get("/molecules/{session_id}")
async def get_molecules(session_id: str):
import agents.OrchestratorAgent # Import module, not variable

state = agents.OrchestratorAgent._sessions.get(session_id)

# Not in memory (backend restarted) — try recovering from Neon
if not state:
try:
from utils.db import get_session_by_session_id
state = await get_session_by_session_id(session_id)
except Exception:
state = None

if not state:
raise HTTPException(status_code=404, detail="Session not found")

return {
"session_id": session_id,
"query": state.get("query"),
"mutation_context": state.get("mutation_context"),
"literature": state.get("literature", []),
"structures": state.get("structures", []),
"pdb_content": state.get("pdb_content", ""),
"binding_pocket": state.get("binding_pocket"),
"pocket_detection_method": state.get("pocket_detection_method"),
"pocket_delta": state.get("pocket_delta"),
"generated_molecules": state.get("generated_molecules", []),
"docking_results": state.get("docking_results", []),
"selectivity_results": state.get("selectivity_results", []),
"admet_profiles": state.get("admet_profiles", []),
"toxicophore_highlights": state.get("toxicophore_highlights", []),
"optimized_leads": state.get("optimized_leads", []),
"evolution_tree": state.get("evolution_tree"),
"similar_compounds": state.get("similar_compounds", []),
"synergy_predictions": state.get("synergy_predictions", []),
"clinical_trials": state.get("clinical_trials", []),
"knowledge_graph": state.get("knowledge_graph"),
"reasoning_trace": state.get("reasoning_trace"),
"summary": state.get("summary"),
"resistance_flags": state.get("resistance_flags", []),
"resistance_forecast": state.get("resistance_forecast"),
"resistant_drugs": state.get("resistant_drugs", []),
"recommended_drugs": state.get("recommended_drugs", []),
"md_results": state.get("md_results", []),
"sa_scores": state.get("sa_scores", []),
"synthesis_routes": state.get("synthesis_routes", []),
"confidence": state.get("confidence"),
"confidence_banner": state.get("confidence_banner"),
"esm1v_score": state.get("esm1v_score"),
"esm1v_confidence": state.get("esm1v_confidence"),
"final_report": state.get("final_report"),
"status": state.get("status"),
"cancelled": state.get("cancelled", False),
"agent_statuses": state.get("agent_statuses", {}),
"execution_time_ms": state.get("execution_time_ms", 0),
"langsmith_run_id": state.get("langsmith_run_id"),
"llm_provider_used": state.get("llm_provider_used", "unknown"),
"discovery_id": state.get("discovery_id"),
}
94 changes: 56 additions & 38 deletions backend/routers/stream.py
Original file line number Diff line number Diff line change
@@ -1,38 +1,56 @@
"""GET /api/stream/{session_id} — SSE event stream."""

import asyncio
import json

from fastapi import APIRouter
from fastapi.responses import StreamingResponse

router = APIRouter()


@router.get("/stream/{session_id}")
async def stream(session_id: str):
from agents.OrchestratorAgent import _sse_queues
from agents.OrchestratorAgent import _sessions

async def event_generator():
queue = _sse_queues.get(session_id)
if not queue and session_id in _sessions:
queue = asyncio.Queue()
_sse_queues[session_id] = queue
if not queue:
yield f"data: {json.dumps({'event': 'error', 'message': 'Session not found'})}\n\n"
return
while True:
try:
event = await asyncio.wait_for(queue.get(), timeout=120)
yield f"data: {json.dumps(event, default=str)}\n\n"
if event.get("event") == "pipeline_complete":
break
except asyncio.TimeoutError:
yield f"data: {json.dumps({'event': 'heartbeat'})}\n\n"

return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
)
"""GET /api/stream/{session_id} — SSE event stream."""

import asyncio
import json

from fastapi import APIRouter
from fastapi.responses import StreamingResponse

router = APIRouter()


@router.get("/stream/{session_id}")
async def stream(session_id: str):
from agents.OrchestratorAgent import _sessions, _sse_queues

async def event_generator():
# ── Live session in memory ─────────────────────────────────────────
queue = _sse_queues.get(session_id)
if not queue and session_id in _sessions:
queue = asyncio.Queue()
_sse_queues[session_id] = queue

if queue:
while True:
try:
event = await asyncio.wait_for(queue.get(), timeout=120)
yield f"data: {json.dumps(event, default=str)}\n\n"
if event.get("event") == "pipeline_complete":
break
except asyncio.TimeoutError:
yield f"data: {json.dumps({'event': 'heartbeat'})}\n\n"
return

# ── Session not in memory — try recovering from Neon ───────────────
try:
from utils.db import get_session_by_session_id

state = await get_session_by_session_id(session_id)
if state and state.get("final_report"):
# Emit pipeline_complete immediately so the frontend skips
# the "running" view and goes straight to results.
yield (
f"data: {json.dumps({'event': 'pipeline_complete', 'data': state}, default=str)}\n\n"
)
return
except Exception:
pass

# ── Truly not found ────────────────────────────────────────────────
yield f"data: {json.dumps({'event': 'not_found', 'message': 'Session not found'})}\n\n"

return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
)
Loading
Loading