Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
d3322bc
fix(kb): scope chunk_count to current KB in update_kb_stats
lxfight Jun 4, 2026
7bb50a9
fix(kb): unify sparse retrieval score direction to lower-is-better
lxfight Jun 4, 2026
70e6e13
fix(kb): rollback orphan vectors when metadata save fails after upload
lxfight Jun 4, 2026
2b9e7d2
fix(sidebar): prevent memory leak in upload_tasks/upload_progress dicts
lxfight Jun 4, 2026
d921fe9
feat(kb): add batch delete documents API
lxfight Jun 4, 2026
6a9c6b7
fix(kb): cap BM25 fallback at 10K chunks to prevent OOM
lxfight Jun 4, 2026
21546f8
fix(kb): offload faiss.write_index to thread via asyncio.to_thread
lxfight Jun 4, 2026
f811403
fix(kb): switch FAISS from L2 to IP cosine similarity with write lock
lxfight Jun 5, 2026
3ca771a
fix(kb): correct BM25 top_k_sparse aggregation across multiple KBs
lxfight Jun 5, 2026
65ed3fc
perf(kb): add kb_id generated column and parallelize dense retrieval
lxfight Jun 5, 2026
6a331c5
feat(kb): add HNSW index support and thread-safe LRU embedding cache
lxfight Jun 5, 2026
9ef28c5
fix(kb): preserve FAISS external IDs during L2→IP index migration
lxfight Jun 5, 2026
8f63a6f
fix(kb): return bool from vec_db.delete to signal whether chunk existed
lxfight Jun 5, 2026
c7593f9
fix(kb): serialize FTS5 rebuild, use VIRTUAL generated columns, retur…
lxfight Jun 5, 2026
ab904ac
fix(kb): add incremental index_type migration and guard document dele…
lxfight Jun 5, 2026
590e898
refactor(kb): clarify RRF variable names and simplify BM25 per-KB agg…
lxfight Jun 5, 2026
e1269ef
fix(kb): add rate-limiter lock, validate chunk/doc existence on delet…
lxfight Jun 5, 2026
d74fe03
fix(kb): cascade-delete KB records, add instance lock, name index, an…
lxfight Jun 5, 2026
7e3bfc8
fix(kb): improve dashboard error messages, rerank sentinel, and uploa…
lxfight Jun 5, 2026
7a642aa
test(kb): add coverage for kb_id-guarded deletion, vec-delete failure…
lxfight Jun 5, 2026
13a1243
feat(kb): add document search by name/type and remove default list_kb…
lxfight Jun 5, 2026
c40eb5a
feat(kb): support kb_ids in retrieve and validate KB options before m…
lxfight Jun 5, 2026
8d869f0
fix(kb): validate upload parameters, add file type/size checks, suppo…
lxfight Jun 5, 2026
f88a339
feat(dashboard): add document search, improved upload UX, and validat…
lxfight Jun 5, 2026
a361cde
feat(kb): upgrade knowledge base workflows
lxfight Jun 6, 2026
ecebc01
fix: back up faiss index before migration
lxfight Jun 6, 2026
590772d
feat: preserve markdown structures during chunking
lxfight Jun 6, 2026
9a63165
fix: surface failed knowledge base uploads
lxfight Jun 6, 2026
19c4487
style: improve knowledge base document detail layout
lxfight Jun 6, 2026
8d60a8e
style: simplify knowledge base overview layout
lxfight Jun 6, 2026
0e6e0ff
fix: clean up faiss batch insert failures
lxfight Jun 6, 2026
4c71934
fix: enforce kb document ownership
lxfight Jun 6, 2026
932e24f
fix: report partial kb task failures
lxfight Jun 6, 2026
dd7ceeb
test: consolidate knowledge base regression coverage
lxfight Jun 6, 2026
ed3fcb0
test: merge knowledge base test files
lxfight Jun 6, 2026
c2d9bdf
test: restore split knowledge base tests
lxfight Jun 6, 2026
87bc159
test: align kb import failure status expectation
lxfight Jun 6, 2026
3e9a670
fix(kb): re-raise RuntimeError in single-KB dense retrieval
lxfight Jun 8, 2026
080ed11
fix(kb): add path validation and atomic migration for FAISS index
lxfight Jun 8, 2026
ee21756
fix(kb): add timestamp null-safety and concurrency control
lxfight Jun 8, 2026
69b1e95
fix(kb): prevent deadlock in KB instance initialization retry
lxfight Jun 8, 2026
4a9e24c
fix(kb): improve error logging for dense retrieval exceptions
lxfight Jun 8, 2026
8ac5481
fix(kb): remove unused imports from kb_db_sqlite.py
lxfight Jun 8, 2026
fac2791
fix(kb): ensure atomic FAISS index writes to prevent corruption
lxfight Jun 8, 2026
989b975
fix(kb): handle concurrent duplicate document uploads gracefully
lxfight Jun 8, 2026
4fcb56b
test(kb): add comprehensive stability and concurrency tests
lxfight Jun 8, 2026
92c5c11
fix(test): update tests for atomic FAISS write and correct RetrievalM…
lxfight Jun 8, 2026
4eb8d58
fix(kb): improve error handling and cache consistency
lxfight Jun 8, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
307 changes: 249 additions & 58 deletions astrbot/core/db/vec_db/faiss_impl/document_storage.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,24 @@
import json
import os
from asyncio import Lock
from contextlib import asynccontextmanager
from datetime import datetime
from pathlib import Path

