Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support single document reindex #622

Merged
merged 3 commits into from
Feb 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion backend/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ migrate:

run_dev_server:
@echo "Running server..."
@rye run python main.py runserver
@rye run python main.py runserver --host 0.0.0.0 --port 5001

run_dev_celery_worker:
@echo "Running celery..."
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from datetime import datetime
from typing import Optional
from uuid import UUID

from pydantic import BaseModel, Field

Expand Down Expand Up @@ -44,3 +45,10 @@ class DocumentItem(BaseModel):
last_modified_at: datetime
created_at: datetime
updated_at: datetime


class RebuildIndexResult(BaseModel):
reindex_document_ids: list[int] = Field(default_factory=list)
ignore_document_ids: list[int] = Field(default_factory=list)
reindex_chunk_ids: list[UUID] = Field(default_factory=list)
ignore_chunk_ids: list[UUID] = Field(default_factory=list)
127 changes: 108 additions & 19 deletions backend/app/api/admin_routes/knowledge_base/document/routes.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,27 @@
import logging
from typing import Annotated

from fastapi import APIRouter, Depends, Query
from fastapi import APIRouter, Depends, Query, HTTPException
from fastapi_pagination import Params, Page
from sqlmodel import Session

from app.api.admin_routes.knowledge_base.models import ChunkItem
from app.api.deps import SessionDep, CurrentSuperuserDep
from app.models import Document
from app.models.chunk import get_kb_chunk_model
from app.models.chunk import Chunk, KgIndexStatus, get_kb_chunk_model
from app.models.document import DocIndexTaskStatus
from app.models.entity import get_kb_entity_model
from app.models.relationship import get_kb_relationship_model
from app.repositories import knowledge_base_repo, document_repo
from app.repositories.chunk import ChunkRepo
from app.api.admin_routes.knowledge_base.document.models import (
DocumentFilters,
DocumentItem,
RebuildIndexResult,
)
from app.exceptions import (
InternalServerError,
KBNotFound,
DocumentNotFound,
)
from app.exceptions import InternalServerError
from app.repositories.graph import GraphRepo
from app.tasks.build_index import build_index_for_document, build_kg_index_for_chunk
from app.tasks.knowledge_base import stats_for_knowledge_base


Expand All @@ -45,15 +45,15 @@ def list_kb_documents(
filters=filters,
params=params,
)
except KBNotFound as e:
raise e
except HTTPException:
raise
except Exception as e:
logger.exception(e)
raise InternalServerError()


