[ENG-6708][ENG-7286] improved trovesearch_denorm... #841

Merged · 6 commits · Feb 21, 2025
2 changes: 1 addition & 1 deletion share/search/index_strategy/_base.py
@@ -121,7 +121,7 @@ def parse_full_index_name(self, index_name: str) -> SpecificIndex:
_strategy = self.with_strategy_check(_strategy_check)
return _strategy.get_index(_etc[0] if _etc else '')

def with_strategy_check(self, strategy_check: str) -> IndexStrategy:
def with_strategy_check(self, strategy_check: str) -> typing.Self:
return dataclasses.replace(self, strategy_check=strategy_check)

def pls_setup(self, *, skip_backfill=False) -> None:
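The tightened annotation pays off for subclasses: `dataclasses.replace` returns an instance of whatever class `self` is, so `typing.Self` (Python 3.11+, or `typing_extensions`) keeps the subclass type visible to type checkers. A standalone sketch of the pattern, not the actual `IndexStrategy` class:

```python
import dataclasses
import typing


@dataclasses.dataclass(frozen=True)
class Strategy:
    strategy_check: str = ''

    def with_strategy_check(self, strategy_check: str) -> typing.Self:
        # dataclasses.replace builds a new instance of type(self),
        # so the annotation should be Self, not the base class
        return dataclasses.replace(self, strategy_check=strategy_check)


@dataclasses.dataclass(frozen=True)
class DenormStrategy(Strategy):
    pass


_checked = DenormStrategy().with_strategy_check('abc123')
assert isinstance(_checked, DenormStrategy)  # type checkers also see DenormStrategy, not Strategy
```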
32 changes: 16 additions & 16 deletions share/search/index_strategy/_trovesearch_util.py
@@ -19,6 +19,7 @@
)
from trove.util.iris import get_sufficiently_unique_iri, is_worthwhile_iri
from trove.vocab.namespaces import (
DCTERMS,
OWL,
RDF,
TROVE,
@@ -44,13 +45,7 @@
'type': 'text',
'index_options': 'offsets', # for highlighting
}
IRI_KEYWORD_MAPPING = {
'type': 'object',
'properties': { # for indexing iri values two ways:
'exact': KEYWORD_MAPPING, # the exact iri value (e.g. "https://foo.example/bar/")
'suffuniq': KEYWORD_MAPPING, # "sufficiently unique" (e.g. "://foo.example/bar")
},
}
TEXT_PATH_DEPTH_MAX = 1


###
@@ -93,6 +88,15 @@ def iris_synonyms(iris: typing.Iterable[str], rdfdoc: rdf.RdfGraph) -> set[str]:
}


def should_skip_path(path: Propertypath) -> bool:
_last = path[-1]
if _last in SKIPPABLE_PROPERTIES:
return True
if len(path) > 1 and _last == DCTERMS.identifier:
return True
return False


def propertypath_as_keyword(path: Propertypath) -> str:
assert not is_globpath(path)
return json.dumps(path)
@@ -185,9 +189,10 @@ def _walk_from_subject(
_twoples = self.rdfdoc.tripledict.get(iri, {})
for _next_steps, _obj in walk_twoples(_twoples):
_path = (*path_so_far, *_next_steps)
yield (_path, _obj)
if isinstance(_obj, str): # step further for iri
yield from self._walk_from_subject(_obj, path_so_far=_path)
if not should_skip_path(_path):
yield (_path, _obj)
if isinstance(_obj, str): # step further for iri
yield from self._walk_from_subject(_obj, path_so_far=_path)

@functools.cached_property
def paths_by_iri(self) -> defaultdict[str, set[Propertypath]]:
@@ -209,16 +214,11 @@ def walk_twoples(
twoples: rdf.RdfTwopleDictionary | rdf.Blanknode,
) -> typing.Iterator[tuple[Propertypath, rdf.RdfObject]]:
if isinstance(twoples, frozenset):
_iter_twoples = (
(_pred, _obj)
for _pred, _obj in twoples
if _pred not in SKIPPABLE_PROPERTIES
)
_iter_twoples = iter(twoples)
else:
_iter_twoples = (
(_pred, _obj)
for _pred, _obj_set in twoples.items()
if _pred not in SKIPPABLE_PROPERTIES
for _obj in _obj_set
)
for _pred, _obj in _iter_twoples:
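Net effect of the hunks above: the per-predicate filter that used to live inside `walk_twoples` moves up into `_walk_from_subject` as a per-path check, and `should_skip_path` additionally drops `dcterms:identifier` values reached through another property (identifiers on the focus resource itself still get indexed). A self-contained sketch of that check, with stand-in constants for `SKIPPABLE_PROPERTIES` and `DCTERMS.identifier`:

```python
# stand-ins -- the real values come from SKIPPABLE_PROPERTIES and trove.vocab.namespaces.DCTERMS
DCTERMS_IDENTIFIER = 'http://purl.org/dc/terms/identifier'
SKIPPABLE_PROPERTIES = frozenset({'http://example.example/vocab/aSkippableProperty'})  # hypothetical member

Propertypath = tuple[str, ...]


def should_skip_path(path: Propertypath) -> bool:
    _last = path[-1]
    if _last in SKIPPABLE_PROPERTIES:
        return True
    if len(path) > 1 and _last == DCTERMS_IDENTIFIER:
        return True
    return False


assert not should_skip_path((DCTERMS_IDENTIFIER,))  # identifier of the focus resource: kept
assert should_skip_path(('http://example.example/vocab/creator', DCTERMS_IDENTIFIER))  # nested identifier: skipped
```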
25 changes: 15 additions & 10 deletions share/search/index_strategy/trovesearch_denorm.py
@@ -70,7 +70,7 @@ class TrovesearchDenormIndexStrategy(Elastic8IndexStrategy):
CURRENT_STRATEGY_CHECKSUM = ChecksumIri(
checksumalgorithm_name='sha-256',
salt='TrovesearchDenormIndexStrategy',
hexdigest='4c8784ddd08914ec779b33b8f1945b0b2ff026eea355392ab3c4fe2fe10d71fe',
hexdigest='ef44d5bc272589754b3b0753e5ee61719349fd96284b62ecafab1d0cb043bde9',
)

# abstract method from Elastic8IndexStrategy
@@ -111,7 +111,7 @@ def backfill_message_type(self):
def _index_settings(cls):
return {
'number_of_shards': 5,
'number_of_replicas': 2,
'number_of_replicas': 1,
}

@classmethod
@@ -208,10 +208,14 @@ def _paths_and_values_mappings(cls):

# override method from Elastic8IndexStrategy
def after_chunk(self, messages_chunk: messages.MessagesChunk, affected_indexnames: Iterable[str]):
_strategy_checks = {
self.parse_full_index_name(_indexname).index_strategy.strategy_check
for _indexname in affected_indexnames
}
task__delete_iri_value_scraps.apply_async(
kwargs={
'index_strategy_name': self.strategy_name,
'indexnames': list(affected_indexnames),
'index_strategy_checks': list(_strategy_checks),
'card_pks': messages_chunk.target_ids_chunk,
'timestamp': messages_chunk.timestamp,
},
@@ -451,7 +455,9 @@ def _texts_at_properties(self, walk: ts.GraphWalk, properties: Iterable[str]):
def _texts_by_depth(self, walk: ts.GraphWalk):
_by_depth: dict[int, set[str]] = defaultdict(set)
for _path, _text_set in walk.text_values.items():
_by_depth[len(_path)].update(_text.unicode_value for _text in _text_set)
_depth = len(_path)
if _depth <= ts.TEXT_PATH_DEPTH_MAX:
_by_depth[_depth].update(_text.unicode_value for _text in _text_set)
return {
_depth_field_name(_depth): list(_value_set)
for _depth, _value_set in _by_depth.items()
@@ -978,8 +984,8 @@ def _any_query(queries: abc.Collection[dict]):
def task__delete_iri_value_scraps(
task: celery.Task,
index_strategy_name: str,
index_strategy_checks: list[str],
card_pks: list[int],
indexnames: list[str],
timestamp: int,
):
'''followup task to delete value-docs no longer present
@@ -994,11 +1000,10 @@ def task__delete_iri_value_scraps(
from share.search.index_strategy import get_strategy
_index_strategy = get_strategy(index_strategy_name)
assert isinstance(_index_strategy, TrovesearchDenormIndexStrategy)
_irivalue_indexnames = {
_index.full_index_name
for _index in _index_strategy.each_live_index(any_strategy_check=True)
if _index.subname == 'iri_values'
}
_irivalue_indexnames = [
_index_strategy.with_strategy_check(_check).irivaluesearch_index().full_index_name
for _check in index_strategy_checks
]
# delete any docs that belong to cards in this chunk but weren't touched by indexing
_delete_resp = _index_strategy.es8_client.delete_by_query(
index=list(_irivalue_indexnames),
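The celery payload shrinks from concrete index names to strategy checks: `after_chunk` derives the checks from the affected index names, and `task__delete_iri_value_scraps` rebuilds only the `iri_values` index names from them. A self-contained sketch of that round trip — the double-underscore name format below is made up for illustration; the real mapping goes through `parse_full_index_name` and `full_index_name`:

```python
# hypothetical index-name format: '<strategy>__<check>__<subname>'
def _check_from(full_index_name: str) -> str:
    return full_index_name.split('__')[1]


def _irivalues_indexname(check: str) -> str:
    return f'trovesearch_denorm__{check}__iri_values'


# after_chunk side: one check per live strategy generation, deduplicated
_affected = [
    'trovesearch_denorm__aaaa1111__cards',
    'trovesearch_denorm__aaaa1111__iri_values',
    'trovesearch_denorm__bbbb2222__iri_values',
]
_strategy_checks = {_check_from(_name) for _name in _affected}

# task side: rebuild just the iri_values index name for each check
_irivalue_indexnames = sorted(_irivalues_indexname(_check) for _check in _strategy_checks)
assert _irivalue_indexnames == [
    'trovesearch_denorm__aaaa1111__iri_values',
    'trovesearch_denorm__bbbb2222__iri_values',
]
```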
6 changes: 6 additions & 0 deletions tests/trove/_input_output_tests.py
@@ -43,6 +43,12 @@ def missing_case(self, name: str, given_input):
pprint.pformat(_actual_output),
)))

def enterContext(self, context_manager):
# TestCase.enterContext added in python3.11 -- implementing here until then
result = context_manager.__enter__()
self.addCleanup(lambda: context_manager.__exit__(None, None, None))
return result

###
# private details

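Behavior of the shim in one place: `__enter__` runs immediately and `__exit__` is deferred to test cleanup, which is close enough to the 3.11 built-in for these tests. A standalone sketch, not the project's test code:

```python
import unittest
from contextlib import contextmanager

_events = []


@contextmanager
def _recording_cm():
    _events.append('enter')
    yield 'the entered value'
    _events.append('exit')


class ShimDemo(unittest.TestCase):
    def enterContext(self, context_manager):
        result = context_manager.__enter__()
        self.addCleanup(lambda: context_manager.__exit__(None, None, None))
        return result

    def test_enter_context(self):
        self.assertEqual(self.enterContext(_recording_cm()), 'the entered value')
        self.assertEqual(_events, ['enter'])  # __exit__ only runs during cleanup, after the test
```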
39 changes: 23 additions & 16 deletions tests/trove/render/test_jsonapi_renderer.py
@@ -1,4 +1,5 @@
import json
from unittest import mock

from trove.render.jsonapi import RdfJsonapiRenderer
from trove.render._rendering import SimpleRendering
@@ -13,6 +14,12 @@ def _jsonapi_item_sortkey(jsonapi_item: dict):
class _BaseJsonapiRendererTest(_base.TroveJsonRendererTests):
renderer_class = RdfJsonapiRenderer

def setUp(self):
super().setUp()
self.enterContext(
mock.patch('trove.render.jsonapi.time.time_ns', return_value=112358)
)

def _get_rendered_output(self, rendering):
_json = super()._get_rendered_output(rendering)
_included = _json.get('included')
@@ -28,7 +35,7 @@ class TestJsonapiRenderer(_BaseJsonapiRendererTest):
mediatype='application/vnd.api+json',
rendered_content=json.dumps({
"data": {
"id": "68808d2c76cd5f7ff4e0f470592da8f02be1f615b05a143cc3821c5288e13f11",
"id": "aHR0cDovL2JsYXJnLmV4YW1wbGUvdm9jYWIvYUNhcmQ=",
"type": "index-card",
"attributes": {
"resourceIdentifier": [
@@ -60,7 +67,7 @@ class TestJsonapiRenderer(_BaseJsonapiRendererTest):
mediatype='application/vnd.api+json',
rendered_content=json.dumps({
"data": {
"id": "11f60e4d2fceb50ca695c3c77dcd7983ff78116ff2e7a2f315800c8ca645f469",
"id": "aHR0cDovL2JsYXJnLmV4YW1wbGUvdm9jYWIvYVN1YmplY3Q=",
"type": BLARG.aType,
"meta": {
BLARG.hasIri: [BLARG.anIri],
@@ -83,7 +90,7 @@ class TestJsonapiSearchRenderer(_BaseJsonapiRendererTest, _base.TrovesearchJsonR
mediatype='application/vnd.api+json',
rendered_content=json.dumps({
"data": {
"id": "4b79207d8ecd4817c36b75b16cee6c4a1874774cfbcfbd0caede339148403325",
"id": "aHR0cDovL2JsYXJnLmV4YW1wbGUvdm9jYWIvYVNlYXJjaA==",
"type": "index-card-search",
"attributes": {
"totalResultCount": 0,
@@ -98,7 +105,7 @@ class TestJsonapiSearchRenderer(_BaseJsonapiRendererTest, _base.TrovesearchJsonR
mediatype='application/vnd.api+json',
rendered_content=json.dumps({
"data": {
"id": "79183793c0eea20ca6338d71c936deee113b94641ee77346fb66f9c4bcebfe0a",
"id": "aHR0cDovL2JsYXJnLmV4YW1wbGUvdm9jYWIvYVNlYXJjaEZldw==",
"type": "index-card-search",
"attributes": {
"totalResultCount": 3
@@ -107,15 +114,15 @@ class TestJsonapiSearchRenderer(_BaseJsonapiRendererTest, _base.TrovesearchJsonR
"searchResultPage": {
"data": [
{
"id": "dc0604c7e9c07576b57646119784de65e7204fc7c860cc1b9be8ebec5f2b96ba",
"id": "112358-0",
"type": "search-result"
},
{
"id": "367b30e8a0eece555ac15fda82bb28f535f1f8beb97397c01162d619cd7058bc",
"id": "112358-1",
"type": "search-result"
},
{
"id": "26afa96fdbd189e4c4aeac921a42e9d3f09eb94b59ffd4b9ad300c524536cc97",
"id": "112358-2",
"type": "search-result"
}
]
@@ -127,43 +134,43 @@ class TestJsonapiSearchRenderer(_BaseJsonapiRendererTest, _base.TrovesearchJsonR
},
"included": [
{
"id": "dc0604c7e9c07576b57646119784de65e7204fc7c860cc1b9be8ebec5f2b96ba",
"id": "112358-0",
"type": "search-result",
"relationships": {
"indexCard": {
"data": {
"id": "68808d2c76cd5f7ff4e0f470592da8f02be1f615b05a143cc3821c5288e13f11",
"id": "aHR0cDovL2JsYXJnLmV4YW1wbGUvdm9jYWIvYUNhcmQ=",
"type": "index-card"
}
}
}
},
{
"id": "26afa96fdbd189e4c4aeac921a42e9d3f09eb94b59ffd4b9ad300c524536cc97",
"id": "112358-1",
"type": "search-result",
"relationships": {
"indexCard": {
"data": {
"id": "db657130943f3c9f4cc527b23a6a246b095f62673f2cc7fc906d5914678bd337",
"id": "aHR0cDovL2JsYXJnLmV4YW1wbGUvdm9jYWIvYUNhcmRk",
"type": "index-card"
}
}
}
},
{
"id": "367b30e8a0eece555ac15fda82bb28f535f1f8beb97397c01162d619cd7058bc",
"id": "112358-2",
"type": "search-result",
"relationships": {
"indexCard": {
"data": {
"id": "4e6134629cc3117a123cee8a8dc633a46401c9725f01d63f689d7b84f2422359",
"id": "aHR0cDovL2JsYXJnLmV4YW1wbGUvdm9jYWIvYUNhcmRkZA==",
"type": "index-card"
}
}
}
},
{
"id": "68808d2c76cd5f7ff4e0f470592da8f02be1f615b05a143cc3821c5288e13f11",
"id": "aHR0cDovL2JsYXJnLmV4YW1wbGUvdm9jYWIvYUNhcmQ=",
"type": "index-card",
"meta": {
"foaf:primaryTopic": [
Expand All @@ -190,7 +197,7 @@ class TestJsonapiSearchRenderer(_BaseJsonapiRendererTest, _base.TrovesearchJsonR
}
},
{
"id": "db657130943f3c9f4cc527b23a6a246b095f62673f2cc7fc906d5914678bd337",
"id": "aHR0cDovL2JsYXJnLmV4YW1wbGUvdm9jYWIvYUNhcmRkZA==",
"type": "index-card",
"meta": {
"foaf:primaryTopic": [
Expand All @@ -217,7 +224,7 @@ class TestJsonapiSearchRenderer(_BaseJsonapiRendererTest, _base.TrovesearchJsonR
}
},
{
"id": "4e6134629cc3117a123cee8a8dc633a46401c9725f01d63f689d7b84f2422359",
"id": "aHR0cDovL2JsYXJnLmV4YW1wbGUvdm9jYWIvYUNhcmRk",
"type": "index-card",
"meta": {
"foaf:primaryTopic": [
43 changes: 25 additions & 18 deletions trove/render/jsonapi.py
@@ -1,9 +1,12 @@
import base64
from collections import defaultdict
import contextlib
import dataclasses
import datetime
import hashlib
import itertools
import json
from typing import Iterable, Union, Any
import time
from typing import Iterable, Union

from primitive_metadata import primitive_rdf

@@ -33,6 +36,19 @@
_IriOrBlanknode = Union[str, frozenset]


def _resource_ids_defaultdict():
_prefix = str(time.time_ns())
_ints = itertools.count()

def _iter_ids():
while True:
_id = next(_ints)
yield f'{_prefix}-{_id}'

_ids = _iter_ids()
return defaultdict(lambda: next(_ids))


@dataclasses.dataclass
class RdfJsonapiRenderer(BaseRenderer):
'''render rdf data into jsonapi resources, guided by a given rdf vocabulary
@@ -58,6 +74,10 @@ class RdfJsonapiRenderer(BaseRenderer):
_identifier_object_cache: dict = dataclasses.field(default_factory=dict)
_id_namespace_set: Iterable[primitive_rdf.IriNamespace] = (trove_indexcard_namespace(),)
__to_include: set[primitive_rdf.RdfObject] | None = None
__assigned_blanknode_resource_ids: defaultdict[frozenset, str] = dataclasses.field(
default_factory=_resource_ids_defaultdict,
repr=False,
)

def simple_render_document(self) -> str:
return json.dumps(
@@ -151,27 +171,14 @@ def _membername_for_iri(self, iri: str):
return self.iri_shorthand.compact_iri(iri)

def _resource_id_for_blanknode(self, blanknode: frozenset, /):
# content-addressed blanknode id
_serializable_twoples = []
for _pred, _obj in blanknode:
_serializable_obj: Any
if isinstance(_obj, primitive_rdf.Literal):
_serializable_obj = [_obj.unicode_value, *sorted(_obj.datatype_iris)]
elif isinstance(_obj, (str, int, float)):
_serializable_obj = _obj
elif isinstance(_obj, frozenset):
_serializable_obj = self._resource_id_for_blanknode(_obj)
else:
raise ValueError(_obj)
_serializable_twoples.append((_pred, _serializable_obj))
return hashlib.sha256(json.dumps(sorted(_serializable_twoples)).encode()).hexdigest()
return self.__assigned_blanknode_resource_ids[blanknode]

def _resource_id_for_iri(self, iri: str):
for _iri_namespace in self._id_namespace_set:
if iri in _iri_namespace:
return primitive_rdf.iri_minus_namespace(iri, namespace=_iri_namespace)
# as fallback, hash the iri for a valid jsonapi member name
return hashlib.sha256(iri.encode()).hexdigest()
# as fallback, encode the iri into a valid jsonapi member name
return base64.urlsafe_b64encode(iri.encode()).decode()

def _render_field(self, predicate_iri, object_set, *, into: dict):
_is_relationship = (predicate_iri, RDF.type, JSONAPI_RELATIONSHIP) in self.thesaurus
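The id changes explain the test expectations above: IRIs outside the configured namespaces now fall back to urlsafe base64 (a valid JSON:API member name that can be decoded back for debugging), and blank nodes get `<time_ns prefix>-<counter>` ids handed out by a per-renderer defaultdict — which is why the tests patch `time.time_ns` to `112358` and expect ids like `112358-0`. A standalone sketch of both mechanisms:

```python
import base64
import itertools
import time
from collections import defaultdict

# iri fallback: urlsafe base64, reversible, using the same example iri as the tests above
_iri = 'http://blarg.example/vocab/aCard'
_iri_id = base64.urlsafe_b64encode(_iri.encode()).decode()
assert _iri_id == 'aHR0cDovL2JsYXJnLmV4YW1wbGUvdm9jYWIvYUNhcmQ='
assert base64.urlsafe_b64decode(_iri_id).decode() == _iri

# blanknode ids: one counter per renderer instance, prefixed with time_ns,
# so the same blanknode keeps the same id for the lifetime of a rendering
_prefix = str(time.time_ns())
_ints = itertools.count()
_blanknode_ids = defaultdict(lambda: f'{_prefix}-{next(_ints)}')
_node_a = frozenset({('predicate', 'value a')})
_node_b = frozenset({('predicate', 'value b')})
assert _blanknode_ids[_node_a] == _blanknode_ids[_node_a]  # stable on repeat lookups
assert _blanknode_ids[_node_a] != _blanknode_ids[_node_b]  # distinct nodes get distinct ids
```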
7 changes: 7 additions & 0 deletions trove/trovesearch/search_params.py
@@ -240,6 +240,13 @@ def iter_from_searchtext_param(cls, param_name: QueryparamName, param_value: str
if param_name.bracketed_names
else None
)
if _propertypath_set:
if any((is_globpath(_path) and len(_path) > 1) for _path in _propertypath_set):
raise trove_exceptions.InvalidQueryParamName(
str(param_name),
'may not use glob-paths longer than "*" with search-text parameters',
)

for _textsegment in cls.iter_from_text(param_value):
if _propertypath_set:
yield dataclasses.replace(_textsegment, propertypath_set=_propertypath_set)
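The new guard rejects search-text params whose bracketed property path is a glob more than one step deep (something like `cardSearchText[*.*]=…`), while a bare `*` and concrete paths keep working. A self-contained sketch of the check, with a simplified stand-in for the project's `is_globpath`:

```python
GLOB_PATHSTEP = '*'


def is_globpath(path: tuple[str, ...]) -> bool:
    # simplified stand-in for the project's is_globpath
    return all(_step == GLOB_PATHSTEP for _step in path)


def check_searchtext_paths(propertypath_set: set[tuple[str, ...]]) -> None:
    if any(is_globpath(_path) and len(_path) > 1 for _path in propertypath_set):
        raise ValueError('may not use glob-paths longer than "*" with search-text parameters')


check_searchtext_paths({('*',)})              # ok: single-step glob
check_searchtext_paths({('dcterms:title',)})  # ok: concrete path
try:
    check_searchtext_paths({('*', '*')})      # rejected: multi-step glob
except ValueError as _error:
    print(_error)
```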