
Commit

Adding milvus as a vector database and a search service (RAG) (#86)
* initial setup for job expression generation service

* fixing a typo

* adding milvus as a vector database

* adding new env var

* adding new env var

* changed the embedding service to OpenAI

* Embedding OpenFn docs from the repo

* fixing a typo

* added a better text splitting strategy to avoid uneven chunks of data

* embed docs before database connection

* changing chunking strategy, size and overlap after testing (this seems to return better search results).

* integrating gen_job and search service.

* added the adaptor description service to add more context to the prompt and some minor changes to improve the search service.

* adding Readme for job generation service and removing adaptor docs from the embeddings as we can use describe_adaptor service for retrieving adaptor info.

* improving prompts and removing adaptor docs

* Remove gen_job folder for search_docs focus

* improved the visualization of doc splitting by writing the splits to the disk

* collection name can be added to args, logs embeddings generated per file, updated readme

* removing unnecessary commands from dockerfile

* changing search parameters

* changing search parameters

* added semantic search

* better handling of paths to be embedded

* updated the Readme

* using markdown and recursive text splitters

* added path.config and updated readme

* adding range search

* fixing a limit error

* added some search results with the score

* added adaptor doc embeddings and improved the code quality

* fixing typos

* adding more validation and improving the readme

* resolved minor errors

* fixing typo

* separating client connection and adding logger

* writing output to file bug fixed

* changeset & readme update

---------

Co-authored-by: Joe Clark <[email protected]>
SatyamMattoo and josephjclark authored Sep 19, 2024
1 parent 9bee9b9 commit 013506f
Showing 11 changed files with 1,559 additions and 91 deletions.
5 changes: 5 additions & 0 deletions .changeset/sour-ways-listen.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"apollo": minor
---

Added search service (RAG)
5 changes: 4 additions & 1 deletion .env.example
@@ -1,2 +1,5 @@
OPENAI_API_KEY=sk-YOUR-API-KEY-HERE
HF_ACCESS_TOKEN=hf_YOUR-API-KEY-HERE # llama2 base

ZILLIZ_URI = https://in01-XXXXXXXXXXXXX.aws-us-west-2.vectordb.zillizcloud.com:XXXXX
ZILLIZ_TOKEN =db_admin:password (or ApiKey)
7 changes: 7 additions & 0 deletions Dockerfile
@@ -5,11 +5,15 @@ WORKDIR /app
COPY ./pyproject.toml ./poetry.lock poetry.toml ./
COPY ./package.json bun.lockb ./
COPY ./tsconfig.json ./
COPY ./path.config ./

COPY ./platform/ ./platform
COPY ./services/ ./services
COPY ./models/ ./models

RUN apt-get update && apt-get install -y git
RUN git clone --depth 1 https://github.com/OpenFn/docs.git /app/docs

RUN python -m pip install --user pipx
RUN python -m pipx install poetry
ENV PATH="${PATH}:/root/.local/bin/"
@@ -20,6 +24,9 @@ ENV PATH="${PATH}:/root/.bun/bin/"

RUN bun install

RUN --mount=type=secret,id=_env,dst=/.env cat /.env \
&& poetry run python services/search/generate_docs_embeddings.py docs openfn_docs

EXPOSE 3000

CMD ["bun", "start"]
1 change: 1 addition & 0 deletions path.config
@@ -0,0 +1 @@
docs/jobs/*.md
827 changes: 738 additions & 89 deletions poetry.lock

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion pyproject.toml
@@ -15,10 +15,12 @@ openai = "^1.23.3"
transformers = "^4.40.1"
black = "^24.4.2"
python-dotenv = "^1.0.1"
anthropic = "^0.34.0"
pymilvus = "^2.4.6"
langchain-text-splitters = "^0.2.4"

# These dependencies should only be needed for fine tuning
# use poetry install --with ft
anthropic = "^0.34.0"
[tool.poetry.group.ft]
optional = true

100 changes: 100 additions & 0 deletions services/search/README.md
@@ -0,0 +1,100 @@
## Search (RAG)

**RAG SEARCH IS AN EXPERIMENTAL FEATURE. IT IS NOT SUPPORTED BY THE STAGING SERVER.**

The Search Service integrates Retrieval-Augmented Generation (RAG) with Apollo to enhance the accuracy and relevance of data retrieval based on user queries. By leveraging this service, clients can obtain pertinent information from documents, provided they include sufficient context in their queries.

## Usage - Search Service

### To make a search with `curl`:

```bash
curl -X POST https://apollo-staging.openfn.org/services/search --json @tmp/payload.json
```

### With the CLI, returning to stdout:

```bash
openfn apollo search tmp/payload.json
```
To run directly from this repo (note that the server must be started):

```bash
bun py search tmp/payload.json -O
```

## Config Paths
To make it easier to specify which documents should be embedded, we use a configuration file named `path.config` located at the root of the repository.

### Adding Document Paths

1. **Open the `path.config` file**: This file contains the paths to the markdown files that you want to embed into the vector database.

2. **Specify one path per line**: Add each document path you wish to include, with one path per line. These paths should be relative to the docs folder within the repository.

3. **Use standard glob patterns**: You can use standard glob patterns to match multiple files or directories. For example:
- `docs/jobs/*.md`: This pattern matches all markdown files in the `docs/jobs` directory.
- `docs/adaptors/**/*.md`: This pattern matches all markdown files within the `docs/adaptors` directory and any of its subdirectories.

### Example `path.config`

```plaintext
docs/jobs/*.md
docs/adaptors/**/*.md
```
In this example, all markdown files under `docs/jobs` and `docs/adaptors` (including subdirectories) will be processed and embedded.

#### Important Note:
The paths specified in the `path.config` file are relative to the `docs` directory within your repository, not to the repository root.

By following these instructions, you can easily manage and update the list of documents to be embedded without needing to dive into the underlying code.

## Usage - Embedding OpenFn Docs
This service also embeds the OpenFn docs into a vector database. The vector database used here is Zilliz. To obtain the environment variables, follow these steps:

1. Create an account on [Zilliz](https://zilliz.com/) and set up a free cluster.
2. Obtain the URL and token for the cluster and add them to the `.env` file.
3. You'll also need an OpenAI API key to generate embeddings.

There are two ways to embed the docs:

### Running the Generate Embeddings File
Use Poetry to run the embedding script. In this case you will first need to clone the docs repo:

```bash
git clone --depth 1 https://github.com/OpenFn/docs.git tmp
```

Then run:
```bash
poetry run python services/search/generate_docs_embeddings.py tmp/docs openfn_docs
```
Here, `tmp/docs` is the path to the OpenFn docs and `openfn_docs` is the name of the collection in the vector database.

### Docker
Alternatively, build the Docker image directly using:

```bash
docker build --secret id=_env,src=.env -t apollo .
```

**NOTE**: The docs are split into chunks which can be seen in the `tmp/split_sections` folder.
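The splitting itself uses the markdown and recursive splitters from `langchain-text-splitters`. As a rough stdlib-only sketch of the two-stage idea (break on headers first, then window oversized sections with overlap — the function name and default sizes here are illustrative, not the service's actual values):

```python
import re

def split_markdown(text: str, chunk_size: int = 1000, overlap: int = 100) -> list:
    """Two-stage split: first on markdown headers, then by size with overlap."""
    # Stage 1: break the document at level-1 and level-2 headings
    sections = re.split(r"(?m)^(?=#{1,2} )", text)
    chunks = []
    for section in sections:
        section = section.strip()
        if not section:
            continue
        # Stage 2: slide a window over sections that exceed the chunk size
        start = 0
        while start < len(section):
            chunks.append(section[start:start + chunk_size])
            if start + chunk_size >= len(section):
                break
            start += chunk_size - overlap
    return chunks
```

Header-aware splitting keeps each chunk topically coherent, which tends to improve retrieval quality over fixed-size windows alone.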

## Implementation
The service retrieves data from job documents stored in a vector database. It uses the OpenAI Embedding Function to embed the OpenFn Documentation, allowing for relevant context retrieval and enhancing the performance of other Apollo services. You can also configure the number of chunks retrieved from the database.
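The retrieval step can be sketched as follows. This is an illustrative sketch, not the service's actual code: `search_docs`, `clamp_limit`, and the `nprobe` value are assumptions, while the `embedding`/`text` fields, the `text-embedding-3-small` model, and the L2 metric come from `generate_docs_embeddings.py`.

```python
def clamp_limit(limit, default=5, lo=1, hi=15):
    """Keep the requested chunk count inside the documented 1-15 range."""
    if limit is None:
        return default
    return max(lo, min(hi, int(limit)))

def search_docs(query, api_key, limit=None, partition_name=None):
    """Embed the query and return the closest text chunks from Milvus."""
    # Imports deferred so clamp_limit stays usable without the services installed
    from openai import OpenAI
    from pymilvus import Collection

    client = OpenAI(api_key=api_key)
    # Use the same model as at indexing time so vectors live in the same space
    vector = client.embeddings.create(
        input=[query], model="text-embedding-3-small"
    ).data[0].embedding

    collection = Collection("openfn_docs")  # assumed collection name
    results = collection.search(
        data=[vector],
        anns_field="embedding",
        param={"metric_type": "L2", "params": {"nprobe": 10}},
        limit=clamp_limit(limit),
        partition_names=[partition_name] if partition_name else None,
        output_fields=["text"],
    )
    # One hit list per query vector; pull back the stored chunk text
    return [hit.entity.get("text") for hit in results[0]]
```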

## Payload Reference
The input payload is a JSON object with the following structure:

```js
{
  "api_key": "<OpenAI api key>",
  "query": "What are jobs in OpenFn?",
  "limit": 10, // Custom limit for the number of chunks retrieved (1 to 15). Default is 5. (optional)
  "partition_name": "normal_docs" // Name of the partition in the vector database. (optional)
}
```
The `partition_name` is optional and the expected values are `normal_docs` or `adaptor_docs`.

## Response Reference
The server returns an array of relevant strings from the documents based on the provided query.
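As a sketch, a client could assemble and validate that payload before posting it (`build_payload` is a hypothetical helper, not part of the service; the field names and bounds follow the payload reference above):

```python
import json

VALID_PARTITIONS = {"normal_docs", "adaptor_docs"}

def build_payload(api_key, query, limit=5, partition_name=None):
    """Assemble a search payload, validating the documented fields."""
    if not 1 <= limit <= 15:
        raise ValueError("limit must be between 1 and 15")
    payload = {"api_key": api_key, "query": query, "limit": limit}
    if partition_name is not None:
        if partition_name not in VALID_PARTITIONS:
            raise ValueError(f"partition_name must be one of {VALID_PARTITIONS}")
        payload["partition_name"] = partition_name
    return payload

# Write it where the curl/CLI examples above expect a payload file
with open("payload.json", "w") as f:
    json.dump(build_payload("sk-...", "What are jobs in OpenFn?"), f, indent=2)
```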
151 changes: 151 additions & 0 deletions services/search/generate_docs_embeddings.py
@@ -0,0 +1,151 @@
import os
import time
import argparse
from openai import OpenAI
from dotenv import load_dotenv
from pymilvus import FieldSchema, CollectionSchema, DataType, utility, Collection, connections
from utils import split_docs, read_md_files, read_paths_config, fetch_adaptor_data, process_adaptor_data, logger

load_dotenv()

def parse_arguments():
"""
Parses command line arguments.
:return: Parsed arguments
"""
parser = argparse.ArgumentParser(description="Initialize Milvus with markdown files")
parser.add_argument("repo_path", type=str, help="Path to the repository containing markdown files")
parser.add_argument("collection_name", type=str, help="Name of the Milvus collection")
return parser.parse_args()

def create_collection(collection_name: str, max_chunk_length: int) -> Collection:
"""
Creates a Milvus collection with the specified schema.
:param collection_name: Name of the collection to create
:param max_chunk_length: Maximum length of text field
:return: Milvus Collection instance
"""
# Define field schemas
id_field = FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True, description="Primary ID")
embedding_field = FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=1536, description="Vector")
text_field = FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=max_chunk_length, description="Text data")

# Create collection schema
schema = CollectionSchema(fields=[id_field, embedding_field, text_field], description="Corpus collection schema")

# Create the collection
logger.info(f"Creating collection: {collection_name}")
collection = Collection(name=collection_name, schema=schema)
logger.info("Collection created!")

# Create partitions
collection.create_partition(partition_name="normal_docs")
collection.create_partition(partition_name="adaptor_docs")
logger.info("Partitions 'normal_docs' and 'adaptor_docs' created!")

return collection

def embed_documents(client: OpenAI, corpus: list, adaptor_corpus: list) -> tuple:
"""
Embeds documents using the OpenAI API.
:param client: OpenAI client instance
:param corpus: List of normal documents to embed
:param adaptor_corpus: List of adaptor documents to embed
:return: Tuple containing embedded normal and adaptor documents
"""
logger.info("Embedding documents...")
corpus_vectors = client.embeddings.create(input=corpus, model="text-embedding-3-small").data
adaptor_vectors = client.embeddings.create(input=adaptor_corpus, model="text-embedding-3-small").data
return corpus_vectors, adaptor_vectors

def insert_data(collection: Collection, vectors: list, texts: list, partition_name: str) -> None:
"""
Inserts data into a specified partition of the Milvus collection.
:param collection: Milvus Collection instance
:param vectors: List of embeddings
:param texts: List of document texts
:param partition_name: Name of the partition to insert data into
"""
data = [{"embedding": vec.embedding, "text": texts[i]} for i, vec in enumerate(vectors)]
collection.insert(data=data, partition_name=partition_name)
logger.info(f"Inserted {len(data)} documents into '{partition_name}' partition.")

def main():
args = parse_arguments()

# Fetch API keys and connection details
openai_key = os.getenv("OPENAI_API_KEY")
zilliz_uri = os.getenv('ZILLIZ_URI')
zilliz_token = os.getenv('ZILLIZ_TOKEN')

# Connect to OpenAI and Milvus
logger.info("Connecting to Milvus Database...")
client = OpenAI(api_key=openai_key)
connections.connect("default", uri=zilliz_uri, token=zilliz_token, db_name="openfn_docs")

# Read and process markdown files
docs_to_embed = read_paths_config("./path.config", args.repo_path)
md_files_content = read_md_files(docs_to_embed)

corpus = []
file_embeddings_count = {}

for file_name, content in md_files_content:
sections = split_docs(file_name, content)
corpus.extend(section.page_content for section in sections)
file_embeddings_count[file_name] = len(sections)

# Log the number of embeddings per file
for file_name, count in file_embeddings_count.items():
logger.info(f"File: {file_name} - Generated {count} embeddings")

logger.info(f"Normal docs split: {len(corpus)}")

# Fetch and process adaptor data
adaptor_data = fetch_adaptor_data()
adaptor_corpus, max_chunk_length = process_adaptor_data(adaptor_data)
logger.info(f"Adaptor data split: {len(adaptor_corpus)}")

# Combine normal and adaptor docs
logger.info(f"Total documents after adding adaptors: {len(corpus) + len(adaptor_corpus)}")

collection_name = args.collection_name
check_collection = utility.has_collection(collection_name)
if check_collection:
utility.drop_collection(collection_name)

# Create collection
collection = create_collection(collection_name=collection_name, max_chunk_length=max_chunk_length)

# Embed and insert data
corpus_vectors, adaptor_vectors = embed_documents(client, corpus, adaptor_corpus)

insert_data(collection, corpus_vectors, corpus, partition_name="normal_docs")
insert_data(collection, adaptor_vectors, adaptor_corpus, partition_name="adaptor_docs")

# Flush and create index
logger.info("Flushing...")
start_flush = time.time()
collection.flush()
end_flush = time.time()
logger.info(f"Succeeded in {round(end_flush - start_flush, 4)} seconds!")

logger.info("Creating index...")
default_index = {'index_type': 'IVF_FLAT', 'metric_type': 'L2', 'params': {'nlist': 1024}}
collection.create_index(field_name="embedding", index_params=default_index)

# Load collection
t0 = time.time()
logger.info("Loading collection...")
collection.load()
t1 = time.time()
logger.info(f"Loaded collection in {round(t1 - t0, 4)} seconds!")

logger.info("Milvus database configured successfully!")

if __name__ == "__main__":
main()
