feat: support single document reindex (pingcap#622)
part of pingcap#580

Add new API: `/admin/knowledge_bases/{kb_id}/documents/{doc_id}/reindex`

<img width="510" alt="image"
src="https://github.com/user-attachments/assets/895c6510-1905-4948-a658-fd653786a89e"
/>
Mini256 authored and NG85 committed Feb 18, 2025
1 parent b66d888 commit 0bc81a6
Showing 10 changed files with 204 additions and 117 deletions.
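
Before the diff, a minimal sketch of calling the new single-document endpoint from Python. The base URL matches the dev server settings added to the Makefile below (`--host 0.0.0.0 --port 5001`); the knowledge base ID, document ID, and auth handling are placeholders rather than anything from this commit, and the route requires a superuser per `CurrentSuperuserDep`.

```python
# Hedged usage sketch (not part of the commit): reindex one document.
# kb_id=1 and doc_id=42 are invented; authentication is omitted, but the
# route is guarded by CurrentSuperuserDep, so a superuser session is required.
import requests

resp = requests.post(
    "http://localhost:5001/admin/knowledge_bases/1/documents/42/reindex",
    # Set to "true" to re-queue even documents/chunks whose index already completed.
    params={"reindex_completed_task": "false"},
)
resp.raise_for_status()
print(resp.json())
# Response mirrors the RebuildIndexResult model added in this commit:
# {"reindex_document_ids": [...], "ignore_document_ids": [...],
#  "reindex_chunk_ids": [...], "ignore_chunk_ids": [...]}
```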
2 changes: 1 addition & 1 deletion backend/Makefile
@@ -14,7 +14,7 @@ migrate:

run_dev_server:
@echo "Running server..."
@rye run python main.py runserver
@rye run python main.py runserver --host 0.0.0.0 --port 5001

run_dev_celery_worker:
@echo "Running celery..."
backend/app/api/admin_routes/knowledge_base/document/models.py
@@ -1,5 +1,6 @@
from datetime import datetime
from typing import Optional
from uuid import UUID

from pydantic import BaseModel, Field

@@ -44,3 +45,10 @@ class DocumentItem(BaseModel):
last_modified_at: datetime
created_at: datetime
updated_at: datetime


class RebuildIndexResult(BaseModel):
reindex_document_ids: list[int] = Field(default_factory=list)
ignore_document_ids: list[int] = Field(default_factory=list)
reindex_chunk_ids: list[UUID] = Field(default_factory=list)
ignore_chunk_ids: list[UUID] = Field(default_factory=list)
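
For context, the new `RebuildIndexResult` model above serializes to a simple JSON report of what was re-queued and what was skipped. A hedged sketch (the IDs are invented, and it assumes Pydantic v2's `model_dump_json`):

```python
# Illustrative only: shape of the reindex report returned by the new routes.
from uuid import UUID

from app.api.admin_routes.knowledge_base.document.models import RebuildIndexResult

result = RebuildIndexResult(
    reindex_document_ids=[42],          # documents re-queued for vector indexing
    ignore_document_ids=[7],            # documents skipped (index status not FAILED)
    reindex_chunk_ids=[UUID("11111111-2222-3333-4444-555555555555")],
    ignore_chunk_ids=[],                # chunks skipped (kg index already COMPLETED)
)
print(result.model_dump_json(indent=2))
```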
127 changes: 108 additions & 19 deletions backend/app/api/admin_routes/knowledge_base/document/routes.py
@@ -1,27 +1,27 @@
import logging
from typing import Annotated

from fastapi import APIRouter, Depends, Query
from fastapi import APIRouter, Depends, Query, HTTPException
from fastapi_pagination import Params, Page
from sqlmodel import Session

from app.api.admin_routes.knowledge_base.models import ChunkItem
from app.api.deps import SessionDep, CurrentSuperuserDep
from app.models import Document
from app.models.chunk import get_kb_chunk_model
from app.models.chunk import Chunk, KgIndexStatus, get_kb_chunk_model
from app.models.document import DocIndexTaskStatus
from app.models.entity import get_kb_entity_model
from app.models.relationship import get_kb_relationship_model
from app.repositories import knowledge_base_repo, document_repo
from app.repositories.chunk import ChunkRepo
from app.api.admin_routes.knowledge_base.document.models import (
DocumentFilters,
DocumentItem,
RebuildIndexResult,
)
from app.exceptions import (
InternalServerError,
KBNotFound,
DocumentNotFound,
)
from app.exceptions import InternalServerError
from app.repositories.graph import GraphRepo
from app.tasks.build_index import build_index_for_document, build_kg_index_for_chunk
from app.tasks.knowledge_base import stats_for_knowledge_base


@@ -45,15 +45,15 @@ def list_kb_documents(
filters=filters,
params=params,
)
except KBNotFound as e:
raise e
except HTTPException:
raise
except Exception as e:
logger.exception(e)
raise InternalServerError()


@router.get("/admin/knowledge_bases/{kb_id}/documents/{doc_id}")
def get_document_by_id(
def get_kb_document_by_id(
session: SessionDep,
user: CurrentSuperuserDep,
kb_id: int,
@@ -63,8 +63,8 @@ def get_document_by_id(
document = document_repo.must_get(session, doc_id)
assert document.knowledge_base_id == kb_id
return document
except KBNotFound as e:
raise e
except HTTPException:
raise
except Exception as e:
logger.exception(e)
raise InternalServerError()
@@ -81,8 +81,8 @@ def list_kb_document_chunks(
kb = knowledge_base_repo.must_get(session, kb_id)
chunk_repo = ChunkRepo(get_kb_chunk_model(kb))
return chunk_repo.get_document_chunks(session, doc_id)
except KBNotFound as e:
raise e
except HTTPException:
raise
except Exception as e:
logger.exception(e)
raise InternalServerError()
@@ -94,7 +94,7 @@ def remove_kb_document(
user: CurrentSuperuserDep,
kb_id: int,
document_id: int,
):
) -> RebuildIndexResult:
try:
kb = knowledge_base_repo.must_get(session, kb_id)
doc = document_repo.must_get(session, document_id)
@@ -124,10 +124,99 @@ def remove_kb_document(
stats_for_knowledge_base.delay(kb_id)

return {"detail": "success"}
except KBNotFound as e:
raise e
except DocumentNotFound as e:
raise e
except HTTPException:
raise
except Exception as e:
logger.exception(f"Failed to remove document #{document_id}: {e}")
raise InternalServerError()


@router.post("/admin/knowledge_bases/{kb_id}/documents/reindex")
def rebuild_kb_documents_index(
session: SessionDep,
user: CurrentSuperuserDep,
kb_id: int,
document_ids: list[int],
reindex_completed_task: bool = False,
):
try:
return rebuild_kb_document_index_by_ids(
session, kb_id, document_ids, reindex_completed_task
)
except HTTPException:
raise
except Exception as e:
logger.exception(e, exc_info=True)
raise InternalServerError()


@router.post("/admin/knowledge_bases/{kb_id}/documents/{doc_id}/reindex")
def rebuild_kb_document_index(
db_session: SessionDep,
user: CurrentSuperuserDep,
kb_id: int,
doc_id: int,
reindex_completed_task: bool = False,
) -> RebuildIndexResult:
try:
document_ids = [doc_id]
return rebuild_kb_document_index_by_ids(
db_session, kb_id, document_ids, reindex_completed_task
)
except HTTPException:
raise
except Exception as e:
logger.exception(e, exc_info=True)
raise InternalServerError()


def rebuild_kb_document_index_by_ids(
db_session: Session,
kb_id: int,
document_ids: list[int],
reindex_completed_task: bool = False,
) -> RebuildIndexResult:
kb = knowledge_base_repo.must_get(db_session, kb_id)
kb_chunk_repo = ChunkRepo(get_kb_chunk_model(kb))

# Retry failed vector index tasks.
documents = document_repo.fetch_by_ids(db_session, document_ids)
reindex_document_ids = []
ignore_document_ids = []

for doc in documents:
# TODO: check NOT_STARTED, PENDING, RUNNING
if doc.index_status != DocIndexTaskStatus.FAILED and not reindex_completed_task:
ignore_document_ids.append(doc.id)
else:
reindex_document_ids.append(doc.id)

doc.index_status = DocIndexTaskStatus.PENDING
db_session.add(doc)
db_session.commit()

build_index_for_document.delay(kb.id, doc.id)

# Retry failed kg index tasks.
chunks: list[Chunk] = kb_chunk_repo.fetch_by_document_ids(db_session, document_ids)
reindex_chunk_ids = []
ignore_chunk_ids = []
for chunk in chunks:
if chunk.index_status == KgIndexStatus.COMPLETED and not reindex_completed_task:
ignore_chunk_ids.append(chunk.id)
continue
else:
reindex_chunk_ids.append(chunk.id)

chunk.index_status = KgIndexStatus.PENDING
db_session.add(chunk)
db_session.commit()

build_kg_index_for_chunk.delay(kb.id, chunk.id)

return RebuildIndexResult(
reindex_document_ids=reindex_document_ids,
ignore_document_ids=ignore_document_ids,
reindex_chunk_ids=reindex_chunk_ids,
ignore_chunk_ids=ignore_chunk_ids,
)
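
To summarize the helper above: by default only documents whose `index_status` is `FAILED` are re-queued through `build_index_for_document`, and only chunks whose kg index is not yet `COMPLETED` go back through `build_kg_index_for_chunk`; passing `reindex_completed_task=true` re-queues everything. A hedged sketch of the batch route added alongside the single-document one (base URL, IDs, and auth are assumptions; FastAPI should read the bare `list[int]` parameter from the JSON body and the flag from the query string):

```python
# Hedged usage sketch (not part of the commit): batch reindex several documents.
import requests

resp = requests.post(
    "http://localhost:5001/admin/knowledge_bases/1/documents/reindex",
    json=[41, 42, 43],                           # document_ids (request body)
    params={"reindex_completed_task": "false"},  # default: only retry failed work
)
print(resp.json())
```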