Skip to content

Commit

Permalink
Merge branch 'main' into advanced_filters
Browse files Browse the repository at this point in the history
  • Loading branch information
Mini256 authored Feb 11, 2025
2 parents fb0ef2f + ce7f50e commit ab44f20
Show file tree
Hide file tree
Showing 19 changed files with 409 additions and 193 deletions.
2 changes: 1 addition & 1 deletion backend/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ migrate:

run_dev_server:
@echo "Running server..."
@rye run python main.py runserver
@rye run python main.py runserver --host 0.0.0.0 --port 5001

run_dev_celery_worker:
@echo "Running celery..."
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from datetime import datetime
from typing import Optional
from uuid import UUID

from pydantic import BaseModel, Field

Expand Down Expand Up @@ -44,3 +45,10 @@ class DocumentItem(BaseModel):
last_modified_at: datetime
created_at: datetime
updated_at: datetime


class RebuildIndexResult(BaseModel):
    """Summary of a reindex request: which IDs were re-queued and which were skipped.

    Every field defaults to a fresh empty list.
    """

    # IDs of documents that were selected for reindexing.
    reindex_document_ids: list[int] = Field(default_factory=list)
    # IDs of documents that were skipped.
    ignore_document_ids: list[int] = Field(default_factory=list)
    # IDs of chunks that were selected for knowledge-graph reindexing.
    reindex_chunk_ids: list[UUID] = Field(default_factory=list)
    # IDs of chunks that were skipped.
    ignore_chunk_ids: list[UUID] = Field(default_factory=list)
127 changes: 108 additions & 19 deletions backend/app/api/admin_routes/knowledge_base/document/routes.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,27 @@
import logging
from typing import Annotated

from fastapi import APIRouter, Depends, Query
from fastapi import APIRouter, Depends, Query, HTTPException
from fastapi_pagination import Params, Page
from sqlmodel import Session

from app.api.admin_routes.knowledge_base.models import ChunkItem
from app.api.deps import SessionDep, CurrentSuperuserDep
from app.models import Document
from app.models.chunk import get_kb_chunk_model
from app.models.chunk import Chunk, KgIndexStatus, get_kb_chunk_model
from app.models.document import DocIndexTaskStatus
from app.models.entity import get_kb_entity_model
from app.models.relationship import get_kb_relationship_model
from app.repositories import knowledge_base_repo, document_repo
from app.repositories.chunk import ChunkRepo
from app.api.admin_routes.knowledge_base.document.models import (
DocumentFilters,
DocumentItem,
RebuildIndexResult,
)
from app.exceptions import (
InternalServerError,
KBNotFound,
DocumentNotFound,
)
from app.exceptions import InternalServerError
from app.repositories.graph import GraphRepo
from app.tasks.build_index import build_index_for_document, build_kg_index_for_chunk
from app.tasks.knowledge_base import stats_for_knowledge_base


Expand All @@ -45,15 +45,15 @@ def list_kb_documents(
filters=filters,
params=params,
)
except KBNotFound as e:
raise e
except HTTPException:
raise
except Exception as e:
logger.exception(e)
raise InternalServerError()


