Skip to content

Commit 069de50

Browse files
committed
fix: concurrency issues when uploading large number of files
* Moved archive extraction to a separate thread. * Batched text extraction into smaller chunks: it became blocking when processing a large number of files, due to the synchronous encoding detection done by charset_normalizer.
1 parent 5fcf846 commit 069de50

File tree

11 files changed

+103
-28
lines changed

11 files changed

+103
-28
lines changed

e2e/health.sh

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
#!/bin/bash

# Poll a health endpoint once per second and print OK/FAILED per response.
# Usage: health.sh [URL]
# Defaults to the original hard-coded target so no-arg invocation is unchanged.
#
# Fix: the startup message previously claimed "localhost:8000/healthz" while
# curl actually hit https://a10test.jitsi.net/health — the message now reports
# the real target, and the target is overridable via $1.
URL="${1:-https://a10test.jitsi.net/health}"

echo "Starting health check polling for $URL"
echo "Press Ctrl+C to stop"
echo "----------------------------------------"

while true; do
    timestamp=$(date +"%Y-%m-%d %H:%M:%S")

    # -s: silent, -o /dev/null: discard body, -w: print only the HTTP status.
    # Tight timeouts so a hung server shows up as FAILED within ~2s.
    response=$(curl -s -o /dev/null -w "%{http_code}" \
        --connect-timeout 1 \
        --max-time 2 \
        "$URL")

    if [ "$response" = "200" ]; then
        echo "[$timestamp] Health check: OK (Status: $response)"
    else
        echo "[$timestamp] Health check: FAILED (Status: $response)"
    fi

    sleep 1
done

poetry.lock

Lines changed: 33 additions & 9 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ fastapi = "0.115.11"
4040
fastapi-versionizer = "4.0.1"
4141
faster-whisper = "1.1.1"
4242
flashrank = "^0.2.10"
43-
kreuzberg = "^2.1.2"
43+
kreuzberg = "3.1.1"
4444
langchain = "0.3.17"
4545
langchain-community = "^0.3.16"
4646
langchain-huggingface = "^0.1.2"

requirements-vllm.txt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ googleapis-common-protos==1.69.2 ; python_version >= "3.11" and python_version <
5555
greenlet==3.1.1 ; python_version >= "3.11" and python_version < "3.12" and (platform_machine == "aarch64" or platform_machine == "ppc64le" or platform_machine == "x86_64" or platform_machine == "amd64" or platform_machine == "AMD64" or platform_machine == "win32" or platform_machine == "WIN32")
5656
grpcio==1.71.0 ; python_version >= "3.11" and python_version < "3.12"
5757
h11==0.14.0 ; python_version >= "3.11" and python_version < "3.12"
58-
html-to-markdown==1.2.0 ; python_version >= "3.11" and python_version < "3.12"
58+
html-to-markdown==1.3.0 ; python_version >= "3.11" and python_version < "3.12"
5959
httpcore==1.0.7 ; python_version >= "3.11" and python_version < "3.12"
6060
httptools==0.6.4 ; python_version >= "3.11" and python_version < "3.12"
6161
httpx-sse==0.4.0 ; python_version >= "3.11" and python_version < "3.12"
@@ -73,7 +73,7 @@ jsonpatch==1.33 ; python_version >= "3.11" and python_version < "3.12"
7373
jsonpointer==3.0.0 ; python_version >= "3.11" and python_version < "3.12"
7474
jsonschema-specifications==2024.10.1 ; python_version >= "3.11" and python_version < "3.12"
7575
jsonschema==4.23.0 ; python_version >= "3.11" and python_version < "3.12"
76-
kreuzberg==2.1.2 ; python_version >= "3.11" and python_version < "3.12"
76+
kreuzberg==3.1.1 ; python_version >= "3.11" and python_version < "3.12"
7777
langchain-community==0.3.16 ; python_version >= "3.11" and python_version < "3.12"
7878
langchain-core==0.3.48 ; python_version >= "3.11" and python_version < "3.12"
7979
langchain-huggingface==0.1.2 ; python_version >= "3.11" and python_version < "3.12"
@@ -123,6 +123,7 @@ packaging==24.2 ; python_version >= "3.11" and python_version < "3.12"
123123
partial-json-parser==0.2.1.1.post5 ; python_version >= "3.11" and python_version < "3.12"
124124
pillow==11.1.0 ; python_version >= "3.11" and python_version < "3.12"
125125
platformdirs==4.3.7 ; python_version >= "3.11" and python_version < "3.12"
126+
playa-pdf==0.4.1 ; python_version >= "3.11" and python_version < "3.12"
126127
prometheus-client==0.21.0 ; python_version >= "3.11" and python_version < "3.12"
127128
prometheus-fastapi-instrumentator==7.0.0 ; python_version >= "3.11" and python_version < "3.12"
128129
propcache==0.3.0 ; python_version >= "3.11" and python_version < "3.12"

