Skip to content

Commit

Permalink
[Releases 2.13] Output hybrid error message in marqo logs (#1108)
Browse files Browse the repository at this point in the history
  • Loading branch information
vicilliar authored Feb 4, 2025
1 parent d81008c commit a8ed8f8
Show file tree
Hide file tree
Showing 5 changed files with 299 additions and 15 deletions.
2 changes: 1 addition & 1 deletion src/marqo/vespa/models/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from .delete_document_response import DeleteDocumentResponse
from .feed_response import FeedBatchDocumentResponse, FeedBatchResponse, FeedDocumentResponse
from .query_result import QueryResult
from .query_result import QueryResult, Error
from .update_response import UpdateDocumentResponse, UpdateDocumentsBatchResponse
from .vespa_document import VespaDocument
37 changes: 24 additions & 13 deletions src/marqo/vespa/vespa_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from marqo.core.models import MarqoIndex
from marqo.vespa.exceptions import (VespaStatusError, VespaError, InvalidVespaApplicationError,
VespaTimeoutError, VespaNotConvergedError, VespaActivationConflictError)
from marqo.vespa.models import VespaDocument, QueryResult, FeedBatchDocumentResponse, FeedBatchResponse, \
from marqo.vespa.models import VespaDocument, QueryResult, Error, FeedBatchDocumentResponse, FeedBatchResponse, \
FeedDocumentResponse, UpdateDocumentsBatchResponse, UpdateDocumentResponse, FeedBatchDocumentResponse
from marqo.vespa.models.application_metrics import ApplicationMetrics
from marqo.vespa.models.delete_document_response import DeleteDocumentResponse, DeleteBatchDocumentResponse, \
Expand Down Expand Up @@ -1009,9 +1009,26 @@ async def _delete_document_async(self,

self._raise_for_status(resp)

@classmethod
def _is_timeout_error(cls, error: Error, resp: httpx.Response) -> bool:
"""
Check if the query error is a timeout error.
"""

if error.code == 8 and error.message == "Search request soft doomed during query setup and initialization.":
logger.warn('Detected soft doomed query')
return True
if error.code == 12 and resp.status_code == 504:
return True

return False

def _query_raise_for_status(self, resp: httpx.Response) -> None:
"""
Query API specific raise for status method.
If multiple errors:
If all errors are timeout, raise VespaTimeoutError (504).
If even one error is not timeout, raise VespaStatusError (500).
"""
# See error codes here https://github.com/vespa-engine/vespa/blob/master/container-core/src/main/java/com/yahoo/container/protect/Error.java
try:
Expand All @@ -1023,18 +1040,12 @@ def _query_raise_for_status(self, resp: httpx.Response) -> None:
result.root.errors is not None
and len(result.root.errors) > 0
):
if resp.status_code == 504 and result.root.errors[0].code == 12:
raise VespaTimeoutError(message=resp.text, cause=e) from e
elif (
result.root.errors[0].code == 8
and result.root.errors[
0].message == "Search request soft doomed during query setup and initialization."
):
# The soft doom error is a bug in certain Vespa versions. Newer versions should always return
# a code 12 for timeouts
logger.warn('Detected soft doomed query')
raise VespaTimeoutError(message=resp.text, cause=e) from e

for error in result.root.errors:
if not self._is_timeout_error(error, resp):
# Raise 500 if any error is not timeout
raise VespaStatusError(message=resp.text, cause=e) from e
# Raise 504 if all errors are timeout
raise VespaTimeoutError(message=resp.text, cause=e) from e
raise e
except VespaStatusError:
raise
Expand Down
197 changes: 196 additions & 1 deletion tests/tensor_search/integ_tests/test_hybrid_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import numpy as np

import marqo.core.exceptions as core_exceptions
import marqo.vespa.exceptions as vespa_exceptions
from marqo.core.models.add_docs_params import AddDocsParams
from marqo.core.models.hybrid_parameters import RetrievalMethod, RankingMethod, HybridParameters
from marqo.core.models.marqo_index import *
Expand All @@ -15,7 +16,9 @@
from marqo.tensor_search.models.search import SearchContext
from tests.marqo_test import MarqoTestCase, TestImageUrls
from fastapi.responses import JSONResponse, ORJSONResponse

from marqo.tensor_search.enums import SearchMethod
import httpx
import json

class TestHybridSearch(MarqoTestCase):
"""
Expand Down Expand Up @@ -1841,6 +1844,198 @@ def test_hybrid_search_unstructured_with_searchable_attributes_fails(self):
self.assertIn("does not support `searchableAttributesTensor` or `searchableAttributesLexical`",
str(e.exception))

def test_lexical_error_raises_correct_hybrid_error(self):
"""
Ensure that the proper error is raised when a lexical search fails in a hybrid search.
The double backslash error is a known 500 in lexical search (400 in vespa), so using
the same query in hybrid search should give the same error code and message.
"""

# TODO: remove when double backslash error is fixed
for index in [self.structured_text_index_score_modifiers, self.semi_structured_default_text_index]:
with self.subTest(index=index.type):

# Adding documents
self.add_documents(
config=self.config,
add_docs_params=AddDocsParams(
index_name=index.name,
docs=[
{"_id": "doc1", "text_field_1": "some text"}
],
tensor_fields=["text_field_1"] if \
isinstance(index, UnstructuredMarqoIndex) else None
)
)

with self.assertRaises(vespa_exceptions.VespaStatusError) as e:
tensor_search.search(
text='\\\\"hi\\\\"', config=self.config, index_name=index.name,
search_method=SearchMethod.HYBRID
)
self.assertIn("Could not create query from YQL", str(e.exception))

def test_hybrid_with_two_errors_returns_both(self):
"""
If vespa query to the hybrid searcher returns a result with 2 errors, both should be in the error message.
If all errors are timeout, raise VespaTimeoutError (504).
If even one error is not timeout, raise VespaStatusError (500).
"""

# Mock Vespa result with 2 errors
test_cases = [
# HTTP 504, first is Vespa 12
(
{
'root': {
'relevance': 1.0,
'fields': {'totalCount': 0},
'errors': [
{
'code': 12,
'summary': 'Timed out',
'source': 'content_default',
'message': "Error in execution of chain 'content_default': Chain timed out."
},
{
'code': 4,
'summary': 'Invalid query parameter',
'message': 'Could not create query from YQL.'
}
]
}
},
504, # HTTP 504
False, # Not a timeout, since 2nd error is not timeout
),
# HTTP 400, second is Vespa 12
(
{
'root': {
'relevance': 1.0,
'fields': {'totalCount': 0},
'errors': [
{
'code': 4,
'summary': 'Invalid query parameter',
'message': 'Could not create query from YQL.'
},
{
'code': 12,
'summary': 'Timed out',
'source': 'content_default',
'message': "Error in execution of chain 'content_default': Chain timed out."
}
]
}
},
400, # HTTP 400
False, # Not a timeout, since 1st error is not timeout
),
# HTTP 504, both Vespa errors 12
(
{
'root': {
'relevance': 1.0,
'fields': {'totalCount': 0},
'errors': [
{
'code': 12,
'summary': 'Timed out',
'source': 'content_default',
'message': "Error in execution of chain 'content_default': Chain timed out."
},
{
'code': 12,
'summary': 'Timed out',
'source': 'content_default',
'message': "Error in execution of chain 'content_default': Chain timed out."
}
]
}
},
504, # HTTP 504
True # Timeout, since both errors are timeout
),
# HTTP 400, both Vespa errors 12 but not timeout
(
{
'root': {
'relevance': 1.0,
'fields': {'totalCount': 0},
'errors': [
{
'code': 12,
'summary': 'Some other error',
'source': 'content_default',
'message': 'Some error message'
},
{
'code': 12,
'summary': 'Some other error',
'source': 'content_default',
'message': 'Some error message'
}
]
}
},
400, # HTTP 400
False # Not a timeout, since not 504 error code
),
# HTTP 504, first 12, second soft doom
(
{
'root': {
'relevance': 1.0,
'fields': {'totalCount': 0},
'errors': [
{
'code': 12,
'summary': 'Timed out',
'source': 'content_default',
'message': "Error in execution of chain 'content_default': Chain timed out."
},
{
'code': 8,
'summary': 'Soft doom',
'message': 'Search request soft doomed during query setup and initialization.'
}
]
}
},
504, # HTTP 504
True # Timeout, since both errors are timeout
)
]

for result_dict, vespa_httpx_code, should_be_timeout in test_cases:
with self.subTest(result_dict=result_dict, vespa_httpx_code=vespa_httpx_code,
should_be_timeout=should_be_timeout):
# If should_be_timeout, raise a VespaTimeoutError (504), else raise a VespaStatusError (500)
mock_vespa_result = httpx.Response(
status_code=vespa_httpx_code,
content=json.dumps(result_dict),
request=httpx.Request("POST", "http://localhost:8080/test-url/")
)

with unittest.mock.patch("httpx.Client.post") as mock_query:
mock_query.return_value = mock_vespa_result

for index in [self.structured_text_index_score_modifiers, self.semi_structured_default_text_index]:
with self.subTest(index=type(index)):
with self.assertRaises(vespa_exceptions.VespaStatusError) as e:
tensor_search.search(
text='dogs', config=self.config, index_name=index.name,
search_method=SearchMethod.HYBRID
)
self.assertEqual(should_be_timeout,
isinstance(e.exception, vespa_exceptions.VespaTimeoutError))

for error in result_dict["root"]["errors"]:
# All error messages should be in final exception
self.assertIn(error["message"], str(e.exception))
self.assertIn(error["summary"], str(e.exception))

def test_hybrid_search_unstructured_with_2_10_fails(self):
"""
Test that hybrid search with unstructured index with version 2.10.0 fails (version is below the minimum).
Expand Down
18 changes: 18 additions & 0 deletions vespa/src/main/java/ai/marqo/search/HybridSearcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,12 @@ public Result search(Query query, Execution execution) {
+ e.toString());
}

