Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

⚡️ Speed up method AstraDBVectorStoreComponent.reset_database_list by 59% in PR #6048 (bugfix-dev-astradb) #6066

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
131 changes: 75 additions & 56 deletions src/backend/base/langflow/components/tools/duck_duck_go_search_run.py
Original file line number Diff line number Diff line change
@@ -1,75 +1,94 @@
from typing import Any

from langchain.tools import StructuredTool
from langchain_community.tools import DuckDuckGoSearchRun
from langchain_core.tools import ToolException
from pydantic import BaseModel, Field

from langflow.base.langchain_utilities.model import LCToolComponent
from langflow.field_typing import Tool
from langflow.custom import Component
from langflow.inputs import IntInput, MessageTextInput
from langflow.io import Output
from langflow.schema import Data
from langflow.schema.message import Message


class DuckDuckGoSearchComponent(Component):
"""Component for performing web searches using DuckDuckGo."""

display_name = "DuckDuckGo Search"
description = "Search the web using DuckDuckGo with customizable result limits"
documentation = "https://python.langchain.com/docs/integrations/tools/ddg"
icon = "DuckDuckGo"

class DuckDuckGoSearchComponent(LCToolComponent):
display_name: str = "DuckDuckGo Search"
description: str = "Perform web searches using the DuckDuckGo search engine with result limiting"
name = "DuckDuckGoSearch"
documentation: str = "https://python.langchain.com/docs/integrations/tools/ddg"
icon: str = "DuckDuckGo"
inputs = [
MessageTextInput(
name="input_value",
display_name="Search Query",
required=True,
info="The search query to execute with DuckDuckGo",
tool_mode=True,
),
IntInput(
name="max_results",
display_name="Max Results",
value=5,
required=False,
advanced=True,
info="Maximum number of search results to return",
),
IntInput(
name="max_snippet_length",
display_name="Max Snippet Length",
value=100,
required=False,
advanced=True,
info="Maximum length of each result snippet",
),
IntInput(name="max_results", display_name="Max Results", value=5, advanced=True),
IntInput(name="max_snippet_length", display_name="Max Snippet Length", value=100, advanced=True),
]

class DuckDuckGoSearchSchema(BaseModel):
query: str = Field(..., description="The search query")
max_results: int = Field(5, description="Maximum number of results to return")
max_snippet_length: int = Field(100, description="Maximum length of each result snippet")
outputs = [
Output(display_name="Data", name="data", method="fetch_content"),
Output(display_name="Text", name="text", method="fetch_content_text"),
]

def _build_wrapper(self):
def _build_wrapper(self) -> DuckDuckGoSearchRun:
"""Build the DuckDuckGo search wrapper."""
return DuckDuckGoSearchRun()

def build_tool(self) -> Tool:
wrapper = self._build_wrapper()
def run_model(self) -> list[Data]:
return self.fetch_content()

def search_func(query: str, max_results: int = 5, max_snippet_length: int = 100) -> list[dict[str, Any]]:
try:
full_results = wrapper.run(f"{query} (site:*)")
result_list = full_results.split("\n")[:max_results]
limited_results = []
for result in result_list:
limited_result = {
"snippet": result[:max_snippet_length],
}
limited_results.append(limited_result)
except Exception as e:
msg = f"Error in DuckDuckGo Search: {e!s}"
raise ToolException(msg) from e
return limited_results
def fetch_content(self) -> list[Data]:
"""Execute the search and return results as Data objects."""
try:
wrapper = self._build_wrapper()

tool = StructuredTool.from_function(
name="duckduckgo_search",
description="Search for recent results using DuckDuckGo with result limiting",
func=search_func,
args_schema=self.DuckDuckGoSearchSchema,
)
self.status = "DuckDuckGo Search Tool created"
return tool
# Execute search and get full results
full_results = wrapper.run(f"{self.input_value} (site:*)")

def run_model(self) -> list[Data]:
tool = self.build_tool()
results = tool.run(
{
"query": self.input_value,
"max_results": self.max_results,
"max_snippet_length": self.max_snippet_length,
}
)
data_list = [Data(data=result, text=result.get("snippet", "")) for result in results]
self.status = data_list # type: ignore[assignment]
return data_list
# Split results and limit to max_results
result_list = full_results.split("\n")[: self.max_results]

