Skip to content

Commit

Permalink
consistents cursors and walks
Browse files Browse the repository at this point in the history
  • Loading branch information
aaxelb committed Nov 11, 2024
1 parent aeff22a commit 5378787
Show file tree
Hide file tree
Showing 9 changed files with 323 additions and 326 deletions.
5 changes: 1 addition & 4 deletions share/search/index_strategy/_trovesearch_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,6 @@
LABEL_PROPERTIES = (RDFS.label, SKOS.prefLabel, SKOS.altLabel)
NAMELIKE_PROPERTIES = (*TITLE_PROPERTIES, *NAME_PROPERTIES, *LABEL_PROPERTIES)

VALUESEARCH_MAX = 234
CARDSEARCH_MAX = 9997

KEYWORD_LENGTH_MAX = 8191 # skip keyword terms that might exceed lucene's internal limit
# (see https://www.elastic.co/guide/en/elasticsearch/reference/current/ignore-above.html)
KEYWORD_MAPPING = {'type': 'keyword', 'ignore_above': KEYWORD_LENGTH_MAX}
Expand Down Expand Up @@ -168,7 +165,7 @@ def __post_init__(self):
if XSD.integer in _walk_obj.datatype_iris:
self.integer_values[_walk_path].add(_walk_obj)
if {RDF.string, RDF.langString}.intersection(_walk_obj.datatype_iris):
self.text_values[_walk_path].add(_walk_obj.unicode_value)
self.text_values[_walk_path].add(_walk_obj)
# try for date in a date property, regardless of the above
if is_date_property(_walk_path[-1]) and isinstance(_walk_obj, (str, rdf.Literal)):
_date_str = (
Expand Down
153 changes: 49 additions & 104 deletions share/search/index_strategy/trove_indexcard_flats.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
import base64
from collections import defaultdict
import contextlib
import dataclasses
import datetime
import json
import logging
import re
import uuid
from typing import Iterable, Optional, Iterator
from typing import Iterable, Iterator, Any

from django.conf import settings
import elasticsearch8
Expand All @@ -18,9 +17,11 @@
from share.search.index_strategy.elastic8 import Elastic8IndexStrategy
from share.util.checksum_iri import ChecksumIri
from trove import models as trove_db
from trove.trovesearch.cursor import (
from trove.trovesearch.page_cursor import (
MANY_MORE,
OffsetCursor,
CardsearchCursor,
PageCursor,
ReproduciblyRandomSampleCursor,
)
from trove.trovesearch.search_params import (
CardsearchParams,
Expand All @@ -40,15 +41,15 @@
)
from trove.util.iris import get_sufficiently_unique_iri, is_worthwhile_iri, iri_path_as_keyword
from trove.vocab.osfmap import is_date_property
from trove.vocab.namespaces import TROVE, RDF, OWL
from trove.vocab.namespaces import RDF, OWL
from ._trovesearch_util import (
latest_rdf_for_indexcard_pks,
GraphWalk,
TITLE_PROPERTIES,
NAME_PROPERTIES,
LABEL_PROPERTIES,
NAMELIKE_PROPERTIES,
KEYWORD_LENGTH_MAX,
SKIPPABLE_PROPERTIES,
)


Expand Down Expand Up @@ -165,22 +166,16 @@ def _build_sourcedoc(self, indexcard_rdf):
_nested_iris = defaultdict(set)
_nested_dates = defaultdict(set)
_nested_texts = defaultdict(set)
_pathset = set()
for _walk_path, _walk_obj in _PredicatePathWalker(_rdfdoc.tripledict).walk_from_subject(indexcard_rdf.focus_iri):
_pathset.add(_walk_path)
if isinstance(_walk_obj, str):
_nested_iris[_NestedIriKey.for_iri_at_path(_walk_path, _walk_obj, _rdfdoc)].add(_walk_obj)
elif isinstance(_walk_obj, datetime.date):
_nested_dates[_walk_path].add(datetime.date.isoformat(_walk_obj))
elif is_date_property(_walk_path[-1]):
try:
datetime.date.fromisoformat(_walk_obj.unicode_value)
except ValueError:
logger.debug('skipping malformatted date "%s" in %s', _walk_obj.unicode_value, indexcard_rdf)
else:
_nested_dates[_walk_path].add(_walk_obj.unicode_value)
elif isinstance(_walk_obj, primitive_rdf.Literal):
_nested_texts[(_walk_path, tuple(_walk_obj.datatype_iris))].add(_walk_obj.unicode_value)
_walk = GraphWalk(_rdfdoc, indexcard_rdf.focus_iri)
for _walk_path, _walk_iris in _walk.iri_values.items():
for _iri_obj in _walk_iris:
_nested_iris[_NestedIriKey.for_iri_at_path(_walk_path, _iri_obj, _rdfdoc)].add(_iri_obj)
for _walk_path, _walk_dates in _walk.date_values.items():
for _date_obj in _walk_dates:
_nested_dates[_walk_path].add(datetime.date.isoformat(_date_obj))
for _walk_path, _walk_texts in _walk.text_values.items():
for _text_obj in _walk_texts:
_nested_texts[(_walk_path, tuple(_text_obj.datatype_iris))].add(_text_obj.unicode_value)
_focus_iris = {indexcard_rdf.focus_iri}
_suffuniq_focus_iris = {get_sufficiently_unique_iri(indexcard_rdf.focus_iri)}
for _identifier in indexcard_rdf.indexcard.focus_identifier_set.all():
Expand All @@ -196,11 +191,11 @@ def _build_sourcedoc(self, indexcard_rdf):
'flat_iri_values_suffuniq': self._flattened_iris_suffuniq(_nested_iris),
'iri_paths_present': [
iri_path_as_keyword(_path)
for _path in _pathset
for _path in _walk.paths_walked
],
'iri_paths_present_suffuniq': [
iri_path_as_keyword(_path, suffuniq=True)
for _path in _pathset
for _path in _walk.paths_walked
],
'nested_iri': list(filter(bool, (
self._iri_nested_sourcedoc(_nested_iri_key, _iris, _rdfdoc)
Expand Down Expand Up @@ -294,7 +289,7 @@ def pls_handle_search__sharev2_backcompat(self, request_body=None, request_query
)

def pls_handle_cardsearch(self, cardsearch_params: CardsearchParams) -> CardsearchResponse:
_cursor = CardsearchCursor.from_params(cardsearch_params)
_cursor = self._cardsearch_cursor(cardsearch_params)
_sort = self._cardsearch_sort(cardsearch_params.sort_list)
_query = self._cardsearch_query(
cardsearch_params.cardsearch_filter_set,
Expand All @@ -303,7 +298,7 @@ def pls_handle_cardsearch(self, cardsearch_params: CardsearchParams) -> Cardsear
)
_from_offset = (
_cursor.start_offset
if _cursor.is_first_page()
if _cursor.is_first_page() or not isinstance(_cursor, ReproduciblyRandomSampleCursor)
else _cursor.start_offset - len(_cursor.first_page_ids)
)
_search_kwargs = dict(
Expand All @@ -326,7 +321,7 @@ def pls_handle_cardsearch(self, cardsearch_params: CardsearchParams) -> Cardsear
return self._cardsearch_response(cardsearch_params, _es8_response, _cursor)

def pls_handle_valuesearch(self, valuesearch_params: ValuesearchParams) -> ValuesearchResponse:
_cursor = OffsetCursor.from_page_param(valuesearch_params.page)
_cursor = OffsetCursor.from_cursor(valuesearch_params.page_cursor)
_is_date_search = is_date_property(valuesearch_params.valuesearch_propertypath[-1])
_search_kwargs = dict(
query=self._cardsearch_query(
Expand Down Expand Up @@ -357,11 +352,21 @@ def pls_handle_valuesearch(self, valuesearch_params: ValuesearchParams) -> Value
###
# query implementation

def _cardsearch_cursor(self, cardsearch_params: CardsearchParams) -> OffsetCursor:
_request_cursor = cardsearch_params.page_cursor
if (
_request_cursor.is_basic()
and not cardsearch_params.sort_list
and not cardsearch_params.cardsearch_textsegment_set
):
return ReproduciblyRandomSampleCursor.from_cursor(_request_cursor)
return OffsetCursor.from_cursor(_request_cursor)

def _cardsearch_query(
self,
filter_set, textsegment_set, *,
additional_filters=None,
cardsearch_cursor: Optional[CardsearchCursor] = None,
cardsearch_cursor: PageCursor | None = None,
) -> dict:
_bool_query = {
'filter': additional_filters or [],
Expand All @@ -383,12 +388,12 @@ def _cardsearch_query(
else:
raise ValueError(f'unknown filter operator {_searchfilter.operator}')
_textq_builder = self._NestedTextQueryBuilder(
relevance_matters=bool(cardsearch_cursor and not cardsearch_cursor.random_sort),
relevance_matters=not isinstance(cardsearch_cursor, ReproduciblyRandomSampleCursor),
)
for _textsegment in textsegment_set:
for _boolkey, _textqueries in _textq_builder.textsegment_boolparts(_textsegment).items():
_bool_query[_boolkey].extend(_textqueries)
if not cardsearch_cursor or not cardsearch_cursor.random_sort:
if not isinstance(cardsearch_cursor, ReproduciblyRandomSampleCursor):
# no need for randomness
return {'bool': _bool_query}
if not cardsearch_cursor.first_page_ids:
Expand Down Expand Up @@ -432,7 +437,7 @@ def _cardsearch_aggs(self, cardsearch_params):
return _aggs

def _valuesearch_iri_aggs(self, valuesearch_params: ValuesearchParams, cursor: OffsetCursor):
_nested_iri_bool = {
_nested_iri_bool: dict[str, Any] = {
'filter': [{'term': {'nested_iri.suffuniq_path_from_focus': iri_path_as_keyword(
valuesearch_params.valuesearch_propertypath,
suffuniq=True,
Expand Down Expand Up @@ -534,24 +539,25 @@ def _valuesearch_response(
# WARNING: terribly inefficient pagination (part two)
_page_end_index = cursor.start_offset + cursor.page_size
_bucket_page = _buckets[cursor.start_offset:_page_end_index] # discard prior pages
cursor.result_count = (
-1 # "many more"
cursor.total_count = (
MANY_MORE
if (_bucket_count > _page_end_index) # agg includes one more, if there
else _bucket_count
)
return ValuesearchResponse(
cursor=cursor,
search_result_page=[
self._valuesearch_iri_result(_iri_bucket)
for _iri_bucket in _bucket_page
],
cursor=cursor,
)
else: # assume date
_year_buckets = (
es8_response['aggregations']['in_nested_date']
['value_at_propertypath']['count_by_year']['buckets']
)
return ValuesearchResponse(
cursor=PageCursor(len(_year_buckets)),
search_result_page=[
self._valuesearch_date_result(_year_bucket)
for _year_bucket in _year_buckets
Expand Down Expand Up @@ -681,46 +687,24 @@ def _cardsearch_response(
self,
cardsearch_params: CardsearchParams,
es8_response: dict,
cursor: CardsearchCursor,
cursor: OffsetCursor,
) -> CardsearchResponse:
_es8_total = es8_response['hits']['total']
if _es8_total['relation'] != 'eq':
cursor.result_count = -1 # "too many"
cursor.total_count = MANY_MORE
elif isinstance(cursor, ReproduciblyRandomSampleCursor) and not cursor.is_first_page():
# account for the filtered-out first page
cursor.total_count = _es8_total['value'] + len(cursor.first_page_ids)
else: # exact (and small) count
cursor.result_count = _es8_total['value']
if cursor.random_sort and not cursor.is_first_page():
# account for the filtered-out first page
cursor.result_count += len(cursor.first_page_ids)
cursor.total_count = _es8_total['value']
_results = []
for _es8_hit in es8_response['hits']['hits']:
_card_iri = _es8_hit['_id']
_results.append(CardsearchResult(
card_iri=_card_iri,
text_match_evidence=list(self._gather_textmatch_evidence(_es8_hit)),
))
if cursor.is_first_page() and cursor.first_page_ids:
# revisiting first page; reproduce original random order
_uuid_index = {
_uuid: _i
for (_i, _uuid) in enumerate(cursor.first_page_ids)
}
_results.sort(key=lambda _r: _uuid_index[_r.card_uuid])
else:
_should_start_reproducible_randomness = (
cursor.random_sort
and cursor.is_first_page()
and not cursor.first_page_ids
and any(
not _filter.is_type_filter() # look for a non-default filter
for _filter in cardsearch_params.cardsearch_filter_set
)
)
if _should_start_reproducible_randomness:
cursor.first_page_ids = tuple(
_result.card_uuid
for _result in _results
)
_relatedproperty_list = []
_relatedproperty_list: list[PropertypathUsage] = []
if cardsearch_params.related_property_paths:
_relatedproperty_list.extend(
PropertypathUsage(property_path=_path, usage_count=0)
Expand All @@ -734,14 +718,10 @@ def _cardsearch_response(
_path = tuple(json.loads(_bucket['key']))
_relatedproperty_by_path[_path].usage_count += _bucket['doc_count']
return CardsearchResponse(
total_result_count=(
TROVE['ten-thousands-and-more']
if cursor.has_many_more()
else cursor.result_count
),
cursor=cursor,
search_result_page=_results,
related_propertypath_results=_relatedproperty_list,
cursor=cursor,
cardsearch_params=cardsearch_params,
)

def _gather_textmatch_evidence(self, es8_hit) -> Iterable[TextMatchEvidence]:
Expand Down Expand Up @@ -913,41 +893,6 @@ def _pathset_as_nestedvalue_filter(propertypath_set: frozenset[tuple[str, ...]],
return {'terms': {f'{nested_path}.suffuniq_path_from_focus': _suffuniq_iri_paths}}


class _PredicatePathWalker:
WalkYield = tuple[tuple[str, ...], primitive_rdf.RdfObject]
_visiting: set[str | frozenset]

def __init__(self, tripledict: primitive_rdf.RdfTripleDictionary):
self.tripledict = tripledict
self._visiting = set()

def walk_from_subject(self, iri_or_blanknode, last_path: tuple[str, ...] = ()) -> Iterable[WalkYield]:
'''walk the graph from the given subject, yielding (pathkey, obj) for every reachable object
'''
with self._visit(iri_or_blanknode):
_twopledict = (
primitive_rdf.twopledict_from_twopleset(iri_or_blanknode)
if isinstance(iri_or_blanknode, frozenset)
else self.tripledict.get(iri_or_blanknode, {})
)
for _predicate_iri, _obj_set in _twopledict.items():
if _predicate_iri not in SKIPPABLE_PROPERTIES:
_path = (*last_path, _predicate_iri)
for _obj in _obj_set:
if not isinstance(_obj, frozenset): # omit the blanknode as a value
yield (_path, _obj)
if isinstance(_obj, (str, frozenset)) and (_obj not in self._visiting):
# step further for iri or blanknode
yield from self.walk_from_subject(_obj, last_path=_path)

@contextlib.contextmanager
def _visit(self, focus_obj):
assert focus_obj not in self._visiting
self._visiting.add(focus_obj)
yield
self._visiting.discard(focus_obj)


@dataclasses.dataclass(frozen=True)
class _NestedIriKey:
'''if this is the same for multiple iri values, they can be combined in one `nested_iri` doc
Expand Down
Loading

0 comments on commit 5378787

Please sign in to comment.