Embark on an exciting journey into the world of Retrieval-Augmented Generation (RAG) for Generative AI! The foundational step in this process is creating a vector store, a tool designed for performing vector (semantic) searches based on meaning. RAG augments the prompts sent to the LLM with proprietary data, enabling the LLM to make precise recommendations. In the context of Generative AI, a vector is a mathematical representation of data that captures its semantic meaning. It is a numerical array, where each element represents a feature or characteristic of the data. Vectors encode information in a way that machine learning models can easily process, particularly for tasks that involve understanding and generating natural language.
Data Representation:
Vectors transform raw data (such as text, images, or audio) into a structured format that models can understand. For text, this involves converting words, sentences, or documents into fixed-length arrays of numbers.
Semantic Search:
Vectors allow for semantic search, meaning searches based on the meaning of the data rather than just keywords. For example, similar words or phrases will have similar vector representations, enabling more accurate and relevant search results.
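To make this concrete, here is a minimal Python sketch of similarity between vectors. The three-dimensional vectors below are made up purely for illustration (real embedding models produce hundreds or thousands of dimensions and assign the values automatically); the point is that semantically similar words end up with similar vectors, which cosine similarity can measure:

```python
import math

def cosine_similarity(a, b):
    """Cosine similarity: close to 1.0 means similar direction/meaning."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

# Toy vectors for illustration only -- a real embedding model produces these.
dress = [0.9, 0.1, 0.3]
gown = [0.8, 0.2, 0.35]
tractor = [0.1, 0.9, 0.7]

print(cosine_similarity(dress, gown))     # high: similar meaning
print(cosine_similarity(dress, tractor))  # low: unrelated meaning
```

This "closest direction wins" comparison is exactly what a vector database does at scale, with specialized indexes instead of a brute-force loop.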
Retrieval-Augmented Generation (RAG):
In RAG, vectors are used to retrieve relevant data from a vector database. When a query is made, the system converts it into a vector and searches the vector database for the closest matches. This relevant data is then used to generate context-aware responses or recommendations.
Training and Inference:
During the training phase, AI models learn to convert input data into vectors that capture the essential features and relationships. During inference, these vectors are used to generate outputs, such as text generation, image recognition, or recommendation systems.
Contextual Understanding:
Vectors enable AI models to understand the context and relationships between different pieces of data. For instance, in natural language processing, vectors can represent the context of words within a sentence, helping the model generate coherent and contextually appropriate responses.
Imagine a Generative AI system designed to recommend clothing items based on user preferences:
User Query: "Find me a red summer dress in medium size."
Vectorization: The query is converted into a vector.
Semantic Search: The vector is used to search a vector database of clothing items.
Data Retrieval: The most relevant items (red, summer dresses, medium size) are retrieved.
Prompt Generation: These items are used to create a prompt for the LLM.
LLM Response: The LLM generates a response with specific and accurate recommendations for the user.
In summary, vectors are essential for transforming unstructured data into a format that Generative AI can effectively process, enabling tasks like semantic search, contextual understanding, and precise data retrieval.
In this GitHub example, retail data from source systems will be vector-encoded in real time as the data changes. Data sourced from a connector will be inserted into a topic, converted into a vector with Flink SQL, and inserted into a new topic. The connector architecture will then sink this data into a vector database. This GitHub example shows you how to do this first step.
Next, the Generative AI application will query this vector database. Using the user's profile data and questions, the meaning will be matched to the clothing size and fashion type requested in the user's query. The results will be sent to the LLM as prompts, ensuring that the recommendations made by the LLM are specific and accurate.
See a demo of the vector database query in action here
The GitHub repository behind this demo is here
Traditional developers often rely on batch processes to vector-encode their data, leading to stale data and a labyrinth of point-to-point ETL integrations that demand constant maintenance. But what if there was a better way? Imagine building and maintaining a vector database with real-time data, free from cumbersome batch ETL processes.
This GitHub repository is your gateway to that future. Dive in and discover how to revolutionize your approach to data augmentation for Generative AI, making your workflows smarter, faster, and more efficient. Let's get started on transforming the way you handle data!
Before we can convert data into vectors, we need some sample data. Setting up a connector to a retail database would be time-consuming, so for the purposes of this demo, we will generate some random data using Confluent Cloud's datagen connector. To get started, we need to create the datagen connector and have it insert data into a product-updates topic.
Create a topic named product-updates
Set up the datagen connector for product updates: add a new connector, select datagen, then use the link for "additional configuration"
Use the product-updates topic.
You will need an API key; use an existing one if you have it, otherwise create one and save the secret.
new connector --> datagen --> custom schema on configuration tab
Use the link for "additional configuration" and select JSON_SR for JSON and Schema Registry.
Now we need a custom schema for product updates, so we will select "Provide Your Own Schema" on the configuration tab. Get the product schema that datagen will use to generate random product updates here:
Product Schema
Paste it in the configuration tab.
The datagen connector will generate random product data, simulating a real connector from an operational data store receiving product updates as inventory arrives at each retail store.
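For orientation, here is an illustrative fragment of the kind of Avro-style schema the datagen connector consumes, expressed as a Python dict and using the same field names that appear in the Flink tables later in this walkthrough. This is a sketch only: the real datagen schema also carries value-generation hints (such as `arg.properties`), so use the actual schema linked above for the demo.

```python
import json

# Illustrative only -- the real schema is the one linked above.
product_updates_schema = {
    "type": "record",
    "name": "ProductUpdate",
    "fields": [
        {"name": "store_id", "type": "int"},
        {"name": "product_id", "type": "int"},
        {"name": "count", "type": "int"},
        {"name": "articleType", "type": "string"},
        {"name": "size", "type": "string"},
        {"name": "fashionType", "type": "string"},
        {"name": "brandName", "type": "string"},
        {"name": "baseColor", "type": "string"},
        {"name": "gender", "type": "string"},
        {"name": "ageGroup", "type": "string"},
        {"name": "price", "type": "double"},
        {"name": "season", "type": "string"},
    ],
}
print(json.dumps(product_updates_schema, indent=2))
```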
Flink can be accessed through the environments menu on the left-hand side of the Confluent Cloud interface. Select your environment, then navigate to the "Flink SQL" tab, which is located in the middle of the screen, instead of the default "Clusters" tab. As of this GitHub update (June 19th, 2024), the ML_Model function used in this example is in Early Access and is expected to be generally available (GA) soon.
If Flink is not set up, you will need to create a new Flink compute pool in the same region and cloud provider as your Confluent cluster. If you do not have a Confluent Cloud cluster, you can create a free basic cluster for this exercise here: https://confluent.cloud/
You should issue these commands from the Confluent CLI. If you do not have the Confluent CLI, you can find the installation instructions here. Instructions for connecting to your environment through the Confluent CLI are available here.
Here are some useful Confluent CLI commands:
confluent login --save --organization-id dacfbee1-acbc-my-org-id-from-the-cloud-gui
confluent environment list
confluent environment use env-myenv-from-list
confluent kafka cluster list
confluent kafka cluster use lkc-mycluster-from-list
confluent kafka topic list
A quick getting-started guide for the Flink shell can be found here.
You should be able to connect to the Flink Shell with the following.
Your specific environment and connection information are displayed with a copy button (center, near the bottom) in the image above.
confluent flink shell --compute-pool lfcp-pool-from-gui --environment env-myenv-from-gui
Useful documentation for getting started with Flink Shell via the Confluent command line client can be found here
Secret for key OPENAI.API_KEY must be set with 'set sql.secrets.<some_key> = <your_secret>' first.
Then refer it in model options with 'OPENAI.API_KEY' = '{{sessionconfig/sql.secrets.<some_key>}}'
set 'sql.secrets.openaikey' = 'sk-Babbmpndff5CkyrFuuVTBCiKyrAaaaBBBwwwwwxxxxdsssaa';
Statement successfully submitted.
Statement phase is COMPLETED.
configuration updated successfully.
+-----------------------+-----------------------------------------------------+
| Key | Value |
+-----------------------+-----------------------------------------------------+
| sql.secrets.openaikey | sk-Babbmpndff5CkyrFuuVTBCiKyrAaaaBBBwwwwwxxxxdsssaa |
+-----------------------+-----------------------------------------------------+
CREATE MODEL `vector_encoding`
INPUT (input STRING)
OUTPUT (vector ARRAY<FLOAT>)
WITH(
'TASK' = 'classification',
'PROVIDER' = 'OPENAI',
'OPENAI.ENDPOINT' = 'https://api.openai.com/v1/embeddings',
'OPENAI.API_KEY' = '{{sessionconfig/sql.secrets.openaikey}}'
);
Statement name: cli-2024-05-20-125717-5a83ee6c-1d05-4e27-b7f0-141e0b6bd416
Statement successfully submitted.
Waiting for statement to be ready. Statement phase is PENDING.
set 'sql.secrets.openaikey' = '<your openAI API key>';
CREATE MODEL `vector_encoding`
INPUT (input STRING)
OUTPUT (vector ARRAY<FLOAT>)
WITH(
'TASK' = 'classification',
'PROVIDER' = 'OPENAI',
'OPENAI.ENDPOINT' = 'https://api.openai.com/v1/embeddings',
'OPENAI.API_KEY' = '{{sessionconfig/sql.secrets.openaikey}}'
);
select * from `product-updates`, lateral table (ml_predict('vector_encoding', articleType));
Very cool, we did vector embedding for the article type. Now we need to build some content to vector-encode, and we want it to read more like natural language. Let's do this by concatenating fields.
We may also want to enrich this data so we are breaking this down into a few steps.
This table is backed by a Kafka topic with the same name for storage.
CREATE TABLE `product-content` (
`store_id` INT,
`product_id` INT,
`count` INT,
`articleType` STRING,
`size` STRING,
`fashionType` STRING,
`brandName` STRING,
`baseColor` STRING,
`gender` STRING,
`ageGroup` STRING,
`price` DOUBLE,
`season` STRING,
`content` STRING
);
Let's add a new field that reads more like a natural-language description that we can send to the vector-encoding service. We will create a field called "content" which concatenates all the fields except count, which is the number of items in the store; we can check inventory in a post-processing step. For now we need a product description plus the price, store number, and product ID.
insert into `product-content` (
`store_id`,
`product_id`,
`count`,
`price`,
`size`,
`ageGroup`,
`gender`,
`season`,
`fashionType`,
`brandName`,
`baseColor`,
`articleType`,
`content`
)
select `store_id`,
`product_id`,
`count`,
`price`,
`size`,
`ageGroup`,
`gender`,
`season`,
`fashionType`,
`brandName`,
`baseColor`,
`articleType`,
INITCAP(
concat_ws(' ',
size,
ageGroup,
gender,
season,
fashionType,
brandName,
baseColor,
articleType,
', price: '||cast(price as string),
', store number: '||cast(store_id as string),
', product id: '||cast(product_id as string))
)
from `product-updates`;
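To see what the `content` field ends up looking like, here is a rough Python equivalent of the `INITCAP(concat_ws(...))` expression above, run on made-up sample values. Note that Python's `str.title()` is only an approximation of Flink's `INITCAP` (both capitalize the first letter of each word, but edge cases differ):

```python
def build_content(size, age_group, gender, season, fashion_type,
                  brand_name, base_color, article_type,
                  price, store_id, product_id):
    # Rough analogue of INITCAP(concat_ws(' ', ...)) in the Flink statement above.
    parts = [size, age_group, gender, season, fashion_type,
             brand_name, base_color, article_type,
             f", price: {price}",
             f", store number: {store_id}",
             f", product id: {product_id}"]
    return " ".join(parts).title()

# Made-up sample row for illustration.
content = build_content("M", "Adults", "Men", "Summer", "Core",
                        "Acme", "Red", "Tshirt", 19.99, 42, 1001)
print(content)
```

The result reads like a short natural-language product description, which is a much better input for an embedding model than a bag of isolated field values.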
You will notice that the table creates a new "product-vector" Kafka topic and that the schema is created for us. The vector field is an array of floats.
CREATE TABLE `product-vector` (
`store_id` INT,
`product_id` INT,
`count` INT,
`articleType` STRING,
`size` STRING,
`fashionType` STRING,
`brandName` STRING,
`baseColor` STRING,
`gender` STRING,
`ageGroup` STRING,
`price` DOUBLE,
`season` STRING,
`content` STRING,
`vector` ARRAY<FLOAT>
);
This statement will call the vector-embedding service and will run in the background in Flink SQL. For testing or demo purposes you may wish to stop it, as it will consume tokens against the embedding service. You can see it under the "Running statements" tab.
insert into `product-vector` select * from `product-content`,
lateral table (ml_predict('vector_encoding', content));
To view and stop running Flink SQL statements, click the "Running statements" tab.
My favorite sink is MongoDB. I created this table with the additional fields because MongoDB can create a vector index on the vector field and still query the rest of the fields like a regular database. MongoDB combines the ODS and vector search in one place! The sink connector will create a document for each record. Once the documents are in MongoDB, we create an index on the vector field for vector searches, and we are done! MongoDB Example
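As one way to create that index, here is a sketch of an Atlas Vector Search index definition built as a Python dict, with the creation call shown (commented out) using pymongo's `create_search_index`, which is available in recent pymongo versions. The database/collection names and connection string are placeholders, and `numDimensions: 1536` is an assumption based on common OpenAI embedding sizes; match it to your model's actual output, and check the Atlas documentation for the exact index-definition shape on your cluster version:

```python
import json

# Atlas Vector Search index definition (sketch).
vector_index_definition = {
    "fields": [
        {
            "type": "vector",
            "path": "vector",        # the ARRAY<FLOAT> field sunk from Kafka
            "numDimensions": 1536,   # assumption -- set to your embedding size
            "similarity": "cosine",
        }
    ]
}
print(json.dumps(vector_index_definition, indent=2))

# Sketch of creating the index (uncomment and fill in a real connection string):
# from pymongo import MongoClient
# from pymongo.operations import SearchIndexModel
# coll = MongoClient("<your-atlas-connection-string>")["retail"]["product_vector"]
# coll.create_search_index(model=SearchIndexModel(
#     definition=vector_index_definition,
#     name="vector_index",
#     type="vectorSearch"))
```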