# Process and format results
data_results = []
for result in result_list:
if result.strip(): # Only process non-empty results
snippet = result[: self.max_snippet_length]
data_results.append(
Data(
text=snippet,
data={
"content": result,
"snippet": snippet,
},
)
)
except (ValueError, AttributeError) as e:
error_data = [Data(text=str(e), data={"error": str(e)})]
self.status = error_data
return error_data
else:
self.status = data_results
return data_results

def fetch_content_text(self) -> Message:
    """Run the search and return all result texts joined into one message.

    Calls ``fetch_content`` and concatenates the ``text`` of every returned
    item, one per line. The joined string is also stored in ``self.status``.

    Returns:
        A ``Message`` whose ``text`` is the newline-joined result texts.
    """
    texts = [entry.text for entry in self.fetch_content()]
    combined = "\n".join(texts)
    self.status = combined
    return Message(text=combined)
106 changes: 74 additions & 32 deletions src/backend/base/langflow/components/vectorstores/astradb.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@
display_name="Environment",
info="The environment for the Astra DB API Endpoint.",
advanced=True,
real_time_refresh=True,
),
DropdownInput(
name="api_endpoint",
Expand Down Expand Up @@ -315,11 +316,16 @@
# Get the list of databases
db_list = list(admin_client.list_databases())

# Set the environment properly
env_string = ""
if environment and environment != "prod":
env_string = f"-{environment}"

# Generate the api endpoint for each database
db_info_dict = {}
for db in db_list:
try:
api_endpoint = f"https://{db.info.id}-{db.info.region}.apps.astra.datastax.com"
api_endpoint = f"https://{db.info.id}-{db.info.region}.apps.astra{env_string}.datastax.com"
db_info_dict[db.info.name] = {
"api_endpoint": api_endpoint,
"collections": len(
Expand Down Expand Up @@ -473,6 +479,40 @@
for col in collection_list
]

def reset_collection_list(self, build_config: dict):
    """Repopulate the ``collection_name`` dropdown and clear its selection.

    The options come from ``self._initialize_collection_options()``; each
    entry's ``name`` becomes a dropdown option and its remaining keys become
    the matching ``options_metadata`` record.

    Args:
        build_config: Build configuration whose ``collection_name`` entry is
            updated in place.

    Returns:
        The same ``build_config`` object.
    """
    available = self._initialize_collection_options()

    collection_field = build_config["collection_name"]
    collection_field["options"] = [entry["name"] for entry in available]
    collection_field["options_metadata"] = [
        {key: val for key, val in entry.items() if key != "name"} for entry in available
    ]

    # Whatever was selected before may no longer exist, so clear it.
    collection_field["value"] = ""

    return build_config

def reset_database_list(self, build_config: dict):
    """Repopulate the ``api_endpoint`` dropdown and clear its selection.

    The options come from ``self._fetch_and_cache_database_options()``; each
    entry's ``name`` becomes a dropdown option, and its ``collections`` and
    ``api_endpoint`` values become the matching ``options_metadata`` record.

    Args:
        build_config: Build configuration whose ``api_endpoint`` entry is
            updated in place.

    Returns:
        The same ``build_config`` object.
    """
    databases = self._fetch_and_cache_database_options()

    endpoint_field = build_config["api_endpoint"]
    endpoint_field["options"] = [entry["name"] for entry in databases]
    endpoint_field["options_metadata"] = [
        {key: entry[key] for key in ("collections", "api_endpoint")} for entry in databases
    ]

    # Whatever was selected before may no longer exist, so clear it.
    endpoint_field["value"] = ""

    return build_config

def reset_build_config(self, build_config: dict):
# Reset the list of databases we have based on the token provided
build_config["api_endpoint"]["options"] = []
Expand All @@ -489,25 +529,17 @@

def update_build_config(self, build_config: dict, field_value: str, field_name: str | None = None):
# When the component first executes, this is the update refresh call
first_run = field_name == "collection_name" and not field_value
first_run = field_name == "collection_name" and not field_value and not build_config["api_endpoint"]["options"]

# If the token has not been provided, simply return
if not self.token or field_name == "environment":
if not self.token:
return self.reset_build_config(build_config)

# Refresh the database name options
if first_run or field_name == "token":
# If this is the first execution of the component, reset and build database list
if first_run or field_name in ["token", "environment"]:
# Reset the build config to ensure we are starting fresh
build_config = self.reset_build_config(build_config)

# Get the list of options we have based on the token provided
database_options = self._initialize_database_options()

# If we retrieved options based on the token, show the dropdown
build_config["api_endpoint"]["options"] = [db["name"] for db in database_options]
build_config["api_endpoint"]["options_metadata"] = [
{k: v for k, v in db.items() if k not in ["name"]} for db in database_options
]
build_config = self.reset_database_list(build_config)

# Get list of regions for a given cloud provider
"""
Expand All @@ -526,8 +558,9 @@

# Refresh the collection name options
if field_name == "api_endpoint":
# Reset the selected collection
build_config["collection_name"]["value"] = ""
# If missing, refresh the database options
if not build_config["api_endpoint"]["options"] or not field_value:
return self.update_build_config(build_config, field_value=self.token, field_name="token")

# Set the underlying api endpoint value of the database
if field_value in build_config["api_endpoint"]["options"]:
Expand All @@ -538,21 +571,14 @@
else:
build_config["d_api_endpoint"]["value"] = ""

# Reload the list of collections and metadata associated
collection_options = self._initialize_collection_options(
api_endpoint=build_config["d_api_endpoint"]["value"]
)

# If we have collections, show the dropdown
build_config["collection_name"]["options"] = [col["name"] for col in collection_options]
build_config["collection_name"]["options_metadata"] = [
{k: v for k, v in col.items() if k not in ["name"]} for col in collection_options
]

return build_config
# Reset the list of collections we have based on the token provided
return self.reset_collection_list(build_config)

# Hide embedding model option if options_metadata provider is not null
if field_name == "collection_name" and field_value:
# Assume we will be autodetecting the collection:
build_config["autodetect_collection"]["value"] = True

# Set the options for collection name to be the field value if it's a new collection
if field_value not in build_config["collection_name"]["options"]:
# Add the new collection to the list of options
Expand All @@ -563,13 +589,9 @@

# Ensure that autodetect collection is set to False, since it's a new collection
build_config["autodetect_collection"]["value"] = False
else:
build_config["autodetect_collection"]["value"] = True

# Find the position of the selected collection to align with metadata
index_of_name = build_config["collection_name"]["options"].index(field_value)

# Get the provider value of the selected collection
value_of_provider = build_config["collection_name"]["options_metadata"][index_of_name]["provider"]

# If we were able to determine the Vectorize provider, set it accordingly
Expand Down Expand Up @@ -788,3 +810,23 @@
"search_type": self._map_search_type(),
"search_kwargs": search_args,
}

def __init__(self, **kwargs):
    """Initialise the component and its per-instance database-options cache.

    Args:
        **kwargs: Forwarded unchanged to the parent initialiser, so the
            component can still be constructed with keyword arguments.
            NOTE(review): the base class is not visible from this hunk;
            confirm it accepts keyword arguments.
    """
    super().__init__(**kwargs)
    # Filled on first use by `_fetch_and_cache_database_options`.
    self.cached_db_options = None

def _fetch_and_cache_database_options(self):
    """Return the database options, fetching them once and caching the result.

    On the first call the mapping returned by ``self.get_database_list()`` is
    reshaped into a list of ``{"name", "collections", "api_endpoint"}`` dicts
    and stored in ``self.cached_db_options``. Later calls return that list.

    Returns:
        The cached list of database option dicts (possibly empty).

    Raises:
        ValueError: If fetching or reshaping the database list fails; the
            original exception is chained as the cause.
    """
    # Compare against None rather than truthiness so that an empty result is
    # cached too, instead of being re-fetched on every call.
    if self.cached_db_options is None:
        try:
            self.cached_db_options = [
                {
                    "name": name,
                    "collections": info["collections"],
                    "api_endpoint": info["api_endpoint"],
                }
                for name, info in self.get_database_list().items()
            ]
        except Exception as e:
            # Build the message first (Ruff EM102), matching the file's other raises.
            msg = f"Error fetching database options: {e}"
            raise ValueError(msg) from e

    return self.cached_db_options
Loading
Loading