
feat: full mocking #507

Draft · wants to merge 46 commits into main

Conversation

jamescalam
Member

@jamescalam jamescalam commented Jan 9, 2025

User description

Working on mocking everything in unit tests


PR Type

Enhancement, Tests, Bug fix


Description

  • Introduced extensive mocking and testing for hybrid and semantic routers.

  • Enhanced Pinecone index handling with new configurations and async support.

  • Improved error handling and logging across various modules.

  • Added support for sparse embeddings and hybrid routing logic.
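The mocking approach can be sketched as follows — a minimal, self-contained illustration of patching an encoder's `__call__` at the class level so unit tests never touch a real API. The `Encoder` class, `route` function, and fixed vectors here are invented for illustration; they are not the PR's actual fixtures:

```python
from unittest.mock import patch

class Encoder:
    """Stand-in for a real encoder (e.g. an API-backed embedding model)."""
    def __call__(self, docs):
        raise RuntimeError("would call a real API")

def route(encoder, text):
    # toy routing: choose a route from the first vector component
    vec = encoder([text])[0]
    return "Route 1" if vec[0] > 0 else "Route 2"

# Patch __call__ on the class: dunder lookup happens on the type,
# so this intercepts every instance call with no network access
with patch.object(Encoder, "__call__", return_value=[[0.1, 0.2, 0.3]]):
    assert route(Encoder(), "Hello") == "Route 1"
```

With `pytest-mock`, the same patch becomes `mocker.patch.object(Encoder, "__call__", ...)` inside a fixture, which is the pattern this PR's test suite uses.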


Changes walkthrough 📝

Relevant files:

Tests (2 files)
  • test_router.py (+783/-581) — Added extensive tests for hybrid and semantic routers.
  • test_sync.py (+152/-81) — Enhanced synchronization tests for hybrid and semantic routers.

Enhancement (4 files)
  • pinecone.py (+167/-82) — Improved Pinecone index handling and added async support.
  • base.py (+54/-88) — Refactored base router logic and added index readiness checks.
  • hybrid_local.py (+1/-7) — Added support for sparse embeddings in hybrid local index.
  • bm25.py (+1/-0) — Enhanced BM25 encoder with debug logging.

Additional files (8 files)
  • Makefile (+4/-4)
  • 00-introduction.ipynb (+0/-56)
  • base.py (+21/-4)
  • local.py (+14/-7)
  • postgres.py (+22/-11)
  • qdrant.py (+18/-7)
  • hybrid.py (+136/-9)
  • test_hybrid_layer.py (+0/-270)

💡 PR-Agent usage: Comment /help "your question" on any pull request to receive relevant information

@jamescalam jamescalam marked this pull request as draft January 9, 2025 11:43
@jamescalam jamescalam self-assigned this Jan 9, 2025
@jamescalam jamescalam added the enhancement and ci labels Jan 9, 2025

github-actions bot commented Jan 9, 2025

PR Reviewer Guide 🔍

Here are some key observations to aid the review process:

⏱️ Estimated effort to review: 5 🔵🔵🔵🔵🔵
🧪 PR contains tests
🔒 No security concerns identified
⚡ Recommended focus areas for review

Possible Test Coverage Gaps

The added tests are extensive, but some edge cases, such as invalid sparse embeddings or malformed metadata, may not be fully covered. Ensure all edge cases are tested.
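One of the gaps called out above (invalid sparse embeddings) could be covered with a small validation-style test. The sketch below is illustrative only; `validate_sparse` is a hypothetical helper written to show the kind of edge-case test being suggested, not an API from this repository:

```python
# Hypothetical validator for the {index: weight} dicts that back
# SparseEmbedding.from_dict in the fixtures below. Not part of the PR.
def validate_sparse(mapping):
    for idx, weight in mapping.items():
        # indices must be non-negative ints (bool is excluded explicitly)
        if not isinstance(idx, int) or isinstance(idx, bool) or idx < 0:
            raise ValueError(f"invalid sparse index: {idx!r}")
        # weights must be positive, non-NaN numbers
        if not isinstance(weight, (int, float)) or weight != weight or weight <= 0:
            raise ValueError(f"invalid sparse weight: {weight!r}")
    return mapping

def test_malformed_sparse_embedding_rejected():
    for bad in ({-1: 0.5}, {3: -0.2}, {"a": 0.1}, {2: float("nan")}):
        try:
            validate_sparse(bad)
        except ValueError:
            continue
        raise AssertionError(f"expected ValueError for {bad!r}")
```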

@pytest.fixture
def cohere_encoder(mocker):
    mocker.patch.object(CohereEncoder, "__call__", side_effect=mock_encoder_call)

    # Mock async call
    async def async_mock_encoder_call(docs=None, utterances=None):
        # Handle either docs or utterances parameter
        texts = docs if docs is not None else utterances
        return mock_encoder_call(texts)

    mocker.patch.object(CohereEncoder, "acall", side_effect=async_mock_encoder_call)
    return CohereEncoder(name="test-cohere-encoder", cohere_api_key="test_api_key")


# @pytest.fixture
# def openai_encoder(mocker):
#     # Mock the OpenAI client creation and API calls
#     mocker.patch("openai.OpenAI")
#     # Mock the __call__ method
#     mocker.patch.object(OpenAIEncoder, "__call__", side_effect=mock_encoder_call)

#     # Mock async call
#     async def async_mock_encoder_call(docs=None, utterances=None):
#         # Handle either docs or utterances parameter
#         texts = docs if docs is not None else utterances
#         return mock_encoder_call(texts)

#     mocker.patch.object(OpenAIEncoder, "acall", side_effect=async_mock_encoder_call)
#     # Create and return the mocked encoder
#     encoder = OpenAIEncoder(name="text-embedding-3-small")
#     return encoder

def sparse_encoder(mocker):
    # Helper, not a fixture: called directly from other fixtures to patch
    # BM25Encoder so no sparse model is ever loaded or fitted
    mocker.patch(
        "semantic_router.encoders.bm25.BM25Encoder.__init__",
        return_value=None
    )
    mocker.patch(
        "semantic_router.encoders.bm25.BM25Encoder.__call__",
        return_value=[SparseEmbedding.from_dict({1: 0.5, 4: 0.8})]
    )
    mocker.patch(
        "semantic_router.encoders.bm25.BM25Encoder.fit",
        return_value=None
    )

@pytest.fixture
def openai_encoder(mocker):
    # also mock the sparse encoder used by the hybrid router
    sparse_encoder(mocker)
    # Mock the OpenAI client creation and API calls
    mocker.patch("openai.OpenAI", return_value=MagicMock())
    # Patch __call__ on the class: dunder lookup happens on the type, so
    # assigning to an instance's __call__ attribute would have no effect
    mocker.patch.object(OpenAIEncoder, "__call__", side_effect=mock_encoder_call)
    return OpenAIEncoder(openai_api_key="mock_api_key")

@pytest.fixture
def mock_openai_llm(mocker):
    # Mock the OpenAI LLM
    mocker.patch.object(OpenAILLM, "__call__", return_value="mocked response")

    # also async
    async def async_mock_llm_call(messages=None, **kwargs):
        return "mocked response"

    mocker.patch.object(OpenAILLM, "acall", side_effect=async_mock_llm_call)

    return OpenAILLM(name="fake-model-v1")


@pytest.fixture
def routes():
    return [
        Route(name="Route 1", utterances=["Hello", "Hi"], metadata={"type": "default"}),
        Route(name="Route 2", utterances=["Goodbye", "Bye", "Au revoir"]),
    ]


@pytest.fixture
def routes_2():
    return [
        Route(name="Route 1", utterances=["Hello"]),
        Route(name="Route 2", utterances=["Hi"]),
    ]


@pytest.fixture
def routes_3():
    return [
        Route(name="Route 1", utterances=["Hello"]),
        Route(name="Route 2", utterances=["Asparagus"]),
    ]


@pytest.fixture
def routes_4():
    return [
        Route(name="Route 1", utterances=["Goodbye"], metadata={"type": "default"}),
        Route(name="Route 2", utterances=["Asparagus"]),
    ]


@pytest.fixture
def route_single_utterance():
    return [
        Route(name="Route 3", utterances=["Hello"]),
    ]


@pytest.fixture
def dynamic_routes():
    return [
        Route(
            name="Route 1",
            utterances=["Hello", "Hi"],
            function_schemas=[{"name": "test"}],
        ),
        Route(
            name="Route 2",
            utterances=["Goodbye", "Bye", "Au revoir"],
            function_schemas=[{"name": "test"}],
        ),
    ]


@pytest.fixture
def test_data():
    return [
        ("What's your opinion on the current government?", "politics"),
        ("what's the weather like today?", "chitchat"),
        ("what is the Pythagorean theorem?", "mathematics"),
        ("what is photosynthesis?", "biology"),
        ("tell me an interesting fact", None),
    ]


def get_test_indexes():
    indexes = [LocalIndex]

    if importlib.util.find_spec("qdrant_client") is not None:
        indexes.append(QdrantIndex)
    if importlib.util.find_spec("pinecone") is not None:
        indexes.append(PineconeIndex)

    return indexes


def get_test_encoders():
    encoders = [OpenAIEncoder]
    if importlib.util.find_spec("cohere") is not None:
        encoders.append(CohereEncoder)
    return encoders


def get_test_routers():
    routers = [SemanticRouter]
    if importlib.util.find_spec("pinecone_text") is not None:
        routers.append(HybridRouter)
    return routers


@pytest.mark.parametrize(
    "index_cls,encoder_cls,router_cls",
    [
        (index, encoder, router)
        for index in [LocalIndex]  # get_test_indexes()
        for encoder in ["openai_encoder"]
        for router in get_test_routers()
    ],
)
class TestIndexEncoders:
    def test_initialization(self, request, routes, index_cls, encoder_cls, router_cls):
        encoder = request.getfixturevalue(encoder_cls)
        index = init_index(index_cls, index_name=encoder.__class__.__name__)
        route_layer = router_cls(
            encoder=encoder,
            routes=routes,
            index=index,
            auto_sync="local",
            top_k=10,
        )
        score_threshold = route_layer.score_threshold
        if isinstance(route_layer, HybridRouter):
            assert score_threshold == encoder.score_threshold * route_layer.alpha
        else:
            assert score_threshold == encoder.score_threshold
        assert route_layer.top_k == 10
        # allow for 5 retries in case of index not being populated
        count = 0
        while count < RETRY_COUNT:
            try:
                assert len(route_layer.index) == 5
                break
            except Exception:
                logger.warning(f"Index not populated, waiting for retry (try {count})")
                time.sleep(PINECONE_SLEEP)
                count += 1
        # expect exactly two unique route names
        route_names = route_layer._get_route_names() or []
        assert len(set(route_names)) == 2

    def test_initialization_different_encoders(
        self, request, encoder_cls, index_cls, router_cls
    ):
        encoder = request.getfixturevalue(encoder_cls)
        index = init_index(index_cls, index_name=encoder.__class__.__name__)
        route_layer = router_cls(encoder=encoder, index=index)
        score_threshold = route_layer.score_threshold
        if isinstance(route_layer, HybridRouter):
            assert score_threshold == encoder.score_threshold * route_layer.alpha
        else:
            assert score_threshold == encoder.score_threshold

    def test_initialization_no_encoder(self, request, index_cls, encoder_cls, router_cls):
        # apply encoder mocks (but will not get used)
        _ = request.getfixturevalue(encoder_cls)
        route_layer_none = router_cls(encoder=None)
        score_threshold = route_layer_none.score_threshold
        if isinstance(route_layer_none, HybridRouter):
            assert score_threshold == 0.3 * route_layer_none.alpha
        else:
            assert score_threshold == 0.3


class TestRouterConfig:
    def test_from_file_json(self, tmp_path):
        # Create a temporary JSON file with layer configuration
        config_path = tmp_path / "config.json"
        config_path.write_text(
            layer_json()
        )  # Assuming layer_json() returns a valid JSON string

        # Load the RouterConfig from the temporary file
        layer_config = RouterConfig.from_file(str(config_path))

        # Assertions to verify the loaded configuration
        assert layer_config.encoder_type == "cohere"
        assert layer_config.encoder_name == "embed-english-v3.0"
        assert len(layer_config.routes) == 2
        assert layer_config.routes[0].name == "politics"

    def test_from_file_yaml(self, tmp_path):
        # Create a temporary YAML file with layer configuration
        config_path = tmp_path / "config.yaml"
        config_path.write_text(
            layer_yaml()
        )  # Assuming layer_yaml() returns a valid YAML string

        # Load the RouterConfig from the temporary file
        layer_config = RouterConfig.from_file(str(config_path))

        # Assertions to verify the loaded configuration
        assert layer_config.encoder_type == "cohere"
        assert layer_config.encoder_name == "embed-english-v3.0"
        assert len(layer_config.routes) == 2
        assert layer_config.routes[0].name == "politics"

    def test_from_file_invalid_path(self):
        with pytest.raises(FileNotFoundError) as excinfo:
            RouterConfig.from_file("nonexistent_path.json")
        assert "[Errno 2] No such file or directory: 'nonexistent_path.json'" in str(
            excinfo.value
        )

    def test_from_file_unsupported_type(self, tmp_path):
        # Create a temporary unsupported file
        config_path = tmp_path / "config.unsupported"
        config_path.write_text(layer_json())

        with pytest.raises(ValueError) as excinfo:
            RouterConfig.from_file(str(config_path))
        assert "Unsupported file type" in str(excinfo.value)

    def test_from_file_invalid_config(self, tmp_path):
        # Define an invalid configuration JSON
        invalid_config_json = """
        {
            "encoder_type": "cohere",
            "encoder_name": "embed-english-v3.0",
            "routes": "This should be a list, not a string"
        }"""

        # Write the invalid configuration to a temporary JSON file
        config_path = tmp_path / "invalid_config.json"
        with open(config_path, "w") as file:
            file.write(invalid_config_json)

        # Patch the is_valid function to return False for this test
        with patch("semantic_router.routers.base.is_valid", return_value=False):
            # Attempt to load the RouterConfig from the temporary file
            # and assert that it raises an exception due to invalid configuration
            with pytest.raises(Exception) as excinfo:
                RouterConfig.from_file(str(config_path))
            assert "Invalid config JSON or YAML" in str(
                excinfo.value
            ), "Loading an invalid configuration should raise an exception."

    def test_from_file_with_llm(self, tmp_path):
        llm_config_json = """
        {
            "encoder_type": "cohere",
            "encoder_name": "embed-english-v3.0",
            "routes": [
                {
                    "name": "llm_route",
                    "utterances": ["tell me a joke", "say something funny"],
                    "llm": {
                        "module": "semantic_router.llms.base",
                        "class": "BaseLLM",
                        "model": "fake-model-v1"
                    }
                }
            ]
        }"""

        config_path = tmp_path / "config_with_llm.json"
        with open(config_path, "w") as file:
            file.write(llm_config_json)

        # Load the RouterConfig from the temporary file
        layer_config = RouterConfig.from_file(str(config_path))

        # Using BaseLLM because trying to create a usable Mock LLM is a nightmare.
        assert isinstance(
            layer_config.routes[0].llm, BaseLLM
        ), (
            "LLM should be instantiated and associated with the route based on "
            "the config"
        )
        assert (
            layer_config.routes[0].llm.name == "fake-model-v1"
        ), "LLM instance should have the 'name' attribute set correctly"

    def test_init(self):
        layer_config = RouterConfig()
        assert layer_config.routes == []

    def test_to_file_json(self):
        route = Route(name="test", utterances=["utterance"])
        layer_config = RouterConfig(routes=[route])
        with patch("builtins.open", mock_open()) as mocked_open:
            layer_config.to_file("data/test_output.json")
            mocked_open.assert_called_once_with("data/test_output.json", "w")

    def test_to_file_yaml(self):
        route = Route(name="test", utterances=["utterance"])
        layer_config = RouterConfig(routes=[route])
        with patch("builtins.open", mock_open()) as mocked_open:
            layer_config.to_file("data/test_output.yaml")
            mocked_open.assert_called_once_with("data/test_output.yaml", "w")

    def test_to_file_invalid(self):
        route = Route(name="test", utterances=["utterance"])
        layer_config = RouterConfig(routes=[route])
        with pytest.raises(ValueError):
            layer_config.to_file("test_output.txt")

    def test_from_file_invalid(self):
        with open("test.txt", "w") as f:
            f.write("dummy content")
        with pytest.raises(ValueError):
            RouterConfig.from_file("test.txt")
        os.remove("test.txt")

    def test_to_dict(self):
        route = Route(name="test", utterances=["utterance"])
        layer_config = RouterConfig(routes=[route])
        assert layer_config.to_dict()["routes"] == [route.to_dict()]

    def test_add(self):
        route = Route(name="test", utterances=["utterance"])
        route2 = Route(name="test2", utterances=["utterance2"])
        layer_config = RouterConfig()
        layer_config.add(route)
        # confirm route added
        assert layer_config.routes == [route]
        # add second route and check updates
        layer_config.add(route2)
        assert layer_config.routes == [route, route2]

    def test_get(self):
        route = Route(name="test", utterances=["utterance"])
        layer_config = RouterConfig(routes=[route])
        assert layer_config.get("test") == route

    def test_get_not_found(self):
        route = Route(name="test", utterances=["utterance"])
        layer_config = RouterConfig(routes=[route])
        assert layer_config.get("not_found") is None

    def test_remove(self):
        route = Route(name="test", utterances=["utterance"])
        layer_config = RouterConfig(routes=[route])
        layer_config.remove("test")
        assert layer_config.routes == []

    def test_setting_aggregation_methods(self, openai_encoder, routes):
        for agg in ["sum", "mean", "max"]:
            route_layer = SemanticRouter(
                encoder=openai_encoder,
                routes=routes,
                aggregation=agg,
            )
            assert route_layer.aggregation == agg

    def test_semantic_classify_multiple_routes_with_different_aggregation(
        self, openai_encoder, routes
    ):
        route_scores = [
            {"route": "Route 1", "score": 0.5},
            {"route": "Route 1", "score": 0.5},
            {"route": "Route 1", "score": 0.5},
            {"route": "Route 1", "score": 0.5},
            {"route": "Route 2", "score": 0.4},
            {"route": "Route 2", "score": 0.6},
            {"route": "Route 2", "score": 0.8},
            {"route": "Route 3", "score": 0.1},
            {"route": "Route 3", "score": 1.0},
        ]
        for agg in ["sum", "mean", "max"]:
            route_layer = SemanticRouter(
                encoder=openai_encoder,
                routes=routes,
                aggregation=agg,
            )
            classification, score = route_layer._semantic_classify(route_scores)

            if agg == "sum":
                assert classification == "Route 1"
                assert score == [0.5, 0.5, 0.5, 0.5]
            elif agg == "mean":
                assert classification == "Route 2"
                assert score == [0.4, 0.6, 0.8]
            elif agg == "max":
                assert classification == "Route 3"
                assert score == [0.1, 1.0]


@pytest.mark.parametrize(
    "index_cls,encoder_cls,router_cls",
    [
        (index, encoder, router)
        for index in get_test_indexes()
        for encoder in [OpenAIEncoder]
        for router in get_test_routers()
    ],
)
class TestSemanticRouter:
    def test_initialization_dynamic_route(
        self, dynamic_routes, index_cls, encoder_cls, router_cls
    ):
        encoder = encoder_cls()
        index = init_index(index_cls, index_name=encoder.__class__.__name__)
        route_layer = router_cls(
            encoder=encoder,
            routes=dynamic_routes,
            index=index,
            auto_sync="local",
        )
        score_threshold = route_layer.score_threshold
        if isinstance(route_layer, HybridRouter):
            assert score_threshold == encoder.score_threshold * route_layer.alpha
        else:
            assert score_threshold == encoder.score_threshold

    def test_add_single_utterance(
        self, routes, route_single_utterance, index_cls, encoder_cls, router_cls
    ):
        encoder = encoder_cls()
        index = init_index(index_cls, index_name=encoder.__class__.__name__)
        route_layer = router_cls(
            encoder=encoder,
            routes=routes,
            index=index,
            auto_sync="local",
        )
        route_layer.add(routes=route_single_utterance)
        score_threshold = route_layer.score_threshold
        if isinstance(route_layer, HybridRouter):
            assert score_threshold == encoder.score_threshold * route_layer.alpha
        else:
            assert score_threshold == encoder.score_threshold
        if index_cls is PineconeIndex:
            time.sleep(PINECONE_SLEEP)  # allow for index to be updated
        _ = route_layer("Hello")
        assert len(route_layer.index.get_utterances()) == 6

    def test_init_and_add_single_utterance(
        self, route_single_utterance, index_cls, encoder_cls, router_cls
    ):
        encoder = encoder_cls()
        index = init_index(index_cls, index_name=encoder.__class__.__name__)
        route_layer = router_cls(
            encoder=encoder,
            index=index,
            auto_sync="local",
        )
        if index_cls is PineconeIndex:
            time.sleep(PINECONE_SLEEP)  # allow for index to be updated
        route_layer.add(routes=route_single_utterance)
        score_threshold = route_layer.score_threshold
        if isinstance(route_layer, HybridRouter):
            assert score_threshold == encoder.score_threshold * route_layer.alpha
        else:
            assert score_threshold == encoder.score_threshold
        count = 0
        while count < RETRY_COUNT:
            try:
                _ = route_layer("Hello")
                assert len(route_layer.index.get_utterances()) == 1
                break
            except Exception:
                logger.warning(f"Index not ready, waiting for retry (try {count})")
                count += 1

    def test_delete_index(self, routes, index_cls, encoder_cls, router_cls):
        # TODO merge .delete_index() and .delete_all() and get working
        index = init_index(index_cls)
        encoder = encoder_cls()
        route_layer = router_cls(
            encoder=encoder,
            routes=routes,
            index=index,
            auto_sync="local",
        )
        # delete index
        count = 0
        while count < RETRY_COUNT:
            try:
                route_layer.index.delete_index()
                break
            except Exception:
                logger.warning(f"Index not ready, waiting for retry (try {count})")
                count += 1
        # assert index empty
        count = 0
        while count < RETRY_COUNT:
            try:
                assert route_layer.index.get_utterances() == []
                break
            except Exception:
                logger.warning(f"Index not ready, waiting for retry (try {count})")
                count += 1
        if index_cls is PineconeIndex:
            time.sleep(PINECONE_SLEEP)  # allow for index to be updated

    def test_add_route(self, routes, index_cls, encoder_cls, router_cls):
        encoder = encoder_cls()
        index = init_index(index_cls, index_name=encoder.__class__.__name__)
        route_layer = router_cls(
            encoder=encoder, routes=[], index=index, auto_sync="local"
        )
        # Initially, the local routes list should be empty
        assert route_layer.routes == []
        # same for the remote index
        assert route_layer.index.get_utterances() == []

        # Add route1 and check
        route_layer.add(routes=routes[0])
        count = 0
        while count < RETRY_COUNT:
            try:
                assert route_layer.routes == [routes[0]]
                assert route_layer.index is not None
                assert len(route_layer.index.get_utterances()) == 2
                break
            except Exception:
                logger.warning(f"Index not ready, waiting for retry (try {count})")
                count += 1
                if index_cls is PineconeIndex:
                    time.sleep(PINECONE_SLEEP)  # allow for index to be populated

        # Add route2 and check
        route_layer.add(routes=routes[1])
        count = 0
        while count < RETRY_COUNT:
            try:
                assert route_layer.routes == [routes[0], routes[1]]
                assert len(route_layer.index.get_utterances()) == 5
                break
            except Exception:
                logger.warning(f"Index not ready, waiting for retry (try {count})")
                count += 1
                if index_cls is PineconeIndex:
                    time.sleep(PINECONE_SLEEP)  # allow for index to be populated

    def test_list_route_names(self, routes, index_cls, encoder_cls, router_cls):
        encoder = encoder_cls()
        index = init_index(index_cls, index_name=encoder.__class__.__name__)
        route_layer = router_cls(
            encoder=encoder,
            routes=routes,
            index=index,
            auto_sync="local",
        )
        if index_cls is PineconeIndex:
            time.sleep(PINECONE_SLEEP)  # allow for index to be populated
        route_names = route_layer.list_route_names()
        assert set(route_names) == {
            route.name for route in routes
        }, "The list of route names should match the names of the routes added."

    def test_delete_route(self, routes, index_cls, encoder_cls, router_cls):
        encoder = encoder_cls()
        index = init_index(index_cls, index_name=encoder.__class__.__name__)
        route_layer = router_cls(
            encoder=encoder,
            routes=routes,
            index=index,
            auto_sync="local",
        )
        if index_cls is PineconeIndex:
            time.sleep(PINECONE_SLEEP)  # allow for index to be populated
        # Delete a route by name
        route_to_delete = routes[0].name
        route_layer.delete(route_to_delete)
        if index_cls is PineconeIndex:
            time.sleep(PINECONE_SLEEP)  # allow for index to be populated
        # Ensure the route is no longer in the route layer
        assert (
            route_to_delete not in route_layer.list_route_names()
        ), "The route should be deleted from the route layer."
        # Ensure the route's utterances are no longer in the index
        for utterance in routes[0].utterances:
            assert (
                utterance not in route_layer.index
            ), "The route's utterances should be deleted from the index."

    def test_remove_route_not_found(self, routes, index_cls, encoder_cls, router_cls):
        encoder = encoder_cls()
        index = init_index(index_cls, index_name=encoder.__class__.__name__)
        route_layer = router_cls(encoder=encoder, routes=routes, index=index)
        if index_cls is PineconeIndex:
            time.sleep(PINECONE_SLEEP)
        # Attempt to remove a route that does not exist
        non_existent_route = "non-existent-route"
        route_layer.delete(non_existent_route)
        # we should see warning in logs only (ie no errors)

    def test_add_multiple_routes(self, routes, index_cls, encoder_cls, router_cls):
        encoder = encoder_cls()
        index = init_index(index_cls, index_name=encoder.__class__.__name__)
        route_layer = router_cls(
            encoder=encoder,
            index=index,
            auto_sync="local",
        )
        route_layer.add(routes=routes)
        count = 0
        while count < RETRY_COUNT:
            try:
                assert route_layer.index is not None
                assert len(route_layer.index.get_utterances()) == 5
                break
            except Exception:
                logger.warning(f"Index not ready, waiting for retry (try {count})")
                count += 1
                if index_cls is PineconeIndex:
                    time.sleep(PINECONE_SLEEP)  # allow for index to be populated

    def test_query_and_classification(self, routes, index_cls, encoder_cls, router_cls):
        encoder = encoder_cls()
        index = init_index(index_cls, index_name=encoder.__class__.__name__)
        route_layer = router_cls(
            encoder=encoder,
            routes=routes,
            index=index,
            auto_sync="local",
        )
        count = 0
        # retry to give the index time to be populated
        while count < RETRY_COUNT:
            try:
                query_result = route_layer(text="Hello").name
                assert query_result in ["Route 1", "Route 2"]
                break
            except Exception:
                logger.warning(
                    f"Query result not in expected routes, waiting for retry (try {count})"
                )
                if index_cls is PineconeIndex:
                    time.sleep(PINECONE_SLEEP)  # allow for index to be populated
            count += 1

    def test_query_filter(self, routes, index_cls, encoder_cls, router_cls):
        encoder = encoder_cls()
        index = init_index(index_cls, index_name=encoder.__class__.__name__)
        route_layer = router_cls(
            encoder=encoder,
            routes=routes,
            index=index,
            auto_sync="local",
        )
        if index_cls is PineconeIndex:
            time.sleep(PINECONE_SLEEP)  # allow for index to be populated

        try:
            # TODO JB: currently LocalIndex raises ValueError but others don't
            # they should all behave in the same way
            route_layer(text="Hello", route_filter=["Route 8"]).name
        except ValueError:
            assert True

        count = 0
        # retry to give the index time to be populated
        while count < RETRY_COUNT:
            try:
                query_result = route_layer(text="Hello", route_filter=["Route 1"]).name
                assert query_result in ["Route 1"]
                break
            except Exception:
                logger.warning(
                    f"Query result not in expected routes, waiting for retry (try {count})"
                )
            count += 1
            time.sleep(PINECONE_SLEEP)  # allow for index to be populated

    @pytest.mark.skipif(
        os.environ.get("PINECONE_API_KEY") is None, reason="Pinecone API key required"
    )
    def test_namespace_pinecone_index(self, routes, index_cls, encoder_cls, router_cls):
        if index_cls is PineconeIndex:
            encoder = encoder_cls()
            pineconeindex = init_index(
                index_cls, namespace="test", index_name=encoder.__class__.__name__
            )
            route_layer = router_cls(
                encoder=encoder,
                routes=routes,
                index=pineconeindex,
                auto_sync="local",
            )
            count = 0
            while count < RETRY_COUNT:
                try:
                    query_result = route_layer(
                        text="Hello", route_filter=["Route 1"]
                    ).name
                    assert query_result in ["Route 1"]
                    break
                except Exception:
                    logger.warning(
                        f"Query result not in expected routes, waiting for retry (try {count})"
                    )
                    if index_cls is PineconeIndex:
                        time.sleep(
                            PINECONE_SLEEP * 2
                        )  # allow for index to be populated
                count += 1
            route_layer.index.index.delete(namespace="test", delete_all=True)

    def test_query_with_no_index(self, index_cls, encoder_cls, router_cls):
        encoder = encoder_cls()
        route_layer = router_cls(encoder=encoder)
        # TODO: probably should avoid running this with multiple encoders or find a way to set dims
        with pytest.raises(ValueError):
            assert route_layer(text="Anything").name is None

    def test_query_with_vector(self, routes, index_cls, encoder_cls, router_cls):
        encoder = encoder_cls()
        index = init_index(index_cls, index_name=encoder.__class__.__name__)
        route_layer = router_cls(
            encoder=encoder,
            routes=routes,
            index=index,
            auto_sync="local",
        )
        # create vectors
        vector = encoder(["hello"])
        if router_cls is HybridRouter:
            sparse_vector = route_layer.sparse_encoder(["hello"])[0]
        count = 0
        while count < RETRY_COUNT:
            try:
                if router_cls is HybridRouter:
                    query_result = route_layer(
                        vector=vector, sparse_vector=sparse_vector
                    ).name
                else:
                    query_result = route_layer(vector=vector).name
                assert query_result in ["Route 1", "Route 2"]
                break
            except Exception:
                logger.warning(
                    "Query result not in expected routes, waiting for retry "
                    f"(try {count})"
                )
                count += 1
                time.sleep(PINECONE_SLEEP)  # allow for index to be populated

    def test_query_with_no_text_or_vector(
        self, routes, index_cls, encoder_cls, router_cls
    ):
        encoder = encoder_cls()
        index = init_index(index_cls, index_name=encoder.__class__.__name__)
        route_layer = router_cls(encoder=encoder, routes=routes, index=index)
        with pytest.raises(ValueError):
            route_layer()

    def test_is_ready(self, routes, index_cls, encoder_cls, router_cls):
        encoder = encoder_cls()
        index = init_index(index_cls, index_name=encoder.__class__.__name__)
        route_layer = router_cls(
            encoder=encoder,
            routes=routes,
            index=index,
            auto_sync="local",
        )
        count = 0
        while count < RETRY_COUNT + 1:
            if route_layer.index.is_ready():
                break
            logger.warning(f"Route layer not ready, waiting for retry (try {count})")
            count += 1
            if index_cls is PineconeIndex:
                time.sleep(PINECONE_SLEEP)  # allow for index to be populated
        assert count <= RETRY_COUNT, f"Route layer not ready after {RETRY_COUNT} retries"
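These tests repeat the same poll-and-retry idiom in several places; it could be factored into a small helper along these lines (a sketch with hypothetical names, not part of the library):

```python
import time

def wait_until_ready(is_ready, retries=5, delay=2.0):
    """Poll `is_ready()` up to `retries` times, sleeping `delay` seconds
    between attempts. Returns True once ready, False if retries run out."""
    for _attempt in range(retries):
        if is_ready():
            return True
        time.sleep(delay)
    return False
```

In the tests above this could replace the manual loops, e.g. `wait_until_ready(route_layer.index.is_ready, retries=RETRY_COUNT, delay=PINECONE_SLEEP)`.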


@pytest.mark.parametrize(
    "index_cls,encoder_cls,router_cls",
    [
        (index, encoder, router)
        for index in [LocalIndex]  # no need to test with multiple indexes
        for encoder in [OpenAIEncoder]  # no need to test with multiple encoders
        for router in get_test_routers()
    ],
)
class TestRouterOnly:
    def test_semantic_classify(self, routes, index_cls, encoder_cls, router_cls):
        encoder = encoder_cls()
        index = init_index(index_cls, index_name=encoder.__class__.__name__)
        route_layer = router_cls(
            encoder=encoder,
            routes=routes,
            index=index,
            auto_sync="local",
        )
        classification, score = route_layer._semantic_classify(
            [
                {"route": "Route 1", "score": 0.9},
                {"route": "Route 2", "score": 0.1},
            ]
        )
        assert classification == "Route 1"
        assert score == [0.9]

    def test_semantic_classify_multiple_routes(
        self, routes, index_cls, encoder_cls, router_cls
    ):
        encoder = encoder_cls()
        index = init_index(index_cls, index_name=encoder.__class__.__name__)
        route_layer = router_cls(
            encoder=encoder,
            routes=routes,
            index=index,
            auto_sync="local",
        )
        classification, score = route_layer._semantic_classify(
            [
                {"route": "Route 1", "score": 0.9},
                {"route": "Route 2", "score": 0.1},
                {"route": "Route 1", "score": 0.8},
            ]
        )
        assert classification == "Route 1"
        assert score == [0.9, 0.8]
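The contract these two tests pin down (group scores by route, return the winning route together with all of its scores) can be restated as a standalone function; this is an illustration of the expected behavior, not the library's actual `_semantic_classify` implementation:

```python
from collections import defaultdict

def semantic_classify(query_results):
    """Group scores by route and return (top_route, its_scores), where the
    top route is the one with the highest total score."""
    scores_by_route = defaultdict(list)
    for result in query_results:
        scores_by_route[result["route"]].append(result["score"])
    top_route = max(scores_by_route, key=lambda r: sum(scores_by_route[r]))
    return top_route, scores_by_route[top_route]
```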

    def test_query_no_text_dynamic_route(
        self, dynamic_routes, index_cls, encoder_cls, router_cls
    ):
        encoder = encoder_cls()
        index = init_index(index_cls, index_name=encoder.__class__.__name__)
        route_layer = router_cls(encoder=encoder, routes=dynamic_routes, index=index)
        # create vectors
        vector = encoder(["hello"])
        if router_cls is HybridRouter:
            sparse_vector = route_layer.sparse_encoder(["hello"])[0]
        with pytest.raises(ValueError):
            if router_cls is HybridRouter:
                route_layer(vector=vector, sparse_vector=sparse_vector)
            else:
                route_layer(vector=vector)

    def test_pass_threshold(self, index_cls, encoder_cls, router_cls):
        encoder = encoder_cls()
        index = init_index(index_cls, index_name=encoder.__class__.__name__)
        route_layer = router_cls(
            encoder=encoder,
            index=index,
            auto_sync="local",
        )
        assert not route_layer._pass_threshold([], 0.3)
        assert route_layer._pass_threshold([0.6, 0.7], 0.3)
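The behavior asserted here is small enough to restate directly; a hypothetical standalone equivalent of `_pass_threshold`:

```python
def pass_threshold(scores, threshold):
    """True if any score clears the threshold; an empty list never passes."""
    return any(score > threshold for score in scores)
```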

    def test_failover_score_threshold(self, index_cls, encoder_cls, router_cls):
        encoder = encoder_cls()
        index = init_index(index_cls, index_name=encoder.__class__.__name__)
        route_layer = router_cls(
            encoder=encoder,
            index=index,
            auto_sync="local",
        )
        if router_cls is HybridRouter:
            assert route_layer.score_threshold == 0.3 * route_layer.alpha
        else:
            assert route_layer.score_threshold == 0.3
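The failover behavior asserted here is simply the dense encoder's threshold scaled by alpha, the hybrid dense/sparse weighting factor; as a standalone sketch of that relationship:

```python
def hybrid_score_threshold(dense_threshold: float, alpha: float) -> float:
    """Scale the dense encoder's default threshold by alpha, mirroring the
    HybridRouter behavior asserted above (hybrid scores are lower than pure
    dense scores, so the cutoff is lowered proportionally)."""
    return dense_threshold * alpha
```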

    def test_json(self, routes, index_cls, encoder_cls, router_cls):
        temp = tempfile.NamedTemporaryFile(suffix=".json", delete=False)
        try:
            temp_path = temp.name  # Save the temporary file's path
            temp.close()  # Close the file to ensure it can be opened again on Windows
            encoder = encoder_cls()
            index = init_index(index_cls, index_name=encoder.__class__.__name__)
            route_layer = router_cls(
                encoder=encoder,
                routes=routes,
                index=index,
                auto_sync="local",
            )
            route_layer.to_json(temp_path)
            assert os.path.exists(temp_path)
            route_layer_from_file = SemanticRouter.from_json(temp_path)
            assert (
                route_layer_from_file.index is not None
                and route_layer_from_file._get_route_names() is not None
            )
        finally:
            os.remove(temp_path)  # Ensure the file is deleted even if the test fails

    def test_yaml(self, routes, index_cls, encoder_cls, router_cls):
        temp = tempfile.NamedTemporaryFile(suffix=".yaml", delete=False)
        try:
            temp_path = temp.name  # Save the temporary file's path
            temp.close()  # Close the file to ensure it can be opened again on Windows
            encoder = encoder_cls()
            index = init_index(index_cls, index_name=encoder.__class__.__name__)
            route_layer = router_cls(
                encoder=encoder,
                routes=routes,
                index=index,
                auto_sync="local",
            )
            route_layer.to_yaml(temp_path)
            assert os.path.exists(temp_path)
            route_layer_from_file = SemanticRouter.from_yaml(temp_path)
            assert (
                route_layer_from_file.index is not None
                and route_layer_from_file._get_route_names() is not None
            )
        finally:
            os.remove(temp_path)  # Ensure the file is deleted even if the test fails

    def test_config(self, routes, index_cls, encoder_cls, router_cls):
        encoder = encoder_cls()
        index = init_index(index_cls, index_name=encoder.__class__.__name__)
        route_layer = router_cls(encoder=encoder, routes=routes, index=index)
        # confirm route creation functions as expected
        layer_config = route_layer.to_config()
        assert layer_config.routes == route_layer.routes
        # now load from config and confirm it's the same
        route_layer_from_config = SemanticRouter.from_config(layer_config, index)
        assert (
            route_layer_from_config._get_route_names() == route_layer._get_route_names()
        )
        if router_cls is HybridRouter:
            # TODO: need to fix HybridRouter from config
            # assert (
            #     route_layer_from_config.score_threshold
            #     == route_layer.score_threshold * route_layer.alpha
            # )
            pass
        else:
            assert (
                route_layer_from_config.score_threshold == route_layer.score_threshold
            )

    def test_get_thresholds(self, routes, index_cls, encoder_cls, router_cls):
        encoder = encoder_cls()
        index = init_index(index_cls, index_name=encoder.__class__.__name__)
        route_layer = router_cls(encoder=encoder, routes=routes, index=index)
        if router_cls is HybridRouter:
            # TODO: fix this
            target = encoder.score_threshold * route_layer.alpha
            assert route_layer.get_thresholds() == {
                "Route 1": target,
                "Route 2": target,
            }
        else:
            assert route_layer.get_thresholds() == {"Route 1": 0.3, "Route 2": 0.3}

    def test_with_multiple_routes_passing_threshold(
        self, routes, index_cls, encoder_cls, router_cls
    ):
        encoder = encoder_cls()
        index = init_index(index_cls, index_name=encoder.__class__.__name__)
        route_layer = router_cls(encoder=encoder, routes=routes, index=index)
        route_layer.score_threshold = 0.3  # Set the score_threshold if needed
        # Assuming route_layer is already set up with routes "Route 1" and "Route 2"
        query_results = [
            {"route": "Route 1", "score": 0.1},
            {"route": "Route 2", "score": 0.8},
            {"route": "Route 1", "score": 0.9},
        ]
        expected = [("Route 1", 0.9), ("Route 2", 0.8)]
        results = route_layer._semantic_classify_multiple_routes(query_results)
        assert sorted(results) == sorted(
            expected
        ), "Should classify and return routes above their thresholds"

    def test_with_no_routes_passing_threshold(
        self, routes, index_cls, encoder_cls, router_cls
    ):
        encoder = encoder_cls()
        index = init_index(index_cls, index_name=encoder.__class__.__name__)
        route_layer = router_cls(encoder=encoder, routes=routes, index=index)
        # set threshold to 1.0 so that no routes pass
        route_layer.score_threshold = 1.0
        query_results = [
            {"route": "Route 1", "score": 0.01},
            {"route": "Route 2", "score": 0.02},
        ]
        expected = []
        results = route_layer._semantic_classify_multiple_routes(query_results)
        assert (
            results == expected
        ), "Should return an empty list when no routes pass their thresholds"

    def test_with_no_query_results(self, routes, index_cls, encoder_cls, router_cls):
        encoder = encoder_cls()
        index = init_index(index_cls, index_name=encoder.__class__.__name__)
        route_layer = router_cls(encoder=encoder, routes=routes, index=index)
        route_layer.score_threshold = 0.5
        query_results = []
        expected = []
        results = route_layer._semantic_classify_multiple_routes(query_results)
        assert (
            results == expected
        ), "Should return an empty list when there are no query results"

    def test_with_unrecognized_route(self, routes, index_cls, encoder_cls, router_cls):
        encoder = encoder_cls()
        index = init_index(index_cls, index_name=encoder.__class__.__name__)
        route_layer = router_cls(encoder=encoder, routes=routes, index=index)
        route_layer.score_threshold = 0.5
        # Test with a route name that does not exist in the route_layer's routes
        query_results = [{"route": "UnrecognizedRoute", "score": 0.9}]
        expected = []
        results = route_layer._semantic_classify_multiple_routes(query_results)
        assert results == expected, "Should ignore and not return unrecognized routes"
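Taken together, the tests above pin down a simple contract for `_semantic_classify_multiple_routes`; a hypothetical standalone equivalent is sketched below, using one global threshold where the real router consults per-route thresholds:

```python
def classify_multiple_routes(query_results, known_routes, threshold):
    """Return (route, best_score) for every known route whose best score
    clears the threshold; unknown routes are ignored."""
    best = {}
    for result in query_results:
        route, score = result["route"], result["score"]
        if route not in known_routes:
            continue  # ignore unrecognized routes
        if score >= threshold and score > best.get(route, float("-inf")):
            best[route] = score
    return sorted(best.items())
```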

    def test_set_aggregation_method_with_unsupported_value(
        self, routes, index_cls, encoder_cls, router_cls
    ):
        encoder = encoder_cls()
        index = init_index(index_cls, index_name=encoder.__class__.__name__)
        route_layer = router_cls(encoder=encoder, routes=routes, index=index)
        unsupported_aggregation = "unsupported_aggregation_method"
        with pytest.raises(
            ValueError,
            match=f"Unsupported aggregation method chosen: {unsupported_aggregation}. Choose either 'SUM', 'MEAN', or 'MAX'.",
        ):
            route_layer._set_aggregation_method(unsupported_aggregation)

    def test_refresh_routes_not_implemented(
        self, routes, index_cls, encoder_cls, router_cls
    ):
        encoder = encoder_cls()
        index = init_index(index_cls, index_name=encoder.__class__.__name__)
        route_layer = router_cls(encoder=encoder, routes=routes, index=index)
        with pytest.raises(
            NotImplementedError, match="This method has not yet been implemented."
        ):
            route_layer._refresh_routes()

    def test_update_threshold(self, routes, index_cls, encoder_cls, router_cls):
        encoder = encoder_cls()
        index = init_index(index_cls, index_name=encoder.__class__.__name__)
        route_layer = router_cls(encoder=encoder, routes=routes, index=index)
        route_name = "Route 1"
        new_threshold = 0.8
        route_layer.update(name=route_name, threshold=new_threshold)
        updated_route = route_layer.get(route_name)
        assert (
            updated_route.score_threshold == new_threshold
        ), f"Expected threshold to be updated to {new_threshold}, but got {updated_route.score_threshold}"

    def test_update_non_existent_route(
        self, routes, index_cls, encoder_cls, router_cls
    ):
        encoder = encoder_cls()
        index = init_index(index_cls, index_name=encoder.__class__.__name__)
        route_layer = router_cls(encoder=encoder, routes=routes, index=index)
        non_existent_route = "Non-existent Route"
        with pytest.raises(
            ValueError,
            match=f"Route '{non_existent_route}' not found. Nothing updated.",
        ):
            route_layer.update(name=non_existent_route, threshold=0.7)

    def test_update_without_parameters(
        self, routes, index_cls, encoder_cls, router_cls
    ):
        encoder = encoder_cls()
        index = init_index(index_cls, index_name=encoder.__class__.__name__)
        route_layer = router_cls(encoder=encoder, routes=routes, index=index)
        with pytest.raises(
            ValueError,
            match="At least one of 'threshold' or 'utterances' must be provided.",
        ):
            route_layer.update(name="Route 1")

    def test_update_utterances_not_implemented(
        self, routes, index_cls, encoder_cls, router_cls
    ):
        encoder = encoder_cls()
        index = init_index(index_cls, index_name=encoder.__class__.__name__)
        route_layer = router_cls(encoder=encoder, routes=routes, index=index)
        with pytest.raises(
            NotImplementedError,
            match="The update method cannot be used for updating utterances yet.",
        ):
            route_layer.update(name="Route 1", utterances=["New utterance"])


@pytest.mark.parametrize(
    "index_cls,encoder_cls,router_cls",
    [
        (index, encoder, router)
        for index in get_test_indexes()
        for encoder in [OpenAIEncoder]
        for router in get_test_routers()
    ],
)
class TestLayerFit:
    def test_eval(self, routes, test_data, index_cls, encoder_cls, router_cls):
        encoder = encoder_cls()
        index = init_index(index_cls, index_name=encoder.__class__.__name__)
        route_layer = router_cls(
            encoder=encoder,
            routes=routes,
            index=index,
            auto_sync="local",
        )
        count = 0
        while True:
            if route_layer.index.is_ready():
                break
            count += 1
            if count > RETRY_COUNT:
                raise ValueError("Index not ready")
            if index_cls is PineconeIndex:
                time.sleep(PINECONE_SLEEP)
        # unpack test data
        X, y = zip(*test_data)
        # evaluate
        route_layer.evaluate(X=list(X), y=list(y), batch_size=int(len(X) / 5))

    def test_fit(self, routes, test_data, index_cls, encoder_cls, router_cls):
        encoder = encoder_cls()
        index = init_index(index_cls, index_name=encoder.__class__.__name__)
        route_layer = router_cls(
            encoder=encoder,
            routes=routes,
            index=index,
            auto_sync="local",
        )
        count = 0
        while True:
            if route_layer.index.is_ready():
                break
            count += 1
            if count > RETRY_COUNT:
                raise ValueError("Index not ready")
            if index_cls is PineconeIndex:
                time.sleep(PINECONE_SLEEP)
        # unpack test data
        X, y = zip(*test_data)
        route_layer.fit(X=list(X), y=list(y), batch_size=int(len(X) / 5))
Potential Performance Bottleneck

The build_records function and its use in the add and aadd methods materialize every record in memory before upserting, which can become a bottleneck for large datasets. Consider profiling or optimizing these methods.

def build_records(
    embeddings: List[List[float]],
    routes: List[str],
    utterances: List[str],
    function_schemas: Optional[List[Dict[str, Any]]] = None,
    metadata_list: List[Dict[str, Any]] = [],
    sparse_embeddings: Optional[Optional[List[SparseEmbedding]]] = None,
) -> List[Dict]:
    if function_schemas is None:
        function_schemas = [{}] * len(embeddings)
    if sparse_embeddings is None:
        vectors_to_upsert = [
            PineconeRecord(
                values=vector,
                route=route,
                utterance=utterance,
                function_schema=json.dumps(function_schema),
                metadata=metadata,
            ).to_dict()
            for vector, route, utterance, function_schema, metadata in zip(
                embeddings,
                routes,
                utterances,
                function_schemas,
                metadata_list,
            )
        ]
    else:
        vectors_to_upsert = [
            PineconeRecord(
                values=vector,
                sparse_values=sparse_emb.to_pinecone(),
                route=route,
                utterance=utterance,
                function_schema=json.dumps(function_schema),
                metadata=metadata,
            ).to_dict()
            for vector, route, utterance, function_schema, metadata, sparse_emb in zip(
                embeddings,
                routes,
                utterances,
                function_schemas,
                metadata_list,
                sparse_embeddings,
            )
        ]
    return vectors_to_upsert
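The list comprehensions above build every record eagerly; one way to address the concern raised here, sketched with hypothetical names rather than as a drop-in replacement, is to yield records lazily so that peak memory stays bounded by the batch size:

```python
from itertools import islice

def iter_records(embeddings, routes, utterances):
    """Lazily pair each embedding with its route/utterance metadata."""
    for vector, route, utterance in zip(embeddings, routes, utterances):
        yield {
            "values": vector,
            "metadata": {"sr_route": route, "sr_utterance": utterance},
        }

def iter_batches(records, batch_size=100):
    """Slice an iterable of records into lists of at most batch_size."""
    it = iter(records)
    while batch := list(islice(it, batch_size)):
        yield batch
```

With this shape, `add` could upsert each batch as it is produced instead of holding the full `vectors_to_upsert` list in memory.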


class PineconeRecord(BaseModel):
    id: str = ""
    values: List[float]
    sparse_values: Optional[Dict[str, List]] = None
    route: str
    utterance: str
    function_schema: str = "{}"
    metadata: Dict[str, Any] = {}  # Additional metadata dictionary

    def __init__(self, **data):
        super().__init__(**data)
        clean_route = clean_route_name(self.route)
        # Use SHA-256 for a more secure hash
        utterance_id = hashlib.sha256(self.utterance.encode()).hexdigest()
        self.id = f"{clean_route}#{utterance_id}"
        self.metadata.update(
            {
                "sr_route": self.route,
                "sr_utterance": self.utterance,
                "sr_function_schema": self.function_schema,
            }
        )

    def to_dict(self):
        d = {
            "id": self.id,
            "values": self.values,
            "metadata": self.metadata,
        }
        if self.sparse_values:
            d["sparse_values"] = self.sparse_values
        return d
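The deterministic ID scheme above can be illustrated standalone. Since `clean_route_name` is not shown in this diff, the sketch substitutes a simple space-to-hyphen cleaner (an assumption, not the library's actual cleaning logic):

```python
import hashlib

def record_id(route: str, utterance: str) -> str:
    """Mirror the ID scheme '<clean-route>#<sha256(utterance)>'.
    The real clean_route_name may differ; spaces are replaced here
    only as a stand-in."""
    clean_route = route.replace(" ", "-")
    utterance_id = hashlib.sha256(utterance.encode()).hexdigest()
    return f"{clean_route}#{utterance_id}"
```

Because the ID is derived from the route and utterance, re-upserting the same utterance overwrites the existing record rather than duplicating it.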


class PineconeIndex(BaseIndex):
    index_prefix: str = "semantic-router--"
    api_key: Optional[str] = None
    index_name: str = "index"
    dimensions: Union[int, None] = None
    metric: str = "dotproduct"
    cloud: str = "aws"
    region: str = "us-east-1"
    host: str = ""
    client: Any = Field(default=None, exclude=True)
    async_client: Any = Field(default=None, exclude=True)
    index: Optional[Any] = Field(default=None, exclude=True)
    ServerlessSpec: Any = Field(default=None, exclude=True)
    namespace: Optional[str] = ""
    base_url: Optional[str] = "https://api.pinecone.io"

    def __init__(
        self,
        api_key: Optional[str] = None,
        index_name: str = "index",
        dimensions: Optional[int] = None,
        metric: str = "dotproduct",
        cloud: str = "aws",
        region: str = "us-east-1",
        host: str = "",
        namespace: Optional[str] = "",
        base_url: Optional[str] = "https://api.pinecone.io",
        init_async_index: bool = False,
    ):
        super().__init__()
        self.index_name = index_name
        self.dimensions = dimensions
        self.metric = metric
        self.cloud = cloud
        self.region = region
        self.host = host
        if namespace == "sr_config":
            raise ValueError("Namespace 'sr_config' is reserved for internal use.")
        self.namespace = namespace
        self.type = "pinecone"
        self.api_key = api_key or os.getenv("PINECONE_API_KEY")
        self.base_url = base_url

        logger.warning(
            "Default region changed from us-west-2 to us-east-1 in v0.1.0.dev6"
        )

        if self.api_key is None:
            raise ValueError("Pinecone API key is required.")

        self.client = self._initialize_client(api_key=self.api_key)
        if init_async_index:
            self.async_client = self._initialize_async_client(api_key=self.api_key)
        else:
            self.async_client = None
        # try initializing index
        self.index = self._init_index()

    def _initialize_client(self, api_key: Optional[str] = None):
        try:
            from pinecone import Pinecone, ServerlessSpec

            self.ServerlessSpec = ServerlessSpec
        except ImportError:
            raise ImportError(
                "Please install pinecone-client to use PineconeIndex. "
                "You can install it with: "
                "`pip install 'semantic-router[pinecone]'`"
            )
        pinecone_args = {
            "api_key": api_key,
            "source_tag": "semanticrouter",
            "host": self.base_url,
        }
        if self.namespace:
            pinecone_args["namespace"] = self.namespace

        return Pinecone(**pinecone_args)

    def _initialize_async_client(self, api_key: Optional[str] = None):
        api_key = api_key or self.api_key
        if api_key is None:
            raise ValueError("Pinecone API key is required.")
        async_client = aiohttp.ClientSession(
            headers={
                "Api-Key": api_key,
                "Content-Type": "application/json",
                "X-Pinecone-API-Version": "2024-07",
                "User-Agent": "source_tag=semanticrouter",
            }
        )
        return async_client

    def _init_index(self, force_create: bool = False) -> Union[Any, None]:
        """Initializing the index can be done after the object has been created
        to allow for the user to set the dimensions and other parameters.

        If the index doesn't exist and the dimensions are given, the index will
        be created. If the index exists, it will be returned. If the index doesn't
        exist and the dimensions are not given, the index will not be created and
        None will be returned.

        :param force_create: If True, the index will be created even if the
            dimensions are not given (which will raise an error).
        :type force_create: bool, optional
        """
        index_exists = self.index_name in self.client.list_indexes().names()
        dimensions_given = self.dimensions is not None
        if dimensions_given and not index_exists:
            # if the index doesn't exist and we have dimension value
            # we create the index
            self.client.create_index(
                name=self.index_name,
                dimension=self.dimensions,
                metric=self.metric,
                spec=self.ServerlessSpec(cloud=self.cloud, region=self.region),
            )
            # wait for index to be created
            while not self.client.describe_index(self.index_name).status["ready"]:
                time.sleep(1)
            index = self.client.Index(self.index_name)
            time.sleep(0.5)
        elif index_exists:
            # if the index exists we just return it
            index = self.client.Index(self.index_name)
            # grab the dimensions from the index
            self.dimensions = index.describe_index_stats()["dimension"]
        elif force_create and not dimensions_given:
            raise ValueError(
                "Cannot create an index without specifying the dimensions."
            )
        else:
            # if the index doesn't exist and we don't have the dimensions
            # we return None
            logger.warning("Index could not be initialized.")
            index = None
        if index is not None:
            self.host = self.client.describe_index(self.index_name)["host"]
        return index

    async def _init_async_index(self, force_create: bool = False):
        index_stats = None
        indexes = await self._async_list_indexes()
        index_names = [i["name"] for i in indexes["indexes"]]
        index_exists = self.index_name in index_names
        if self.dimensions is not None and not index_exists:
            await self._async_create_index(
                name=self.index_name,
                dimension=self.dimensions,
                metric=self.metric,
                cloud=self.cloud,
                region=self.region,
            )
            # TODO describe index and async sleep
            index_ready = "false"
            while index_ready != "true":
                index_stats = await self._async_describe_index(self.index_name)
                index_ready = index_stats["status"]["ready"]
                await asyncio.sleep(1)
        elif index_exists:
            index_stats = await self._async_describe_index(self.index_name)
            # grab dimensions for the index
            self.dimensions = index_stats["dimension"]
        elif force_create and self.dimensions is None:
            raise ValueError(
                "Cannot create an index without specifying the dimensions."
            )
        else:
            # if the index doesn't exist and we don't have the dimensions
            # we raise warning
            logger.warning("Index could not be initialized.")
        self.host = index_stats["host"] if index_stats else ""

    def _batch_upsert(self, batch: List[Dict]):
        """Helper method for upserting a single batch of records.

        :param batch: The batch of records to upsert.
        :type batch: List[Dict]
        """
        if self.index is not None:
            self.index.upsert(vectors=batch, namespace=self.namespace)
        else:
            raise ValueError("Index is None, could not upsert.")

    def add(
        self,
        embeddings: List[List[float]],
        routes: List[str],
        utterances: List[str],
        function_schemas: Optional[List[Dict[str, Any]]] = None,
        metadata_list: List[Dict[str, Any]] = [],
        batch_size: int = 100,
        sparse_embeddings: Optional[List[SparseEmbedding]] = None,
        **kwargs,
    ):
        """Add vectors to Pinecone in batches."""
        if self.index is None:
            self.dimensions = self.dimensions or len(embeddings[0])
            self.index = self._init_index(force_create=True)
        vectors_to_upsert = build_records(
            embeddings=embeddings,
            routes=routes,
            utterances=utterances,
            function_schemas=function_schemas,
            metadata_list=metadata_list,
            sparse_embeddings=sparse_embeddings,
        )

        for i in range(0, len(vectors_to_upsert), batch_size):
            batch = vectors_to_upsert[i : i + batch_size]
            self._batch_upsert(batch)

    async def aadd(
        self,
        embeddings: List[List[float]],
        routes: List[str],
        utterances: List[str],
        function_schemas: Optional[List[Dict[str, Any]]] = None,
        metadata_list: List[Dict[str, Any]] = [],
        batch_size: int = 100,
        sparse_embeddings: Optional[List[SparseEmbedding]] = None,
        **kwargs,
    ):
        """Add vectors to Pinecone in batches."""
        if self.index is None:
            self.dimensions = self.dimensions or len(embeddings[0])
            self.index = await self._init_async_index(force_create=True)
        vectors_to_upsert = build_records(
            embeddings=embeddings,
            routes=routes,
            utterances=utterances,
            function_schemas=function_schemas,
            metadata_list=metadata_list,
            sparse_embeddings=sparse_embeddings,
        )

        for i in range(0, len(vectors_to_upsert), batch_size):
            batch = vectors_to_upsert[i : i + batch_size]
            await self._async_upsert(
                vectors=batch,
                namespace=self.namespace or "",
            )
Logging Overhead

Excessive use of logger.warning for debug information (e.g. the leftover "JBTEMP" messages) clutters logs at warning level and adds overhead. Consider removing these statements or gating them behind debug-level logging.
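A standard way to gate such diagnostics, sketched below with a hypothetical method name, is to emit them via logger.debug with lazy %s formatting, so they are filtered out (and the message is never built) unless debug level is enabled:

```python
import logging

logger = logging.getLogger("semantic_router.example")

def add_routes(routes):
    # Hypothetical stand-in for the flagged code path. The message is
    # only formatted and emitted when the logger level admits DEBUG,
    # so production logs at WARNING stay clean.
    logger.debug("adding routes: %s", routes)
```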

    if self.auto_sync:
        self._init_index_state()

def _set_score_threshold(self):
    """Set the score threshold for the HybridRouter. Unlike the base router the
    encoder score threshold is not used directly. Instead, the dense encoder
    score threshold is multiplied by the alpha value, resulting in a lower
    score threshold. This is done to account for the difference in returned
    scores from the hybrid router.
    """
    if self.encoder.score_threshold is not None:
        self.score_threshold = self.encoder.score_threshold * self.alpha
    else:
        logger.warning(
            "No score threshold value found in encoder. Using the default "
            "'None' value can lead to unexpected results."
        )

def add(self, routes: List[Route] | Route):
    """Add a route or list of routes to the local HybridRouter and index.

    :param routes: The route(s) to add.
    :type routes: List[Route] | Route
    """
    # TODO: merge into single method within BaseRouter
    current_local_hash = self._get_hash()
    current_remote_hash = self.index._read_hash()
    if current_remote_hash.value == "":
        # if remote hash is empty, the index is to be initialized
        current_remote_hash = current_local_hash
    logger.warning(f"JBTEMP: {routes}")
    if isinstance(routes, Route):
        routes = [routes]
    # create embeddings for all routes
    route_names, all_utterances, all_function_schemas, all_metadata = (
        self._extract_routes_details(routes, include_metadata=True)
    )
    # TODO: to merge, self._encode should probably output a special
    # TODO Embedding type that can be either dense or hybrid
    dense_emb, sparse_emb = self._encode(all_utterances)
    self.index.add(
        embeddings=dense_emb.tolist(),
        routes=route_names,
        utterances=all_utterances,
        function_schemas=all_function_schemas,
        metadata_list=all_metadata,
        sparse_embeddings=sparse_emb,
    )

    self.routes.extend(routes)
    if current_local_hash.value == current_remote_hash.value:
        self._write_hash()  # update current hash in index
    else:
        logger.warning(
            "Local and remote route layers were not aligned. Remote hash "
            f"not updated. Use `{self.__class__.__name__}.get_utterance_diff()` "
            "to see details."
        )

def _execute_sync_strategy(self, strategy: Dict[str, Dict[str, List[Utterance]]]):
    """Executes the provided sync strategy, either deleting or upserting
    routes from the local and remote instances as defined in the strategy.

    :param strategy: The sync strategy to execute.
    :type strategy: Dict[str, Dict[str, List[Utterance]]]
    """
    if strategy["remote"]["delete"]:
        data_to_delete = {}  # type: ignore
        for utt_obj in strategy["remote"]["delete"]:
            data_to_delete.setdefault(utt_obj.route, []).append(utt_obj.utterance)
        # TODO: switch to remove without sync??
        self.index._remove_and_sync(data_to_delete)
    if strategy["remote"]["upsert"]:
        utterances_text = [utt.utterance for utt in strategy["remote"]["upsert"]]
        dense_emb, sparse_emb = self._encode(utterances_text)
        self.index.add(
            embeddings=dense_emb.tolist(),
            routes=[utt.route for utt in strategy["remote"]["upsert"]],
            utterances=utterances_text,
            function_schemas=[
                utt.function_schemas for utt in strategy["remote"]["upsert"]  # type: ignore
            ],
            metadata_list=[utt.metadata for utt in strategy["remote"]["upsert"]],
            sparse_embeddings=sparse_emb,
        )
    if strategy["local"]["delete"]:
        self._local_delete(utterances=strategy["local"]["delete"])
    if strategy["local"]["upsert"]:
        self._local_upsert(utterances=strategy["local"]["upsert"])
    # update hash
    self._write_hash()
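The `setdefault` grouping used for remote deletes above can be seen in isolation below; `Utt` is a hypothetical stand-in for the library's `Utterance` model:

```python
from dataclasses import dataclass

@dataclass
class Utt:
    # Hypothetical stand-in for semantic_router's Utterance object.
    route: str
    utterance: str

to_delete = [Utt("greet", "hi"), Utt("greet", "hello"), Utt("bye", "cya")]
data_to_delete: dict[str, list[str]] = {}
for u in to_delete:
    # Group utterances by route so the index can bulk-delete per route.
    data_to_delete.setdefault(u.route, []).append(u.utterance)

print(data_to_delete)  # {'greet': ['hi', 'hello'], 'bye': ['cya']}
```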

def _get_index(self, index: Optional[BaseIndex]) -> BaseIndex:
    if index is None:
        logger.warning("No index provided. Using default HybridLocalIndex.")
        index = HybridLocalIndex()
    return index

def _get_sparse_encoder(
    self, sparse_encoder: Optional[SparseEncoder]
) -> SparseEncoder:
    if sparse_encoder is None:
        logger.warning("No sparse_encoder provided. Using default BM25Encoder.")
        sparse_encoder = BM25Encoder()
    return sparse_encoder

def _encode(self, text: list[str]) -> tuple[np.ndarray, list[SparseEmbedding]]:
    """Given some text, generates dense and sparse embeddings, then scales them
    using the chosen alpha value.
    """
    if self...
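The method body is truncated here, but per the docstring the two embedding types are scaled by the chosen alpha value. A generic sketch of convex alpha weighting — an illustration under that assumption, not the PR's actual implementation:

```python
import numpy as np

def scale_hybrid(dense: np.ndarray, sparse_values: list[float], alpha: float):
    # Convex weighting: alpha toward the dense embedding,
    # (1 - alpha) toward the sparse one.
    if not 0.0 <= alpha <= 1.0:
        raise ValueError("alpha must be in [0, 1]")
    return dense * alpha, [v * (1.0 - alpha) for v in sparse_values]

d, s = scale_hybrid(np.ones(3), [2.0, 4.0], alpha=0.3)
# alpha=1.0 would ignore the sparse scores entirely; alpha=0.0 the dense ones.
```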


github-actions bot commented Jan 9, 2025

PR Code Suggestions ✨

Explore these optional code suggestions:

Category | Suggestion | Score
Possible issue
Import NumPy to avoid runtime errors when using its functions in the fit and evaluate methods

Ensure that np (NumPy) is imported in the file since it is used in the fit and
evaluate methods for array operations.

semantic_router/routers/hybrid.py [316]

+import numpy as np
 emb_d = np.array(self.encoder(X[i : i + batch_size]))
Suggestion importance[1-10]: 10

Why: The np module is used in the fit and evaluate methods but is not imported in the file. Adding the import statement is essential to prevent runtime errors, making this a critical fix.

Validate that self.alpha is set and non-zero to avoid runtime errors during threshold calculation

Add a check to ensure that self.alpha is not None or zero before using it in
_set_score_threshold to prevent potential runtime errors.

semantic_router/routers/hybrid.py [77]

+if self.alpha is None or self.alpha == 0:
+    raise ValueError("Alpha value must be set and non-zero.")
 self.score_threshold = self.encoder.score_threshold * self.alpha
Suggestion importance[1-10]: 9

Why: Adding a validation check for self.alpha ensures robustness by preventing potential runtime errors when it is None or zero. This is a critical improvement for the _set_score_threshold method's reliability.

Add error handling for asynchronous API calls to ensure robustness

Add error handling for the _async_upsert method to ensure failures during the
asynchronous API call are logged and managed appropriately.

semantic_router/index/pinecone.py [738-745]

 async with self.async_client.post(
     f"https://{self.host}/vectors/upsert",
     json=params,
 ) as response:
+    if response.status != 200:
+        logger.error(f"Failed to upsert vectors: {response.status}")
+        raise RuntimeError("Upsert operation failed")
     res = await response.json(content_type=None)
     logger.warning(f"JBTEMP Pinecone upsert response: {res}")
     return res
Suggestion importance[1-10]: 8

Why: Adding error handling for the _async_upsert method improves the robustness of the code by ensuring failures during asynchronous API calls are logged and managed appropriately. This is a critical enhancement for production reliability.

Ensure TEST_ID is defined before usage to prevent runtime errors

Add a check to ensure that TEST_ID is defined before using it in the init_index
function to avoid potential runtime errors.

tests/unit/test_router.py [57]

-index_name = TEST_ID if not index_name else f"{TEST_ID}-{index_name.lower()}"
+index_name = TEST_ID if 'TEST_ID' in globals() and not index_name else f"{TEST_ID}-{index_name.lower()}"
Suggestion importance[1-10]: 7

Why: Adding a check for TEST_ID ensures robustness and prevents potential runtime errors. While the suggestion is valid and improves code reliability, it could be further enhanced by raising a clear error if TEST_ID is not defined.

Enhance the readiness check to include all critical attributes

Ensure that the is_ready method checks all necessary conditions for readiness, such
as verifying the initialization of critical attributes like self.index.

semantic_router/index/base.py [163-166]

 def is_ready(self) -> bool:
     """
     Checks if the index is ready to be used.
     """
-    return self.index is not None
+    return self.index is not None and self.namespace is not None
Suggestion importance[1-10]: 6

Why: The suggestion improves the is_ready method by ensuring all critical attributes are checked, which enhances robustness. However, the addition of self.namespace as a condition may not be universally necessary, depending on the context of its usage.

Verify the existence of required functions like layer_json() to prevent runtime errors

Ensure that the layer_json() and layer_yaml() functions are defined or imported, as
their absence will cause runtime errors in the test_from_file_json and
test_from_file_yaml methods.

tests/unit/test_router.py [356]

-layer_json()
+layer_json()  # Ensure layer_json() is defined or imported
Suggestion importance[1-10]: 6

Why: Ensuring that layer_json() and layer_yaml() are defined or imported is important to prevent runtime errors. However, the suggestion is more of a reminder and does not provide a concrete implementation, slightly reducing its impact.

Security
Replace hardcoded API keys with environment variables to enhance security

Ensure sensitive information like API keys (OPENAI_API_KEY) is not hardcoded in the
codebase, as it poses a security risk. Use environment variables or secure secrets
management instead.

tests/unit/test_router.py [22]

-os.environ["OPENAI_API_KEY"] = "mock-api-key"
+os.environ["OPENAI_API_KEY"] = os.getenv("OPENAI_API_KEY", "default-mock-api-key")
Suggestion importance[1-10]: 9

Why: Replacing hardcoded API keys with environment variables significantly enhances security by preventing accidental exposure of sensitive information. The suggestion is accurate and directly addresses a critical security concern.

General
Replace or remove debugging log statements to ensure clean and meaningful logging in production

Ensure that the logger.warning calls with "JBTEMP" messages are removed or replaced
with meaningful log messages to avoid leaving debugging artifacts in production
code.

semantic_router/routers/hybrid.py [96-257]

-logger.warning(f"JBTEMP: {routes}")
-logger.warning(f"JBTEMP: {scores}")
-logger.warning(f"JBTEMP: {route_names}")
-logger.warning(f"JBTEMP: {top_class}")
-logger.warning(f"JBTEMP: {top_class_scores}")
+# Replace or remove debugging logs
+logger.info(f"Routes processed: {routes}")
+logger.debug(f"Query scores: {scores}")
+logger.debug(f"Route names: {route_names}")
+logger.debug(f"Top class: {top_class}")
+logger.debug(f"Top class scores: {top_class_scores}")
Suggestion importance[1-10]: 8

Why: Debugging log statements with "JBTEMP" are not suitable for production code. Replacing them with meaningful log messages improves code quality and maintainability, ensuring logs are informative and professional.

Add a timeout or fail-safe mechanism to prevent infinite loops in retry logic

Add a timeout or maximum retry limit for the retry loops in tests to prevent
infinite loops in case of persistent failures.

tests/unit/test_router.py [314-321]

 while count < RETRY_COUNT:
     try:
         assert len(route_layer.index.get_utterances()) == 5
         break
     except Exception:
         logger.warning(f"Index not populated, waiting for retry (try {count})")
         time.sleep(PINECONE_SLEEP)
         count += 1
+else:
+    raise TimeoutError("Index not populated after maximum retries")
Suggestion importance[1-10]: 8

Why: Adding a timeout or fail-safe mechanism to retry loops prevents potential infinite loops, improving test reliability and robustness. The suggestion is well-justified and provides a concrete improvement to the test logic.

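The suggested diff relies on Python's `while`/`else` construct: the `else` branch runs only when the loop condition becomes false without the loop hitting `break`. A self-contained sketch of the retry pattern, with `index_populated` as a hypothetical stand-in for the test's assertion on `index.get_utterances()`:

```python
import time

RETRY_COUNT, SLEEP = 3, 0.01

def index_populated(attempt: int) -> bool:
    # Hypothetical stand-in: pretend the index becomes ready on attempt 2.
    return attempt >= 2

count = 0
while count < RETRY_COUNT:
    if index_populated(count):
        break
    time.sleep(SLEEP)
    count += 1
else:
    # Runs only if the loop exhausted all retries without `break`.
    raise TimeoutError("Index not populated after maximum retries")

print(f"populated after {count} retries")  # populated after 2 retries
```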
Remove unnecessary debug print statements to clean up the code

Remove the debug print statement to avoid exposing sensitive data or cluttering logs
in production.

semantic_router/encoders/bm25.py [60]

-print(f"JBTEMP: {docs}")
+# Debugging print statement removed
Suggestion importance[1-10]: 7

Why: Removing debug print statements is a good practice for production code as it prevents cluttering logs and exposing sensitive data. This suggestion directly addresses the issue and improves code quality.

Add error handling for embedding generation to improve robustness and error visibility

Add error handling for potential exceptions during the embedding generation loop in
the fit and evaluate methods to ensure robustness.

semantic_router/routers/hybrid.py [316-319]

-emb_d = np.array(self.encoder(X[i : i + batch_size]))
-emb_s = self.sparse_encoder(X[i : i + batch_size])
+try:
+    emb_d = np.array(self.encoder(X[i : i + batch_size]))
+    emb_s = self.sparse_encoder(X[i : i + batch_size])
+except Exception as e:
+    logger.error(f"Error generating embeddings: {e}")
+    raise
Suggestion importance[1-10]: 7

Why: Adding error handling during embedding generation enhances the robustness of the fit and evaluate methods by ensuring that exceptions are logged and raised appropriately, aiding in debugging and stability.

Add error logging for cases where the index readiness check fails

Add a fallback mechanism or error handling for the is_ready method in the call
method to prevent runtime errors if the index is not ready.

semantic_router/routers/base.py [425-426]

 if not self.index.is_ready():
+    logger.error("Index is not ready.")
     raise ValueError("Index is not ready.")
Suggestion importance[1-10]: 5

Why: Adding error logging provides better visibility into why the index readiness check fails, which aids debugging. However, the improvement is minor as the existing code already raises an exception.


Labels: ci, enhancement (Enhancement to existing features), Review effort [1-5]: 5