requirements.txt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ frozenlist==1.5.0 ; python_version >= "3.11" and python_version < "3.12"
3838
fsspec==2024.12.0 ; python_version >= "3.11" and python_version < "3.12"
3939
greenlet==3.1.1 ; python_version >= "3.11" and python_version < "3.12" and (platform_machine == "aarch64" or platform_machine == "ppc64le" or platform_machine == "x86_64" or platform_machine == "amd64" or platform_machine == "AMD64" or platform_machine == "win32" or platform_machine == "WIN32")
4040
h11==0.14.0 ; python_version >= "3.11" and python_version < "3.12"
41-
html-to-markdown==1.2.0 ; python_version >= "3.11" and python_version < "3.12"
41+
html-to-markdown==1.3.0 ; python_version >= "3.11" and python_version < "3.12"
4242
httpcore==1.0.7 ; python_version >= "3.11" and python_version < "3.12"
4343
httpx-sse==0.4.0 ; python_version >= "3.11" and python_version < "3.12"
4444
httpx==0.28.1 ; python_version >= "3.11" and python_version < "3.12"
@@ -51,7 +51,7 @@ jmespath==1.0.1 ; python_version >= "3.11" and python_version < "3.12"
5151
joblib==1.4.2 ; python_version >= "3.11" and python_version < "3.12"
5252
jsonpatch==1.33 ; python_version >= "3.11" and python_version < "3.12"
5353
jsonpointer==3.0.0 ; python_version >= "3.11" and python_version < "3.12"
54-
kreuzberg==2.1.2 ; python_version >= "3.11" and python_version < "3.12"
54+
kreuzberg==3.1.1 ; python_version >= "3.11" and python_version < "3.12"
5555
langchain-community==0.3.16 ; python_version >= "3.11" and python_version < "3.12"
5656
langchain-core==0.3.48 ; python_version >= "3.11" and python_version < "3.12"
5757
langchain-huggingface==0.1.2 ; python_version >= "3.11" and python_version < "3.12"
@@ -86,6 +86,7 @@ openai==1.68.2 ; python_version >= "3.11" and python_version < "3.12"
8686
orjson==3.10.16 ; python_version >= "3.11" and python_version < "3.12" and platform_python_implementation != "PyPy"
8787
packaging==24.2 ; python_version >= "3.11" and python_version < "3.12"
8888
pillow==11.1.0 ; python_version >= "3.11" and python_version < "3.12"
89+
playa-pdf==0.4.1 ; python_version >= "3.11" and python_version < "3.12"
8990
prometheus-client==0.21.0 ; python_version >= "3.11" and python_version < "3.12"
9091
prometheus-fastapi-instrumentator==7.0.0 ; python_version >= "3.11" and python_version < "3.12"
9192
propcache==0.3.0 ; python_version >= "3.11" and python_version < "3.12"

skynet/logs.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
# Suppress some logs from uvicorn
1111
class AccessLogSuppressor(Filter):
12-
exclude_paths = ('/favicon.ico', '/healthz', '/metrics')
12+
exclude_paths = ('/favicon.ico', '/metrics')
1313

1414
def filter(self, record: LogRecord) -> bool:
1515
log_msg = record.getMessage()

skynet/modules/ttt/rag/stores/faiss.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ async def create(self, store_id, documents: list[Document]):
6060
return None
6161