@router.get("/admin/knowledge_bases/{kb_id}/documents/{doc_id}")
def get_document_by_id(
def get_kb_document_by_id(
session: SessionDep,
user: CurrentSuperuserDep,
kb_id: int,
Expand All @@ -63,8 +63,8 @@ def get_document_by_id(
document = document_repo.must_get(session, doc_id)
assert document.knowledge_base_id == kb_id
return document
except KBNotFound as e:
raise e
except HTTPException:
raise
except Exception as e:
logger.exception(e)
raise InternalServerError()
Expand All @@ -81,8 +81,8 @@ def list_kb_document_chunks(
kb = knowledge_base_repo.must_get(session, kb_id)
chunk_repo = ChunkRepo(get_kb_chunk_model(kb))
return chunk_repo.get_document_chunks(session, doc_id)
except KBNotFound as e:
raise e
except HTTPException:
raise
except Exception as e:
logger.exception(e)
raise InternalServerError()
Expand All @@ -94,7 +94,7 @@ def remove_kb_document(
user: CurrentSuperuserDep,
kb_id: int,
document_id: int,
):
) -> RebuildIndexResult:
try:
kb = knowledge_base_repo.must_get(session, kb_id)
doc = document_repo.must_get(session, document_id)
Expand Down Expand Up @@ -124,10 +124,99 @@ def remove_kb_document(
stats_for_knowledge_base.delay(kb_id)

return {"detail": "success"}
except KBNotFound as e:
raise e
except DocumentNotFound as e:
raise e
except HTTPException:
raise
except Exception as e:
logger.exception(f"Failed to remove document #{document_id}: {e}")
raise InternalServerError()


@router.post("/admin/knowledge_bases/{kb_id}/documents/reindex")
def rebuild_kb_documents_index(
session: SessionDep,
user: CurrentSuperuserDep,
kb_id: int,
document_ids: list[int],
reindex_completed_task: bool = False,
):
try:
return rebuild_kb_document_index_by_ids(
session, kb_id, document_ids, reindex_completed_task
)
except HTTPException:
raise
except Exception as e:
logger.exception(e, exc_info=True)
raise InternalServerError()


@router.post("/admin/knowledge_bases/{kb_id}/documents/{doc_id}/reindex")
def rebuild_kb_document_index(
db_session: SessionDep,
user: CurrentSuperuserDep,
kb_id: int,
doc_id: int,
reindex_completed_task: bool = False,
) -> RebuildIndexResult:
try:
document_ids = [doc_id]
return rebuild_kb_document_index_by_ids(
db_session, kb_id, document_ids, reindex_completed_task
)
except HTTPException:
raise
except Exception as e:
logger.exception(e, exc_info=True)
raise InternalServerError()


def rebuild_kb_document_index_by_ids(
db_session: Session,
kb_id: int,
document_ids: list[int],
reindex_completed_task: bool = False,
) -> RebuildIndexResult:
kb = knowledge_base_repo.must_get(db_session, kb_id)
kb_chunk_repo = ChunkRepo(get_kb_chunk_model(kb))

# Retry failed vector index tasks.
documents = document_repo.fetch_by_ids(db_session, document_ids)
reindex_document_ids = []
ignore_document_ids = []

for doc in documents:
# TODO: check NOT_STARTED, PENDING, RUNNING
if doc.index_status != DocIndexTaskStatus.FAILED and not reindex_completed_task:
ignore_document_ids.append(doc.id)
else:
reindex_document_ids.append(doc.id)

doc.index_status = DocIndexTaskStatus.PENDING
db_session.add(doc)
db_session.commit()

build_index_for_document.delay(kb.id, doc.id)

# Retry failed kg index tasks.
chunks: list[Chunk] = kb_chunk_repo.fetch_by_document_ids(db_session, document_ids)
reindex_chunk_ids = []
ignore_chunk_ids = []
for chunk in chunks:
if chunk.index_status == KgIndexStatus.COMPLETED and not reindex_completed_task:
ignore_chunk_ids.append(chunk.id)
continue
else:
reindex_chunk_ids.append(chunk.id)

chunk.index_status = KgIndexStatus.PENDING
db_session.add(chunk)
db_session.commit()

build_kg_index_for_chunk.delay(kb.id, chunk.id)

return RebuildIndexResult(
reindex_document_ids=reindex_document_ids,
ignore_document_ids=ignore_document_ids,
reindex_chunk_ids=reindex_chunk_ids,
ignore_chunk_ids=ignore_chunk_ids,
)
68 changes: 30 additions & 38 deletions backend/app/api/admin_routes/knowledge_base/routes.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
import logging

from fastapi import APIRouter, Depends
from fastapi import APIRouter, Depends, HTTPException
from fastapi_pagination import Params, Page

from app.api.deps import SessionDep, CurrentSuperuserDep
from app.rag.knowledge_base.index_store import (
init_kb_tidb_vector_store,
init_kb_tidb_graph_store,
)
from app.repositories.embedding_model import embed_model_repo
from app.repositories.llm import llm_repo

from .models import (
KnowledgeBaseDetail,
KnowledgeBaseItem,
Expand All @@ -18,25 +15,24 @@
VectorIndexError,
KGIndexError,
)
from app.api.deps import SessionDep, CurrentSuperuserDep
from app.exceptions import (
InternalServerError,
KBException,
KBNotFound,
KBNoVectorIndexConfigured,
DefaultLLMNotFound,
DefaultEmbeddingModelNotFound,
KBIsUsedByChatEngines,
)
from app.models import (
DataSource,
KnowledgeBase,
)
from app.models.data_source import DataSource
from app.repositories import (
embed_model_repo,
llm_repo,
data_source_repo,
knowledge_base_repo,
)
from app.tasks import (
build_kg_index_for_chunk,
build_index_for_document,
)
from app.repositories import knowledge_base_repo, data_source_repo
from app.tasks.knowledge_base import (
import_documents_for_knowledge_base,
stats_for_knowledge_base,
Expand Down Expand Up @@ -93,12 +89,8 @@ def create_knowledge_base(
import_documents_for_knowledge_base.delay(knowledge_base.id)

return knowledge_base
except KBNoVectorIndexConfigured as e:
raise e
except DefaultLLMNotFound as e:
raise e
except DefaultEmbeddingModelNotFound as e:
raise e
except HTTPException:
raise
except Exception as e:
logger.exception(e)
raise InternalServerError()
Expand All @@ -121,8 +113,8 @@ def get_knowledge_base(
) -> KnowledgeBaseDetail:
try:
return knowledge_base_repo.must_get(session, knowledge_base_id)
except KBNotFound as e:
raise e
except HTTPException:
raise
except Exception as e:
logger.exception(e)
raise InternalServerError()
Expand All @@ -139,10 +131,8 @@ def update_knowledge_base_setting(
knowledge_base = knowledge_base_repo.must_get(session, knowledge_base_id)
knowledge_base = knowledge_base_repo.update(session, knowledge_base, update)
return knowledge_base
except KBNotFound as e:
raise e
except KBNoVectorIndexConfigured as e:
raise e
except HTTPException:
raise
except Exception as e:
logger.exception(e)
raise InternalServerError()
Expand All @@ -155,8 +145,8 @@ def list_kb_linked_chat_engines(
try:
kb = knowledge_base_repo.must_get(session, kb_id)
return knowledge_base_repo.list_linked_chat_engines(session, kb.id)
except KBNotFound as e:
raise e
except HTTPException:
raise
except Exception as e:
logger.exception(e)
raise InternalServerError()
Expand All @@ -181,8 +171,8 @@ def delete_knowledge_base(session: SessionDep, user: CurrentSuperuserDep, kb_id:
purge_knowledge_base_related_resources.apply_async(args=[kb_id], countdown=5)

return {"detail": f"Knowledge base #{kb_id} is deleted successfully"}
except KBException as e:
raise e
except HTTPException:
raise
except Exception as e:
logger.exception(e)
raise InternalServerError()
Expand All @@ -200,8 +190,8 @@ def get_knowledge_base_index_overview(
stats_for_knowledge_base.delay(knowledge_base.id)

return knowledge_base_repo.get_index_overview(session, knowledge_base)
except KBNotFound as e:
raise e
except HTTPException:
raise
except Exception as e:
logger.exception(e)
raise InternalServerError()
Expand All @@ -217,8 +207,8 @@ def list_kb_vector_index_errors(
try:
kb = knowledge_base_repo.must_get(session, kb_id)
return knowledge_base_repo.list_vector_index_built_errors(session, kb, params)
except KBNotFound as e:
raise e
except HTTPException:
raise
except Exception as e:
logger.exception(e)
raise InternalServerError()
Expand All @@ -234,8 +224,8 @@ def list_kb_kg_index_errors(
try:
kb = knowledge_base_repo.must_get(session, kb_id)
return knowledge_base_repo.list_kg_index_built_errors(session, kb, params)
except KBNotFound as e:
raise e
except HTTPException:
raise
except Exception as e:
logger.exception(e)
raise InternalServerError()
Expand Down Expand Up @@ -267,10 +257,12 @@ def retry_failed_tasks(
)

return {
"detail": f"Triggered reindex {len(document_ids)} documents and {len(chunk_ids)} chunks of knowledge base #{kb_id}."
"detail": f"Triggered reindex {len(document_ids)} documents and {len(chunk_ids)} chunks of knowledge base #{kb_id}.",
"reindex_document_ids": document_ids,
"reindex_chunk_ids": chunk_ids,
}
except KBNotFound as e:
raise e
except HTTPException:
raise
except Exception as e:
logger.exception(e)
raise InternalServerError()
4 changes: 1 addition & 3 deletions backend/app/rag/chat/retrieve/retrieve_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,7 @@ def search_relevant_chunks(self, user_question: str) -> List[NodeWithScore]:

def get_documents_from_nodes(self, nodes: List[NodeWithScore]) -> List[DBDocument]:
document_ids = [n.node.metadata["document_id"] for n in nodes]
documents = document_repo.list_full_documents_by_ids(
self.db_session, document_ids
)
documents = document_repo.fetch_by_ids(self.db_session, document_ids)
# Keep the original order of document ids, which is sorted by similarity.
return sorted(documents, key=lambda x: document_ids.index(x.id))

Expand Down
Loading
Loading