From 24a696c20f2a6f32313e4a4923f16544313ab7dc Mon Sep 17 00:00:00 2001 From: Farshid Zavareh Date: Thu, 25 Jan 2024 13:27:34 +1100 Subject: [PATCH] Increase max limit and max offset (#735) Increase max limit to 1000 and max offset to 10,000. Return a 400 error if these are violated. --- requirements.txt | 1 + src/marqo/api/models/rollback_request.py | 6 + .../core/index_management/index_management.py | 136 ++++++++++++++++-- .../structured_vespa_index.py | 7 +- src/marqo/core/vespa_index.py | 3 +- src/marqo/documentation.py | 2 +- src/marqo/tensor_search/api.py | 18 +++ src/marqo/tensor_search/configs.py | 3 + src/marqo/tensor_search/enums.py | 3 + src/marqo/tensor_search/index_meta_cache.py | 2 +- .../tensor_search/models/add_docs_objects.py | 1 + src/marqo/tensor_search/on_start_script.py | 10 +- src/marqo/tensor_search/tensor_search.py | 15 +- src/marqo/tensor_search/utils.py | 19 ++- src/marqo/upgrades/upgrade.py | 88 ++++++++++++ src/marqo/upgrades/v2_v0_v2_v1.py | 90 ++++++++++++ src/marqo/upgrades/v2_v1_v2_v0_rollback.py | 73 ++++++++++ src/marqo/version.py | 2 +- .../index_management/test_index_management.py | 85 ++++++++++- .../integ_tests/test_search_structured.py | 22 ++- tests/tensor_search/test_pagination.py | 88 +++++++++++- 21 files changed, 639 insertions(+), 35 deletions(-) create mode 100644 src/marqo/api/models/rollback_request.py create mode 100644 src/marqo/upgrades/upgrade.py create mode 100644 src/marqo/upgrades/v2_v0_v2_v1.py create mode 100644 src/marqo/upgrades/v2_v1_v2_v0_rollback.py diff --git a/requirements.txt b/requirements.txt index c98e46339..230b93226 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,3 +11,4 @@ typing-extensions==4.5.0 urllib3==1.26.0 pydantic==1.10.11 httpx==0.25.0 +semver==3.0.2 \ No newline at end of file diff --git a/src/marqo/api/models/rollback_request.py b/src/marqo/api/models/rollback_request.py new file mode 100644 index 000000000..40a438632 --- /dev/null +++ b/src/marqo/api/models/rollback_request.py @@ -0,0 +1,6 @@ +from marqo.base_model import ImmutableStrictBaseModel + + +class RollbackRequest(ImmutableStrictBaseModel): + from_version: str + to_version: str diff --git a/src/marqo/core/index_management/index_management.py b/src/marqo/core/index_management/index_management.py index cd53d8bd6..5630fdd76 100644 --- a/src/marqo/core/index_management/index_management.py +++ b/src/marqo/core/index_management/index_management.py @@ -6,6 +6,9 @@ import marqo.logging import marqo.vespa.vespa_client +from marqo import version +from marqo.base_model import ImmutableStrictBaseModel +from marqo.core import constants from marqo.core.exceptions import IndexExistsError, IndexNotFoundError from marqo.core.models import MarqoIndex from marqo.core.models.marqo_index_request import MarqoIndexRequest @@ -18,8 +21,13 @@ logger = marqo.logging.get_logger(__name__) +class _MarqoConfig(ImmutableStrictBaseModel): + version: str + + class IndexManagement: _MARQO_SETTINGS_SCHEMA_NAME = 'marqo__settings' + _MARQO_CONFIG_DOC_ID = 'marqo__config' _MARQO_SETTINGS_SCHEMA_TEMPLATE = textwrap.dedent( ''' schema %s { @@ -34,23 +42,33 @@ class IndexManagement: } ''' ) + _DEFAULT_QUERY_PROFILE_TEMPLATE = textwrap.dedent( + ''' + + 1000 + 10000 + + ''' + ) def __init__(self, vespa_client: VespaClient): self.vespa_client = vespa_client - def create_settings_schema(self) -> bool: + def bootstrap_vespa(self) -> bool: """ - Create the Marqo settings schema if it does not exist. + Add Marqo configuration to Vespa application package if an existing Marqo configuration is not detected. Returns: - True if schema was created, False if it already existed + True if Vespa was configured, False if it was already configured """ app = self.vespa_client.download_application() - settings_schema_created = self._add_marqo_settings_schema(app) + configured = self._marqo_config_exists(app) - if settings_schema_created: + if not configured: + self._add_marqo_config(app) self.vespa_client.deploy_application(app) self.vespa_client.wait_for_application_convergence() + self._save_marqo_version(version.get_version()) return True return False @@ -70,10 +88,13 @@ def create_index(self, marqo_index_request: MarqoIndexRequest) -> MarqoIndex: InvalidVespaApplicationError: If Vespa application is invalid after applying the index """ app = self.vespa_client.download_application() - settings_schema_created = self._add_marqo_settings_schema(app) + configured = self._marqo_config_exists(app) - if not settings_schema_created and self.index_exists(marqo_index_request.name): + if configured and self.index_exists(marqo_index_request.name): raise IndexExistsError(f"Index {marqo_index_request.name} already exists") + else: + logger.debug('Marqo config does not exist. Configuring Vespa as part of index creation') + self._add_marqo_config(app) vespa_schema = vespa_schema_factory(marqo_index_request) schema, marqo_index = vespa_schema.generate_schema() @@ -86,6 +107,9 @@ def create_index(self, marqo_index_request: MarqoIndexRequest) -> MarqoIndex: self.vespa_client.wait_for_application_convergence() self._save_index_settings(marqo_index) + if not configured: + self._save_marqo_version(version.get_version()) + return marqo_index def batch_create_indexes(self, marqo_index_requests: List[MarqoIndexRequest]) -> List[MarqoIndex]: @@ -105,9 +129,9 @@ def batch_create_indexes(self, marqo_index_requests: List[MarqoIndexRequest]) -> InvalidVespaApplicationError: If Vespa application is invalid after applying the indexes """ app = self.vespa_client.download_application() - settings_schema_created = self._add_marqo_settings_schema(app) + configured = self._add_marqo_config(app) - if not settings_schema_created: + if not configured: for index in marqo_index_requests: if self.index_exists(index.name): raise IndexExistsError(f"Index {index.name} already exists") @@ -199,8 +223,10 @@ def get_all_indexes(self) -> List[MarqoIndex]: raise InternalError("Unexpected continuation token received") return [ - MarqoIndex.parse_raw(response.fields['settings']) - for response in batch_response.documents + MarqoIndex.parse_raw(document.fields['settings']) + + for document in batch_response.documents + if not document.id.split('::')[-1].startswith(constants.MARQO_RESERVED_PREFIX) ] def get_index(self, index_name) -> MarqoIndex: @@ -241,6 +267,61 @@ def index_exists(self, index_name: str) -> bool: except IndexNotFoundError: return False + def get_marqo_version(self) -> str: + """ + Get the Marqo version Vespa is configured for. + """ + try: + response = self.vespa_client.get_document(self._MARQO_CONFIG_DOC_ID, self._MARQO_SETTINGS_SCHEMA_NAME) + except VespaStatusError as e: + if e.status_code == 404: + logger.debug('Marqo config document does not exist. Assuming Marqo version 2.0.x') + return '2.0' + raise e + + return _MarqoConfig.parse_raw(response.document.fields['settings']).version + + def _marqo_config_exists(self, app) -> bool: + # For Marqo 2.1+, recording Marqo version is the final stage of configuration, and its absence + # indicates an incomplete configuration (e.g., interrupted configuration). However, for Marqo 2.0.x, + # Marqo version is not recorded. Marqo 2.0.x can be detected by an existing Marqo settings schema, + # but no default query profile + settings_schema_exists = os.path.exists(os.path.join(app, 'schemas', f'{self._MARQO_SETTINGS_SCHEMA_NAME}.sd')) + query_profile_exists = os.path.exists( + os.path.join(app, 'search/query-profiles', 'default.xml') + ) + + if settings_schema_exists and not query_profile_exists: + logger.debug('Detected existing Marqo 2.0.x configuration') + return True + + if settings_schema_exists and query_profile_exists: + try: + self.vespa_client.get_document(self._MARQO_CONFIG_DOC_ID, self._MARQO_SETTINGS_SCHEMA_NAME) + return True + except VespaStatusError as e: + if e.status_code == 404: + logger.debug('Marqo config document does not exist. Detected incomplete Marqo configuration') + return False + raise e + + # Settings schema not found, so Marqo config does not exist + return False + + def _add_marqo_config(self, app: str) -> bool: + """ + Add Marqo configuration to Vespa application package. + + Args: + app: Path to Vespa application package + Returns: + True if configuration was added, False if all components already existed + """ + added_settings_schema = self._add_marqo_settings_schema(app) + added_query_profile = self._add_default_query_profile(app) + + return added_settings_schema or added_query_profile + def _add_marqo_settings_schema(self, app: str) -> bool: """ Create the Marqo settings schema if it does not exist. @@ -257,6 +338,7 @@ def _add_marqo_settings_schema(self, app: str) -> bool: self._MARQO_SETTINGS_SCHEMA_NAME, self._MARQO_SETTINGS_SCHEMA_NAME ) + os.makedirs(os.path.dirname(schema_path), exist_ok=True) with open(schema_path, 'w') as f: f.write(schema) self._add_schema_to_services(app, self._MARQO_SETTINGS_SCHEMA_NAME) @@ -264,6 +346,27 @@ def _add_marqo_settings_schema(self, app: str) -> bool: return True return False + def _add_default_query_profile(self, app: str) -> bool: + """ + Create the default query profile if it does not exist. + Args: + app: Path to Vespa application package + + Returns: + True if query profile was created, False if it already existed + """ + profile_path = os.path.join(app, 'search/query-profiles', 'default.xml') + if not os.path.exists(profile_path): + logger.debug('Default query profile does not exist. Creating it') + + query_profile = self._DEFAULT_QUERY_PROFILE_TEMPLATE + os.makedirs(os.path.dirname(profile_path), exist_ok=True) + with open(profile_path, 'w') as f: + f.write(query_profile) + + return True + pass + def _add_schema(self, app: str, name: str, schema: str) -> None: schema_path = os.path.join(app, 'schemas', f'{name}.sd') if os.path.exists(schema_path): @@ -329,6 +432,17 @@ def _add_schema_removal_override(self, app: str) -> None: with open(validation_overrides_path, 'w') as f: f.write(content) + def _save_marqo_version(self, version: str) -> None: + self.vespa_client.feed_document( + VespaDocument( + id=self._MARQO_CONFIG_DOC_ID, + fields={ + 'settings': _MarqoConfig(version=version).json() + } + ), + schema=self._MARQO_SETTINGS_SCHEMA_NAME + ) + def _save_index_settings(self, marqo_index: MarqoIndex) -> None: """ Create or update index settings in Vespa settings schema. diff --git a/src/marqo/core/structured_vespa_index/structured_vespa_index.py b/src/marqo/core/structured_vespa_index/structured_vespa_index.py index d58665b8a..78566467a 100644 --- a/src/marqo/core/structured_vespa_index/structured_vespa_index.py +++ b/src/marqo/core/structured_vespa_index/structured_vespa_index.py @@ -1,5 +1,3 @@ -from typing import List, Dict, Any - import marqo.core.search.search_filter as search_filter from marqo.core.exceptions import InvalidDataTypeError, InvalidFieldNameError, VespaDocumentParsingError from marqo.core.models import MarqoQuery @@ -33,6 +31,9 @@ class StructuredVespaIndex(VespaIndex): _VESPA_DOC_MATCH_FEATURES = 'matchfeatures' _VESPA_DOC_FIELDS_TO_IGNORE = {'sddocname'} + _DEFAULT_MAX_LIMIT = 1000 + _DEFAULT_MAX_OFFSET = 10000 + def __init__(self, marqo_index: StructuredMarqoIndex): self._marqo_index = marqo_index @@ -266,7 +267,7 @@ def _to_vespa_tensor_query(self, marqo_query: MarqoTensorQuery) -> Dict[str, Any 'offset': marqo_query.offset, 'query_features': query_inputs, 'presentation.summary': summary, - 'ranking': ranking + 'ranking': ranking, } query = {k: v for k, v in query.items() if v is not None} diff --git a/src/marqo/core/vespa_index.py b/src/marqo/core/vespa_index.py index 1f1a46d82..bcff104ae 100644 --- a/src/marqo/core/vespa_index.py +++ b/src/marqo/core/vespa_index.py @@ -71,12 +71,13 @@ def get_vector_count_query(self) -> Dict[str, Any]: pass -def for_marqo_index(marqo_index: MarqoIndex): +def for_marqo_index(marqo_index: MarqoIndex) -> VespaIndex: """ Get the VespaIndex implementation for the given MarqoIndex. Args: marqo_index: The MarqoIndex to get the implementation for + marqo_version: The version of Marqo that the index was created with Returns: The VespaIndex implementation for the given MarqoIndex diff --git a/src/marqo/documentation.py b/src/marqo/documentation.py index 40211a289..2a0ee5dca 100644 --- a/src/marqo/documentation.py +++ b/src/marqo/documentation.py @@ -1,6 +1,6 @@ import marqo.version -version = marqo.version.__version__ +version = marqo.version.get_version() base_url = 'https://docs.marqo.ai' diff --git a/src/marqo/tensor_search/api.py b/src/marqo/tensor_search/api.py index d1269e903..dd7af9e54 100644 --- a/src/marqo/tensor_search/api.py +++ b/src/marqo/tensor_search/api.py @@ -13,6 +13,7 @@ from marqo import version from marqo.api import exceptions as api_exceptions from marqo.api.models.health_response import HealthResponse +from marqo.api.models.rollback_request import RollbackRequest from marqo.core import exceptions as core_exceptions from marqo.core.index_management.index_management import IndexManagement from marqo.logging import get_logger @@ -25,6 +26,7 @@ from marqo.tensor_search.telemetry import RequestMetricsStore, TelemetryMiddleware from marqo.tensor_search.throttling.redis_throttle import throttle from marqo.tensor_search.web import api_validation, api_utils +from marqo.upgrades.upgrade import UpgradeRunner, RollbackRunner from marqo.vespa.vespa_client import VespaClient logger = get_logger(__name__) @@ -355,6 +357,22 @@ def delete_all_documents(index_name: str, marqo_config: config.Config = Depends( return {"documentCount": document_count} +@app.post("/upgrade") +@utils.enable_upgrade_api() +def upgrade_marqo(marqo_config: config.Config = Depends(get_config)): + """An internal API used for testing processes. Not to be used by users.""" + upgrade_runner = UpgradeRunner(marqo_config.vespa_client, marqo_config.index_management) + upgrade_runner.upgrade() + + +@app.post("/upgrade") +@utils.enable_upgrade_api() +def rollback_marqo(req: RollbackRequest, marqo_config: config.Config = Depends(get_config)): + """An internal API used for testing processes. Not to be used by users.""" + rollback_runner = RollbackRunner(marqo_config.vespa_client, marqo_config.index_management) + rollback_runner.rollback(from_version=req.from_version, to_version=req.to_version) + + if __name__ == "__main__": uvicorn.run(app, host="localhost", port=8882) diff --git a/src/marqo/tensor_search/configs.py b/src/marqo/tensor_search/configs.py index 3fab06382..35d93cae4 100644 --- a/src/marqo/tensor_search/configs.py +++ b/src/marqo/tensor_search/configs.py @@ -15,6 +15,8 @@ def default_env_vars() -> dict: EnvVars.MARQO_MAX_INDEX_FIELDS: None, EnvVars.MARQO_MAX_DOC_BYTES: 100000, EnvVars.MARQO_MAX_RETRIEVABLE_DOCS: 10000, + EnvVars.MARQO_MAX_SEARCH_LIMIT: 1000, + EnvVars.MARQO_MAX_SEARCH_OFFSET: 10000, EnvVars.MARQO_MODELS_TO_PRELOAD: ["hf/e5-base-v2", "open_clip/ViT-B-32/laion2b_s34b_b79k"], EnvVars.MARQO_MAX_CONCURRENT_INDEX: 8, EnvVars.MARQO_MAX_CONCURRENT_SEARCH: 8, @@ -31,4 +33,5 @@ def default_env_vars() -> dict: EnvVars.MARQO_MAX_NUMBER_OF_REPLICAS: 1, EnvVars.MARQO_DEFAULT_EF_SEARCH: 2000, EnvVars.MARQO_ENABLE_BATCH_APIS: "FALSE", + EnvVars.MARQO_ENABLE_UPGRADE_API: "FALSE", } diff --git a/src/marqo/tensor_search/enums.py b/src/marqo/tensor_search/enums.py index 6f6fff94b..dda95f799 100644 --- a/src/marqo/tensor_search/enums.py +++ b/src/marqo/tensor_search/enums.py @@ -43,6 +43,8 @@ class EnvVars: MARQO_MAX_INDEX_FIELDS = "MARQO_MAX_INDEX_FIELDS" MARQO_MAX_DOC_BYTES = "MARQO_MAX_DOC_BYTES" MARQO_MAX_RETRIEVABLE_DOCS = "MARQO_MAX_RETRIEVABLE_DOCS" + MARQO_MAX_SEARCH_LIMIT = "MARQO_MAX_SEARCH_LIMIT" + MARQO_MAX_SEARCH_OFFSET = "MARQO_MAX_SEARCH_OFFSET" MARQO_MODELS_TO_PRELOAD = "MARQO_MODELS_TO_PRELOAD" MARQO_MAX_CONCURRENT_INDEX = "MARQO_MAX_CONCURRENT_INDEX" MARQO_MAX_CONCURRENT_SEARCH = "MARQO_MAX_CONCURRENT_SEARCH" @@ -60,6 +62,7 @@ class EnvVars: MARQO_DEFAULT_EF_SEARCH = "MARQO_DEFAULT_EF_SEARCH" MARQO_BEST_AVAILABLE_DEVICE = "MARQO_BEST_AVAILABLE_DEVICE" MARQO_ENABLE_BATCH_APIS = "MARQO_ENABLE_BATCH_APIS" + MARQO_ENABLE_UPGRADE_API = "MARQO_ENABLE_UPGRADE_API" class RequestType: diff --git a/src/marqo/tensor_search/index_meta_cache.py b/src/marqo/tensor_search/index_meta_cache.py index 47a0e1316..9600103b8 100644 --- a/src/marqo/tensor_search/index_meta_cache.py +++ b/src/marqo/tensor_search/index_meta_cache.py @@ -108,7 +108,7 @@ def refresh(): # This can happen when settings schema doesn't exist logger.warn( 'Failed to populate index cache due to 400 error from vector store. This can happen ' - 'if Marqo settings schema does not exist. Error: {e}' + f'if Marqo settings schema does not exist. Error: {e}' ) else: logger.warn( diff --git a/src/marqo/tensor_search/models/add_docs_objects.py b/src/marqo/tensor_search/models/add_docs_objects.py index a6fa4287f..00780dde5 100644 --- a/src/marqo/tensor_search/models/add_docs_objects.py +++ b/src/marqo/tensor_search/models/add_docs_objects.py @@ -30,6 +30,7 @@ class Config: documents: Union[Sequence[Union[dict, Any]], np.ndarray] imageDownloadThreadCount: int = 20 + # TODO - Make this configurable MAX_DOCS = 128 diff --git a/src/marqo/tensor_search/on_start_script.py b/src/marqo/tensor_search/on_start_script.py index 437a593d3..b7d49e456 100644 --- a/src/marqo/tensor_search/on_start_script.py +++ b/src/marqo/tensor_search/on_start_script.py @@ -4,7 +4,7 @@ import torch -from marqo import config, documentation +from marqo import config, documentation, version from marqo.api import exceptions from marqo.connections import redis_driver from marqo.s2_inference.s2_inference import vectorise @@ -27,6 +27,7 @@ def on_start(config: config.Config): ModelsForCacheing(), InitializeRedis("localhost", 6379), DownloadFinishText(), + PrintVersion(), MarqoWelcome(), MarqoPhrase(), ) @@ -44,7 +45,7 @@ def __init__(self, config: config.Config): def run(self): try: logger.debug('Creating Marqo settings schema') - created = self.config.index_management.create_settings_schema() + created = self.config.index_management.bootstrap_vespa() if created: logger.debug('Marqo settings schema created') else: @@ -236,6 +237,11 @@ def run(self): print('\n', flush=True) +class PrintVersion: + def run(self): + print(f"Version: {version.__version__}") + + class MarqoPhrase: def run(self): diff --git a/src/marqo/tensor_search/tensor_search.py b/src/marqo/tensor_search/tensor_search.py index 6aff0bee9..88a83eb63 100644 --- a/src/marqo/tensor_search/tensor_search.py +++ b/src/marqo/tensor_search/tensor_search.py @@ -1187,9 +1187,14 @@ def search(config: Config, index_name: str, text: Union[str, dict], # validate query validation.validate_query(q=text, search_method=search_method) - # Validate result_count + offset <= int(max_docs_limit) + # Validate max limits max_docs_limit = utils.read_env_vars_and_defaults(EnvVars.MARQO_MAX_RETRIEVABLE_DOCS) + max_search_limit = utils.read_env_vars_and_defaults(EnvVars.MARQO_MAX_SEARCH_LIMIT) + max_search_offset = utils.read_env_vars_and_defaults(EnvVars.MARQO_MAX_SEARCH_OFFSET) + check_upper = True if max_docs_limit is None else result_count + offset <= int(max_docs_limit) + check_limit = True if max_search_limit is None else result_count <= int(max_search_limit) + check_offset = True if max_search_offset is None else offset <= int(max_search_offset) if not check_upper: upper_bound_explanation = ("The search result limit + offset must be less than or equal to the " f"MARQO_MAX_RETRIEVABLE_DOCS limit of [{max_docs_limit}]. ") @@ -1197,6 +1202,14 @@ def search(config: Config, index_name: str, text: Union[str, dict], raise api_exceptions.IllegalRequestedDocCount( f"{upper_bound_explanation} Marqo received search result limit of `{result_count}` " f"and offset of `{offset}`.") + if not check_limit: + raise api_exceptions.IllegalRequestedDocCount( + f"The search result limit must be less than or equal to the MARQO_MAX_SEARCH_LIMIT limit of " + f"[{max_search_limit}]. Marqo received search result limit of `{result_count}`.") + if not check_offset: + raise api_exceptions.IllegalRequestedDocCount( + f"The search result offset must be less than or equal to the MARQO_MAX_SEARCH_OFFSET limit of " + f"[{max_search_offset}]. Marqo received search result offset of `{offset}`.") t0 = timer() validation.validate_context(context=context, query=text, search_method=search_method) diff --git a/src/marqo/tensor_search/utils.py b/src/marqo/tensor_search/utils.py index f462ea281..0a9d70468 100644 --- a/src/marqo/tensor_search/utils.py +++ b/src/marqo/tensor_search/utils.py @@ -374,7 +374,24 @@ def decorator_function(func): @functools.wraps(func) def wrapper(*args, **kwargs): if read_env_vars_and_defaults(EnvVars.MARQO_ENABLE_BATCH_APIS).lower() != 'true': - raise HTTPException(status_code=403, detail="This API endpoint is disabled. Please set MARQO_ENABLE_BATCH_API to true to enable it.") + raise HTTPException(status_code=403, + detail="This API endpoint is disabled. Please set MARQO_ENABLE_BATCH_API to true to enable it.") return func(*args, **kwargs) + + return wrapper + + return decorator_function + + +def enable_upgrade_api(): + def decorator_function(func): + @functools.wraps(func) + def wrapper(*args, **kwargs): + if read_env_vars_and_defaults(EnvVars.MARQO_ENABLE_UPGRADE_API).lower() != 'true': + raise HTTPException(status_code=403, + detail="This API endpoint is disabled. Please set MARQO_ENABLE_UPGRADE_API to true to enable it.") + return func(*args, **kwargs) + return wrapper + return decorator_function diff --git a/src/marqo/upgrades/upgrade.py b/src/marqo/upgrades/upgrade.py new file mode 100644 index 000000000..81d07f708 --- /dev/null +++ b/src/marqo/upgrades/upgrade.py @@ -0,0 +1,88 @@ +from abc import ABC, abstractmethod +from typing import Optional + +import semver + +from marqo import version +from marqo.api import exceptions as api_exceptions +from marqo.core.index_management.index_management import IndexManagement +from marqo.vespa.vespa_client import VespaClient + + +class Upgrade(ABC): + """ + Base class for a Marqo upgrade. Instances of this class represent a Marqo upgrade, and are responsible for + performing the upgrade from one version to another. + + Upgrades must be idempotent, i.e. running an upgrade on a Marqo with the target version should have no effect. + + Source version must be forwards-compatible with the target state, so that a rolling upgrade can be performed. + """ + + @abstractmethod + def run(self): + pass + + +class Rollback(Upgrade, ABC): + pass + + +class UpgradeRunner: + def __init__(self, vespa_client: VespaClient, index_management: IndexManagement): + self.vespa_client = vespa_client + self.index_management = index_management + + def upgrade(self): + config_version_full = self.index_management.get_marqo_version() + parsed_version = semver.VersionInfo.parse(config_version_full, optional_minor_and_patch=True) + config_version = f'{parsed_version.major}.{parsed_version.minor}' + + upgrade = self._for_version(config_version) + + if upgrade is None: + raise api_exceptions.BadRequestError( + f'Cannot upgrade from Marqo version {config_version} to {version.get_version()}' + ) + + upgrade.run() + + def _for_version(self, from_version) -> Optional[Upgrade]: + to_version_full = version.get_version() + parsed_version = semver.VersionInfo.parse(to_version_full) + to_version = f'{parsed_version.major}.{parsed_version.minor}' + + if from_version == "2.0" and to_version == '2.1': + from marqo.upgrades.v2_v0_v2_v1 import V2V0V2V1 + return V2V0V2V1(self.vespa_client, self.index_management) + + return None + + +class RollbackRunner: + def __init__(self, vespa_client: VespaClient, index_management: IndexManagement): + self.vespa_client = vespa_client + self.index_management = index_management + + def rollback(self, from_version: str, to_version: str): + parsed_from_version = semver.VersionInfo.parse(from_version, optional_minor_and_patch=True) + parsed_to_version = semver.VersionInfo.parse(to_version, optional_minor_and_patch=True) + + rollback = self._for_versions( + f'{parsed_from_version.major}.{parsed_from_version.minor}', + f'{parsed_to_version.major}.{parsed_to_version.minor}' + ) + + if rollback is None: + raise api_exceptions.BadRequestError( + f'Cannot roll back from Marqo version {from_version} to {to_version}' + ) + + rollback.run() + + def _for_versions(self, from_version, to_version) -> Optional[Rollback]: + if from_version == "2.1" and to_version == '2.0': + from marqo.upgrades.v2_v1_v2_v0_rollback import V2V1V2V0Rollback + return V2V1V2V0Rollback(self.vespa_client, self.index_management) + + return None diff --git a/src/marqo/upgrades/v2_v0_v2_v1.py b/src/marqo/upgrades/v2_v0_v2_v1.py new file mode 100644 index 000000000..fb63eb9c3 --- /dev/null +++ b/src/marqo/upgrades/v2_v0_v2_v1.py @@ -0,0 +1,90 @@ +import os + +from marqo import logging, version +from marqo.api import exceptions as api_exceptions +from marqo.core.index_management.index_management import IndexManagement, _MarqoConfig +from marqo.upgrades.upgrade import Upgrade +from marqo.vespa.models import VespaDocument +from marqo.vespa.vespa_client import VespaClient + +logger = logging.get_logger(__name__) + + +class V2V0V2V1(Upgrade): + """ + Upgrade from Marqo v2.0 to v2.1. + + The migration comprises two steps: + 1. Create the default query profile + 2. Add Marqo version to Marqo settings schema + """ + + def __init__(self, vespa_client: VespaClient, index_management: IndexManagement): + self.vespa_client = vespa_client + self.index_management = index_management + self.settings_schema = IndexManagement._MARQO_SETTINGS_SCHEMA_NAME + self.config_id = IndexManagement._MARQO_CONFIG_DOC_ID + self.default_query_profile = IndexManagement._DEFAULT_QUERY_PROFILE_TEMPLATE + + def run(self): + logger.info("Running upgrade v20v21") + + try: + logger.info("Creating query profile") + self._create_query_profile() + + logger.info("Adding Marqo config") + self._add_marqo_version() + except Exception as e: + raise Exception('Upgrade v20v21 failed. Partial changes may have been applied. This process is ' + 'idempotent. A successful run is required to bring Marqo into a consistent state') from e + + logger.info("Verifying upgrade") + self._verify_query_profile() + self._verify_marqo_version() + + logger.info("Upgrade v20v21 finished") + + def _create_query_profile(self): + app = self.vespa_client.download_application() + + settings_schema_exists = os.path.exists(os.path.join(app, 'schemas', f'{self.settings_schema}.sd')) + if not settings_schema_exists: + raise api_exceptions.BadRequestError(f"Settings schema {self.settings_schema} does not exist. " + f"Has Marqo been bootstraped?") + + profile_path = os.path.join(app, 'search/query-profiles', 'default.xml') + os.makedirs(os.path.dirname(profile_path), exist_ok=True) + with open(profile_path, 'w') as f: + f.write(self.default_query_profile) + + self.vespa_client.deploy_application(app) + self.vespa_client.wait_for_application_convergence() + + def _add_marqo_version(self): + self.vespa_client.feed_document( + VespaDocument( + id=self.config_id, + fields={ + 'settings': _MarqoConfig(version=version.get_version()).json() + } + ), + schema=self.settings_schema + ) + + def _verify_query_profile(self): + app = self.vespa_client.download_application() + profile_path_exists = os.path.exists(os.path.join(app, 'search/query-profiles', 'default.xml')) + if not profile_path_exists: + raise api_exceptions.InternalError( + f"Query profile does not exist. " + f"Upgrade has not been applied correctly" + ) + + def _verify_marqo_version(self): + configured_version = self.index_management.get_marqo_version() + if configured_version != version.get_version(): + raise api_exceptions.InternalError( + f"Marqo version in config is {configured_version}, but Marqo version is {version.get_version()}. " + f"Upgrade has not been applied correctly" + ) diff --git a/src/marqo/upgrades/v2_v1_v2_v0_rollback.py b/src/marqo/upgrades/v2_v1_v2_v0_rollback.py new file mode 100644 index 000000000..4c950efb1 --- /dev/null +++ b/src/marqo/upgrades/v2_v1_v2_v0_rollback.py @@ -0,0 +1,73 @@ +import os +import shutil + +from marqo import logging +from marqo.api import exceptions as api_exceptions +from marqo.core.index_management.index_management import IndexManagement +from marqo.upgrades.upgrade import Rollback + +logger = logging.get_logger(__name__) + + +class V2V1V2V0Rollback(Rollback): + """ + Roll back from Marqo v2.1 to v2.0. + + The rollback comprises two steps: + 1. Delete the default query profile + 2. Remove Marqo version from Marqo settings schema + """ + + def __init__(self, vespa_client, index_management: IndexManagement): + self.vespa_client = vespa_client + self.index_management = index_management + self.settings_schema = IndexManagement._MARQO_SETTINGS_SCHEMA_NAME + self.config_id = IndexManagement._MARQO_CONFIG_DOC_ID + + def run(self): + logger.info("Running rollback v21v20") + + try: + logger.info("Deleting query profile") + self._delete_query_profile() + + logger.info("Removing Marqo config") + self._remove_marqo_version() + except Exception as e: + raise Exception('Rollback v21v20 failed') from e + + logger.info("Verifying rollback") + self._verify_query_profile() + self._verify_marqo_version() + + logger.info("Rollback v21v20 finished") + + def _delete_query_profile(self): + app = self.vespa_client.download_application() + + shutil.rmtree(os.path.join(app, 'search'), ignore_errors=True) + + self.vespa_client.deploy_application(app) + self.vespa_client.wait_for_application_convergence() + + def _remove_marqo_version(self): + self.vespa_client.delete_document( + id=self.config_id, + schema=self.settings_schema + ) + + def _verify_query_profile(self): + app = self.vespa_client.download_application() + profile_path_exists = os.path.exists(os.path.join(app, 'search/query-profiles', 'default.xml')) + if profile_path_exists: + raise api_exceptions.InternalError( + "Query profile exists. Rollback has not been applied correctly" + ) + + def _verify_marqo_version(self): + configured_version = self.index_management.get_marqo_version() + if configured_version != '2.0': + raise api_exceptions.InternalError( + f"Marqo version in config is {configured_version}, expected 2.0. " + f"Rollback has not been applied correctly" + ) diff --git a/src/marqo/version.py b/src/marqo/version.py index 8d9e43396..a493bf2af 100644 --- a/src/marqo/version.py +++ b/src/marqo/version.py @@ -1,4 +1,4 @@ -__version__ = "2.0.1" +__version__ = "2.1.0" def get_version() -> str: diff --git a/tests/core/index_management/test_index_management.py b/tests/core/index_management/test_index_management.py index e158771e6..373d16d3f 100644 --- a/tests/core/index_management/test_index_management.py +++ b/tests/core/index_management/test_index_management.py @@ -1,6 +1,9 @@ +import os +import shutil import uuid from unittest import mock +from marqo import version from marqo.core.exceptions import IndexExistsError from marqo.core.index_management.index_management import IndexManagement from marqo.core.models.marqo_index import * @@ -16,10 +19,10 @@ class TestIndexManagement(MarqoTestCase): def setUp(self): self.index_management = IndexManagement(self.vespa_client) - def test_create_settings_schema_doesNotExist_successful(self): + def test_bootstrap_vespa_doesNotExist_successful(self): settings_schema_name = 'a' + str(uuid.uuid4()).replace('-', '') with mock.patch.object(IndexManagement, '_MARQO_SETTINGS_SCHEMA_NAME', settings_schema_name): - self.assertTrue(self.index_management.create_settings_schema()) + self.assertTrue(self.index_management.bootstrap_vespa()) # Verify settings schema exists try: @@ -36,10 +39,17 @@ def test_create_settings_schema_doesNotExist_successful(self): else: raise e - def test_create_settings_schema_exists_skips(self): + # Verify default query profile exists + app = self.vespa_client.download_application() + query_profile_exists = os.path.exists( + os.path.join(app, 'search/query-profiles', 'default.xml') + ) + self.assertTrue(query_profile_exists, 'Default query profile does not exist') + + def test_bootstrap_vespa_exists_skips(self): settings_schema_name = 'a' + str(uuid.uuid4()).replace('-', '') with mock.patch.object(IndexManagement, '_MARQO_SETTINGS_SCHEMA_NAME', settings_schema_name): - self.assertTrue(self.index_management.create_settings_schema()) + self.assertTrue(self.index_management.bootstrap_vespa()) import httpx @@ -51,10 +61,49 @@ def modified_post(*args, **kwargs): return httpx.post(*args, **kwargs) with mock.patch.object(httpx.Client, 'post', side_effect=modified_post) as mock_post: - self.assertFalse(self.index_management.create_settings_schema()) + self.assertFalse(self.index_management.bootstrap_vespa()) # Sanity check that we're patching the right method self.assertTrue(mock_post.called) + def test_boostrap_vespa_v2Exists_skips(self): + """ + bootstrap_vespa skips when Vespa has been configured with Marqo 2.0.x + """ + # Marqo 2.0.x configuration is detected by presence of settings schema, but absence default query profile + settings_schema_name = 'a' + str(uuid.uuid4()).replace('-', '') + with mock.patch.object(IndexManagement, '_MARQO_SETTINGS_SCHEMA_NAME', settings_schema_name): + app = self.vespa_client.download_application() + + # Clean any query profiles that may exist + shutil.rmtree(os.path.join(app, 'search'), ignore_errors=True) + + self.index_management._add_marqo_settings_schema(app) + self.vespa_client.deploy_application(app) + self.vespa_client.wait_for_application_convergence() + + self.assertFalse( + self.index_management.bootstrap_vespa(), + 'bootstrap_vespa should skip when Marqo 2.0.x configuration is detected' + ) + + def test_bootstrap_vespa_partialConfig_successful(self): + """ + bootstrap_vespa succeeds when Vespa has been partially configured and recovers to a consistent state + """ + settings_schema_name = 'a' + str(uuid.uuid4()).replace('-', '') + with mock.patch.object(IndexManagement, '_MARQO_SETTINGS_SCHEMA_NAME', settings_schema_name): + self.assertTrue(self.index_management.bootstrap_vespa()) + + # Delete marqo config to simulate partial configuration for 2.1+ + self.vespa_client.delete_document( + schema=settings_schema_name, + id=IndexManagement._MARQO_CONFIG_DOC_ID + ) + + self.assertTrue(self.index_management.bootstrap_vespa(), 'bootstrap_vespa should not skip') + # Verify config has been saved + self.assertEqual(version.get_version(), self.index_management.get_marqo_version()) + def test_create_index_settingsSchemaDoesNotExist_successful(self): """ A new index is created successfully when the settings schema does not exist @@ -167,3 +216,29 @@ def test_create_index_indexExists_fails(self): with self.assertRaises(IndexExistsError): self.index_management.create_index(marqo_index_request) + + def test_get_marqo_version_successful(self): + """ + get_marqo_version returns current version + """ + settings_schema_name = 'a' + str(uuid.uuid4()).replace('-', '') + with mock.patch.object(IndexManagement, '_MARQO_SETTINGS_SCHEMA_NAME', settings_schema_name): + self.index_management.bootstrap_vespa() + + self.assertEqual(version.get_version(), self.index_management.get_marqo_version()) + + def test_get_marqo_version_v20_successful(self): + """ + get_marqo_version returns 2.0 when Vespa has been configured with Marqo 2.0.x + """ + settings_schema_name = 'a' + str(uuid.uuid4()).replace('-', '') + with mock.patch.object(IndexManagement, '_MARQO_SETTINGS_SCHEMA_NAME', settings_schema_name): + self.index_management.bootstrap_vespa() + + # Delete Marqo config to simulate 2.0 + self.vespa_client.delete_document( + schema=settings_schema_name, + id=IndexManagement._MARQO_CONFIG_DOC_ID + ) + + self.assertEqual(self.index_management.get_marqo_version(), '2.0') diff --git a/tests/tensor_search/integ_tests/test_search_structured.py b/tests/tensor_search/integ_tests/test_search_structured.py index 3d994bc0d..9849b86f6 100644 --- a/tests/tensor_search/integ_tests/test_search_structured.py +++ b/tests/tensor_search/integ_tests/test_search_structured.py @@ -352,13 +352,30 @@ def test_result_count_validation(self): ) with self.assertRaises(errors.IllegalRequestedDocCount): - # too small + # violates total results search_res = tensor_search.search( config=self.config, index_name=self.default_text_index, text="Exact match hehehe", result_count=1000000 ) raise AssertionError + with self.assertRaises(errors.IllegalRequestedDocCount): + # violates max limit of 1000 + search_res = tensor_search.search( + config=self.config, index_name=self.default_text_index, text="Exact match hehehe", + result_count=1001 + ) + raise AssertionError + + with self.assertRaises(errors.IllegalRequestedDocCount): + # violates max offset of 10,000 + search_res = tensor_search.search( + config=self.config, index_name=self.default_text_index, text="Exact match hehehe", + result_count=1, + offset=10001 + ) + raise AssertionError + with self.assertRaises(errors.IllegalRequestedDocCount): # should not work with 0 search_res = tensor_search.search( @@ -1230,6 +1247,5 @@ def test_tensor_search_highlights_format(self): for hit in tensor_search_result['hits']: self.assertIn("_highlights", hit) self.assertTrue(isinstance(hit["_highlights"], list)) - self.assertEqual(1, len(hit["_highlights"])) # We only have 1 highlight now + self.assertEqual(1, len(hit["_highlights"])) # We only have 1 highlight now self.assertTrue(isinstance(hit["_highlights"][0], dict)) - diff --git a/tests/tensor_search/test_pagination.py b/tests/tensor_search/test_pagination.py index 893624699..918973fc8 100644 --- a/tests/tensor_search/test_pagination.py +++ b/tests/tensor_search/test_pagination.py @@ -11,7 +11,7 @@ ) from marqo.core.models.marqo_index import FieldType, FieldFeature, IndexType from marqo.core.models.marqo_index_request import FieldRequest -from marqo.tensor_search import tensor_search +from marqo.tensor_search import tensor_search, utils from marqo.tensor_search.enums import SearchMethod, EnvVars from marqo.tensor_search.models.add_docs_objects import AddDocsParams from tests.marqo_test import MarqoTestCase @@ -56,21 +56,22 @@ def tearDown(self): self.device_patcher.stop() def test_pagination_single_field(self): - num_docs = 400 # TODO - Increase this to 1000 once max doc limit issue has been addressed + num_docs = 400 batch_size = 100 for index in [self.index_structured, self.index_unstructured]: - for _ in range(0, num_docs, 100): - tensor_search.add_documents( + for _ in range(0, num_docs, batch_size): + r = tensor_search.add_documents( config=self.config, add_docs_params=AddDocsParams(index_name=index.name, - docs=[{"title": 'my title'} for i in + docs=[{"title": 'my title', 'desc': 'my title'} for i in range(batch_size)], device="cpu", tensor_fields=['title'] if index.type == IndexType.Unstructured else None ) ) + self.assertFalse(r['errors'], "Errors in add documents call") for search_method in (SearchMethod.LEXICAL, SearchMethod.TENSOR): full_search_results = tensor_search.search( @@ -99,6 +100,83 @@ def test_pagination_single_field(self): # Compare paginated to full results (length only for now) self.assertEqual(len(full_search_results["hits"]), len(paginated_search_results["hits"])) + def test_pagination_high_limit_offset(self): + """ + Test pagination with max device limit and offset (1000 and 10,000) + """ + num_docs = 12000 + batch_size = 100 + + max_limit = 1000 + max_offset = 10000 + + self.assertTrue(num_docs >= max_limit, "Test requires num_docs >= max_limit") + + original_read_var = utils.read_env_vars_and_defaults + + def read_var(var): + if var == EnvVars.MARQO_MAX_RETRIEVABLE_DOCS: + return num_docs + return original_read_var(var) + + # Patch EnvVars.MARQO_MAX_RETRIEVABLE_DOCS so we can test max offset + with mock.patch.object(utils, 'read_env_vars_and_defaults', new=read_var): + for index in [self.index_structured, self.index_unstructured]: + for _ in range(0, num_docs, batch_size): + r = tensor_search.add_documents( + config=self.config, + add_docs_params=AddDocsParams(index_name=index.name, + docs=[{"title": 'my title', 'desc': 'my title'} for i in + range(batch_size)], + device="cpu", + tensor_fields=['title'] if index.type == IndexType.Unstructured + else None, + ) + ) + self.assertFalse(r['errors'], "Errors in add documents call") + + for search_method in [SearchMethod.TENSOR, SearchMethod.LEXICAL]: + offsets_covered = set() + for page_size in [max_limit]: + with self.subTest( + f'Index: {index.type}, Search method: {search_method}, Page size: {page_size}'): + paginated_search_results = {"hits": []} + + pages = math.ceil((max_offset + max_limit) / page_size) + for page_num in range(pages): + lim = page_size + off = page_num * page_size + offsets_covered.add(off) + page_res = tensor_search.search( + search_method=search_method, + config=self.config, + index_name=index.name, + text='my title', + result_count=lim, offset=off, + # Approximate search retrieved ~8400 docs, under investigation + approximate=False if search_method == SearchMethod.TENSOR else None + ) + + paginated_search_results["hits"].extend(page_res["hits"]) + + # Compare paginated to full results (length only for now) + expected_count = (pages - 1) * page_size + page_size + self.assertEqual(expected_count, len(paginated_search_results["hits"])) + + self.assertTrue(max_offset in offsets_covered, "Max offset not covered. Check test parameters") + + def test_pagination_limt_exceeded_error(self): + """ + Verify InvalidArgs error is raised when limit is exceeded + """ + self.fail() + + def test_pagination_offset_exceeded_error(self): + """ + Verify InvalidArgs error is raised when offset is exceeded + """ + self.fail() + @unittest.skip def test_pagination_multi_field(self): # Execute pagination with 3 fields