diff --git a/Makefile b/Makefile index 30e1f6ad..8bc17e03 100644 --- a/Makefile +++ b/Makefile @@ -12,11 +12,11 @@ lint lint_diff: poetry run mypy $(PYTHON_FILES) test: - poetry run pytest -vv --cov=semantic_router --cov-report=term-missing --cov-report=xml + poetry run pytest -vv --cov=semantic_router --cov-report=term-missing --cov-report=xml --exitfirst --maxfail=1 test_functional: - poetry run pytest -vv -n 20 tests/functional + poetry run pytest -vv --exitfirst --maxfail=1 tests/functional test_unit: - poetry run pytest -vv -n 20 tests/unit + poetry run pytest -vv --exitfirst --maxfail=1 tests/unit test_integration: - poetry run pytest -vv -n 20 tests/integration \ No newline at end of file + poetry run pytest -vv --exitfirst --maxfail=1 tests/integration diff --git a/docs/00-introduction.ipynb b/docs/00-introduction.ipynb index e96e75b8..d464a2d4 100644 --- a/docs/00-introduction.ipynb +++ b/docs/00-introduction.ipynb @@ -279,62 +279,6 @@ "sr(\"I'm interested in learning about llama 2\")" ] }, - { - "cell_type": "markdown", - "metadata": { - "id": "dDZF2eN4f3p4" - }, - "source": [ - "We can also retrieve multiple routes with its associated score:" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "id": "n27I7kmpf3p4", - "outputId": "2138e077-190b-41b7-a3eb-4fd76e2f59c2" - }, - "outputs": [ - { - "data": { - "text/plain": [ - "[RouteChoice(name='politics', function_call=None, similarity_score=0.8595844842560181),\n", - " RouteChoice(name='chitchat', function_call=None, similarity_score=0.8356704527362284)]" - ] - }, - "execution_count": 9, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "sr.retrieve_multiple_routes(\"Hi! How are you doing in politics??\")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "id": "zi4XJ7Amf3p4", - "outputId": "cf05cd65-d4f4-454a-ef05-77f16f37cc8f" - }, - "outputs": [ - { - "data": { - "text/plain": [ - "[]" - ] - }, - "execution_count": 10, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "sr.retrieve_multiple_routes(\"I'm interested in learning about llama 2\")" - ] - }, { "cell_type": "markdown", "metadata": {}, diff --git a/semantic_router/encoders/bm25.py b/semantic_router/encoders/bm25.py index f42bf9c2..b5db6f8b 100644 --- a/semantic_router/encoders/bm25.py +++ b/semantic_router/encoders/bm25.py @@ -57,6 +57,7 @@ def fit(self, routes: List[Route]): self.model.fit(corpus=utterances) def __call__(self, docs: List[str]) -> list[SparseEmbedding]: + print(f"JBTEMP: {docs}") if self.model is None: raise ValueError("Model or index mapping is not initialized.") if len(docs) == 1: diff --git a/semantic_router/index/base.py b/semantic_router/index/base.py index d98ae19e..933e2294 100644 --- a/semantic_router/index/base.py +++ b/semantic_router/index/base.py @@ -15,6 +15,12 @@ RETRY_WAIT_TIME = 2.5 +class IndexConfig(BaseModel): + type: str + dimensions: int + vectors: int + + class BaseIndex(BaseModel): """ Base class for indices using Pydantic's BaseModel. @@ -38,6 +44,7 @@ def add( utterances: List[Any], function_schemas: Optional[List[Dict[str, Any]]] = None, metadata_list: List[Dict[str, Any]] = [], + **kwargs, ): """Add embeddings to the index. This method should be implemented by subclasses. @@ -51,6 +58,7 @@ async def aadd( utterances: List[str], function_schemas: Optional[Optional[List[Dict[str, Any]]]] = None, metadata_list: List[Dict[str, Any]] = [], + **kwargs, ): """Add vectors to the index asynchronously. This method should be implemented by subclasses. @@ -62,6 +70,7 @@ async def aadd( utterances=utterances, function_schemas=function_schemas, metadata_list=metadata_list, + **kwargs, ) def get_utterances(self) -> List[Utterance]: @@ -143,10 +152,17 @@ def delete(self, route_name: str): """ raise NotImplementedError("This method should be implemented by subclasses.") - def describe(self) -> Dict: + def describe(self) -> IndexConfig: + """ + Returns an IndexConfig object with index details such as type, dimensions, and + total vector count. + This method should be implemented by subclasses. + """ + raise NotImplementedError("This method should be implemented by subclasses.") + + def is_ready(self) -> bool: """ - Returns a dictionary with index details such as type, dimensions, and total - vector count. + Checks if the index is ready to be used. This method should be implemented by subclasses. """ raise NotImplementedError("This method should be implemented by subclasses.") @@ -238,7 +254,7 @@ async def _async_read_config( :return: The config parameter that was read. :rtype: ConfigParameter """ - logger.warning("Async method not implemented.") + logger.warning("_async_read_config method not implemented.") return self._read_config(field=field, scope=scope) def _write_config(self, config: ConfigParameter) -> ConfigParameter: @@ -353,6 +369,7 @@ async def alock( """Lock/unlock the index for a given scope (if applicable). If index already locked/unlocked, raises ValueError. """ + logger.warning(f"JBTEMP alock method called with {value=} {wait=} {scope=}") start_time = datetime.now() while True: if await self._ais_locked(scope=scope) != value: diff --git a/semantic_router/index/hybrid_local.py b/semantic_router/index/hybrid_local.py index d4096edb..cab9b982 100644 --- a/semantic_router/index/hybrid_local.py +++ b/semantic_router/index/hybrid_local.py @@ -25,6 +25,7 @@ def add( function_schemas: Optional[List[Dict[str, Any]]] = None, metadata_list: List[Dict[str, Any]] = [], sparse_embeddings: Optional[List[SparseEmbedding]] = None, + **kwargs, ): if sparse_embeddings is None: raise ValueError("Sparse embeddings are required for HybridLocalIndex.") @@ -66,13 +67,6 @@ def get_utterances(self) -> List[Utterance]: return [] return [Utterance.from_tuple(x) for x in zip(self.routes, self.utterances)] - def describe(self) -> Dict: - return { - "type": self.type, - "dimensions": self.index.shape[1] if self.index is not None else 0, - "vectors": self.index.shape[0] if self.index is not None else 0, - } - def _sparse_dot_product( self, vec_a: dict[int, float], vec_b: dict[int, float] ) -> float: diff --git a/semantic_router/index/local.py b/semantic_router/index/local.py index 76d44d82..61b2c3b5 100644 --- a/semantic_router/index/local.py +++ b/semantic_router/index/local.py @@ -3,7 +3,7 @@ import numpy as np from semantic_router.schema import ConfigParameter, SparseEmbedding, Utterance -from semantic_router.index.base import BaseIndex +from semantic_router.index.base import BaseIndex, IndexConfig from semantic_router.linear import similarity_matrix, top_scores from semantic_router.utils.logger import logger from typing import Any @@ -26,6 +26,7 @@ def add( utterances: List[str], function_schemas: Optional[List[Dict[str, Any]]] = None, metadata_list: List[Dict[str, Any]] = [], + **kwargs, ): embeds = np.array(embeddings) # type: ignore routes_arr = np.array(routes) @@ -74,12 +75,18 @@ def get_utterances(self) -> List[Utterance]: return [] return [Utterance.from_tuple(x) for x in zip(self.routes, self.utterances)] - def describe(self) -> Dict: - return { - "type": self.type, - "dimensions": self.index.shape[1] if self.index is not None else 0, - "vectors": self.index.shape[0] if self.index is not None else 0, - } + def describe(self) -> IndexConfig: + return IndexConfig( + type=self.type, + dimensions=self.index.shape[1] if self.index is not None else 0, + vectors=self.index.shape[0] if self.index is not None else 0, + ) + + def is_ready(self) -> bool: + """ + Checks if the index is ready to be used. + """ + return self.index is not None and self.routes is not None def query( self, diff --git a/semantic_router/index/pinecone.py b/semantic_router/index/pinecone.py index 7e7def05..f885fbf7 100644 --- a/semantic_router/index/pinecone.py +++ b/semantic_router/index/pinecone.py @@ -10,7 +10,7 @@ import numpy as np from pydantic import BaseModel, Field -from semantic_router.index.base import BaseIndex +from semantic_router.index.base import BaseIndex, IndexConfig from semantic_router.schema import ConfigParameter, SparseEmbedding from semantic_router.utils.logger import logger @@ -19,10 +19,59 @@ def clean_route_name(route_name: str) -> str: return route_name.strip().replace(" ", "-") +def build_records( + embeddings: List[List[float]], + routes: List[str], + utterances: List[str], + function_schemas: Optional[Optional[List[Dict[str, Any]]]] = None, + metadata_list: List[Dict[str, Any]] = [], + sparse_embeddings: Optional[Optional[List[SparseEmbedding]]] = None, +) -> List[Dict]: + if function_schemas is None: + function_schemas = [{}] * len(embeddings) + if sparse_embeddings is None: + vectors_to_upsert = [ + PineconeRecord( + values=vector, + route=route, + utterance=utterance, + function_schema=json.dumps(function_schema), + metadata=metadata, + ).to_dict() + for vector, route, utterance, function_schema, metadata in zip( + embeddings, + routes, + utterances, + function_schemas, + metadata_list, + ) + ] + else: + vectors_to_upsert = [ + PineconeRecord( + values=vector, + sparse_values=sparse_emb.to_pinecone(), + route=route, + utterance=utterance, + function_schema=json.dumps(function_schema), + metadata=metadata, + ).to_dict() + for vector, route, utterance, function_schema, metadata, sparse_emb in zip( + embeddings, + routes, + utterances, + function_schemas, + metadata_list, + sparse_embeddings, + ) + ] + return vectors_to_upsert + + class PineconeRecord(BaseModel): id: str = "" values: List[float] - sparse_values: Optional[dict[int, float]] = None + sparse_values: Optional[dict[str, list]] = None route: str utterance: str function_schema: str = "{}" @@ -49,10 +98,7 @@ def to_dict(self): "metadata": self.metadata, } if self.sparse_values: - d["sparse_values"] = { - "indices": list(self.sparse_values.keys()), - "values": list(self.sparse_values.values()), - } + d["sparse_values"] = self.sparse_values return d @@ -63,7 +109,7 @@ class PineconeIndex(BaseIndex): dimensions: Union[int, None] = None metric: str = "dotproduct" cloud: str = "aws" - region: str = "us-west-2" + region: str = "us-east-1" host: str = "" client: Any = Field(default=None, exclude=True) async_client: Any = Field(default=None, exclude=True) @@ -79,7 +125,7 @@ def __init__( dimensions: Optional[int] = None, metric: str = "dotproduct", cloud: str = "aws", - region: str = "us-west-2", + region: str = "us-east-1", host: str = "", namespace: Optional[str] = "", base_url: Optional[str] = "https://api.pinecone.io", @@ -99,6 +145,10 @@ def __init__( self.api_key = api_key or os.getenv("PINECONE_API_KEY") self.base_url = base_url + logger.warning( + "Default region changed from us-west-2 to us-east-1 in v0.1.0.dev6" + ) + if self.api_key is None: raise ValueError("Pinecone API key is required.") @@ -236,17 +286,6 @@ def _batch_upsert(self, batch: List[Dict]): else: raise ValueError("Index is None, could not upsert.") - async def _async_batch_upsert(self, batch: List[Dict]): - """Helper method for upserting a single batch of records asynchronously. - - :param batch: The batch of records to upsert. - :type batch: List[Dict] - """ - if self.index is not None: - await self.index.upsert(vectors=batch, namespace=self.namespace) - else: - raise ValueError("Index is None, could not upsert.") - def add( self, embeddings: List[List[float]], @@ -255,34 +294,21 @@ def add( function_schemas: Optional[Optional[List[Dict[str, Any]]]] = None, metadata_list: List[Dict[str, Any]] = [], batch_size: int = 100, - sparse_embeddings: Optional[Optional[List[dict[int, float]]]] = None, + sparse_embeddings: Optional[Optional[List[SparseEmbedding]]] = None, + **kwargs, ): """Add vectors to Pinecone in batches.""" if self.index is None: self.dimensions = self.dimensions or len(embeddings[0]) self.index = self._init_index(force_create=True) - if function_schemas is None: - function_schemas = [{}] * len(embeddings) - if sparse_embeddings is None: - sparse_embeddings = [{}] * len(embeddings) - vectors_to_upsert = [ - PineconeRecord( - values=vector, - sparse_values=sparse_dict, - route=route, - utterance=utterance, - function_schema=json.dumps(function_schema), - metadata=metadata, - ).to_dict() - for vector, route, utterance, function_schema, metadata, sparse_dict in zip( - embeddings, - routes, - utterances, - function_schemas, - metadata_list, - sparse_embeddings, - ) - ] + vectors_to_upsert = build_records( + embeddings=embeddings, + routes=routes, + utterances=utterances, + function_schemas=function_schemas, + metadata_list=metadata_list, + sparse_embeddings=sparse_embeddings, + ) for i in range(0, len(vectors_to_upsert), batch_size): batch = vectors_to_upsert[i : i + batch_size] @@ -296,38 +322,28 @@ async def aadd( function_schemas: Optional[Optional[List[Dict[str, Any]]]] = None, metadata_list: List[Dict[str, Any]] = [], batch_size: int = 100, - sparse_embeddings: Optional[Optional[List[dict[int, float]]]] = None, + sparse_embeddings: Optional[Optional[List[SparseEmbedding]]] = None, + **kwargs, ): """Add vectors to Pinecone in batches.""" if self.index is None: self.dimensions = self.dimensions or len(embeddings[0]) self.index = await self._init_async_index(force_create=True) - if function_schemas is None: - function_schemas = [{}] * len(embeddings) - if sparse_embeddings is None: - sparse_embeddings = [{}] * len(embeddings) - vectors_to_upsert = [ - PineconeRecord( - values=vector, - sparse_values=sparse_dict, - route=route, - utterance=utterance, - function_schema=json.dumps(function_schema), - metadata=metadata, - ).to_dict() - for vector, route, utterance, function_schema, metadata, sparse_dict in zip( - embeddings, - routes, - utterances, - function_schemas, - metadata_list, - sparse_embeddings, - ) - ] + vectors_to_upsert = build_records( + embeddings=embeddings, + routes=routes, + utterances=utterances, + function_schemas=function_schemas, + metadata_list=metadata_list, + sparse_embeddings=sparse_embeddings, + ) for i in range(0, len(vectors_to_upsert), batch_size): batch = vectors_to_upsert[i : i + batch_size] - await self._async_batch_upsert(batch) + await self._async_upsert( + vectors=batch, + namespace=self.namespace or "", + ) def _remove_and_sync(self, routes_to_delete: dict): for route, utterances in routes_to_delete.items(): @@ -433,16 +449,26 @@ def delete(self, route_name: str): def delete_all(self): self.index.delete(delete_all=True, namespace=self.namespace) - def describe(self) -> Dict: + def describe(self) -> IndexConfig: if self.index is not None: stats = self.index.describe_index_stats() - return { - "type": self.type, - "dimensions": stats["dimension"], - "vectors": stats["namespaces"][self.namespace]["vector_count"], - } + return IndexConfig( + type=self.type, + dimensions=stats["dimension"], + vectors=stats["namespaces"][self.namespace]["vector_count"], + ) else: - raise ValueError("Index is None, cannot describe index stats.") + return IndexConfig( + type=self.type, + dimensions=self.dimensions or 0, + vectors=0, + ) + + def is_ready(self) -> bool: + """ + Checks if the index is ready to be used. + """ + return self.index is not None def query( self, @@ -505,7 +531,7 @@ def _read_config(self, field: str, scope: str | None = None) -> ConfigParameter: ids=[config_id], namespace="sr_config", ) - if config_record["vectors"]: + if config_record.get("vectors"): return ConfigParameter( field=field, value=config_record["vectors"][config_id]["metadata"]["value"], @@ -522,6 +548,51 @@ def _read_config(self, field: str, scope: str | None = None) -> ConfigParameter: scope=scope, ) + async def _async_read_config( + self, field: str, scope: str | None = None + ) -> ConfigParameter: + """Read a config parameter from the index asynchronously. + + :param field: The field to read. + :type field: str + :param scope: The scope to read. + :type scope: str | None + :return: The config parameter that was read. + :rtype: ConfigParameter + """ + scope = scope or self.namespace + if self.index is None: + return ConfigParameter( + field=field, + value="", + scope=scope, + ) + config_id = f"{field}#{scope}" + logger.warning(f"JBTEMP Pinecone config id: {config_id}") + config_record = await self._async_fetch_metadata( + vector_id=config_id, namespace="sr_config" + ) + logger.warning(f"JBTEMP Pinecone config record: {config_record}") + if config_record: + try: + return ConfigParameter( + field=field, + value=config_record["value"], + created_at=config_record["created_at"], + scope=scope, + ) + except KeyError: + raise ValueError( + f"Found invalid config record during sync: {config_record}" + ) + else: + logger.warning(f"Configuration for {field} parameter not found in index.") + return ConfigParameter( + field=field, + value="", + scope=scope, + ) + def _write_config(self, config: ConfigParameter) -> ConfigParameter: """Method to write a config parameter to the remote Pinecone index. @@ -550,8 +621,10 @@ async def _async_write_config(self, config: ConfigParameter) -> ConfigParameter: raise ValueError("Index has not been initialized.") if self.dimensions is None: raise ValueError("Must set PineconeIndex.dimensions before writing config.") - self.index.upsert( - vectors=[config.to_pinecone(dimensions=self.dimensions)], + pinecone_config = config.to_pinecone(dimensions=self.dimensions) + logger.warning(f"JBTEMP Pinecone config to upsert: {pinecone_config}") + await self._async_upsert( + vectors=[pinecone_config], namespace="sr_config", ) return config @@ -662,11 +735,14 @@ async def _async_upsert( "vectors": vectors, "namespace": namespace, } + logger.warning(f"JBTEMP Pinecone upsert params: {params}") async with self.async_client.post( - f"{self.base_url}/vectors/upsert", + f"https://{self.host}/vectors/upsert", json=params, ) as response: - return await response.json(content_type=None) + res = await response.json(content_type=None) + logger.warning(f"JBTEMP Pinecone upsert response: {res}") + return res async def _async_create_index( self, @@ -684,7 +760,6 @@ async def _async_create_index( } async with self.async_client.post( f"{self.base_url}/indexes", - headers={"Api-Key": self.api_key}, json=params, ) as response: return await response.json(content_type=None) @@ -695,7 +770,8 @@ async def _async_delete(self, ids: list[str], namespace: str = ""): "namespace": namespace, } async with self.async_client.post( - f"{self.base_url}/vectors/delete", json=params + f"https://{self.host}/vectors/delete", + json=params, ) as response: return await response.json(content_type=None) @@ -765,12 +841,18 @@ async def _async_get_all( return all_vector_ids, metadata - async def _async_fetch_metadata(self, vector_id: str) -> dict: + async def _async_fetch_metadata( + self, + vector_id: str, + namespace: str | None = None, + ) -> dict: """Fetch metadata for a single vector ID asynchronously using the async_client. :param vector_id: The ID of the vector to fetch metadata for. :type vector_id: str + :param namespace: The namespace to fetch metadata for. + :type namespace: str | None :return: A dictionary containing the metadata for the vector. :rtype: dict """ @@ -781,8 +863,11 @@ async def _async_fetch_metadata(self, vector_id: str) -> dict: params = { "ids": [vector_id], } + logger.warning(f"JBTEMP Pinecone fetch params: {params}") - if self.namespace: + if namespace: + params["namespace"] = [namespace] + elif self.namespace: params["namespace"] = [self.namespace] headers = { diff --git a/semantic_router/index/postgres.py b/semantic_router/index/postgres.py index 71ea32e8..6f4a9f2a 100644 --- a/semantic_router/index/postgres.py +++ b/semantic_router/index/postgres.py @@ -6,7 +6,7 @@ import numpy as np from pydantic import BaseModel, Field -from semantic_router.index.base import BaseIndex +from semantic_router.index.base import BaseIndex, IndexConfig from semantic_router.schema import ConfigParameter, Metric, SparseEmbedding from semantic_router.utils.logger import logger @@ -273,6 +273,7 @@ def add( utterances: List[str], function_schemas: Optional[List[Dict[str, Any]]] = None, metadata_list: List[Dict[str, Any]] = [], + **kwargs, ) -> None: """ Adds vectors to the index. @@ -323,17 +324,21 @@ def delete(self, route_name: str) -> None: cur.execute(f"DELETE FROM {table_name} WHERE route = '{route_name}'") self.conn.commit() - def describe(self) -> Dict: + def describe(self) -> IndexConfig: """ Describes the index by returning its type, dimensions, and total vector count. - :return: A dictionary containing the index's type, dimensions, and total vector count. - :rtype: Dict - :raises TypeError: If the database connection is not established. + :return: An IndexConfig object containing the index's type, dimensions, and total vector count. + :rtype: IndexConfig """ table_name = self._get_table_name() if not isinstance(self.conn, psycopg2.extensions.connection): - raise TypeError("Index has not established a connection to Postgres") + logger.warning("Index has not established a connection to Postgres") + return IndexConfig( + type=self.type, + dimensions=self.dimensions or 0, + vectors=0, + ) with self.conn.cursor() as cur: cur.execute(f"SELECT COUNT(*) FROM {table_name}") count = cur.fetchone() @@ -341,11 +346,17 @@ def describe(self) -> Dict: count = 0 else: count = count[0] # Extract the actual count from the tuple - return { - "type": self.type, - "dimensions": self.dimensions, - "total_vector_count": count, - } + return IndexConfig( + type=self.type, + dimensions=self.dimensions or 0, + vectors=count, + ) + + def is_ready(self) -> bool: + """ + Checks if the index is ready to be used. + """ + return isinstance(self.conn, psycopg2.extensions.connection) def query( self, diff --git a/semantic_router/index/qdrant.py b/semantic_router/index/qdrant.py index 1b23753b..5b2eac80 100644 --- a/semantic_router/index/qdrant.py +++ b/semantic_router/index/qdrant.py @@ -3,7 +3,7 @@ import numpy as np from pydantic import Field -from semantic_router.index.base import BaseIndex +from semantic_router.index.base import BaseIndex, IndexConfig from semantic_router.schema import ConfigParameter, Metric, SparseEmbedding, Utterance from semantic_router.utils.logger import logger @@ -170,6 +170,7 @@ def add( function_schemas: Optional[List[Dict[str, Any]]] = None, metadata_list: List[Dict[str, Any]] = [], batch_size: int = DEFAULT_UPLOAD_BATCH_SIZE, + **kwargs, ): self.dimensions = self.dimensions or len(embeddings[0]) self._init_collection() @@ -195,6 +196,10 @@ def get_utterances(self) -> List[Utterance]: List[Tuple]: A list of (route_name, utterance, function_schema, metadata) objects. """ + # Check if collection exists first + if not self.client.collection_exists(self.index_name): + return [] + from qdrant_client import grpc results = [] @@ -245,14 +250,20 @@ def delete(self, route_name: str): ), ) - def describe(self) -> Dict: + def describe(self) -> IndexConfig: collection_info = self.client.get_collection(self.index_name) - return { - "type": self.type, - "dimensions": collection_info.config.params.vectors.size, - "vectors": collection_info.points_count, - } + return IndexConfig( + type=self.type, + dimensions=collection_info.config.params.vectors.size, + vectors=collection_info.points_count, + ) + + def is_ready(self) -> bool: + """ + Checks if the index is ready to be used. + """ + return self.client.collection_exists(self.index_name) def query( self, diff --git a/semantic_router/routers/base.py b/semantic_router/routers/base.py index 0d47f939..18cb21ba 100644 --- a/semantic_router/routers/base.py +++ b/semantic_router/routers/base.py @@ -15,6 +15,7 @@ from semantic_router.index.base import BaseIndex from semantic_router.index.local import LocalIndex from semantic_router.index.pinecone import PineconeIndex +from semantic_router.index.qdrant import QdrantIndex from semantic_router.llms import BaseLLM, OpenAILLM from semantic_router.route import Route from semantic_router.schema import ( @@ -421,6 +422,8 @@ def __call__( simulate_static: bool = False, route_filter: Optional[List[str]] = None, ) -> RouteChoice: + if not self.index.is_ready(): + raise ValueError("Index is not ready.") # if no vector provided, encode text to get vector if vector is None: if text is None: @@ -428,8 +431,19 @@ def __call__( vector = self._encode(text=[text]) # convert to numpy array if not already vector = xq_reshape(vector) - # calculate semantics - route, top_class_scores = self._retrieve_top_route(vector, route_filter) + # get scores and routes + scores, routes = self.index.query( + vector=vector[0], top_k=self.top_k, route_filter=route_filter + ) + query_results = [ + {"route": d, "score": s.item()} for d, s in zip(routes, scores) + ] + # decide most relevant routes + top_class, top_class_scores = self._semantic_classify( + query_results=query_results + ) + # TODO do we need this check? + route = self.check_for_matching_routes(top_class) passed = self._check_threshold(top_class_scores, route) if passed and route is not None and not simulate_static: if route.function_schemas and text is None: @@ -466,6 +480,9 @@ async def acall( simulate_static: bool = False, route_filter: Optional[List[str]] = None, ) -> RouteChoice: + if not self.index.is_ready(): + # TODO: need async version for qdrant + raise ValueError("Index is not ready.") # if no vector provided, encode text to get vector if vector is None: if text is None: @@ -473,10 +490,19 @@ async def acall( vector = await self._async_encode(text=[text]) # convert to numpy array if not already vector = xq_reshape(vector) - # calculate semantics - route, top_class_scores = await self._async_retrieve_top_route( - vector, route_filter + # get scores and routes + scores, routes = await self.index.aquery( + vector=vector[0], top_k=self.top_k, route_filter=route_filter + ) + query_results = [ + {"route": d, "score": s.item()} for d, s in zip(routes, scores) + ] + # decide most relevant routes + top_class, top_class_scores = await self._async_semantic_classify( + query_results=query_results ) + # TODO do we need this check? + route = self.check_for_matching_routes(top_class) passed = self._check_threshold(top_class_scores, route) if passed and route is not None and not simulate_static: if route.function_schemas and text is None: @@ -503,65 +529,19 @@ async def acall( # if no route passes threshold, return empty route choice return RouteChoice() - # TODO: add multiple routes return to __call__ and acall - @deprecated("This method is deprecated. Use `__call__` instead.") - def retrieve_multiple_routes( - self, - text: Optional[str] = None, - vector: Optional[List[float] | np.ndarray] = None, - ) -> List[RouteChoice]: - if vector is None: - if text is None: - raise ValueError("Either text or vector must be provided") - vector = self._encode(text=[text]) - # convert to numpy array if not already - vector = xq_reshape(vector) - # get relevant utterances - results = self._retrieve(xq=vector) - # decide most relevant routes - categories_with_scores = self._semantic_classify_multiple_routes(results) - return [ - RouteChoice(name=category, similarity_score=score) - for category, score in categories_with_scores - ] - - # route_choices = [] - # TODO JB: do we need this check? Maybe we should be returning directly - # for category, score in categories_with_scores: - # route = self.check_for_matching_routes(category) - # if route: - # route_choice = RouteChoice(name=route.name, similarity_score=score) - # route_choices.append(route_choice) + def _index_ready(self) -> bool: + """Method to check if the index is ready to be used. - # return route_choices - - def _retrieve_top_route( - self, vector: np.ndarray, route_filter: Optional[List[str]] = None - ) -> Tuple[Optional[Route], List[float]]: - """ - Retrieve the top matching route based on the given vector. - Returns a tuple of the route (if any) and the scores of the top class. + :return: True if the index is ready, False otherwise. + :rtype: bool """ - # get relevant results (scores and routes) - results = self._retrieve(xq=vector, top_k=self.top_k, route_filter=route_filter) - # decide most relevant routes - top_class, top_class_scores = self._semantic_classify(results) - # TODO do we need this check? - route = self.check_for_matching_routes(top_class) - return route, top_class_scores - - async def _async_retrieve_top_route( - self, vector: np.ndarray, route_filter: Optional[List[str]] = None - ) -> Tuple[Optional[Route], List[float]]: - # get relevant results (scores and routes) - results = await self._async_retrieve( - xq=vector, top_k=self.top_k, route_filter=route_filter - ) - # decide most relevant routes - top_class, top_class_scores = await self._async_semantic_classify(results) - # TODO do we need this check? - route = self.check_for_matching_routes(top_class) - return route, top_class_scores + if self.index.index is None or self.routes is None: + return False + if isinstance(self.index, QdrantIndex): + info = self.index.describe() + if info.vectors == 0: + return False + return True def sync(self, sync_mode: str, force: bool = False, wait: int = 0) -> List[str]: """Runs a sync of the local routes with the remote index. @@ -1116,26 +1096,6 @@ async def _async_encode(self, text: list[str]) -> Any: # TODO: should encode "content" rather than text raise NotImplementedError("This method should be implemented by subclasses.") - def _retrieve( - self, xq: Any, top_k: int = 5, route_filter: Optional[List[str]] = None - ) -> List[Dict]: - """Given a query vector, retrieve the top_k most similar records.""" - # get scores and routes - scores, routes = self.index.query( - vector=xq[0], top_k=top_k, route_filter=route_filter - ) - return [{"route": d, "score": s.item()} for d, s in zip(routes, scores)] - - async def _async_retrieve( - self, xq: Any, top_k: int = 5, route_filter: Optional[List[str]] = None - ) -> List[Dict]: - """Given a query vector, retrieve the top_k most similar records.""" - # get scores and routes - scores, routes = await self.index.aquery( - vector=xq[0], top_k=top_k, route_filter=route_filter - ) - return [{"route": d, "score": s.item()} for d, s in zip(routes, scores)] - def _set_aggregation_method(self, aggregation: str = "sum"): # TODO is this really needed? if aggregation == "sum": @@ -1149,6 +1109,7 @@ def _set_aggregation_method(self, aggregation: str = "sum"): f"Unsupported aggregation method chosen: {aggregation}. Choose either 'SUM', 'MEAN', or 'MAX'." ) + # TODO JB allow return of multiple routes def _semantic_classify(self, query_results: List[Dict]) -> Tuple[str, List[float]]: """Classify the query results into a single class based on the highest total score. If no classification is found, return an empty string and an empty list. @@ -1216,6 +1177,7 @@ def get(self, name: str) -> Optional[Route]: logger.error(f"Route `{name}` not found") return None + @deprecated("This method is deprecated. Use `semantic_classify` instead.") def _semantic_classify_multiple_routes( self, query_results: List[Dict] ) -> List[Tuple[str, float]]: @@ -1243,6 +1205,7 @@ def group_scores_by_class( self, query_results: List[Dict] ) -> Dict[str, List[float]]: scores_by_class: Dict[str, List[float]] = {} + logger.warning(f"JBTEMP: {query_results=}") for result in query_results: score = result["score"] route = result["route"] @@ -1279,6 +1242,7 @@ def _pass_threshold(self, scores: List[float], threshold: float | None) -> bool: if threshold is None: return True if scores: + # TODO JB is this correct? return max(scores) > threshold else: return False @@ -1374,7 +1338,7 @@ def fit( emb = np.array(self.encoder(X[i : i + batch_size])) Xq.extend(emb) # initial eval (we will iterate from here) - best_acc = self._vec_evaluate(Xq=np.array(Xq), y=y) + best_acc = self._vec_evaluate(Xq_d=np.array(Xq), y=y) best_thresholds = self.get_thresholds() # begin fit for _ in (pbar := tqdm(range(max_iter), desc="Training")): @@ -1387,7 +1351,7 @@ def fit( # update current route layer self._update_thresholds(route_thresholds=thresholds) # evaluate - acc = self._vec_evaluate(Xq=Xq, y=y) + acc = self._vec_evaluate(Xq_d=Xq, y=y) # update best if acc > best_acc: best_acc = acc @@ -1408,20 +1372,22 @@ def evaluate(self, X: List[str], y: List[str], batch_size: int = 500) -> float: emb = np.array(self.encoder(X[i : i + batch_size])) Xq.extend(emb) - accuracy = self._vec_evaluate(Xq=np.array(Xq), y=y) + accuracy = self._vec_evaluate(Xq_d=np.array(Xq), y=y) return accuracy - def _vec_evaluate(self, Xq: Union[List[float], Any], y: List[str]) -> float: + def _vec_evaluate( + self, Xq_d: Union[List[float], Any], y: List[str], **kwargs + ) -> float: """ Evaluate the accuracy of the route selection. """ correct = 0 - for xq, target_route in zip(Xq, y): + for xq, target_route in zip(Xq_d, y): # We treate dynamic routes as static here, because when evaluating we use only vectors, and dynamic routes expect strings by default. route_choice = self(vector=xq, simulate_static=True) if route_choice.name == target_route: correct += 1 - accuracy = correct / len(Xq) + accuracy = correct / len(Xq_d) return accuracy def _get_route_names(self) -> List[str]: diff --git a/semantic_router/routers/hybrid.py b/semantic_router/routers/hybrid.py index 54901d5e..bb2f3641 100644 --- a/semantic_router/routers/hybrid.py +++ b/semantic_router/routers/hybrid.py @@ -1,4 +1,5 @@ -from typing import Dict, List, Optional +from typing import Any, Dict, List, Optional, Union +from tqdm.auto import tqdm import asyncio from pydantic import Field @@ -14,7 +15,7 @@ from semantic_router.index import BaseIndex, HybridLocalIndex from semantic_router.schema import RouteChoice, SparseEmbedding, Utterance from semantic_router.utils.logger import logger -from semantic_router.routers.base import BaseRouter, xq_reshape +from semantic_router.routers.base import BaseRouter, xq_reshape, threshold_random_search from semantic_router.llms import BaseLLM @@ -65,6 +66,21 @@ def __init__( if self.auto_sync: self._init_index_state() + def _set_score_threshold(self): + """Set the score threshold for the HybridRouter. Unlike the base router the + encoder score threshold is not used directly. Instead, the dense encoder + score threshold is multiplied by the alpha value, resulting in a lower + score threshold. This is done to account for the difference in returned + scores from the hybrid router. + """ + if self.encoder.score_threshold is not None: + self.score_threshold = self.encoder.score_threshold * self.alpha + if self.score_threshold is None: + logger.warning( + "No score threshold value found in encoder. Using the default " + "'None' value can lead to unexpected results." + ) + def add(self, routes: List[Route] | Route): """Add a route to the local HybridRouter and index. @@ -77,6 +93,7 @@ def add(self, routes: List[Route] | Route): if current_remote_hash.value == "": # if remote hash is empty, the index is to be initialized current_remote_hash = current_local_hash + logger.warning(f"JBTEMP: {routes}") if isinstance(routes, Route): routes = [routes] # create embeddings for all routes @@ -92,7 +109,7 @@ def add(self, routes: List[Route] | Route): utterances=all_utterances, function_schemas=all_function_schemas, metadata_list=all_metadata, - sparse_embeddings=sparse_emb, # type: ignore + sparse_embeddings=sparse_emb, ) self.routes.extend(routes) @@ -129,7 +146,7 @@ def _execute_sync_strategy(self, strategy: Dict[str, Dict[str, List[Utterance]]] utt.function_schemas for utt in strategy["remote"]["upsert"] # type: ignore ], metadata_list=[utt.metadata for utt in strategy["remote"]["upsert"]], - sparse_embeddings=sparse_emb, # type: ignore + sparse_embeddings=sparse_emb, ) if strategy["local"]["delete"]: self._local_delete(utterances=strategy["local"]["delete"]) @@ -202,6 +219,8 @@ def __call__( route_filter: Optional[List[str]] = None, sparse_vector: dict[int, float] | SparseEmbedding | None = None, ) -> RouteChoice: + if not self.index.is_ready(): + raise ValueError("Index is not ready.") potential_sparse_vector: List[SparseEmbedding] | None = None # if no vector provided, encode text to get vector if vector is None: @@ -220,17 +239,23 @@ def __call__( raise ValueError("Sparse vector is required for HybridLocalIndex.") # TODO: add alpha as a parameter scores, route_names = self.index.query( - vector=vector, + vector=vector[0], top_k=self.top_k, route_filter=route_filter, sparse_vector=sparse_vector, ) + logger.warning(f"JBTEMP: {scores}") + logger.warning(f"JBTEMP: {route_names}") + query_results = [ + {"route": d, "score": s.item()} for d, s in zip(route_names, scores) + ] + # TODO JB we should probably make _semantic_classify consume arrays rather than + # needing to convert to list here top_class, top_class_scores = self._semantic_classify( - [ - {"score": score, "route": route} - for score, route in zip(scores, route_names) - ] + query_results=query_results ) + logger.warning(f"JBTEMP: {top_class}") + logger.warning(f"JBTEMP: {top_class_scores}") passed = self._pass_threshold(top_class_scores, self.score_threshold) if passed: return RouteChoice(name=top_class, similarity_score=max(top_class_scores)) @@ -252,3 +277,105 @@ def _convex_scaling( ) ) return scaled_dense, scaled_sparse + + def fit( + self, + X: List[str], + y: List[str], + batch_size: int = 500, + max_iter: int = 500, + local_execution: bool = False, + ): + original_index = self.index + if self.sparse_encoder is None: + raise ValueError("Sparse encoder is not set.") + if local_execution: + # Switch to a local index for fitting + from semantic_router.index.hybrid_local import HybridLocalIndex + + remote_routes = self.index.get_utterances() + # TODO Enhance by retrieving directly the vectors instead of embedding all utterances again + routes, utterances, function_schemas, metadata = map( + list, zip(*remote_routes) + ) + embeddings = self.encoder(utterances) + sparse_embeddings = self.sparse_encoder(utterances) + self.index = HybridLocalIndex() + self.index.add( + embeddings=embeddings, + sparse_embeddings=sparse_embeddings, + routes=routes, + utterances=utterances, + metadata_list=metadata, + ) + + # convert inputs into array + Xq_d: List[List[float]] = [] + Xq_s: List[SparseEmbedding] = [] + for i in tqdm(range(0, len(X), batch_size), desc="Generating embeddings"): + emb_d = np.array(self.encoder(X[i : i + batch_size])) + # TODO JB: for some reason the sparse encoder is receiving a tuple like `("Hello",)` + print(f"JBTEMP: {X[i : i + batch_size]}") + emb_s = self.sparse_encoder(X[i : i + batch_size]) + Xq_d.extend(emb_d) + Xq_s.extend(emb_s) + # initial eval (we will iterate from here) + best_acc = self._vec_evaluate(Xq_d=np.array(Xq_d), Xq_s=Xq_s, y=y) + best_thresholds = self.get_thresholds() + # begin fit + for _ in (pbar := tqdm(range(max_iter), desc="Training")): + pbar.set_postfix({"acc": round(best_acc, 2)}) + # Find the best score threshold for each route + thresholds = threshold_random_search( + route_layer=self, + search_range=0.8, + ) + # update current route layer + self._update_thresholds(route_thresholds=thresholds) + # evaluate + acc = self._vec_evaluate(Xq_d=np.array(Xq_d), Xq_s=Xq_s, y=y) + # update best + if acc > best_acc: + best_acc = acc + best_thresholds = thresholds + # update route layer to best thresholds + self._update_thresholds(route_thresholds=best_thresholds) + + if local_execution: + # Switch back to the original index + self.index = original_index + + def evaluate(self, X: List[str], y: List[str], batch_size: int = 500) -> float: + """ + Evaluate the accuracy of the route selection. + """ + if self.sparse_encoder is None: + raise ValueError("Sparse encoder is not set.") + Xq_d: List[List[float]] = [] + Xq_s: List[SparseEmbedding] = [] + for i in tqdm(range(0, len(X), batch_size), desc="Generating embeddings"): + emb_d = np.array(self.encoder(X[i : i + batch_size])) + emb_s = self.sparse_encoder(X[i : i + batch_size]) + Xq_d.extend(emb_d) + Xq_s.extend(emb_s) + + accuracy = self._vec_evaluate(Xq_d=np.array(Xq_d), Xq_s=Xq_s, y=y) + return accuracy + + def _vec_evaluate( # type: ignore + self, + Xq_d: Union[List[float], Any], + Xq_s: list[SparseEmbedding], + y: List[str], + ) -> float: + """ + Evaluate the accuracy of the route selection. + """ + correct = 0 + for xq_d, xq_s, target_route in zip(Xq_d, Xq_s, y): + # We treate dynamic routes as static here, because when evaluating we use only vectors, and dynamic routes expect strings by default. + route_choice = self(vector=xq_d, sparse_vector=xq_s, simulate_static=True) + if route_choice.name == target_route: + correct += 1 + accuracy = correct / len(Xq_d) + return accuracy diff --git a/tests/unit/test_hybrid_layer.py b/tests/unit/test_hybrid_layer.py deleted file mode 100644 index b12ea2f5..00000000 --- a/tests/unit/test_hybrid_layer.py +++ /dev/null @@ -1,270 +0,0 @@ -import pytest - -from semantic_router.encoders import ( - AzureOpenAIEncoder, - DenseEncoder, - BM25Encoder, - CohereEncoder, - OpenAIEncoder, - TfidfEncoder, -) -from semantic_router.routers import HybridRouter -from semantic_router.route import Route - - -UTTERANCES = [ - "Hello we need this text to be a little longer for our sparse encoders", - "In this case they need to learn from recurring tokens, ie words.", - "We give ourselves several examples from our encoders to learn from.", - "But given this is only an example we don't need too many", - "Just enough to test that our sparse encoders work as expected", -] - - -def mock_encoder_call(utterances): - # Define a mapping of utterances to return values - mock_responses = { - UTTERANCES[0]: [0.1, 0.2, 0.3], - UTTERANCES[1]: [0.4, 0.5, 0.6], - UTTERANCES[2]: [0.7, 0.8, 0.9], - UTTERANCES[3]: [1.0, 1.1, 1.2], - UTTERANCES[4]: [1.3, 1.4, 1.5], - } - return [mock_responses.get(u, [0, 0, 0]) for u in utterances] - - -@pytest.fixture -def base_encoder(mocker): - mock_base_encoder = DenseEncoder(name="test-encoder", score_threshold=0.5) - mocker.patch.object(DenseEncoder, "__call__", return_value=[[0.1, 0.2, 0.3]]) - return mock_base_encoder - - -@pytest.fixture -def cohere_encoder(mocker): - mocker.patch.object(CohereEncoder, "__call__", side_effect=mock_encoder_call) - return CohereEncoder(name="test-cohere-encoder", cohere_api_key="test_api_key") - - -@pytest.fixture -def openai_encoder(mocker): - mocker.patch.object(OpenAIEncoder, "__call__", side_effect=mock_encoder_call) - return OpenAIEncoder(name="text-embedding-3-small", openai_api_key="test_api_key") - - -@pytest.fixture -def azure_encoder(mocker): - mocker.patch.object(AzureOpenAIEncoder, "__call__", side_effect=mock_encoder_call) - return AzureOpenAIEncoder( - deployment_name="test-deployment", - azure_endpoint="test_endpoint", - api_key="test_api_key", - api_version="test_version", - model="test_model", - ) - - -@pytest.fixture -def bm25_encoder(): - # mocker.patch.object(BM25Encoder, "__call__", side_effect=mock_encoder_call) - return BM25Encoder(name="test-bm25-encoder") - - -@pytest.fixture -def tfidf_encoder(): - # mocker.patch.object(TfidfEncoder, "__call__", side_effect=mock_encoder_call) - return TfidfEncoder(name="test-tfidf-encoder") - - -@pytest.fixture -def routes(): - return [ - Route(name="Route 1", utterances=[UTTERANCES[0], UTTERANCES[1]]), - Route(name="Route 2", utterances=[UTTERANCES[2], UTTERANCES[3], UTTERANCES[4]]), - ] - - -sparse_encoder = TfidfEncoder() -sparse_encoder.fit( - [ - Route( - name="Route 1", - utterances=[ - "The quick brown fox jumps over the lazy dog", - "some other useful text containing words like fox and dog", - ], - ), - Route(name="Route 2", utterances=["Hello, world!"]), - ] -) - - -class TestHybridRouter: - def test_initialization(self, openai_encoder, routes): - route_layer = HybridRouter( - encoder=openai_encoder, - sparse_encoder=sparse_encoder, - routes=routes, - top_k=10, - alpha=0.8, - ) - assert route_layer.index is not None and route_layer.routes is not None - assert openai_encoder.score_threshold == 0.3 - assert route_layer.score_threshold == 0.3 - assert route_layer.top_k == 10 - assert route_layer.alpha == 0.8 - assert route_layer.index.route_names is None - assert len(route_layer.routes) == 2 - - def test_initialization_different_encoders(self, cohere_encoder, openai_encoder): - route_layer_cohere = HybridRouter( - encoder=cohere_encoder, sparse_encoder=sparse_encoder - ) - assert route_layer_cohere.score_threshold == 0.3 - - route_layer_openai = HybridRouter( - encoder=openai_encoder, sparse_encoder=sparse_encoder - ) - assert route_layer_openai.score_threshold == 0.3 - - def test_add_route(self, openai_encoder, routes): - route_layer = HybridRouter( - encoder=openai_encoder, sparse_encoder=sparse_encoder - ) - route_layer.add(routes=routes[0]) - assert route_layer.index is not None, "route_layer.index is None" - assert route_layer.routes is not None, "route_layer.routes is None" - assert len(route_layer.routes) == 1, "route_layer.routes is not 1" - - def test_add_multiple_routes(self, openai_encoder, routes): - route_layer = HybridRouter( - encoder=openai_encoder, sparse_encoder=sparse_encoder - ) - route_layer.add(routes=routes) - assert route_layer.index is not None, "route_layer.index is None" - assert route_layer.routes is not None, "route_layer.routes is None" - assert len(route_layer.routes) == 2, "route_layer.routes is not 2" - - def test_query_and_classification(self, openai_encoder, routes): - route_layer = HybridRouter( - encoder=openai_encoder, - sparse_encoder=sparse_encoder, - routes=routes, - auto_sync="local", - ) - route_layer.set_threshold(0.0) - query_result = route_layer(UTTERANCES[0]) - assert query_result.name in ["Route 1", "Route 2"] - - def test_query_with_no_index(self, openai_encoder): - route_layer = HybridRouter( - encoder=openai_encoder, sparse_encoder=sparse_encoder - ) - assert isinstance(route_layer.sparse_encoder, BM25Encoder) or isinstance( - route_layer.sparse_encoder, TfidfEncoder - ), ( - f"route_layer.sparse_encoder is {route_layer.sparse_encoder.__class__.__name__} " - "not BM25Encoder or TfidfEncoder" - ) - assert route_layer("Anything").name is None - - def test_semantic_classify(self, openai_encoder, routes): - route_layer = HybridRouter( - encoder=openai_encoder, sparse_encoder=sparse_encoder, routes=routes - ) - classification, score = route_layer._semantic_classify( - [ - {"route": "Route 1", "score": 0.9}, - {"route": "Route 2", "score": 0.1}, - ] - ) - assert classification == "Route 1" - assert score == [0.9] - - def test_semantic_classify_multiple_routes(self, openai_encoder, routes): - route_layer = HybridRouter( - encoder=openai_encoder, sparse_encoder=sparse_encoder, routes=routes - ) - classification, score = route_layer._semantic_classify( - [ - {"route": "Route 1", "score": 0.9}, - {"route": "Route 2", "score": 0.1}, - {"route": "Route 1", "score": 0.8}, - ] - ) - assert classification == "Route 1" - assert score == [0.9, 0.8] - - def test_pass_threshold(self, openai_encoder): - route_layer = HybridRouter( - encoder=openai_encoder, sparse_encoder=sparse_encoder - ) - assert not route_layer._pass_threshold([], 0.5) - assert route_layer._pass_threshold([0.6, 0.7], 0.5) - - def test_failover_score_threshold(self, base_encoder): - route_layer = HybridRouter(encoder=base_encoder, sparse_encoder=sparse_encoder) - assert base_encoder.score_threshold == 0.50 - assert route_layer.score_threshold == 0.50 - - def test_add_route_tfidf(self, cohere_encoder, tfidf_encoder, routes): - hybrid_route_layer = HybridRouter( - encoder=cohere_encoder, - sparse_encoder=tfidf_encoder, - routes=routes[:-1], - auto_sync="local", - ) - hybrid_route_layer.add(routes=routes[-1]) - all_utterances = [ - utterance for route in routes for utterance in route.utterances - ] - assert hybrid_route_layer.index.sparse_index is not None, "sparse_index is None" - assert len(hybrid_route_layer.index.sparse_index) == len( - all_utterances - ), "sparse_index length mismatch" - - def test_setting_aggregation_methods(self, openai_encoder, routes): - for agg in ["sum", "mean", "max"]: - route_layer = HybridRouter( - encoder=openai_encoder, - sparse_encoder=sparse_encoder, - routes=routes, - aggregation=agg, - ) - assert route_layer.aggregation == agg - - def test_semantic_classify_multiple_routes_with_different_aggregation( - self, openai_encoder, routes - ): - route_scores = [ - {"route": "Route 1", "score": 0.5}, - {"route": "Route 1", "score": 0.5}, - {"route": "Route 1", "score": 0.5}, - {"route": "Route 1", "score": 0.5}, - {"route": "Route 2", "score": 0.4}, - {"route": "Route 2", "score": 0.6}, - {"route": "Route 2", "score": 0.8}, - {"route": "Route 3", "score": 0.1}, - {"route": "Route 3", "score": 1.0}, - ] - for agg in ["sum", "mean", "max"]: - route_layer = HybridRouter( - encoder=openai_encoder, - sparse_encoder=sparse_encoder, - routes=routes, - aggregation=agg, - ) - classification, score = route_layer._semantic_classify(route_scores) - - if agg == "sum": - assert classification == "Route 1" - assert score == [0.5, 0.5, 0.5, 0.5] - elif agg == "mean": - assert classification == "Route 2" - assert score == [0.4, 0.6, 0.8] - elif agg == "max": - assert classification == "Route 3" - assert score == [0.1, 1.0] - - -# Add more tests for edge cases and error handling as needed. diff --git a/tests/unit/test_router.py b/tests/unit/test_router.py index 1f743f1c..106aec4f 100644 --- a/tests/unit/test_router.py +++ b/tests/unit/test_router.py @@ -1,22 +1,28 @@ import importlib import os import tempfile -from unittest.mock import mock_open, patch +from openai._types import ResponseT +from unittest.mock import mock_open, patch, MagicMock from datetime import datetime import pytest import time from typing import Optional from semantic_router.encoders import DenseEncoder, CohereEncoder, OpenAIEncoder +from semantic_router.schema import SparseEmbedding from semantic_router.index.local import LocalIndex from semantic_router.index.pinecone import PineconeIndex from semantic_router.index.qdrant import QdrantIndex -from semantic_router.routers import RouterConfig, SemanticRouter -from semantic_router.llms.base import BaseLLM +from semantic_router.routers import RouterConfig, SemanticRouter, HybridRouter +from semantic_router.llms import BaseLLM, OpenAILLM from semantic_router.route import Route +from semantic_router.utils.logger import logger from platform import python_version -PINECONE_SLEEP = 6 +os.environ["OPENAI_API_KEY"] = "mock-api-key" + +PINECONE_SLEEP = 8 +RETRY_COUNT = 5 def mock_encoder_call(utterances): @@ -41,13 +47,16 @@ def init_index( index_cls, dimensions: Optional[int] = None, namespace: Optional[str] = "", + index_name: Optional[str] = None, ): """We use this function to initialize indexes with different names to avoid issues during testing. """ if index_cls is PineconeIndex: + # we specify different index names to avoid dimensionality issues between different encoders + index_name = TEST_ID if not index_name else f"{TEST_ID}-{index_name.lower()}" index = index_cls( - index_name=TEST_ID, dimensions=dimensions, namespace=namespace + index_name=index_name, dimensions=dimensions, namespace=namespace ) else: index = index_cls() @@ -108,13 +117,79 @@ def base_encoder(): @pytest.fixture def cohere_encoder(mocker): mocker.patch.object(CohereEncoder, "__call__", side_effect=mock_encoder_call) + + # Mock async call + async def async_mock_encoder_call(docs=None, utterances=None): + # Handle either docs or utterances parameter + texts = docs if docs is not None else utterances + return mock_encoder_call(texts) + + mocker.patch.object(CohereEncoder, "acall", side_effect=async_mock_encoder_call) return CohereEncoder(name="test-cohere-encoder", cohere_api_key="test_api_key") +# @pytest.fixture +# def openai_encoder(mocker): +# # Mock the OpenAI client creation and API calls +# mocker.patch("openai.OpenAI") +# # Mock the __call__ method +# mocker.patch.object(OpenAIEncoder, "__call__", side_effect=mock_encoder_call) + +# # Mock async call +# async def async_mock_encoder_call(docs=None, utterances=None): +# # Handle either docs or utterances parameter +# texts = docs if docs is not None else utterances +# return mock_encoder_call(texts) + +# mocker.patch.object(OpenAIEncoder, "acall", side_effect=async_mock_encoder_call) +# # Create and return the mocked encoder +# encoder = OpenAIEncoder(name="text-embedding-3-small") +# return encoder + +def sparse_encoder(mocker): + mocker.patch( + "semantic_router.encoders.bm25.BM25Encoder.__init__", + return_value=None + ) + mocker.patch( + "semantic_router.encoders.bm25.BM25Encoder.__call__", + return_value=[SparseEmbedding.from_dict({1: 0.5, 4: 0.8})] + ) + mocker.patch( + "semantic_router.encoders.bm25.BM25Encoder.fit", + return_value=None + ) + return + @pytest.fixture def openai_encoder(mocker): - mocker.patch.object(OpenAIEncoder, "__call__", side_effect=mock_encoder_call) - return OpenAIEncoder(name="text-embedding-3-small", openai_api_key="test_api_key") + # also mock sparse encoder + sparse_encoder(mocker) + # Mock the OpenAI client creation and API calls + mocker.patch("openai.OpenAI", return_value=MagicMock()) + mocker.patch( + "semantic_router.encoders.openai.OpenAIEncoder.__call__", + return_value=[[0.1,0.2,0.3]] + ) + encoder_mock = MagicMock(spec=OpenAIEncoder) + # Mock the __call__ method + encoder_mock.__call__ = mock_encoder_call + encoder = OpenAIEncoder(openai_api_key="mock_api_key") + encoder.__call__ = mock_encoder_call + return encoder + +@pytest.fixture +def mock_openai_llm(mocker): + # Mock the OpenAI LLM + mocker.patch.object(OpenAILLM, "__call__", return_value="mocked response") + + # also async + async def async_mock_llm_call(messages=None, **kwargs): + return "mocked response" + + mocker.patch.object(OpenAILLM, "acall", side_effect=async_mock_llm_call) + + return OpenAILLM(name="fake-model-v1") @pytest.fixture @@ -195,126 +270,410 @@ def get_test_indexes(): def get_test_encoders(): - encoders = [OpenAIEncoder] + encoders = [openai_encoder()] if importlib.util.find_spec("cohere") is not None: encoders.append(CohereEncoder) return encoders +def get_test_routers(): + routers = [SemanticRouter] + if importlib.util.find_spec("pinecone_text") is not None: + routers.append(HybridRouter) + return routers + + @pytest.mark.parametrize( - "index_cls,encoder_cls", + "index_cls,encoder_cls,router_cls", [ - (index, encoder) - for index in get_test_indexes() - for encoder in get_test_encoders() + (index, encoder, router) + for index in [LocalIndex]#get_test_indexes() + for encoder in ["openai_encoder"] + for router in get_test_routers() ], ) class TestIndexEncoders: - def test_initialization(self, routes, openai_encoder, index_cls, encoder_cls): - index = init_index(index_cls) - route_layer = SemanticRouter( - encoder=encoder_cls(), + def test_initialization(self, request, routes, index_cls, encoder_cls, router_cls): + encoder = request.getfixturevalue(encoder_cls) + index = init_index(index_cls, index_name=encoder.__class__.__name__) + route_layer = router_cls( + encoder=encoder, routes=routes, index=index, auto_sync="local", top_k=10, ) - if index_cls is PineconeIndex: - time.sleep(PINECONE_SLEEP) # allow for index to be populated - - assert openai_encoder.score_threshold == 0.3 - assert route_layer.score_threshold == 0.3 + score_threshold = route_layer.score_threshold + if isinstance(route_layer, HybridRouter): + assert score_threshold == encoder.score_threshold * route_layer.alpha + else: + assert score_threshold == encoder.score_threshold assert route_layer.top_k == 10 - assert len(route_layer.index) == 5 + # allow for 5 retries in case of index not being populated + count = 0 + while count < RETRY_COUNT: + try: + assert len(route_layer.index) == 5 + break + except Exception: + logger.warning(f"Index not populated, waiting for retry (try {count})") + time.sleep(PINECONE_SLEEP) + count += 1 assert ( len(set(route_layer._get_route_names())) if route_layer._get_route_names() is not None else 0 == 2 ) - def test_initialization_different_encoders(self, encoder_cls, index_cls): - index = init_index(index_cls) - encoder = encoder_cls() - route_layer = SemanticRouter(encoder=encoder, index=index) - assert route_layer.score_threshold == encoder.score_threshold + def test_initialization_different_encoders( + self, request, encoder_cls, index_cls, router_cls + ): + encoder = request.getfixturevalue(encoder_cls) + index = init_index(index_cls, index_name=encoder.__class__.__name__) + route_layer = router_cls(encoder=encoder, index=index) + score_threshold = route_layer.score_threshold + if isinstance(route_layer, HybridRouter): + assert score_threshold == encoder.score_threshold * route_layer.alpha + else: + assert score_threshold == encoder.score_threshold + + def test_initialization_no_encoder(self, request, index_cls, encoder_cls, router_cls): + # apply encoder mocks (but will not get used) + _ = request.getfixturevalue(encoder_cls) + route_layer_none = router_cls(encoder=None) + score_threshold = route_layer_none.score_threshold + if isinstance(route_layer_none, HybridRouter): + assert score_threshold == 0.3 * route_layer_none.alpha + else: + assert score_threshold == 0.3 + + +class TestRouterConfig: + def test_from_file_json(self, tmp_path): + # Create a temporary JSON file with layer configuration + config_path = tmp_path / "config.json" + config_path.write_text( + layer_json() + ) # Assuming layer_json() returns a valid JSON string + + # Load the RouterConfig from the temporary file + layer_config = RouterConfig.from_file(str(config_path)) + + # Assertions to verify the loaded configuration + assert layer_config.encoder_type == "cohere" + assert layer_config.encoder_name == "embed-english-v3.0" + assert len(layer_config.routes) == 2 + assert layer_config.routes[0].name == "politics" + + def test_from_file_yaml(self, tmp_path): + # Create a temporary YAML file with layer configuration + config_path = tmp_path / "config.yaml" + config_path.write_text( + layer_yaml() + ) # Assuming layer_yaml() returns a valid YAML string + + # Load the RouterConfig from the temporary file + layer_config = RouterConfig.from_file(str(config_path)) + + # Assertions to verify the loaded configuration + assert layer_config.encoder_type == "cohere" + assert layer_config.encoder_name == "embed-english-v3.0" + assert len(layer_config.routes) == 2 + assert layer_config.routes[0].name == "politics" + + def test_from_file_invalid_path(self): + with pytest.raises(FileNotFoundError) as excinfo: + RouterConfig.from_file("nonexistent_path.json") + assert "[Errno 2] No such file or directory: 'nonexistent_path.json'" in str( + excinfo.value + ) + + def test_from_file_unsupported_type(self, tmp_path): + # Create a temporary unsupported file + config_path = tmp_path / "config.unsupported" + config_path.write_text(layer_json()) + + with pytest.raises(ValueError) as excinfo: + RouterConfig.from_file(str(config_path)) + assert "Unsupported file type" in str(excinfo.value) + + def test_from_file_invalid_config(self, tmp_path): + # Define an invalid configuration JSON + invalid_config_json = """ + { + "encoder_type": "cohere", + "encoder_name": "embed-english-v3.0", + "routes": "This should be a list, not a string" + }""" + + # Write the invalid configuration to a temporary JSON file + config_path = tmp_path / "invalid_config.json" + with open(config_path, "w") as file: + file.write(invalid_config_json) + + # Patch the is_valid function to return False for this test + with patch("semantic_router.routers.base.is_valid", return_value=False): + # Attempt to load the RouterConfig from the temporary file + # and assert that it raises an exception due to invalid configuration + with pytest.raises(Exception) as excinfo: + RouterConfig.from_file(str(config_path)) + assert "Invalid config JSON or YAML" in str( + excinfo.value + ), "Loading an invalid configuration should raise an exception." + + def test_from_file_with_llm(self, tmp_path): + llm_config_json = """ + { + "encoder_type": "cohere", + "encoder_name": "embed-english-v3.0", + "routes": [ + { + "name": "llm_route", + "utterances": ["tell me a joke", "say something funny"], + "llm": { + "module": "semantic_router.llms.base", + "class": "BaseLLM", + "model": "fake-model-v1" + } + } + ] + }""" + + config_path = tmp_path / "config_with_llm.json" + with open(config_path, "w") as file: + file.write(llm_config_json) + + # Load the RouterConfig from the temporary file + layer_config = RouterConfig.from_file(str(config_path)) + + # Using BaseLLM because trying to create a usable Mock LLM is a nightmare. + assert isinstance( + layer_config.routes[0].llm, BaseLLM + ), "LLM should be instantiated and associated with the route based on the " + "config" + assert ( + layer_config.routes[0].llm.name == "fake-model-v1" + ), "LLM instance should have the 'name' attribute set correctly" + + def test_init(self): + layer_config = RouterConfig() + assert layer_config.routes == [] + + def test_to_file_json(self): + route = Route(name="test", utterances=["utterance"]) + layer_config = RouterConfig(routes=[route]) + with patch("builtins.open", mock_open()) as mocked_open: + layer_config.to_file("data/test_output.json") + mocked_open.assert_called_once_with("data/test_output.json", "w") + + def test_to_file_yaml(self): + route = Route(name="test", utterances=["utterance"]) + layer_config = RouterConfig(routes=[route]) + with patch("builtins.open", mock_open()) as mocked_open: + layer_config.to_file("data/test_output.yaml") + mocked_open.assert_called_once_with("data/test_output.yaml", "w") + + def test_to_file_invalid(self): + route = Route(name="test", utterances=["utterance"]) + layer_config = RouterConfig(routes=[route]) + with pytest.raises(ValueError): + layer_config.to_file("test_output.txt") + + def test_from_file_invalid(self): + with open("test.txt", "w") as f: + f.write("dummy content") + with pytest.raises(ValueError): + RouterConfig.from_file("test.txt") + os.remove("test.txt") + + def test_to_dict(self): + route = Route(name="test", utterances=["utterance"]) + layer_config = RouterConfig(routes=[route]) + assert layer_config.to_dict()["routes"] == [route.to_dict()] + + def test_add(self): + route = Route(name="test", utterances=["utterance"]) + route2 = Route(name="test2", utterances=["utterance2"]) + layer_config = RouterConfig() + layer_config.add(route) + # confirm route added + assert layer_config.routes == [route] + # add second route and check updates + layer_config.add(route2) + assert layer_config.routes == [route, route2] + + def test_get(self): + route = Route(name="test", utterances=["utterance"]) + layer_config = RouterConfig(routes=[route]) + assert layer_config.get("test") == route + + def test_get_not_found(self): + route = Route(name="test", utterances=["utterance"]) + layer_config = RouterConfig(routes=[route]) + assert layer_config.get("not_found") is None + + def test_remove(self): + route = Route(name="test", utterances=["utterance"]) + layer_config = RouterConfig(routes=[route]) + layer_config.remove("test") + assert layer_config.routes == [] + + def test_setting_aggregation_methods(self, openai_encoder, routes): + for agg in ["sum", "mean", "max"]: + route_layer = SemanticRouter( + encoder=openai_encoder, + routes=routes, + aggregation=agg, + ) + assert route_layer.aggregation == agg + + def test_semantic_classify_multiple_routes_with_different_aggregation( + self, openai_encoder, routes + ): + route_scores = [ + {"route": "Route 1", "score": 0.5}, + {"route": "Route 1", "score": 0.5}, + {"route": "Route 1", "score": 0.5}, + {"route": "Route 1", "score": 0.5}, + {"route": "Route 2", "score": 0.4}, + {"route": "Route 2", "score": 0.6}, + {"route": "Route 2", "score": 0.8}, + {"route": "Route 3", "score": 0.1}, + {"route": "Route 3", "score": 1.0}, + ] + for agg in ["sum", "mean", "max"]: + route_layer = SemanticRouter( + encoder=openai_encoder, + routes=routes, + aggregation=agg, + ) + classification, score = route_layer._semantic_classify(route_scores) - def test_initialization_no_encoder(self, openai_encoder, index_cls, encoder_cls): - os.environ["OPENAI_API_KEY"] = "test_api_key" - route_layer_none = SemanticRouter(encoder=None) - assert route_layer_none.score_threshold == openai_encoder.score_threshold + if agg == "sum": + assert classification == "Route 1" + assert score == [0.5, 0.5, 0.5, 0.5] + elif agg == "mean": + assert classification == "Route 2" + assert score == [0.4, 0.6, 0.8] + elif agg == "max": + assert classification == "Route 3" + assert score == [0.1, 1.0] -@pytest.mark.parametrize("index_cls", get_test_indexes()) +@pytest.mark.parametrize( + "index_cls,encoder_cls,router_cls", + [ + (index, encoder, router) + for index in get_test_indexes() + for encoder in [OpenAIEncoder] + for router in get_test_routers() + ], +) class TestSemanticRouter: def test_initialization_dynamic_route( - self, dynamic_routes, openai_encoder, index_cls + self, dynamic_routes, index_cls, encoder_cls, router_cls ): - index = init_index(index_cls) - route_layer = SemanticRouter( - encoder=openai_encoder, + encoder = encoder_cls() + index = init_index(index_cls, index_name=encoder.__class__.__name__) + route_layer = router_cls( + encoder=encoder, routes=dynamic_routes, index=index, auto_sync="local", ) - assert route_layer.score_threshold == openai_encoder.score_threshold + score_threshold = route_layer.score_threshold + if isinstance(route_layer, HybridRouter): + assert score_threshold == encoder.score_threshold * route_layer.alpha + else: + assert score_threshold == encoder.score_threshold def test_add_single_utterance( - self, routes, route_single_utterance, openai_encoder, index_cls + self, routes, route_single_utterance, index_cls, encoder_cls, router_cls ): - index = init_index(index_cls) - route_layer = SemanticRouter( - encoder=openai_encoder, + encoder = encoder_cls() + index = init_index(index_cls, index_name=encoder.__class__.__name__) + route_layer = router_cls( + encoder=encoder, routes=routes, index=index, auto_sync="local", ) route_layer.add(routes=route_single_utterance) - assert route_layer.score_threshold == openai_encoder.score_threshold + score_threshold = route_layer.score_threshold + if isinstance(route_layer, HybridRouter): + assert score_threshold == encoder.score_threshold * route_layer.alpha + else: + assert score_threshold == encoder.score_threshold if index_cls is PineconeIndex: time.sleep(PINECONE_SLEEP) # allow for index to be updated _ = route_layer("Hello") assert len(route_layer.index.get_utterances()) == 6 def test_init_and_add_single_utterance( - self, route_single_utterance, openai_encoder, index_cls + self, route_single_utterance, index_cls, encoder_cls, router_cls ): - index = init_index(index_cls) - route_layer = SemanticRouter( - encoder=openai_encoder, + encoder = encoder_cls() + index = init_index(index_cls, index_name=encoder.__class__.__name__) + route_layer = router_cls( + encoder=encoder, index=index, auto_sync="local", ) if index_cls is PineconeIndex: time.sleep(PINECONE_SLEEP) # allow for index to be updated route_layer.add(routes=route_single_utterance) - assert route_layer.score_threshold == openai_encoder.score_threshold - _ = route_layer("Hello") - assert len(route_layer.index.get_utterances()) == 1 - - def test_delete_index(self, openai_encoder, routes, index_cls): + score_threshold = route_layer.score_threshold + if isinstance(route_layer, HybridRouter): + assert score_threshold == encoder.score_threshold * route_layer.alpha + else: + assert score_threshold == encoder.score_threshold + count = 0 + while count < RETRY_COUNT: + try: + _ = route_layer("Hello") + assert len(route_layer.index.get_utterances()) == 1 + break + except Exception: + logger.warning(f"Index not ready, waiting for retry (try {count})") + count += 1 + + def test_delete_index(self, routes, index_cls, encoder_cls, router_cls): # TODO merge .delete_index() and .delete_all() and get working index = init_index(index_cls) - route_layer = SemanticRouter( - encoder=openai_encoder, + encoder = encoder_cls() + route_layer = router_cls( + encoder=encoder, routes=routes, index=index, auto_sync="local", ) - if index_cls is PineconeIndex: - time.sleep(PINECONE_SLEEP) # allow for index to be populated - route_layer.index.delete_index() + # delete index + count = 0 + while count < RETRY_COUNT: + try: + route_layer.index.delete_index() + break + except Exception: + logger.warning(f"Index not ready, waiting for retry (try {count})") + count += 1 + # assert index empty + count = 0 + while count < RETRY_COUNT: + try: + assert route_layer.index.get_utterances() == [] + break + except Exception: + logger.warning(f"Index not ready, waiting for retry (try {count})") + count += 1 if index_cls is PineconeIndex: time.sleep(PINECONE_SLEEP) # allow for index to be updated - assert route_layer.index.get_utterances() == [] - def test_add_route(self, routes, openai_encoder, index_cls): - index = init_index(index_cls) - route_layer = SemanticRouter( - encoder=openai_encoder, routes=[], index=index, auto_sync="local" + def test_add_route(self, routes, index_cls, encoder_cls, router_cls): + encoder = encoder_cls() + index = init_index(index_cls, index_name=encoder.__class__.__name__) + route_layer = router_cls( + encoder=encoder, routes=[], index=index, auto_sync="local" ) - if index_cls is PineconeIndex: - time.sleep(PINECONE_SLEEP) # allow for index to be updated - # Initially, the local routes list should be empty assert route_layer.routes == [] # same for the remote index @@ -322,23 +681,38 @@ def test_add_route(self, routes, openai_encoder, index_cls): # Add route1 and check route_layer.add(routes=routes[0]) - if index_cls is PineconeIndex: - time.sleep(PINECONE_SLEEP) # allow for index to be populated - assert route_layer.routes == [routes[0]] - assert route_layer.index is not None - assert len(route_layer.index.get_utterances()) == 2 + count = 0 + while count < RETRY_COUNT: + try: + assert route_layer.routes == [routes[0]] + assert route_layer.index is not None + assert len(route_layer.index.get_utterances()) == 2 + break + except Exception: + logger.warning(f"Index not ready, waiting for retry (try {count})") + count += 1 + if index_cls is PineconeIndex: + time.sleep(PINECONE_SLEEP) # allow for index to be populated # Add route2 and check route_layer.add(routes=routes[1]) - if index_cls is PineconeIndex: - time.sleep(PINECONE_SLEEP) # allow for index to be populated - assert route_layer.routes == [routes[0], routes[1]] - assert len(route_layer.index.get_utterances()) == 5 - - def test_list_route_names(self, openai_encoder, routes, index_cls): - index = init_index(index_cls) - route_layer = SemanticRouter( - encoder=openai_encoder, + count = 0 + while count < RETRY_COUNT: + try: + assert route_layer.routes == [routes[0], routes[1]] + assert len(route_layer.index.get_utterances()) == 5 + break + except Exception: + logger.warning(f"Index not ready, waiting for retry (try {count})") + count += 1 + if index_cls is PineconeIndex: + time.sleep(PINECONE_SLEEP) # allow for index to be populated + + def test_list_route_names(self, routes, index_cls, encoder_cls, router_cls): + encoder = encoder_cls() + index = init_index(index_cls, index_name=encoder.__class__.__name__) + route_layer = router_cls( + encoder=encoder, routes=routes, index=index, auto_sync="local", @@ -350,10 +724,11 @@ def test_list_route_names(self, openai_encoder, routes, index_cls): route.name for route in routes }, "The list of route names should match the names of the routes added." - def test_delete_route(self, openai_encoder, routes, index_cls): - index = init_index(index_cls) - route_layer = SemanticRouter( - encoder=openai_encoder, + def test_delete_route(self, routes, index_cls, encoder_cls, router_cls): + encoder = encoder_cls() + index = init_index(index_cls, index_name=encoder.__class__.__name__) + route_layer = router_cls( + encoder=encoder, routes=routes, index=index, auto_sync="local", @@ -375,9 +750,10 @@ def test_delete_route(self, openai_encoder, routes, index_cls): utterance not in route_layer.index ), "The route's utterances should be deleted from the index." - def test_remove_route_not_found(self, openai_encoder, routes, index_cls): - index = init_index(index_cls) - route_layer = SemanticRouter(encoder=openai_encoder, routes=routes, index=index) + def test_remove_route_not_found(self, routes, index_cls, encoder_cls, router_cls): + encoder = encoder_cls() + index = init_index(index_cls, index_name=encoder.__class__.__name__) + route_layer = router_cls(encoder=encoder, routes=routes, index=index) if index_cls is PineconeIndex: time.sleep(PINECONE_SLEEP) # Attempt to remove a route that does not exist @@ -385,133 +761,205 @@ def test_remove_route_not_found(self, openai_encoder, routes, index_cls): route_layer.delete(non_existent_route) # we should see warning in logs only (ie no errors) - def test_add_multiple_routes(self, openai_encoder, routes, index_cls): - index = init_index(index_cls) - route_layer = SemanticRouter( - encoder=openai_encoder, + def test_add_multiple_routes(self, routes, index_cls, encoder_cls, router_cls): + encoder = encoder_cls() + index = init_index(index_cls, index_name=encoder.__class__.__name__) + route_layer = router_cls( + encoder=encoder, index=index, auto_sync="local", ) - if index_cls is PineconeIndex: - time.sleep(PINECONE_SLEEP) route_layer.add(routes=routes) - if index_cls is PineconeIndex: - time.sleep(PINECONE_SLEEP) # allow for index to be populated - assert route_layer.index is not None - assert len(route_layer.index.get_utterances()) == 5 - - def test_query_and_classification(self, openai_encoder, routes, index_cls): - index = init_index(index_cls, dimensions=3) - route_layer = SemanticRouter( - encoder=openai_encoder, + count = 0 + while count < RETRY_COUNT: + try: + assert route_layer.index is not None + assert len(route_layer.index.get_utterances()) == 5 + break + except Exception: + logger.warning(f"Index not ready, waiting for retry (try {count})") + count += 1 + if index_cls is PineconeIndex: + time.sleep(PINECONE_SLEEP) # allow for index to be populated + + def test_query_and_classification(self, routes, index_cls, encoder_cls, router_cls): + encoder = encoder_cls() + index = init_index(index_cls, index_name=encoder.__class__.__name__) + route_layer = router_cls( + encoder=encoder, routes=routes, index=index, auto_sync="local", ) - if index_cls is PineconeIndex: - time.sleep(PINECONE_SLEEP) # allow for index to be populated - query_result = route_layer(text="Hello").name - assert query_result in ["Route 1", "Route 2"] - - def test_query_filter(self, openai_encoder, routes, index_cls): - index = init_index(index_cls, dimensions=3) - route_layer = SemanticRouter( - encoder=openai_encoder, + count = 0 + # we allow for 5 retries to allow for index to be populated + while count < RETRY_COUNT: + try: + query_result = route_layer(text="Hello").name + assert query_result in ["Route 1", "Route 2"] + break + except Exception: + logger.warning( + f"Query result not in expected routes, waiting for retry (try {count})" + ) + if index_cls is PineconeIndex: + time.sleep(PINECONE_SLEEP) # allow for index to be populated + count += 1 + + def test_query_filter(self, routes, index_cls, encoder_cls, router_cls): + encoder = encoder_cls() + index = init_index(index_cls, index_name=encoder.__class__.__name__) + route_layer = router_cls( + encoder=encoder, routes=routes, index=index, auto_sync="local", ) if index_cls is PineconeIndex: time.sleep(PINECONE_SLEEP) # allow for index to be populated - query_result = route_layer(text="Hello", route_filter=["Route 1"]).name try: + # TODO JB: currently LocalIndex raises ValueError but others don't + # they should all behave in the same way route_layer(text="Hello", route_filter=["Route 8"]).name except ValueError: assert True - assert query_result in ["Route 1"] - - @pytest.mark.skipif( - os.environ.get("PINECONE_API_KEY") is None, reason="Pinecone API key required" - ) - def test_query_filter_pinecone(self, openai_encoder, routes, index_cls): - if index_cls is PineconeIndex: - pineconeindex = init_index(index_cls, dimensions=3) - route_layer = SemanticRouter( - encoder=openai_encoder, - routes=routes, - index=pineconeindex, - auto_sync="local", - ) - time.sleep(PINECONE_SLEEP) # allow for index to be populated - query_result = route_layer(text="Hello", route_filter=["Route 1"]).name - + count = 0 + # we allow for 5 retries to allow for index to be populated + while count < RETRY_COUNT: try: - route_layer(text="Hello", route_filter=["Route 8"]).name - except ValueError: - assert True - - assert query_result in ["Route 1"] + query_result = route_layer(text="Hello", route_filter=["Route 1"]).name + assert query_result in ["Route 1"] + break + except Exception: + logger.warning( + f"Query result not in expected routes, waiting for retry (try {count})" + ) + count += 1 + time.sleep(PINECONE_SLEEP) # allow for index to be populated @pytest.mark.skipif( os.environ.get("PINECONE_API_KEY") is None, reason="Pinecone API key required" ) - def test_namespace_pinecone_index(self, openai_encoder, routes, index_cls): + def test_namespace_pinecone_index(self, routes, index_cls, encoder_cls, router_cls): if index_cls is PineconeIndex: - pineconeindex = init_index(index_cls, namespace="test") - route_layer = SemanticRouter( - encoder=openai_encoder, + encoder = encoder_cls() + pineconeindex = init_index( + index_cls, namespace="test", index_name=encoder.__class__.__name__ + ) + route_layer = router_cls( + encoder=encoder, routes=routes, index=pineconeindex, auto_sync="local", ) - time.sleep(PINECONE_SLEEP) # allow for index to be populated - query_result = route_layer(text="Hello", route_filter=["Route 1"]).name - - try: - route_layer(text="Hello", route_filter=["Route 8"]).name - except ValueError: - assert True - - assert query_result in ["Route 1"] + count = 0 + while count < RETRY_COUNT: + try: + query_result = route_layer( + text="Hello", route_filter=["Route 1"] + ).name + assert query_result in ["Route 1"] + break + except Exception: + logger.warning( + f"Query result not in expected routes, waiting for retry (try {count})" + ) + if index_cls is PineconeIndex: + time.sleep( + PINECONE_SLEEP * 2 + ) # allow for index to be populated + count += 1 route_layer.index.index.delete(namespace="test", delete_all=True) - def test_query_with_no_index(self, openai_encoder, index_cls): - route_layer = SemanticRouter(encoder=openai_encoder) + def test_query_with_no_index(self, index_cls, encoder_cls, router_cls): + encoder = encoder_cls() + route_layer = router_cls(encoder=encoder) + # TODO: probably should avoid running this with multiple encoders or find a way to set dims with pytest.raises(ValueError): assert route_layer(text="Anything").name is None - def test_query_with_vector(self, openai_encoder, routes, index_cls): - index = init_index(index_cls) - route_layer = SemanticRouter( - encoder=openai_encoder, + def test_query_with_vector(self, routes, index_cls, encoder_cls, router_cls): + encoder = encoder_cls() + index = init_index(index_cls, index_name=encoder.__class__.__name__) + route_layer = router_cls( + encoder=encoder, routes=routes, index=index, auto_sync="local", ) - if index_cls is PineconeIndex: - time.sleep(PINECONE_SLEEP) # allow for index to be populated - vector = [0.1, 0.2, 0.3] - query_result = route_layer(vector=vector).name - assert query_result in ["Route 1", "Route 2"] + # create vectors + vector = encoder(["hello"]) + if router_cls is HybridRouter: + sparse_vector = route_layer.sparse_encoder(["hello"])[0] + count = 0 + while count < RETRY_COUNT: + try: + if router_cls is HybridRouter: + query_result = route_layer( + vector=vector, sparse_vector=sparse_vector + ).name + else: + query_result = route_layer(vector=vector).name + assert query_result in ["Route 1", "Route 2"] + break + except Exception: + logger.warning( + "Query result not in expected routes, waiting for retry " + f"(try {count})" + ) + count += 1 + time.sleep(PINECONE_SLEEP) # allow for index to be populated - def test_query_with_no_text_or_vector(self, openai_encoder, routes, index_cls): - index = init_index(index_cls) - route_layer = SemanticRouter(encoder=openai_encoder, routes=routes, index=index) + def test_query_with_no_text_or_vector( + self, routes, index_cls, encoder_cls, router_cls + ): + encoder = encoder_cls() + index = init_index(index_cls, index_name=encoder.__class__.__name__) + route_layer = router_cls(encoder=encoder, routes=routes, index=index) with pytest.raises(ValueError): route_layer() - def test_semantic_classify(self, openai_encoder, routes, index_cls): - index = init_index(index_cls) - route_layer = SemanticRouter( - encoder=openai_encoder, + def test_is_ready(self, routes, index_cls, encoder_cls, router_cls): + encoder = encoder_cls() + index = init_index(index_cls, index_name=encoder.__class__.__name__) + route_layer = router_cls( + encoder=encoder, + routes=routes, + index=index, + auto_sync="local", + ) + count = 0 + while count < RETRY_COUNT + 1: + if route_layer.index.is_ready(): + break + logger.warning("Route layer not ready, waiting for retry (try {count})") + count += 1 + if index_cls is PineconeIndex: + time.sleep(PINECONE_SLEEP) # allow for index to be populated + assert count <= RETRY_COUNT, "Route layer not ready after {RETRY_COUNT} retries" + + +@pytest.mark.parametrize( + "index_cls,encoder_cls,router_cls", + [ + (index, encoder, router) + for index in [LocalIndex] # no need to test with multiple indexes + for encoder in [OpenAIEncoder] # no need to test with multiple encoders + for router in get_test_routers() + ], +) +class TestRouterOnly: + def test_semantic_classify(self, routes, index_cls, encoder_cls, router_cls): + encoder = encoder_cls() + index = init_index(index_cls, index_name=encoder.__class__.__name__) + route_layer = router_cls( + encoder=encoder, routes=routes, index=index, auto_sync="local", ) - if index_cls is PineconeIndex: - time.sleep(PINECONE_SLEEP) # allow for index to be populated classification, score = route_layer._semantic_classify( [ {"route": "Route 1", "score": 0.9}, @@ -521,16 +969,17 @@ def test_semantic_classify(self, openai_encoder, routes, index_cls): assert classification == "Route 1" assert score == [0.9] - def test_semantic_classify_multiple_routes(self, openai_encoder, routes, index_cls): - index = init_index(index_cls) - route_layer = SemanticRouter( - encoder=openai_encoder, + def test_semantic_classify_multiple_routes( + self, routes, index_cls, encoder_cls, router_cls + ): + encoder = encoder_cls() + index = init_index(index_cls, index_name=encoder.__class__.__name__) + route_layer = router_cls( + encoder=encoder, routes=routes, index=index, auto_sync="local", ) - if index_cls is PineconeIndex: - time.sleep(PINECONE_SLEEP) # allow for index to be populated classification, score = route_layer._semantic_classify( [ {"route": "Route 1", "score": 0.9}, @@ -542,44 +991,54 @@ def test_semantic_classify_multiple_routes(self, openai_encoder, routes, index_c assert score == [0.9, 0.8] def test_query_no_text_dynamic_route( - self, openai_encoder, dynamic_routes, index_cls + self, dynamic_routes, index_cls, encoder_cls, router_cls ): - index = init_index(index_cls) - route_layer = SemanticRouter( - encoder=openai_encoder, routes=dynamic_routes, index=index - ) - vector = [0.1, 0.2, 0.3] + encoder = encoder_cls() + index = init_index(index_cls, index_name=encoder.__class__.__name__) + route_layer = router_cls(encoder=encoder, routes=dynamic_routes, index=index) + # create vectors + vector = encoder(["hello"]) + if router_cls is HybridRouter: + sparse_vector = route_layer.sparse_encoder(["hello"])[0] with pytest.raises(ValueError): - route_layer(vector=vector) + if router_cls is HybridRouter: + route_layer(vector=vector, sparse_vector=sparse_vector) + else: + route_layer(vector=vector) - def test_pass_threshold(self, openai_encoder, index_cls): - index = init_index(index_cls) - route_layer = SemanticRouter( - encoder=openai_encoder, + def test_pass_threshold(self, index_cls, encoder_cls, router_cls): + encoder = encoder_cls() + index = init_index(index_cls, index_name=encoder.__class__.__name__) + route_layer = router_cls( + encoder=encoder, index=index, auto_sync="local", ) assert not route_layer._pass_threshold([], 0.3) assert route_layer._pass_threshold([0.6, 0.7], 0.3) - def test_failover_score_threshold(self, openai_encoder, index_cls): - index = init_index(index_cls) - route_layer = SemanticRouter( - encoder=openai_encoder, + def test_failover_score_threshold(self, index_cls, encoder_cls, router_cls): + encoder = encoder_cls() + index = init_index(index_cls, index_name=encoder.__class__.__name__) + route_layer = router_cls( + encoder=encoder, index=index, auto_sync="local", ) - assert route_layer.score_threshold == 0.3 + if router_cls is HybridRouter: + assert route_layer.score_threshold == 0.3 * route_layer.alpha + else: + assert route_layer.score_threshold == 0.3 - def test_json(self, openai_encoder, routes, index_cls): + def test_json(self, routes, index_cls, encoder_cls, router_cls): temp = tempfile.NamedTemporaryFile(suffix=".yaml", delete=False) try: temp_path = temp.name # Save the temporary file's path temp.close() # Close the file to ensure it can be opened again on Windows - os.environ["OPENAI_API_KEY"] = "test_api_key" - index = init_index(index_cls) - route_layer = SemanticRouter( - encoder=openai_encoder, + encoder = encoder_cls() + index = init_index(index_cls, index_name=encoder.__class__.__name__) + route_layer = router_cls( + encoder=encoder, routes=routes, index=index, auto_sync="local", @@ -587,8 +1046,6 @@ def test_json(self, openai_encoder, routes, index_cls): route_layer.to_json(temp_path) assert os.path.exists(temp_path) route_layer_from_file = SemanticRouter.from_json(temp_path) - if index_cls is PineconeIndex: - time.sleep(PINECONE_SLEEP) # allow for index to be populated assert ( route_layer_from_file.index is not None and route_layer_from_file._get_route_names() is not None @@ -596,15 +1053,15 @@ def test_json(self, openai_encoder, routes, index_cls): finally: os.remove(temp_path) # Ensure the file is deleted even if the test fails - def test_yaml(self, openai_encoder, routes, index_cls): + def test_yaml(self, routes, index_cls, encoder_cls, router_cls): temp = tempfile.NamedTemporaryFile(suffix=".yaml", delete=False) try: temp_path = temp.name # Save the temporary file's path temp.close() # Close the file to ensure it can be opened again on Windows - os.environ["OPENAI_API_KEY"] = "test_api_key" - index = init_index(index_cls) - route_layer = SemanticRouter( - encoder=openai_encoder, + encoder = encoder_cls() + index = init_index(index_cls, index_name=encoder.__class__.__name__) + route_layer = router_cls( + encoder=encoder, routes=routes, index=index, auto_sync="local", @@ -612,8 +1069,6 @@ def test_yaml(self, openai_encoder, routes, index_cls): route_layer.to_yaml(temp_path) assert os.path.exists(temp_path) route_layer_from_file = SemanticRouter.from_yaml(temp_path) - if index_cls is PineconeIndex: - time.sleep(PINECONE_SLEEP) # allow for index to be populated assert ( route_layer_from_file.index is not None and route_layer_from_file._get_route_names() is not None @@ -621,159 +1076,74 @@ def test_yaml(self, openai_encoder, routes, index_cls): finally: os.remove(temp_path) # Ensure the file is deleted even if the test fails - def test_from_file_json(openai_encoder, tmp_path, index_cls): - # Create a temporary JSON file with layer configuration - config_path = tmp_path / "config.json" - config_path.write_text( - layer_json() - ) # Assuming layer_json() returns a valid JSON string - - # Load the RouterConfig from the temporary file - layer_config = RouterConfig.from_file(str(config_path)) - - # Assertions to verify the loaded configuration - assert layer_config.encoder_type == "cohere" - assert layer_config.encoder_name == "embed-english-v3.0" - assert len(layer_config.routes) == 2 - assert layer_config.routes[0].name == "politics" - - def test_from_file_yaml(openai_encoder, tmp_path, index_cls): - # Create a temporary YAML file with layer configuration - config_path = tmp_path / "config.yaml" - config_path.write_text( - layer_yaml() - ) # Assuming layer_yaml() returns a valid YAML string - - # Load the RouterConfig from the temporary file - layer_config = RouterConfig.from_file(str(config_path)) - - # Assertions to verify the loaded configuration - assert layer_config.encoder_type == "cohere" - assert layer_config.encoder_name == "embed-english-v3.0" - assert len(layer_config.routes) == 2 - assert layer_config.routes[0].name == "politics" - - def test_from_file_invalid_path(self, index_cls): - with pytest.raises(FileNotFoundError) as excinfo: - RouterConfig.from_file("nonexistent_path.json") - assert "[Errno 2] No such file or directory: 'nonexistent_path.json'" in str( - excinfo.value - ) - - def test_from_file_unsupported_type(self, tmp_path, index_cls): - # Create a temporary unsupported file - config_path = tmp_path / "config.unsupported" - config_path.write_text(layer_json()) - - with pytest.raises(ValueError) as excinfo: - RouterConfig.from_file(str(config_path)) - assert "Unsupported file type" in str(excinfo.value) - - def test_from_file_invalid_config(self, tmp_path, index_cls): - # Define an invalid configuration JSON - invalid_config_json = """ - { - "encoder_type": "cohere", - "encoder_name": "embed-english-v3.0", - "routes": "This should be a list, not a string" - }""" - - # Write the invalid configuration to a temporary JSON file - config_path = tmp_path / "invalid_config.json" - with open(config_path, "w") as file: - file.write(invalid_config_json) - - # Patch the is_valid function to return False for this test - with patch("semantic_router.routers.base.is_valid", return_value=False): - # Attempt to load the RouterConfig from the temporary file - # and assert that it raises an exception due to invalid configuration - with pytest.raises(Exception) as excinfo: - RouterConfig.from_file(str(config_path)) - assert "Invalid config JSON or YAML" in str( - excinfo.value - ), "Loading an invalid configuration should raise an exception." - - def test_from_file_with_llm(self, tmp_path, index_cls): - llm_config_json = """ - { - "encoder_type": "cohere", - "encoder_name": "embed-english-v3.0", - "routes": [ - { - "name": "llm_route", - "utterances": ["tell me a joke", "say something funny"], - "llm": { - "module": "semantic_router.llms.base", - "class": "BaseLLM", - "model": "fake-model-v1" - } - } - ] - }""" - - config_path = tmp_path / "config_with_llm.json" - with open(config_path, "w") as file: - file.write(llm_config_json) - - # Load the RouterConfig from the temporary file - layer_config = RouterConfig.from_file(str(config_path)) - - # Using BaseLLM because trying to create a usable Mock LLM is a nightmare. - assert isinstance( - layer_config.routes[0].llm, BaseLLM - ), "LLM should be instantiated and associated with the route based on the " - "config" - assert ( - layer_config.routes[0].llm.name == "fake-model-v1" - ), "LLM instance should have the 'name' attribute set correctly" - - def test_config(self, openai_encoder, routes, index_cls): - os.environ["OPENAI_API_KEY"] = "test_api_key" - index = init_index(index_cls) - route_layer = SemanticRouter(encoder=openai_encoder, routes=routes, index=index) + def test_config(self, routes, index_cls, encoder_cls, router_cls): + encoder = encoder_cls() + index = init_index(index_cls, index_name=encoder.__class__.__name__) + route_layer = router_cls(encoder=encoder, routes=routes, index=index) # confirm route creation functions as expected layer_config = route_layer.to_config() assert layer_config.routes == route_layer.routes # now load from config and confirm it's the same route_layer_from_config = SemanticRouter.from_config(layer_config, index) - if index_cls is PineconeIndex: - time.sleep(PINECONE_SLEEP) # allow for index to be populated assert ( route_layer_from_config._get_route_names() == route_layer._get_route_names() ) - assert route_layer_from_config.score_threshold == route_layer.score_threshold + if router_cls is HybridRouter: + # TODO: need to fix HybridRouter from config + # assert ( + # route_layer_from_config.score_threshold + # == route_layer.score_threshold * route_layer.alpha + # ) + pass + else: + assert ( + route_layer_from_config.score_threshold == route_layer.score_threshold + ) - def test_get_thresholds(self, openai_encoder, routes, index_cls): - index = init_index(index_cls) - route_layer = SemanticRouter(encoder=openai_encoder, routes=routes, index=index) - assert route_layer.get_thresholds() == {"Route 1": 0.3, "Route 2": 0.3} + def test_get_thresholds(self, routes, index_cls, encoder_cls, router_cls): + encoder = encoder_cls() + index = init_index(index_cls, index_name=encoder.__class__.__name__) + route_layer = router_cls(encoder=encoder, routes=routes, index=index) + if router_cls is HybridRouter: + # TODO: fix this + target = encoder.score_threshold * route_layer.alpha + assert route_layer.get_thresholds() == { + "Route 1": target, + "Route 2": target, + } + else: + assert route_layer.get_thresholds() == {"Route 1": 0.3, "Route 2": 0.3} def test_with_multiple_routes_passing_threshold( - self, openai_encoder, routes, index_cls + self, routes, index_cls, encoder_cls, router_cls ): - index = init_index(index_cls) - route_layer = SemanticRouter(encoder=openai_encoder, routes=routes, index=index) - route_layer.score_threshold = 0.5 # Set the score_threshold if needed + encoder = encoder_cls() + index = init_index(index_cls, index_name=encoder.__class__.__name__) + route_layer = router_cls(encoder=encoder, routes=routes, index=index) + route_layer.score_threshold = 0.3 # Set the score_threshold if needed # Assuming route_layer is already set up with routes "Route 1" and "Route 2" query_results = [ - {"route": "Route 1", "score": 0.6}, - {"route": "Route 2", "score": 0.7}, - {"route": "Route 1", "score": 0.8}, + {"route": "Route 1", "score": 0.1}, + {"route": "Route 2", "score": 0.8}, + {"route": "Route 1", "score": 0.9}, ] - expected = [("Route 1", 0.8), ("Route 2", 0.7)] + expected = [("Route 1", 0.9), ("Route 2", 0.8)] results = route_layer._semantic_classify_multiple_routes(query_results) assert sorted(results) == sorted( expected ), "Should classify and return routes above their thresholds" - def test_with_no_routes_passing_threshold(self, openai_encoder, routes, index_cls): - index = init_index(index_cls) - route_layer = SemanticRouter(encoder=openai_encoder, routes=routes, index=index) + def test_with_no_routes_passing_threshold( + self, routes, index_cls, encoder_cls, router_cls + ): + encoder = encoder_cls() + index = init_index(index_cls, index_name=encoder.__class__.__name__) + route_layer = router_cls(encoder=encoder, routes=routes, index=index) # set threshold to 1.0 so that no routes pass route_layer.score_threshold = 1.0 query_results = [ - {"route": "Route 1", "score": 0.3}, - {"route": "Route 2", "score": 0.2}, + {"route": "Route 1", "score": 0.01}, + {"route": "Route 2", "score": 0.02}, ] expected = [] results = route_layer._semantic_classify_multiple_routes(query_results) @@ -781,9 +1151,10 @@ def test_with_no_routes_passing_threshold(self, openai_encoder, routes, index_cl results == expected ), "Should return an empty list when no routes pass their thresholds" - def test_with_no_query_results(self, openai_encoder, routes, index_cls): - index = init_index(index_cls) - route_layer = SemanticRouter(encoder=openai_encoder, routes=routes, index=index) + def test_with_no_query_results(self, routes, index_cls, encoder_cls, router_cls): + encoder = encoder_cls() + index = init_index(index_cls, index_name=encoder.__class__.__name__) + route_layer = router_cls(encoder=encoder, routes=routes, index=index) route_layer.score_threshold = 0.5 query_results = [] expected = [] @@ -792,9 +1163,10 @@ def test_with_no_query_results(self, openai_encoder, routes, index_cls): results == expected ), "Should return an empty list when there are no query results" - def test_with_unrecognized_route(self, openai_encoder, routes, index_cls): - index = init_index(index_cls) - route_layer = SemanticRouter(encoder=openai_encoder, routes=routes, index=index) + def test_with_unrecognized_route(self, routes, index_cls, encoder_cls, router_cls): + encoder = encoder_cls() + index = init_index(index_cls, index_name=encoder.__class__.__name__) + route_layer = router_cls(encoder=encoder, routes=routes, index=index) route_layer.score_threshold = 0.5 # Test with a route name that does not exist in the route_layer's routes query_results = [{"route": "UnrecognizedRoute", "score": 0.9}] @@ -802,106 +1174,12 @@ def test_with_unrecognized_route(self, openai_encoder, routes, index_cls): results = route_layer._semantic_classify_multiple_routes(query_results) assert results == expected, "Should ignore and not return unrecognized routes" - def test_retrieve_with_text(self, openai_encoder, routes, index_cls): - index = init_index(index_cls) - route_layer = SemanticRouter( - encoder=openai_encoder, - routes=routes, - index=index, - auto_sync="local", - ) - text = "Hello" - results = route_layer.retrieve_multiple_routes(text=text) - assert len(results) >= 1, "Expected at least one result" - assert any( - result.name in ["Route 1", "Route 2"] for result in results - ), "Expected the result to be either 'Route 1' or 'Route 2'" - - def test_retrieve_with_vector(self, openai_encoder, routes, index_cls): - index = init_index(index_cls) - route_layer = SemanticRouter( - encoder=openai_encoder, - routes=routes, - index=index, - auto_sync="local", - ) - vector = [0.1, 0.2, 0.3] - if index_cls is PineconeIndex: - time.sleep(PINECONE_SLEEP) # allow for index to be populated - results = route_layer.retrieve_multiple_routes(vector=vector) - assert len(results) >= 1, "Expected at least one result" - assert any( - result.name in ["Route 1", "Route 2"] for result in results - ), "Expected the result to be either 'Route 1' or 'Route 2'" - - def test_retrieve_without_text_or_vector(self, openai_encoder, routes, index_cls): - index = init_index(index_cls) - route_layer = SemanticRouter( - encoder=openai_encoder, - routes=routes, - index=index, - auto_sync="local", - ) - with pytest.raises(ValueError, match="Either text or vector must be provided"): - route_layer.retrieve_multiple_routes() - - def test_retrieve_no_matches(self, openai_encoder, routes, index_cls): - index = init_index(index_cls) - route_layer = SemanticRouter( - encoder=openai_encoder, - routes=routes, - index=index, - auto_sync="local", - ) - text = "Asparagus" - if index_cls is PineconeIndex: - time.sleep(PINECONE_SLEEP) - results = route_layer.retrieve_multiple_routes(text=text) - assert len(results) == 0, f"Expected no results, but got {len(results)}" - - def test_retrieve_one_match(self, openai_encoder, routes_3, index_cls): - index = init_index(index_cls, dimensions=3) - route_layer = SemanticRouter( - encoder=openai_encoder, - routes=routes_3, - index=index, - auto_sync="local", - ) - text = "Hello" - # set low threshold - route_layer.set_threshold(threshold=0.1, route_name="Route 1") - if index_cls is PineconeIndex: - time.sleep(PINECONE_SLEEP) - results = route_layer.retrieve_multiple_routes(text=text) - assert len(results) == 1, f"Expected one result, and got {len(results)}" - matched_routes = [result.name for result in results] - assert "Route 1" in matched_routes, "Expected 'Route 1' to be a match" - - def test_retrieve_with_text_for_multiple_matches( - self, openai_encoder, routes_2, index_cls - ): - index = init_index(index_cls) - route_layer = SemanticRouter( - encoder=openai_encoder, - routes=routes_2, - index=index, - auto_sync="local", - ) - text = "Hello" - route_layer.set_threshold(threshold=0.01, route_name=None) - if index_cls is PineconeIndex: - time.sleep(PINECONE_SLEEP) - results = route_layer.retrieve_multiple_routes(text=text) - assert len(results) == 2, "Expected two results" - matched_routes = [result.name for result in results] - assert "Route 1" in matched_routes, "Expected 'Route 1' to be a match" - assert "Route 2" in matched_routes, "Expected 'Route 2' to be a match" - def test_set_aggregation_method_with_unsupported_value( - self, openai_encoder, routes, index_cls + self, routes, index_cls, encoder_cls, router_cls ): - index = init_index(index_cls) - route_layer = SemanticRouter(encoder=openai_encoder, routes=routes, index=index) + encoder = encoder_cls() + index = init_index(index_cls, index_name=encoder.__class__.__name__) + route_layer = router_cls(encoder=encoder, routes=routes, index=index) unsupported_aggregation = "unsupported_aggregation_method" with pytest.raises( ValueError, @@ -909,17 +1187,21 @@ def test_set_aggregation_method_with_unsupported_value( ): route_layer._set_aggregation_method(unsupported_aggregation) - def test_refresh_routes_not_implemented(self, openai_encoder, routes, index_cls): - index = init_index(index_cls) - route_layer = SemanticRouter(encoder=openai_encoder, routes=routes, index=index) + def test_refresh_routes_not_implemented( + self, routes, index_cls, encoder_cls, router_cls + ): + encoder = encoder_cls() + index = init_index(index_cls, index_name=encoder.__class__.__name__) + route_layer = router_cls(encoder=encoder, routes=routes, index=index) with pytest.raises( NotImplementedError, match="This method has not yet been implemented." ): route_layer._refresh_routes() - def test_update_threshold(self, openai_encoder, routes, index_cls): - index = init_index(index_cls) - route_layer = SemanticRouter(encoder=openai_encoder, routes=routes, index=index) + def test_update_threshold(self, routes, index_cls, encoder_cls, router_cls): + encoder = encoder_cls() + index = init_index(index_cls, index_name=encoder.__class__.__name__) + route_layer = router_cls(encoder=encoder, routes=routes, index=index) route_name = "Route 1" new_threshold = 0.8 route_layer.update(name=route_name, threshold=new_threshold) @@ -928,9 +1210,12 @@ def test_update_threshold(self, openai_encoder, routes, index_cls): updated_route.score_threshold == new_threshold ), f"Expected threshold to be updated to {new_threshold}, but got {updated_route.score_threshold}" - def test_update_non_existent_route(self, openai_encoder, routes, index_cls): - index = init_index(index_cls) - route_layer = SemanticRouter(encoder=openai_encoder, routes=routes, index=index) + def test_update_non_existent_route( + self, routes, index_cls, encoder_cls, router_cls + ): + encoder = encoder_cls() + index = init_index(index_cls, index_name=encoder.__class__.__name__) + route_layer = router_cls(encoder=encoder, routes=routes, index=index) non_existent_route = "Non-existent Route" with pytest.raises( ValueError, @@ -938,18 +1223,24 @@ def test_update_non_existent_route(self, openai_encoder, routes, index_cls): ): route_layer.update(name=non_existent_route, threshold=0.7) - def test_update_without_parameters(self, openai_encoder, routes, index_cls): - index = init_index(index_cls) - route_layer = SemanticRouter(encoder=openai_encoder, routes=routes, index=index) + def test_update_without_parameters( + self, routes, index_cls, encoder_cls, router_cls + ): + encoder = encoder_cls() + index = init_index(index_cls, index_name=encoder.__class__.__name__) + route_layer = router_cls(encoder=encoder, routes=routes, index=index) with pytest.raises( ValueError, match="At least one of 'threshold' or 'utterances' must be provided.", ): route_layer.update(name="Route 1") - def test_update_utterances_not_implemented(self, openai_encoder, routes, index_cls): - index = init_index(index_cls) - route_layer = SemanticRouter(encoder=openai_encoder, routes=routes, index=index) + def test_update_utterances_not_implemented( + self, routes, index_cls, encoder_cls, router_cls + ): + encoder = encoder_cls() + index = init_index(index_cls, index_name=encoder.__class__.__name__) + route_layer = router_cls(encoder=encoder, routes=routes, index=index) with pytest.raises( NotImplementedError, match="The update method cannot be used for updating utterances yet.", @@ -957,147 +1248,58 @@ def test_update_utterances_not_implemented(self, openai_encoder, routes, index_c route_layer.update(name="Route 1", utterances=["New utterance"]) +@pytest.mark.parametrize( + "index_cls,encoder_cls,router_cls", + [ + (index, encoder, router) + for index in get_test_indexes() + for encoder in [OpenAIEncoder] + for router in get_test_routers() + ], +) class TestLayerFit: - def test_eval(self, openai_encoder, routes, test_data): - route_layer = SemanticRouter( - encoder=openai_encoder, + def test_eval(self, routes, test_data, index_cls, encoder_cls, router_cls): + encoder = encoder_cls() + index = init_index(index_cls, index_name=encoder.__class__.__name__) + route_layer = router_cls( + encoder=encoder, routes=routes, + index=index, auto_sync="local", ) + count = 0 + while True: + if route_layer.index.is_ready(): + break + count += 1 + if count > RETRY_COUNT: + raise ValueError("Index not ready") + if index_cls is PineconeIndex: + time.sleep(PINECONE_SLEEP) # unpack test data X, y = zip(*test_data) # evaluate - route_layer.evaluate(X=X, y=y, batch_size=int(len(test_data) / 5)) + route_layer.evaluate(X=list(X), y=list(y), batch_size=int(len(X) / 5)) - def test_fit(self, openai_encoder, routes, test_data): - route_layer = SemanticRouter( - encoder=openai_encoder, + def test_fit(self, routes, test_data, index_cls, encoder_cls, router_cls): + encoder = encoder_cls() + index = init_index(index_cls, index_name=encoder.__class__.__name__) + route_layer = router_cls( + encoder=encoder, routes=routes, + index=index, auto_sync="local", ) + count = 0 + while True: + print(f"{count=}") + if route_layer.index.is_ready(): + break + count += 1 + if count > RETRY_COUNT: + raise ValueError("Index not ready") + if index_cls is PineconeIndex: + time.sleep(PINECONE_SLEEP) # unpack test data X, y = zip(*test_data) - route_layer.fit(X=X, y=y, batch_size=int(len(test_data) / 5)) - - -# Add more tests for edge cases and error handling as needed. - - -class TestRouterConfig: - def test_init(self): - layer_config = RouterConfig() - assert layer_config.routes == [] - - def test_to_file_json(self): - route = Route(name="test", utterances=["utterance"]) - layer_config = RouterConfig(routes=[route]) - with patch("builtins.open", mock_open()) as mocked_open: - layer_config.to_file("data/test_output.json") - mocked_open.assert_called_once_with("data/test_output.json", "w") - - def test_to_file_yaml(self): - route = Route(name="test", utterances=["utterance"]) - layer_config = RouterConfig(routes=[route]) - with patch("builtins.open", mock_open()) as mocked_open: - layer_config.to_file("data/test_output.yaml") - mocked_open.assert_called_once_with("data/test_output.yaml", "w") - - def test_to_file_invalid(self): - route = Route(name="test", utterances=["utterance"]) - layer_config = RouterConfig(routes=[route]) - with pytest.raises(ValueError): - layer_config.to_file("test_output.txt") - - def test_from_file_json(self): - mock_json_data = layer_json() - with patch("builtins.open", mock_open(read_data=mock_json_data)) as mocked_open: - layer_config = RouterConfig.from_file("data/test.json") - mocked_open.assert_called_once_with("data/test.json", "r") - assert isinstance(layer_config, RouterConfig) - - def test_from_file_yaml(self): - mock_yaml_data = layer_yaml() - with patch("builtins.open", mock_open(read_data=mock_yaml_data)) as mocked_open: - layer_config = RouterConfig.from_file("data/test.yaml") - mocked_open.assert_called_once_with("data/test.yaml", "r") - assert isinstance(layer_config, RouterConfig) - - def test_from_file_invalid(self): - with open("test.txt", "w") as f: - f.write("dummy content") - with pytest.raises(ValueError): - RouterConfig.from_file("test.txt") - os.remove("test.txt") - - def test_to_dict(self): - route = Route(name="test", utterances=["utterance"]) - layer_config = RouterConfig(routes=[route]) - assert layer_config.to_dict()["routes"] == [route.to_dict()] - - def test_add(self): - route = Route(name="test", utterances=["utterance"]) - route2 = Route(name="test2", utterances=["utterance2"]) - layer_config = RouterConfig() - layer_config.add(route) - # confirm route added - assert layer_config.routes == [route] - # add second route and check updates - layer_config.add(route2) - assert layer_config.routes == [route, route2] - - def test_get(self): - route = Route(name="test", utterances=["utterance"]) - layer_config = RouterConfig(routes=[route]) - assert layer_config.get("test") == route - - def test_get_not_found(self): - route = Route(name="test", utterances=["utterance"]) - layer_config = RouterConfig(routes=[route]) - assert layer_config.get("not_found") is None - - def test_remove(self): - route = Route(name="test", utterances=["utterance"]) - layer_config = RouterConfig(routes=[route]) - layer_config.remove("test") - assert layer_config.routes == [] - - def test_setting_aggregation_methods(self, openai_encoder, routes): - for agg in ["sum", "mean", "max"]: - route_layer = SemanticRouter( - encoder=openai_encoder, - routes=routes, - aggregation=agg, - ) - assert route_layer.aggregation == agg - - def test_semantic_classify_multiple_routes_with_different_aggregation( - self, openai_encoder, routes - ): - route_scores = [ - {"route": "Route 1", "score": 0.5}, - {"route": "Route 1", "score": 0.5}, - {"route": "Route 1", "score": 0.5}, - {"route": "Route 1", "score": 0.5}, - {"route": "Route 2", "score": 0.4}, - {"route": "Route 2", "score": 0.6}, - {"route": "Route 2", "score": 0.8}, - {"route": "Route 3", "score": 0.1}, - {"route": "Route 3", "score": 1.0}, - ] - for agg in ["sum", "mean", "max"]: - route_layer = SemanticRouter( - encoder=openai_encoder, - routes=routes, - aggregation=agg, - ) - classification, score = route_layer._semantic_classify(route_scores) - - if agg == "sum": - assert classification == "Route 1" - assert score == [0.5, 0.5, 0.5, 0.5] - elif agg == "mean": - assert classification == "Route 2" - assert score == [0.4, 0.6, 0.8] - elif agg == "max": - assert classification == "Route 3" - assert score == [0.1, 1.0] + route_layer.fit(X=list(X), y=list(y), batch_size=int(len(X) / 5)) diff --git a/tests/unit/test_sync.py b/tests/unit/test_sync.py index 0bf5eb05..148e62ee 100644 --- a/tests/unit/test_sync.py +++ b/tests/unit/test_sync.py @@ -14,7 +14,7 @@ PostgresIndex, ) from semantic_router.schema import Utterance -from semantic_router.routers import SemanticRouter +from semantic_router.routers import SemanticRouter, HybridRouter from semantic_router.route import Route from platform import python_version @@ -148,12 +148,28 @@ def base_encoder(): @pytest.fixture def cohere_encoder(mocker): mocker.patch.object(CohereEncoder, "__call__", side_effect=mock_encoder_call) + + # Mock async call + async def async_mock_encoder_call(docs=None, utterances=None): + # Handle either docs or utterances parameter + texts = docs if docs is not None else utterances + return mock_encoder_call(texts) + + mocker.patch.object(CohereEncoder, "acall", side_effect=async_mock_encoder_call) return CohereEncoder(name="test-cohere-encoder", cohere_api_key="test_api_key") @pytest.fixture def openai_encoder(mocker): mocker.patch.object(OpenAIEncoder, "__call__", side_effect=mock_encoder_call) + + # Mock async call + async def async_mock_encoder_call(docs=None, utterances=None): + # Handle either docs or utterances parameter + texts = docs if docs is not None else utterances + return mock_encoder_call(texts) + + mocker.patch.object(OpenAIEncoder, "acall", side_effect=async_mock_encoder_call) return OpenAIEncoder(name="text-embedding-3-small", openai_api_key="test_api_key") @@ -228,14 +244,24 @@ def get_test_indexes(): return indexes -@pytest.mark.parametrize("index_cls", get_test_indexes()) +def get_test_routers(): + routers = [SemanticRouter] + if importlib.util.find_spec("pinecone_text") is not None: + routers.append(HybridRouter) + return routers + + +@pytest.mark.parametrize( + "index_cls,router_cls", + [(index, router) for index in get_test_indexes() for router in get_test_routers()], +) class TestSemanticRouter: @pytest.mark.skipif( os.environ.get("PINECONE_API_KEY") is None, reason="Pinecone API key required" ) - def test_initialization(self, openai_encoder, routes, index_cls): + def test_initialization(self, openai_encoder, routes, index_cls, router_cls): index = init_index(index_cls) - _ = SemanticRouter( + _ = router_cls( encoder=openai_encoder, routes=routes, top_k=10, @@ -246,9 +272,11 @@ def test_initialization(self, openai_encoder, routes, index_cls): @pytest.mark.skipif( os.environ.get("PINECONE_API_KEY") is None, reason="Pinecone API key required" ) - def test_second_initialization_sync(self, openai_encoder, routes, index_cls): + def test_second_initialization_sync( + self, openai_encoder, routes, index_cls, router_cls + ): index = init_index(index_cls) - route_layer = SemanticRouter( + route_layer = router_cls( encoder=openai_encoder, routes=routes, index=index, auto_sync="local" ) if index_cls is PineconeIndex: @@ -259,15 +287,13 @@ def test_second_initialization_sync(self, openai_encoder, routes, index_cls): os.environ.get("PINECONE_API_KEY") is None, reason="Pinecone API key required" ) def test_second_initialization_not_synced( - self, openai_encoder, routes, routes_2, index_cls + self, openai_encoder, routes, routes_2, index_cls, router_cls ): index = init_index(index_cls) - _ = SemanticRouter( + _ = router_cls( encoder=openai_encoder, routes=routes, index=index, auto_sync="local" ) - route_layer = SemanticRouter( - encoder=openai_encoder, routes=routes_2, index=index - ) + route_layer = router_cls(encoder=openai_encoder, routes=routes_2, index=index) if index_cls is PineconeIndex: time.sleep(PINECONE_SLEEP) # allow for index to be populated assert route_layer.is_synced() is False @@ -275,14 +301,14 @@ def test_second_initialization_not_synced( @pytest.mark.skipif( os.environ.get("PINECONE_API_KEY") is None, reason="Pinecone API key required" ) - def test_utterance_diff(self, openai_encoder, routes, routes_2, index_cls): + def test_utterance_diff( + self, openai_encoder, routes, routes_2, index_cls, router_cls + ): index = init_index(index_cls) - _ = SemanticRouter( + _ = router_cls( encoder=openai_encoder, routes=routes, index=index, auto_sync="local" ) - route_layer_2 = SemanticRouter( - encoder=openai_encoder, routes=routes_2, index=index - ) + route_layer_2 = router_cls(encoder=openai_encoder, routes=routes_2, index=index) if index_cls is PineconeIndex: time.sleep(PINECONE_SLEEP) # allow for index to be populated diff = route_layer_2.get_utterance_diff(include_metadata=True) @@ -298,17 +324,19 @@ def test_utterance_diff(self, openai_encoder, routes, routes_2, index_cls): @pytest.mark.skipif( os.environ.get("PINECONE_API_KEY") is None, reason="Pinecone API key required" ) - def test_auto_sync_local(self, openai_encoder, routes, routes_2, index_cls): + def test_auto_sync_local( + self, openai_encoder, routes, routes_2, index_cls, router_cls + ): if index_cls is PineconeIndex: # TEST LOCAL pinecone_index = init_index(index_cls) - _ = SemanticRouter( + _ = router_cls( encoder=openai_encoder, routes=routes, index=pinecone_index, ) time.sleep(PINECONE_SLEEP) # allow for index to be populated - route_layer = SemanticRouter( + route_layer = router_cls( encoder=openai_encoder, routes=routes_2, index=pinecone_index, @@ -323,18 +351,20 @@ def test_auto_sync_local(self, openai_encoder, routes, routes_2, index_cls): @pytest.mark.skipif( os.environ.get("PINECONE_API_KEY") is None, reason="Pinecone API key required" ) - def test_auto_sync_remote(self, openai_encoder, routes, routes_2, index_cls): + def test_auto_sync_remote( + self, openai_encoder, routes, routes_2, index_cls, router_cls + ): if index_cls is PineconeIndex: # TEST REMOTE pinecone_index = init_index(index_cls) - _ = SemanticRouter( + _ = router_cls( encoder=openai_encoder, routes=routes_2, index=pinecone_index, auto_sync="local", ) time.sleep(PINECONE_SLEEP) # allow for index to be populated - route_layer = SemanticRouter( + route_layer = router_cls( encoder=openai_encoder, routes=routes, index=pinecone_index, @@ -350,19 +380,19 @@ def test_auto_sync_remote(self, openai_encoder, routes, routes_2, index_cls): os.environ.get("PINECONE_API_KEY") is None, reason="Pinecone API key required" ) def test_auto_sync_merge_force_local( - self, openai_encoder, routes, routes_2, index_cls + self, openai_encoder, routes, routes_2, index_cls, router_cls ): if index_cls is PineconeIndex: # TEST MERGE FORCE LOCAL pinecone_index = init_index(index_cls) - route_layer = SemanticRouter( + route_layer = router_cls( encoder=openai_encoder, routes=routes, index=pinecone_index, auto_sync="local", ) time.sleep(PINECONE_SLEEP) # allow for index to be populated - route_layer = SemanticRouter( + route_layer = router_cls( encoder=openai_encoder, routes=routes_2, index=pinecone_index, @@ -389,19 +419,19 @@ def test_auto_sync_merge_force_local( os.environ.get("PINECONE_API_KEY") is None, reason="Pinecone API key required" ) def test_auto_sync_merge_force_remote( - self, openai_encoder, routes, routes_2, index_cls + self, openai_encoder, routes, routes_2, index_cls, router_cls ): if index_cls is PineconeIndex: # TEST MERGE FORCE LOCAL pinecone_index = init_index(index_cls) - route_layer = SemanticRouter( + route_layer = router_cls( encoder=openai_encoder, routes=routes, index=pinecone_index, auto_sync="local", ) time.sleep(PINECONE_SLEEP) # allow for index to be populated - route_layer = SemanticRouter( + route_layer = router_cls( encoder=openai_encoder, routes=routes_2, index=pinecone_index, @@ -433,8 +463,8 @@ def test_auto_sync_merge_force_remote( @pytest.mark.skipif( os.environ.get("PINECONE_API_KEY") is None, reason="Pinecone API key required" ) - def test_sync(self, openai_encoder, index_cls): - route_layer = SemanticRouter( + def test_sync(self, openai_encoder, index_cls, router_cls): + route_layer = router_cls( encoder=openai_encoder, routes=[], index=init_index(index_cls), @@ -448,18 +478,20 @@ def test_sync(self, openai_encoder, index_cls): @pytest.mark.skipif( os.environ.get("PINECONE_API_KEY") is None, reason="Pinecone API key required" ) - def test_auto_sync_merge(self, openai_encoder, routes, routes_2, index_cls): + def test_auto_sync_merge( + self, openai_encoder, routes, routes_2, index_cls, router_cls + ): if index_cls is PineconeIndex: # TEST MERGE pinecone_index = init_index(index_cls) - route_layer = SemanticRouter( + route_layer = router_cls( encoder=openai_encoder, routes=routes_2, index=pinecone_index, auto_sync="local", ) time.sleep(PINECONE_SLEEP) # allow for index to be populated - route_layer = SemanticRouter( + route_layer = router_cls( encoder=openai_encoder, routes=routes, index=pinecone_index, @@ -492,17 +524,23 @@ def test_auto_sync_merge(self, openai_encoder, routes, routes_2, index_cls): os.environ.get("PINECONE_API_KEY") is None, reason="Pinecone API key required" ) def test_sync_lock_prevents_concurrent_sync( - self, openai_encoder, routes, index_cls + self, openai_encoder, routes, routes_2, index_cls, router_cls ): """Test that sync lock prevents concurrent synchronization operations""" index = init_index(index_cls) - route_layer = SemanticRouter( + route_layer = router_cls( + encoder=openai_encoder, + routes=routes_2, + index=index, + auto_sync="local", + ) + # initialize an out of sync router + route_layer = router_cls( encoder=openai_encoder, routes=routes, index=index, auto_sync=None, ) - # Acquire sync lock route_layer.index.lock(value=True) if index_cls is PineconeIndex: @@ -526,10 +564,12 @@ def test_sync_lock_prevents_concurrent_sync( @pytest.mark.skipif( os.environ.get("PINECONE_API_KEY") is None, reason="Pinecone API key required" ) - def test_sync_lock_auto_releases(self, openai_encoder, routes, index_cls): + def test_sync_lock_auto_releases( + self, openai_encoder, routes, index_cls, router_cls + ): """Test that sync lock is automatically released after sync operations""" index = init_index(index_cls) - route_layer = SemanticRouter( + route_layer = router_cls( encoder=openai_encoder, routes=routes, index=index, @@ -547,19 +587,23 @@ def test_sync_lock_auto_releases(self, openai_encoder, routes, index_cls): time.sleep(PINECONE_SLEEP) assert route_layer.is_synced() - # clear index - route_layer.index.index.delete(namespace="", delete_all=True) + # clear index if pinecone + if index_cls is PineconeIndex: + route_layer.index.client.delete_index(route_layer.index.index_name) -@pytest.mark.parametrize("index_cls", get_test_indexes()) +@pytest.mark.parametrize( + "index_cls,router_cls", + [(index, router) for index in get_test_indexes() for router in get_test_routers()], +) class TestAsyncSemanticRouter: @pytest.mark.skipif( os.environ.get("PINECONE_API_KEY") is None, reason="Pinecone API key required" ) @pytest.mark.asyncio - async def test_initialization(self, openai_encoder, routes, index_cls): + async def test_initialization(self, openai_encoder, routes, index_cls, router_cls): index = init_index(index_cls, init_async_index=True) - _ = SemanticRouter( + _ = router_cls( encoder=openai_encoder, routes=routes, top_k=10, @@ -571,9 +615,11 @@ async def test_initialization(self, openai_encoder, routes, index_cls): os.environ.get("PINECONE_API_KEY") is None, reason="Pinecone API key required" ) @pytest.mark.asyncio - async def test_second_initialization_sync(self, openai_encoder, routes, index_cls): + async def test_second_initialization_sync( + self, openai_encoder, routes, index_cls, router_cls + ): index = init_index(index_cls, init_async_index=True) - route_layer = SemanticRouter( + route_layer = router_cls( encoder=openai_encoder, routes=routes, index=index, auto_sync="local" ) if index_cls is PineconeIndex: @@ -585,15 +631,13 @@ async def test_second_initialization_sync(self, openai_encoder, routes, index_cl ) @pytest.mark.asyncio async def test_second_initialization_not_synced( - self, openai_encoder, routes, routes_2, index_cls + self, openai_encoder, routes, routes_2, index_cls, router_cls ): index = init_index(index_cls, init_async_index=True) - _ = SemanticRouter( + _ = router_cls( encoder=openai_encoder, routes=routes, index=index, auto_sync="local" ) - route_layer = SemanticRouter( - encoder=openai_encoder, routes=routes_2, index=index - ) + route_layer = router_cls(encoder=openai_encoder, routes=routes_2, index=index) if index_cls is PineconeIndex: await asyncio.sleep(PINECONE_SLEEP) # allow for index to be populated assert await route_layer.async_is_synced() is False @@ -602,16 +646,16 @@ async def test_second_initialization_not_synced( os.environ.get("PINECONE_API_KEY") is None, reason="Pinecone API key required" ) @pytest.mark.asyncio - async def test_utterance_diff(self, openai_encoder, routes, routes_2, index_cls): + async def test_utterance_diff( + self, openai_encoder, routes, routes_2, index_cls, router_cls + ): index = init_index(index_cls, init_async_index=True) - _ = SemanticRouter( + _ = router_cls( encoder=openai_encoder, routes=routes, index=index, auto_sync="local" ) - route_layer_2 = SemanticRouter( - encoder=openai_encoder, routes=routes_2, index=index - ) + route_layer_2 = router_cls(encoder=openai_encoder, routes=routes_2, index=index) if index_cls is PineconeIndex: - await asyncio.sleep(PINECONE_SLEEP * 2) # allow for index to be populated + await asyncio.sleep(PINECONE_SLEEP) # allow for index to be populated diff = await route_layer_2.aget_utterance_diff(include_metadata=True) assert '+ Route 1: Hello | None | {"type": "default"}' in diff assert '+ Route 1: Hi | None | {"type": "default"}' in diff @@ -626,17 +670,19 @@ async def test_utterance_diff(self, openai_encoder, routes, routes_2, index_cls) os.environ.get("PINECONE_API_KEY") is None, reason="Pinecone API key required" ) @pytest.mark.asyncio - async def test_auto_sync_local(self, openai_encoder, routes, routes_2, index_cls): + async def test_auto_sync_local( + self, openai_encoder, routes, routes_2, index_cls, router_cls + ): if index_cls is PineconeIndex: # TEST LOCAL pinecone_index = init_index(index_cls, init_async_index=True) - _ = SemanticRouter( + _ = router_cls( encoder=openai_encoder, routes=routes, index=pinecone_index, ) await asyncio.sleep(PINECONE_SLEEP) # allow for index to be populated - route_layer = SemanticRouter( + route_layer = router_cls( encoder=openai_encoder, routes=routes_2, index=pinecone_index, @@ -652,18 +698,20 @@ async def test_auto_sync_local(self, openai_encoder, routes, routes_2, index_cls os.environ.get("PINECONE_API_KEY") is None, reason="Pinecone API key required" ) @pytest.mark.asyncio - async def test_auto_sync_remote(self, openai_encoder, routes, routes_2, index_cls): + async def test_auto_sync_remote( + self, openai_encoder, routes, routes_2, index_cls, router_cls + ): if index_cls is PineconeIndex: # TEST REMOTE pinecone_index = init_index(index_cls, init_async_index=True) - _ = SemanticRouter( + _ = router_cls( encoder=openai_encoder, routes=routes_2, index=pinecone_index, auto_sync="local", ) await asyncio.sleep(PINECONE_SLEEP) # allow for index to be populated - route_layer = SemanticRouter( + route_layer = router_cls( encoder=openai_encoder, routes=routes, index=pinecone_index, @@ -680,19 +728,19 @@ async def test_auto_sync_remote(self, openai_encoder, routes, routes_2, index_cl ) @pytest.mark.asyncio async def test_auto_sync_merge_force_local( - self, openai_encoder, routes, routes_2, index_cls + self, openai_encoder, routes, routes_2, index_cls, router_cls ): if index_cls is PineconeIndex: # TEST MERGE FORCE LOCAL pinecone_index = init_index(index_cls, init_async_index=True) - route_layer = SemanticRouter( + route_layer = router_cls( encoder=openai_encoder, routes=routes, index=pinecone_index, auto_sync="local", ) await asyncio.sleep(PINECONE_SLEEP) # allow for index to be populated - route_layer = SemanticRouter( + route_layer = router_cls( encoder=openai_encoder, routes=routes_2, index=pinecone_index, @@ -720,19 +768,19 @@ async def test_auto_sync_merge_force_local( ) @pytest.mark.asyncio async def test_auto_sync_merge_force_remote( - self, openai_encoder, routes, routes_2, index_cls + self, openai_encoder, routes, routes_2, index_cls, router_cls ): if index_cls is PineconeIndex: # TEST MERGE FORCE LOCAL pinecone_index = init_index(index_cls, init_async_index=True) - route_layer = SemanticRouter( + route_layer = router_cls( encoder=openai_encoder, routes=routes, index=pinecone_index, auto_sync="local", ) await asyncio.sleep(PINECONE_SLEEP) # allow for index to be populated - route_layer = SemanticRouter( + route_layer = router_cls( encoder=openai_encoder, routes=routes_2, index=pinecone_index, @@ -740,7 +788,7 @@ async def test_auto_sync_merge_force_remote( ) await asyncio.sleep(PINECONE_SLEEP) # allow for index to be populated # confirm local and remote are synced - assert route_layer.async_is_synced() + assert await route_layer.async_is_synced() # now confirm utterances are correct local_utterances = await route_layer.index.aget_utterances() # we sort to ensure order is the same @@ -765,8 +813,8 @@ async def test_auto_sync_merge_force_remote( os.environ.get("PINECONE_API_KEY") is None, reason="Pinecone API key required" ) @pytest.mark.asyncio - async def test_sync(self, openai_encoder, index_cls): - route_layer = SemanticRouter( + async def test_sync(self, openai_encoder, index_cls, router_cls): + route_layer = router_cls( encoder=openai_encoder, routes=[], index=init_index(index_cls, init_async_index=True), @@ -781,18 +829,20 @@ async def test_sync(self, openai_encoder, index_cls): os.environ.get("PINECONE_API_KEY") is None, reason="Pinecone API key required" ) @pytest.mark.asyncio - async def test_auto_sync_merge(self, openai_encoder, routes, routes_2, index_cls): + async def test_auto_sync_merge( + self, openai_encoder, routes, routes_2, index_cls, router_cls + ): if index_cls is PineconeIndex: # TEST MERGE pinecone_index = init_index(index_cls, init_async_index=True) - route_layer = SemanticRouter( + route_layer = router_cls( encoder=openai_encoder, routes=routes_2, index=pinecone_index, auto_sync="local", ) await asyncio.sleep(PINECONE_SLEEP) # allow for index to be populated - route_layer = SemanticRouter( + route_layer = router_cls( encoder=openai_encoder, routes=routes, index=pinecone_index, @@ -826,11 +876,18 @@ async def test_auto_sync_merge(self, openai_encoder, routes, routes_2, index_cls ) @pytest.mark.asyncio async def test_sync_lock_prevents_concurrent_sync( - self, openai_encoder, routes, index_cls + self, openai_encoder, routes, routes_2, index_cls, router_cls ): """Test that sync lock prevents concurrent synchronization operations""" index = init_index(index_cls, init_async_index=True) - route_layer = SemanticRouter( + route_layer = router_cls( + encoder=openai_encoder, + routes=routes_2, + index=index, + auto_sync="local", + ) + # initialize an out of sync router + route_layer = router_cls( encoder=openai_encoder, routes=routes, index=index, @@ -861,26 +918,40 @@ async def test_sync_lock_prevents_concurrent_sync( os.environ.get("PINECONE_API_KEY") is None, reason="Pinecone API key required" ) @pytest.mark.asyncio - async def test_sync_lock_auto_releases(self, openai_encoder, routes, index_cls): + async def test_sync_lock_auto_releases( + self, openai_encoder, routes, routes_2, index_cls, router_cls + ): """Test that sync lock is automatically released after sync operations""" index = init_index(index_cls, init_async_index=True) - route_layer = SemanticRouter( + print(f"1. {index.namespace=}") + route_layer = router_cls( + encoder=openai_encoder, + routes=routes_2, + index=index, + auto_sync="local", + ) + print(f"2. {route_layer.index.namespace=}") + route_layer = router_cls( encoder=openai_encoder, routes=routes, index=index, auto_sync=None, ) - + if index_cls is PineconeIndex: + await asyncio.sleep(PINECONE_SLEEP) # Initial sync should acquire and release lock await route_layer.async_sync("local") if index_cls is PineconeIndex: await asyncio.sleep(PINECONE_SLEEP) + print(f"3. {route_layer.index.namespace=}") # Lock should be released, allowing another sync await route_layer.async_sync("local") # Should not raise exception if index_cls is PineconeIndex: await asyncio.sleep(PINECONE_SLEEP) assert await route_layer.async_is_synced() + print(f"4. {route_layer.index.namespace=}") - # clear index - route_layer.index.index.delete(namespace="", delete_all=True) + # clear index if pinecone + if index_cls is PineconeIndex: + route_layer.index.client.delete_index(route_layer.index.index_name)