from sqlalchemy import Column, Text, bindparam
from sqlalchemy.dialects import sqlite
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import NullPool
from sqlalchemy.schema import CreateTable
from sqlmodel import Field, MetaData, SQLModel, col, func, select, text

from astrbot.core import logger
from astrbot.core.knowledge_base.retrieval.tokenizer import (
build_fts5_or_query,
load_stopwords,
to_fts5_search_text,
tokenize_text,
)

FTS_TABLE_NAME = "documents_fts"
Expand Down Expand Up @@ -55,50 +60,98 @@ def __init__(self, db_path: str) -> None:
self._fts_contentless_delete = False
self._fts_index_ready = False
self._stopwords: set[str] | None = None
self._fts_rebuild_lock = Lock()

async def initialize(self) -> None:
"""Initialize the SQLite database and create the documents table if it doesn't exist."""
await self.connect()
async with self.engine.begin() as conn: # type: ignore
# Create tables using SQLModel
await conn.run_sync(BaseDocModel.metadata.create_all)
await self._ensure_documents_table(conn)
await self._ensure_generated_columns(conn)

try:
await conn.execute(
text(
"ALTER TABLE documents ADD COLUMN kb_doc_id TEXT "
"GENERATED ALWAYS AS (json_extract(metadata, '$.kb_doc_id')) STORED",
),
)
await conn.execute(
text(
"ALTER TABLE documents ADD COLUMN user_id TEXT "
"GENERATED ALWAYS AS (json_extract(metadata, '$.user_id')) STORED",
),
)
await self._initialize_fts5(conn)
await conn.commit()

# Create indexes
await conn.execute(
text(
"CREATE INDEX IF NOT EXISTS idx_documents_kb_doc_id ON documents(kb_doc_id)",
),
)
await conn.execute(
text(
"CREATE INDEX IF NOT EXISTS idx_documents_user_id ON documents(user_id)",
),
)
except BaseException:
pass
async def _table_columns(self, executor, table_name: str) -> set[str]:
result = await executor.execute(text(f"PRAGMA table_xinfo({table_name})"))
return {row[1] for row in result.fetchall()}

