Skip to content

Commit

Permalink
Increase max limit and max offset (#735)
Browse files Browse the repository at this point in the history
Increase max limit to 1000 and max offset to 10,000. Return a 400 error if these are violated.
  • Loading branch information
farshidz authored Jan 25, 2024
1 parent 702d274 commit 24a696c
Show file tree
Hide file tree
Showing 21 changed files with 639 additions and 35 deletions.
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ typing-extensions==4.5.0
urllib3==1.26.0
pydantic==1.10.11
httpx==0.25.0
semver==3.0.2
6 changes: 6 additions & 0 deletions src/marqo/api/models/rollback_request.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from marqo.base_model import ImmutableStrictBaseModel


class RollbackRequest(ImmutableStrictBaseModel):
    """Request body for the rollback API, naming the two versions involved."""
    # Version to roll back from
    from_version: str
    # Version to roll back to
    to_version: str
136 changes: 125 additions & 11 deletions src/marqo/core/index_management/index_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@

import marqo.logging
import marqo.vespa.vespa_client
from marqo import version
from marqo.base_model import ImmutableStrictBaseModel
from marqo.core import constants
from marqo.core.exceptions import IndexExistsError, IndexNotFoundError
from marqo.core.models import MarqoIndex
from marqo.core.models.marqo_index_request import MarqoIndexRequest
Expand All @@ -18,8 +21,13 @@
logger = marqo.logging.get_logger(__name__)


class _MarqoConfig(ImmutableStrictBaseModel):
    """Marqo configuration record that is serialised into the settings schema."""
    # Marqo version string stored alongside the configuration
    version: str


class IndexManagement:
_MARQO_SETTINGS_SCHEMA_NAME = 'marqo__settings'
_MARQO_CONFIG_DOC_ID = 'marqo__config'
_MARQO_SETTINGS_SCHEMA_TEMPLATE = textwrap.dedent(
'''
schema %s {
Expand All @@ -34,23 +42,33 @@ class IndexManagement:
}
'''
)
_DEFAULT_QUERY_PROFILE_TEMPLATE = textwrap.dedent(
'''
<query-profile id="default">
<field name="maxHits">1000</field>
<field name="maxOffset">10000</field>
</query-profile>
'''
)

def __init__(self, vespa_client: VespaClient):
self.vespa_client = vespa_client

def create_settings_schema(self) -> bool:
def bootstrap_vespa(self) -> bool:
"""
Create the Marqo settings schema if it does not exist.
Add Marqo configuration to Vespa application package if an existing Marqo configuration is not detected.
Returns:
True if schema was created, False if it already existed
True if Vespa was configured, False if it was already configured
"""
app = self.vespa_client.download_application()
settings_schema_created = self._add_marqo_settings_schema(app)
configured = self._marqo_config_exists(app)

if settings_schema_created:
if not configured:
self._add_marqo_config(app)
self.vespa_client.deploy_application(app)
self.vespa_client.wait_for_application_convergence()
self._save_marqo_version(version.get_version())
return True

return False
Expand All @@ -70,10 +88,13 @@ def create_index(self, marqo_index_request: MarqoIndexRequest) -> MarqoIndex:
InvalidVespaApplicationError: If Vespa application is invalid after applying the index
"""
app = self.vespa_client.download_application()
settings_schema_created = self._add_marqo_settings_schema(app)
configured = self._marqo_config_exists(app)

if not settings_schema_created and self.index_exists(marqo_index_request.name):
if configured and self.index_exists(marqo_index_request.name):
raise IndexExistsError(f"Index {marqo_index_request.name} already exists")
else:
logger.debug('Marqo config does not exist. Configuring Vespa as part of index creation')
self._add_marqo_config(app)

vespa_schema = vespa_schema_factory(marqo_index_request)
schema, marqo_index = vespa_schema.generate_schema()
Expand All @@ -86,6 +107,9 @@ def create_index(self, marqo_index_request: MarqoIndexRequest) -> MarqoIndex:
self.vespa_client.wait_for_application_convergence()
self._save_index_settings(marqo_index)

if not configured:
self._save_marqo_version(version.get_version())

return marqo_index

def batch_create_indexes(self, marqo_index_requests: List[MarqoIndexRequest]) -> List[MarqoIndex]:
Expand All @@ -105,9 +129,9 @@ def batch_create_indexes(self, marqo_index_requests: List[MarqoIndexRequest]) ->
InvalidVespaApplicationError: If Vespa application is invalid after applying the indexes
"""
app = self.vespa_client.download_application()
settings_schema_created = self._add_marqo_settings_schema(app)
configured = self._add_marqo_config(app)

if not settings_schema_created:
if not configured:
for index in marqo_index_requests:
if self.index_exists(index.name):
raise IndexExistsError(f"Index {index.name} already exists")
Expand Down Expand Up @@ -199,8 +223,10 @@ def get_all_indexes(self) -> List[MarqoIndex]:
raise InternalError("Unexpected continuation token received")

return [
MarqoIndex.parse_raw(response.fields['settings'])
for response in batch_response.documents
MarqoIndex.parse_raw(document.fields['settings'])

for document in batch_response.documents
if not document.id.split('::')[-1].startswith(constants.MARQO_RESERVED_PREFIX)
]

def get_index(self, index_name) -> MarqoIndex:
Expand Down Expand Up @@ -241,6 +267,61 @@ def index_exists(self, index_name: str) -> bool:
except IndexNotFoundError:
return False

def get_marqo_version(self) -> str:
    """
    Read the Marqo version recorded in the Marqo config document.

    Returns:
        The stored version string, or '2.0' if the config document does not exist (404)

    Raises:
        VespaStatusError: If fetching the config document fails with any status other than 404
    """
    try:
        config_doc = self.vespa_client.get_document(
            self._MARQO_CONFIG_DOC_ID,
            self._MARQO_SETTINGS_SCHEMA_NAME
        )
    except VespaStatusError as e:
        if e.status_code != 404:
            raise e
        # A missing config document is treated as a 2.0.x deployment
        logger.debug('Marqo config document does not exist. Assuming Marqo version 2.0.x')
        return '2.0'
    else:
        raw_config = config_doc.document.fields['settings']
        return _MarqoConfig.parse_raw(raw_config).version

def _marqo_config_exists(self, app) -> bool:
# For Marqo 2.1+, recording Marqo version is the final stage of configuration, and its absence
# indicates an incomplete configuration (e.g., interrupted configuration). However, for Marqo 2.0.x,
# Marqo version is not recorded. Marqo 2.0.x can be detected by an existing Marqo settings schema,
# but no default query profile
settings_schema_exists = os.path.exists(os.path.join(app, 'schemas', f'{self._MARQO_SETTINGS_SCHEMA_NAME}.sd'))
query_profile_exists = os.path.exists(
os.path.join(app, 'search/query-profiles', 'default.xml')
)

if settings_schema_exists and not query_profile_exists:
logger.debug('Detected existing Marqo 2.0.x configuration')
return True

if settings_schema_exists and query_profile_exists:
try:
self.vespa_client.get_document(self._MARQO_CONFIG_DOC_ID, self._MARQO_SETTINGS_SCHEMA_NAME)
return True
except VespaStatusError as e:
if e.status_code == 404:
logger.debug('Marqo config document does not exist. Detected incomplete Marqo configuration')
return False
raise e

# Settings schema not found, so Marqo config does not exist
return False

def _add_marqo_config(self, app: str) -> bool:
"""
Add Marqo configuration to Vespa application package.
Args:
app: Path to Vespa application package
Returns:
True if configuration was added, False if all components already existed
"""
added_settings_schema = self._add_marqo_settings_schema(app)
added_query_profile = self._add_default_query_profile(app)

return added_settings_schema or added_query_profile

def _add_marqo_settings_schema(self, app: str) -> bool:
"""
Create the Marqo settings schema if it does not exist.
Expand All @@ -257,13 +338,35 @@ def _add_marqo_settings_schema(self, app: str) -> bool:
self._MARQO_SETTINGS_SCHEMA_NAME,
self._MARQO_SETTINGS_SCHEMA_NAME
)
os.makedirs(os.path.dirname(schema_path), exist_ok=True)
with open(schema_path, 'w') as f:
f.write(schema)
self._add_schema_to_services(app, self._MARQO_SETTINGS_SCHEMA_NAME)

return True
return False

def _add_default_query_profile(self, app: str) -> bool:
"""
Create the default query profile if it does not exist.
Args:
app: Path to Vespa application package
Returns:
True if query profile was created, False if it already existed
"""
profile_path = os.path.join(app, 'search/query-profiles', 'default.xml')
if not os.path.exists(profile_path):
logger.debug('Default query profile does not exist. Creating it')

query_profile = self._DEFAULT_QUERY_PROFILE_TEMPLATE
os.makedirs(os.path.dirname(profile_path), exist_ok=True)
with open(profile_path, 'w') as f:
f.write(query_profile)

return True
pass

def _add_schema(self, app: str, name: str, schema: str) -> None:
schema_path = os.path.join(app, 'schemas', f'{name}.sd')
if os.path.exists(schema_path):
Expand Down Expand Up @@ -329,6 +432,17 @@ def _add_schema_removal_override(self, app: str) -> None:
with open(validation_overrides_path, 'w') as f:
f.write(content)

def _save_marqo_version(self, version: str) -> None:
    """
    Store the given Marqo version in the Marqo config document of the settings schema.

    Args:
        version: Marqo version string to record
    """
    serialised_config = _MarqoConfig(version=version).json()
    config_document = VespaDocument(
        id=self._MARQO_CONFIG_DOC_ID,
        fields={'settings': serialised_config}
    )
    self.vespa_client.feed_document(config_document, schema=self._MARQO_SETTINGS_SCHEMA_NAME)

def _save_index_settings(self, marqo_index: MarqoIndex) -> None:
"""
Create or update index settings in Vespa settings schema.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from typing import List, Dict, Any

import marqo.core.search.search_filter as search_filter
from marqo.core.exceptions import InvalidDataTypeError, InvalidFieldNameError, VespaDocumentParsingError
from marqo.core.models import MarqoQuery
Expand Down Expand Up @@ -33,6 +31,9 @@ class StructuredVespaIndex(VespaIndex):
_VESPA_DOC_MATCH_FEATURES = 'matchfeatures'
_VESPA_DOC_FIELDS_TO_IGNORE = {'sddocname'}

_DEFAULT_MAX_LIMIT = 1000
_DEFAULT_MAX_OFFSET = 10000

def __init__(self, marqo_index: StructuredMarqoIndex):
self._marqo_index = marqo_index

Expand Down Expand Up @@ -266,7 +267,7 @@ def _to_vespa_tensor_query(self, marqo_query: MarqoTensorQuery) -> Dict[str, Any
'offset': marqo_query.offset,
'query_features': query_inputs,
'presentation.summary': summary,
'ranking': ranking
'ranking': ranking,
}
query = {k: v for k, v in query.items() if v is not None}

Expand Down
3 changes: 2 additions & 1 deletion src/marqo/core/vespa_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,13 @@ def get_vector_count_query(self) -> Dict[str, Any]:
pass


def for_marqo_index(marqo_index: MarqoIndex):
def for_marqo_index(marqo_index: MarqoIndex) -> VespaIndex:
"""
Get the VespaIndex implementation for the given MarqoIndex.
Args:
marqo_index: The MarqoIndex to get the implementation for
marqo_version: The version of Marqo that the index was created with
Returns:
The VespaIndex implementation for the given MarqoIndex
Expand Down
2 changes: 1 addition & 1 deletion src/marqo/documentation.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import marqo.version

version = marqo.version.__version__
version = marqo.version.get_version()
base_url = 'https://docs.marqo.ai'


Expand Down
18 changes: 18 additions & 0 deletions src/marqo/tensor_search/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from marqo import version
from marqo.api import exceptions as api_exceptions
from marqo.api.models.health_response import HealthResponse
from marqo.api.models.rollback_request import RollbackRequest
from marqo.core import exceptions as core_exceptions
from marqo.core.index_management.index_management import IndexManagement
from marqo.logging import get_logger
Expand All @@ -25,6 +26,7 @@
from marqo.tensor_search.telemetry import RequestMetricsStore, TelemetryMiddleware
from marqo.tensor_search.throttling.redis_throttle import throttle
from marqo.tensor_search.web import api_validation, api_utils
from marqo.upgrades.upgrade import UpgradeRunner, RollbackRunner
from marqo.vespa.vespa_client import VespaClient

logger = get_logger(__name__)
Expand Down Expand Up @@ -355,6 +357,22 @@ def delete_all_documents(index_name: str, marqo_config: config.Config = Depends(
return {"documentCount": document_count}


@app.post("/upgrade")
@utils.enable_upgrade_api()
def upgrade_marqo(marqo_config: config.Config = Depends(get_config)):
    """An internal API used for testing processes. Not to be used by users."""
    # Build the runner from the configured Vespa client and index management, then run it
    UpgradeRunner(marqo_config.vespa_client, marqo_config.index_management).upgrade()


# Registered on its own path: POST /upgrade is already taken by upgrade_marqo, and a second
# handler on the same path and method would never be reached.
@app.post("/rollback")
@utils.enable_upgrade_api()
def rollback_marqo(req: RollbackRequest, marqo_config: config.Config = Depends(get_config)):
    """An internal API used for testing processes. Not to be used by users."""
    rollback_runner = RollbackRunner(marqo_config.vespa_client, marqo_config.index_management)
    # The versions to roll back between come from the request body
    rollback_runner.rollback(from_version=req.from_version, to_version=req.to_version)


if __name__ == "__main__":
uvicorn.run(app, host="localhost", port=8882)

Expand Down
3 changes: 3 additions & 0 deletions src/marqo/tensor_search/configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ def default_env_vars() -> dict:
EnvVars.MARQO_MAX_INDEX_FIELDS: None,
EnvVars.MARQO_MAX_DOC_BYTES: 100000,
EnvVars.MARQO_MAX_RETRIEVABLE_DOCS: 10000,
EnvVars.MARQO_MAX_SEARCH_LIMIT: 1000,
EnvVars.MARQO_MAX_SEARCH_OFFSET: 10000,
EnvVars.MARQO_MODELS_TO_PRELOAD: ["hf/e5-base-v2", "open_clip/ViT-B-32/laion2b_s34b_b79k"],
EnvVars.MARQO_MAX_CONCURRENT_INDEX: 8,
EnvVars.MARQO_MAX_CONCURRENT_SEARCH: 8,
Expand All @@ -31,4 +33,5 @@ def default_env_vars() -> dict:
EnvVars.MARQO_MAX_NUMBER_OF_REPLICAS: 1,
EnvVars.MARQO_DEFAULT_EF_SEARCH: 2000,
EnvVars.MARQO_ENABLE_BATCH_APIS: "FALSE",
EnvVars.MARQO_ENABLE_UPGRADE_API: "FALSE",
}
3 changes: 3 additions & 0 deletions src/marqo/tensor_search/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ class EnvVars:
MARQO_MAX_INDEX_FIELDS = "MARQO_MAX_INDEX_FIELDS"
MARQO_MAX_DOC_BYTES = "MARQO_MAX_DOC_BYTES"
MARQO_MAX_RETRIEVABLE_DOCS = "MARQO_MAX_RETRIEVABLE_DOCS"
MARQO_MAX_SEARCH_LIMIT = "MARQO_MAX_SEARCH_LIMIT"
MARQO_MAX_SEARCH_OFFSET = "MARQO_MAX_SEARCH_OFFSET"
MARQO_MODELS_TO_PRELOAD = "MARQO_MODELS_TO_PRELOAD"
MARQO_MAX_CONCURRENT_INDEX = "MARQO_MAX_CONCURRENT_INDEX"
MARQO_MAX_CONCURRENT_SEARCH = "MARQO_MAX_CONCURRENT_SEARCH"
Expand All @@ -60,6 +62,7 @@ class EnvVars:
MARQO_DEFAULT_EF_SEARCH = "MARQO_DEFAULT_EF_SEARCH"
MARQO_BEST_AVAILABLE_DEVICE = "MARQO_BEST_AVAILABLE_DEVICE"
MARQO_ENABLE_BATCH_APIS = "MARQO_ENABLE_BATCH_APIS"
MARQO_ENABLE_UPGRADE_API = "MARQO_ENABLE_UPGRADE_API"


class RequestType:
Expand Down
2 changes: 1 addition & 1 deletion src/marqo/tensor_search/index_meta_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ def refresh():
# This can happen when settings schema doesn't exist
logger.warn(
'Failed to populate index cache due to 400 error from vector store. This can happen '
'if Marqo settings schema does not exist. Error: {e}'
f'if Marqo settings schema does not exist. Error: {e}'
)
else:
logger.warn(
Expand Down
1 change: 1 addition & 0 deletions src/marqo/tensor_search/models/add_docs_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class Config:
documents: Union[Sequence[Union[dict, Any]], np.ndarray]
imageDownloadThreadCount: int = 20


# TODO - Make this configurable
MAX_DOCS = 128

Expand Down
Loading

0 comments on commit 24a696c

Please sign in to comment.