6262
start = time.perf_counter_ns()
63-
index = faiss.IndexFlatL2(len(self.embedding.embed_query(store_id)))
63+
index = faiss.IndexFlatL2(len(await self.embedding.aembed_query(store_id)))
6464
vector_store = FAISS(
6565
embedding_function=self.embedding,
6666
distance_strategy=DistanceStrategy.COSINE, # better for full-text search
@@ -72,7 +72,7 @@ async def create(self, store_id, documents: list[Document]):
7272

7373
uuids = [str(uuid4()) for _ in range(len(documents))]
7474

75-
batch_size = 10 # this is purely for logging the progress
75+
batch_size = 100 # this is purely for logging the progress
7676
for i in range(0, len(documents), batch_size):
7777
await vector_store.aadd_documents(documents=documents[i : i + batch_size], ids=uuids[i : i + batch_size])
7878
log.info(f'Embeddings for {store_id} progress: {i + batch_size} / {len(documents)} documents')

skynet/modules/ttt/rag/text_extractor/main.py

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
import asyncio
2+
3+
from multiprocessing import cpu_count
14
from pathlib import Path
25

36
from kreuzberg import batch_extract_file
@@ -8,6 +11,9 @@
811

912
log = get_logger(__name__)
1013

14+
MAX_CONCURRENT_PROCESSES = max(1, cpu_count() - 1) # Leave one core free for other tasks
15+
cpu_semaphore = asyncio.Semaphore(MAX_CONCURRENT_PROCESSES)
16+
1117

1218
async def extract(files: list[str]) -> list[Document]:
1319
"""
@@ -16,10 +22,19 @@ async def extract(files: list[str]) -> list[Document]:
1622

1723
documents = []
1824

19-
results = await batch_extract_file(files)
25+
# Process files in smaller chunks to prevent long blocking operations
26+
chunk_size = 10 # Adjust based on performance testing
27+
for i in range(0, len(files), chunk_size):
28+
files_chunk = files[i : i + chunk_size]
29+
30+
async with cpu_semaphore:
31+
chunk_results = await batch_extract_file(files_chunk)
32+
33+
for file, result in zip(files_chunk, chunk_results):
34+
documents.append(Document(result.content, metadata={'source': Path(file).name}))
2035

21-
for file, result in zip(files, results):
22-
documents.append(Document(result.content, metadata={'source': Path(file).name}))
36+
# Yield control back to the event loop to allow healthchecks to run
37+
await asyncio.sleep(0.01)
2338

2439
splits = split_documents(documents)
2540

skynet/modules/ttt/rag/vector_store.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ async def workflow(self, store_id: str, files: list[str], urls: list[str], max_d
126126
zip_files = [f for f in files if f.endswith('.zip')]
127127
if zip_files:
128128
files = [f for f in files if f not in zip_files]
129-
files.extend(extract_files(zip_files, self.get_temp_folder(store_id), min_size_kb=1))
129+
files.extend(await extract_files(zip_files, self.get_temp_folder(store_id), min_size_kb=1))
130130

131131
files = [f for f in files if any(f.endswith(ext) for ext in supported_files)]
132132

skynet/modules/ttt/rag/zip_extractor/main.py

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
1+
import asyncio
12
from pathlib import Path
23
from shutil import unpack_archive
34
from typing import Optional
45

56

6-
def extract_files(compressed_files: list[str], destination_folder: str, min_size_kb: Optional[int] = None) -> list[str]:
7-
"""
8-
Extract all files from the given list of compressed files.
9-
"""
10-
7+
def _extract_files_sync(
8+
compressed_files: list[str], destination_folder: str, min_size_kb: Optional[int] = None
9+
) -> list[str]:
10+
"""Synchronous implementation of file extraction"""
1111
extracted_files = []
1212

1313
for compressed_file in compressed_files:
@@ -24,3 +24,13 @@ def extract_files(compressed_files: list[str], destination_folder: str, min_size
2424
extracted_files = [f for f in extracted_files if Path(f).stat().st_size >= min_size_kb * 1024]
2525

2626
return extracted_files
27+
28+
29+
async def extract_files(
    compressed_files: list[str], destination_folder: str, min_size_kb: Optional[int] = None
) -> list[str]:
    """Extract every file from the given compressed archives.

    The actual unpacking is delegated to the synchronous implementation,
    which runs on a worker thread so the event loop stays responsive
    while large archives are being extracted.
    """
    extracted = await asyncio.to_thread(
        _extract_files_sync,
        compressed_files,
        destination_folder,
        min_size_kb,
    )
    return extracted

0 commit comments

Comments
 (0)