Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add metadata to ingestion routes #2008

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 43 additions & 10 deletions private_gpt/components/ingest/ingest_component.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,12 @@ def __init__(
self.transformations = transformations

@abc.abstractmethod
def ingest(self, file_name: str, file_data: Path) -> list[Document]:
def ingest(
self,
file_name: str,
file_data: Path,
file_metadata: dict[str, str] | None = None,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would say it is a better idea to type this as `dict[str, Any] | None`.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is something I thought about because of LlamaIndex's restriction on documents: metadata values can only be flat (string, int, float; see https://docs.llamaindex.ai/en/stable/module_guides/loading/documents_and_nodes/usage_documents/#metadata). By using strings I made it "foolproof". But let me know if you want Any and I will change it.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe it is a better idea to convert the data when we store it in the vector store than to compromise the whole application with this decision. What do you think, @imartinez?

) -> list[Document]:
pass

@abc.abstractmethod
Expand Down Expand Up @@ -117,9 +122,16 @@ def __init__(
) -> None:
super().__init__(storage_context, embed_model, transformations, *args, **kwargs)

def ingest(self, file_name: str, file_data: Path) -> list[Document]:
def ingest(
self,
file_name: str,
file_data: Path,
file_metadata: dict[str, str] | None = None,
) -> list[Document]:
logger.info("Ingesting file_name=%s", file_name)
documents = IngestionHelper.transform_file_into_documents(file_name, file_data)
documents = IngestionHelper.transform_file_into_documents(
file_name, file_data, file_metadata
)
logger.info(
"Transformed file=%s into count=%s documents", file_name, len(documents)
)
Expand Down Expand Up @@ -175,16 +187,24 @@ def __init__(
processes=self.count_workers
)

def ingest(self, file_name: str, file_data: Path) -> list[Document]:
def ingest(
self,
file_name: str,
file_data: Path,
file_metadata: dict[str, str] | None = None,
) -> list[Document]:
logger.info("Ingesting file_name=%s", file_name)
documents = IngestionHelper.transform_file_into_documents(file_name, file_data)
documents = IngestionHelper.transform_file_into_documents(
file_name, file_data, file_metadata
)
logger.info(
"Transformed file=%s into count=%s documents", file_name, len(documents)
)
logger.debug("Saving the documents in the index and doc store")
return self._save_docs(documents)

def bulk_ingest(self, files: list[tuple[str, Path]]) -> list[Document]:

documents = list(
itertools.chain.from_iterable(
self._file_to_documents_work_pool.starmap(
Expand Down Expand Up @@ -257,12 +277,18 @@ def __init__(
processes=self.count_workers
)

def ingest(self, file_name: str, file_data: Path) -> list[Document]:
def ingest(
self,
file_name: str,
file_data: Path,
file_metadata: dict[str, str] | None = None,
) -> list[Document]:
logger.info("Ingesting file_name=%s", file_name)
# Running in a single (1) process to release the current
# thread, and take a dedicated CPU core for computation
documents = self._file_to_documents_work_pool.apply(
IngestionHelper.transform_file_into_documents, (file_name, file_data)
IngestionHelper.transform_file_into_documents,
(file_name, file_data, file_metadata),
)
logger.info(
"Transformed file=%s into count=%s documents", file_name, len(documents)
Expand All @@ -271,9 +297,9 @@ def ingest(self, file_name: str, file_data: Path) -> list[Document]:
return self._save_docs(documents)

def bulk_ingest(self, files: list[tuple[str, Path]]) -> list[Document]:

# Lightweight threads, used for parallelize the
# underlying IO calls made in the ingestion

documents = list(
itertools.chain.from_iterable(
self._ingest_work_pool.starmap(self.ingest, files)
Expand Down Expand Up @@ -459,8 +485,15 @@ def _flush(self) -> None:
self.node_q.put(("flush", None, None, None))
self.node_q.join()

def ingest(self, file_name: str, file_data: Path) -> list[Document]:
documents = IngestionHelper.transform_file_into_documents(file_name, file_data)
def ingest(
self,
file_name: str,
file_data: Path,
file_metadata: dict[str, str] | None = None,
) -> list[Document]:
documents = IngestionHelper.transform_file_into_documents(
file_name, file_data, file_metadata
)
self.doc_q.put(("process", file_name, documents))
self._flush()
return documents
Expand Down
4 changes: 3 additions & 1 deletion private_gpt/components/ingest/ingest_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,13 @@ class IngestionHelper:

@staticmethod
def transform_file_into_documents(
file_name: str, file_data: Path
file_name: str, file_data: Path, file_metadata: dict[str, str] | None = None
) -> list[Document]:
documents = IngestionHelper._load_file_to_documents(file_name, file_data)
for document in documents:
document.metadata.update(file_metadata or {})
document.metadata["file_name"] = file_name

IngestionHelper._exclude_metadata(documents)
return documents

Expand Down
29 changes: 25 additions & 4 deletions private_gpt/server/ingest/ingest_router.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from typing import Literal

from fastapi import APIRouter, Depends, HTTPException, Request, UploadFile
from fastapi import APIRouter, Depends, Form, HTTPException, Request, UploadFile
from pydantic import BaseModel, Field

from private_gpt.server.ingest.ingest_service import IngestService
Expand All @@ -20,6 +20,16 @@ class IngestTextBody(BaseModel):
"Chinese martial arts."
]
)
metadata: dict[str, str] = Field(
None,
examples=[
{
"title": "Avatar: The Last Airbender",
"author": "Michael Dante DiMartino, Bryan Konietzko",
"year": "2005",
}
],
)


class IngestResponse(BaseModel):
Expand All @@ -38,9 +48,15 @@ def ingest(request: Request, file: UploadFile) -> IngestResponse:


@ingest_router.post("/ingest/file", tags=["Ingestion"])
def ingest_file(request: Request, file: UploadFile) -> IngestResponse:
def ingest_file(
request: Request, file: UploadFile, metadata: str = Form(None)
) -> IngestResponse:
"""Ingests and processes a file, storing its chunks to be used as context.

metadata: Optional metadata to be associated with the file.
You do not have to specify this field if not needed.
e.g. {"title": "Avatar: The Last Airbender", "year": "2005"}

The context obtained from files is later used in
`/chat/completions`, `/completions`, and `/chunks` APIs.

Expand All @@ -57,7 +73,11 @@ def ingest_file(request: Request, file: UploadFile) -> IngestResponse:
service = request.state.injector.get(IngestService)
if file.filename is None:
raise HTTPException(400, "No file name provided")
ingested_documents = service.ingest_bin_data(file.filename, file.file)

# SECURITY: `metadata` is an untrusted multipart form field. It was previously
# passed to eval(), which lets any client execute arbitrary Python on the
# server. Parse it strictly as JSON instead (clients send a JSON-encoded object).
metadata_dict = None
if metadata is not None:
    import json

    try:
        metadata_dict = json.loads(metadata)
    except json.JSONDecodeError as err:
        raise HTTPException(400, "metadata must be a valid JSON object") from err
    # JSON scalars and arrays parse fine but are not usable as document metadata.
    if not isinstance(metadata_dict, dict):
        raise HTTPException(400, "metadata must be a valid JSON object")
NathanLenas marked this conversation as resolved.
Show resolved Hide resolved
ingested_documents = service.ingest_bin_data(
file.filename, file.file, metadata_dict
)
return IngestResponse(object="list", model="private-gpt", data=ingested_documents)


Expand All @@ -73,11 +93,12 @@ def ingest_text(request: Request, body: IngestTextBody) -> IngestResponse:
extracted Metadata (which is later used to improve context retrieval). That ID
can be used to filter the context used to create responses in
`/chat/completions`, `/completions`, and `/chunks` APIs.

"""
service = request.state.injector.get(IngestService)
if len(body.file_name) == 0:
raise HTTPException(400, "No file name provided")
ingested_documents = service.ingest_text(body.file_name, body.text)
ingested_documents = service.ingest_text(body.file_name, body.text, body.metadata)
return IngestResponse(object="list", model="private-gpt", data=ingested_documents)


Expand Down
31 changes: 23 additions & 8 deletions private_gpt/server/ingest/ingest_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,12 @@ def __init__(
settings=settings(),
)

def _ingest_data(self, file_name: str, file_data: AnyStr) -> list[IngestedDoc]:
def _ingest_data(
self,
file_name: str,
file_data: AnyStr,
file_metadata: dict[str, str] | None = None,
) -> list[IngestedDoc]:
logger.debug("Got file data of size=%s to ingest", len(file_data))
# llama-index mainly supports reading from files, so
# we have to create a tmp file to read for it to work
Expand All @@ -60,27 +65,37 @@ def _ingest_data(self, file_name: str, file_data: AnyStr) -> list[IngestedDoc]:
path_to_tmp.write_bytes(file_data)
else:
path_to_tmp.write_text(str(file_data))
return self.ingest_file(file_name, path_to_tmp)
return self.ingest_file(file_name, path_to_tmp, file_metadata)
finally:
tmp.close()
path_to_tmp.unlink()

def ingest_file(self, file_name: str, file_data: Path) -> list[IngestedDoc]:
def ingest_file(
self,
file_name: str,
file_data: Path,
file_metadata: dict[str, str] | None = None,
) -> list[IngestedDoc]:
logger.info("Ingesting file_name=%s", file_name)
documents = self.ingest_component.ingest(file_name, file_data)
documents = self.ingest_component.ingest(file_name, file_data, file_metadata)
logger.info("Finished ingestion file_name=%s", file_name)
return [IngestedDoc.from_document(document) for document in documents]

def ingest_text(self, file_name: str, text: str) -> list[IngestedDoc]:
def ingest_text(
self, file_name: str, text: str, metadata: dict[str, str] | None = None
) -> list[IngestedDoc]:
logger.debug("Ingesting text data with file_name=%s", file_name)
return self._ingest_data(file_name, text)
return self._ingest_data(file_name, text, metadata)

def ingest_bin_data(
self, file_name: str, raw_file_data: BinaryIO
self,
file_name: str,
raw_file_data: BinaryIO,
file_metadata: dict[str, str] | None = None,
) -> list[IngestedDoc]:
logger.debug("Ingesting binary data with file_name=%s", file_name)
file_data = raw_file_data.read()
return self._ingest_data(file_name, file_data)
return self._ingest_data(file_name, file_data, file_metadata)

def bulk_ingest(self, files: list[tuple[str, Path]]) -> list[IngestedDoc]:
logger.info("Ingesting file_names=%s", [f[0] for f in files])
Expand Down
13 changes: 13 additions & 0 deletions tests/fixtures/ingest_helper.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
from pathlib import Path

import pytest
Expand All @@ -18,6 +19,18 @@ def ingest_file(self, path: Path) -> IngestResponse:
ingest_result = IngestResponse.model_validate(response.json())
return ingest_result

def ingest_file_with_metadata(self, path: Path, metadata: dict) -> IngestResponse:
files = {
"file": (path.name, path.open("rb")),
"metadata": (None, json.dumps(metadata)),
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as below. Ideally, I would say it has to be something like this:

files = {
  "file": (path.name, path.open("rb")),
   "metadata": metadata
}


response = self.test_client.post("/v1/ingest/file", files=files)

assert response.status_code == 200
ingest_result = IngestResponse.model_validate(response.json())
return ingest_result


@pytest.fixture()
def ingest_helper(test_client: TestClient) -> IngestHelper:
Expand Down
22 changes: 22 additions & 0 deletions tests/server/ingest/test_ingest_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,25 @@ def test_ingest_plain_text(test_client: TestClient) -> None:
assert response.status_code == 200
ingest_result = IngestResponse.model_validate(response.json())
assert len(ingest_result.data) == 1


def test_ingest_text_with_metadata(test_client: TestClient) -> None:
    """Ingest plain text together with metadata via `/v1/ingest/text`.

    Checks that the request succeeds, that exactly one document is returned,
    and that the document's metadata contains both the supplied key/value
    pair and the `file_name` entry.
    """
    response = test_client.post(
        "/v1/ingest/text",
        json={"file_name": "file_name", "text": "text", "metadata": {"foo": "bar"}},
    )
    assert response.status_code == 200
    ingest_result = IngestResponse.model_validate(response.json())
    assert len(ingest_result.data) == 1

    # The user-supplied metadata is expected alongside the file name.
    assert ingest_result.data[0].doc_metadata == {
        "file_name": "file_name",
        "foo": "bar",
    }


def test_ingest_accepts_txt_files_with_metadata(ingest_helper: IngestHelper) -> None:
    """Upload `test.txt` with metadata and verify it is attached to the document."""
    sample_file = Path(__file__).parents[0] / "test.txt"
    extra_metadata = {"foo": "bar"}

    ingest_result = ingest_helper.ingest_file_with_metadata(
        sample_file, extra_metadata
    )

    # Exactly one document, carrying the file name plus the supplied metadata.
    assert len(ingest_result.data) == 1
    expected_metadata = {"file_name": "test.txt", "foo": "bar"}
    assert ingest_result.data[0].doc_metadata == expected_metadata
Loading