@router.get("/admin/knowledge_bases/{kb_id}/documents/{doc_id}")
def get_document_by_id(
def get_kb_document_by_id(
session: SessionDep,
user: CurrentSuperuserDep,
kb_id: int,
Expand All @@ -63,8 +63,8 @@ def get_document_by_id(
document = document_repo.must_get(session, doc_id)
assert document.knowledge_base_id == kb_id
return document
except KBNotFound as e:
raise e
except HTTPException:
raise
except Exception as e:
logger.exception(e)
raise InternalServerError()
Expand All @@ -81,8 +81,8 @@ def list_kb_document_chunks(
kb = knowledge_base_repo.must_get(session, kb_id)
chunk_repo = ChunkRepo(get_kb_chunk_model(kb))
return chunk_repo.get_document_chunks(session, doc_id)
except KBNotFound as e:
raise e
except HTTPException:
raise
except Exception as e:
logger.exception(e)
raise InternalServerError()
Expand All @@ -94,7 +94,7 @@ def remove_kb_document(
user: CurrentSuperuserDep,
kb_id: int,
document_id: int,
):
) -> RebuildIndexResult:
try:
kb = knowledge_base_repo.must_get(session, kb_id)
doc = document_repo.must_get(session, document_id)
Expand Down Expand Up @@ -124,10 +124,99 @@ def remove_kb_document(
stats_for_knowledge_base.delay(kb_id)

return {"detail": "success"}
except KBNotFound as e:
raise e
except DocumentNotFound as e:
raise e
except HTTPException:
raise
except Exception as e:
logger.exception(f"Failed to remove document #{document_id}: {e}")
raise InternalServerError()


@router.post("/admin/knowledge_bases/{kb_id}/documents/reindex")
def rebuild_kb_documents_index(
    session: SessionDep,
    user: CurrentSuperuserDep,
    kb_id: int,
    document_ids: list[int],
    reindex_completed_task: bool = False,
) -> RebuildIndexResult:
    """Trigger a reindex for several documents of one knowledge base.

    Thin route wrapper around ``rebuild_kb_document_index_by_ids``.

    Args:
        session: Database session dependency.
        user: Superuser dependency; not read in the body
            (presumably present only to gate access -- confirm in app.api.deps).
        kb_id: ID of the knowledge base the documents belong to.
        document_ids: IDs of the documents to reindex.
        reindex_completed_task: Forwarded to the helper; when true, items are
            re-queued regardless of their current index status.

    Returns:
        The ``RebuildIndexResult`` produced by the helper.

    Raises:
        HTTPException: Re-raised unchanged when the helper raises one.
        InternalServerError: For any other exception, after logging it.
    """
    try:
        return rebuild_kb_document_index_by_ids(
            session, kb_id, document_ids, reindex_completed_task
        )
    except HTTPException:
        # Let HTTP errors propagate with their own status code.
        raise
    except Exception as e:
        # logger.exception already attaches the active traceback.
        logger.exception(e)
        raise InternalServerError()


@router.post("/admin/knowledge_bases/{kb_id}/documents/{doc_id}/reindex")
def rebuild_kb_document_index(
    db_session: SessionDep,
    user: CurrentSuperuserDep,
    kb_id: int,
    doc_id: int,
    reindex_completed_task: bool = False,
) -> RebuildIndexResult:
    """Trigger a reindex for a single document of a knowledge base.

    Delegates to ``rebuild_kb_document_index_by_ids`` with a one-element
    list of document IDs.

    Args:
        db_session: Database session dependency.
        user: Superuser dependency; not read in the body
            (presumably present only to gate access -- confirm in app.api.deps).
        kb_id: ID of the knowledge base.
        doc_id: ID of the document to reindex.
        reindex_completed_task: Forwarded unchanged to the helper.

    Returns:
        The ``RebuildIndexResult`` produced by the helper.

    Raises:
        HTTPException: Re-raised unchanged when the helper raises one.
        InternalServerError: For any other exception, after logging it.
    """
    try:
        outcome = rebuild_kb_document_index_by_ids(
            db_session,
            kb_id,
            [doc_id],
            reindex_completed_task,
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.exception(e, exc_info=True)
        raise InternalServerError()
    else:
        return outcome


def rebuild_kb_document_index_by_ids(
    db_session: Session,
    kb_id: int,
    document_ids: list[int],
    reindex_completed_task: bool = False,
) -> RebuildIndexResult:
    """Re-queue index builds for the given documents and for their chunks.

    Documents whose ``index_status`` is ``FAILED`` are reset to ``PENDING``
    and handed to ``build_index_for_document``. Chunks of the given documents
    whose ``index_status`` is not ``COMPLETED`` are reset to ``PENDING`` and
    handed to ``build_kg_index_for_chunk``. Everything else is reported as
    ignored and left untouched.

    Args:
        db_session: Session used to load and update documents and chunks.
        kb_id: ID of the knowledge base; loaded via
            ``knowledge_base_repo.must_get``.
        document_ids: IDs of the documents to consider.
        reindex_completed_task: When true, every fetched document and chunk
            is re-queued regardless of its current status.

    Returns:
        A ``RebuildIndexResult`` listing the re-queued and the ignored
        document and chunk IDs.

    Raises:
        Whatever ``knowledge_base_repo.must_get`` raises for an unknown
        ``kb_id`` (the route wrappers expect an ``HTTPException`` subclass).
    """
    kb = knowledge_base_repo.must_get(db_session, kb_id)
    kb_chunk_repo = ChunkRepo(get_kb_chunk_model(kb))

    # Retry failed vector index tasks.
    # NOTE(review): documents are fetched by ID only; nothing in this function
    # checks that they belong to `kb` -- confirm whether fetch_by_ids or the
    # caller enforces that.
    documents = document_repo.fetch_by_ids(db_session, document_ids)
    reindex_document_ids = []
    ignore_document_ids = []

    for doc in documents:
        # TODO: check NOT_STARTED, PENDING, RUNNING
        if doc.index_status != DocIndexTaskStatus.FAILED and not reindex_completed_task:
            ignore_document_ids.append(doc.id)
            # An ignored document must not be reset or re-queued; skip the
            # rest of the iteration, mirroring the chunk loop below.
            continue

        reindex_document_ids.append(doc.id)

        # Persist the PENDING state before enqueuing the task.
        doc.index_status = DocIndexTaskStatus.PENDING
        db_session.add(doc)
        db_session.commit()

        build_index_for_document.delay(kb.id, doc.id)

    # Retry failed kg index tasks.
    chunks: list[Chunk] = kb_chunk_repo.fetch_by_document_ids(db_session, document_ids)
    reindex_chunk_ids = []
    ignore_chunk_ids = []

    for chunk in chunks:
        if chunk.index_status == KgIndexStatus.COMPLETED and not reindex_completed_task:
            ignore_chunk_ids.append(chunk.id)
            continue

        reindex_chunk_ids.append(chunk.id)

        # Persist the PENDING state before enqueuing the task.
        chunk.index_status = KgIndexStatus.PENDING
        db_session.add(chunk)
        db_session.commit()

        build_kg_index_for_chunk.delay(kb.id, chunk.id)

    return RebuildIndexResult(
        reindex_document_ids=reindex_document_ids,
        ignore_document_ids=ignore_document_ids,
        reindex_chunk_ids=reindex_chunk_ids,
        ignore_chunk_ids=ignore_chunk_ids,
    )
68 changes: 30 additions & 38 deletions backend/app/api/admin_routes/knowledge_base/routes.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
import logging

from fastapi import APIRouter, Depends
from fastapi import APIRouter, Depends, HTTPException
from fastapi_pagination import Params, Page

from app.api.deps import SessionDep, CurrentSuperuserDep
from app.rag.knowledge_base.index_store import (
init_kb_tidb_vector_store,
init_kb_tidb_graph_store,
)
from app.repositories.embedding_model import embed_model_repo
from app.repositories.llm import llm_repo

from .models import (
KnowledgeBaseDetail,
KnowledgeBaseItem,
Expand All @@ -18,25 +15,24 @@
VectorIndexError,
KGIndexError,
)
from app.api.deps import SessionDep, CurrentSuperuserDep
from app.exceptions import (
InternalServerError,
KBException,
KBNotFound,
KBNoVectorIndexConfigured,
DefaultLLMNotFound,
DefaultEmbeddingModelNotFound,
KBIsUsedByChatEngines,
)
from app.models import (
DataSource,
KnowledgeBase,
)
from app.models.data_source import DataSource
from app.repositories import (
embed_model_repo,
llm_repo,
data_source_repo,
knowledge_base_repo,
)
from app.tasks import (
build_kg_index_for_chunk,
build_index_for_document,
)
from app.repositories import knowledge_base_repo, data_source_repo
from app.tasks.knowledge_base import (
import_documents_for_knowledge_base,
stats_for_knowledge_base,
Expand Down Expand Up @@ -93,12 +89,8 @@ def create_knowledge_base(
import_documents_for_knowledge_base.delay(knowledge_base.id)

return knowledge_base
except KBNoVectorIndexConfigured as e:
raise e
except DefaultLLMNotFound as e:
raise e
except DefaultEmbeddingModelNotFound as e:
raise e
except HTTPException:
raise
except Exception as e:
logger.exception(e)
raise InternalServerError()
Expand All @@ -121,8 +113,8 @@ def get_knowledge_base(
) -> KnowledgeBaseDetail:
try:
return knowledge_base_repo.must_get(session, knowledge_base_id)
except KBNotFound as e:
raise e
except HTTPException:
raise
except Exception as e:
logger.exception(e)
raise InternalServerError()
Expand All @@ -139,10 +131,8 @@ def update_knowledge_base_setting(
knowledge_base = knowledge_base_repo.must_get(session, knowledge_base_id)
knowledge_base = knowledge_base_repo.update(session, knowledge_base, update)
return knowledge_base
except KBNotFound as e:
raise e
except KBNoVectorIndexConfigured as e:
raise e
except HTTPException:
raise
except Exception as e:
logger.exception(e)
raise InternalServerError()
Expand All @@ -155,8 +145,8 @@ def list_kb_linked_chat_engines(
try:
kb = knowledge_base_repo.must_get(session, kb_id)
return knowledge_base_repo.list_linked_chat_engines(session, kb.id)
except KBNotFound as e:
raise e
except HTTPException:
raise
except Exception as e:
logger.exception(e)
raise InternalServerError()
Expand All @@ -181,8 +171,8 @@ def delete_knowledge_base(session: SessionDep, user: CurrentSuperuserDep, kb_id:
purge_knowledge_base_related_resources.apply_async(args=[kb_id], countdown=5)

return {"detail": f"Knowledge base #{kb_id} is deleted successfully"}
except KBException as e:
raise e
except HTTPException:
raise
except Exception as e:
logger.exception(e)
raise InternalServerError()
Expand All @@ -200,8 +190,8 @@ def get_knowledge_base_index_overview(
stats_for_knowledge_base.delay(knowledge_base.id)

return knowledge_base_repo.get_index_overview(session, knowledge_base)
except KBNotFound as e:
raise e
except HTTPException:
raise
except Exception as e:
logger.exception(e)
raise InternalServerError()
Expand All @@ -217,8 +207,8 @@ def list_kb_vector_index_errors(
try:
kb = knowledge_base_repo.must_get(session, kb_id)
return knowledge_base_repo.list_vector_index_built_errors(session, kb, params)
except KBNotFound as e:
raise e
except HTTPException:
raise
except Exception as e:
logger.exception(e)
raise InternalServerError()
Expand All @@ -234,8 +224,8 @@ def list_kb_kg_index_errors(
try:
kb = knowledge_base_repo.must_get(session, kb_id)
return knowledge_base_repo.list_kg_index_built_errors(session, kb, params)
except KBNotFound as e:
raise e
except HTTPException:
raise
except Exception as e:
logger.exception(e)
raise InternalServerError()
Expand Down Expand Up @@ -267,10 +257,12 @@ def retry_failed_tasks(
)

return {
"detail": f"Triggered reindex {len(document_ids)} documents and {len(chunk_ids)} chunks of knowledge base #{kb_id}."
"detail": f"Triggered reindex {len(document_ids)} documents and {len(chunk_ids)} chunks of knowledge base #{kb_id}.",
"reindex_document_ids": document_ids,
"reindex_chunk_ids": chunk_ids,
}
except KBNotFound as e:
raise e
except HTTPException:
raise
except Exception as e:
logger.exception(e)
raise InternalServerError()
4 changes: 1 addition & 3 deletions backend/app/rag/chat/retrieve/retrieve_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,7 @@ def search_relevant_chunks(self, user_question: str) -> List[NodeWithScore]:

def get_documents_from_nodes(self, nodes: List[NodeWithScore]) -> List[DBDocument]:
document_ids = [n.node.metadata["document_id"] for n in nodes]
documents = document_repo.list_full_documents_by_ids(
self.db_session, document_ids
)
documents = document_repo.fetch_by_ids(self.db_session, document_ids)
# Keep the original order of document ids, which is sorted by similarity.
return sorted(documents, key=lambda x: document_ids.index(x.id))

Expand Down
Loading

0 comments on commit ab44f20

Please sign in to comment.