fix: concurrency issues when uploading large number of files #197

Merged: 1 commit, Apr 3, 2025
21 changes: 21 additions & 0 deletions e2e/health.sh
@@ -0,0 +1,21 @@
+#!/bin/bash
+
+echo "Starting health check polling for localhost:8001/healthz"
+echo "Press Ctrl+C to stop"
+echo "----------------------------------------"
+
+while true; do
+  timestamp=$(date +"%Y-%m-%d %H:%M:%S")
+  response=$(curl -s -o /dev/null -w "%{http_code}" \
+    --connect-timeout 1 \
+    --max-time 2 \
+    localhost:8001/healthz)
+
+  if [ "$response" = "200" ]; then
+    echo "[$timestamp] Health check: OK (Status: $response)"
+  else
+    echo "[$timestamp] Health check: FAILED (Status: $response)"
+  fi
+
+  sleep 1
+done
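
This script is the reproduction harness for the bug in the PR title: it polls GET localhost:8001/healthz once per second with tight curl timeouts (1 s to connect, 2 s total). When the server's event loop is blocked by extraction work, the request times out and the line reads FAILED (curl reports status 000 when it gets no response). Running ./e2e/health.sh in one terminal while uploading a large batch of files in another makes the regression, and the fix, directly observable.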
42 changes: 33 additions & 9 deletions poetry.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion pyproject.toml
@@ -40,7 +40,7 @@ fastapi = "0.115.11"
 fastapi-versionizer = "4.0.1"
 faster-whisper = "1.1.1"
 flashrank = "^0.2.10"
-kreuzberg = "^2.1.2"
+kreuzberg = "3.1.1"
 langchain = "0.3.17"
 langchain-community = "^0.3.16"
 langchain-huggingface = "^0.1.2"
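
Two things change here: the version jumps a major release (2.x to 3.x), and the caret range is replaced by an exact pin, presumably to keep the 3.x API from drifting under the project.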
5 changes: 3 additions & 2 deletions requirements-vllm.txt
@@ -55,7 +55,7 @@ googleapis-common-protos==1.69.2 ; python_version >= "3.11" and python_version < "3.12"
 greenlet==3.1.1 ; python_version >= "3.11" and python_version < "3.12" and (platform_machine == "aarch64" or platform_machine == "ppc64le" or platform_machine == "x86_64" or platform_machine == "amd64" or platform_machine == "AMD64" or platform_machine == "win32" or platform_machine == "WIN32")
 grpcio==1.71.0 ; python_version >= "3.11" and python_version < "3.12"
 h11==0.14.0 ; python_version >= "3.11" and python_version < "3.12"
-html-to-markdown==1.2.0 ; python_version >= "3.11" and python_version < "3.12"
+html-to-markdown==1.3.0 ; python_version >= "3.11" and python_version < "3.12"
 httpcore==1.0.7 ; python_version >= "3.11" and python_version < "3.12"
 httptools==0.6.4 ; python_version >= "3.11" and python_version < "3.12"
 httpx-sse==0.4.0 ; python_version >= "3.11" and python_version < "3.12"
@@ -73,7 +73,7 @@ jsonpatch==1.33 ; python_version >= "3.11" and python_version < "3.12"
 jsonpointer==3.0.0 ; python_version >= "3.11" and python_version < "3.12"
 jsonschema-specifications==2024.10.1 ; python_version >= "3.11" and python_version < "3.12"
 jsonschema==4.23.0 ; python_version >= "3.11" and python_version < "3.12"
-kreuzberg==2.1.2 ; python_version >= "3.11" and python_version < "3.12"
+kreuzberg==3.1.1 ; python_version >= "3.11" and python_version < "3.12"
 langchain-community==0.3.16 ; python_version >= "3.11" and python_version < "3.12"
 langchain-core==0.3.48 ; python_version >= "3.11" and python_version < "3.12"
 langchain-huggingface==0.1.2 ; python_version >= "3.11" and python_version < "3.12"
@@ -123,6 +123,7 @@ packaging==24.2 ; python_version >= "3.11" and python_version < "3.12"
 partial-json-parser==0.2.1.1.post5 ; python_version >= "3.11" and python_version < "3.12"
 pillow==11.1.0 ; python_version >= "3.11" and python_version < "3.12"
 platformdirs==4.3.7 ; python_version >= "3.11" and python_version < "3.12"
+playa-pdf==0.4.1 ; python_version >= "3.11" and python_version < "3.12"
 prometheus-client==0.21.0 ; python_version >= "3.11" and python_version < "3.12"
 prometheus-fastapi-instrumentator==7.0.0 ; python_version >= "3.11" and python_version < "3.12"
 propcache==0.3.0 ; python_version >= "3.11" and python_version < "3.12"
5 changes: 3 additions & 2 deletions requirements.txt
@@ -38,7 +38,7 @@ frozenlist==1.5.0 ; python_version >= "3.11" and python_version < "3.12"
 fsspec==2024.12.0 ; python_version >= "3.11" and python_version < "3.12"
 greenlet==3.1.1 ; python_version >= "3.11" and python_version < "3.12" and (platform_machine == "aarch64" or platform_machine == "ppc64le" or platform_machine == "x86_64" or platform_machine == "amd64" or platform_machine == "AMD64" or platform_machine == "win32" or platform_machine == "WIN32")
 h11==0.14.0 ; python_version >= "3.11" and python_version < "3.12"
-html-to-markdown==1.2.0 ; python_version >= "3.11" and python_version < "3.12"
+html-to-markdown==1.3.0 ; python_version >= "3.11" and python_version < "3.12"
 httpcore==1.0.7 ; python_version >= "3.11" and python_version < "3.12"
 httpx-sse==0.4.0 ; python_version >= "3.11" and python_version < "3.12"
 httpx==0.28.1 ; python_version >= "3.11" and python_version < "3.12"
@@ -51,7 +51,7 @@ jmespath==1.0.1 ; python_version >= "3.11" and python_version < "3.12"
 joblib==1.4.2 ; python_version >= "3.11" and python_version < "3.12"
 jsonpatch==1.33 ; python_version >= "3.11" and python_version < "3.12"
 jsonpointer==3.0.0 ; python_version >= "3.11" and python_version < "3.12"
-kreuzberg==2.1.2 ; python_version >= "3.11" and python_version < "3.12"
+kreuzberg==3.1.1 ; python_version >= "3.11" and python_version < "3.12"
 langchain-community==0.3.16 ; python_version >= "3.11" and python_version < "3.12"
 langchain-core==0.3.48 ; python_version >= "3.11" and python_version < "3.12"
 langchain-huggingface==0.1.2 ; python_version >= "3.11" and python_version < "3.12"
@@ -86,6 +86,7 @@ openai==1.68.2 ; python_version >= "3.11" and python_version < "3.12"
 orjson==3.10.16 ; python_version >= "3.11" and python_version < "3.12" and platform_python_implementation != "PyPy"
 packaging==24.2 ; python_version >= "3.11" and python_version < "3.12"
 pillow==11.1.0 ; python_version >= "3.11" and python_version < "3.12"
+playa-pdf==0.4.1 ; python_version >= "3.11" and python_version < "3.12"
 prometheus-client==0.21.0 ; python_version >= "3.11" and python_version < "3.12"
 prometheus-fastapi-instrumentator==7.0.0 ; python_version >= "3.11" and python_version < "3.12"
 propcache==0.3.0 ; python_version >= "3.11" and python_version < "3.12"
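
The churn across both requirements files follows from the kreuzberg major bump: 3.x appears to pull in playa-pdf 0.4.1 as a new transitive dependency and to raise html-to-markdown from 1.2.0 to 1.3.0, which is why the lockfile and both requirements files change in step with pyproject.toml.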
2 changes: 1 addition & 1 deletion skynet/logs.py
@@ -9,7 +9,7 @@

 # Suppress some logs from uvicorn
 class AccessLogSuppressor(Filter):
-    exclude_paths = ('/favicon.ico', '/healthz', '/metrics')
+    exclude_paths = ('/favicon.ico', '/metrics')

     def filter(self, record: LogRecord) -> bool:
         log_msg = record.getMessage()
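
Dropping '/healthz' from exclude_paths means uvicorn's access log now records every health-check request. In the context of this PR that is useful rather than noisy: paired with e2e/health.sh, a gap in the logged checks points straight at a blocked event loop.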
4 changes: 2 additions & 2 deletions skynet/modules/ttt/rag/stores/faiss.py
@@ -60,7 +60,7 @@ async def create(self, store_id, documents: list[Document]):
             return None

         start = time.perf_counter_ns()
-        index = faiss.IndexFlatL2(len(self.embedding.embed_query(store_id)))
+        index = faiss.IndexFlatL2(len(await self.embedding.aembed_query(store_id)))
         vector_store = FAISS(
             embedding_function=self.embedding,
             distance_strategy=DistanceStrategy.COSINE,  # better for full-text search
@@ -72,7 +72,7 @@

         uuids = [str(uuid4()) for _ in range(len(documents))]

-        batch_size = 10  # this is purely for logging the progress
+        batch_size = 100  # this is purely for logging the progress
         for i in range(0, len(documents), batch_size):
             await vector_store.aadd_documents(documents=documents[i : i + batch_size], ids=uuids[i : i + batch_size])
             log.info(f'Embeddings for {store_id} progress: {i + batch_size} / {len(documents)} documents')
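
The embed_query to aembed_query switch is the heart of this file's fix: a synchronous embedding call executes on the event-loop thread, so every other coroutine, including the health endpoint, stalls until it returns. Below is a minimal, self-contained sketch of that failure mode; the function names mirror the diff, but the bodies are toy stand-ins, not the project's code:

    import asyncio
    import time

    async def heartbeat() -> None:
        # Stand-in for the /healthz handler: should tick roughly once a second.
        for _ in range(8):
            print('healthz ok', time.strftime('%H:%M:%S'))
            await asyncio.sleep(1)

    def embed_query(text: str) -> list[float]:
        time.sleep(3)  # a slow, synchronous embedding call
        return [0.0] * 8

    async def aembed_query(text: str) -> list[float]:
        # Runs the sync call on a worker thread, so the loop keeps serving.
        return await asyncio.to_thread(embed_query, text)

    async def main() -> None:
        hb = asyncio.create_task(heartbeat())
        await asyncio.sleep(0)       # let the heartbeat start
        embed_query('store')         # blocks the loop: no ticks for ~3 s
        await aembed_query('store')  # ticks continue while embedding runs
        await hb

    asyncio.run(main())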
21 changes: 18 additions & 3 deletions skynet/modules/ttt/rag/text_extractor/main.py
@@ -1,3 +1,6 @@
+import asyncio
+
+from multiprocessing import cpu_count
 from pathlib import Path

 from kreuzberg import batch_extract_file
@@ -8,6 +11,9 @@

 log = get_logger(__name__)

+MAX_CONCURRENT_PROCESSES = max(1, cpu_count() - 1)  # Leave one core free for other tasks
+cpu_semaphore = asyncio.Semaphore(MAX_CONCURRENT_PROCESSES)
+

 async def extract(files: list[str]) -> list[Document]:
     """
@@ -16,10 +22,19 @@ async def extract(files: list[str]) -> list[Document]:

     documents = []

-    results = await batch_extract_file(files)
-
-    for file, result in zip(files, results):
-        documents.append(Document(result.content, metadata={'source': Path(file).name}))
+    # Process files in smaller chunks to prevent long blocking operations
+    chunk_size = 10  # Adjust based on performance testing
+    for i in range(0, len(files), chunk_size):
+        files_chunk = files[i : i + chunk_size]
+
+        async with cpu_semaphore:
+            chunk_results = await batch_extract_file(files_chunk)
+
+        for file, result in zip(files_chunk, chunk_results):
+            documents.append(Document(result.content, metadata={'source': Path(file).name}))
+
+        # Yield control back to the event loop to allow healthchecks to run
+        await asyncio.sleep(0.01)

     splits = split_documents(documents)
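
Two mechanisms combine in this file. The module-level cpu_semaphore bounds work across callers: extract() can be running for several uploads at once, yet at most MAX_CONCURRENT_PROCESSES batch_extract_file calls are in flight in total. The await asyncio.sleep(0.01) between chunks then hands the loop back to waiting coroutines such as the health check. A runnable sketch of the same shape (store names, sizes, and timings are made up):

    import asyncio
    import random

    MAX_CONCURRENT = 3  # stand-in for MAX_CONCURRENT_PROCESSES
    semaphore = asyncio.Semaphore(MAX_CONCURRENT)

    async def extract(store: str, files: list[str], chunk_size: int = 10) -> None:
        for i in range(0, len(files), chunk_size):
            async with semaphore:
                # Stand-in for batch_extract_file: at most MAX_CONCURRENT of
                # these run at once, however many uploads are in flight.
                await asyncio.sleep(random.uniform(0.1, 0.3))
            await asyncio.sleep(0.01)  # yield to other coroutines between chunks
        print(f'{store}: extracted {len(files)} files')

    async def main() -> None:
        uploads = {f'store-{n}': [f'doc-{i}.pdf' for i in range(40)] for n in range(5)}
        await asyncio.gather(*(extract(s, fs) for s, fs in uploads.items()))

    asyncio.run(main())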
2 changes: 1 addition & 1 deletion skynet/modules/ttt/rag/vector_store.py
@@ -126,7 +126,7 @@ async def workflow(self, store_id: str, files: list[str], urls: list[str], max_d
         zip_files = [f for f in files if f.endswith('.zip')]
         if zip_files:
             files = [f for f in files if f not in zip_files]
-            files.extend(extract_files(zip_files, self.get_temp_folder(store_id), min_size_kb=1))
+            files.extend(await extract_files(zip_files, self.get_temp_folder(store_id), min_size_kb=1))

         files = [f for f in files if any(f.endswith(ext) for ext in supported_files)]
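
This one-word change is forced by the zip_extractor rewrite below: extract_files is now a coroutine, so the workflow must await it. Without the await, files.extend() would be handed a coroutine object instead of a list of paths and would raise, since a coroutine is not iterable.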
20 changes: 15 additions & 5 deletions skynet/modules/ttt/rag/zip_extractor/main.py
@@ -1,13 +1,13 @@
+import asyncio
 from pathlib import Path
 from shutil import unpack_archive
 from typing import Optional


-def extract_files(compressed_files: list[str], destination_folder: str, min_size_kb: Optional[int] = None) -> list[str]:
-    """
-    Extract all files from the given list of compressed files.
-    """
+def _extract_files_sync(
+    compressed_files: list[str], destination_folder: str, min_size_kb: Optional[int] = None
+) -> list[str]:
+    """Synchronous implementation of file extraction"""
     extracted_files = []

     for compressed_file in compressed_files:
@@ -24,3 +24,13 @@ def extract_files(compressed_files: list[str], destination_folder: str, min_size
         extracted_files = [f for f in extracted_files if Path(f).stat().st_size >= min_size_kb * 1024]

     return extracted_files
+
+
+async def extract_files(
+    compressed_files: list[str], destination_folder: str, min_size_kb: Optional[int] = None
+) -> list[str]:
+    """
+    Extract all files from the given list of compressed files.
+    Runs the entire extraction process in a separate thread.
"""
return await asyncio.to_thread(_extract_files_sync, compressed_files, destination_folder, min_size_kb)
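
asyncio.to_thread (Python 3.9+) runs the synchronous helper on the default thread-pool executor, so the blocking unpack_archive calls never occupy the event-loop thread, while the private _extract_files_sync keeps the logic testable as plain code. On interpreters without to_thread, roughly the same effect could be had with run_in_executor; a hedged sketch, not part of the PR:

    import asyncio
    from functools import partial

    async def extract_files_legacy(compressed_files, destination_folder, min_size_kb=None):
        # Pre-3.9 equivalent of asyncio.to_thread (modulo contextvars handling).
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None,  # default ThreadPoolExecutor
            partial(_extract_files_sync, compressed_files, destination_folder, min_size_kb),
        )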
7 changes: 5 additions & 2 deletions skynet/modules/ttt/rag/zip_extractor/main_test.py
@@ -2,11 +2,14 @@
 import shutil
 from pathlib import Path

+import pytest
+
 from skynet.modules.ttt.rag.zip_extractor.main import extract_files


 class TestExtractFiles:
-    def test_extract_files(self):
+    @pytest.mark.asyncio
+    async def test_extract_files(self):
         """
         Test extract_files function retrieves all files in all folders.
         """
@@ -33,7 +36,7 @@ def test_extract_files(self):
         for folder, zip_file in zip(folders, zip_files):
             shutil.make_archive(zip_file, 'zip', folder)

-        extracted_files = extract_files([f'{file}.zip' for file in zip_files], 'temp')
+        extracted_files = await extract_files([f'{file}.zip' for file in zip_files], 'temp')
         expected_files = [
             'temp/archives/test1/subfolder_0/file_0.txt',
             'temp/archives/test1/subfolder_0/file_1.txt',
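
A note for reviewers: @pytest.mark.asyncio is provided by the pytest-asyncio plugin, so the suite needs that plugin installed (and configured, e.g. via its asyncio_mode setting) for the now-async test to actually run instead of being skipped with a warning.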