await conn.execute(
async def _ensure_generated_columns(self, executor) -> None:
generated_columns = {
"kb_doc_id": "json_extract(metadata, '$.kb_doc_id')",
"user_id": "json_extract(metadata, '$.user_id')",
"kb_id": "json_extract(metadata, '$.kb_id')",
}
columns = await self._table_columns(executor, "documents")
for column_name, expression in generated_columns.items():
if column_name in columns:
continue
await executor.execute(
text(
"CREATE UNIQUE INDEX IF NOT EXISTS idx_documents_doc_id_unique ON documents(doc_id)",
f"ALTER TABLE documents ADD COLUMN {column_name} TEXT "
f"GENERATED ALWAYS AS ({expression}) VIRTUAL",
),
)
columns.add(column_name)

await self._initialize_fts5(conn)
await conn.commit()
index_statements = [
"CREATE INDEX IF NOT EXISTS idx_documents_kb_doc_id "
"ON documents(kb_doc_id)",
"CREATE INDEX IF NOT EXISTS idx_documents_user_id ON documents(user_id)",
"CREATE INDEX IF NOT EXISTS idx_documents_kb_id ON documents(kb_id)",
]
for statement in index_statements:
await executor.execute(text(statement))

async def _ensure_documents_table(self, executor) -> None:
"""Create the document table from the SQLModel definition."""
result = await executor.execute(
text(
"""
SELECT 1
FROM sqlite_master
WHERE type='table' AND name=:table_name
LIMIT 1
""",
),
{"table_name": Document.__tablename__},
)
if result.scalar_one_or_none() is not None:
await self._ensure_doc_id_unique_index(executor)
return

create_table = CreateTable(Document.__table__, if_not_exists=True) # type: ignore[attr-defined]

await executor.execute(
text(str(create_table.compile(dialect=sqlite.dialect())))
)
await self._ensure_doc_id_unique_index(executor)

async def _ensure_doc_id_unique_index(self, executor) -> None:
duplicate_result = await executor.execute(
text(
"""
SELECT doc_id
FROM documents
GROUP BY doc_id
HAVING COUNT(*) > 1
LIMIT 1
""",
),
)
if duplicate_result.scalar_one_or_none() is not None:
logger.warning(
"Skipping documents.doc_id unique index migration because duplicate "
f"doc_id values already exist in {self.db_path}.",
)
return

await executor.execute(
text(
"CREATE UNIQUE INDEX IF NOT EXISTS "
"idx_documents_doc_id_unique ON documents(doc_id)",
),
)

async def _initialize_fts5(self, executor) -> None:
try:
Expand Down Expand Up @@ -203,6 +256,7 @@ async def connect(self) -> None:
self.DATABASE_URL,
echo=False,
future=True,
poolclass=NullPool,
)
self.async_session_maker = sessionmaker(
self.engine, # type: ignore
Expand Down Expand Up @@ -255,11 +309,11 @@ async def get_documents(

async with self.get_session() as session:
query = select(Document)

for key, val in metadata_filters.items():
query = query.where(
text(f"json_extract(metadata, '$.{key}') = :filter_{key}"),
).params(**{f"filter_{key}": val})
query = await self._apply_metadata_filters(
session,
query,
metadata_filters,
)

if ids is not None and len(ids) > 0:
valid_ids = [int(i) for i in ids if i != -1]
Expand Down Expand Up @@ -421,11 +475,11 @@ async def delete_documents(self, metadata_filters: dict) -> None:

async with self.get_session() as session, session.begin():
query = select(Document)

for key, val in metadata_filters.items():
query = query.where(
text(f"json_extract(metadata, '$.{key}') = :filter_{key}"),
).params(**{f"filter_{key}": val})
query = await self._apply_metadata_filters(
session,
query,
metadata_filters,
)

result = await session.execute(query)
documents = result.scalars().all()
Expand All @@ -452,15 +506,144 @@ async def count_documents(self, metadata_filters: dict | None = None) -> int:
query = select(func.count(col(Document.id)))

if metadata_filters:
for key, val in metadata_filters.items():
query = query.where(
text(f"json_extract(metadata, '$.{key}') = :filter_{key}"),
).params(**{f"filter_{key}": val})
query = await self._apply_metadata_filters(
session,
query,
metadata_filters,
)

result = await session.execute(query)
count = result.scalar_one_or_none()
return count if count is not None else 0

async def search_documents(
self,
query_text: str,
metadata_filters: dict | None = None,
offset: int = 0,
limit: int = 100,
) -> tuple[list[dict], int] | None:
"""Search documents with FTS5 and optional metadata filters.

Returns None when FTS5 is unavailable so callers can choose whether to
fall back to an alternate search strategy.
"""
if limit <= 0:
return [], 0
if not await self.ensure_fts_index():
return None

match_query = build_fts5_or_query(tokenize_text(query_text, self.stopwords))
if not match_query:
return [], 0

metadata_filters = metadata_filters or {}
async with self.get_session() as session:
filters_sql, filter_params = await self._metadata_filter_sql(
session,
metadata_filters,
table_alias="d",
)
where_clause = f"{FTS_TABLE_NAME} MATCH :query"
if filters_sql:
where_clause = f"{where_clause} AND {' AND '.join(filters_sql)}"
params = {
"query": match_query,
"limit": int(limit),
"offset": int(offset),
**filter_params,
}
try:
count_result = await session.execute(
text(
f"""
SELECT count(*)
FROM {FTS_TABLE_NAME}
JOIN documents d ON d.id = {FTS_TABLE_NAME}.rowid
WHERE {where_clause}
""",
),
params,
)
total = int(count_result.scalar_one_or_none() or 0)
result = await session.execute(
text(
f"""
SELECT
d.id AS id,
d.doc_id AS doc_id,
d.text AS text,
d.metadata AS metadata,
d.created_at AS created_at,
d.updated_at AS updated_at,
bm25({FTS_TABLE_NAME}) AS score
FROM {FTS_TABLE_NAME}
JOIN documents d ON d.id = {FTS_TABLE_NAME}.rowid
WHERE {where_clause}
ORDER BY score ASC, d.id ASC
LIMIT :limit
OFFSET :offset
""",
),
params,
)
except Exception as e:
logger.warning(
f"FTS5 document search failed for {self.db_path}: {e}",
)
self.fts5_available = False
return None

rows = result.mappings().all()
return [
{
"id": row["id"],
"doc_id": row["doc_id"],
"text": row["text"],
"metadata": row["metadata"],
"created_at": row["created_at"],
"updated_at": row["updated_at"],
"score": float(row["score"]),
}
for row in rows
], total

async def _apply_metadata_filters(
self,
session: AsyncSession,
query,
metadata_filters: dict,
):
filters_sql, params = await self._metadata_filter_sql(
session,
metadata_filters,
)
for filter_sql in filters_sql:
query = query.where(text(filter_sql))
if params:
query = query.params(**params)
return query

async def _metadata_filter_sql(
self,
session: AsyncSession,
metadata_filters: dict,
table_alias: str | None = None,
) -> tuple[list[str], dict]:
columns = await self._table_columns(session, "documents")
prefix = f"{table_alias}." if table_alias else ""
filters_sql = []
params = {}
for key, val in metadata_filters.items():
if key in {"kb_id", "kb_doc_id", "user_id"} and key in columns:
filters_sql.append(f"{prefix}{key} = :filter_{key}")
else:
filters_sql.append(
f"json_extract({prefix}metadata, '$.{key}') = :filter_{key}"
)
params[f"filter_{key}"] = val
return filters_sql, params

async def ensure_fts_index(self) -> bool:
"""Ensure the FTS5 sparse index exists and matches the documents table."""
if not self.fts5_available:
Expand All @@ -470,22 +653,30 @@ async def ensure_fts_index(self) -> bool:

assert self.engine is not None, "Database connection is not initialized."

async with self.get_session() as session:
doc_count = await self._count_documents_in_session(session)
fts_count = await self._count_fts_rows(session)
if doc_count == fts_count:
self._fts_index_ready = True
async with self._fts_rebuild_lock:
if self._fts_index_ready:
return True

logger.info(
f"Rebuilding FTS5 sparse index for {self.db_path}: "
f"documents={doc_count}, fts_rows={fts_count}",
)
await self.rebuild_fts_index()
return self.fts5_available
async with self.get_session() as session:
doc_count = await self._count_documents_in_session(session)
fts_count = await self._count_fts_rows(session)
if doc_count == fts_count:
self._fts_index_ready = True
return True

logger.info(
f"Rebuilding FTS5 sparse index for {self.db_path}: "
f"documents={doc_count}, fts_rows={fts_count}",
)
await self._rebuild_fts_index_unlocked()
return self.fts5_available

async def rebuild_fts_index(self) -> None:
"""Rebuild the contentless FTS5 sparse index from documents."""
async with self._fts_rebuild_lock:
await self._rebuild_fts_index_unlocked()

async def _rebuild_fts_index_unlocked(self) -> None:
if not self.fts5_available:
return

Expand Down Expand Up @@ -530,7 +721,7 @@ async def search_sparse(
sparse retrieval implementation.
"""
if limit <= 0:
return []
return None
if not await self.ensure_fts_index():
return None

Expand Down
Loading
Loading