// Collect errors from both results and return if any.
HitGroup combinedErrors = collectErrorsFromResults(resultLexical, resultTensor);
if (combinedErrors.getError() != null) {
return new Result(query, combinedErrors);
}

logIfVerbose(
"LEXICAL RESULTS: "
+ resultLexical.toString()
Expand Down Expand Up @@ -284,6 +290,18 @@ HitGroup rrf(
return result;
}

HitGroup collectErrorsFromResults(Result resultLexical, Result resultTensor) {
// Return errors if either result list has an error. Make sure all errors are returned.
HitGroup combinedErrors = new HitGroup();
logIfVerbose(
String.format("Tensor Errors found: %s", resultTensor.hits().getError()), true);
logIfVerbose(
String.format("Lexical Errors found: %s", resultLexical.hits().getError()), true);
combinedErrors.addErrorsFrom(resultTensor.hits());
combinedErrors.addErrorsFrom(resultLexical.hits());
return combinedErrors;
}

/**
* Extracts mapped Tensor Address from cell then adds it as key to rank features, with cell value as the value.
* @param cell
Expand Down
60 changes: 60 additions & 0 deletions vespa/src/test/java/ai/marqo/search/HybridSearcherTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@
import com.yahoo.component.chain.Chain;
import com.yahoo.search.*;
import com.yahoo.search.query.ranking.RankFeatures;
import com.yahoo.search.result.ErrorMessage;
import com.yahoo.search.result.Hit;
import com.yahoo.search.result.HitGroup;
import com.yahoo.search.searchchain.*;
import com.yahoo.tensor.Tensor;
import com.yahoo.tensor.TensorAddress;
import com.yahoo.tensor.TensorType;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -379,4 +381,62 @@ private static Query getHybridQuery(
.put("query(marqo__fields_to_rank_tensor)", fieldsToRankTensor);
return query;
}

@Nested
class CollectErrorsFromResultsTest {
@Test
void shouldRaiseErrorIfLexicalResultHasError() {
Result resultLexical =
new Result(
new Query(),
ErrorMessage.createInternalServerError("Example lexical error"));
Result resultTensor = new Result(new Query());
HitGroup combinedErrors =
hybridSearcher.collectErrorsFromResults(resultLexical, resultTensor);

assertThat(combinedErrors.getError().getDetailedMessage())
.contains("Example lexical error");
}

@Test
void shouldRaiseErrorIfTensorResultHasError() {
Result resultLexical = new Result(new Query());
Result resultTensor =
new Result(
new Query(),
ErrorMessage.createInternalServerError("Example tensor error"));
HitGroup combinedErrors =
hybridSearcher.collectErrorsFromResults(resultLexical, resultTensor);

assertThat(combinedErrors.getError().getDetailedMessage())
.contains("Example tensor error");
}

@Test
void shouldRaiseErrorIfBothResultsHaveError() {
Result resultLexical =
new Result(
new Query(),
ErrorMessage.createInternalServerError("Example lexical error"));
Result resultTensor =
new Result(
new Query(),
ErrorMessage.createInternalServerError("Example tensor error"));
HitGroup combinedErrors =
hybridSearcher.collectErrorsFromResults(resultLexical, resultTensor);

Iterator<ErrorMessage> iterator = combinedErrors.getErrorHit().errors().iterator();
assertThat(iterator.next().getDetailedMessage()).contains("Example tensor error");
assertThat(iterator.next().getDetailedMessage()).contains("Example lexical error");
}

@Test
void shouldNotRaiseErrorIfNeitherResultHasError() {
Result resultLexical = new Result(new Query());
Result resultTensor = new Result(new Query());
HitGroup combinedErrors =
hybridSearcher.collectErrorsFromResults(resultLexical, resultTensor);
assertThat(combinedErrors.getError()).isNull();
}
}
}

0 comments on commit a8ed8f8

Please sign in to comment.