feat(providers): add OpenSearch vector store provider#4952

Open
emirsimsek00 wants to merge 1 commit into llamastack:main from emirsimsek00:feat/opensearch-provider

Conversation

@emirsimsek00

What does this PR do?

This PR implements the OpenSearch vector store provider for Llama Stack, enabling users to use OpenSearch (and Amazon OpenSearch Service) as a vector database backend.

It introduces:

  • Remote Provider: src/llama_stack/providers/remote/vector_io/opensearch/ containing the core OpenSearchVectorIOAdapter and OpenSearchIndex.
  • Inline Provider: Alias for the remote provider under src/llama_stack/providers/inline/vector_io/opensearch/.
  • Configuration: Schema for OpenSearch connection details (host, port, auth, SSL).
  • Registration: Registers remote::opensearch and inline::opensearch in src/llama_stack/providers/registry/vector_io.py.
  • Dependencies: Adds opensearch-py as an optional dependency.

Supports:

  • Vector Search (k-NN using HNSW)
  • Keyword Search (Match query)
  • Hybrid Search (Bool query combining vector and keyword)
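
As a rough sketch, the three modes map to OpenSearch query bodies like the helpers below. The field names `embedding` and `content` mirror the adapter's hybrid query quoted in the review; the exact index mapping in the PR may differ.

```python
# Sketch of the three search modes as OpenSearch query bodies.
# "embedding" and "content" field names are assumptions based on the
# adapter's hybrid query; the real mapping may differ.
def vector_query(embedding: list[float], k: int) -> dict:
    # k-NN search over the HNSW-indexed embedding field
    return {"size": k, "query": {"knn": {"embedding": {"vector": embedding, "k": k}}}}

def keyword_query(text: str, k: int) -> dict:
    # BM25 match query over the chunk text
    return {"size": k, "query": {"match": {"content": {"query": text}}}}

def hybrid_query(embedding: list[float], text: str, k: int) -> dict:
    # bool/should combines both clauses; clause scores are summed
    return {
        "size": k,
        "query": {
            "bool": {
                "should": [
                    vector_query(embedding, k)["query"],
                    keyword_query(text, k)["query"],
                ]
            }
        },
    }
```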

Test Plan

I verified the changes using:

  1. Unit Tests: Added tests/unit/providers/vector_io/test_opensearch.py which mocks the OpenSearch client to test initialization, registration, insertion, and query logic.
  2. Manual Verification: Created a script verify_opensearch.py to test against a local OpenSearch instance.
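
To illustrate the mocking approach in point 1 (a sketch only; the actual test file may configure different attributes), a `MagicMock` stand-in for the opensearch-py client looks like:

```python
from unittest.mock import MagicMock

def make_mock_opensearch_client() -> MagicMock:
    # Stand-in for opensearch-py's OpenSearch client so adapter logic can
    # be exercised without a live cluster. The return values here are
    # illustrative assumptions, not the PR's actual fixtures.
    client = MagicMock()
    client.info.return_value = {"version": {"number": "2.11.0"}}
    client.indices.exists.return_value = False
    return client
```

In the unit tests such a mock would be patched in place of the real client before calling `adapter.initialize()`, and assertions made on the calls it receives.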

Testing Script (verify_opensearch.py):

import asyncio
import os
import sys

# Add src to path
sys.path.append(os.path.join(os.path.dirname(__file__), "src"))

from llama_stack_api import EmbeddedChunk, InsertChunksRequest, QueryChunksRequest, VectorStore
from llama_stack.providers.remote.vector_io.opensearch.config import OpenSearchVectorIOConfig
from llama_stack.providers.remote.vector_io.opensearch.opensearch import OpenSearchVectorIOAdapter

# Mock dependencies
class MockInference:
    async def openai_embeddings(self, *args, **kwargs):
        pass

class MockFiles:
    pass

async def main():
    print("Verifying OpenSearch provider implementation...")

    # 1. Config
    config = OpenSearchVectorIOConfig(
        host="localhost",
        port=9200,
    )
    print(f"Config loaded: {config}")

    # 2. Adapter Instantiation
    try:
        adapter = OpenSearchVectorIOAdapter(
            config,
            inference_api=MockInference(),
            files_api=MockFiles(),
        )
        print("Adapter instantiated successfully.")
    except ImportError as e:
        print(f"Failed to instantiate adapter (likely missing opensearch-py): {e}")
        return

    # 3. Initialize (requires running OpenSearch, so we mock or skip if connection fails)
    print("Attempting initialization (will fail if no OpenSearch running)...")
    try:
        await adapter.initialize()
        print("Initialization successful!")
    except Exception as e:
        print(f"Initialization failed (expected if manual OpenSearch not running): {e}")
        print("Skipping further integration tests.")
        return

    # 4. Register Store
    store_id = "test_verify_store"
    vector_store = VectorStore(
        identifier=store_id,
        provider_id="test_provider",
        provider_resource_id="test_resource",
        embedding_dimension=4,
        embedding_model="test_model",
        status="completed",
    )

    try:
        await adapter.register_vector_store(vector_store)
        print(f"Registered vector store: {store_id}")
    except Exception as e:
        print(f"Failed to register vector store: {e}")

    # 5. Insert Chunks
    chunks = [
        EmbeddedChunk(
            chunk_id="c1",
            content="Hello OpenSearch",
            embedding=[0.1, 0.2, 0.3, 0.4],
            metadata={"source": "manual_verify"},
        )
    ]
    try:
        await adapter.insert_chunks(InsertChunksRequest(vector_store_id=store_id, chunks=chunks))
        print("Inserted chunks successfully.")
    except Exception as e:
        print(f"Failed to insert chunks: {e}")

    # 6. Query Chunks
    try:
        response = await adapter.query_chunks(
            QueryChunksRequest(
                vector_store_id=store_id,
                query=[0.1, 0.2, 0.3, 0.4],
                params={"mode": "vector", "score_threshold": 0.0},
            )
        )
        print(f"Query response: {len(response.chunks)} chunks found.")
        if len(response.chunks) > 0:
            print(f"First chunk content: {response.chunks[0].content}")
    except Exception as e:
        print(f"Failed to query chunks: {e}")

    # 7. Cleanup
    try:
        await adapter.unregister_vector_store(store_id)
        print("Unregistered vector store.")
    except Exception as e:
        print(f"Failed to unregister vector store: {e}")

    print("Verification script finished.")

if __name__ == "__main__":
    asyncio.run(main())

Copilot AI review requested due to automatic review settings February 19, 2026 08:21
@meta-cla

meta-cla bot commented Feb 19, 2026

Hi @emirsimsek00!

Thank you for your pull request and welcome to our community.

Action Required

In order to merge any pull request (code, docs, etc.), we require contributors to sign our Contributor License Agreement, and we don't seem to have one on file for you.

Process

In order for us to review and merge your suggested changes, please sign at https://code.facebook.com/cla. If you are contributing on behalf of someone else (eg your employer), the individual CLA may not be sufficient and your employer may need to sign the corporate CLA.

Once the CLA is signed, our tooling will perform checks and validations. Afterwards, the pull request will be tagged with CLA signed. The tagging process may take up to 1 hour after signing. Please give it that time before contacting us about it.

If you have received this in error or have any questions, please contact us at cla@meta.com. Thanks!


Copilot AI left a comment

Pull request overview

This PR adds OpenSearch vector store provider support to Llama Stack, enabling users to leverage OpenSearch (and Amazon OpenSearch Service) as a vector database backend. The implementation follows the existing provider pattern with both remote and inline configurations, supporting vector search (k-NN/HNSW), keyword search (BM25), and hybrid search capabilities.

Changes:

  • Implements OpenSearch vector store adapter with OpenSearchIndex for index operations and OpenSearchVectorIOAdapter for the provider interface
  • Adds configuration schema for OpenSearch connection settings (host, port, SSL, authentication)
  • Registers both remote::opensearch and inline::opensearch providers in the vector_io registry with appropriate documentation

Reviewed changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated 23 comments.

Summary per file:

  • src/llama_stack/providers/remote/vector_io/opensearch/opensearch.py — Core implementation of the OpenSearch adapter with index operations (add, query, delete chunks) and vector store management
  • src/llama_stack/providers/remote/vector_io/opensearch/config.py — Configuration schema for OpenSearch connection parameters including host, port, SSL settings, and authentication
  • src/llama_stack/providers/inline/vector_io/opensearch/__init__.py — Inline provider entry point that delegates to the remote implementation
  • src/llama_stack/providers/inline/vector_io/opensearch/config.py — Inline provider config that aliases the remote config
  • src/llama_stack/providers/registry/vector_io.py — Registers OpenSearch as both remote and inline provider with documentation and dependency specifications
  • tests/unit/providers/vector_io/test_opensearch.py — Unit tests covering initialization, registration, insertion, and querying with a mocked OpenSearch client
  • pyproject.toml — Adds opensearch-py to test dependencies


Comment on lines +1 to +6
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.


Copilot AI Feb 19, 2026

The remote OpenSearch provider is missing an __init__.py file. All other remote vector_io providers (chroma, elasticsearch, milvus, etc.) have an __init__.py file that exports a get_adapter_impl function. This file is required for the module to be properly imported and initialized by the Llama Stack provider system.

You need to create src/llama_stack/providers/remote/vector_io/opensearch/__init__.py with content similar to the elasticsearch provider, containing a get_adapter_impl async function that instantiates the adapter and calls initialize().

Comment on lines +288 to +312
async def initialize(self) -> None:
    if OpenSearch is None:
        raise ImportError(
            "opensearch-py is not installed. Please install it with `pip install opensearch-py`."
        )

    auth = None
    if self.config.username and self.config.password:
        auth = (self.config.username, self.config.password.get_secret_value())

    self.client = OpenSearch(
        hosts=[{"host": self.config.host, "port": self.config.port}],
        http_compress=True,
        http_auth=auth,
        use_ssl=self.config.use_ssl,
        verify_certs=self.config.verify_certs,
    )

    # Try to ping or get info to verify connection
    try:
        await asyncio.to_thread(self.client.info)
    except Exception as e:
        logger.warning(f"Could not connect to OpenSearch at startup: {e}")

    await self.initialize_openai_vector_stores()

Copilot AI Feb 19, 2026

The OpenSearchVectorIOAdapter doesn't properly integrate with KVStore for persistent storage of vector store metadata. Looking at the elasticsearch provider (lines 383-395 in elasticsearch.py), it uses KVStore to persist and reload vector store configurations across restarts. The OpenSearch implementation only stores vector stores in memory (self.cache), which means they'll be lost on restart.

You should:

  1. Add persistence configuration to OpenSearchVectorIOConfig
  2. Initialize kvstore in the initialize() method
  3. Persist vector stores to kvstore in register_vector_store()
  4. Load existing vector stores from kvstore during initialize()
  5. Remove from kvstore in unregister_vector_store()

)


class OpenSearchVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO):

Copilot AI Feb 19, 2026

The OpenSearchVectorIOAdapter should inherit from VectorStoresProtocolPrivate in addition to OpenAIVectorStoreMixin and VectorIO. Looking at the elasticsearch provider (line 367 in elasticsearch.py), it inherits from all three: ElasticsearchVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProtocolPrivate). This protocol is needed to properly support the full vector stores API.

Comment on lines +184 to +219
async def query_hybrid(
    self,
    embedding: List[float],
    query_string: str,
    k: int,
    score_threshold: float,
    reranker_type: str,
    reranker_params: dict[str, Any] | None = None,
) -> QueryChunksResponse:
    # Simple hybrid using compound bool query
    query = {
        "size": k,
        "query": {
            "bool": {
                "should": [
                    {
                        "knn": {
                            "embedding": {
                                "vector": embedding,
                                "k": k,
                                "boost": 0.5
                            }
                        }
                    },
                    {
                        "match": {
                            "content": {
                                "query": query_string,
                                "boost": 0.5
                            }
                        }
                    }
                ]
            }
        }
    }

Copilot AI Feb 19, 2026

The query_hybrid method ignores the reranker_type and reranker_params arguments and always uses fixed 0.5 boost values. This means users cannot control the weighting between vector and keyword search.

The method should respect the reranker_params to allow customization. For example, if reranker_type is "weighted" and reranker_params contains an "alpha" value, the boost values should be adjusted accordingly (alpha for vector, 1-alpha for keyword). See the elasticsearch provider's query_hybrid implementation for reference on how to properly handle reranker parameters.


# We ignore 404s (not found) during bulk delete to be safe
await asyncio.to_thread(
helpers.bulk, self.client, actions, raise_on_error=False, refresh=True

Copilot AI Feb 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inconsistent indentation: line 250 has an extra space at the beginning. The line should align with line 249 (no extra leading space). This violates Python's consistent indentation conventions.

Suggested change
-  helpers.bulk, self.client, actions, raise_on_error=False, refresh=True
+ helpers.bulk, self.client, actions, raise_on_error=False, refresh=True

try:
    await asyncio.to_thread(self.client.indices.delete, index=self.index_name, ignore=[404])
except Exception as e:
    logger.error(f"Failed to delete index {self.index_name}: {e}")

Copilot AI Feb 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The delete() method logs an error but doesn't re-raise the exception. This means that callers won't know if the deletion failed. Looking at the elasticsearch provider (lines 357-364 in elasticsearch.py), it logs the error and then re-raises it, allowing the caller to handle the failure appropriately.

The OpenSearch implementation should also re-raise the exception after logging to maintain consistent error handling behavior.

Suggested change
- logger.error(f"Failed to delete index {self.index_name}: {e}")
+ logger.error(f"Failed to delete index {self.index_name}: {e}")
+ raise

async def delete_chunks(self, chunks_for_deletion: List[ChunkForDeletion]):
actions = []
for chunk in chunks_for_deletion:
actions.append({

Copilot AI Feb 19, 2026


Inconsistent indentation: line 242 has an extra space at the beginning. The line should align with lines 241 and 246 (no extra leading space). This violates Python's consistent indentation conventions.

def _reconstruct_chunk(self, source: dict) -> EmbeddedChunk:
chunk_content_json = source.get("chunk_content")
if chunk_content_json:
# It was stored as JSON string

Copilot AI Feb 19, 2026


Inconsistent indentation: line 256 has an extra space at the beginning. The line should align with lines 254-255 and 258-260 (no extra leading space). This violates Python's consistent indentation conventions.


from typing import Optional

from llama_stack_api.shared.schemas import AdapterSpec, StackComponentConfig

Copilot AI Feb 19, 2026


The import AdapterSpec from llama_stack_api.shared.schemas is unused. Looking at other vector_io provider configs (chroma, elasticsearch), they don't import AdapterSpec. This import should be removed.

Suggested change
- from llama_stack_api.shared.schemas import AdapterSpec, StackComponentConfig
+ from llama_stack_api.shared.schemas import StackComponentConfig

Comment on lines +38 to +41
except ImportError:
OpenSearch = None
helpers = None


Copilot AI Feb 19, 2026


When opensearch-py is not installed, the helpers module is set to None (line 40), but it's used in multiple places (lines 109, 250) without checking if it's None first. This will cause an AttributeError at runtime if opensearch-py is not installed, even though there's a check for OpenSearch being None in the initialize() method (line 289).

The check at line 289 should also verify that helpers is not None, or the helpers import should raise an ImportError that gets caught, similar to how OpenSearch is handled.

Suggested change
- except ImportError:
-     OpenSearch = None
-     helpers = None
+ except ImportError as e:
+     OpenSearch = None
+     _opensearch_import_error = e
+
+     class _MissingHelpers:
+         def __getattr__(self, name: str) -> Any:
+             # Raised when opensearch-py is not installed but helpers is used.
+             raise ImportError(
+                 "opensearch-py is required to use the OpenSearch vector store "
+                 "(missing helpers module)."
+             ) from _opensearch_import_error
+
+     helpers = _MissingHelpers()

(The exception is re-bound to a module-level name because an `except ... as` target is cleared when the block exits, so referencing it later from `__getattr__` would otherwise raise NameError.)

@meta-cla meta-cla bot added the CLA Signed This label is managed by the Meta Open Source bot. label Feb 19, 2026
@meta-cla

meta-cla bot commented Feb 19, 2026

Thank you for signing our Contributor License Agreement. We can now accept your code for this (and any) Meta Open Source project. Thanks!

